1 #ifndef HAVE_EVENT_MANAGER_H
2 #define HAVE_EVENT_MANAGER_H 1
8 #include <condition_variable>
11 #include <boost/asio.hpp>
14 #include "rdf_model.hpp"
17 #include "PersistenceService.hpp"
18 #include "AnalysisService.hpp"
// String constants shared through this header. Each one is declared `static`,
// so every translation unit that includes the header gets its own private copy.
// NOTE(review): the EXCHANGE_* / QUEUE_* prefixes suggest these are the names of
// message-broker exchanges and queues — confirm against the implementation file.

// Holds the literal "service_registry".
28 static const std::string EXCHANGE_SERVICE_REGISTRY =
"service_registry";
// Holds the literal "service_discovery".
34 static const std::string EXCHANGE_SERVICE_DISCOVERY =
"service_discovery";
// Holds the literal "content_input".
39 static const std::string QUEUE_CONTENT_INPUT =
"content_input";
// Holds the literal "content_output".
44 static const std::string QUEUE_CONTENT_OUTPUT =
"content_output";
// Holds the literal "config_request".
49 static const std::string QUEUE_CONFIG_REQUEST =
"config_request";
// Forward declarations, so this header can name these classes without their
// full definitions. In the lines visible here, AnalysisConsumer and
// RabbitConnectionHandler are used only as pointer types; uses of
// DiscoveryConsumer and ConfigurationClient are not visible in this excerpt.
52 class AnalysisConsumer;
53 class DiscoveryConsumer;
54 class RabbitConnectionHandler;
55 class ConfigurationClient;
62 std::mutex firstCallMutex;
73 const AMQP::Message &m_message;
74 AMQP::Channel* m_channel;
78 m_service(service), m_message(message), m_channel(channel) {}
// Declaration only; the definition is not part of this header excerpt.
// @param i  the persistence item, received as a shared_ptr passed by value.
// NOTE(review): the name suggests this reports that processing of `i` has
// finished — confirm against the definition.
80 void sendFinish( std::shared_ptr< mico::persistence::model::Item > i);
// Declaration only; the definition is not part of this header excerpt.
// @param i        the persistence item, received as a shared_ptr passed by value.
// @param errcode  a mico::event::model::ErrorCodes value, taken by const reference.
// @param msg      string taken by const reference — presumably a short error message; confirm.
// @param desc     string taken by const reference — presumably a longer description; confirm.
81 void sendErrorMessage(std::shared_ptr< mico::persistence::model::Item > i,
const mico::event::model::ErrorCodes& errcode,
const std::string& msg,
const std::string& desc);
// Returns a const reference to `message` (declared outside this excerpt,
// presumably a data member of the enclosing class).
// NOTE(review): the accessor is not const-qualified, so it cannot be called
// through a const reference to the object — consider adding `const`.
// The `;` after the closing brace is a redundant empty declaration.
95 const std::string& getMessage() {
return message; };
102 io_service io_service;
104 boost::asio::ip::tcp::
// Password string — presumably the credential passed to the constructor; confirm.
112 std::string password;
// Raw pointer to the connection handler; ownership is not visible in this excerpt.
124 RabbitConnectionHandler* rabbit;
// Raw pointers to the AMQP connection and channel; ownership is not visible in this excerpt.
126 AMQP::Connection* connection;
127 AMQP::Channel* channel;
// Maps an AnalysisService pointer to an AnalysisConsumer pointer.
// NOTE(review): presumably maintained by registerService()/unregisterService() — confirm.
129 std::map<AnalysisService*, AnalysisConsumer*> services;
// Thread object; what it runs is not visible in this excerpt.
131 std::thread receiver;
// Condition variable; the mutex and predicate it is used with are not visible in this excerpt.
133 std::condition_variable cv;
// Declaration only. Takes a string by value and returns a string.
// NOTE(review): the name suggests it drops the patch component of a version
// string (e.g. "1.2.3" -> "1.2") — confirm against the definition.
138 std::string stripPatchVersion(std::string ver);
// Byte buffer (vector of char).
// NOTE(review): the name suggests it stages partially received data — confirm.
144 std::vector<char> intermediateReceiveBuffer;
// Convenience constructor: delegates to the five-argument constructor with
// rabbitPort = 5672 and marmottaPort = 8080, forwarding host, user and password
// unchanged. The trailing `;` after the empty body is redundant.
150 EventManager(
const std::string& host,
const std::string& user,
const std::string& password) :
EventManager(host, 5672, 8080, user, password) {};
// Full constructor (declaration only; the definition is not part of this excerpt).
// @param host          host name string.
// @param rabbitPort    port number — presumably of the RabbitMQ server; confirm.
// @param marmottaPort  port number — presumably of the Marmotta server; confirm.
// @param user          user name string.
// @param password      password string.
155 EventManager(
const std::string& host,
int rabbitPort,
int marmottaPort,
const std::string& user,
const std::string& password);
// Called by the AMQP library every time it has data available that should be
// sent out to the broker (presumably RabbitMQ).
// @param connection  the AMQP connection the data belongs to.
// @param data        pointer to the bytes — presumably `size` bytes long; confirm.
// @param size        byte count for `data`.
176 void onData(AMQP::Connection *connection,
const char *data,
size_t size);
// Called when the connection ends up in an error state.
// @param connection  the affected AMQP connection.
// @param message     C string — presumably a description of the error; confirm.
192 void onError(AMQP::Connection *connection,
const char *message);
// Called when the connection was closed.
// @param connection  the AMQP connection that was closed.
218 void onClosed(AMQP::Connection *connection);
// Trigger analysis of the given content item.
// @param item  the persistence item, received as a shared_ptr passed by value.
241 void injectItem(std::shared_ptr< mico::persistence::model::Item > item);
mico::persistence::PersistenceService * getPersistenceService()
Return a pointer to the persistence service used by this event manager.
Definition: EventManager.cpp:641
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in c...
Definition: http_client.cpp:23
void registerService(AnalysisService *service)
Register the given service with the event manager.
Definition: EventManager.cpp:665
void onConnected(AMQP::Connection *connection)
Method that is called when the login attempt succeeded.
Definition: EventManager.cpp:560
Main service for accessing the MICO persistence API.
Definition: PersistenceService.hpp:51
This exception is thrown by the event manager in case a method call failed.
Definition: EventManager.hpp:89
void onData(AMQP::Connection *connection, const char *data, size_t size)
Method that is called by the AMQP library every time it has data available that should be sent to Rab...
Definition: EventManager.cpp:542
virtual ~EventManager()
Shut down the event manager, cleaning up and closing any registered channels, services and connection...
Definition: EventManager.cpp:470
void onClosed(AMQP::Connection *connection)
Method that is called when the connection was closed.
Definition: EventManager.cpp:655
Definition: EventManager.hpp:98
Definition: EventManager.cpp:70
Definition: EventManager.hpp:69
void onError(AMQP::Connection *connection, const char *message)
When the connection ends up in an error state this method is called.
Definition: EventManager.cpp:555
void unregisterService(AnalysisService *service)
Unregister the given service.
Definition: EventManager.cpp:708
Interface to be implemented by services.
Definition: AnalysisService.hpp:23
void injectItem(std::shared_ptr< mico::persistence::model::Item > item)
Trigger analysis of the given content item.
Definition: EventManager.cpp:742
A URI.
Definition: Uri.hpp:22
Definition: EventManager.hpp:59