00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef __PION_REACTIONENGINE_HEADER__
00021 #define __PION_REACTIONENGINE_HEADER__
00022
00023 #include <string>
00024 #include <libxml/tree.h>
00025 #include <boost/bind.hpp>
00026 #include <boost/function/function0.hpp>
00027 #include <pion/PionConfig.hpp>
00028 #include <pion/PionException.hpp>
00029 #include <pion/platform/Event.hpp>
00030 #include <pion/platform/Reactor.hpp>
00031 #include <pion/platform/PluginConfig.hpp>
00032 #include <pion/platform/ReactionScheduler.hpp>
00033
00034 namespace pion {
00035 namespace platform {
00036
00037
// Forward declarations of collaborating types.  Within this header
// CodecFactory, ProtocolFactory and DatabaseManager are only named as
// references (constructor parameters, reference members, arguments passed
// through to Reactor setters), so incomplete types are sufficient for them.
// NOTE(review): ReactionEngine::addPluginNoLock() calls a member function on a
// VocabularyManager, so its full definition presumably arrives through one of
// the headers included above -- confirm before trimming includes.
class VocabularyManager;
class CodecFactory;
class ProtocolFactory;
class DatabaseManager;
00042
00047 class PION_PLATFORM_API ReactionEngine :
00048 public PluginConfig<Reactor>
00049 {
00050 public:
00051
00053 class ReactorNotFoundException : public PionException {
00054 public:
00055 ReactorNotFoundException(const std::string& reactor_id)
00056 : PionException("No reactors found for identifier: ", reactor_id) {}
00057 };
00058
00060 class ConnectionNotFoundException : public PionException {
00061 public:
00062 ConnectionNotFoundException(const std::string& connection_id)
00063 : PionException("No connections found for identifier: ", connection_id) {}
00064 };
00065
00067 class WorkspaceNotFoundException : public PionException {
00068 public:
00069 WorkspaceNotFoundException(const std::string& workspace_id)
00070 : PionException("No Workspace found for identifier: ", workspace_id) {}
00071 };
00072
00074 class RemoveNonEmptyWorkspaceException : public PionException {
00075 public:
00076 RemoveNonEmptyWorkspaceException(const std::string& workspace_id)
00077 : PionException("The Workspace specified for removal was not empty: ", workspace_id) {}
00078 };
00079
00081 class EmptyConnectionIdException : public PionException {
00082 public:
00083 EmptyConnectionIdException(const std::string& config_file)
00084 : PionException("Configuration file includes a connection with an empty identifier: ", config_file) {}
00085 };
00086
00088 class BadConnectionTypeException : public PionException {
00089 public:
00090 BadConnectionTypeException(const std::string& connection_id)
00091 : PionException("Bad connection type in configuration file: ", connection_id) {}
00092 };
00093
00095 class EmptyFromException : public PionException {
00096 public:
00097 EmptyFromException(const std::string& connection_id)
00098 : PionException("Reactor configuration has a connection with empty From element: ", connection_id) {}
00099 };
00100
00102 class EmptyToException : public PionException {
00103 public:
00104 EmptyToException(const std::string& connection_id)
00105 : PionException("Reactor configuration has a connection with empty To element: ", connection_id) {}
00106 };
00107
00109 class AddConnectionConfigException : public PionException {
00110 public:
00111 AddConnectionConfigException(const std::string& connection)
00112 : PionException("Unable to add a Connection to the Reactor configuration file: ", connection) {}
00113 };
00114
00116 class RemoveConnectionConfigException : public PionException {
00117 public:
00118 RemoveConnectionConfigException(const std::string& connection)
00119 : PionException("Unable to remove a Connection from the Reactor configuration file: ", connection) {}
00120 };
00121
00123 class BadConnectionConfigException : public std::exception {
00124 public:
00125 virtual const char* what() const throw() {
00126 return "New Reactor connection configuration is invalid";
00127 }
00128 };
00129
00131 class BadWorkspaceConfigException : public std::exception {
00132 public:
00133 virtual const char* what() const throw() {
00134 return "New Reactor Workspace configuration is invalid";
00135 }
00136 };
00137
00139 class AddWorkspaceConfigException : public PionException {
00140 public:
00141 AddWorkspaceConfigException()
00142 : PionException("Unable to add a Workspace to the Reactor configuration file") {}
00143 };
00144
00146 class SetWorkspaceConfigException : public PionException {
00147 public:
00148 SetWorkspaceConfigException()
00149 : PionException("Error setting the configuration for a Workspace") {}
00150 };
00151
00153 class UpdateConfigOptionException : public PionException {
00154 public:
00155 UpdateConfigOptionException(const std::string& reactor_id)
00156 : PionException("updateConfigOption failed for Reactor with identifier ", reactor_id) {}
00157 };
00158
00159
00167 ReactionEngine(VocabularyManager& vocab_mgr,
00168 CodecFactory& codec_factory,
00169 ProtocolFactory& protocol_factory,
00170 DatabaseManager& database_mgr);
00171
00173 virtual ~ReactionEngine() { shutdown(); }
00174
00176 virtual void openConfigFile(void);
00177
00183 void clearReactorStats(const std::string& reactor_id);
00184
00186 void start(void);
00187
00189 void stop(void);
00190
00192 void shutdown(void);
00193
00195 void clearStats(void);
00196
00198 void updateCodecs(void);
00199
00201 void updateDatabases(void);
00202
00204 void updateProtocols(void);
00205
00207 void restartReactorsThatShouldBeRunning(void);
00208
00214 void startReactor(const std::string& reactor_id);
00215
00221 void stopReactor(const std::string& reactor_id);
00222
00230 void setReactorConfig(const std::string& reactor_id,
00231 const xmlNodePtr config_ptr);
00232
00240 void setReactorLocation(const std::string& reactor_id,
00241 const xmlNodePtr config_ptr);
00242
00251 std::string addReactor(const xmlNodePtr config_ptr);
00252
00258 void removeReactor(const std::string& reactor_id);
00259
00275 Reactor *addTempConnectionIn(const std::string& reactor_id,
00276 const std::string& connection_id,
00277 const std::string& connection_info,
00278 boost::function0<void> removed_handler);
00279
00290 void addTempConnectionOut(const std::string& reactor_id,
00291 const std::string& connection_id,
00292 const std::string& connection_info,
00293 Reactor::EventHandler connection_handler);
00294
00300 void removeTempConnection(const std::string& connection_id);
00301
00310 std::string addReactorConnection(const std::string& from_id, const std::string& to_id);
00311
00319 std::string addReactorConnection(const xmlNodePtr config_ptr);
00320
00327 void removeReactorConnection(const std::string& from_id, const std::string& to_id);
00328
00334 void removeReactorConnection(const std::string& connection_id);
00335
00344 std::string addWorkspace(const char* content_buf, std::size_t content_length);
00345
00351 void removeWorkspace(const std::string& workspace_id);
00352
00358 void removeReactorsFromWorkspace(const std::string& workspace_id);
00359
00367 void setWorkspaceConfig(const std::string& workspace_id, const char* content_buf, std::size_t content_length);
00368
00377 void writeStatsXML(std::ostream& out, const std::string& reactor_id = "", const bool details = false);
00378
00386 void writeConnectionsXML(std::ostream& out, const std::string& only_id) const;
00387
00393 inline void writeConnectionsXML(std::ostream& out) const {
00394 std::string empty_only_id;
00395 writeConnectionsXML(out, empty_only_id);
00396 }
00397
00406 bool writeWorkspaceXML(std::ostream& out, const std::string& workspace_id) const;
00407
00413 void writeWorkspacesXML(std::ostream& out) const;
00414
00423 bool writeWorkspaceLimitedConfigXML(std::ostream& out, const std::string& workspace_id) const;
00424
00432 bool hasWorkspace(const std::string& workspace_id) const;
00433
00442 bool creationAllowed(xmlNodePtr permission_config_ptr, xmlNodePtr config_ptr) const;
00443
00453 bool updateAllowed(xmlNodePtr permission_config_ptr, const std::string& id, xmlNodePtr config_ptr) const;
00454
00463 bool removalAllowed(xmlNodePtr permission_config_ptr, const std::string& id) const;
00464
00473 bool accessAllowed(xmlNodePtr permission_config_ptr, const std::string& reactor_id) const;
00474
00476 std::string getPermissionType(void) const { return REACTORS_PERMISSION_TYPE; }
00477
00486 static xmlNodePtr createReactorConfig(const char *buf, std::size_t len) {
00487 return ConfigManager::createResourceConfig(Reactor::REACTOR_ELEMENT_NAME, buf, len);
00488 }
00489
00498 static xmlNodePtr createConnectionConfig(const char *buf, std::size_t len) {
00499 return ConfigManager::createResourceConfig(CONNECTION_ELEMENT_NAME, buf, len);
00500 }
00501
00508 inline void send(const std::string& reactor_id, EventPtr& e) {
00509 Reactor *reactor_ptr = m_plugins.get(reactor_id);
00510 if (reactor_ptr == NULL)
00511 throw ReactorNotFoundException(reactor_id);
00512 m_scheduler.post(boost::bind<void>(boost::ref(*reactor_ptr), e));
00513 }
00514
00525 template <typename F>
00526 inline boost::signals::connection subscribe(const std::string& reactor_id, const std::string& signal_id, F f) {
00527 Reactor *reactor_ptr = m_plugins.get(reactor_id);
00528 if (reactor_ptr == NULL)
00529 throw ReactorNotFoundException(reactor_id);
00530 return reactor_ptr->subscribe(signal_id, f);
00531 }
00532
00541 inline void query(const std::string& reactor_id, std::ostream& out,
00542 const Reactor::QueryBranches& branches, const Reactor::QueryParams& qp)
00543 {
00544 if (branches.size() < 3) {
00545
00546 writeStatsXML(out, reactor_id, true);
00547 } else {
00548 Reactor *reactor_ptr = m_plugins.get(reactor_id);
00549 if (reactor_ptr == NULL)
00550 throw ReactorNotFoundException(reactor_id);
00551 reactor_ptr->query(out, branches, qp);
00552 }
00553 }
00554
00560 inline boost::uint64_t getTotalOperations(void) const {
00561 return m_plugins.getStatistic(boost::bind(&Reactor::getEventsIn, _1));
00562 }
00563
00570 inline boost::uint64_t getEventsIn(const std::string& reactor_id) const {
00571 return m_plugins.getStatistic(reactor_id, boost::bind(&Reactor::getEventsIn, _1));
00572 }
00573
00580 inline boost::uint64_t getEventsOut(const std::string& reactor_id) const {
00581 return m_plugins.getStatistic(reactor_id, boost::bind(&Reactor::getEventsOut, _1));
00582 }
00583
00585 inline std::size_t getEventsQueued(void) const { return m_scheduler.getQueueSize(); }
00586
00593 inline bool isRunning(const std::string& reactor_id) const {
00594 const Reactor *reactor_ptr = m_plugins.get(reactor_id);
00595 if (reactor_ptr == NULL)
00596 throw ReactorNotFoundException(reactor_id);
00597 return reactor_ptr->isRunning();
00598 }
00599
00605 template<typename WorkFunction>
00606 inline void post(WorkFunction work_func) { m_scheduler.post(work_func); }
00607
00609 inline boost::uint32_t getNumThreads(void) const { return m_scheduler.getNumThreads(); }
00610
00612 inline void setNumThreads(const boost::uint32_t n) { m_scheduler.setNumThreads(n); }
00613
00615 inline bool getMultithreadBranches(void) const { return m_multithread_branches; }
00616
00618 inline void setMultithreadBranches(bool b) { m_multithread_branches = b; }
00619
00621 inline bool isRunning(void) const { return m_is_running; }
00622
00623
00624 private:
00625
00627 struct TempConnection {
00638 TempConnection(bool output_connection,
00639 const std::string& reactor_id,
00640 const std::string& connection_id,
00641 const std::string& connection_info,
00642 boost::function0<void> removed_handler)
00643 : m_output_connection(output_connection), m_reactor_id(reactor_id),
00644 m_connection_id(connection_id), m_connection_info(connection_info),
00645 m_removed_handler(removed_handler)
00646 {}
00647
00649 ~TempConnection() {}
00650
00651 const bool m_output_connection;
00652 const std::string m_reactor_id;
00653 const std::string m_connection_id;
00654 const std::string m_connection_info;
00655 boost::function0<void> m_removed_handler;
00656 };
00657
00659 typedef std::list<TempConnection> TempConnectionList;
00660
00662 struct ReactorConnection {
00670 ReactorConnection(const std::string& connection_id,
00671 const std::string& from_id,
00672 const std::string& to_id)
00673 : m_connection_id(connection_id), m_from_id(from_id), m_to_id(to_id)
00674 {}
00675
00677 ~ReactorConnection() {}
00678
00679 const std::string m_connection_id;
00680 const std::string m_from_id;
00681 const std::string m_to_id;
00682 };
00683
00685 typedef std::list<ReactorConnection> ReactorConnectionList;
00686
00687
00699 virtual void addPluginNoLock(const std::string& plugin_id,
00700 const std::string& plugin_name,
00701 const xmlNodePtr config_ptr)
00702 {
00703 try {
00704 Reactor *reactor_ptr = m_plugins.load(plugin_id, plugin_name);
00705 reactor_ptr->setId(plugin_id);
00706 reactor_ptr->setScheduler(m_scheduler);
00707 reactor_ptr->setMultithreadBranches(m_multithread_branches);
00708 reactor_ptr->setCodecFactory(m_codec_factory);
00709 reactor_ptr->setProtocolFactory(m_protocol_factory);
00710 reactor_ptr->setDatabaseManager(m_database_mgr);
00711 reactor_ptr->setReactionEngine(*this);
00712 if (config_ptr != NULL) {
00713 VocabularyPtr vocab_ptr(m_vocab_mgr.getVocabulary());
00714 reactor_ptr->setConfig(*vocab_ptr, config_ptr);
00715 }
00716 try {
00717 reactor_ptr->startOutRunning(config_ptr, true);
00718 } catch (std::exception& e) {
00719
00720 PION_LOG_ERROR(m_logger, e.what());
00721 }
00722 } catch (PionPlugin::PluginNotFoundException&) {
00723 throw;
00724 } catch (std::exception& e) {
00725 throw PluginException(e.what());
00726 }
00727 }
00728
00735 static inline std::string getConnectionAsText(const std::string& from_id,
00736 const std::string& to_id)
00737 {
00738 std::string result(from_id);
00739 result += " -> ";
00740 result += to_id;
00741 return result;
00742 }
00743
00751 void addConnectionNoLock(const std::string& connection_id,
00752 const std::string& from_id,
00753 const std::string& to_id);
00754
00761 void removeConnectionNoLock(const std::string& reactor_id,
00762 const std::string& connection_id);
00763
00770 void removeConnectionConfigNoLock(const std::string& from_id,
00771 const std::string& to_id);
00772
00774 void stopNoLock(void);
00775
00777 void setWorkspaceConfig(xmlNodePtr workspace_node_ptr, const char* content_buf, std::size_t content_length);
00778
00779
00781 static const boost::uint32_t DEFAULT_NUM_THREADS;
00782
00784 static const std::string DEFAULT_CONFIG_FILE;
00785
00787 static const std::string CONNECTION_ELEMENT_NAME;
00788
00790 static const std::string TYPE_ELEMENT_NAME;
00791
00793 static const std::string FROM_ELEMENT_NAME;
00794
00796 static const std::string TO_ELEMENT_NAME;
00797
00799 static const std::string TOTAL_OPS_ELEMENT_NAME;
00800
00802 static const std::string EVENTS_QUEUED_ELEMENT_NAME;
00803
00805 static const std::string CONNECTION_TYPE_REACTOR;
00806
00808 static const std::string CONNECTION_TYPE_INPUT;
00809
00811 static const std::string CONNECTION_TYPE_OUTPUT;
00812
00814 static const std::string REACTORS_PERMISSION_TYPE;
00815
00817 static const std::string UNRESTRICTED_ELEMENT_NAME;
00818
00820 static const std::string WORKSPACE_QUALIFIER_ELEMENT_NAME;
00821
00823 ReactionScheduler m_scheduler;
00824
00826 CodecFactory & m_codec_factory;
00827
00829 ProtocolFactory & m_protocol_factory;
00830
00832 DatabaseManager & m_database_mgr;
00833
00835 TempConnectionList m_temp_connections;
00836
00838 ReactorConnectionList m_reactor_connections;
00839
00841 boost::signals::scoped_connection m_codec_connection;
00842
00844 boost::signals::scoped_connection m_db_connection;
00845
00847 boost::signals::scoped_connection m_protocol_connection;
00848
00850 bool m_is_running;
00851
00854 bool m_multithread_branches;
00855 };
00856
00857
00858 }
00859 }
00860
00861 #endif