00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef __PION_LOGINPUTREACTOR_HEADER__
00021 #define __PION_LOGINPUTREACTOR_HEADER__
00022
00023 #include <set>
00024 #include <string>
00025 #include <boost/asio.hpp>
00026 #include <boost/regex.hpp>
00027 #include <boost/scoped_ptr.hpp>
00028 #include <boost/thread/condition.hpp>
00029 #include <boost/iostreams/filtering_stream.hpp>
00030 #include <pion/PionConfig.hpp>
00031 #include <pion/PionLogger.hpp>
00032 #include <pion/PionException.hpp>
00033 #include <pion/platform/Event.hpp>
00034 #include <pion/platform/Codec.hpp>
00035 #include <pion/platform/Reactor.hpp>
00036
00037
00038 namespace pion {
00039 namespace plugins {
00040
00041
00045 class LogInputReactor :
00046 public pion::platform::Reactor
00047 {
00048 public:
00049
00051 class EmptyCodecException : public PionException {
00052 public:
00053 EmptyCodecException(const std::string& reactor_id)
00054 : PionException("LogInputReactor configuration is missing a required Codec parameter: ", reactor_id) {}
00055 };
00056
00058 class EmptyFilenameException : public PionException {
00059 public:
00060 EmptyFilenameException(const std::string& reactor_id)
00061 : PionException("LogInputReactor configuration is missing a required Filename parameter: ", reactor_id) {}
00062 };
00063
00065 class EmptyDirectoryException : public PionException {
00066 public:
00067 EmptyDirectoryException(const std::string& reactor_id)
00068 : PionException("LogInputReactor configuration is missing a required Directory parameter: ", reactor_id) {}
00069 };
00070
00072 class DirectoryNotFoundException : public PionException {
00073 public:
00074 DirectoryNotFoundException(const std::string& dir)
00075 : PionException("LogInputReactor directory not found: ", dir) {}
00076 };
00077
00079 class NotADirectoryException : public PionException {
00080 public:
00081 NotADirectoryException(const std::string& dir)
00082 : PionException("LogInputReactor Directory parameter is not a directory: ", dir) {}
00083 };
00084
00086 class BadFrequencyException : public PionException {
00087 public:
00088 BadFrequencyException(const std::string& dir)
00089 : PionException("LogInputReactor frequency must be greater than zero: ", dir) {}
00090 };
00091
00093 class OpenLogException : public PionException {
00094 public:
00095 OpenLogException(const std::string& log_filename)
00096 : PionException("Unable to open log file for reading: ", log_filename) {}
00097 };
00098
00100 class EmptyLogException : public PionException {
00101 public:
00102 EmptyLogException(const std::string& log_filename)
00103 : PionException("Log file is empty: ", log_filename) {}
00104 };
00105
00107 class ReadEventException : public PionException {
00108 public:
00109 ReadEventException(const std::string& log_filename)
00110 : PionException("Unable to read event from log file: ", log_filename) {}
00111 };
00112
00113
00115 LogInputReactor(void)
00116 : Reactor(TYPE_COLLECTION),
00117 m_logger(PION_GET_LOGGER("pion.LogInputReactor")),
00118 m_just_one(false), m_tail_f(false), m_frequency(DEFAULT_FREQUENCY), m_worker_is_active(false)
00119 {
00120
00121 publish("FinishedLog");
00122 }
00123
00125 virtual ~LogInputReactor() { stop(); }
00126
00134 virtual void setConfig(const pion::platform::Vocabulary& v, const xmlNodePtr config_ptr);
00135
00145 virtual void query(std::ostream& out, const QueryBranches& branches,
00146 const QueryParams& qp);
00147
00149 virtual void start(void);
00150
00152 virtual void stop(void);
00153
00155 inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
00156
00158 inline PionLogger getLogger(void) { return m_logger; }
00159
00160
00161 private:
00162
00164 typedef std::set<std::string> LogFileCollection;
00165
00167 typedef std::pair<boost::shared_ptr<boost::iostreams::filtering_istream>,
00168 boost::shared_ptr<boost::uint64_t> > StreamData;
00169
00171 typedef std::map<std::string, StreamData> StreamMap;
00172
00178 void scheduleLogFileCheck(boost::uint32_t seconds);
00179
00181 void checkForLogFiles(void);
00182
00184 void recordLogFileAsDone(void);
00185
00187 inline void scheduleReadFromLog(void) {
00188 getScheduler().getIOService().post(boost::bind(&LogInputReactor::readFromLog, this));
00189 }
00190
00192 void readFromLog(void);
00193
00199 void getLogFilesInLogDirectory(LogFileCollection& files);
00200
00207 inline void deliverEventFromLog(const pion::platform::EventPtr& e, bool return_immediately = false) {
00208 incrementEventsIn();
00209 ConfigReadLock cfg_lock(*this);
00210 deliverEvent(e, return_immediately);
00211 }
00212
00214 inline void finishWorkerThread(void) {
00215 boost::mutex::scoped_lock worker_lock(m_worker_mutex);
00216 PION_LOG_DEBUG(m_logger, "Log reader thread has finished: " << getId());
00217 m_worker_is_active = false;
00218 m_worker_stopped.notify_all();
00219 }
00220
00221
00223 static const boost::uint32_t DEFAULT_FREQUENCY;
00224
00226 static const std::string CODEC_ELEMENT_NAME;
00227
00229 static const std::string DIRECTORY_ELEMENT_NAME;
00230
00232 static const std::string FILENAME_ELEMENT_NAME;
00233
00235 static const std::string JUST_ONE_ELEMENT_NAME;
00236
00238 static const std::string TAIL_F_ELEMENT_NAME;
00239
00241 static const std::string FREQUENCY_ELEMENT_NAME;
00242
00244 static const std::string CURRENT_LOG_ELEMENT_NAME;
00245
00247 static const std::string CONSUMED_LOG_ELEMENT_NAME;
00248
00249
00251 PionLogger m_logger;
00252
00254 std::string m_codec_id;
00255
00257 bool m_just_one;
00258
00260 bool m_tail_f;
00261
00263 boost::uint32_t m_frequency;
00264
00266 std::string m_log_directory;
00267
00269 boost::regex m_log_regex;
00270
00272 LogFileCollection m_logs_consumed;
00273
00275 std::string m_log_file;
00276
00278 StreamData m_current_stream_data;
00279
00281 StreamMap m_open_streams;
00282
00284 boost::scoped_ptr<boost::asio::deadline_timer> m_timer_ptr;
00285
00287 std::string m_history_cache_filename;
00288
00290 std::string m_current_log_file_cache_filename;
00291
00293 std::map<std::string, boost::uint64_t> m_num_events_read_previously;
00294
00296 boost::mutex m_logs_consumed_mutex;
00297
00299 boost::mutex m_worker_mutex;
00300
00302 boost::condition m_worker_stopped;
00303
00305 volatile bool m_worker_is_active;
00306 };
00307
00308
00309 }
00310 }
00311
00312 #endif