platform/src/DatabaseInserter.cpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2009 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #include <pion/platform/ConfigManager.hpp>
00021 #include <pion/platform/DatabaseInserter.hpp>
00022 
00023 
00024 namespace pion {        // begin namespace pion
00025 namespace platform {    // begin namespace platform (Pion Platform Library)
00026 
00027 
00028 // static members of DatabaseInserter
00029 
00030 const boost::uint32_t       DatabaseInserter::DEFAULT_QUEUE_SIZE = 10000;
00031 const boost::uint32_t       DatabaseInserter::DEFAULT_QUEUE_TIMEOUT = 5;
00032 const std::string           DatabaseInserter::DEFAULT_IGNORE = "false";
00033 const std::string           DatabaseInserter::DATABASE_ELEMENT_NAME = "Database";
00034 const std::string           DatabaseInserter::TABLE_ELEMENT_NAME = "Table";
00035 const std::string           DatabaseInserter::FIELD_ELEMENT_NAME = "Field";
00036 const std::string           DatabaseInserter::QUEUE_SIZE_ELEMENT_NAME = "QueueSize";
00037 const std::string           DatabaseInserter::QUEUE_TIMEOUT_ELEMENT_NAME = "QueueTimeout";
00038 const std::string           DatabaseInserter::TERM_ATTRIBUTE_NAME = "term";
00039 const std::string           DatabaseInserter::INDEX_ATTRIBUTE_NAME = "index";
00040 const std::string           DatabaseInserter::SQL_ATTRIBUTE_NAME = "sql";
00041 const char *                DatabaseInserter::CHARSET_FOR_TABLES = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_";
00042 const std::string           DatabaseInserter::IGNORE_INSERT_ELEMENT_NAME = "IgnoreInsert";
00043 const std::string           DatabaseInserter::MAX_KEY_AGE_ELEMENT_NAME = "KeyCacheMaxAge";
00044 const boost::uint32_t       DatabaseInserter::DEFAULT_MAX_AGE = 0;
00045 const std::string           DatabaseInserter::EVENT_AGE_ELEMENT_NAME = "KeyCacheAgeTerm";
00046 
00047 
00048 // DatabaseInserter member functions
00049 
00050 void DatabaseInserter::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00051 {
00052     // stop for config changes, but cache running status
00053     // so that it can be restarted when finished
00054     const bool was_running = m_is_running;
00055     stop();
00056 
00057     boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00058 
00059     // parse RuleChain configuration
00060     m_rules.setConfig(v, config_ptr);
00061 
00062     // get the maximum number of events that may be queued for insertion
00063     ConfigManager::getConfigOption(QUEUE_SIZE_ELEMENT_NAME, m_queue_max, DEFAULT_QUEUE_SIZE, config_ptr);
00064 
00065     // get the queue timeout parameter
00066     ConfigManager::getConfigOption(QUEUE_TIMEOUT_ELEMENT_NAME, m_queue_timeout, DEFAULT_QUEUE_TIMEOUT, config_ptr);
00067 
00068     // get the optional max_age parameter
00069     ConfigManager::getConfigOption(MAX_KEY_AGE_ELEMENT_NAME, m_max_age, DEFAULT_MAX_AGE, config_ptr);
00070 
00071     // get the optional max_keys parameter
00072     if (m_max_age) {
00073         std::string term_str;
00074         if (!ConfigManager::getConfigOption(EVENT_AGE_ELEMENT_NAME, term_str, config_ptr))
00075             throw MissingEventTime(EVENT_AGE_ELEMENT_NAME);
00076         m_timestamp_term_ref = v.findTerm(term_str);
00077         if (m_timestamp_term_ref == Vocabulary::UNDEFINED_TERM_REF)
00078             throw UnknownTermException(term_str);
00079     }
00080 
00081     // get the queue timeout parameter
00082     std::string ignore_str;
00083     m_ignore_insert = false;
00084     ConfigManager::getConfigOption(IGNORE_INSERT_ELEMENT_NAME, ignore_str, DEFAULT_IGNORE, config_ptr);
00085     if (ignore_str == "true")
00086         m_ignore_insert = true;
00087 
00088     // prepare the event queue
00089     m_event_queue_ptr->reserve(m_queue_max);
00090 
00091     // get the database to use
00092     if (! ConfigManager::getConfigOption(DATABASE_ELEMENT_NAME, m_database_id, config_ptr))
00093         throw EmptyDatabaseException();
00094     if (! getDatabaseManager().hasPlugin(m_database_id))
00095         throw DatabaseManager::DatabaseNotFoundException(m_database_id);
00096 
00097     // get the name of the table to store events in
00098     if (! ConfigManager::getConfigOption(TABLE_ELEMENT_NAME, m_table_name, config_ptr))
00099         throw EmptyTableException();
00100 
00101     // next, map the database fields to Terms
00102     m_field_map.clear();
00103     m_index_map.clear();
00104     m_cache_consumption = 0;
00105     m_cache_rows = 0;
00106     m_cache_terms.clear();
00107     xmlNodePtr field_node = config_ptr;
00108     m_key_term_ref = Vocabulary::UNDEFINED_TERM_REF;
00109     while ( (field_node = ConfigManager::findConfigNodeByName(FIELD_ELEMENT_NAME, field_node)) != NULL)
00110     {
00111         // parse new field mapping
00112 
00113         // start with the name of the field (element content)
00114         xmlChar *xml_char_ptr = xmlNodeGetContent(field_node);
00115         if (xml_char_ptr == NULL || xml_char_ptr[0]=='\0') {
00116             if (xml_char_ptr != NULL)
00117                 xmlFree(xml_char_ptr);
00118             throw EmptyFieldException();
00119         }
00120         const std::string field_name(reinterpret_cast<char*>(xml_char_ptr));
00121         xmlFree(xml_char_ptr);
00122 
00123         if (strspn(field_name.c_str(), CHARSET_FOR_TABLES) != field_name.length())
00124             throw IllegalCharactersException(field_name);
00125 
00126         for (unsigned i = 0; i < m_field_map.size(); i++)
00127             if (m_field_map[i].first == field_name)
00128                 throw DuplicateColumnName(field_name);
00129 
00130         // next get the Term we want to map to
00131         const std::string term_id = ConfigManager::getAttribute(TERM_ATTRIBUTE_NAME, field_node);
00132         if (term_id.empty())
00133             throw EmptyTermException();
00134 
00135         const std::string index_str = ConfigManager::getAttribute(INDEX_ATTRIBUTE_NAME, field_node);
00136 
00137         const std::string sql_str = ConfigManager::getAttribute(SQL_ATTRIBUTE_NAME, field_node);
00138 
00139         // make sure that the Term is valid
00140         const Vocabulary::TermRef term_ref = v.findTerm(term_id);
00141         if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00142             throw UnknownTermException(term_id);
00143 
00144         // add the field mapping
00145         m_field_map.push_back(std::make_pair(field_name, v[term_ref]));
00146         m_index_map.push_back(index_str);
00147 
00148         // Try to find the unique key term, if max_keys is defined
00149         if (m_max_age && index_str == "unique")
00150             m_key_term_ref = term_ref;
00151 
00152         if (!index_str.empty() && index_str != "false")
00153             m_cache_terms.push_back(v[term_ref]);
00154 
00155         // step to the next field mapping
00156         field_node = field_node->next;
00157     }
00158     if (m_field_map.empty())
00159         throw NoFieldsException();
00160 
00161     if (m_max_age && m_key_term_ref == Vocabulary::UNDEFINED_TERM_REF)
00162         throw NoUniqueKeyFound();
00163 
00164     // if config changed while running, then restart
00165     queue_lock.unlock();
00166     if (was_running)
00167         start();
00168 }
00169 
00170 void DatabaseInserter::updateVocabulary(const Vocabulary& v)
00171 {
00172     boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00173     if (m_database_ptr)
00174         m_database_ptr->updateVocabulary(v);
00175 
00176     // update Term mappings (note: references never change for a given id)
00177     // TaO090319: field_map no longer contains termref, can't be updated without
00178     //          stop/start of database, i.e. reconstruct the whole field_map
00179 /*
00180     for (Query::FieldMap::iterator i = m_field_map.begin(); i != m_field_map.end(); ++i)
00181         i->second.second = v[i->first];
00182 */
00183 }
00184 
00185 void DatabaseInserter::updateDatabases(void)
00186 {
00187     // just check to see if the database was deleted (if so, stop now!)
00188     if (! getDatabaseManager().hasPlugin(m_database_id)) {
00189         stop();
00190     }
00191 }
00192 
00193 void DatabaseInserter::start(void)
00194 {
00195     boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00196     if (! m_is_running) {
00197         m_is_running = true;
00198 
00199         try {
00200 
00201             // open a new database connection
00202             if (!m_database_ptr)
00203                 m_database_ptr = getDatabaseManager().getDatabase(m_database_id);
00204             PION_ASSERT(m_database_ptr);
00205 
00206             if (m_wipe && m_database_ptr->tableExists(m_table_name, m_partition)) {
00207                 m_database_ptr->dropTable(m_table_name, m_partition);
00208                 PION_LOG_DEBUG(m_logger, "Wiping partition: " << m_table_name << "/" << m_partition << " on thread: " << m_database_id);
00209             }
00210 
00211             // open up the database if it isn't already open
00212             // TODO: This only works with SQLite (wiping before opening)
00213             m_database_ptr->open(m_partition);
00214             PION_ASSERT(m_database_ptr->is_open());
00215 
00216             // create the database table if it does not yet exist
00217             m_database_ptr->createTable(m_field_map, m_table_name, m_index_map, m_partition);
00218             m_table_size = m_database_ptr->getCache(Database::DB_FILE_SIZE);
00219 
00220             // prepare the query used to insert new events into the table
00221             m_insert_query_ptr = m_ignore_insert ? m_database_ptr->prepareInsertIgnoreQuery(m_field_map, m_table_name) : m_database_ptr->prepareInsertQuery(m_field_map, m_table_name);
00222             PION_ASSERT(m_insert_query_ptr);
00223 
00224             // prepare the query used to begin new transactions
00225             m_begin_transaction_ptr = m_database_ptr->getBeginTransactionQuery();
00226             PION_ASSERT(m_begin_transaction_ptr);
00227 
00228             // prepare the query used to commit transactions
00229             m_commit_transaction_ptr = m_database_ptr->getCommitTransactionQuery();
00230             PION_ASSERT(m_commit_transaction_ptr);
00231 
00232             m_cache_overhead = m_database_ptr->getCache(Database::CACHE_INDEX_ROW_OVERHEAD);
00233             m_cache_size = m_database_ptr->getCache(Database::CACHE_PAGE_CACHE_SIZE);
00234 
00235             PION_LOG_DEBUG(m_logger, "Database cache overhead: " << m_cache_overhead << ", cache size: " << m_cache_size / 1024UL << "k");
00236         
00237         } catch (...) {
00238             // failed to initialize properly -> reset and update running state to false
00239             m_insert_query_ptr.reset();
00240             m_begin_transaction_ptr.reset();
00241             m_commit_transaction_ptr.reset();
00242             m_database_ptr.reset();
00243             m_is_running = false;
00244             throw;
00245         }
00246 
00247         // spawn a new thread that will be used to save events to the database
00248         PION_LOG_DEBUG(m_logger, "Starting worker thread: " << m_database_id);
00249         m_thread.reset(new boost::thread(boost::bind(&DatabaseInserter::insertEvents, this)));
00250 
00251         // wait for the worker thread to startup
00252         m_swapped_queue.wait(queue_lock);
00253     }
00254 }
00255 
00256 void DatabaseInserter::stop(void)
00257 {
00258     boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00259     if (m_is_running) {
00260         // set flag to notify worker thread to shutdown
00261         m_is_running = false;
00262         PION_LOG_DEBUG(m_logger, "Stopping worker thread: " << m_database_id);
00263         m_wakeup_worker.notify_one();
00264         queue_lock.unlock();
00265 
00266         // wait for worker thread to shutdown
00267         m_thread->join();
00268         m_table_size = m_database_ptr->getCache(Database::DB_FILE_SIZE);
00269 
00270         // close the database connection (ensure that data is flushed)
00271         boost::mutex::scoped_lock queue_lock_two(m_queue_mutex);
00272         m_insert_query_ptr.reset();
00273         m_begin_transaction_ptr.reset();
00274         m_commit_transaction_ptr.reset();
00275         m_database_ptr.reset();
00276         
00277         // clear the event queue
00278         m_event_queue_ptr->clear();
00279         
00280         // clear the key cache
00281         m_keys.clear();
00282     }
00283 }
00284 
00285 std::size_t DatabaseInserter::getEventsQueued(void) const
00286 {
00287     boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00288     return m_event_queue_ptr->size();
00289 }
00290 
00291 std::size_t DatabaseInserter::getKeyCacheSize(void) const
00292 {
00293     boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00294     return m_keys.size();
00295 }
00296 
00297 void DatabaseInserter::insert(const EventPtr& e)
00298 {
00299     // if not running... there might not be filter rules...
00300     if (! m_is_running)
00301         return;
00302 
00303     // check filter rules first
00304     if ( m_rules(e) ) {
00305 
00306         boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00307 
00308         // make sure worker thread is running
00309         if (! m_is_running)
00310             return;
00311 
00312         // do we have collision avoidance?
00313         if (m_max_age) {
00314             // look for key within the event
00315             const Event::ParameterValue *param_ptr = e->getPointer(m_key_term_ref);
00316 
00317             // If key is not found, then "null key" and no insert
00318             if (param_ptr == NULL) {
00319                 // log warning if we're not ignoring insert errors
00320                 if (! m_ignore_insert)
00321                     PION_LOG_WARN(m_logger, "Event missing required table key: " << m_database_id);
00322                 return;
00323             }
00324 
00325             // Lookup key in memory cache
00326             KeyHash::iterator ki = m_keys.find(boost::get<const Event::BlobType&>(*param_ptr));
00327             
00328             // Update timestamp used for pruning
00329             boost::uint32_t event_timestamp;
00330             if (e->getUInt(m_timestamp_term_ref, event_timestamp) && event_timestamp > m_last_time) {
00331                 m_last_time = event_timestamp;
00332             }
00333 
00334             // Do we have this key already?
00335             if (ki != m_keys.end()) {
00336                 ki->second = m_last_time;               // Update age
00337                 return;                                 // Bail out
00338             }
00339 
00340             // Add key & age
00341             m_keys[boost::get<const Event::BlobType&>(*param_ptr)] = m_last_time;
00342 
00343             // Pruning will be done in insert thread
00344         }
00345 
00346         for (unsigned i = 0; i < m_cache_terms.size(); i++) {
00347             boost::uint32_t size = 0;
00348             switch (m_cache_terms[i].term_type) {
00349                 case Vocabulary::TYPE_NULL:
00350                 case Vocabulary::TYPE_OBJECT:
00351                     break;
00352                 case Vocabulary::TYPE_INT8:
00353                 case Vocabulary::TYPE_UINT8:
00354                     size = 1;
00355                     break;
00356                 case Vocabulary::TYPE_INT16:
00357                 case Vocabulary::TYPE_UINT16:
00358                     size = 2;
00359                     break;
00360                 case Vocabulary::TYPE_INT32:
00361                 case Vocabulary::TYPE_UINT32:
00362                     size = 4;
00363                     break;
00364                 case Vocabulary::TYPE_INT64:
00365                 case Vocabulary::TYPE_UINT64:
00366                     size = 8;
00367                     break;
00368                 case Vocabulary::TYPE_FLOAT:
00369                 case Vocabulary::TYPE_DOUBLE:
00370                 case Vocabulary::TYPE_LONG_DOUBLE:
00371                     size = 8;                           // 8-byte IEEE float
00372                     break;
00373                 case Vocabulary::TYPE_SHORT_STRING:
00374                 case Vocabulary::TYPE_STRING:
00375                 case Vocabulary::TYPE_LONG_STRING:
00376                 case Vocabulary::TYPE_CHAR:
00377                 case Vocabulary::TYPE_BLOB:
00378                 case Vocabulary::TYPE_ZBLOB:            // It won't be compressed in the index
00379                     try {
00380                         // Measure the size
00381                         size = (boost::get<const Event::BlobType&>(*e->getPointer(m_cache_terms[i].term_ref))).size();
00382                     } catch (...) {
00383                         size = 0;                       // Default to zero when not found
00384                     }
00385                     break;
00386                 case Vocabulary::TYPE_DATE_TIME:
00387                     size = 4+1+2+1+2 +1+ 2+1+2+1+2;     // 2009-09-29T11:43:00
00388                     break;
00389                 case Vocabulary::TYPE_DATE:
00390                     size = 4+1+2+1+2;                   // 2009-09-29
00391                     break;
00392                 case Vocabulary::TYPE_TIME:
00393                     size = 2+1+2+1+2;                   // 11:43:00
00394                     break;
00395             }
00396             m_cache_consumption += size + m_cache_overhead;
00397         }
00398         m_cache_rows++;
00399 
00400         // signal the worker thread if the queue is full (wait for swap)
00401         while (m_event_queue_ptr->size() >= m_queue_max) {
00402 
00403             // wakeup the worker thread to swap queues & insert events
00404             m_wakeup_worker.notify_one();
00405 
00406             // wait until the worker thread has swapped the queues
00407             m_swapped_queue.wait(queue_lock);
00408         
00409             // exit immediately if no longer running
00410             if (! m_is_running)
00411                 return;
00412         }
00413 
00414         // add the event to the insert queue
00415         m_event_queue_ptr->push_back(e);
00416     }
00417 }
00418 
00419 void DatabaseInserter::insertEvents(void)
00420 {
00421     PION_LOG_DEBUG(m_logger, "Worker thread is running: " << m_database_id);
00422 
00423     try {
00424 
00425         // sanity checks (should all be handled now by start())
00426         PION_ASSERT(m_database_ptr);
00427         PION_ASSERT(m_database_ptr->is_open());
00428         PION_ASSERT(m_insert_query_ptr);
00429         PION_ASSERT(m_begin_transaction_ptr);
00430         PION_ASSERT(m_commit_transaction_ptr);
00431 
00432         // queue of events pending insertion into the database
00433         boost::scoped_ptr<EventQueue>   insert_queue_ptr(new EventQueue);
00434         insert_queue_ptr->reserve(m_queue_max);
00435 
00436         // notify all threads that we have started up
00437         {
00438             // lock first to ensure start() thread is waiting when signal is sent
00439             boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00440             m_swapped_queue.notify_all();
00441         }
00442 
00443         while (m_is_running) {
00444         
00445             // check if new events are available in the "insert_queue"
00446             if (checkEventQueue(insert_queue_ptr)) {
00447 
00448                 PION_LOG_DEBUG(m_logger, "Worker thread woke with " << insert_queue_ptr->size() << " events available: " << m_database_id);
00449 
00450                 try {
00451 
00452                     // begin a new transaction
00453                     m_begin_transaction_ptr->run();
00454                     m_begin_transaction_ptr->reset();
00455     
00456                     // step through the event queue, inserting each event individually
00457                     for (unsigned int n = 0; n < insert_queue_ptr->size(); ++n) {
00458                         // bind the event to the insert query
00459                         m_insert_query_ptr->bindEvent(m_field_map, *((*insert_queue_ptr)[n]), false);
00460                         // execute the query to insert the record
00461                         m_insert_query_ptr->run();
00462                         m_insert_query_ptr->reset();
00463                     }
00464     
00465                     // end & commit the transaction
00466                     m_commit_transaction_ptr->run();
00467                     m_commit_transaction_ptr->reset();
00468 
00469                     // done flushing the queue
00470                     PION_LOG_DEBUG(m_logger, "Worker thread wrote " << insert_queue_ptr->size() << " events: " << m_database_id);
00471                     insert_queue_ptr->clear();
00472     
00473                     // Pruning needed?
00474                     if (m_max_age) {
00475                         boost::uint32_t size_before, size_after;
00476                         {
00477                             boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00478                             boost::uint32_t min_age = m_last_time - m_max_age;
00479                             size_before = m_keys.size();
00480                             KeyHash::iterator cur_it;
00481                             KeyHash::iterator i = m_keys.begin();
00482                             while (i != m_keys.end()) {
00483                                 cur_it = i;
00484                                 ++i;
00485                                 if (cur_it->second < min_age)
00486                                     m_keys.erase(cur_it);
00487                             }
00488                             size_after = m_keys.size();
00489                         }
00490                         PION_LOG_DEBUG(m_logger, "Worker thread pruned " << (size_before - size_after) << " keys from cache, " << size_after << " left");
00491                     }
00492 
00493                     m_table_size = m_database_ptr->getCache(Database::DB_FILE_SIZE);
00494 
00495                 } catch (Database::DatabaseBusyException& e) {
00496                 
00497                     // data was busy and could not recover after timeout
00498                     PION_LOG_ERROR(m_logger, "Dropping " << insert_queue_ptr->size() << " events because database was busy: " << m_database_id);
00499                     insert_queue_ptr->clear();  // drop events in insert queue
00500                     m_keys.clear();             // erase key cache since it may be inaccurate
00501 
00502                 }
00503 
00504             } else {
00505                 PION_LOG_DEBUG(m_logger, "Worker thread woke with no new events: " << m_database_id);
00506             }
00507         }
00508         
00509     } catch (std::exception& e) {
00510         PION_LOG_FATAL(m_logger, e.what());
00511         m_is_running = false;
00512         m_swapped_queue.notify_all();
00513     }
00514 
00515     PION_LOG_DEBUG(m_logger, "Worker thread is exiting: " << m_database_id);
00516 }
00517 
00518 bool DatabaseInserter::checkEventQueue(boost::scoped_ptr<EventQueue>& insert_queue_ptr)
00519 {
00520     // acquire ownership of queue
00521     boost::mutex::scoped_lock queue_lock(m_queue_mutex);
00522     
00523     // skip waiting if the queue is already full (missed wakeup signal)
00524     if (m_event_queue_ptr->size() < m_queue_max) {
00525 
00526         // wait until the queue is full or the timeout expires
00527         m_wakeup_worker.timed_wait(queue_lock,
00528             boost::get_system_time() + boost::posix_time::time_duration(0, 0, m_queue_timeout, 0) );
00529     
00530         // check for early & spurious wake-ups
00531         if (m_event_queue_ptr->size() == 0)
00532             return false;
00533     }
00534     
00535     // swap the event queues
00536     insert_queue_ptr.swap(m_event_queue_ptr);
00537 
00538     // notify threads that the event queue has been swapped
00539     m_swapped_queue.notify_all();
00540 
00541     return true;
00542 }
00543 
00544 DatabaseManager& DatabaseInserter::getDatabaseManager(void)
00545 {
00546     if (m_database_mgr_ptr == NULL)
00547         throw MissingDatabaseManagerException();
00548     return *m_database_mgr_ptr;
00549 }
00550 
00551 }   // end namespace platform
00552 }   // end namespace pion
00553 

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7