00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <pion/platform/ConfigManager.hpp>
00021 #include <pion/platform/DatabaseInserter.hpp>
00022
00023
00024 namespace pion {
00025 namespace platform {
00026
00027
00028
00029
00030 const boost::uint32_t DatabaseInserter::DEFAULT_QUEUE_SIZE = 10000;
00031 const boost::uint32_t DatabaseInserter::DEFAULT_QUEUE_TIMEOUT = 5;
00032 const std::string DatabaseInserter::DEFAULT_IGNORE = "false";
00033 const std::string DatabaseInserter::DATABASE_ELEMENT_NAME = "Database";
00034 const std::string DatabaseInserter::TABLE_ELEMENT_NAME = "Table";
00035 const std::string DatabaseInserter::FIELD_ELEMENT_NAME = "Field";
00036 const std::string DatabaseInserter::QUEUE_SIZE_ELEMENT_NAME = "QueueSize";
00037 const std::string DatabaseInserter::QUEUE_TIMEOUT_ELEMENT_NAME = "QueueTimeout";
00038 const std::string DatabaseInserter::TERM_ATTRIBUTE_NAME = "term";
00039 const std::string DatabaseInserter::INDEX_ATTRIBUTE_NAME = "index";
00040 const std::string DatabaseInserter::SQL_ATTRIBUTE_NAME = "sql";
00041 const char * DatabaseInserter::CHARSET_FOR_TABLES = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_";
00042 const std::string DatabaseInserter::IGNORE_INSERT_ELEMENT_NAME = "IgnoreInsert";
00043 const std::string DatabaseInserter::MAX_KEY_AGE_ELEMENT_NAME = "KeyCacheMaxAge";
00044 const boost::uint32_t DatabaseInserter::DEFAULT_MAX_AGE = 0;
00045 const std::string DatabaseInserter::EVENT_AGE_ELEMENT_NAME = "KeyCacheAgeTerm";
00046
00047
00048
00049
00050 void DatabaseInserter::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00051 {
00052
00053
00054 const bool was_running = m_is_running;
00055 stop();
00056
00057 boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00058
00059
00060 m_rules.setConfig(v, config_ptr);
00061
00062
00063 ConfigManager::getConfigOption(QUEUE_SIZE_ELEMENT_NAME, m_queue_max, DEFAULT_QUEUE_SIZE, config_ptr);
00064
00065
00066 ConfigManager::getConfigOption(QUEUE_TIMEOUT_ELEMENT_NAME, m_queue_timeout, DEFAULT_QUEUE_TIMEOUT, config_ptr);
00067
00068
00069 ConfigManager::getConfigOption(MAX_KEY_AGE_ELEMENT_NAME, m_max_age, DEFAULT_MAX_AGE, config_ptr);
00070
00071
00072 if (m_max_age) {
00073 std::string term_str;
00074 if (!ConfigManager::getConfigOption(EVENT_AGE_ELEMENT_NAME, term_str, config_ptr))
00075 throw MissingEventTime(EVENT_AGE_ELEMENT_NAME);
00076 m_timestamp_term_ref = v.findTerm(term_str);
00077 if (m_timestamp_term_ref == Vocabulary::UNDEFINED_TERM_REF)
00078 throw UnknownTermException(term_str);
00079 }
00080
00081
00082 std::string ignore_str;
00083 m_ignore_insert = false;
00084 ConfigManager::getConfigOption(IGNORE_INSERT_ELEMENT_NAME, ignore_str, DEFAULT_IGNORE, config_ptr);
00085 if (ignore_str == "true")
00086 m_ignore_insert = true;
00087
00088
00089 m_event_queue_ptr->reserve(m_queue_max);
00090
00091
00092 if (! ConfigManager::getConfigOption(DATABASE_ELEMENT_NAME, m_database_id, config_ptr))
00093 throw EmptyDatabaseException();
00094 if (! getDatabaseManager().hasPlugin(m_database_id))
00095 throw DatabaseManager::DatabaseNotFoundException(m_database_id);
00096
00097
00098 if (! ConfigManager::getConfigOption(TABLE_ELEMENT_NAME, m_table_name, config_ptr))
00099 throw EmptyTableException();
00100
00101
00102 m_field_map.clear();
00103 m_index_map.clear();
00104 m_cache_consumption = 0;
00105 m_cache_rows = 0;
00106 m_cache_terms.clear();
00107 xmlNodePtr field_node = config_ptr;
00108 m_key_term_ref = Vocabulary::UNDEFINED_TERM_REF;
00109 while ( (field_node = ConfigManager::findConfigNodeByName(FIELD_ELEMENT_NAME, field_node)) != NULL)
00110 {
00111
00112
00113
00114 xmlChar *xml_char_ptr = xmlNodeGetContent(field_node);
00115 if (xml_char_ptr == NULL || xml_char_ptr[0]=='\0') {
00116 if (xml_char_ptr != NULL)
00117 xmlFree(xml_char_ptr);
00118 throw EmptyFieldException();
00119 }
00120 const std::string field_name(reinterpret_cast<char*>(xml_char_ptr));
00121 xmlFree(xml_char_ptr);
00122
00123 if (strspn(field_name.c_str(), CHARSET_FOR_TABLES) != field_name.length())
00124 throw IllegalCharactersException(field_name);
00125
00126 for (unsigned i = 0; i < m_field_map.size(); i++)
00127 if (m_field_map[i].first == field_name)
00128 throw DuplicateColumnName(field_name);
00129
00130
00131 const std::string term_id = ConfigManager::getAttribute(TERM_ATTRIBUTE_NAME, field_node);
00132 if (term_id.empty())
00133 throw EmptyTermException();
00134
00135 const std::string index_str = ConfigManager::getAttribute(INDEX_ATTRIBUTE_NAME, field_node);
00136
00137 const std::string sql_str = ConfigManager::getAttribute(SQL_ATTRIBUTE_NAME, field_node);
00138
00139
00140 const Vocabulary::TermRef term_ref = v.findTerm(term_id);
00141 if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00142 throw UnknownTermException(term_id);
00143
00144
00145 m_field_map.push_back(std::make_pair(field_name, v[term_ref]));
00146 m_index_map.push_back(index_str);
00147
00148
00149 if (m_max_age && index_str == "unique")
00150 m_key_term_ref = term_ref;
00151
00152 if (!index_str.empty() && index_str != "false")
00153 m_cache_terms.push_back(v[term_ref]);
00154
00155
00156 field_node = field_node->next;
00157 }
00158 if (m_field_map.empty())
00159 throw NoFieldsException();
00160
00161 if (m_max_age && m_key_term_ref == Vocabulary::UNDEFINED_TERM_REF)
00162 throw NoUniqueKeyFound();
00163
00164
00165 queue_lock.unlock();
00166 if (was_running)
00167 start();
00168 }
00169
00170 void DatabaseInserter::updateVocabulary(const Vocabulary& v)
00171 {
00172 boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00173 if (m_database_ptr)
00174 m_database_ptr->updateVocabulary(v);
00175
00176
00177
00178
00179
00180
00181
00182
00183 }
00184
00185 void DatabaseInserter::updateDatabases(void)
00186 {
00187
00188 if (! getDatabaseManager().hasPlugin(m_database_id)) {
00189 stop();
00190 }
00191 }
00192
00193 void DatabaseInserter::start(void)
00194 {
00195 boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00196 if (! m_is_running) {
00197 m_is_running = true;
00198
00199 try {
00200
00201
00202 if (!m_database_ptr)
00203 m_database_ptr = getDatabaseManager().getDatabase(m_database_id);
00204 PION_ASSERT(m_database_ptr);
00205
00206 if (m_wipe && m_database_ptr->tableExists(m_table_name, m_partition)) {
00207 m_database_ptr->dropTable(m_table_name, m_partition);
00208 PION_LOG_DEBUG(m_logger, "Wiping partition: " << m_table_name << "/" << m_partition << " on thread: " << m_database_id);
00209 }
00210
00211
00212
00213 m_database_ptr->open(m_partition);
00214 PION_ASSERT(m_database_ptr->is_open());
00215
00216
00217 m_database_ptr->createTable(m_field_map, m_table_name, m_index_map, m_partition);
00218 m_table_size = m_database_ptr->getCache(Database::DB_FILE_SIZE);
00219
00220
00221 m_insert_query_ptr = m_ignore_insert ? m_database_ptr->prepareInsertIgnoreQuery(m_field_map, m_table_name) : m_database_ptr->prepareInsertQuery(m_field_map, m_table_name);
00222 PION_ASSERT(m_insert_query_ptr);
00223
00224
00225 m_begin_transaction_ptr = m_database_ptr->getBeginTransactionQuery();
00226 PION_ASSERT(m_begin_transaction_ptr);
00227
00228
00229 m_commit_transaction_ptr = m_database_ptr->getCommitTransactionQuery();
00230 PION_ASSERT(m_commit_transaction_ptr);
00231
00232 m_cache_overhead = m_database_ptr->getCache(Database::CACHE_INDEX_ROW_OVERHEAD);
00233 m_cache_size = m_database_ptr->getCache(Database::CACHE_PAGE_CACHE_SIZE);
00234
00235 PION_LOG_DEBUG(m_logger, "Database cache overhead: " << m_cache_overhead << ", cache size: " << m_cache_size / 1024UL << "k");
00236
00237 } catch (...) {
00238
00239 m_insert_query_ptr.reset();
00240 m_begin_transaction_ptr.reset();
00241 m_commit_transaction_ptr.reset();
00242 m_database_ptr.reset();
00243 m_is_running = false;
00244 throw;
00245 }
00246
00247
00248 PION_LOG_DEBUG(m_logger, "Starting worker thread: " << m_database_id);
00249 m_thread.reset(new boost::thread(boost::bind(&DatabaseInserter::insertEvents, this)));
00250
00251
00252 m_swapped_queue.wait(queue_lock);
00253 }
00254 }
00255
00256 void DatabaseInserter::stop(void)
00257 {
00258 boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00259 if (m_is_running) {
00260
00261 m_is_running = false;
00262 PION_LOG_DEBUG(m_logger, "Stopping worker thread: " << m_database_id);
00263 m_wakeup_worker.notify_one();
00264 queue_lock.unlock();
00265
00266
00267 m_thread->join();
00268 m_table_size = m_database_ptr->getCache(Database::DB_FILE_SIZE);
00269
00270
00271 boost::mutex::scoped_lock queue_lock_two(m_queue_mutex);
00272 m_insert_query_ptr.reset();
00273 m_begin_transaction_ptr.reset();
00274 m_commit_transaction_ptr.reset();
00275 m_database_ptr.reset();
00276
00277
00278 m_event_queue_ptr->clear();
00279
00280
00281 m_keys.clear();
00282 }
00283 }
00284
00285 std::size_t DatabaseInserter::getEventsQueued(void) const
00286 {
00287 boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00288 return m_event_queue_ptr->size();
00289 }
00290
00291 std::size_t DatabaseInserter::getKeyCacheSize(void) const
00292 {
00293 boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00294 return m_keys.size();
00295 }
00296
00297 void DatabaseInserter::insert(const EventPtr& e)
00298 {
00299
00300 if (! m_is_running)
00301 return;
00302
00303
00304 if ( m_rules(e) ) {
00305
00306 boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00307
00308
00309 if (! m_is_running)
00310 return;
00311
00312
00313 if (m_max_age) {
00314
00315 const Event::ParameterValue *param_ptr = e->getPointer(m_key_term_ref);
00316
00317
00318 if (param_ptr == NULL) {
00319
00320 if (! m_ignore_insert)
00321 PION_LOG_WARN(m_logger, "Event missing required table key: " << m_database_id);
00322 return;
00323 }
00324
00325
00326 KeyHash::iterator ki = m_keys.find(boost::get<const Event::BlobType&>(*param_ptr));
00327
00328
00329 boost::uint32_t event_timestamp;
00330 if (e->getUInt(m_timestamp_term_ref, event_timestamp) && event_timestamp > m_last_time) {
00331 m_last_time = event_timestamp;
00332 }
00333
00334
00335 if (ki != m_keys.end()) {
00336 ki->second = m_last_time;
00337 return;
00338 }
00339
00340
00341 m_keys[boost::get<const Event::BlobType&>(*param_ptr)] = m_last_time;
00342
00343
00344 }
00345
00346 for (unsigned i = 0; i < m_cache_terms.size(); i++) {
00347 boost::uint32_t size = 0;
00348 switch (m_cache_terms[i].term_type) {
00349 case Vocabulary::TYPE_NULL:
00350 case Vocabulary::TYPE_OBJECT:
00351 break;
00352 case Vocabulary::TYPE_INT8:
00353 case Vocabulary::TYPE_UINT8:
00354 size = 1;
00355 break;
00356 case Vocabulary::TYPE_INT16:
00357 case Vocabulary::TYPE_UINT16:
00358 size = 2;
00359 break;
00360 case Vocabulary::TYPE_INT32:
00361 case Vocabulary::TYPE_UINT32:
00362 size = 4;
00363 break;
00364 case Vocabulary::TYPE_INT64:
00365 case Vocabulary::TYPE_UINT64:
00366 size = 8;
00367 break;
00368 case Vocabulary::TYPE_FLOAT:
00369 case Vocabulary::TYPE_DOUBLE:
00370 case Vocabulary::TYPE_LONG_DOUBLE:
00371 size = 8;
00372 break;
00373 case Vocabulary::TYPE_SHORT_STRING:
00374 case Vocabulary::TYPE_STRING:
00375 case Vocabulary::TYPE_LONG_STRING:
00376 case Vocabulary::TYPE_CHAR:
00377 case Vocabulary::TYPE_BLOB:
00378 case Vocabulary::TYPE_ZBLOB:
00379 try {
00380
00381 size = (boost::get<const Event::BlobType&>(*e->getPointer(m_cache_terms[i].term_ref))).size();
00382 } catch (...) {
00383 size = 0;
00384 }
00385 break;
00386 case Vocabulary::TYPE_DATE_TIME:
00387 size = 4+1+2+1+2 +1+ 2+1+2+1+2;
00388 break;
00389 case Vocabulary::TYPE_DATE:
00390 size = 4+1+2+1+2;
00391 break;
00392 case Vocabulary::TYPE_TIME:
00393 size = 2+1+2+1+2;
00394 break;
00395 }
00396 m_cache_consumption += size + m_cache_overhead;
00397 }
00398 m_cache_rows++;
00399
00400
00401 while (m_event_queue_ptr->size() >= m_queue_max) {
00402
00403
00404 m_wakeup_worker.notify_one();
00405
00406
00407 m_swapped_queue.wait(queue_lock);
00408
00409
00410 if (! m_is_running)
00411 return;
00412 }
00413
00414
00415 m_event_queue_ptr->push_back(e);
00416 }
00417 }
00418
00419 void DatabaseInserter::insertEvents(void)
00420 {
00421 PION_LOG_DEBUG(m_logger, "Worker thread is running: " << m_database_id);
00422
00423 try {
00424
00425
00426 PION_ASSERT(m_database_ptr);
00427 PION_ASSERT(m_database_ptr->is_open());
00428 PION_ASSERT(m_insert_query_ptr);
00429 PION_ASSERT(m_begin_transaction_ptr);
00430 PION_ASSERT(m_commit_transaction_ptr);
00431
00432
00433 boost::scoped_ptr<EventQueue> insert_queue_ptr(new EventQueue);
00434 insert_queue_ptr->reserve(m_queue_max);
00435
00436
00437 {
00438
00439 boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00440 m_swapped_queue.notify_all();
00441 }
00442
00443 while (m_is_running) {
00444
00445
00446 if (checkEventQueue(insert_queue_ptr)) {
00447
00448 PION_LOG_DEBUG(m_logger, "Worker thread woke with " << insert_queue_ptr->size() << " events available: " << m_database_id);
00449
00450 try {
00451
00452
00453 m_begin_transaction_ptr->run();
00454 m_begin_transaction_ptr->reset();
00455
00456
00457 for (unsigned int n = 0; n < insert_queue_ptr->size(); ++n) {
00458
00459 m_insert_query_ptr->bindEvent(m_field_map, *((*insert_queue_ptr)[n]), false);
00460
00461 m_insert_query_ptr->run();
00462 m_insert_query_ptr->reset();
00463 }
00464
00465
00466 m_commit_transaction_ptr->run();
00467 m_commit_transaction_ptr->reset();
00468
00469
00470 PION_LOG_DEBUG(m_logger, "Worker thread wrote " << insert_queue_ptr->size() << " events: " << m_database_id);
00471 insert_queue_ptr->clear();
00472
00473
00474 if (m_max_age) {
00475 boost::uint32_t size_before, size_after;
00476 {
00477 boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00478 boost::uint32_t min_age = m_last_time - m_max_age;
00479 size_before = m_keys.size();
00480 KeyHash::iterator cur_it;
00481 KeyHash::iterator i = m_keys.begin();
00482 while (i != m_keys.end()) {
00483 cur_it = i;
00484 ++i;
00485 if (cur_it->second < min_age)
00486 m_keys.erase(cur_it);
00487 }
00488 size_after = m_keys.size();
00489 }
00490 PION_LOG_DEBUG(m_logger, "Worker thread pruned " << (size_before - size_after) << " keys from cache, " << size_after << " left");
00491 }
00492
00493 m_table_size = m_database_ptr->getCache(Database::DB_FILE_SIZE);
00494
00495 } catch (Database::DatabaseBusyException& e) {
00496
00497
00498 PION_LOG_ERROR(m_logger, "Dropping " << insert_queue_ptr->size() << " events because database was busy: " << m_database_id);
00499 insert_queue_ptr->clear();
00500 m_keys.clear();
00501
00502 }
00503
00504 } else {
00505 PION_LOG_DEBUG(m_logger, "Worker thread woke with no new events: " << m_database_id);
00506 }
00507 }
00508
00509 } catch (std::exception& e) {
00510 PION_LOG_FATAL(m_logger, e.what());
00511 m_is_running = false;
00512 m_swapped_queue.notify_all();
00513 }
00514
00515 PION_LOG_DEBUG(m_logger, "Worker thread is exiting: " << m_database_id);
00516 }
00517
00518 bool DatabaseInserter::checkEventQueue(boost::scoped_ptr<EventQueue>& insert_queue_ptr)
00519 {
00520
00521 boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00522
00523
00524 if (m_event_queue_ptr->size() < m_queue_max) {
00525
00526
00527 m_wakeup_worker.timed_wait(queue_lock,
00528 boost::get_system_time() + boost::posix_time::time_duration(0, 0, m_queue_timeout, 0) );
00529
00530
00531 if (m_event_queue_ptr->size() == 0)
00532 return false;
00533 }
00534
00535
00536 insert_queue_ptr.swap(m_event_queue_ptr);
00537
00538
00539 m_swapped_queue.notify_all();
00540
00541 return true;
00542 }
00543
00544 DatabaseManager& DatabaseInserter::getDatabaseManager(void)
00545 {
00546 if (m_database_mgr_ptr == NULL)
00547 throw MissingDatabaseManagerException();
00548 return *m_database_mgr_ptr;
00549 }
00550
00551 }
00552 }
00553