00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
// Visual C++ builds only: name the zlib and bzip2 library binaries before the
// Boost.Iostreams gzip/bzip2 filter headers are included below.
// NOTE(review): presumably these macros steer Boost.Iostreams' auto-linking
// toward the named .lib files -- confirm against the Boost build in use.
#if defined(_MSC_VER)
#  define BOOST_ZLIB_BINARY "zdll.lib"
#  define BOOST_BZIP2_BINARY "bzip2.lib"
#endif
00029
00030
00031 #include <fstream>
00032 #include <boost/filesystem.hpp>
00033 #include <boost/filesystem/operations.hpp>
00034 #include <boost/algorithm/string/predicate.hpp>
00035 #include <boost/date_time/posix_time/posix_time_duration.hpp>
00036 #include <boost/iostreams/device/file.hpp>
00037 #include <boost/iostreams/filtering_stream.hpp>
00038 #include <boost/iostreams/filter/gzip.hpp>
00039 #include <boost/iostreams/filter/bzip2.hpp>
00040 #include <pion/PionScheduler.hpp>
00041 #include <pion/platform/ConfigManager.hpp>
00042 #include <pion/platform/CodecFactory.hpp>
00043 #include <pion/platform/ReactionEngine.hpp>
00044 #include "LogInputReactor.hpp"
00045
00046 using namespace pion::platform;
00047 namespace bfs = boost::filesystem;
00048
00049
00050 namespace pion {
00051 namespace plugins {
00052
00053
00054
00055
00056 const boost::uint32_t LogInputReactor::DEFAULT_FREQUENCY = 1;
00057 const std::string LogInputReactor::CODEC_ELEMENT_NAME = "Codec";
00058 const std::string LogInputReactor::DIRECTORY_ELEMENT_NAME = "Directory";
00059 const std::string LogInputReactor::FILENAME_ELEMENT_NAME = "Filename";
00060 const std::string LogInputReactor::JUST_ONE_ELEMENT_NAME = "JustOne";
00061 const std::string LogInputReactor::TAIL_F_ELEMENT_NAME = "TailF";
00062 const std::string LogInputReactor::FREQUENCY_ELEMENT_NAME = "Frequency";
00063 const std::string LogInputReactor::CURRENT_LOG_ELEMENT_NAME = "CurrentLog";
00064 const std::string LogInputReactor::CONSUMED_LOG_ELEMENT_NAME = "ConsumedLog";
00065
00066
00067
00068
00069 void LogInputReactor::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00070 {
00071
00072 ConfigWriteLock cfg_lock(*this);
00073 Reactor::setConfig(v, config_ptr);
00074
00075
00076 if (! ConfigManager::getConfigOption(CODEC_ELEMENT_NAME, m_codec_id, config_ptr))
00077 throw EmptyCodecException(getId());
00078
00079
00080 if (! ConfigManager::getConfigOption(DIRECTORY_ELEMENT_NAME, m_log_directory, config_ptr))
00081 throw EmptyDirectoryException(getId());
00082
00083
00084 m_log_directory = getReactionEngine().resolveRelativeDataPath(m_log_directory);
00085
00086
00087 if (! boost::filesystem::exists(m_log_directory) )
00088 throw DirectoryNotFoundException(m_log_directory);
00089 if (! boost::filesystem::is_directory(m_log_directory) )
00090 throw NotADirectoryException(m_log_directory);
00091
00092
00093 std::string filename_str;
00094 if (! ConfigManager::getConfigOption(FILENAME_ELEMENT_NAME, filename_str, config_ptr))
00095 throw EmptyFilenameException(getId());
00096 m_log_regex = filename_str;
00097
00098
00099 m_just_one = false;
00100 std::string just_one_option;
00101 if (ConfigManager::getConfigOption(JUST_ONE_ELEMENT_NAME, just_one_option,
00102 config_ptr))
00103 {
00104 if (just_one_option == "true")
00105 m_just_one = true;
00106 }
00107
00108
00109 m_tail_f = false;
00110 std::string tail_f_option;
00111 if (ConfigManager::getConfigOption(TAIL_F_ELEMENT_NAME, tail_f_option,
00112 config_ptr))
00113 {
00114 if (tail_f_option == "true")
00115 m_tail_f = true;
00116 }
00117
00118
00119 std::string frequency_str;
00120 if (ConfigManager::getConfigOption(FREQUENCY_ELEMENT_NAME, frequency_str, config_ptr)) {
00121 const boost::uint32_t frequency_value = boost::lexical_cast<boost::uint32_t>(frequency_str);
00122 if (frequency_value <= 0)
00123 throw BadFrequencyException(getId());
00124 m_frequency = frequency_value;
00125 } else {
00126 m_frequency = DEFAULT_FREQUENCY;
00127 }
00128
00129
00130 m_history_cache_filename = getId() + ".cache";
00131 m_history_cache_filename = getReactionEngine().resolveRelativePath(m_history_cache_filename);
00132 m_current_log_file_cache_filename = getId() + "-cur.cache";
00133 m_current_log_file_cache_filename = getReactionEngine().resolveRelativePath(m_current_log_file_cache_filename);
00134 }
00135
00136 void LogInputReactor::query(std::ostream& out, const QueryBranches& branches,
00137 const QueryParams& qp)
00138 {
00139 writeBeginReactorXML(out);
00140 writeStatsOnlyXML(out);
00141
00142 boost::mutex::scoped_lock logs_consumed_lock(m_logs_consumed_mutex);
00143
00144 out << '<' << CURRENT_LOG_ELEMENT_NAME << '>' << m_log_file
00145 << "</" << CURRENT_LOG_ELEMENT_NAME << '>' << std::endl;
00146
00147 for (LogFileCollection::const_iterator i = m_logs_consumed.begin();
00148 i != m_logs_consumed.end(); ++i)
00149 {
00150 out << '<' << CONSUMED_LOG_ELEMENT_NAME << '>' << *i
00151 << "</" << CONSUMED_LOG_ELEMENT_NAME << '>' << std::endl;
00152 }
00153
00154 logs_consumed_lock.unlock();
00155
00156 writeEndReactorXML(out);
00157 }
00158
00159 void LogInputReactor::start(void)
00160 {
00161 boost::mutex::scoped_lock worker_lock(m_worker_mutex);
00162 if (! m_is_running) {
00163 boost::mutex::scoped_lock logs_consumed_lock(m_logs_consumed_mutex);
00164
00165
00166 if (bfs::exists(m_history_cache_filename)) {
00167 std::ifstream history_cache(m_history_cache_filename.c_str());
00168 if (! history_cache)
00169 throw PionException("Unable to open history cache file for reading.");
00170 m_logs_consumed.clear();
00171 std::string already_consumed_file;
00172 while (history_cache >> already_consumed_file) {
00173 m_logs_consumed.insert(already_consumed_file);
00174 }
00175 }
00176
00177
00178
00179 if (bfs::exists(m_current_log_file_cache_filename)) {
00180 std::ifstream current_log_file_cache(m_current_log_file_cache_filename.c_str());
00181 if (! current_log_file_cache)
00182 throw PionException("Unable to open current log file cache file for reading.");
00183 getline(current_log_file_cache, m_log_file, '|');
00184 while (! current_log_file_cache.eof()) {
00185 current_log_file_cache >> m_num_events_read_previously[m_log_file];
00186 getline(current_log_file_cache, m_log_file, '|');
00187 }
00188 current_log_file_cache.close();
00189 bfs::remove(m_current_log_file_cache_filename);
00190 }
00191
00192 m_is_running = true;
00193 m_worker_is_active = true;
00194 getScheduler().getIOService().post(boost::bind(&LogInputReactor::checkForLogFiles, this));
00195 }
00196 }
00197
00198 void LogInputReactor::stop(void)
00199 {
00200 boost::mutex::scoped_lock worker_lock(m_worker_mutex);
00201 if (m_is_running) {
00202
00203 PION_LOG_DEBUG(m_logger, "Stopping worker thread: " << getId());
00204 m_is_running = false;
00205 m_timer_ptr.reset();
00206
00207
00208 while (m_worker_is_active) {
00209 m_worker_stopped.wait(worker_lock);
00210 }
00211 PION_LOG_DEBUG(m_logger, "Worker thread has finished: " << getId());
00212
00213
00214 if (m_open_streams.empty()) {
00215 boost::filesystem::remove(m_current_log_file_cache_filename);
00216 } else {
00217 PION_LOG_DEBUG(m_logger, "Updating log cache file: " << getId());
00218 std::ofstream current_log_file_cache(m_current_log_file_cache_filename.c_str());
00219 if (! current_log_file_cache)
00220 throw PionException("Unable to open current log file cache file for writing.");
00221 for (StreamMap::iterator it = m_open_streams.begin(); it != m_open_streams.end(); ++it) {
00222 current_log_file_cache << it->first << "|" << *it->second.second << std::endl;
00223 }
00224 }
00225
00226
00227 for (StreamMap::iterator it = m_open_streams.begin(); it != m_open_streams.end(); ++it) {
00228 boost::shared_ptr<boost::iostreams::filtering_istream> log_stream = it->second.first;
00229
00230 while (! log_stream->empty()) log_stream->pop();
00231 }
00232 m_open_streams.clear();
00233 }
00234 }
00235
00236 void LogInputReactor::scheduleLogFileCheck(boost::uint32_t seconds)
00237 {
00238 boost::mutex::scoped_lock worker_lock(m_worker_mutex);
00239 if (! m_is_running) {
00240
00241 worker_lock.unlock();
00242 finishWorkerThread();
00243 } else if (seconds == 0) {
00244 getScheduler().getIOService().post(boost::bind(&LogInputReactor::checkForLogFiles, this));
00245 } else {
00246 if (! m_timer_ptr)
00247 m_timer_ptr.reset(new boost::asio::deadline_timer(getScheduler().getIOService()));
00248 m_timer_ptr->expires_from_now(boost::posix_time::seconds(seconds));
00249 m_timer_ptr->async_wait(boost::bind(&LogInputReactor::checkForLogFiles, this));
00250 }
00251 }
00252
00253 void LogInputReactor::checkForLogFiles(void)
00254 {
00255
00256 if (! isRunning() ) {
00257 finishWorkerThread();
00258 return;
00259 }
00260
00261 PION_LOG_DEBUG(m_logger, "Checking for new log files in " << m_log_directory);
00262
00263 ConfigReadLock cfg_lock(*this);
00264
00265 try {
00266
00267 LogFileCollection current_logs;
00268 getLogFilesInLogDirectory(current_logs);
00269
00270 boost::mutex::scoped_lock logs_consumed_lock(m_logs_consumed_mutex);
00271
00272
00273 LogFileCollection::iterator temp_itr;
00274 LogFileCollection::iterator log_itr = m_logs_consumed.begin();
00275 while (log_itr != m_logs_consumed.end()) {
00276 temp_itr = log_itr++;
00277 if (current_logs.find(*temp_itr) == current_logs.end())
00278 m_logs_consumed.erase(temp_itr);
00279 }
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295 if (m_logs_consumed.empty()) {
00296 bfs::remove(m_history_cache_filename);
00297 } else {
00298 std::ofstream history_cache(m_history_cache_filename.c_str());
00299 if (! history_cache)
00300 throw PionException("Unable to open history cache file for writing.");
00301 for (log_itr = m_logs_consumed.begin(); log_itr != m_logs_consumed.end(); ++log_itr) {
00302 history_cache << *log_itr << std::endl;
00303 }
00304 }
00305
00306
00307 for (log_itr = current_logs.begin(); log_itr != current_logs.end(); ++log_itr) {
00308 if (m_logs_consumed.find(*log_itr) == m_logs_consumed.end())
00309 break;
00310 }
00311
00312 if (log_itr == current_logs.end()) {
00313
00314
00315 if (m_tail_f) {
00316
00317 StreamMap::iterator it;
00318 for (it = m_open_streams.begin(); it != m_open_streams.end(); ++it) {
00319 it->second.first->clear();
00320 if (it->second.first->peek() != EOF)
00321 break;
00322 }
00323 if (it == m_open_streams.end()) {
00324
00325 PION_LOG_DEBUG(m_logger, "No new or incremented logs (sleeping for " << m_frequency
00326 << " seconds): " << m_log_directory);
00327 scheduleLogFileCheck(m_frequency);
00328 } else {
00329 m_log_file = it->first;
00330 PION_LOG_DEBUG(m_logger, "Found an open log file with new records to consume: " << m_log_file);
00331 m_current_stream_data = it->second;
00332 scheduleReadFromLog();
00333 }
00334 } else {
00335
00336 PION_LOG_DEBUG(m_logger, "No new logs (sleeping for " << m_frequency
00337 << " seconds): " << m_log_directory);
00338 scheduleLogFileCheck(m_frequency);
00339 }
00340 } else {
00341
00342
00343
00344 bfs::path full_path(m_log_directory);
00345 full_path /= *log_itr;
00346 m_log_file = full_path.file_string();
00347
00348 PION_LOG_DEBUG(m_logger, "Found a new log file to consume: " << m_log_file);
00349 m_current_stream_data = StreamData(
00350 boost::shared_ptr<boost::iostreams::filtering_istream>(new boost::iostreams::filtering_istream),
00351 boost::shared_ptr<boost::uint64_t>(new boost::uint64_t(0)));
00352 m_open_streams[m_log_file] = m_current_stream_data;
00353 scheduleReadFromLog();
00354 }
00355 } catch (std::exception& e) {
00356 PION_LOG_ERROR(m_logger, e.what());
00357 finishWorkerThread();
00358 m_is_running = false;
00359 }
00360 }
00361
00362 void LogInputReactor::readFromLog(void)
00363 {
00364
00365 if (! m_is_running) {
00366 finishWorkerThread();
00367 return;
00368 }
00369
00370 boost::shared_ptr<boost::iostreams::filtering_istream> log_stream = m_current_stream_data.first;
00371 try {
00372
00373 boost::uint64_t num_events_to_skip = 0;
00374 bool compressed = false;
00375 bool already_open = log_stream->is_complete();
00376 if (! already_open) {
00377
00378
00379 if (boost::algorithm::iequals(bfs::extension(bfs::path(m_log_file)), ".gz")) {
00380 log_stream->push(boost::iostreams::gzip_decompressor());
00381 compressed = true;
00382 }
00383 if (boost::algorithm::iequals(bfs::extension(bfs::path(m_log_file)), ".bz2")) {
00384 log_stream->push(boost::iostreams::bzip2_decompressor());
00385 compressed = true;
00386 }
00387
00388
00389 log_stream->push(boost::iostreams::file_source(m_log_file.c_str(), std::ios::in | std::ios::binary));
00390 if (! log_stream->is_complete())
00391 throw OpenLogException(m_log_file);
00392
00393 if (log_stream->peek() == EOF && ! m_tail_f) {
00394 std::string log_file_tmp = m_log_file;
00395 recordLogFileAsDone();
00396
00397
00398 throw EmptyLogException(log_file_tmp);
00399 }
00400
00401
00402 std::map<std::string, boost::uint64_t>::iterator it = m_num_events_read_previously.find(m_log_file);
00403 if (it != m_num_events_read_previously.end()) {
00404 num_events_to_skip = it->second;
00405 PION_LOG_DEBUG(m_logger, "Resuming log file parsing by skipping " << num_events_to_skip << " events");
00406 m_num_events_read_previously.erase(it);
00407 }
00408
00409 }
00410
00411
00412 CodecPtr codec_ptr;
00413 {
00414 ConfigReadLock cfg_lock(*this);
00415 codec_ptr = getCodecFactory().getCodec(m_codec_id);
00416 PION_ASSERT(codec_ptr);
00417 }
00418
00419 const Event::EventType event_type(codec_ptr->getEventType());
00420 EventFactory event_factory;
00421 EventPtr event_ptr;
00422 do {
00423
00424 event_factory.create(event_ptr, event_type);
00425
00426
00427 if (! isRunning()) break;
00428 bool event_read = codec_ptr->read(*log_stream, *event_ptr);
00429 if (! event_read && ! log_stream->eof())
00430 throw ReadEventException(m_log_file);
00431 if (event_read) {
00432
00433 ++(*m_current_stream_data.second);
00434
00435
00436 if (num_events_to_skip) {
00437 num_events_to_skip--;
00438 continue;
00439 }
00440 }
00441
00442
00443 if (m_just_one) {
00444 PION_LOG_DEBUG(m_logger, "JustOne: generating lots of event copies for testing");
00445
00446
00447 while (! log_stream->empty()) log_stream->pop();
00448
00449
00450 EventPtr original_event_ptr(event_ptr);
00451 while (isRunning()) {
00452
00453 event_factory.create(event_ptr, event_type);
00454 *event_ptr += *original_event_ptr;
00455
00456 deliverEventFromLog(event_ptr);
00457 }
00458 break;
00459 }
00460
00461
00462 if (log_stream->eof()) {
00463 if (m_tail_f && ! compressed) {
00464 log_stream->clear();
00465 PION_LOG_DEBUG(m_logger, "Finished consuming currently available records in log file: " << m_log_file);
00466 } else {
00467
00468 PION_LOG_DEBUG(m_logger, "Finished consuming log file: " << m_log_file);
00469
00470
00471 while (! log_stream->empty()) log_stream->pop();
00472
00473 StreamMap::iterator it = m_open_streams.find(m_log_file);
00474 if (it != m_open_streams.end())
00475 m_open_streams.erase(it);
00476
00477 recordLogFileAsDone();
00478 }
00479
00480
00481 if (event_read)
00482 deliverEventFromLog(event_ptr);
00483
00484 break;
00485 }
00486
00487
00488 if (event_read)
00489 deliverEventFromLog(event_ptr);
00490
00491 } while (isRunning());
00492
00493 } catch (std::exception& e) {
00494 PION_LOG_ERROR(m_logger, e.what());
00495
00496
00497 while (! log_stream->empty()) log_stream->pop();
00498
00499 recordLogFileAsDone();
00500 }
00501
00502
00503
00504 scheduleLogFileCheck(0);
00505 }
00506
00507 void LogInputReactor::getLogFilesInLogDirectory(LogFileCollection& files)
00508 {
00509 bfs::path dir_path(m_log_directory);
00510 for (bfs::directory_iterator itr(dir_path); itr!=bfs::directory_iterator(); ++itr) {
00511 if (bfs::is_regular(itr->status())) {
00512 const std::string filename(itr->path().leaf());
00513 if (boost::regex_search(filename, m_log_regex)) {
00514 if (! m_tail_f || (m_open_streams.find(itr->path().file_string()) == m_open_streams.end()))
00515 files.insert(filename);
00516 }
00517 }
00518 }
00519 }
00520
00521 void LogInputReactor::recordLogFileAsDone() {
00522
00523 if (m_just_one) return;
00524
00525
00526 bfs::path log_file_path(m_log_file);
00527 boost::mutex::scoped_lock logs_consumed_lock(m_logs_consumed_mutex);
00528 m_log_file.clear();
00529 m_logs_consumed.insert(log_file_path.leaf());
00530 std::ofstream history_cache(m_history_cache_filename.c_str(), std::ios::out | std::ios::app);
00531 if (! history_cache)
00532 throw PionException("Unable to open history cache file for writing.");
00533 {
00534
00535 std::string log_leaf = log_file_path.leaf();
00536 signal("FinishedLog", (void*) &log_leaf);
00537 }
00538 history_cache << log_file_path.leaf() << std::endl;
00539 }
00540
00541 }
00542 }
00543
00544
00546 extern "C" PION_PLUGIN_API pion::platform::Reactor *pion_create_LogInputReactor(void) {
00547 return new pion::plugins::LogInputReactor();
00548 }
00549
00551 extern "C" PION_PLUGIN_API void pion_destroy_LogInputReactor(pion::plugins::LogInputReactor *reactor_ptr) {
00552 delete reactor_ptr;
00553 }
00554