00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef __PION_MONITORSERVICE_HEADER__
00021 #define __PION_MONITORSERVICE_HEADER__
00022
00023 #include <string>
00024 #include <iosfwd>
00025 #include <boost/date_time/gregorian/gregorian.hpp>
00026 #include <boost/enable_shared_from_this.hpp>
00027 #include <boost/shared_ptr.hpp>
00028 #include <boost/circular_buffer.hpp>
00029 #include <boost/thread/mutex.hpp>
00030 #include <boost/unordered_map.hpp>
00031 #include <pion/net/HTTPTypes.hpp>
00032 #include <pion/PionConfig.hpp>
00033 #include <pion/PionLogger.hpp>
00034 #include <pion/net/TCPStream.hpp>
00035 #include <pion/platform/Codec.hpp>
00036 #include <pion/platform/Reactor.hpp>
00037 #include <pion/platform/ReactionEngine.hpp>
00038 #include "PlatformService.hpp"
00039
00040
00041 namespace pion {
00042 namespace plugins {
00043
00044
00048 class MonitorWriter
00049 : public boost::enable_shared_from_this<MonitorWriter>
00050 {
00051 typedef std::set<pion::platform::Vocabulary::TermRef> TermRefSet;
00052
00053 typedef boost::circular_buffer<pion::platform::EventPtr> EventBuffer;
00054
00055 private:
00057 pion::platform::ReactionEngine & m_reaction_engine;
00058
00060 PionLogger m_logger;
00061
00063 const std::string m_connection_id;
00064
00066 const std::string m_reactor_id;
00067
00069 mutable boost::mutex m_mutex;
00070
00072 EventBuffer m_event_buffer;
00073
00075 unsigned m_size;
00076
00078 bool m_scroll;
00079
00081 platform::VocabularyPtr m_vocab_ptr;
00082
00084 unsigned m_truncate;
00085
00087 volatile bool m_stopped;
00088
00090 bool m_hide_all;
00091
00093 TermRefSet m_show_terms;
00094
00096 TermRefSet m_suppressed_terms;
00097
00099 TermRefSet m_terms_seen;
00100
00102 TermRefSet m_events_seen;
00103
00105 TermRefSet m_filtered_events;
00106
00108 boost::uint32_t m_event_counter;
00109 boost::uint32_t m_change_counter;
00110
00112 boost::posix_time::ptime m_age;
00113
00114 public:
00115
00116 typedef boost::unordered_map<pion::platform::Vocabulary::TermRef, unsigned> TermCol;
00117
00119 ~MonitorWriter()
00120 {
00121
00122 stop();
00123
00124 boost::mutex::scoped_lock send_lock(m_mutex);
00125 m_event_buffer.clear();
00126
00127 }
00128
00139 MonitorWriter(pion::platform::ReactionEngine &reaction_engine, platform::VocabularyPtr& vptr,
00140 const std::string& reactor_id, unsigned size, bool scroll, PionLogger logger)
00141 : m_reaction_engine(reaction_engine), m_logger(logger), m_connection_id(PionId().to_string()), m_reactor_id(reactor_id),
00142 m_event_buffer(size), m_size(size), m_scroll(scroll), m_vocab_ptr(vptr), m_truncate(100), m_stopped(false),
00143 m_hide_all(false), m_event_counter(0), m_change_counter(0)
00144 { }
00145
00152 void writeEvent(pion::platform::EventPtr& e);
00153
00155 void setQP(const pion::net::HTTPTypes::QueryParams& qp);
00156
00158 void start(const pion::net::HTTPTypes::QueryParams& qp);
00159
00166 void stop(bool Stop = true, bool Flush = false) {
00167 if (m_stopped == false && Stop == true)
00168 m_reaction_engine.removeTempConnection(getConnectionId());
00169 m_stopped = true;
00170 if (Flush) {
00171 m_event_buffer.clear();
00172 m_age = boost::date_time::not_a_date_time;
00173 }
00174 }
00175
00184 void SerializeXML(pion::platform::Vocabulary::TermRef tref,
00185 const pion::platform::Event::ParameterValue& value,
00186 std::ostream& xml, TermCol& cols);
00187
00189 inline const std::string& getConnectionId(void) const { return m_connection_id; }
00190
00192 inline const std::string& getReactorId(void) const { return m_reactor_id; }
00193
00195 std::string getStatus(const pion::net::HTTPTypes::QueryParams& qp);
00196
00198 boost::posix_time::ptime getAge(void) const { return m_age; }
00199 void setAge(void) { m_age = boost::posix_time::second_clock::local_time(); }
00200 };
00201
00203 typedef boost::shared_ptr<MonitorWriter> MonitorWriterPtr;
00204
00205
00206
00210 class MonitorService
00211 : public pion::server::PlatformService
00212 {
00214 PionLogger m_logger;
00215
00217 std::vector<MonitorWriterPtr> m_writers;
00218
00220 volatile bool m_running;
00221
00222 public:
00223
00225 MonitorService(void)
00226 : PlatformService("pion.MonitorService"),
00227 m_logger(PION_GET_LOGGER("pion.MonitorService")),
00228 m_writers(WRITERS),
00229 m_running(true)
00230 { }
00231
00233 virtual ~MonitorService()
00234 {
00235 if (m_running) {
00236 m_running = false;
00237
00238 for (unsigned i = 0; i < m_writers.size(); i++)
00239 if (m_writers[i] != NULL)
00240 m_writers[i]->stop();
00241 m_writers.clear();
00242
00243 }
00244 }
00245
00252 virtual void operator()(pion::net::HTTPRequestPtr& request,
00253 pion::net::TCPConnectionPtr& tcp_conn);
00254
00256 std::string getPermissionType(void) const { return MONITOR_SERVICE_PERMISSION_TYPE; }
00257
00258 private:
00259
00260 static const std::string MONITOR_SERVICE_PERMISSION_TYPE;
00261 static const unsigned WRITERS;
00262 };
00263
00264
00265 }
00266 }
00267
00268 #endif