00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef __PION_DATABASINSERTER_HEADER__
00021 #define __PION_DATABASINSERTER_HEADER__
00022
00023 #include <vector>
00024 #include <boost/scoped_ptr.hpp>
00025 #include <boost/thread/thread.hpp>
00026 #include <boost/thread/condition.hpp>
00027 #include <pion/PionConfig.hpp>
00028 #include <pion/PionException.hpp>
00029 #include <pion/PionLogger.hpp>
00030 #include <pion/platform/Event.hpp>
00031 #include <pion/platform/RuleChain.hpp>
00032 #include <pion/platform/Query.hpp>
00033 #include <pion/platform/Database.hpp>
00034 #include <pion/platform/DatabaseManager.hpp>
00035
00036
00037 namespace pion {
00038 namespace platform {
00039
00040
00044 class PION_PLATFORM_API DatabaseInserter
00045 {
00046 public:
00047
00050 class MissingDatabaseManagerException : public std::exception {
00051 public:
00052 virtual const char* what() const throw() {
00053 return "DatabaseInserter is missing the DatabaseManager";
00054 }
00055 };
00056
00058 class EmptyDatabaseException : public std::exception {
00059 public:
00060 virtual const char* what() const throw() {
00061 return "DatabaseInserter configuration is missing a required Database parameter";
00062 }
00063 };
00064
00066 class EmptyTableException : public std::exception {
00067 public:
00068 virtual const char* what() const throw() {
00069 return "DatabaseInserter configuration is missing a required Table parameter";
00070 }
00071 };
00072
00074 class NoFieldsException : public std::exception {
00075 public:
00076 virtual const char* what() const throw() {
00077 return "DatabaseInserter configuration must contain at least one field mapping";
00078 }
00079 };
00080
00082 class EmptyFieldException : public std::exception {
00083 public:
00084 virtual const char* what() const throw() {
00085 return "DatabaseInserter configuration includes an empty field name";
00086 }
00087 };
00088
00090 class EmptyTermException : public std::exception {
00091 public:
00092 virtual const char* what() const throw() {
00093 return "DatabaseInserter configuration is missing a term identifier";
00094 }
00095 };
00096
00098 class UnknownTermException : public PionException {
00099 public:
00100 UnknownTermException(const std::string& term_id)
00101 : PionException("DatabaseInserter configuration maps field to an unknown term: ", term_id) {}
00102 };
00103
00105 class IllegalCharactersException: public PionException {
00106 public:
00107 IllegalCharactersException(const std::string& field_name)
00108 : PionException("DatabaseInserter configuration has a field name with illegal characters: ", field_name) {}
00109 };
00110
00112 class DuplicateColumnName: public PionException {
00113 public:
00114 DuplicateColumnName(const std::string& field_name)
00115 : PionException("DatabaseInserter configuration has a duplicate field name: ", field_name) {}
00116 };
00117
00119 class NoUniqueKeyFound: public PionException {
00120 public:
00121 NoUniqueKeyFound(void)
00122 : PionException("DatabaseInserter configuration has MaxAge, but there is no Unique indexed column") {}
00123 };
00124
00126 class MissingEventTime: public PionException {
00127 public:
00128 MissingEventTime(const std::string& element )
00129 : PionException("DatabaseInserter configuration has MaxAge, but the age term is missing: " + element) {}
00130 };
00131
00133 DatabaseInserter(void) :
00134 m_logger(PION_GET_LOGGER("pion.platform.DatabaseInserter")),
00135 m_database_mgr_ptr(NULL),
00136 m_event_queue_ptr(new EventQueue),
00137 m_queue_max(DEFAULT_QUEUE_SIZE), m_queue_timeout(DEFAULT_QUEUE_TIMEOUT),
00138 m_is_running(false), m_partition(0), m_wipe(false), m_max_age(0), m_last_time(0), m_table_size(0)
00139 {}
00140
00142 virtual ~DatabaseInserter() { stop(); }
00143
00145 inline void setDatabaseManager(DatabaseManager& mgr) { m_database_mgr_ptr = & mgr; }
00146
00154 void setConfig(const Vocabulary& v, const xmlNodePtr config_ptr);
00155
00162 void updateVocabulary(const Vocabulary& v);
00163
00168 void updateDatabases(void);
00169
00175 void insert(const EventPtr& e);
00176
00179 void start(void);
00180
00182 void stop(void);
00183
00185 std::size_t getEventsQueued(void) const;
00186
00188 std::size_t getKeyCacheSize(void) const;
00189
00191 const std::string& getDatabaseId(void) const { return m_database_id; }
00192
00194 DatabasePtr getDatabasePtr(void) { return m_database_ptr; }
00195
00197 const std::string& getTableName(void) const { return m_table_name; }
00198
00200 bool setTableName(const std::string& name)
00201 {
00202 if (m_is_running) return false;
00203 m_table_name = name;
00204 return true;
00205 }
00206
00208 bool setPartition(unsigned partition)
00209 {
00210 if (m_is_running) return false;
00211 m_partition = partition;
00212 return true;
00213 }
00214
00216 bool setWipe(bool wipe)
00217 {
00218 if (m_is_running) return false;
00219 m_wipe = wipe;
00220 return true;
00221 }
00222
00223 boost::uint32_t getRotate(void) const
00224 {
00225 return m_cache_size ? (m_cache_consumption * 100UL / m_cache_size) : 0;
00226 }
00227
00228 boost::uint64_t getTableSize(void) const { return m_table_size; }
00229
00231 Query::FieldMap getFieldMap(void) const { return m_field_map; }
00232
00234 Query::IndexMap getIndexMap(void) const { return m_index_map; }
00235
00237 inline bool isRunning(void) const { return m_is_running; }
00238
00240 inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
00241
00243 inline PionLogger getLogger(void) { return m_logger; }
00244
00246 inline bool tableExists(void)
00247 {
00248 if (!m_database_ptr)
00249 m_database_ptr = getDatabaseManager().getDatabase(m_database_id);
00250 return m_database_ptr->tableExists(m_table_name, m_partition);
00251 }
00252
00253
00254 private:
00255
00257 typedef std::vector<EventPtr> EventQueue;
00258
00259
00261 DatabaseManager& getDatabaseManager(void);
00262
00264 void insertEvents(void);
00265
00273 bool checkEventQueue(boost::scoped_ptr<EventQueue>& insert_queue_ptr);
00274
00275
00277 static const boost::uint32_t DEFAULT_QUEUE_SIZE;
00278
00280 static const boost::uint32_t DEFAULT_QUEUE_TIMEOUT;
00281
00283 static const std::string DATABASE_ELEMENT_NAME;
00284
00286 static const std::string TABLE_ELEMENT_NAME;
00287
00289 static const std::string FIELD_ELEMENT_NAME;
00290
00292 static const std::string QUEUE_SIZE_ELEMENT_NAME;
00293
00295 static const std::string QUEUE_TIMEOUT_ELEMENT_NAME;
00296
00298 static const std::string TERM_ATTRIBUTE_NAME;
00299
00301 static const char * CHARSET_FOR_TABLES;
00302
00304 static const std::string INDEX_ATTRIBUTE_NAME;
00305
00307 static const std::string SQL_ATTRIBUTE_NAME;
00308
00310 static const std::string IGNORE_INSERT_ELEMENT_NAME;
00311
00313 static const std::string DEFAULT_IGNORE;
00314
00316 static const std::string MAX_KEY_AGE_ELEMENT_NAME;
00317
00319 static const std::string EVENT_AGE_ELEMENT_NAME;
00320
00322 static const boost::uint32_t DEFAULT_MAX_AGE;
00323
00325 PionLogger m_logger;
00326
00328 DatabaseManager * m_database_mgr_ptr;
00329
00331 std::string m_database_id;
00332
00334 std::string m_table_name;
00335
00337 Query::FieldMap m_field_map;
00338
00340 Query::IndexMap m_index_map;
00341
00343 DatabasePtr m_database_ptr;
00344
00346 QueryPtr m_insert_query_ptr;
00347
00349 QueryPtr m_begin_transaction_ptr;
00350
00352 QueryPtr m_commit_transaction_ptr;
00353
00355 boost::scoped_ptr<EventQueue> m_event_queue_ptr;
00356
00358 boost::uint32_t m_queue_max;
00359
00361 boost::uint32_t m_queue_timeout;
00362
00364 mutable boost::mutex m_queue_mutex;
00365
00367 boost::condition m_wakeup_worker;
00368
00370 boost::condition m_swapped_queue;
00371
00373 boost::scoped_ptr<boost::thread> m_thread;
00374
00376 bool m_ignore_insert;
00377
00379 volatile bool m_is_running;
00380
00382 unsigned m_partition;
00383
00385 bool m_wipe;
00386
00388 pion::platform::RuleChain m_rules;
00389
00391 typedef PION_HASH_MAP<pion::platform::Event::BlobType, boost::uint32_t, PION_HASH(pion::platform::Event::BlobType)> KeyHash;
00392 KeyHash m_keys;
00393
00395 Vocabulary::TermRef m_key_term_ref;
00396
00398 boost::uint32_t m_max_age;
00399
00401 Vocabulary::TermRef m_timestamp_term_ref;
00402
00404 boost::uint32_t m_last_time;
00405
00407 boost::uint64_t m_cache_consumption;
00408
00410 boost::uint32_t m_cache_overhead;
00411
00413 std::vector<Vocabulary::Term> m_cache_terms;
00414
00416 boost::uint64_t m_cache_rows;
00417
00419 boost::uint64_t m_cache_size;
00420
00422 boost::uint64_t m_table_size;
00423 };
00424
00425
00426 }
00427 }
00428
00429 #endif