00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <pion/platform/ConfigManager.hpp>
00021 #include "DatabaseOutputReactor.hpp"
00022
00023 using namespace pion::platform;
00024
00025
00026 namespace pion {
00027 namespace plugins {
00028
00029
00030
00031
00032 const std::string DatabaseOutputReactor::DATABASE_ELEMENT_NAME = "Database";
00033 const std::string DatabaseOutputReactor::TABLE_ELEMENT_NAME = "Table";
00034 const std::string DatabaseOutputReactor::FIELD_ELEMENT_NAME = "Field";
00035 const std::string DatabaseOutputReactor::EVENTS_QUEUED_ELEMENT_NAME = "EventsQueued";
00036 const std::string DatabaseOutputReactor::KEY_CACHE_SIZE_ELEMENT_NAME = "KeyCacheSize";
00037
00038
00039
00040
00041 void DatabaseOutputReactor::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00042 {
00043 ConfigWriteLock cfg_lock(*this);
00044 try {
00045
00046
00047 bool was_running = m_is_running;
00048 if (m_is_running)
00049 stop();
00050
00051
00052 m_inserter.reset(new pion::platform::DatabaseInserter());
00053
00054
00055 Reactor::setConfig(v, config_ptr);
00056
00057
00058 m_inserter->setLogger(m_logger);
00059 m_inserter->setDatabaseManager(getDatabaseManager());
00060
00061
00062 m_inserter->setConfig(v, config_ptr);
00063
00064
00065 if (was_running)
00066 start();
00067 } catch (...) {
00068 m_is_running = false;
00069 throw;
00070 }
00071 }
00072
00073 void DatabaseOutputReactor::updateVocabulary(const Vocabulary& v)
00074 {
00075
00076 ConfigWriteLock cfg_lock(*this);
00077 Reactor::updateVocabulary(v);
00078 if (m_inserter)
00079 m_inserter->updateVocabulary(v);
00080 }
00081
00082 void DatabaseOutputReactor::updateDatabases(void)
00083 {
00084 ConfigWriteLock cfg_lock(*this);
00085 if (m_inserter)
00086 m_inserter->updateDatabases();
00087 }
00088
00089 void DatabaseOutputReactor::query(std::ostream& out, const QueryBranches& branches,
00090 const QueryParams& qp)
00091 {
00092 writeBeginReactorXML(out);
00093 writeStatsOnlyXML(out);
00094
00095
00096 out << '<' << EVENTS_QUEUED_ELEMENT_NAME << '>' << m_inserter->getEventsQueued()
00097 << "</" << EVENTS_QUEUED_ELEMENT_NAME << '>' << std::endl;
00098
00099
00100 out << '<' << KEY_CACHE_SIZE_ELEMENT_NAME << '>' << m_inserter->getKeyCacheSize()
00101 << "</" << KEY_CACHE_SIZE_ELEMENT_NAME << '>' << std::endl;
00102
00103
00104 out << '<' << DATABASE_ELEMENT_NAME << '>' << m_inserter->getDatabaseId() << "</" << DATABASE_ELEMENT_NAME << '>' << std::endl
00105 << '<' << TABLE_ELEMENT_NAME << '>' << m_inserter->getTableName() << "</" << TABLE_ELEMENT_NAME << '>' << std::endl;
00106
00107
00108 if (branches.size() > 2 && branches[2] == "full") {
00109 Query::FieldMap field_map(m_inserter->getFieldMap());
00110 Query::IndexMap index_map(m_inserter->getIndexMap());
00111 for (unsigned int i = 0; i < field_map.size(); i++) {
00112
00113
00114 const std::string idx_str = (index_map.size() >= i && index_map[i] != "false") ? "true" : "false";
00115 out << '<' << FIELD_ELEMENT_NAME << " id=\"" << i << "\" col=\""
00116 << field_map[i].first << "\" index=\"" << idx_str << "\">" << field_map[i].second.term_id
00117 << "</" << FIELD_ELEMENT_NAME << '>' << std::endl;
00118 }
00119 }
00120
00121 writeEndReactorXML(out);
00122 }
00123
00124 void DatabaseOutputReactor::start(void)
00125 {
00126 ConfigWriteLock cfg_lock(*this);
00127 if (! m_is_running) {
00128 m_is_running = true;
00129 if (m_inserter) {
00130 try {
00131 m_inserter->start();
00132 } catch (...) {
00133
00134 m_is_running = false;
00135 throw;
00136 }
00137 }
00138 }
00139 }
00140
00141 void DatabaseOutputReactor::stop(void)
00142 {
00143 ConfigWriteLock cfg_lock(*this);
00144 if (m_is_running) {
00145 m_is_running = false;
00146 if (m_inserter)
00147 m_inserter->stop();
00148 }
00149 }
00150
00151 void DatabaseOutputReactor::process(const EventPtr& e)
00152 {
00153
00154 m_inserter->insert(e);
00155
00156
00157 deliverEvent(e);
00158 }
00159
00160 }
00161 }
00162
00163
00165 extern "C" PION_PLUGIN_API pion::platform::Reactor *pion_create_DatabaseOutputReactor(void) {
00166 return new pion::plugins::DatabaseOutputReactor();
00167 }
00168
00170 extern "C" PION_PLUGIN_API void pion_destroy_DatabaseOutputReactor(pion::plugins::DatabaseOutputReactor *reactor_ptr) {
00171 delete reactor_ptr;
00172 }