platform/include/pion/platform/ReactionEngine.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_REACTIONENGINE_HEADER__
00021 #define __PION_REACTIONENGINE_HEADER__
00022 
00023 #include <string>
00024 #include <libxml/tree.h>
00025 #include <boost/bind.hpp>
00026 #include <boost/function/function0.hpp>
00027 #include <pion/PionConfig.hpp>
00028 #include <pion/PionException.hpp>
00029 #include <pion/platform/Event.hpp>
00030 #include <pion/platform/Reactor.hpp>
00031 #include <pion/platform/PluginConfig.hpp>
00032 #include <pion/platform/ReactionScheduler.hpp>
00033 
00034 namespace pion {        // begin namespace pion
00035 namespace platform {    // begin namespace platform (Pion Platform Library)
00036 
00037 // forward declarations
00038 class VocabularyManager;
00039 class CodecFactory;
00040 class ProtocolFactory;
00041 class DatabaseManager;
00042     
// ReactionEngine derives from PluginConfig<Reactor>, holds a ReactionScheduler
// by value and keeps references to the CodecFactory, ProtocolFactory and
// DatabaseManager that it hands to every Reactor it loads (see
// addPluginNoLock()). Most member functions are only declared in this header;
// their definitions are not visible here, so notes on them are limited to what
// the signatures show or are marked as assumptions to confirm.
00047 class PION_PLATFORM_API ReactionEngine :
00048     public PluginConfig<Reactor>
00049 {
00050 public:
00051 
          /// Reports that a Reactor identifier could not be resolved. Within this
          /// header it is thrown by send(), subscribe(), query() and
          /// isRunning(reactor_id) when m_plugins.get() returns NULL.
          /// Forwards a fixed message prefix plus reactor_id to PionException.
00053     class ReactorNotFoundException : public PionException {
00054     public:
00055         ReactorNotFoundException(const std::string& reactor_id)
00056             : PionException("No reactors found for identifier: ", reactor_id) {}
00057     };
00058     
          /// Reports an unknown connection identifier; forwards a fixed message
          /// prefix plus connection_id to PionException.
00060     class ConnectionNotFoundException : public PionException {
00061     public:
00062         ConnectionNotFoundException(const std::string& connection_id)
00063             : PionException("No connections found for identifier: ", connection_id) {}
00064     };
00065 
          /// Reports an unknown Workspace identifier; forwards a fixed message
          /// prefix plus workspace_id to PionException.
00067     class WorkspaceNotFoundException : public PionException {
00068     public:
00069         WorkspaceNotFoundException(const std::string& workspace_id)
00070             : PionException("No Workspace found for identifier: ", workspace_id) {}
00071     };
00072 
          /// Reports that a Workspace named for removal was not empty; forwards a
          /// fixed message prefix plus workspace_id to PionException.
00074     class RemoveNonEmptyWorkspaceException : public PionException {
00075     public:
00076         RemoveNonEmptyWorkspaceException(const std::string& workspace_id)
00077             : PionException("The Workspace specified for removal was not empty: ", workspace_id) {}
00078     };
00079 
          /// Reports a connection with an empty identifier in a configuration
          /// file. Note the argument is the config file name, not a connection id.
00081     class EmptyConnectionIdException : public PionException {
00082     public:
00083         EmptyConnectionIdException(const std::string& config_file)
00084             : PionException("Configuration file includes a connection with an empty identifier: ", config_file) {}
00085     };
00086 
          /// Reports a bad connection type in the configuration file; the message
          /// is completed with connection_id.
00088     class BadConnectionTypeException : public PionException {
00089     public:
00090         BadConnectionTypeException(const std::string& connection_id)
00091             : PionException("Bad connection type in configuration file: ", connection_id) {}
00092     };
00093     
          /// Reports a connection whose From element is empty; the message is
          /// completed with connection_id.
00095     class EmptyFromException : public PionException {
00096     public:
00097         EmptyFromException(const std::string& connection_id)
00098             : PionException("Reactor configuration has a connection with empty From element: ", connection_id) {}
00099     };
00100     
          /// Reports a connection whose To element is empty; the message is
          /// completed with connection_id.
00102     class EmptyToException : public PionException {
00103     public:
00104         EmptyToException(const std::string& connection_id)
00105             : PionException("Reactor configuration has a connection with empty To element: ", connection_id) {}
00106     };
00107     
          /// Reports failure to add a Connection to the Reactor configuration
          /// file. NOTE(review): the argument is named "connection" rather than
          /// "connection_id"; presumably a textual description such as the one
          /// built by getConnectionAsText() -- confirm at the throw sites.
00109     class AddConnectionConfigException : public PionException {
00110     public:
00111         AddConnectionConfigException(const std::string& connection)
00112             : PionException("Unable to add a Connection to the Reactor configuration file: ", connection) {}
00113     };
00114     
          /// Reports failure to remove a Connection from the Reactor configuration
          /// file; same argument convention as AddConnectionConfigException.
00116     class RemoveConnectionConfigException : public PionException {
00117     public:
00118         RemoveConnectionConfigException(const std::string& connection)
00119             : PionException("Unable to remove a Connection from the Reactor configuration file: ", connection) {}
00120     };
00121 
          /// Reports an invalid new connection configuration. Unlike the classes
          /// above it derives directly from std::exception and returns a fixed
          /// string literal from what(); it carries no identifier.
00123     class BadConnectionConfigException : public std::exception {
00124     public:
00125         virtual const char* what() const throw() {
00126             return "New Reactor connection configuration is invalid";
00127         }
00128     };
00129 
          /// Reports an invalid new Workspace configuration. Derives directly from
          /// std::exception; what() returns a fixed string literal.
00131     class BadWorkspaceConfigException : public std::exception {
00132     public:
00133         virtual const char* what() const throw() {
00134             return "New Reactor Workspace configuration is invalid";
00135         }
00136     };
00137 
          /// Reports failure to add a Workspace to the Reactor configuration file.
          /// Takes no arguments; the message is fixed.
00139     class AddWorkspaceConfigException : public PionException {
00140     public:
00141         AddWorkspaceConfigException()
00142             : PionException("Unable to add a Workspace to the Reactor configuration file") {}
00143     };
00144 
          /// Reports an error while setting a Workspace's configuration.
          /// Takes no arguments; the message is fixed.
00146     class SetWorkspaceConfigException : public PionException {
00147     public:
00148         SetWorkspaceConfigException()
00149             : PionException("Error setting the configuration for a Workspace") {}
00150     };
00151 
          /// Reports that updateConfigOption failed for a Reactor; the message is
          /// completed with reactor_id.
00153     class UpdateConfigOptionException : public PionException {
00154     public:
00155         UpdateConfigOptionException(const std::string& reactor_id)
00156             : PionException("updateConfigOption failed for Reactor with identifier ", reactor_id) {}
00157     };
00158 
00159 
          /// Constructor; declared only (definition not in this header).
          /// Takes the four collaborators by non-const reference.
          /// NOTE(review): m_codec_factory, m_protocol_factory and m_database_mgr
          /// are reference members of the matching types, so these arguments
          /// presumably must outlive the engine -- confirm in the definition.
00167     ReactionEngine(VocabularyManager& vocab_mgr,
00168                    CodecFactory& codec_factory,
00169                    ProtocolFactory& protocol_factory,
00170                    DatabaseManager& database_mgr);
00171     
          /// Virtual destructor; calls shutdown().
          /// NOTE(review): if shutdown() can throw, the exception would leave the
          /// destructor -- confirm that it cannot.
00173     virtual ~ReactionEngine() { shutdown();  }
00174     
          // ---- Declared-only interface -------------------------------------
          // The functions below, down to the inline writeConnectionsXML(out)
          // overload, have no bodies in this header. The notes state signature
          // facts; anything about behaviour is an assumption read off the names
          // and must be confirmed against the implementation file.

          /// Virtual, no arguments. Presumably opens/loads the engine's
          /// configuration file -- confirm.
00176     virtual void openConfigFile(void);
00177 
          /// Takes one Reactor identifier. Presumably resets that Reactor's
          /// statistics -- confirm.
00183     void clearReactorStats(const std::string& reactor_id);
00184 
          /// No arguments, no result. Presumably starts the engine -- confirm.
00186     void start(void);
00187     
          /// No arguments, no result. Presumably stops the engine -- confirm.
00189     void stop(void);
00190     
          /// No arguments, no result. This is the function the destructor calls.
00192     void shutdown(void);
00193 
          /// No arguments, no result. Presumably resets statistics engine-wide
          /// (compare clearReactorStats) -- confirm.
00195     void clearStats(void);
00196 
          /// No arguments, no result. NOTE(review): presumably the handler bound
          /// to m_codec_connection -- confirm.
00198     void updateCodecs(void);
00199 
          /// No arguments, no result. NOTE(review): presumably the handler bound
          /// to m_db_connection -- confirm.
00201     void updateDatabases(void);
00202 
          /// No arguments, no result. NOTE(review): presumably the handler bound
          /// to m_protocol_connection -- confirm.
00204     void updateProtocols(void);
00205 
          /// No arguments, no result; behaviour not visible here.
00207     void restartReactorsThatShouldBeRunning(void);
00208     
          /// Takes one Reactor identifier; presumably starts that Reactor.
00214     void startReactor(const std::string& reactor_id);
00215 
          /// Takes one Reactor identifier; presumably stops that Reactor.
00221     void stopReactor(const std::string& reactor_id);
00222 
          /// Takes a Reactor identifier and a libxml2 node pointer with the new
          /// configuration. Note that "const xmlNodePtr" makes the pointer const,
          /// not the node it points to.
00230     void setReactorConfig(const std::string& reactor_id,
00231                           const xmlNodePtr config_ptr);
00232     
          /// Same parameter shape as setReactorConfig(); presumably updates only
          /// location-related settings -- confirm.
00240     void setReactorLocation(const std::string& reactor_id,
00241                             const xmlNodePtr config_ptr);
00242 
          /// Takes a configuration node and returns a std::string, presumably the
          /// identifier assigned to the new Reactor -- confirm.
00251     std::string addReactor(const xmlNodePtr config_ptr);
00252     
          /// Takes one Reactor identifier; presumably removes that Reactor.
00258     void removeReactor(const std::string& reactor_id);
00259     
          /// Takes a Reactor id, a connection id, a free-form info string and a
          /// nullary callback; returns a raw Reactor pointer.
          /// NOTE(review): presumably the callback fires when the connection is
          /// removed and the returned pointer is non-owning -- confirm both.
00275     Reactor *addTempConnectionIn(const std::string& reactor_id, 
00276                                  const std::string& connection_id,
00277                                  const std::string& connection_info,
00278                                  boost::function0<void> removed_handler);
00279     
          /// Output-side counterpart: takes a Reactor::EventHandler instead of a
          /// removal callback and returns nothing.
00290     void addTempConnectionOut(const std::string& reactor_id, 
00291                               const std::string& connection_id,
00292                               const std::string& connection_info,
00293                               Reactor::EventHandler connection_handler);
00294     
          /// Takes the identifier of a temporary connection.
00300     void removeTempConnection(const std::string& connection_id);
00301 
          /// Overload taking the two endpoint Reactor identifiers; returns a
          /// std::string, presumably the new connection's identifier -- confirm.
00310     std::string addReactorConnection(const std::string& from_id, const std::string& to_id);
00311     
          /// Overload taking a configuration node instead of endpoint identifiers.
00319     std::string addReactorConnection(const xmlNodePtr config_ptr);
00320     
          /// Overload identified by the two endpoint Reactor identifiers.
00327     void removeReactorConnection(const std::string& from_id, const std::string& to_id);
00328     
          /// Overload identified by the connection identifier.
00334     void removeReactorConnection(const std::string& connection_id);
00335 
          /// Takes a raw character buffer and its length; returns a std::string,
          /// presumably the new Workspace's identifier -- confirm.
00344     std::string addWorkspace(const char* content_buf, std::size_t content_length);
00345 
          /// Takes a Workspace identifier. NOTE(review): the presence of
          /// RemoveNonEmptyWorkspaceException suggests removal of a non-empty
          /// Workspace is rejected -- confirm.
00351     void removeWorkspace(const std::string& workspace_id);
00352 
          /// Takes a Workspace identifier; behaviour not visible here.
00358     void removeReactorsFromWorkspace(const std::string& workspace_id);
00359 
          /// Public overload keyed by Workspace identifier; a private overload
          /// taking an xmlNodePtr is declared further down.
00367     void setWorkspaceConfig(const std::string& workspace_id, const char* content_buf, std::size_t content_length);
00368 
          /// Writes to out. reactor_id defaults to the empty string and details
          /// defaults to false; query() calls it with details == true.
          /// Not const, unlike the other write*XML functions.
00377     void writeStatsXML(std::ostream& out, const std::string& reactor_id = "", const bool details = false);
00378 
          /// Writes to out; only_id may be empty (the one-argument overload
          /// passes an empty string). Presumably an empty only_id means "all
          /// connections" -- confirm.
00386     void writeConnectionsXML(std::ostream& out, const std::string& only_id) const;
00387 
          /// Convenience overload: forwards to the two-argument version with an
          /// empty only_id.
00393     inline void writeConnectionsXML(std::ostream& out) const {
00394         std::string empty_only_id;
00395         writeConnectionsXML(out, empty_only_id);
00396     }
00397 
          // ---- More declared-only functions --------------------------------
          // As above: signature facts only; behaviour must be confirmed against
          // the implementation.

          /// Writes to out for one Workspace; returns bool, presumably whether
          /// the Workspace was found -- confirm.
00406     bool writeWorkspaceXML(std::ostream& out, const std::string& workspace_id) const;
00407 
          /// Writes to out; takes no Workspace identifier.
00413     void writeWorkspacesXML(std::ostream& out) const;
00414 
          /// Same signature as writeWorkspaceXML(); what "limited" excludes is
          /// not visible here.
00423     bool writeWorkspaceLimitedConfigXML(std::ostream& out, const std::string& workspace_id) const;
00424 
          /// Const query returning bool for a Workspace identifier.
00432     bool hasWorkspace(const std::string& workspace_id) const;
00433 
          /// Permission check taking a permission node and the configuration of
          /// the object to create; returns bool.
00442     bool creationAllowed(xmlNodePtr permission_config_ptr, xmlNodePtr config_ptr) const;
00443 
          /// Permission check taking a permission node, an identifier and the
          /// proposed configuration; returns bool.
00453     bool updateAllowed(xmlNodePtr permission_config_ptr, const std::string& id, xmlNodePtr config_ptr) const;
00454 
          /// Permission check taking a permission node and an identifier;
          /// returns bool.
00463     bool removalAllowed(xmlNodePtr permission_config_ptr, const std::string& id) const;
00464 
          /// Permission check taking a permission node and a Reactor identifier;
          /// returns bool.
00473     bool accessAllowed(xmlNodePtr permission_config_ptr, const std::string& reactor_id) const;
00474 
          /// Returns a copy of the static REACTORS_PERMISSION_TYPE string.
00476     std::string getPermissionType(void) const { return REACTORS_PERMISSION_TYPE; }
00477 
          /// Static helper: forwards buf/len to ConfigManager::createResourceConfig()
          /// with Reactor::REACTOR_ELEMENT_NAME as the element name and returns
          /// its xmlNodePtr result unchanged.
          /// NOTE(review): ownership of the returned node is not visible here --
          /// confirm who frees it.
00486     static xmlNodePtr createReactorConfig(const char *buf, std::size_t len) {
00487         return ConfigManager::createResourceConfig(Reactor::REACTOR_ELEMENT_NAME, buf, len);
00488     }
00489 
          /// Static helper: same as createReactorConfig() but uses this class's
          /// CONNECTION_ELEMENT_NAME as the element name.
00498     static xmlNodePtr createConnectionConfig(const char *buf, std::size_t len) {
00499         return ConfigManager::createResourceConfig(CONNECTION_ELEMENT_NAME, buf, len);
00500     }
00501 
          /// Looks reactor_id up in m_plugins and throws ReactorNotFoundException
          /// when the lookup returns NULL. Otherwise posts to m_scheduler a bound
          /// call that invokes the Reactor's operator() with e. boost::ref keeps
          /// the Reactor from being copied into the bound object; e itself is
          /// stored by value in it.
          /// NOTE(review): the bound call refers to the Reactor through a raw
          /// pointer; nothing in this function keeps the Reactor alive until the
          /// posted call runs -- confirm how removal is synchronised.
00508     inline void send(const std::string& reactor_id, EventPtr& e) {
00509         Reactor *reactor_ptr = m_plugins.get(reactor_id);
00510         if (reactor_ptr == NULL)
00511             throw ReactorNotFoundException(reactor_id);
00512         m_scheduler.post(boost::bind<void>(boost::ref(*reactor_ptr), e));
00513     }
00514 
          /// Looks reactor_id up in m_plugins, throws ReactorNotFoundException on
          /// NULL, and otherwise returns whatever Reactor::subscribe(signal_id, f)
          /// returns. F is passed by value and is not constrained here.
00525     template <typename F>
00526     inline boost::signals::connection subscribe(const std::string& reactor_id, const std::string& signal_id, F f) {
00527         Reactor *reactor_ptr = m_plugins.get(reactor_id);
00528         if (reactor_ptr == NULL)
00529             throw ReactorNotFoundException(reactor_id);
00530         return reactor_ptr->subscribe(signal_id, f);
00531     }
00532 
          /// Dispatches a query for reactor_id, writing the answer to out.
          /// With fewer than three branches it writes detailed statistics via
          /// writeStatsXML(out, reactor_id, true); on that path no lookup is done
          /// in this function, so ReactorNotFoundException is not thrown from
          /// here. With three or more branches it looks the Reactor up, throws
          /// ReactorNotFoundException on NULL, and forwards to Reactor::query().
00541     inline void query(const std::string& reactor_id, std::ostream& out,
00542         const Reactor::QueryBranches& branches, const Reactor::QueryParams& qp)
00543     {
00544         if (branches.size() < 3) {
00545             // query request for /query/<reactor_id>
00546             writeStatsXML(out, reactor_id, true);
00547         } else {
00548             Reactor *reactor_ptr = m_plugins.get(reactor_id);
00549             if (reactor_ptr == NULL)
00550                 throw ReactorNotFoundException(reactor_id);
00551             reactor_ptr->query(out, branches, qp);
00552         }
00553     }
00554 
          /// Returns m_plugins.getStatistic() evaluated with Reactor::getEventsIn
          /// and no Reactor identifier.
          /// NOTE(review): "total operations" is therefore derived from the
          /// events-in accessor, presumably summed over all Reactors -- confirm
          /// getStatistic() semantics.
00560     inline boost::uint64_t getTotalOperations(void) const {
00561         return m_plugins.getStatistic(boost::bind(&Reactor::getEventsIn, _1));
00562     }
00563     
          /// Returns m_plugins.getStatistic() for reactor_id using
          /// Reactor::getEventsIn. What happens for an unknown id is decided by
          /// getStatistic(), which is not visible here.
00570     inline boost::uint64_t getEventsIn(const std::string& reactor_id) const {
00571         return m_plugins.getStatistic(reactor_id, boost::bind(&Reactor::getEventsIn, _1));
00572     }
00573     
          /// Same as getEventsIn(reactor_id) but using Reactor::getEventsOut.
00580     inline boost::uint64_t getEventsOut(const std::string& reactor_id) const {
00581         return m_plugins.getStatistic(reactor_id, boost::bind(&Reactor::getEventsOut, _1));
00582     }
00583     
          /// Returns m_scheduler.getQueueSize().
00585     inline std::size_t getEventsQueued(void) const { return m_scheduler.getQueueSize(); }
00586 
          /// Looks reactor_id up in m_plugins, throws ReactorNotFoundException on
          /// NULL, and otherwise returns that Reactor's isRunning(). The
          /// zero-argument overload below reports the engine's own flag instead.
00593     inline bool isRunning(const std::string& reactor_id) const {
00594         const Reactor *reactor_ptr = m_plugins.get(reactor_id);
00595         if (reactor_ptr == NULL)
00596             throw ReactorNotFoundException(reactor_id);
00597         return reactor_ptr->isRunning();
00598     }
00599 
          /// Forwards work_func (taken by value) to m_scheduler.post().
00605     template<typename WorkFunction>
00606     inline void post(WorkFunction work_func) { m_scheduler.post(work_func); }
00607     
          /// Returns m_scheduler.getNumThreads().
00609     inline boost::uint32_t getNumThreads(void) const { return m_scheduler.getNumThreads(); }
00610     
          /// Forwards n to m_scheduler.setNumThreads().
00612     inline void setNumThreads(const boost::uint32_t n) { m_scheduler.setNumThreads(n); }
00613     
          /// Returns the m_multithread_branches flag.
00615     inline bool getMultithreadBranches(void) const { return m_multithread_branches; }
00616     
          /// Assigns the m_multithread_branches flag and nothing else. Reactors
          /// receive the flag's value in addPluginNoLock() when they are loaded;
          /// this setter does not push a new value to Reactors loaded earlier.
00618     inline void setMultithreadBranches(bool b) { m_multithread_branches = b; }
00619 
          /// Returns the engine's m_is_running flag (plain bool read).
00621     inline bool isRunning(void) const { return m_is_running; }  
00622 
00623 
00624 private:
00625     
          /// Record describing one temporary connection; elements are stored in
          /// TempConnectionList. All fields except m_removed_handler are const,
          /// so instances cannot be copy-assigned.
00627     struct TempConnection {
              /// Copies every argument into the member of the same name.
              /// NOTE(review): output_connection presumably distinguishes
              /// addTempConnectionOut() from addTempConnectionIn() records --
              /// confirm. No Reactor::EventHandler is stored in this record.
00638         TempConnection(bool output_connection,
00639                        const std::string& reactor_id,
00640                        const std::string& connection_id,
00641                        const std::string& connection_info,
00642                        boost::function0<void> removed_handler)
00643             : m_output_connection(output_connection), m_reactor_id(reactor_id),
00644             m_connection_id(connection_id), m_connection_info(connection_info),
00645             m_removed_handler(removed_handler)
00646         {}
00647         
              /// Empty, non-virtual destructor.
00649         ~TempConnection() {}
00650         
00651         const bool                      m_output_connection;
00652         const std::string               m_reactor_id;
00653         const std::string               m_connection_id;
00654         const std::string               m_connection_info;
00655         boost::function0<void>          m_removed_handler;
00656     };
00657     
          /// Container type of m_temp_connections.
          /// NOTE(review): std::list is used here but <list> is not among the
          /// headers this file includes directly; presumably it arrives
          /// transitively -- confirm, or include it explicitly.
00659     typedef std::list<TempConnection>   TempConnectionList;
00660     
          /// Record describing one Reactor-to-Reactor connection: its identifier
          /// and the identifiers of its two endpoints. All fields are const.
00662     struct ReactorConnection {
              /// Copies the three identifiers into the members of the same name.
00670         ReactorConnection(const std::string& connection_id,
00671                           const std::string& from_id,
00672                           const std::string& to_id)
00673             : m_connection_id(connection_id), m_from_id(from_id), m_to_id(to_id)
00674         {}
00675         
              /// Empty, non-virtual destructor.
00677         ~ReactorConnection() {}
00678         
00679         const std::string               m_connection_id;
00680         const std::string               m_from_id;
00681         const std::string               m_to_id;
00682     };
00683     
          /// Container type of m_reactor_connections.
00685     typedef std::list<ReactorConnection>    ReactorConnectionList;
00686 
00687     
          /// Loads a Reactor plug-in and wires it to this engine.
          ///
          /// Steps, in order: m_plugins.load(plugin_id, plugin_name); set the id,
          /// scheduler, multithread-branches flag, codec factory, protocol
          /// factory, database manager and a reference to this engine; if
          /// config_ptr is not NULL, apply it with the vocabulary obtained from
          /// m_vocab_mgr; finally call startOutRunning(config_ptr, true).
          ///
          /// Error handling: any std::exception from startOutRunning() is logged
          /// through m_logger and swallowed. PionPlugin::PluginNotFoundException
          /// is rethrown unchanged. Any other std::exception is replaced by a
          /// PluginException built from its what() text.
          ///
          /// NOTE(review): the "NoLock" suffix presumably means the caller already
          /// holds the relevant lock -- confirm. m_plugins, m_vocab_mgr and
          /// m_logger are not declared in this class and presumably come from a
          /// base class. If configuration fails after load() succeeded, nothing
          /// in this function removes the loaded Reactor again -- confirm the
          /// caller handles that.
00699     virtual void addPluginNoLock(const std::string& plugin_id,
00700                                  const std::string& plugin_name,
00701                                  const xmlNodePtr config_ptr)
00702     {
00703         try {
00704             Reactor *reactor_ptr = m_plugins.load(plugin_id, plugin_name);
00705             reactor_ptr->setId(plugin_id);
00706             reactor_ptr->setScheduler(m_scheduler);
00707             reactor_ptr->setMultithreadBranches(m_multithread_branches);
00708             reactor_ptr->setCodecFactory(m_codec_factory);
00709             reactor_ptr->setProtocolFactory(m_protocol_factory);
00710             reactor_ptr->setDatabaseManager(m_database_mgr);
00711             reactor_ptr->setReactionEngine(*this);
00712             if (config_ptr != NULL) {
00713                 VocabularyPtr vocab_ptr(m_vocab_mgr.getVocabulary());
00714                 reactor_ptr->setConfig(*vocab_ptr, config_ptr);
00715             }
00716             try {
00717                 reactor_ptr->startOutRunning(config_ptr, true);
00718             } catch (std::exception& e) {
00719                 // log but don't propagate exceptions from startOutRunning()
00720                 PION_LOG_ERROR(m_logger, e.what());
00721             }
00722         } catch (PionPlugin::PluginNotFoundException&) {
00723             throw;
00724         } catch (std::exception& e) {
00725             throw PluginException(e.what());
00726         }
00727     }
00728 
          /// Returns the text from_id + " -> " + to_id as a new string.
00735     static inline std::string getConnectionAsText(const std::string& from_id,
00736                                                   const std::string& to_id)
00737     {
00738         std::string result(from_id);
00739         result += " -> ";
00740         result += to_id;
00741         return result;
00742     }
00743     
          // ---- Private declared-only helpers -------------------------------
          // No bodies in this header; behaviour must be confirmed against the
          // implementation.

          /// Takes a connection identifier and the two endpoint identifiers, the
          /// same triple a ReactorConnection stores.
00751     void addConnectionNoLock(const std::string& connection_id,
00752                              const std::string& from_id,
00753                              const std::string& to_id);
00754     
          /// Takes a Reactor identifier and a connection identifier.
00761     void removeConnectionNoLock(const std::string& reactor_id,
00762                                 const std::string& connection_id);
00763 
          /// Takes the two endpoint identifiers of the connection.
00770     void removeConnectionConfigNoLock(const std::string& from_id,
00771                                       const std::string& to_id);
00772     
          /// No arguments; presumably the lock-free core of stop() -- confirm.
00774     void stopNoLock(void);
00775 
          /// Private overload that addresses the Workspace by its XML node rather
          /// than by identifier.
00777     void setWorkspaceConfig(xmlNodePtr workspace_node_ptr, const char* content_buf, std::size_t content_length);
00778 
00779     
          // ---- Static constants --------------------------------------------
          // Declared here without initialisers; their values are defined
          // elsewhere and are not visible in this header.

          /// Thread-count constant; presumably the scheduler's default -- confirm.
00781     static const boost::uint32_t    DEFAULT_NUM_THREADS;
00782     
          /// Configuration file name constant; value not visible here.
00784     static const std::string        DEFAULT_CONFIG_FILE;
00785 
          /// Element name passed to ConfigManager::createResourceConfig() by
          /// createConnectionConfig().
00787     static const std::string        CONNECTION_ELEMENT_NAME;
00788     
          /// XML element name constant; value not visible here.
00790     static const std::string        TYPE_ELEMENT_NAME;
00791     
          /// XML element name constant; value not visible here.
00793     static const std::string        FROM_ELEMENT_NAME;
00794     
          /// XML element name constant; value not visible here.
00796     static const std::string        TO_ELEMENT_NAME;
00797     
          /// XML element name constant; value not visible here.
00799     static const std::string        TOTAL_OPS_ELEMENT_NAME;
00800     
          /// XML element name constant; value not visible here.
00802     static const std::string        EVENTS_QUEUED_ELEMENT_NAME;
00803 
          /// Connection type constant; value not visible here.
00805     static const std::string        CONNECTION_TYPE_REACTOR;
00806 
          /// Connection type constant; value not visible here.
00808     static const std::string        CONNECTION_TYPE_INPUT;
00809 
          /// Connection type constant; value not visible here.
00811     static const std::string        CONNECTION_TYPE_OUTPUT;
00812 
          /// The string returned by getPermissionType().
00814     static const std::string        REACTORS_PERMISSION_TYPE;
00815 
          /// XML element name constant; value not visible here.
00817     static const std::string        UNRESTRICTED_ELEMENT_NAME;
00818 
          /// XML element name constant; value not visible here.
00820     static const std::string        WORKSPACE_QUALIFIER_ELEMENT_NAME;
00821 
          // ---- Data members ------------------------------------------------

          /// Scheduler held by value. Used by send(), post(), getEventsQueued(),
          /// getNumThreads() and setNumThreads(), and handed to each Reactor in
          /// addPluginNoLock().
00823     ReactionScheduler               m_scheduler;
00824 
          /// Reference handed to each Reactor via setCodecFactory().
00826     CodecFactory &                  m_codec_factory;
00827 
          /// Reference handed to each Reactor via setProtocolFactory().
00829     ProtocolFactory &               m_protocol_factory;
00830 
          /// Reference handed to each Reactor via setDatabaseManager().
00832     DatabaseManager &               m_database_mgr;
00833     
          /// List of TempConnection records.
00835     TempConnectionList              m_temp_connections;
00836     
          /// List of ReactorConnection records.
00838     ReactorConnectionList           m_reactor_connections;
00839 
          /// Scoped signal connection. NOTE(review): presumably the subscription
          /// that triggers updateCodecs() -- confirm in the constructor.
00841     boost::signals::scoped_connection   m_codec_connection;
00842     
          /// Scoped signal connection. NOTE(review): presumably the subscription
          /// that triggers updateDatabases() -- confirm in the constructor.
00844     boost::signals::scoped_connection   m_db_connection;
00845 
          /// Scoped signal connection. NOTE(review): presumably the subscription
          /// that triggers updateProtocols() -- confirm in the constructor.
00847     boost::signals::scoped_connection   m_protocol_connection;
00848 
          /// Flag returned by isRunning(); it is not written anywhere in this
          /// header.
00850     bool                            m_is_running;
00851 
          /// Flag read by getMultithreadBranches(), written by
          /// setMultithreadBranches() and passed to each Reactor when it is
          /// loaded in addPluginNoLock().
00854     bool                            m_multithread_branches;
00855 };
00856 
00857 
00858 }   // end namespace platform
00859 }   // end namespace pion
00860 
00861 #endif

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7