platform/include/pion/platform/Reactor.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_REACTOR_HEADER__
00021 #define __PION_REACTOR_HEADER__
00022 
00023 #include <iosfwd>
00024 #include <string>
00025 #include <list>
00026 #include <libxml/tree.h>
00027 #include <boost/bind.hpp>
00028 #include <boost/signal.hpp>
00029 #include <boost/thread.hpp>
00030 #include <boost/function.hpp>
00031 #include <boost/function/function1.hpp>
00032 #include <pion/PionConfig.hpp>
00033 #include <pion/PionException.hpp>
00034 #include <pion/platform/Event.hpp>
00035 #include <pion/platform/Vocabulary.hpp>
00036 #include <pion/platform/PlatformPlugin.hpp>
00037 #include <pion/platform/ReactionScheduler.hpp>
00038 
00039 namespace pion {        // begin namespace pion
00040 namespace platform {    // begin namespace platform (Pion Platform Library)
00041 
00045 class PION_PLATFORM_API Reactor
00046     : public PlatformPlugin
00047 {
00048 public:
00049 
00051     static const std::string        REACTOR_ELEMENT_NAME;
00052     
00054     static const std::string        RUNNING_ELEMENT_NAME;
00055 
00057     static const std::string        WORKSPACE_ELEMENT_NAME;
00058 
00060     static const std::string        X_COORDINATE_ELEMENT_NAME;
00061 
00063     static const std::string        Y_COORDINATE_ELEMENT_NAME;
00064 
00065 
00067     typedef boost::function1<void, EventPtr>    EventHandler;
00068 
00070     typedef std::vector<std::string>            QueryBranches;
00071     
00073     typedef StringDictionary                    QueryParams;
00074     
00076     enum ReactorType {
00077         TYPE_COLLECTION, TYPE_PROCESSING, TYPE_STORAGE
00078     };
00079     
00080     
00082     class AlreadyConnectedException : public PionException {
00083     public:
00084         AlreadyConnectedException(const std::string& reactor_id)
00085             : PionException("Reactor is already connected: ", reactor_id) {}
00086     };
00087     
00089     class ConnectionNotFoundException : public PionException {
00090     public:
00091         ConnectionNotFoundException(const std::string& reactor_id)
00092             : PionException("Tried removing an unknown connection: ", reactor_id) {}
00093     };
00094     
00096     class ConfigLockException : public PionException {
00097     public:
00098         ConfigLockException(const std::string& reactor_id)
00099             : PionException("Error obtaining Reactor configuration lock: ", reactor_id) {}
00100     };
00101 
00103     class UnknownSignalException : public PionException {
00104     public:
00105         UnknownSignalException(const std::string& reactor_id, const std::string& signal_id)
00106             : PionException("Unknown signal for Reactor " + reactor_id + ": ", signal_id) {}
00107     };
00108 
00110     class MissingWorkspaceException : public PionException {
00111     public:
00112         MissingWorkspaceException(const std::string& reactor_id)
00113             : PionException("Reactor configuration missing required Workspace parameter: ", reactor_id) {}
00114         MissingWorkspaceException(void)
00115             : PionException("Reactor configuration missing required Workspace parameter") {}
00116     };
00117 
00119     virtual ~Reactor() {}
00120     
00122     virtual void start(void) {
00123         ConfigWriteLock cfg_lock(*this);
00124         m_is_running = true;
00125     }
00126     
00128     virtual void stop(void) {
00129         ConfigWriteLock cfg_lock(*this);
00130         m_is_running = false;
00131     }
00132     
00134     virtual void reset(void) { clearStats(); }
00135     
00137     virtual void clearStats(void) {
00138         // atomic_count doesn't support assignment -- ugh!
00139         //m_events_in = m_events_out = boost::detail::atomic_count(0);
00140     }
00141     
00149     virtual void setConfig(const Vocabulary& v, const xmlNodePtr config_ptr);
00150 
00157     virtual void updateVocabulary(const Vocabulary& v);
00158 
00163     virtual void updateCodecs(void) {}
00164     
00169     virtual void updateDatabases(void) {}
00170     
00175     virtual void updateProtocols(void) {}
00176     
00186     virtual void query(std::ostream& out, const QueryBranches& branches,
00187         const QueryParams& qp);
00188     
00197     template <typename F>
00198     inline boost::signals::connection subscribe(const std::string& signal_id, F f) {
00199         ConfigWriteLock cfg_lock(*this);
00200         SignalMap::iterator it = m_signals.find(signal_id);
00201         if (it == m_signals.end())
00202             throw UnknownSignalException(getId(), signal_id);
00203         return it->second->connect(f);
00204     }
00205 
00213     inline void operator()(const EventPtr& e) {
00214         if ( isRunning() ) {
00215             ConfigReadLock cfg_lock(*this);
00216             // re-check after locking
00217             if ( isRunning() ) {
00218                 ++m_events_in;
00219                 process(e);
00220             }
00221         }
00222     }
00223 
00232     bool startOutRunning(const xmlNodePtr config_ptr, bool exec_start);
00233 
00240     void addConnection(Reactor& output_reactor);
00241     
00248     void addConnection(const std::string& connection_id, EventHandler connection_handler);
00249     
00255     void removeConnection(const std::string& connection_id);
00256     
00258     void clearConnections(void);
00259 
00261     inline void writeStatsXML(std::ostream& out) const {
00262         writeBeginReactorXML(out);
00263         writeStatsOnlyXML(out);
00264         writeEndReactorXML(out);
00265     }
00266 
00268     inline void setScheduler(ReactionScheduler& scheduler) { m_scheduler_ptr = & scheduler; }
00269 
00271     inline void setMultithreadBranches(bool b) { m_multithread_branches = b; }
00272 
00274     inline boost::uint32_t getEventsIn(void) const { return m_events_in; }
00275 
00277     inline boost::uint32_t getEventsOut(void) const { return m_events_out; }
00278 
00280     inline bool isRunning(void) const { return m_is_running; }
00281     
00283     inline ReactorType getType(void) const { return m_type; }
00284 
00286     inline std::string getWorkspace(void) const { return m_workspace_id; }
00287 
00288 protected:
00289 
00291     typedef boost::signal3<void, const std::string&, const std::string&, void*> SignalType;
00292 
00294     typedef boost::shared_ptr<SignalType> SignalPtr;
00295     
00297     typedef PION_HASH_MAP<std::string, SignalPtr, PION_HASH_STRING> SignalMap;
00298 
00299 
00302     class ConfigReadLock {
00303     public:
00304         ConfigReadLock(const Reactor& r)
00305             : m_reactor_ref(r)
00306         {
00307             boost::uint16_t sleep_times = 0;
00308             do {
00309                 while (m_reactor_ref.m_config_change_pending) {
00310                     if (++sleep_times > 50)
00311                         throw ConfigLockException(m_reactor_ref.getId());
00312                     boost::thread::sleep(boost::get_system_time()
00313                         + boost::posix_time::millisec(100));
00314                 }
00315                 ++m_reactor_ref.m_config_num_readers;
00316                 if (m_reactor_ref.m_config_change_pending) {
00317                     --m_reactor_ref.m_config_num_readers;
00318                 } else {
00319                     break;
00320                 }
00321             } while (true);
00322         }
00323         ~ConfigReadLock() {
00324             --m_reactor_ref.m_config_num_readers;
00325         }
00326     private:
00327         const Reactor&  m_reactor_ref;
00328     };
00329     
00333     class ConfigWriteLock {
00334     public:
00335         ConfigWriteLock(Reactor& r)
00336             : m_reactor_ref(r), m_already_locked(r.m_config_change_pending)
00337         {
00338             if (! m_already_locked) {
00339                 boost::uint16_t sleep_times = 0;
00340                 m_reactor_ref.m_config_change_pending = true;
00341                 while (m_reactor_ref.m_config_num_readers > 0) {
00342                     if (++sleep_times > 50)
00343                         throw ConfigLockException(m_reactor_ref.getId());
00344                     boost::thread::sleep(boost::get_system_time()
00345                         + boost::posix_time::millisec(100));
00346                 }
00347             }
00348         }
00349         ~ConfigWriteLock() {
00350             if (! m_already_locked)
00351                 m_reactor_ref.m_config_change_pending = false;
00352         }
00353     private:
00354         Reactor&    m_reactor_ref;
00355         bool        m_already_locked;
00356     };
00357         
00358         
00360     Reactor(const ReactorType type)
00361         : m_is_running(false), m_type(type), m_scheduler_ptr(NULL), 
00362         m_events_in(0), m_events_out(0),
00363         m_config_change_pending(false), m_config_num_readers(0)
00364     {}
00365 
00367     inline ReactionScheduler& getScheduler(void) {
00368         PION_ASSERT(m_scheduler_ptr != NULL);
00369         return *m_scheduler_ptr;
00370     }
00371         
00378     virtual void process(const EventPtr& e) {
00379         deliverEvent(e);
00380     }
00381 
00389     inline void deliverEvent(const EventPtr& e, bool return_immediately = false) {
00390         ++m_events_out;
00391         if (! m_connections.empty()) {
00392             if (m_multithread_branches) {
00393                 // iterate through each Reactor after the first one and send the Event
00394                 // using the scheduler.  This way, the entire thread pool will be used
00395                 // for processing pipelines
00396                 ConnectionMap::iterator i = m_connections.begin();
00397                 
00398                 // skip Reactors that are not running to avoid queueing work
00399                 // to another thread when it is unnecessary.  Otherwise, if the
00400                 // first Reactor is not running, all work will get queued rather
00401                 // than the "longest path" being executed by the current thread
00402                 while (i != m_connections.end() && ! i->second.isRunning())
00403                     ++i;
00404 
00405                 if (i != m_connections.end()) {
00406                     // the first running reactor will be handled by the current thread
00407                     ConnectionMap::iterator cur_thread_reactor = i;
00408                     
00409                     // queue all other branches to be handled by other threads
00410                     while (++i != m_connections.end()) {
00411                         if (i->second.isRunning())
00412                             i->second.post(getScheduler(), e);
00413                     }
00414                 
00415                     // send to the first Reactor using the same thread
00416                     // this helps to reduce context switching by ensuring
00417                     // that the longer processing chains remain unbroken
00418                     if (return_immediately)
00419                         cur_thread_reactor->second.post(getScheduler(), e);
00420                     else
00421                         cur_thread_reactor->second(e);
00422                 }
00423             } else {
00424                 // simple scheduling just iterates through connections and use the
00425                 // same thread to carry the event through all the reaction chains
00426                 for (ConnectionMap::iterator i = m_connections.begin();
00427                      i != m_connections.end(); ++i)
00428                 {
00429                     if (i->second.isRunning()) {
00430                         if (return_immediately)
00431                             i->second.post(getScheduler(), e);
00432                         else
00433                             i->second(e);
00434                     }
00435                 }
00436             }
00437         }
00438     }
00439     
00447     inline void deliverEvents(const EventContainer& events, bool return_immediately = false)
00448     {
00449         for (EventContainer::const_iterator event_it = events.begin();
00450                 event_it != events.end(); ++event_it)
00451         {
00452             deliverEvent(*event_it, return_immediately);
00453         }
00454     }
00455 
00464     inline void incrementEventsIn(void) { ++m_events_in; }
00465 
00471     inline void publish(const std::string& signal_id) {
00472         SignalPtr signal_ptr(new SignalType);
00473         ConfigWriteLock cfg_lock(*this);
00474         m_signals.insert(std::make_pair(signal_id, signal_ptr));
00475     }
00476 
00484     inline void signalNoLock(const std::string& signal_id, void *ptr = NULL) {
00485         SignalMap::iterator it = m_signals.find(signal_id);
00486         if (it == m_signals.end())
00487             throw UnknownSignalException(getId(), signal_id);
00488         (*it->second)(getId(), signal_id, ptr);
00489     }
00490 
00497     inline void signal(const std::string& signal_id, void *ptr = NULL) {
00498         ConfigReadLock cfg_lock(*this);
00499         signalNoLock(signal_id, ptr);
00500     }
00501 
00503     void writeStatsOnlyXML(std::ostream& out) const;
00504     
00506     void writeBeginReactorXML(std::ostream& out) const;
00507     
00509     void writeEndReactorXML(std::ostream& out) const;       
00510 
00511 
00513     SignalMap                   m_signals;
00514 
00516     volatile bool               m_is_running;
00517 
00518     
00519 private:
00520 
00522     class OutputConnection {
00523     public:
00525         ~OutputConnection() {}
00526         
00528         explicit OutputConnection(Reactor* reactor)
00529             : m_reactor_ptr(reactor), m_event_handler(boost::ref(*reactor))
00530         {}
00531 
00533         explicit OutputConnection(EventHandler handler)
00534             : m_reactor_ptr(NULL), m_event_handler(handler)
00535         {}
00536 
00538         OutputConnection(const OutputConnection& c)
00539             : m_reactor_ptr(c.m_reactor_ptr), m_event_handler(c.m_event_handler)
00540         {}
00541 
00543         inline bool isRunning(void) {
00544             return (m_reactor_ptr == NULL || m_reactor_ptr->isRunning());
00545         }
00546         
00548         inline void operator()(const EventPtr& event_ptr) {
00549             m_event_handler(event_ptr);
00550         }
00551         
00553         inline void post(ReactionScheduler& scheduler, const EventPtr& event_ptr) {
00554             scheduler.post(boost::bind<void>(m_event_handler, event_ptr));
00555         }
00556 
00557     private:
00558         
00560         Reactor *           m_reactor_ptr;
00561         
00563         EventHandler        m_event_handler;
00564     };
00565     
00567     typedef std::map<std::string, OutputConnection> ConnectionMap;
00568     
00569     
00571     static const std::string        EVENTS_IN_ELEMENT_NAME;
00572     
00574     static const std::string        EVENTS_OUT_ELEMENT_NAME;
00575 
00577     static const std::string        ID_ATTRIBUTE_NAME;
00578 
00579 
00581     const ReactorType               m_type;
00582     
00584     ReactionScheduler *             m_scheduler_ptr;
00585     
00587     ConnectionMap                   m_connections;
00588 
00590     std::string                     m_workspace_id;
00591 
00594     boost::detail::atomic_count     m_events_in;
00595 
00598     boost::detail::atomic_count     m_events_out;
00599     
00602     bool                            m_multithread_branches;
00603     
00605     mutable volatile bool                   m_config_change_pending;
00606     
00608     mutable boost::detail::atomic_count     m_config_num_readers;
00609 
00610     // allow configuration lock classes to modify private variables 
00611     friend class ConfigReadLock;
00612     friend class ConfigWriteLock;
00613 };
00614 
00615 
00616 //
00617 // The following symbols must be defined for any reactor that you would
00618 // like to be able to load dynamically using the ReactionEngine::load()
00619 // function.  These are not required for any reactors that you only want to link
00620 // directly into your programs.
00621 //
00622 // Make sure that you replace "REACTOR" with the name of your derived class.
00623 // This name must also match the name of the object file (excluding the
00624 // extension).  These symbols must be linked into your reactor's object file,
00625 // not included in any headers that it may use (declarations are OK in headers
00626 // but not the definitions).
00627 //
00628 // The "pion_create" function is used to create new instances of your reactor.
00629 // The "pion_destroy" function is used to destroy instances of your reactor.
00630 //
00631 // extern "C" Reactor *pion_create_REACTOR(void) {
00632 //      return new REACTOR;
00633 // }
00634 //
00635 // extern "C" void pion_destroy_REACTOR(REACTOR *reactor_ptr) {
00636 //      delete reactor_ptr;
00637 // }
00638 //
00639 
00640 
00641 }   // end namespace platform
00642 }   // end namespace pion
00643 
00644 #endif

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7