platform/include/pion/platform/DatabaseInserter.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2009 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_DATABASINSERTER_HEADER__
00021 #define __PION_DATABASINSERTER_HEADER__
00022 
00023 #include <vector>
00024 #include <boost/scoped_ptr.hpp>
00025 #include <boost/thread/thread.hpp>
00026 #include <boost/thread/condition.hpp>
00027 #include <pion/PionConfig.hpp>
00028 #include <pion/PionException.hpp>
00029 #include <pion/PionLogger.hpp>
00030 #include <pion/platform/Event.hpp>
00031 #include <pion/platform/RuleChain.hpp>
00032 #include <pion/platform/Query.hpp>
00033 #include <pion/platform/Database.hpp>
00034 #include <pion/platform/DatabaseManager.hpp>
00035 
00036 
00037 namespace pion {        // begin namespace pion
00038 namespace platform {    // begin namespace platform (Pion Platform Library)
00039 
00040 
00044 class PION_PLATFORM_API DatabaseInserter
00045 {
00046 public:
00047 
00050     class MissingDatabaseManagerException : public std::exception {
00051     public:
00052         virtual const char* what() const throw() {
00053             return "DatabaseInserter is missing the DatabaseManager";
00054         }
00055     };
00056 
00058     class EmptyDatabaseException : public std::exception {
00059     public:
00060         virtual const char* what() const throw() {
00061             return "DatabaseInserter configuration is missing a required Database parameter";
00062         }
00063     };
00064 
00066     class EmptyTableException : public std::exception {
00067     public:
00068         virtual const char* what() const throw() {
00069             return "DatabaseInserter configuration is missing a required Table parameter";
00070         }
00071     };
00072 
00074     class NoFieldsException : public std::exception {
00075     public:
00076         virtual const char* what() const throw() {
00077             return "DatabaseInserter configuration must contain at least one field mapping";
00078         }
00079     };
00080 
00082     class EmptyFieldException : public std::exception {
00083     public:
00084         virtual const char* what() const throw() {
00085             return "DatabaseInserter configuration includes an empty field name";
00086         }
00087     };
00088 
00090     class EmptyTermException : public std::exception {
00091     public:
00092         virtual const char* what() const throw() {
00093             return "DatabaseInserter configuration is missing a term identifier";
00094         }
00095     };
00096 
00098     class UnknownTermException : public PionException {
00099     public:
00100         UnknownTermException(const std::string& term_id)
00101             : PionException("DatabaseInserter configuration maps field to an unknown term: ", term_id) {}
00102     };
00103 
00105     class IllegalCharactersException: public PionException {
00106     public:
00107         IllegalCharactersException(const std::string& field_name)
00108             : PionException("DatabaseInserter configuration has a field name with illegal characters: ", field_name) {}
00109     };
00110 
00112     class DuplicateColumnName: public PionException {
00113     public:
00114         DuplicateColumnName(const std::string& field_name)
00115             : PionException("DatabaseInserter configuration has a duplicate field name: ", field_name) {}
00116     };
00117 
00119     class NoUniqueKeyFound: public PionException {
00120     public:
00121         NoUniqueKeyFound(void)
00122             : PionException("DatabaseInserter configuration has MaxAge, but there is no Unique indexed column") {}
00123     };
00124 
00126     class MissingEventTime: public PionException {
00127     public:
00128         MissingEventTime(const std::string& element )
00129             : PionException("DatabaseInserter configuration has MaxAge, but the age term is missing: " + element) {}
00130     };
00131 
00133     DatabaseInserter(void) :
00134         m_logger(PION_GET_LOGGER("pion.platform.DatabaseInserter")),
00135         m_database_mgr_ptr(NULL),
00136         m_event_queue_ptr(new EventQueue), 
00137         m_queue_max(DEFAULT_QUEUE_SIZE), m_queue_timeout(DEFAULT_QUEUE_TIMEOUT),
00138         m_is_running(false), m_partition(0), m_wipe(false), m_max_age(0), m_last_time(0), m_table_size(0)
00139     {}
00140 
00142     virtual ~DatabaseInserter() { stop(); }
00143 
00145     inline void setDatabaseManager(DatabaseManager& mgr) { m_database_mgr_ptr = & mgr; }
00146     
00154     void setConfig(const Vocabulary& v, const xmlNodePtr config_ptr);
00155 
00162     void updateVocabulary(const Vocabulary& v);
00163 
00168     void updateDatabases(void);
00169 
00175     void insert(const EventPtr& e);
00176 
00179     void start(void);
00180 
00182     void stop(void);
00183 
00185     std::size_t getEventsQueued(void) const;
00186 
00188     std::size_t getKeyCacheSize(void) const;
00189 
00191     const std::string& getDatabaseId(void) const { return m_database_id; }
00192 
00194     DatabasePtr getDatabasePtr(void) { return m_database_ptr; }
00195 
00197     const std::string& getTableName(void) const { return m_table_name; }
00198 
00200     bool setTableName(const std::string& name)
00201     {
00202         if (m_is_running) return false;
00203         m_table_name = name;
00204         return true;
00205     }
00206 
00208     bool setPartition(unsigned partition)
00209     {
00210         if (m_is_running) return false;
00211         m_partition = partition;
00212         return true;
00213     }
00214 
00216     bool setWipe(bool wipe)
00217     {
00218         if (m_is_running) return false;
00219         m_wipe = wipe;
00220         return true;
00221     }
00222 
00223     boost::uint32_t getRotate(void) const
00224     {
00225         return m_cache_size ? (m_cache_consumption * 100UL / m_cache_size) : 0;
00226     }
00227 
00228     boost::uint64_t getTableSize(void) const { return m_table_size; }
00229 
00231     Query::FieldMap getFieldMap(void) const { return m_field_map; }
00232 
00234     Query::IndexMap getIndexMap(void) const { return m_index_map; }
00235 
00237     inline bool isRunning(void) const { return m_is_running; }
00238     
00240     inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
00241 
00243     inline PionLogger getLogger(void) { return m_logger; }
00244 
00246     inline bool tableExists(void)
00247     {
00248         if (!m_database_ptr)
00249             m_database_ptr = getDatabaseManager().getDatabase(m_database_id);
00250         return m_database_ptr->tableExists(m_table_name, m_partition);
00251     }
00252 
00253 
00254 private:
00255 
00257     typedef std::vector<EventPtr>   EventQueue;
00258 
00259 
00261     DatabaseManager& getDatabaseManager(void);
00262 
00264     void insertEvents(void);
00265 
00273     bool checkEventQueue(boost::scoped_ptr<EventQueue>& insert_queue_ptr);
00274 
00275 
00277     static const boost::uint32_t            DEFAULT_QUEUE_SIZE;
00278 
00280     static const boost::uint32_t            DEFAULT_QUEUE_TIMEOUT;
00281 
00283     static const std::string                DATABASE_ELEMENT_NAME;
00284 
00286     static const std::string                TABLE_ELEMENT_NAME;
00287 
00289     static const std::string                FIELD_ELEMENT_NAME;
00290 
00292     static const std::string                QUEUE_SIZE_ELEMENT_NAME;
00293 
00295     static const std::string                QUEUE_TIMEOUT_ELEMENT_NAME;
00296 
00298     static const std::string                TERM_ATTRIBUTE_NAME;
00299 
00301     static const char *                     CHARSET_FOR_TABLES;
00302 
00304     static const std::string                INDEX_ATTRIBUTE_NAME;
00305 
00307     static const std::string                SQL_ATTRIBUTE_NAME;
00308 
00310     static const std::string                IGNORE_INSERT_ELEMENT_NAME;
00311 
00313     static const std::string                DEFAULT_IGNORE;
00314 
00316     static const std::string                MAX_KEY_AGE_ELEMENT_NAME;
00317 
00319     static const std::string                EVENT_AGE_ELEMENT_NAME;
00320 
00322     static const boost::uint32_t            DEFAULT_MAX_AGE;
00323 
00325     PionLogger                              m_logger;
00326 
00328     DatabaseManager *                       m_database_mgr_ptr;
00329 
00331     std::string                             m_database_id;
00332 
00334     std::string                             m_table_name;
00335 
00337     Query::FieldMap                         m_field_map;
00338 
00340     Query::IndexMap                         m_index_map;
00341 
00343     DatabasePtr                             m_database_ptr;
00344     
00346     QueryPtr                                m_insert_query_ptr;
00347 
00349     QueryPtr                                m_begin_transaction_ptr;
00350 
00352     QueryPtr                                m_commit_transaction_ptr;
00353 
00355     boost::scoped_ptr<EventQueue>           m_event_queue_ptr;
00356 
00358     boost::uint32_t                         m_queue_max;
00359 
00361     boost::uint32_t                         m_queue_timeout;
00362 
00364     mutable boost::mutex                    m_queue_mutex;
00365 
00367     boost::condition                        m_wakeup_worker;
00368 
00370     boost::condition                        m_swapped_queue;
00371 
00373     boost::scoped_ptr<boost::thread>        m_thread;
00374 
00376     bool                                    m_ignore_insert;
00377     
00379     volatile bool                           m_is_running;
00380 
00382     unsigned                                m_partition;
00383 
00385     bool                                    m_wipe;
00386 
00388     pion::platform::RuleChain               m_rules;
00389 
00391     typedef PION_HASH_MAP<pion::platform::Event::BlobType, boost::uint32_t, PION_HASH(pion::platform::Event::BlobType)> KeyHash;
00392     KeyHash                                 m_keys;
00393 
00395     Vocabulary::TermRef                     m_key_term_ref;
00396 
00398     boost::uint32_t                         m_max_age;
00399 
00401     Vocabulary::TermRef                     m_timestamp_term_ref;
00402 
00404     boost::uint32_t                         m_last_time;
00405 
00407     boost::uint64_t                         m_cache_consumption;
00408 
00410     boost::uint32_t                         m_cache_overhead;
00411 
00413     std::vector<Vocabulary::Term>           m_cache_terms;
00414 
00416     boost::uint64_t                         m_cache_rows;
00417 
00419     boost::uint64_t                         m_cache_size;
00420 
00422     boost::uint64_t                         m_table_size;
00423 };
00424 
00425 
00426 }   // end namespace platform
00427 }   // end namespace pion
00428 
00429 #endif

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7