platform/services/MonitorService.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2010 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_MONITORSERVICE_HEADER__
00021 #define __PION_MONITORSERVICE_HEADER__
00022 
00023 #include <string>
00024 #include <iosfwd>
00025 #include <boost/date_time/gregorian/gregorian.hpp>
00026 #include <boost/enable_shared_from_this.hpp>
00027 #include <boost/shared_ptr.hpp>
00028 #include <boost/circular_buffer.hpp>
00029 #include <boost/thread/mutex.hpp>
00030 #include <boost/unordered_map.hpp>
00031 #include <pion/net/HTTPTypes.hpp>
00032 #include <pion/PionConfig.hpp>
00033 #include <pion/PionLogger.hpp>
00034 #include <pion/net/TCPStream.hpp>
00035 #include <pion/platform/Codec.hpp>
00036 #include <pion/platform/Reactor.hpp>
00037 #include <pion/platform/ReactionEngine.hpp>
00038 #include "PlatformService.hpp"
00039 
00040 
00041 namespace pion {        // begin namespace pion
00042 namespace plugins {     // begin namespace plugins
00043 
00044 
00048 class MonitorWriter
00049     : public boost::enable_shared_from_this<MonitorWriter>
00050 {
00051     typedef std::set<pion::platform::Vocabulary::TermRef>   TermRefSet;
00052 
00053     typedef boost::circular_buffer<pion::platform::EventPtr> EventBuffer;
00054 
00055 private:
00057     pion::platform::ReactionEngine &    m_reaction_engine;
00058 
00060     PionLogger                          m_logger;   
00061 
00063     const std::string                   m_connection_id;
00064 
00066     const std::string                   m_reactor_id;
00067 
00069     mutable boost::mutex                m_mutex;
00070 
00072     EventBuffer                         m_event_buffer;
00073 
00075     unsigned                            m_size;
00076 
00078     bool                                m_scroll;
00079 
00081     platform::VocabularyPtr             m_vocab_ptr;
00082 
00084     unsigned                            m_truncate;
00085 
00087     volatile bool                       m_stopped;
00088 
00090     bool                                m_hide_all;
00091 
00093     TermRefSet                          m_show_terms;
00094 
00096     TermRefSet                          m_suppressed_terms;
00097 
00099     TermRefSet                          m_terms_seen;
00100 
00102     TermRefSet                          m_events_seen;
00103 
00105     TermRefSet                          m_filtered_events;
00106 
00108     boost::uint32_t                     m_event_counter;
00109     boost::uint32_t                     m_change_counter;
00110 
00112     boost::posix_time::ptime            m_age;
00113 
00114 public:
00115 
00116     typedef boost::unordered_map<pion::platform::Vocabulary::TermRef, unsigned> TermCol;
00117     
00119     ~MonitorWriter()
00120     {
00121 //      PION_LOG_INFO(m_logger, "Closing output feed to " << getConnectionId());
00122         stop();
00123 //      PION_LOG_INFO(m_logger, "Closing output feed #2 to " << getConnectionId());
00124         boost::mutex::scoped_lock send_lock(m_mutex);
00125         m_event_buffer.clear();
00126 //      PION_LOG_INFO(m_logger, "Closing output feed #3 to " << getConnectionId());
00127     }
00128 
00139     MonitorWriter(pion::platform::ReactionEngine &reaction_engine, platform::VocabularyPtr& vptr,
00140                const std::string& reactor_id, unsigned size, bool scroll, PionLogger logger)
00141         : m_reaction_engine(reaction_engine), m_logger(logger), m_connection_id(PionId().to_string()), m_reactor_id(reactor_id),
00142         m_event_buffer(size), m_size(size), m_scroll(scroll), m_vocab_ptr(vptr), m_truncate(100), m_stopped(false),
00143         m_hide_all(false), m_event_counter(0), m_change_counter(0)
00144     { }
00145     
00152     void writeEvent(pion::platform::EventPtr& e);
00153 
00155     void setQP(const pion::net::HTTPTypes::QueryParams& qp);
00156     
00158     void start(const pion::net::HTTPTypes::QueryParams& qp);
00159 
00166     void stop(bool Stop = true, bool Flush = false) {
00167         if (m_stopped == false && Stop == true)
00168             m_reaction_engine.removeTempConnection(getConnectionId());
00169         m_stopped = true;
00170         if (Flush) {
00171             m_event_buffer.clear();
00172             m_age = boost::date_time::not_a_date_time;
00173         }
00174     }
00175 
00184     void SerializeXML(pion::platform::Vocabulary::TermRef tref,
00185         const pion::platform::Event::ParameterValue& value,
00186         std::ostream& xml, TermCol& cols);
00187 
00189     inline const std::string& getConnectionId(void) const { return m_connection_id; }
00190 
00192     inline const std::string& getReactorId(void) const { return m_reactor_id; }
00193 
00195     std::string getStatus(const pion::net::HTTPTypes::QueryParams& qp);
00196 
00198     boost::posix_time::ptime getAge(void) const { return m_age; }
00199     void setAge(void) { m_age = boost::posix_time::second_clock::local_time(); }
00200 };
00201 
00203 typedef boost::shared_ptr<MonitorWriter>    MonitorWriterPtr;
00204 
00205 
00206 
00210 class MonitorService
00211     : public pion::server::PlatformService
00212 {
00214     PionLogger                          m_logger;   
00215 
00217     std::vector<MonitorWriterPtr>       m_writers;
00218 
00220     volatile bool                       m_running;
00221 
00222 public:
00223     
00225     MonitorService(void)
00226         : PlatformService("pion.MonitorService"),
00227         m_logger(PION_GET_LOGGER("pion.MonitorService")),
00228         m_writers(WRITERS),                                 // a default of ten simultaneous monitors allowed
00229         m_running(true)
00230     { }
00231     
00233     virtual ~MonitorService()
00234     {
00235         if (m_running) {
00236             m_running = false;
00237 //          PION_LOG_INFO(m_logger, "MonitorService: starting shut down");
00238             for (unsigned i = 0; i < m_writers.size(); i++)
00239                 if (m_writers[i] != NULL)
00240                     m_writers[i]->stop();
00241             m_writers.clear();
00242 //          PION_LOG_INFO(m_logger, "MonitorService: Done");
00243         }
00244     }
00245     
00252     virtual void operator()(pion::net::HTTPRequestPtr& request,
00253                             pion::net::TCPConnectionPtr& tcp_conn);
00254 
00256     std::string getPermissionType(void) const { return MONITOR_SERVICE_PERMISSION_TYPE; }
00257 
00258 private:
00259 
00260     static const std::string            MONITOR_SERVICE_PERMISSION_TYPE;
00261     static const unsigned               WRITERS;
00262 };
00263 
00264     
00265 }   // end namespace plugins
00266 }   // end namespace pion
00267 
00268 #endif

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7