MICO Platform
 All Classes Namespaces Functions Variables Friends
EventManager.hpp
1 #ifndef HAVE_EVENT_MANAGER_H
2 #define HAVE_EVENT_MANAGER_H 1
3 
4 #include <string>
5 #include <map>
6 #include <thread>
7 #include <mutex>
8 #include <condition_variable>
9 
10 // network I/O
11 #include <boost/asio.hpp>
12 
13 #include <amqpcpp.h>
14 #include "rdf_model.hpp"
15 #include "ContentItem.hpp"
16 #include "PersistenceService.hpp"
17 #include "AnalysisService.hpp"
18 
19 namespace mico {
20  namespace event {
21 
// Name string "service_registry"; judging by the identifier, presumably the AMQP exchange
// used for service registration -- TODO confirm against EventManager.cpp.
26  static const std::string EXCHANGE_SERVICE_REGISTRY = "service_registry";
// Name string "service_discovery"; presumably the AMQP exchange used for service
// discovery -- TODO confirm against EventManager.cpp.
32  static const std::string EXCHANGE_SERVICE_DISCOVERY = "service_discovery";
33 
// Name string "content_input"; presumably the queue on which content items are
// submitted for analysis -- TODO confirm.
37  static const std::string QUEUE_CONTENT_INPUT = "content_input";
38 
// Name string "content_output"; presumably the queue on which analysis results
// are reported -- TODO confirm.
42  static const std::string QUEUE_CONTENT_OUTPUT = "content_output";
43 
// Name string "config_request"; presumably the queue used for configuration
// requests -- TODO confirm.
47  static const std::string QUEUE_CONFIG_REQUEST = "config_request";
48 
49 
// Forward declarations. In the visible part of this header, AnalysisConsumer,
// RabbitConnectionHandler and ConfigurationClient are only used through pointers;
// DiscoveryConsumer is not referenced in the visible part at all.
50  class AnalysisConsumer;
51  class DiscoveryConsumer;
52  class RabbitConnectionHandler;
53  class ConfigurationClient;
54 
55  // Bug: https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/25
56  // Got fixed with version 2.2.0
/**
 * Helper for the AMQP-CPP issue referenced in the comment directly above
 * (reported upstream as fixed in version 2.2.0).
 *
 * EventManager keeps one instance of this class as a member and can query it
 * through isFirstCall().
 */
class AMQPCPPOnCloseBugfix
{
private:
    bool firstCall;             // state reported by isFirstCall()
    std::mutex firstCallMutex;  // presumably serializes access to firstCall -- confirm in EventManager.cpp

public:
    // NOTE(review): this listing lost the class header (line 57) and one public
    // declaration (line 63). A default constructor is assumed for line 63 because
    // firstCall has no in-class initializer -- confirm against EventManager.cpp.
    AMQPCPPOnCloseBugfix();

    /**
     * Implemented in the .cpp file. Judging by the name, reports whether this is
     * the first invocation -- TODO confirm exact semantics.
     */
    bool isFirstCall();
};
66 
67 
/**
 * This exception is thrown by the event manager in case a method call failed.
 */
class EventManagerException
{
private:
    std::string message;  // description handed to the constructor

public:
    /**
     * @param message text describing the failure; stored as a copy
     */
    EventManagerException(std::string message) : message(message) {}

    /**
     * @return the stored message. Const-qualified so that the message can also be
     *         read from an exception caught by const reference.
     */
    const std::string& getMessage() const { return message; }
};
79 
80  class EventManager : public AMQP::ConnectionHandler
81  {
82  private:
83  boost::asio::
84  io_service io_service;
85 
86  boost::asio::ip::tcp::
87  socket socket;
88 
89  std::string host;
90  int rabbitPort;
91  int marmottaPort;
92 
93  std::string user;
94  std::string password;
95 
96  bool connected;
97  bool unavailable;
98 
100  PersistenceService *persistence = NULL;
101 
102  size_t recv_len;
103  char* recv_buf;
104 
105 
106  RabbitConnectionHandler* rabbit;
107 
108  AMQP::Connection* connection;
109  AMQP::Channel* channel;
110 
111  std::map<AnalysisService*, AnalysisConsumer*> services;
112 
113  std::thread receiver;
114  std::mutex m;
115  std::condition_variable cv;
116 
117  void doConnect();
118  void doRead();
119 
120  ConfigurationClient *configurationClient = NULL; //<! configuration service
121 
122  AMQPCPPOnCloseBugfix amqpWorkaround;
123 
124  std::vector<char> intermediateReceiveBuffer;
125 
126  public:
127 
128  EventManager(const std::string& host) : EventManager(host, "mico", "mico") {};
129 
130  EventManager(const std::string& host, const std::string& user, const std::string& password) : EventManager(host, 5672, 8080, user, password) {};
131 
135  EventManager(const std::string& host, int rabbitPort, int marmottaPort, const std::string& user, const std::string& password);
136 
140  virtual ~EventManager();
141 
148 
156  void onData(AMQP::Connection *connection, const char *data, size_t size);
157 
172  void onError(AMQP::Connection *connection, const char *message);
173 
188  void onConnected(AMQP::Connection *connection);
189 
198  void onClosed(AMQP::Connection *connection);
199 
205  void registerService(AnalysisService* service);
206 
207 
213  void unregisterService(AnalysisService* service);
214 
222 
223  };
224 
225  }
226 }
227 #endif
mico::persistence::PersistenceService * getPersistenceService()
Return a reference to the persistence service used by this event manager, e.g.
Definition: EventManager.cpp:466
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
Definition: http_client.cpp:23
void registerService(AnalysisService *service)
Register the given service with the event manager.
Definition: EventManager.cpp:490
void onConnected(AMQP::Connection *connection)
Method that is called when the login attempt succeeded.
Definition: EventManager.cpp:390
void injectContentItem(const mico::persistence::ContentItem &item)
Trigger analysis of the given content item.
Definition: EventManager.cpp:556
Main service for accessing the MICO persistence API.
Definition: PersistenceService.hpp:41
This exception is thrown by the event manager in case a method call failed.
Definition: EventManager.hpp:71
void onData(AMQP::Connection *connection, const char *data, size_t size)
Method that is called by the AMQP library every time it has data available that should be sent to RabbitMQ.
Definition: EventManager.cpp:372
virtual ~EventManager()
Shut down the event manager, cleaning up and closing any registered channels, services and connections.
Definition: EventManager.cpp:308
void onClosed(AMQP::Connection *connection)
Method that is called when the connection was closed.
Definition: EventManager.cpp:480
Representation of a ContentItem.
Definition: ContentItem.hpp:76
Definition: EventManager.hpp:80
Definition: EventManager.cpp:69
void onError(AMQP::Connection *connection, const char *message)
When the connection ends up in an error state this method is called.
Definition: EventManager.cpp:385
void unregisterService(AnalysisService *service)
Unregister the service with the given ID.
Definition: EventManager.cpp:525
Interface to be implemented by services.
Definition: AnalysisService.hpp:18
Definition: EventManager.hpp:57