MICO Platform  1.0.0
 All Classes Namespaces Functions Variables Friends
EventManager.hpp
1 #ifndef HAVE_EVENT_MANAGER_H
2 #define HAVE_EVENT_MANAGER_H 1
3 
4 #include <string>
5 #include <map>
6 #include <thread>
7 #include <mutex>
8 #include <condition_variable>
9 
10 // network I/O
11 #include <boost/asio.hpp>
12 
13 #include "amqpcpp.h"
14 #include "rdf_model.hpp"
15 #include "ContentItem.hpp"
16 #include "PersistenceService.hpp"
17 #include "AnalysisService.hpp"
18 
19 namespace mico {
20  namespace event {
21 
// Names of the messaging exchanges, kept as string constants.
// NOTE(review): a namespace-scope `static const std::string` in a header has
// internal linkage, so every translation unit that includes it gets its own copy.
26  static const std::string EXCHANGE_SERVICE_REGISTRY = "service_registry";
32  static const std::string EXCHANGE_SERVICE_DISCOVERY = "service_discovery";
33 
// Name of the queue for content input (presumably content submitted for analysis - confirm against EventManager.cpp).
37  static const std::string QUEUE_CONTENT_INPUT = "content_input";
38 
// Name of the queue for content output (presumably analysis results - confirm against EventManager.cpp).
42  static const std::string QUEUE_CONTENT_OUTPUT = "content_output";
43 
44 
// Forward declarations only; the definitions are not part of this header listing.
// AnalysisConsumer and RabbitConnectionHandler are referenced by pointer in EventManager.
45  class AnalysisConsumer;
46  class DiscoveryConsumer;
47  class RabbitConnectionHandler;
48 
53  private:
54  std::string message;
55  public:
56  EventManagerException(std::string message) : message(message) {};
57 
58  const std::string& getMessage() { return message; };
59  };
60 
// Event manager: derives from AMQP::ConnectionHandler, declares the connection
// callbacks (onData, onError, onConnected, onClosed) and lets callers register and
// unregister AnalysisService instances.
// NOTE(review): the embedded line numbering of this listing has gaps, so some
// declarations may be missing from this view (the page's reference list mentions
// injectContentItem, which is not declared below).
61  class EventManager : public AMQP::ConnectionHandler
62  {
63  private:
// Boost.Asio I/O service member (the type name is split over two listing lines).
64  boost::asio::
65  io_service io_service;
66 
// Boost.Asio TCP socket member (presumably the transport to the message broker - confirm in EventManager.cpp).
67  boost::asio::ip::tcp::
68  socket socket;
69 
// Host name and two port numbers; the three-argument constructor below passes
// 5672 and 8080 for rabbitPort and marmottaPort respectively.
70  std::string host;
71  int rabbitPort;
72  int marmottaPort;
73 
// Credentials; the single-argument constructor below passes "mico" for both.
74  std::string user;
75  std::string password;
76 
// State flags; where they are set and read is not visible in this header.
77  bool connected;
78  bool unavailable;
79 
// Persistence service instance, exposed by reference through getPersistenceService().
81  PersistenceService persistence;
82 
// Raw buffer pointer and a length (presumably the receive buffer used by doRead() - confirm).
// Ownership of recv_buf is not visible in this header.
83  size_t recv_len;
84  char* recv_buf;
85 
86 
// Raw pointers to the connection handler, the AMQP connection and the AMQP channel;
// ownership and lifetime are not visible in this header.
87  RabbitConnectionHandler* rabbit;
88 
89  AMQP::Connection* connection;
90  AMQP::Channel* channel;
91 
// Maps an AnalysisService pointer to an AnalysisConsumer pointer
// (presumably maintained by registerService/unregisterService - confirm).
92  std::map<AnalysisService*, AnalysisConsumer*> services;
93 
// Thread, mutex and condition variable members; their use lives outside this header.
94  std::thread receiver;
95  std::mutex m;
96  std::condition_variable cv;
97 
// Private helpers, defined outside this header.
98  void doConnect();
99  void doRead();
100  public:
101 
// Convenience constructor: delegates to the three-argument constructor with
// user and password both set to "mico".
102  EventManager(const std::string& host) : EventManager(host, "mico", "mico") {};
103 
// Convenience constructor: delegates to the full constructor with rabbitPort 5672
// and marmottaPort 8080.
104  EventManager(const std::string& host, const std::string& user, const std::string& password) : EventManager(host, 5672, 8080, user, password) {};
105 
// Full constructor taking host, both ports and the credentials; defined outside this header.
109  EventManager(const std::string& host, int rabbitPort, int marmottaPort, const std::string& user, const std::string& password);
110 
// Virtual destructor; defined outside this header.
114  virtual ~EventManager();
115 
// Returns a reference to the persistence member of this event manager.
121  PersistenceService& getPersistenceService() { return persistence; };
122 
// Connection callbacks taking the AMQP::Connection they refer to.
// NOTE(review): none of them carries `override`, so a signature mismatch with
// AMQP::ConnectionHandler would go undetected - confirm against amqpcpp.h.
// onData receives a data pointer together with its size.
130  void onData(AMQP::Connection *connection, const char *data, size_t size);
131 
// onError receives a message string.
146  void onError(AMQP::Connection *connection, const char *message);
147 
162  void onConnected(AMQP::Connection *connection);
163 
172  void onClosed(AMQP::Connection *connection);
173 
// Registers the given analysis service with this event manager; defined outside this header.
179  void registerService(AnalysisService* service);
180 
181 
// Unregisters the given analysis service; defined outside this header.
187  void unregisterService(AnalysisService* service);
188 
196 
197  };
198 
199  }
200 }
201 #endif
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
Definition: http_client.cpp:23
mico::persistence::PersistenceService & getPersistenceService()
Return a reference to the persistence service used by this event manager.
Definition: EventManager.hpp:121
void registerService(AnalysisService *service)
Register the given service with the event manager.
Definition: EventManager.cpp:319
void onConnected(AMQP::Connection *connection)
Method that is called when the login attempt succeeded.
Definition: EventManager.cpp:245
void injectContentItem(const mico::persistence::ContentItem &item)
Trigger analysis of the given content item.
Definition: EventManager.cpp:385
Main service for accessing the MICO persistence API.
Definition: PersistenceService.hpp:41
This exception is thrown by the event manager in case a method call failed.
Definition: EventManager.hpp:52
void onData(AMQP::Connection *connection, const char *data, size_t size)
Method that is called by the AMQP library every time it has data available that should be sent to RabbitMQ.
Definition: EventManager.cpp:227
virtual ~EventManager()
Shut down the event manager, cleaning up and closing any registered channels, services and connections.
Definition: EventManager.cpp:176
void onClosed(AMQP::Connection *connection)
Method that is called when the connection was closed.
Definition: EventManager.cpp:309
Representation of a ContentItem.
Definition: ContentItem.hpp:76
Definition: EventManager.hpp:61
void onError(AMQP::Connection *connection, const char *message)
When the connection ends up in an error state this method is called.
Definition: EventManager.cpp:240
void unregisterService(AnalysisService *service)
Unregister the service with the given ID.
Definition: EventManager.cpp:354
Interface to be implemented by services.
Definition: AnalysisService.hpp:18