00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef __PION_REACTOR_HEADER__
00021 #define __PION_REACTOR_HEADER__
00022
00023 #include <iosfwd>
00024 #include <string>
00025 #include <list>
00026 #include <libxml/tree.h>
00027 #include <boost/bind.hpp>
00028 #include <boost/signal.hpp>
00029 #include <boost/thread.hpp>
00030 #include <boost/function.hpp>
00031 #include <boost/function/function1.hpp>
00032 #include <pion/PionConfig.hpp>
00033 #include <pion/PionException.hpp>
00034 #include <pion/platform/Event.hpp>
00035 #include <pion/platform/Vocabulary.hpp>
00036 #include <pion/platform/PlatformPlugin.hpp>
00037 #include <pion/platform/ReactionScheduler.hpp>
00038
00039 namespace pion {
00040 namespace platform {
00041
00045 class PION_PLATFORM_API Reactor
00046 : public PlatformPlugin
00047 {
00048 public:
00049
00051 static const std::string REACTOR_ELEMENT_NAME;
00052
00054 static const std::string RUNNING_ELEMENT_NAME;
00055
00057 static const std::string WORKSPACE_ELEMENT_NAME;
00058
00060 static const std::string X_COORDINATE_ELEMENT_NAME;
00061
00063 static const std::string Y_COORDINATE_ELEMENT_NAME;
00064
00065
00067 typedef boost::function1<void, EventPtr> EventHandler;
00068
00070 typedef std::vector<std::string> QueryBranches;
00071
00073 typedef StringDictionary QueryParams;
00074
00076 enum ReactorType {
00077 TYPE_COLLECTION, TYPE_PROCESSING, TYPE_STORAGE
00078 };
00079
00080
00082 class AlreadyConnectedException : public PionException {
00083 public:
00084 AlreadyConnectedException(const std::string& reactor_id)
00085 : PionException("Reactor is already connected: ", reactor_id) {}
00086 };
00087
00089 class ConnectionNotFoundException : public PionException {
00090 public:
00091 ConnectionNotFoundException(const std::string& reactor_id)
00092 : PionException("Tried removing an unknown connection: ", reactor_id) {}
00093 };
00094
00096 class ConfigLockException : public PionException {
00097 public:
00098 ConfigLockException(const std::string& reactor_id)
00099 : PionException("Error obtaining Reactor configuration lock: ", reactor_id) {}
00100 };
00101
00103 class UnknownSignalException : public PionException {
00104 public:
00105 UnknownSignalException(const std::string& reactor_id, const std::string& signal_id)
00106 : PionException("Unknown signal for Reactor " + reactor_id + ": ", signal_id) {}
00107 };
00108
00110 class MissingWorkspaceException : public PionException {
00111 public:
00112 MissingWorkspaceException(const std::string& reactor_id)
00113 : PionException("Reactor configuration missing required Workspace parameter: ", reactor_id) {}
00114 MissingWorkspaceException(void)
00115 : PionException("Reactor configuration missing required Workspace parameter") {}
00116 };
00117
00119 virtual ~Reactor() {}
00120
00122 virtual void start(void) {
00123 ConfigWriteLock cfg_lock(*this);
00124 m_is_running = true;
00125 }
00126
00128 virtual void stop(void) {
00129 ConfigWriteLock cfg_lock(*this);
00130 m_is_running = false;
00131 }
00132
00134 virtual void reset(void) { clearStats(); }
00135
00137 virtual void clearStats(void) {
00138
00139
00140 }
00141
00149 virtual void setConfig(const Vocabulary& v, const xmlNodePtr config_ptr);
00150
00157 virtual void updateVocabulary(const Vocabulary& v);
00158
00163 virtual void updateCodecs(void) {}
00164
00169 virtual void updateDatabases(void) {}
00170
00175 virtual void updateProtocols(void) {}
00176
00186 virtual void query(std::ostream& out, const QueryBranches& branches,
00187 const QueryParams& qp);
00188
00197 template <typename F>
00198 inline boost::signals::connection subscribe(const std::string& signal_id, F f) {
00199 ConfigWriteLock cfg_lock(*this);
00200 SignalMap::iterator it = m_signals.find(signal_id);
00201 if (it == m_signals.end())
00202 throw UnknownSignalException(getId(), signal_id);
00203 return it->second->connect(f);
00204 }
00205
00213 inline void operator()(const EventPtr& e) {
00214 if ( isRunning() ) {
00215 ConfigReadLock cfg_lock(*this);
00216
00217 if ( isRunning() ) {
00218 ++m_events_in;
00219 process(e);
00220 }
00221 }
00222 }
00223
00232 bool startOutRunning(const xmlNodePtr config_ptr, bool exec_start);
00233
00240 void addConnection(Reactor& output_reactor);
00241
00248 void addConnection(const std::string& connection_id, EventHandler connection_handler);
00249
00255 void removeConnection(const std::string& connection_id);
00256
00258 void clearConnections(void);
00259
00261 inline void writeStatsXML(std::ostream& out) const {
00262 writeBeginReactorXML(out);
00263 writeStatsOnlyXML(out);
00264 writeEndReactorXML(out);
00265 }
00266
00268 inline void setScheduler(ReactionScheduler& scheduler) { m_scheduler_ptr = & scheduler; }
00269
00271 inline void setMultithreadBranches(bool b) { m_multithread_branches = b; }
00272
00274 inline boost::uint32_t getEventsIn(void) const { return m_events_in; }
00275
00277 inline boost::uint32_t getEventsOut(void) const { return m_events_out; }
00278
00280 inline bool isRunning(void) const { return m_is_running; }
00281
00283 inline ReactorType getType(void) const { return m_type; }
00284
00286 inline std::string getWorkspace(void) const { return m_workspace_id; }
00287
00288 protected:
00289
00291 typedef boost::signal3<void, const std::string&, const std::string&, void*> SignalType;
00292
00294 typedef boost::shared_ptr<SignalType> SignalPtr;
00295
00297 typedef PION_HASH_MAP<std::string, SignalPtr, PION_HASH_STRING> SignalMap;
00298
00299
00302 class ConfigReadLock {
00303 public:
00304 ConfigReadLock(const Reactor& r)
00305 : m_reactor_ref(r)
00306 {
00307 boost::uint16_t sleep_times = 0;
00308 do {
00309 while (m_reactor_ref.m_config_change_pending) {
00310 if (++sleep_times > 50)
00311 throw ConfigLockException(m_reactor_ref.getId());
00312 boost::thread::sleep(boost::get_system_time()
00313 + boost::posix_time::millisec(100));
00314 }
00315 ++m_reactor_ref.m_config_num_readers;
00316 if (m_reactor_ref.m_config_change_pending) {
00317 --m_reactor_ref.m_config_num_readers;
00318 } else {
00319 break;
00320 }
00321 } while (true);
00322 }
00323 ~ConfigReadLock() {
00324 --m_reactor_ref.m_config_num_readers;
00325 }
00326 private:
00327 const Reactor& m_reactor_ref;
00328 };
00329
00333 class ConfigWriteLock {
00334 public:
00335 ConfigWriteLock(Reactor& r)
00336 : m_reactor_ref(r), m_already_locked(r.m_config_change_pending)
00337 {
00338 if (! m_already_locked) {
00339 boost::uint16_t sleep_times = 0;
00340 m_reactor_ref.m_config_change_pending = true;
00341 while (m_reactor_ref.m_config_num_readers > 0) {
00342 if (++sleep_times > 50)
00343 throw ConfigLockException(m_reactor_ref.getId());
00344 boost::thread::sleep(boost::get_system_time()
00345 + boost::posix_time::millisec(100));
00346 }
00347 }
00348 }
00349 ~ConfigWriteLock() {
00350 if (! m_already_locked)
00351 m_reactor_ref.m_config_change_pending = false;
00352 }
00353 private:
00354 Reactor& m_reactor_ref;
00355 bool m_already_locked;
00356 };
00357
00358
00360 Reactor(const ReactorType type)
00361 : m_is_running(false), m_type(type), m_scheduler_ptr(NULL),
00362 m_events_in(0), m_events_out(0),
00363 m_config_change_pending(false), m_config_num_readers(0)
00364 {}
00365
00367 inline ReactionScheduler& getScheduler(void) {
00368 PION_ASSERT(m_scheduler_ptr != NULL);
00369 return *m_scheduler_ptr;
00370 }
00371
00378 virtual void process(const EventPtr& e) {
00379 deliverEvent(e);
00380 }
00381
00389 inline void deliverEvent(const EventPtr& e, bool return_immediately = false) {
00390 ++m_events_out;
00391 if (! m_connections.empty()) {
00392 if (m_multithread_branches) {
00393
00394
00395
00396 ConnectionMap::iterator i = m_connections.begin();
00397
00398
00399
00400
00401
00402 while (i != m_connections.end() && ! i->second.isRunning())
00403 ++i;
00404
00405 if (i != m_connections.end()) {
00406
00407 ConnectionMap::iterator cur_thread_reactor = i;
00408
00409
00410 while (++i != m_connections.end()) {
00411 if (i->second.isRunning())
00412 i->second.post(getScheduler(), e);
00413 }
00414
00415
00416
00417
00418 if (return_immediately)
00419 cur_thread_reactor->second.post(getScheduler(), e);
00420 else
00421 cur_thread_reactor->second(e);
00422 }
00423 } else {
00424
00425
00426 for (ConnectionMap::iterator i = m_connections.begin();
00427 i != m_connections.end(); ++i)
00428 {
00429 if (i->second.isRunning()) {
00430 if (return_immediately)
00431 i->second.post(getScheduler(), e);
00432 else
00433 i->second(e);
00434 }
00435 }
00436 }
00437 }
00438 }
00439
00447 inline void deliverEvents(const EventContainer& events, bool return_immediately = false)
00448 {
00449 for (EventContainer::const_iterator event_it = events.begin();
00450 event_it != events.end(); ++event_it)
00451 {
00452 deliverEvent(*event_it, return_immediately);
00453 }
00454 }
00455
00464 inline void incrementEventsIn(void) { ++m_events_in; }
00465
00471 inline void publish(const std::string& signal_id) {
00472 SignalPtr signal_ptr(new SignalType);
00473 ConfigWriteLock cfg_lock(*this);
00474 m_signals.insert(std::make_pair(signal_id, signal_ptr));
00475 }
00476
00484 inline void signalNoLock(const std::string& signal_id, void *ptr = NULL) {
00485 SignalMap::iterator it = m_signals.find(signal_id);
00486 if (it == m_signals.end())
00487 throw UnknownSignalException(getId(), signal_id);
00488 (*it->second)(getId(), signal_id, ptr);
00489 }
00490
00497 inline void signal(const std::string& signal_id, void *ptr = NULL) {
00498 ConfigReadLock cfg_lock(*this);
00499 signalNoLock(signal_id, ptr);
00500 }
00501
00503 void writeStatsOnlyXML(std::ostream& out) const;
00504
00506 void writeBeginReactorXML(std::ostream& out) const;
00507
00509 void writeEndReactorXML(std::ostream& out) const;
00510
00511
00513 SignalMap m_signals;
00514
00516 volatile bool m_is_running;
00517
00518
00519 private:
00520
00522 class OutputConnection {
00523 public:
00525 ~OutputConnection() {}
00526
00528 explicit OutputConnection(Reactor* reactor)
00529 : m_reactor_ptr(reactor), m_event_handler(boost::ref(*reactor))
00530 {}
00531
00533 explicit OutputConnection(EventHandler handler)
00534 : m_reactor_ptr(NULL), m_event_handler(handler)
00535 {}
00536
00538 OutputConnection(const OutputConnection& c)
00539 : m_reactor_ptr(c.m_reactor_ptr), m_event_handler(c.m_event_handler)
00540 {}
00541
00543 inline bool isRunning(void) {
00544 return (m_reactor_ptr == NULL || m_reactor_ptr->isRunning());
00545 }
00546
00548 inline void operator()(const EventPtr& event_ptr) {
00549 m_event_handler(event_ptr);
00550 }
00551
00553 inline void post(ReactionScheduler& scheduler, const EventPtr& event_ptr) {
00554 scheduler.post(boost::bind<void>(m_event_handler, event_ptr));
00555 }
00556
00557 private:
00558
00560 Reactor * m_reactor_ptr;
00561
00563 EventHandler m_event_handler;
00564 };
00565
00567 typedef std::map<std::string, OutputConnection> ConnectionMap;
00568
00569
00571 static const std::string EVENTS_IN_ELEMENT_NAME;
00572
00574 static const std::string EVENTS_OUT_ELEMENT_NAME;
00575
00577 static const std::string ID_ATTRIBUTE_NAME;
00578
00579
00581 const ReactorType m_type;
00582
00584 ReactionScheduler * m_scheduler_ptr;
00585
00587 ConnectionMap m_connections;
00588
00590 std::string m_workspace_id;
00591
00594 boost::detail::atomic_count m_events_in;
00595
00598 boost::detail::atomic_count m_events_out;
00599
00602 bool m_multithread_branches;
00603
00605 mutable volatile bool m_config_change_pending;
00606
00608 mutable boost::detail::atomic_count m_config_num_readers;
00609
00610
00611 friend class ConfigReadLock;
00612 friend class ConfigWriteLock;
00613 };
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641 }
00642 }
00643
00644 #endif