MICO Platform
 All Classes Namespaces Functions Variables Friends
EventManager.hpp
1 #ifndef HAVE_EVENT_MANAGER_H
2 #define HAVE_EVENT_MANAGER_H 1
3 
4 #include <string>
5 #include <map>
6 #include <thread>
7 #include <mutex>
8 #include <condition_variable>
9 
10 // network I/O
11 #include <boost/asio.hpp>
12 
13 #include <amqpcpp.h>
14 #include "rdf_model.hpp"
15 #include "Item.hpp"
16 #include "Uri.hpp"
17 #include "PersistenceService.hpp"
18 #include "AnalysisService.hpp"
19 #include "Event.pb.h"
20 
21 namespace mico {
22  namespace event {
23 
// String constants for the messaging layer. Going by the identifiers, the
// first two name exchanges and the remaining three name queues — presumably
// RabbitMQ entities; confirm against EventManager.cpp.
// NOTE(review): namespace-scope `static` gives these internal linkage, so every
// translation unit that includes this header gets its own copy of each string.
28  static const std::string EXCHANGE_SERVICE_REGISTRY = "service_registry";
34  static const std::string EXCHANGE_SERVICE_DISCOVERY = "service_discovery";
35 
39  static const std::string QUEUE_CONTENT_INPUT = "content_input";
40 
44  static const std::string QUEUE_CONTENT_OUTPUT = "content_output";
45 
49  static const std::string QUEUE_CONFIG_REQUEST = "config_request";
50 
51 
// Forward declarations. In this listing AnalysisConsumer, RabbitConnectionHandler
// and ConfigurationClient are used only through pointers (members of
// EventManager); DiscoveryConsumer is not referenced anywhere else in the listing.
52  class AnalysisConsumer;
53  class DiscoveryConsumer;
54  class RabbitConnectionHandler;
55  class ConfigurationClient;
56 
57  // Bug: https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/25
58  // Got fixed with version 2.2.0
// NOTE(review): the class-declaration line (listing line 59) is missing from
// this extract. Presumably it is `class AMQPCPPOnCloseBugfix {`, the type of the
// EventManager::amqpWorkaround member — confirm against the original header.
// Listing line 65 is also absent; its content cannot be determined from here.
60  private:
// Flag returned/updated by isFirstCall() — presumably; the implementation is not
// visible in this header.
61  bool firstCall;
// Presumably guards firstCall against concurrent access — TODO confirm in the .cpp.
62  std::mutex firstCallMutex;
63 
64  public:
// Declared only; definition lives outside this header.
66  bool isFirstCall();
67  };
68 
// Body of class AnalysisResponse (name taken from the constructor below; the
// class-declaration line, listing line 69, is missing from this extract).
// Bundles an analysis service, the AMQP message being handled and the channel,
// and declares four send* methods whose definitions are outside this header.
70  {
71  private:
// Both references are stored as-is: the referenced AnalysisService and
// AMQP::Message must outlive this object.
72  AnalysisService& m_service;
73  const AMQP::Message &m_message;
// Raw pointer; ownership is not visible from this header.
74  AMQP::Channel* m_channel;
75 
76  public:
// Stores the three arguments without copying the service or the message.
77  AnalysisResponse(AnalysisService& service, const AMQP::Message &message, AMQP::Channel* channel):
78  m_service(service), m_message(message), m_channel(channel) {}
79 
// The four methods below are declarations only. Judging by their names they
// report completion, an error, progress and a newly created part for item `i`
// — presumably over m_channel; confirm against the implementation.
80  void sendFinish( std::shared_ptr< mico::persistence::model::Item > i);
81  void sendErrorMessage(std::shared_ptr< mico::persistence::model::Item > i, const mico::event::model::ErrorCodes& errcode, const std::string& msg, const std::string& desc);
82  void sendProgress( std::shared_ptr< mico::persistence::model::Item > i, const mico::persistence::model::URI& part, const float& progress);
83  void sendNew( std::shared_ptr< mico::persistence::model::Item > i, const mico::persistence::model::URI& part);
84  };
85 
// Body of class EventManagerException (name taken from the constructor below;
// the class-declaration line, listing line 89, is missing from this extract).
// This exception is thrown by the event manager in case a method call failed.
90  private:
// Human-readable description supplied at construction time.
91  std::string message;
92  public:
// Takes the message by value and copies it into the member.
93  EventManagerException(std::string message) : message(message) {};
94 
// Returns a reference to the stored message; valid as long as this object lives.
// NOTE(review): not const-qualified, so it cannot be called through a const
// reference (e.g. inside `catch (const EventManagerException&)`).
95  const std::string& getMessage() { return message; };
96  };
97 
// Event manager: implements the AMQP::ConnectionHandler callbacks (onData,
// onError, onConnected, onClosed) and lets analysis services be registered,
// unregistered and content items be injected for analysis.
// Only the two delegating constructors are defined inline; everything else is
// declared here and defined outside this header.
98  class EventManager : public AMQP::ConnectionHandler
99  {
100  private:
// Boost.Asio I/O service and TCP socket (type and member name are split across
// two listing lines each).
101  boost::asio::
102  io_service io_service;
103 
104  boost::asio::ip::tcp::
105  socket socket;
106 
// Connection parameters as passed to the five-argument constructor.
107  std::string host;
108  int rabbitPort;
109  int marmottaPort;
110 
111  std::string user;
112  std::string password;
113 
// State flags; their exact semantics are defined in the implementation file.
114  bool connected;
115  bool unavailable;
116 
// Persistence service pointer, null until assigned; ownership is not visible here.
118  PersistenceService *persistence = NULL;
119 
// Receive buffer and its length — presumably used by doRead(); TODO confirm.
120  size_t recv_len;
121  char* recv_buf;
122 
123 
// Raw pointers to the connection machinery; ownership/lifetime is handled in the
// implementation file and cannot be determined from this header.
124  RabbitConnectionHandler* rabbit;
125 
126  AMQP::Connection* connection;
127  AMQP::Channel* channel;
128 
// Maps each registered analysis service to its consumer object.
129  std::map<AnalysisService*, AnalysisConsumer*> services;
130 
// Threading primitives; presumably `receiver` runs the network loop and m/cv
// synchronise with it — confirm against EventManager.cpp.
131  std::thread receiver;
132  std::mutex m;
133  std::condition_variable cv;
134 
// Private helpers, declared only.
135  void doConnect();
136  void doRead();
137 
// Takes a version string by value and returns a string; judging by the name it
// drops the patch component — TODO confirm.
138  std::string stripPatchVersion(std::string ver);
139 
140  ConfigurationClient *configurationClient = NULL; //!< configuration service
141 
// Helper object for the AMQP-CPP on-close issue referenced earlier in this header.
142  AMQPCPPOnCloseBugfix amqpWorkaround;
143 
// NOTE(review): <vector> is not among the includes visible in this header; it is
// presumably pulled in transitively — consider including it explicitly.
144  std::vector<char> intermediateReceiveBuffer;
145 
146  public:
147 
// Connect to `host` using the default credentials user "mico" / password "mico".
148  EventManager(const std::string& host) : EventManager(host, "mico", "mico") {};
149 
// Connect to `host` with the given credentials, using rabbitPort 5672 and
// marmottaPort 8080.
150  EventManager(const std::string& host, const std::string& user, const std::string& password) : EventManager(host, 5672, 8080, user, password) {};
151 
// Fully parameterised constructor; the two constructors above delegate to it.
155  EventManager(const std::string& host, int rabbitPort, int marmottaPort, const std::string& user, const std::string& password);
156 
// Shut down the event manager, cleaning up and closing any registered channels,
// services and connections.
160  virtual ~EventManager();
161 
// NOTE(review): the tooltip index of this page lists getPersistenceService(),
// but its declaration line does not appear in this listing (numbering jumps
// from 161 to 168).
168 
// Called by the AMQP library every time it has data available that should be
// sent out over the connection.
176  void onData(AMQP::Connection *connection, const char *data, size_t size);
177 
// Called when the connection ends up in an error state.
192  void onError(AMQP::Connection *connection, const char *message);
193 
// Called when the login attempt succeeded.
208  void onConnected(AMQP::Connection *connection);
209 
// Called when the connection was closed.
218  void onClosed(AMQP::Connection *connection);
219 
// Register the given service with the event manager.
225  void registerService(AnalysisService* service);
226 
227 
// Unregister the given service from the event manager.
233  void unregisterService(AnalysisService* service);
234 
// Trigger analysis of the given content item.
241  void injectItem(std::shared_ptr< mico::persistence::model::Item > item);
242  };
243 
244  }
245 }
246 #endif
mico::persistence::PersistenceService * getPersistenceService()
Return a reference to the persistence service used by this event manager, e.g.
Definition: EventManager.cpp:641
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in c...
Definition: http_client.cpp:23
void registerService(AnalysisService *service)
Register the given service with the event manager.
Definition: EventManager.cpp:665
void onConnected(AMQP::Connection *connection)
Method that is called when the login attempt succeeded.
Definition: EventManager.cpp:560
Main service for accessing the MICO persistence API.
Definition: PersistenceService.hpp:51
This exception is thrown by the event manager in case a method call failed.
Definition: EventManager.hpp:89
void onData(AMQP::Connection *connection, const char *data, size_t size)
Method that is called by the AMQP library every time it has data available that should be sent to RabbitMQ.
Definition: EventManager.cpp:542
virtual ~EventManager()
Shut down the event manager, cleaning up and closing any registered channels, services and connections.
Definition: EventManager.cpp:470
void onClosed(AMQP::Connection *connection)
Method that is called when the connection was closed.
Definition: EventManager.cpp:655
Definition: EventManager.hpp:98
Definition: EventManager.cpp:70
Definition: EventManager.hpp:69
void onError(AMQP::Connection *connection, const char *message)
When the connection ends up in an error state this method is called.
Definition: EventManager.cpp:555
void unregisterService(AnalysisService *service)
Unregister the given service from the event manager.
Definition: EventManager.cpp:708
Interface to be implemented by services.
Definition: AnalysisService.hpp:23
void injectItem(std::shared_ptr< mico::persistence::model::Item > item)
Trigger analysis of the given content item.
Definition: EventManager.cpp:742
A URI.
Definition: Uri.hpp:22
Definition: EventManager.hpp:59