platform/reactors/DatabaseOutputReactor.cpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; withouDatabaseOutputt even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #include <pion/platform/ConfigManager.hpp>
00021 #include "DatabaseOutputReactor.hpp"
00022 
00023 using namespace pion::platform;
00024 
00025 
00026 namespace pion {        // begin namespace pion
00027 namespace plugins {     // begin namespace plugins
00028 
00029 
00030 // static members of DatabaseOutputReactor
00031 
00032 const std::string           DatabaseOutputReactor::DATABASE_ELEMENT_NAME = "Database";
00033 const std::string           DatabaseOutputReactor::TABLE_ELEMENT_NAME = "Table";
00034 const std::string           DatabaseOutputReactor::FIELD_ELEMENT_NAME = "Field";
00035 const std::string           DatabaseOutputReactor::EVENTS_QUEUED_ELEMENT_NAME = "EventsQueued";
00036 const std::string           DatabaseOutputReactor::KEY_CACHE_SIZE_ELEMENT_NAME = "KeyCacheSize";
00037 
00038 
00039 // DatabaseOutputReactor member functions
00040 
00041 void DatabaseOutputReactor::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00042 {
00043     ConfigWriteLock cfg_lock(*this);
00044     try {
00045         // stop for config changes, but cache running status
00046         // so that it can be restarted when finished
00047         bool was_running = m_is_running;
00048         if (m_is_running)
00049           stop();
00050 
00051         // This will destruct earlier, if it existed
00052         m_inserter.reset(new pion::platform::DatabaseInserter());
00053     
00054         // update reactor base class config
00055         Reactor::setConfig(v, config_ptr);
00056     
00057         // initialize inserter parameters
00058         m_inserter->setLogger(m_logger);
00059         m_inserter->setDatabaseManager(getDatabaseManager());
00060         
00061         // update inserter config
00062         m_inserter->setConfig(v, config_ptr);
00063         
00064         // restart inserter if it was running
00065         if (was_running)
00066             start();
00067     } catch (...) {
00068         m_is_running = false;
00069         throw;
00070     }
00071 }
00072 
00073 void DatabaseOutputReactor::updateVocabulary(const Vocabulary& v)
00074 {
00075     // first update anything in the Reactor base class that might be needed
00076     ConfigWriteLock cfg_lock(*this);
00077     Reactor::updateVocabulary(v);
00078     if (m_inserter)
00079         m_inserter->updateVocabulary(v);
00080 }
00081 
00082 void DatabaseOutputReactor::updateDatabases(void)
00083 {
00084     ConfigWriteLock cfg_lock(*this);
00085     if (m_inserter)
00086         m_inserter->updateDatabases();
00087 }
00088 
00089 void DatabaseOutputReactor::query(std::ostream& out, const QueryBranches& branches,
00090     const QueryParams& qp)
00091 {
00092     writeBeginReactorXML(out);
00093     writeStatsOnlyXML(out);
00094 
00095     // write number of events queued for insertion
00096     out << '<' << EVENTS_QUEUED_ELEMENT_NAME << '>' << m_inserter->getEventsQueued()
00097         << "</" << EVENTS_QUEUED_ELEMENT_NAME << '>' << std::endl;
00098 
00099     // write size of the key cache
00100     out << '<' << KEY_CACHE_SIZE_ELEMENT_NAME << '>' << m_inserter->getKeyCacheSize()
00101         << "</" << KEY_CACHE_SIZE_ELEMENT_NAME << '>' << std::endl;
00102 
00103     // write database identifier and table name
00104     out << '<' << DATABASE_ELEMENT_NAME << '>' << m_inserter->getDatabaseId() << "</" << DATABASE_ELEMENT_NAME << '>' << std::endl
00105         << '<' << TABLE_ELEMENT_NAME << '>' << m_inserter->getTableName() << "</" << TABLE_ELEMENT_NAME << '>' << std::endl;
00106 
00107     // In addition; if full status is requested, get Database/Table/Fields
00108     if (branches.size() > 2 && branches[2] == "full") {
00109         Query::FieldMap field_map(m_inserter->getFieldMap());
00110         Query::IndexMap index_map(m_inserter->getIndexMap());
00111         for (unsigned int i = 0; i < field_map.size(); i++) {
00112             // <Field id="0" col="dbcol">vocab:uri</Field>
00113             // <Field id="0" col="dbcol" index="true">vocab:uri</Field>
00114             const std::string idx_str = (index_map.size() >= i && index_map[i] != "false") ? "true" : "false";
00115             out << '<' << FIELD_ELEMENT_NAME << " id=\"" << i << "\" col=\""
00116                 << field_map[i].first << "\" index=\"" << idx_str << "\">" << field_map[i].second.term_id
00117                 << "</" << FIELD_ELEMENT_NAME << '>' << std::endl;
00118         }
00119     }
00120 
00121     writeEndReactorXML(out);
00122 }
00123 
00124 void DatabaseOutputReactor::start(void)
00125 {
00126     ConfigWriteLock cfg_lock(*this);
00127     if (! m_is_running) {
00128         m_is_running = true;
00129         if (m_inserter) {
00130             try {
00131                 m_inserter->start();
00132             } catch (...) {
00133                 // failed to start inserter -> update running state to false
00134                 m_is_running = false;
00135                 throw;
00136             }
00137         }
00138     }
00139 }
00140 
00141 void DatabaseOutputReactor::stop(void)
00142 {
00143     ConfigWriteLock cfg_lock(*this);
00144     if (m_is_running) {
00145         m_is_running = false;
00146         if (m_inserter)
00147             m_inserter->stop();
00148     }
00149 }
00150 
00151 void DatabaseOutputReactor::process(const EventPtr& e)
00152 {
00153     // add the event to the insert queue
00154     m_inserter->insert(e);
00155 
00156     // deliver the event to other Reactors (if any are connected)
00157     deliverEvent(e);
00158 }
00159 
00160 }   // end namespace plugins
00161 }   // end namespace pion
00162 
00163 
00165 extern "C" PION_PLUGIN_API pion::platform::Reactor *pion_create_DatabaseOutputReactor(void) {
00166     return new pion::plugins::DatabaseOutputReactor();
00167 }
00168 
00170 extern "C" PION_PLUGIN_API void pion_destroy_DatabaseOutputReactor(pion::plugins::DatabaseOutputReactor *reactor_ptr) {
00171     delete reactor_ptr;
00172 }

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7