platform/reactors/LogOutputReactor.cpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #include <boost/filesystem.hpp>
00021 #include <boost/filesystem/operations.hpp>
00022 #include <pion/platform/Reactor.hpp>
00023 #include <pion/platform/CodecFactory.hpp>
00024 #include <pion/platform/ReactionEngine.hpp>
00025 #include "LogOutputReactor.hpp"
00026 
00027 using namespace pion::platform;
00028 
00029 
00030 namespace pion {        // begin namespace pion
00031 namespace plugins {     // begin namespace plugins
00032 
00033 
00034 // static members of LogOutputReactor
00035 
00036 const std::string           LogOutputReactor::CODEC_ELEMENT_NAME = "Codec";
00037 const std::string           LogOutputReactor::FILENAME_ELEMENT_NAME = "Filename";
00038     
00039     
00040 // LogOutputReactor member functions
00041 
00042 void LogOutputReactor::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00043 {
00044     // first set config options for the Reactor base class
00045     ConfigWriteLock cfg_lock(*this);
00046     Reactor::setConfig(v, config_ptr);
00047     
00048     // if running, close the current output log & stop (and save status)
00049     bool was_running = m_is_running;
00050     if (m_is_running) {
00051         closeLogFileNoLock();
00052         m_is_running = false;
00053     }
00054 
00055     // get the Codec that the Reactor should use
00056     if (! ConfigManager::getConfigOption(CODEC_ELEMENT_NAME, m_codec_id, config_ptr))
00057         throw EmptyCodecException(getId());
00058     // make sure the codec exists (but leave it uninitialized)
00059     (void)getCodecFactory().getCodec(m_codec_id);
00060     
00061     // get the filename regex to use for finding log files
00062     if (! ConfigManager::getConfigOption(FILENAME_ELEMENT_NAME, m_log_filename, config_ptr))
00063         throw EmptyFilenameException(getId());
00064     
00065     // resolve paths relative to the ReactionEngine's config file location
00066     m_log_filename = getReactionEngine().resolveRelativeDataPath(m_log_filename);
00067 
00068     // if running, open the new output log
00069     if (was_running) {
00070         openLogFileNoLock();
00071         m_is_running = true;
00072     }
00073 }
00074     
00075 void LogOutputReactor::updateVocabulary(const Vocabulary& v)
00076 {
00077     // first update anything in the Reactor base class that might be needed
00078     ConfigWriteLock cfg_lock(*this);
00079     Reactor::updateVocabulary(v);
00080     if (m_codec_ptr)
00081         m_codec_ptr->updateVocabulary(v);
00082 }
00083 
00084 void LogOutputReactor::updateCodecs(void)
00085 {
00086     // check if the codec was deleted (if so, stop now!)
00087     if (! getCodecFactory().hasPlugin(m_codec_id)) {
00088         stop();
00089     } else {
00090         // update the codec pointer
00091         ConfigWriteLock cfg_lock(*this);
00092         if (m_codec_ptr)
00093             m_codec_ptr = getCodecFactory().getCodec(m_codec_id);
00094     }
00095 }
00096     
00097 void LogOutputReactor::process(const EventPtr& e)
00098 {
00099     PION_ASSERT(m_codec_ptr);
00100     PION_ASSERT(m_log_stream.is_open());
00101     
00102     // make sure that the event type matches the codec
00103     if (e->getType() != m_codec_ptr->getEventType())
00104         return;
00105 
00106     // lock mutex to ensure that only one Event may be written at a time
00107     boost::mutex::scoped_lock log_writer_lock(m_log_writer_mutex);
00108     
00109     // write the Event to the log file
00110     m_codec_ptr->write(m_log_stream, *e);
00111     if (! m_log_stream)
00112         throw WriteToLogException(m_log_filename);
00113         
00114     // unlock mutex after writing to log
00115     log_writer_lock.unlock();
00116 
00117     // deliver the Event to other Reactors
00118     deliverEvent(e);
00119 }
00120 
00121 void LogOutputReactor::query(std::ostream& out, const QueryBranches& branches,
00122     const QueryParams& qp)
00123 {
00124     if (branches.size() > 2 && branches[2] == "rotate") {
00125         ConfigWriteLock cfg_lock(*this);
00126 
00127         // Send the default query response.  This is protected by the mutex so
00128         // that the statistics correspond to the point at which the file was saved.
00129         Reactor::query(out, branches, qp);
00130 
00131         pion::PionTimeFacet facet("-%Y%m%d-%H%M%S");
00132         std::string timestamp = facet.toString(boost::get_system_time());
00133         boost::filesystem::path p(m_log_filename);
00134         boost::filesystem::path timestamped_path = p.branch_path() / (basename(p) + timestamp + extension(p));
00135         if (boost::filesystem::exists(timestamped_path)) {
00136             throw LogRotationException(m_log_filename);
00137         }
00138 
00139         if (isRunning()) {
00140             closeLogFileNoLock();
00141         }
00142 
00143         // Timestamp the log file if it exists.
00144         if (boost::filesystem::exists(m_log_filename)) {
00145             boost::filesystem::rename(m_log_filename, timestamped_path);
00146         }
00147 
00148         if (isRunning()) {
00149             openLogFileNoLock();
00150         }
00151     } else {
00152         // Send the default query response.
00153         Reactor::query(out, branches, qp);
00154     }
00155 }
00156 
00157 void LogOutputReactor::start(void)
00158 {
00159     ConfigWriteLock cfg_lock(*this);
00160     if (! m_is_running) {
00161         openLogFileNoLock();
00162         m_is_running = true;
00163     }
00164 }
00165 
00166 void LogOutputReactor::openLogFileNoLock(void)
00167 {
00168     // open up the file for writing
00169     m_log_stream.open(m_log_filename.c_str(), std::ios::out | std::ios::app | std::ios::binary);
00170     if (! m_log_stream.is_open())
00171         throw OpenLogException(m_log_filename);
00172 
00173     // initialize the codec for writing
00174     m_codec_ptr = getCodecFactory().getCodec(m_codec_id);
00175     PION_ASSERT(m_codec_ptr);
00176 
00177     PION_LOG_DEBUG(m_logger, "Opened output log file: " << m_log_filename);
00178 }
00179 
00180 void LogOutputReactor::stop(void)
00181 {
00182     ConfigWriteLock cfg_lock(*this);
00183     // if left unhandled, can cause crash on Windows platform 
00184     // when invoked via PluginManager::run(boost::bind(&Reactor::stop, _1))
00185     try {
00186         if (m_is_running) {
00187             closeLogFileNoLock();
00188             m_is_running = false;
00189         }
00190     } catch(std::exception& e) {
00191         PION_LOG_ERROR(m_logger, e.what());
00192     }
00193 }
00194 
00195 void LogOutputReactor::closeLogFileNoLock(void)
00196 {
00197     // close the log file if it is open
00198     if (m_log_stream.is_open()) {
00199         m_codec_ptr->finish(m_log_stream);
00200         m_codec_ptr.reset();
00201         m_log_stream.close();
00202         // remove the log file if no events were written to it
00203         if (boost::filesystem::file_size(m_log_filename) == 0) {
00204             boost::filesystem::remove(m_log_filename);
00205             PION_LOG_DEBUG(m_logger, "Closing empty output log (removing file): " << m_log_filename);
00206         } else {
00207             PION_LOG_DEBUG(m_logger, "Closing output log file: " << m_log_filename);
00208         }
00209         // clear any pending error flags to be safe
00210         m_log_stream.clear();
00211     }   
00212 }
00213 
00214     
00215 }   // end namespace plugins
00216 }   // end namespace pion
00217 
00218 
00220 extern "C" PION_PLUGIN_API pion::platform::Reactor *pion_create_LogOutputReactor(void) {
00221     return new pion::plugins::LogOutputReactor();
00222 }
00223 
00225 extern "C" PION_PLUGIN_API void pion_destroy_LogOutputReactor(pion::plugins::LogOutputReactor *reactor_ptr) {
00226     delete reactor_ptr;
00227 }

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7