platform/reactors/LogInputReactor.cpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifdef _MSC_VER
// This could be any valid .lib file; its only purpose is to prevent Boost's auto-linking
// from trying to link to boost_zlib-*.lib (e.g. boost_zlib-vc80-mt-1_37.lib).
// LogInputReactor only uses zlib indirectly, through boost_iostreams-*.dll.
00024 #define BOOST_ZLIB_BINARY "zdll.lib"
00025 
00026 // See above comment.
00027 #define BOOST_BZIP2_BINARY "bzip2.lib"
00028 #endif
00029 
00030 
#include <fstream>
#include <string>
#include <boost/filesystem.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/iostreams/device/file.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filter/bzip2.hpp>
#include <boost/lexical_cast.hpp>
#include <pion/PionScheduler.hpp>
#include <pion/platform/ConfigManager.hpp>
#include <pion/platform/CodecFactory.hpp>
#include <pion/platform/ReactionEngine.hpp>
#include "LogInputReactor.hpp"
00045 
00046 using namespace pion::platform;
00047 namespace bfs = boost::filesystem;
00048 
00049 
00050 namespace pion {        // begin namespace pion
00051 namespace plugins {     // begin namespace plugins
00052 
00053 
00054 // static members of LogInputReactor
00055     
00056 const boost::uint32_t       LogInputReactor::DEFAULT_FREQUENCY = 1;
00057 const std::string           LogInputReactor::CODEC_ELEMENT_NAME = "Codec";
00058 const std::string           LogInputReactor::DIRECTORY_ELEMENT_NAME = "Directory";
00059 const std::string           LogInputReactor::FILENAME_ELEMENT_NAME = "Filename";
00060 const std::string           LogInputReactor::JUST_ONE_ELEMENT_NAME = "JustOne";
00061 const std::string           LogInputReactor::TAIL_F_ELEMENT_NAME = "TailF";
00062 const std::string           LogInputReactor::FREQUENCY_ELEMENT_NAME = "Frequency";
00063 const std::string           LogInputReactor::CURRENT_LOG_ELEMENT_NAME = "CurrentLog";
00064 const std::string           LogInputReactor::CONSUMED_LOG_ELEMENT_NAME = "ConsumedLog";
00065 
00066 
00067 // LogInputReactor member functions
00068 
00069 void LogInputReactor::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00070 {
00071     // first set config options for the Reactor base class
00072     ConfigWriteLock cfg_lock(*this);
00073     Reactor::setConfig(v, config_ptr);
00074     
00075     // get the Codec that the Reactor should use
00076     if (! ConfigManager::getConfigOption(CODEC_ELEMENT_NAME, m_codec_id, config_ptr))
00077         throw EmptyCodecException(getId());
00078     
00079     // get the directory where the Reactor will look for new log files
00080     if (! ConfigManager::getConfigOption(DIRECTORY_ELEMENT_NAME, m_log_directory, config_ptr))
00081         throw EmptyDirectoryException(getId());
00082     
00083     // resolve paths relative to the ReactionEngine's config file location
00084     m_log_directory = getReactionEngine().resolveRelativeDataPath(m_log_directory);
00085 
00086     // make sure that the directory exists
00087     if (! boost::filesystem::exists(m_log_directory) )
00088         throw DirectoryNotFoundException(m_log_directory);
00089     if (! boost::filesystem::is_directory(m_log_directory) )
00090         throw NotADirectoryException(m_log_directory);
00091     
00092     // get the filename regex to use for finding log files
00093     std::string filename_str;
00094     if (! ConfigManager::getConfigOption(FILENAME_ELEMENT_NAME, filename_str, config_ptr))
00095         throw EmptyFilenameException(getId());
00096     m_log_regex = filename_str;
00097     
00098     // check if the the Reactor should only read the first Event & duplicate it (for testing)
00099     m_just_one = false;
00100     std::string just_one_option;
00101     if (ConfigManager::getConfigOption(JUST_ONE_ELEMENT_NAME, just_one_option,
00102                                        config_ptr))
00103     {
00104         if (just_one_option == "true")
00105             m_just_one = true;
00106     }
00107 
00108     // check if "tail -f" option was requested
00109     m_tail_f = false;
00110     std::string tail_f_option;
00111     if (ConfigManager::getConfigOption(TAIL_F_ELEMENT_NAME, tail_f_option,
00112                                        config_ptr))
00113     {
00114         if (tail_f_option == "true")
00115             m_tail_f = true;
00116     }
00117 
00118     // get the frequency to check for new logs (if defined)
00119     std::string frequency_str;
00120     if (ConfigManager::getConfigOption(FREQUENCY_ELEMENT_NAME, frequency_str, config_ptr)) {
00121         const boost::uint32_t frequency_value = boost::lexical_cast<boost::uint32_t>(frequency_str);
00122         if (frequency_value <= 0)
00123             throw BadFrequencyException(getId());
00124         m_frequency = frequency_value;
00125     } else {
00126         m_frequency = DEFAULT_FREQUENCY;
00127     }
00128 
00129     // assign names for the cache files
00130     m_history_cache_filename = getId() + ".cache";
00131     m_history_cache_filename = getReactionEngine().resolveRelativePath(m_history_cache_filename);
00132     m_current_log_file_cache_filename = getId() + "-cur.cache";
00133     m_current_log_file_cache_filename = getReactionEngine().resolveRelativePath(m_current_log_file_cache_filename);
00134 }
00135     
00136 void LogInputReactor::query(std::ostream& out, const QueryBranches& branches,
00137     const QueryParams& qp)
00138 {
00139     writeBeginReactorXML(out);
00140     writeStatsOnlyXML(out);
00141     
00142     boost::mutex::scoped_lock logs_consumed_lock(m_logs_consumed_mutex);
00143 
00144     out << '<' << CURRENT_LOG_ELEMENT_NAME << '>' << m_log_file
00145         << "</" << CURRENT_LOG_ELEMENT_NAME << '>' << std::endl;
00146         
00147     for (LogFileCollection::const_iterator i = m_logs_consumed.begin();
00148         i != m_logs_consumed.end(); ++i)
00149     {
00150         out << '<' << CONSUMED_LOG_ELEMENT_NAME << '>' << *i
00151             << "</" << CONSUMED_LOG_ELEMENT_NAME << '>' << std::endl;
00152     }
00153 
00154     logs_consumed_lock.unlock();
00155     
00156     writeEndReactorXML(out);
00157 }
00158 
00159 void LogInputReactor::start(void)
00160 {
00161     boost::mutex::scoped_lock worker_lock(m_worker_mutex);
00162     if (! m_is_running) {
00163         boost::mutex::scoped_lock logs_consumed_lock(m_logs_consumed_mutex);
00164 
00165         // Process history cache (list of log files that have already been consumed) if present.
00166         if (bfs::exists(m_history_cache_filename)) {
00167             std::ifstream history_cache(m_history_cache_filename.c_str());
00168             if (! history_cache)
00169                 throw PionException("Unable to open history cache file for reading.");
00170             m_logs_consumed.clear();
00171             std::string already_consumed_file;
00172             while (history_cache >> already_consumed_file) {
00173                 m_logs_consumed.insert(already_consumed_file);
00174             }
00175         }
00176 
00177         // Read the log file cache file to get the number of events previously read for all log files
00178         // that were open for reading the last time this Reactor was stopped.
00179         if (bfs::exists(m_current_log_file_cache_filename)) {
00180             std::ifstream current_log_file_cache(m_current_log_file_cache_filename.c_str());
00181             if (! current_log_file_cache)
00182                 throw PionException("Unable to open current log file cache file for reading.");
00183             getline(current_log_file_cache, m_log_file, '|');
00184             while (! current_log_file_cache.eof()) {
00185                 current_log_file_cache >> m_num_events_read_previously[m_log_file];
00186                 getline(current_log_file_cache, m_log_file, '|');
00187             }
00188             current_log_file_cache.close();
00189             bfs::remove(m_current_log_file_cache_filename);
00190         }
00191 
00192         m_is_running = true;
00193         m_worker_is_active = true;
00194         getScheduler().getIOService().post(boost::bind(&LogInputReactor::checkForLogFiles, this));
00195     }
00196 }
00197     
00198 void LogInputReactor::stop(void)
00199 {
00200     boost::mutex::scoped_lock worker_lock(m_worker_mutex);
00201     if (m_is_running) {
00202         // set flag to notify reader thread to shutdown
00203         PION_LOG_DEBUG(m_logger, "Stopping worker thread: " << getId());
00204         m_is_running = false;
00205         m_timer_ptr.reset();
00206 
00207         // wait until the worker thread has finished
00208         while (m_worker_is_active) {
00209             m_worker_stopped.wait(worker_lock);
00210         }
00211         PION_LOG_DEBUG(m_logger, "Worker thread has finished: " << getId());
00212 
00213         // Write filename and number of events read for all open streams to a cache file.
00214         if (m_open_streams.empty()) {
00215             boost::filesystem::remove(m_current_log_file_cache_filename);
00216         } else {
00217             PION_LOG_DEBUG(m_logger, "Updating log cache file: " << getId());
00218             std::ofstream current_log_file_cache(m_current_log_file_cache_filename.c_str());
00219             if (! current_log_file_cache)
00220                 throw PionException("Unable to open current log file cache file for writing.");
00221             for (StreamMap::iterator it = m_open_streams.begin(); it != m_open_streams.end(); ++it) {
00222                 current_log_file_cache << it->first << "|" << *it->second.second << std::endl;
00223             }
00224         }
00225 
00226         // Close all the open streams.
00227         for (StreamMap::iterator it = m_open_streams.begin(); it != m_open_streams.end(); ++it) {
00228             boost::shared_ptr<boost::iostreams::filtering_istream> log_stream = it->second.first;
00229             // Remove and close all Filters and Devices.
00230             while (! log_stream->empty()) log_stream->pop();
00231         }
00232         m_open_streams.clear();
00233     }
00234 }
00235 
00236 void LogInputReactor::scheduleLogFileCheck(boost::uint32_t seconds)
00237 {
00238     boost::mutex::scoped_lock worker_lock(m_worker_mutex);
00239     if (! m_is_running) {
00240         // stop() was called after last isRunning() check in checkForLogFiles()
00241         worker_lock.unlock();
00242         finishWorkerThread();
00243     } else if (seconds == 0) {
00244         getScheduler().getIOService().post(boost::bind(&LogInputReactor::checkForLogFiles, this));
00245     } else {
00246         if (! m_timer_ptr)
00247             m_timer_ptr.reset(new boost::asio::deadline_timer(getScheduler().getIOService()));
00248         m_timer_ptr->expires_from_now(boost::posix_time::seconds(seconds));
00249         m_timer_ptr->async_wait(boost::bind(&LogInputReactor::checkForLogFiles, this));
00250     }
00251 }
00252 
00253 void LogInputReactor::checkForLogFiles(void)
00254 {
00255     // make sure that the reactor is still running
00256     if (! isRunning() ) {
00257         finishWorkerThread();
00258         return;
00259     }
00260 
00261     PION_LOG_DEBUG(m_logger, "Checking for new log files in " << m_log_directory);
00262 
00263     ConfigReadLock cfg_lock(*this);
00264 
00265     try {
00266         // get the current logs located in the log directory
00267         LogFileCollection current_logs;
00268         getLogFilesInLogDirectory(current_logs);
00269     
00270         boost::mutex::scoped_lock logs_consumed_lock(m_logs_consumed_mutex);
00271 
00272         // remove logs from the consumed collection that are no longer there
00273         LogFileCollection::iterator temp_itr;
00274         LogFileCollection::iterator log_itr = m_logs_consumed.begin();
00275         while (log_itr != m_logs_consumed.end()) {
00276             temp_itr = log_itr++;
00277             if (current_logs.find(*temp_itr) == current_logs.end())
00278                 m_logs_consumed.erase(temp_itr);
00279         }
00280 
00281         // TODO: Is this useful?  I wrote it, then realized I couldn't test it
00282         // because I couldn't delete log files while they were open.
00283         /*
00284         // remove logs from the open log stream collection that are no longer there
00285         StreamMap::iterator temp_itr_2;
00286         StreamMap::iterator log_itr_2 = m_open_streams.begin();
00287         while (log_itr_2 != m_open_streams.end()) {
00288             temp_itr_2 = log_itr_2++;
00289             if (! bfs::exists(temp_itr_2->first))
00290                 m_open_streams.erase(temp_itr_2);
00291         }
00292         */
00293 
00294         // Update the history cache, which should always contain the same list as m_logs_consumed.
00295         if (m_logs_consumed.empty()) {
00296             bfs::remove(m_history_cache_filename);
00297         } else {
00298             std::ofstream history_cache(m_history_cache_filename.c_str());
00299             if (! history_cache)
00300                 throw PionException("Unable to open history cache file for writing.");
00301             for (log_itr = m_logs_consumed.begin(); log_itr != m_logs_consumed.end(); ++log_itr) {
00302                 history_cache << *log_itr << std::endl;
00303             }
00304         }
00305 
00306         // check for an existing log that has not yet been consumed
00307         for (log_itr = current_logs.begin(); log_itr != current_logs.end(); ++log_itr) {
00308             if (m_logs_consumed.find(*log_itr) == m_logs_consumed.end())
00309                 break;
00310         }
00311 
00312         if (log_itr == current_logs.end()) {
00313             // no new logs to consume
00314 
00315             if (m_tail_f) {
00316                 // Check the open streams to see if any have new records.
00317                 StreamMap::iterator it;
00318                 for (it = m_open_streams.begin(); it != m_open_streams.end(); ++it) {
00319                     it->second.first->clear();
00320                     if (it->second.first->peek() != EOF)
00321                         break;
00322                 }
00323                 if (it == m_open_streams.end()) {
00324                     // No open streams with new records found, so sleep until it is time to check again.
00325                     PION_LOG_DEBUG(m_logger, "No new or incremented logs (sleeping for " << m_frequency
00326                                    << " seconds): " << m_log_directory);
00327                     scheduleLogFileCheck(m_frequency);
00328                 } else {
00329                     m_log_file = it->first;
00330                     PION_LOG_DEBUG(m_logger, "Found an open log file with new records to consume: " << m_log_file);
00331                     m_current_stream_data = it->second;
00332                     scheduleReadFromLog();
00333                 }
00334             } else {
00335                 // sleep until it is time to check again
00336                 PION_LOG_DEBUG(m_logger, "No new logs (sleeping for " << m_frequency
00337                                << " seconds): " << m_log_directory);
00338                 scheduleLogFileCheck(m_frequency);
00339             }
00340         } else {
00341             // found a new log to consume
00342             
00343             // re-calculate the full path to the file
00344             bfs::path full_path(m_log_directory);
00345             full_path /= *log_itr;
00346             m_log_file = full_path.file_string();
00347 
00348             PION_LOG_DEBUG(m_logger, "Found a new log file to consume: " << m_log_file);
00349             m_current_stream_data = StreamData(
00350                 boost::shared_ptr<boost::iostreams::filtering_istream>(new boost::iostreams::filtering_istream), 
00351                 boost::shared_ptr<boost::uint64_t>(new boost::uint64_t(0)));
00352             m_open_streams[m_log_file] = m_current_stream_data;
00353             scheduleReadFromLog();
00354         }
00355     } catch (std::exception& e) {
00356         PION_LOG_ERROR(m_logger, e.what());
00357         finishWorkerThread();
00358         m_is_running = false;
00359     }
00360 }
00361 
00362 void LogInputReactor::readFromLog(void)
00363 {
00364     // make sure that the reactor is still running
00365     if (! m_is_running) {
00366         finishWorkerThread();
00367         return;
00368     }
00369 
00370     boost::shared_ptr<boost::iostreams::filtering_istream> log_stream = m_current_stream_data.first;
00371     try {
00372         // open up the log file for reading (if not open already)
00373         boost::uint64_t num_events_to_skip = 0;
00374         bool compressed = false;
00375         bool already_open = log_stream->is_complete();  // is_complete() returns true if a Device, in this case a file, is attached to the stream.
00376         if (! already_open) {
00377 
00378             // Insert a decompression filter if the file suffix indicates one is needed.
00379             if (boost::algorithm::iequals(bfs::extension(bfs::path(m_log_file)), ".gz")) {
00380                 log_stream->push(boost::iostreams::gzip_decompressor());
00381                 compressed = true;
00382             }
00383             if (boost::algorithm::iequals(bfs::extension(bfs::path(m_log_file)), ".bz2")) {
00384                 log_stream->push(boost::iostreams::bzip2_decompressor());
00385                 compressed = true;
00386             }
00387 
00388             // Open and attach a file.
00389             log_stream->push(boost::iostreams::file_source(m_log_file.c_str(), std::ios::in | std::ios::binary));
00390             if (! log_stream->is_complete())
00391                 throw OpenLogException(m_log_file);
00392 
00393             if (log_stream->peek() == EOF && ! m_tail_f) {
00394                 std::string log_file_tmp = m_log_file;
00395                 recordLogFileAsDone();
00396 
00397                 // TODO: Should this really throw an exception, or would a warning be good enough?
00398                 throw EmptyLogException(log_file_tmp);
00399             }
00400 
00401             // If there were any previously read Events, we need to skip over them, because the log file has been reopened since they were read.
00402             std::map<std::string, boost::uint64_t>::iterator it = m_num_events_read_previously.find(m_log_file);
00403             if (it != m_num_events_read_previously.end()) {
00404                 num_events_to_skip = it->second;
00405                 PION_LOG_DEBUG(m_logger, "Resuming log file parsing by skipping " << num_events_to_skip << " events");
00406                 m_num_events_read_previously.erase(it);
00407             }
00408 
00409         }
00410 
00411         // Get a new Codec for reading the current log file.
00412         CodecPtr codec_ptr;
00413         {
00414             ConfigReadLock cfg_lock(*this);
00415             codec_ptr = getCodecFactory().getCodec(m_codec_id);
00416             PION_ASSERT(codec_ptr);
00417         }
00418 
00419         const Event::EventType event_type(codec_ptr->getEventType());
00420         EventFactory event_factory;
00421         EventPtr event_ptr;
00422         do {
00423             // get a new event from the EventFactory
00424             event_factory.create(event_ptr, event_type);
00425 
00426             // read an Event from the log file (convert into an Event using the Codec)
00427             if (! isRunning()) break;
00428             bool event_read = codec_ptr->read(*log_stream, *event_ptr);
00429             if (! event_read && ! log_stream->eof())
00430                 throw ReadEventException(m_log_file);
00431             if (event_read) {
00432                 // Increment the count of events for this file.
00433                 ++(*m_current_stream_data.second);
00434 
00435                 // Ignore Events that were previously read.
00436                 if (num_events_to_skip) {
00437                     num_events_to_skip--;
00438                     continue;
00439                 }
00440             }
00441 
00442             // check if only the first Event should be read
00443             if (m_just_one) {
00444                 PION_LOG_DEBUG(m_logger, "JustOne: generating lots of event copies for testing");
00445 
00446                 // Remove and close all Filters and Devices.
00447                 while (! log_stream->empty()) log_stream->pop();
00448 
00449                 // just duplicate the event repeatedly until the Reactor is stopped
00450                 EventPtr original_event_ptr(event_ptr);
00451                 while (isRunning()) {
00452                     // duplicate the original event
00453                     event_factory.create(event_ptr, event_type);
00454                     *event_ptr += *original_event_ptr;
00455                     // deliver the Event to connected Reactors
00456                     deliverEventFromLog(event_ptr);
00457                 }
00458                 break;
00459             }
00460 
00461             // check for end of file
00462             if (log_stream->eof()) {
00463                 if (m_tail_f && ! compressed) {
00464                     log_stream->clear();
00465                     PION_LOG_DEBUG(m_logger, "Finished consuming currently available records in log file: " << m_log_file);
00466                 } else {
00467                     // all done with this log file
00468                     PION_LOG_DEBUG(m_logger, "Finished consuming log file: " << m_log_file);
00469 
00470                     // Remove and close all Filters and Devices.
00471                     while (! log_stream->empty()) log_stream->pop();
00472 
00473                     StreamMap::iterator it = m_open_streams.find(m_log_file);
00474                     if (it != m_open_streams.end()) // sanity check
00475                         m_open_streams.erase(it);
00476 
00477                     recordLogFileAsDone();
00478                 }
00479 
00480                 // deliver the Event to connected Reactors
00481                 if (event_read)
00482                     deliverEventFromLog(event_ptr);
00483 
00484                 break;
00485             }
00486 
00487             // deliver the Event to connected Reactors
00488             if (event_read)
00489                 deliverEventFromLog(event_ptr);
00490 
00491         } while (isRunning()); 
00492 
00493     } catch (std::exception& e) {
00494         PION_LOG_ERROR(m_logger, e.what());
00495 
00496         // Remove and close all Filters and Devices.
00497         while (! log_stream->empty()) log_stream->pop();
00498 
00499         recordLogFileAsDone();
00500     }   
00501 
00502     // check for more logs
00503     // note: if not running, the new thread should just return (almost) immediately
00504     scheduleLogFileCheck(0);
00505 }
00506 
00507 void LogInputReactor::getLogFilesInLogDirectory(LogFileCollection& files)
00508 {
00509     bfs::path dir_path(m_log_directory);
00510     for (bfs::directory_iterator itr(dir_path); itr!=bfs::directory_iterator(); ++itr) {
00511         if (bfs::is_regular(itr->status())) {
00512             const std::string filename(itr->path().leaf());
00513             if (boost::regex_search(filename, m_log_regex)) {
00514                 if (! m_tail_f || (m_open_streams.find(itr->path().file_string()) == m_open_streams.end()))
00515                     files.insert(filename);
00516             }
00517         }
00518     }
00519 }
00520 
00521 void LogInputReactor::recordLogFileAsDone() {
00522     // No need to record anything if "JustOne" is enabled.
00523     if (m_just_one) return;
00524 
00525     // Add the current log file to the list of consumed files and the history cache.
00526     bfs::path log_file_path(m_log_file);
00527     boost::mutex::scoped_lock logs_consumed_lock(m_logs_consumed_mutex);
00528     m_log_file.clear();
00529     m_logs_consumed.insert(log_file_path.leaf());
00530     std::ofstream history_cache(m_history_cache_filename.c_str(), std::ios::out | std::ios::app);
00531     if (! history_cache)
00532         throw PionException("Unable to open history cache file for writing.");
00533     {
00534         // signal finished processing log
00535         std::string log_leaf = log_file_path.leaf();
00536         signal("FinishedLog", (void*) &log_leaf);
00537     }
00538     history_cache << log_file_path.leaf() << std::endl;
00539 }
00540     
00541 }   // end namespace plugins
00542 }   // end namespace pion
00543 
00544 
00546 extern "C" PION_PLUGIN_API pion::platform::Reactor *pion_create_LogInputReactor(void) {
00547     return new pion::plugins::LogInputReactor();
00548 }
00549 
00551 extern "C" PION_PLUGIN_API void pion_destroy_LogInputReactor(pion::plugins::LogInputReactor *reactor_ptr) {
00552     delete reactor_ptr;
00553 }
00554 

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7