platform/reactors/LogInputReactor.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_LOGINPUTREACTOR_HEADER__
00021 #define __PION_LOGINPUTREACTOR_HEADER__
00022 
00023 #include <set>
00024 #include <string>
00025 #include <boost/asio.hpp>
00026 #include <boost/regex.hpp>
00027 #include <boost/scoped_ptr.hpp>
00028 #include <boost/thread/condition.hpp>
00029 #include <boost/iostreams/filtering_stream.hpp>
00030 #include <pion/PionConfig.hpp>
00031 #include <pion/PionLogger.hpp>
00032 #include <pion/PionException.hpp>
00033 #include <pion/platform/Event.hpp>
00034 #include <pion/platform/Codec.hpp>
00035 #include <pion/platform/Reactor.hpp>
00036 
00037 
00038 namespace pion {        // begin namespace pion
00039 namespace plugins {     // begin namespace plugins
00040 
00041 
/// LogInputReactor: a Reactor (constructed with TYPE_COLLECTION) that publishes
/// a "FinishedLog" signal.  Judging by its members and exception messages it
/// reads Events from log files found in a configured directory, decoding them
/// with a Codec; the implementation is not part of this header, so confirm the
/// details against the corresponding source file.
00045 class LogInputReactor :
00046     public pion::platform::Reactor
00047 {
00048 public:
00049 
          /// Exception whose message reports that the Reactor configuration is
          /// missing a required Codec parameter.  reactor_id is handed to
          /// PionException as its second argument (presumably appended to the
          /// message -- confirm in PionException).
00051     class EmptyCodecException : public PionException {
00052     public:
00053         EmptyCodecException(const std::string& reactor_id)
00054             : PionException("LogInputReactor configuration is missing a required Codec parameter: ", reactor_id) {}
00055     };
00056 
          /// Exception whose message reports that the Reactor configuration is
          /// missing a required Filename parameter (reactor_id identifies the Reactor)
00058     class EmptyFilenameException : public PionException {
00059     public:
00060         EmptyFilenameException(const std::string& reactor_id)
00061             : PionException("LogInputReactor configuration is missing a required Filename parameter: ", reactor_id) {}
00062     };
00063     
          /// Exception whose message reports that the Reactor configuration is
          /// missing a required Directory parameter (reactor_id identifies the Reactor)
00065     class EmptyDirectoryException : public PionException {
00066     public:
00067         EmptyDirectoryException(const std::string& reactor_id)
00068             : PionException("LogInputReactor configuration is missing a required Directory parameter: ", reactor_id) {}
00069     };
00070     
          /// Exception whose message reports that the log directory `dir` was not found
00072     class DirectoryNotFoundException : public PionException {
00073     public:
00074         DirectoryNotFoundException(const std::string& dir)
00075             : PionException("LogInputReactor directory not found: ", dir) {}
00076     };
00077     
          /// Exception whose message reports that the configured Directory
          /// parameter `dir` is not a directory
00079     class NotADirectoryException : public PionException {
00080     public:
00081         NotADirectoryException(const std::string& dir)
00082             : PionException("LogInputReactor Directory parameter is not a directory: ", dir) {}
00083     };
00084     
          /// Exception whose message reports that the frequency must be greater
          /// than zero.
          /// NOTE(review): the parameter is named `dir` although the message is
          /// about the frequency -- looks like a copy/paste from the directory
          /// exceptions above; confirm what callers actually pass here.
00086     class BadFrequencyException : public PionException {
00087     public:
00088         BadFrequencyException(const std::string& dir)
00089             : PionException("LogInputReactor frequency must be greater than zero: ", dir) {}
00090     };
00091     
          /// Exception whose message reports that log_filename could not be
          /// opened for reading
00093     class OpenLogException : public PionException {
00094     public:
00095         OpenLogException(const std::string& log_filename)
00096             : PionException("Unable to open log file for reading: ", log_filename) {}
00097     };
00098 
          /// Exception whose message reports that the log file log_filename is empty
00100     class EmptyLogException : public PionException {
00101     public:
00102         EmptyLogException(const std::string& log_filename)
00103             : PionException("Log file is empty: ", log_filename) {}
00104     };
00105     
          /// Exception whose message reports that an event could not be read from
          /// the log file log_filename
00107     class ReadEventException : public PionException {
00108     public:
00109         ReadEventException(const std::string& log_filename)
00110             : PionException("Unable to read event from log file: ", log_filename) {}
00111     };
00112 
00113     
          /// Constructs the Reactor as TYPE_COLLECTION using the
          /// "pion.LogInputReactor" logger.  m_just_one and m_tail_f start out
          /// false, m_frequency starts at DEFAULT_FREQUENCY and no worker is
          /// marked active; the "FinishedLog" signal is published immediately.
00115     LogInputReactor(void)
00116         : Reactor(TYPE_COLLECTION),
00117         m_logger(PION_GET_LOGGER("pion.LogInputReactor")),
00118         m_just_one(false), m_tail_f(false), m_frequency(DEFAULT_FREQUENCY), m_worker_is_active(false)
00119     {
00120         // publish the "FinishedLog" signal
00121         publish("FinishedLog");
00122     }
00123     
          /// Destructor: calls stop() before the object is torn down
00125     virtual ~LogInputReactor() { stop(); }
00126     
          /// Applies configuration to this Reactor (defined outside this header).
          /// Presumably reads the *_ELEMENT_NAME settings declared below and
          /// throws the configuration exceptions declared above -- confirm in
          /// the implementation.
          ///
          /// @param v the Vocabulary passed in by the caller
          /// @param config_ptr XML node (libxml2 xmlNodePtr) holding the configuration
00134     virtual void setConfig(const pion::platform::Vocabulary& v, const xmlNodePtr config_ptr);
00135     
          /// Handles a query against this Reactor (defined outside this header);
          /// presumably writes its response to `out` -- confirm in the implementation.
          ///
          /// @param out output stream supplied by the caller
          /// @param branches query branches supplied by the caller
          /// @param qp query parameters supplied by the caller
00145     virtual void query(std::ostream& out, const QueryBranches& branches,
00146         const QueryParams& qp);
00147     
          /// Starts the Reactor (defined outside this header)
00149     virtual void start(void);
00150     
          /// Stops the Reactor (defined outside this header); also invoked by the
          /// destructor
00152     virtual void stop(void);
00153     
          /// Replaces the logger used by this Reactor
00155     inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
00156     
          /// Returns (by value) the logger used by this Reactor
00158     inline PionLogger getLogger(void) { return m_logger; }
00159     
00160         
00161 private:
00162 
          /// An ordered set of strings; used for m_logs_consumed and as the
          /// output type of getLogFilesInLogDirectory()
00164     typedef std::set<std::string>       LogFileCollection;
00165 
          /// Pairs a shared input stream with a shared 64-bit unsigned value
          /// (presumably the number of events read from that stream -- confirm)
00167     typedef std::pair<boost::shared_ptr<boost::iostreams::filtering_istream>, 
00168                       boost::shared_ptr<boost::uint64_t> >          StreamData;
00169 
          /// Maps a string key (presumably a log file name -- confirm) to its StreamData.
          /// NOTE(review): std::map, boost::shared_ptr, boost::bind and
          /// boost::mutex are used in this class without a direct #include in
          /// this header; presumably they arrive transitively -- confirm.
00171     typedef std::map<std::string, StreamData>                       StreamMap;
00172 
          /// Schedules a check for log files (defined outside this header)
          ///
          /// @param seconds delay value, in seconds per the parameter name
00178     void scheduleLogFileCheck(boost::uint32_t seconds);
00179     
          /// Checks for log files (defined outside this header)
00181     void checkForLogFiles(void);
00182 
          /// Records the current log file as done (defined outside this header;
          /// presumably updates m_logs_consumed -- confirm)
00184     void recordLogFileAsDone(void);
00185 
          /// Posts a call to readFromLog(), bound to this object, onto the
          /// io_service returned by getScheduler().getIOService() instead of
          /// invoking it directly.  The bound handler holds a raw `this`, so the
          /// Reactor has to stay alive until the handler has run.
00187     inline void scheduleReadFromLog(void) {
00188         getScheduler().getIOService().post(boost::bind(&LogInputReactor::readFromLog, this));
00189     }
00190     
          /// Reads from the current log (defined outside this header); this is
          /// the handler posted by scheduleReadFromLog()
00192     void readFromLog(void);
00193     
          /// Fills `files` with log files (defined outside this header;
          /// presumably those in m_log_directory matching m_log_regex -- confirm)
          ///
          /// @param files collection that receives the result
00199     void getLogFilesInLogDirectory(LogFileCollection& files);
00200     
          /// Counts the Event via incrementEventsIn(), then takes a
          /// ConfigReadLock on this Reactor and hands the Event to deliverEvent()
          /// while that lock is held.
          ///
          /// @param e the Event to deliver
          /// @param return_immediately forwarded unchanged to deliverEvent()
          ///                           (defaults to false)
00207     inline void deliverEventFromLog(const pion::platform::EventPtr& e, bool return_immediately = false) {
00208         incrementEventsIn();
00209         ConfigReadLock cfg_lock(*this);
00210         deliverEvent(e, return_immediately);
00211     }
00212 
          /// While holding m_worker_mutex: logs a debug message containing the
          /// Reactor's id, clears m_worker_is_active and wakes every thread
          /// waiting on m_worker_stopped
00214     inline void finishWorkerThread(void) {
00215         boost::mutex::scoped_lock worker_lock(m_worker_mutex);
00216         PION_LOG_DEBUG(m_logger, "Log reader thread has finished: " << getId());
00217         m_worker_is_active = false;
00218         m_worker_stopped.notify_all();
00219     }
00220 
00221     
          /// Initial value given to m_frequency by the constructor (value is
          /// defined outside this header)
00223     static const boost::uint32_t        DEFAULT_FREQUENCY;
00224     
          /// String constants defined outside this header; presumably the names
          /// of the XML configuration/query elements they are named after
          /// (Codec, Directory, Filename, JustOne, TailF, Frequency, current
          /// log, consumed log) -- confirm against the implementation
00226     static const std::string            CODEC_ELEMENT_NAME;
00227 
00229     static const std::string            DIRECTORY_ELEMENT_NAME;
00230 
00232     static const std::string            FILENAME_ELEMENT_NAME;
00233 
00235     static const std::string            JUST_ONE_ELEMENT_NAME;
00236 
00238     static const std::string            TAIL_F_ELEMENT_NAME;
00239 
00241     static const std::string            FREQUENCY_ELEMENT_NAME;
00242 
00244     static const std::string            CURRENT_LOG_ELEMENT_NAME;
00245 
00247     static const std::string            CONSUMED_LOG_ELEMENT_NAME;
00248 
00249     
          /// Logger used by this Reactor (see setLogger() / getLogger())
00251     PionLogger                          m_logger;
00252     
          /// Identifier of a Codec (presumably the one used to decode log
          /// entries -- confirm)
00254     std::string                         m_codec_id;
00255     
          /// Flag initialised to false (presumably "process just one log file"
          /// -- confirm)
00257     bool                                m_just_one;
00258 
          /// Flag initialised to false (presumably "keep following the file,
          /// like tail -f" -- confirm)
00260     bool                                m_tail_f;
00261 
          /// Frequency setting, initialised to DEFAULT_FREQUENCY (presumably the
          /// number of seconds between log file checks -- confirm)
00263     boost::uint32_t                     m_frequency;
00264 
          /// Log directory (presumably the directory searched for log files)
00266     std::string                         m_log_directory;
00267 
          /// Regular expression (presumably matched against log file names)
00269     boost::regex                        m_log_regex;
00270     
          /// Collection of log names (presumably those already consumed)
00272     LogFileCollection                   m_logs_consumed;
00273     
          /// A log file name (presumably the log currently being read)
00275     std::string                         m_log_file;
00276 
          /// Stream and counter pair (presumably for the log currently being read)
00278     StreamData                          m_current_stream_data;
00279 
          /// StreamData entries keyed by string (presumably all currently open
          /// log streams, keyed by file name)
00281     StreamMap                           m_open_streams;
00282 
          /// Owning pointer to a deadline timer (presumably the one armed by
          /// scheduleLogFileCheck() -- confirm)
00284     boost::scoped_ptr<boost::asio::deadline_timer>  m_timer_ptr;
00285 
          /// File name (presumably of a cache recording which logs were consumed)
00287     std::string                         m_history_cache_filename;
00288 
          /// File name (presumably of a cache recording the current log file)
00290     std::string                         m_current_log_file_cache_filename;
00291 
          /// Counter per string key (presumably events already read from each
          /// log file in an earlier run -- confirm)
00293     std::map<std::string, boost::uint64_t>  m_num_events_read_previously;
00294 
          /// Mutex (presumably guarding m_logs_consumed -- confirm)
00296     boost::mutex                        m_logs_consumed_mutex;
00297 
          /// Mutex held by finishWorkerThread() while it clears
          /// m_worker_is_active and notifies m_worker_stopped
00299     boost::mutex                        m_worker_mutex;
00300 
          /// Condition notified (notify_all) by finishWorkerThread(); presumably
          /// waited on by stop() -- confirm
00302     boost::condition                    m_worker_stopped;
00303     
          /// Worker-active flag: false after construction and cleared again by
          /// finishWorkerThread() under m_worker_mutex.
          /// NOTE(review): `volatile` provides no inter-thread synchronisation;
          /// confirm every read and write happens with m_worker_mutex held.
00305     volatile bool                       m_worker_is_active;
00306 };
00307 
00308 
00309 }   // end namespace plugins
00310 }   // end namespace pion
00311 
00312 #endif

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7