00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <boost/iostreams/stream.hpp>
00021 #include <boost/iostreams/device/array.hpp>
00022 #include <pion/platform/ConfigManager.hpp>
00023 #include <pion/platform/CodecFactory.hpp>
00024 #include "FissionReactor.hpp"
00025
00026 using namespace pion::platform;
00027
00028
00029 namespace pion {
00030 namespace plugins {
00031
00032
00033
00034
00035 const std::string FissionReactor::COPY_TERM_ELEMENT_NAME = "CopyTerm";
00036 const std::string FissionReactor::COPY_ALL_TERMS_ELEMENT_NAME = "CopyAllTerms";
00037 const std::string FissionReactor::INPUT_EVENT_TYPE_ELEMENT_NAME = "InputEventType";
00038 const std::string FissionReactor::INPUT_EVENT_TERM_ELEMENT_NAME = "InputEventTerm";
00039 const std::string FissionReactor::CODEC_ELEMENT_NAME = "Codec";
00040
00041
00042
00043
00044 void FissionReactor::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00045 {
00046
00047 ConfigWriteLock cfg_lock(*this);
00048 Reactor::setConfig(v, config_ptr);
00049
00050
00051 std::string config_str;
00052 if (! ConfigManager::getConfigOption(INPUT_EVENT_TYPE_ELEMENT_NAME, config_str, config_ptr))
00053 throw EmptyInputEventTypeException(getId());
00054
00055
00056 Vocabulary::TermRef term_ref = v.findTerm(config_str);
00057 if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00058 throw UnknownTermException(config_str);
00059 m_input_event_type = v[term_ref];
00060
00061
00062 if (m_input_event_type.term_type != Vocabulary::TYPE_OBJECT)
00063 throw NotAnObjectException(config_str);
00064
00065
00066 if (! ConfigManager::getConfigOption(INPUT_EVENT_TERM_ELEMENT_NAME, config_str, config_ptr))
00067 throw EmptyInputEventTermException(getId());
00068
00069
00070 term_ref = v.findTerm(config_str);
00071 if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00072 throw UnknownTermException(config_str);
00073 m_input_event_term = v[term_ref];
00074
00075
00076 switch (m_input_event_term.term_type) {
00077 case Vocabulary::TYPE_NULL:
00078 case Vocabulary::TYPE_OBJECT:
00079 case Vocabulary::TYPE_INT8:
00080 case Vocabulary::TYPE_INT16:
00081 case Vocabulary::TYPE_INT32:
00082 case Vocabulary::TYPE_UINT8:
00083 case Vocabulary::TYPE_UINT16:
00084 case Vocabulary::TYPE_UINT32:
00085 case Vocabulary::TYPE_INT64:
00086 case Vocabulary::TYPE_UINT64:
00087 case Vocabulary::TYPE_FLOAT:
00088 case Vocabulary::TYPE_DOUBLE:
00089 case Vocabulary::TYPE_LONG_DOUBLE:
00090 case Vocabulary::TYPE_DATE_TIME:
00091 case Vocabulary::TYPE_DATE:
00092 case Vocabulary::TYPE_TIME:
00093 throw TermNotStringException(config_str);
00094 break;
00095 case Vocabulary::TYPE_SHORT_STRING:
00096 case Vocabulary::TYPE_STRING:
00097 case Vocabulary::TYPE_LONG_STRING:
00098 case Vocabulary::TYPE_CHAR:
00099 case Vocabulary::TYPE_BLOB:
00100 case Vocabulary::TYPE_ZBLOB:
00101 break;
00102 }
00103
00104
00105 boost::mutex::scoped_lock codec_lock(m_codec_mutex);
00106 if (! ConfigManager::getConfigOption(CODEC_ELEMENT_NAME, m_codec_id, config_ptr))
00107 throw EmptyCodecException(getId());
00108 m_codec_ptr = getCodecFactory().getCodec(m_codec_id);
00109 PION_ASSERT(m_codec_ptr);
00110 codec_lock.unlock();
00111
00112
00113 m_copy_all_terms = false;
00114 std::string copy_all_terms_str;
00115 if (ConfigManager::getConfigOption(COPY_ALL_TERMS_ELEMENT_NAME,
00116 copy_all_terms_str, config_ptr))
00117 {
00118 if (copy_all_terms_str == "true")
00119 m_copy_all_terms = true;
00120 }
00121
00122
00123 m_copy_terms.clear();
00124 xmlNodePtr copy_term_node = config_ptr;
00125 while ((copy_term_node = ConfigManager::findConfigNodeByName(COPY_TERM_ELEMENT_NAME, copy_term_node)) != NULL) {
00126 xmlChar *xml_char_ptr = xmlNodeGetContent(copy_term_node);
00127 if (xml_char_ptr != NULL) {
00128 const std::string copy_term_str(reinterpret_cast<char*>(xml_char_ptr));
00129 xmlFree(xml_char_ptr);
00130 if (! copy_term_str.empty()) {
00131
00132 term_ref = v.findTerm(copy_term_str);
00133 if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00134 throw UnknownTermException(copy_term_str);
00135
00136
00137 m_copy_terms.push_back(v[term_ref]);
00138 }
00139 }
00140
00141
00142 copy_term_node = copy_term_node->next;
00143 }
00144 }
00145
00146 void FissionReactor::updateVocabulary(const Vocabulary& v)
00147 {
00148
00149 ConfigWriteLock cfg_lock(*this);
00150 Reactor::updateVocabulary(v);
00151
00152 v.refreshTerm(m_input_event_type);
00153 v.refreshTerm(m_input_event_term);
00154
00155 boost::mutex::scoped_lock codec_lock(m_codec_mutex);
00156 if (m_codec_ptr)
00157 m_codec_ptr->updateVocabulary(v);
00158 }
00159
00160 void FissionReactor::updateCodecs(void)
00161 {
00162
00163 if (! getCodecFactory().hasPlugin(m_codec_id)) {
00164 stop();
00165 boost::mutex::scoped_lock codec_lock(m_codec_mutex);
00166 m_codec_ptr.reset();
00167 } else {
00168
00169 boost::mutex::scoped_lock codec_lock(m_codec_mutex);
00170 m_codec_ptr = getCodecFactory().getCodec(m_codec_id);
00171 }
00172 }
00173
00174 void FissionReactor::process(const EventPtr& e)
00175 {
00176 if (e->getType() == m_input_event_type.term_ref) {
00177
00178 EventFactory event_factory;
00179 EventPtr new_ptr;
00180 bool read_result;
00181
00182
00183 Event::ValuesRange range = e->equal_range(m_input_event_term.term_ref);
00184 for (Event::ConstIterator it = range.first; it != range.second; ++it) {
00185
00186
00187 const Event::BlobType& ss = boost::get<const Event::BlobType&>(it->value);
00188 boost::iostreams::stream<boost::iostreams::array_source> input_stream(ss.get(), ss.size());
00189
00190
00191 while ( isRunning() && !input_stream.eof() ) {
00192
00193
00194 boost::mutex::scoped_lock codec_lock(m_codec_mutex);
00195 event_factory.create(new_ptr, m_codec_ptr->getEventType());
00196 read_result = m_codec_ptr->read(input_stream, *new_ptr);
00197 codec_lock.unlock();
00198 if (! read_result)
00199 break;
00200
00201
00202 if (m_copy_all_terms) {
00203
00204 *new_ptr += *e;
00205 } else if (! m_copy_terms.empty() ) {
00206
00207 for (TermVector::const_iterator term_it = m_copy_terms.begin();
00208 term_it != m_copy_terms.end(); ++term_it)
00209 {
00210 new_ptr->copyValues(*e, term_it->term_ref);
00211 }
00212 }
00213
00214
00215 deliverEvent(new_ptr);
00216 }
00217 }
00218 }
00219 }
00220
00221
00222 }
00223 }
00224
00225
00227 extern "C" PION_PLUGIN_API pion::platform::Reactor *pion_create_FissionReactor(void) {
00228 return new pion::plugins::FissionReactor();
00229 }
00230
00232 extern "C" PION_PLUGIN_API void pion_destroy_FissionReactor(pion::plugins::FissionReactor *reactor_ptr) {
00233 delete reactor_ptr;
00234 }