platform/reactors/FissionReactor.cpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2009 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #include <boost/iostreams/stream.hpp>
00021 #include <boost/iostreams/device/array.hpp>
00022 #include <pion/platform/ConfigManager.hpp>
00023 #include <pion/platform/CodecFactory.hpp>
00024 #include "FissionReactor.hpp"
00025 
00026 using namespace pion::platform;
00027 
00028 
00029 namespace pion {        // begin namespace pion
00030 namespace plugins {     // begin namespace plugins
00031 
00032 
00033 // static members of FissionReactor
00034     
00035 const std::string           FissionReactor::COPY_TERM_ELEMENT_NAME = "CopyTerm";
00036 const std::string           FissionReactor::COPY_ALL_TERMS_ELEMENT_NAME = "CopyAllTerms";
00037 const std::string           FissionReactor::INPUT_EVENT_TYPE_ELEMENT_NAME = "InputEventType";
00038 const std::string           FissionReactor::INPUT_EVENT_TERM_ELEMENT_NAME = "InputEventTerm";
00039 const std::string           FissionReactor::CODEC_ELEMENT_NAME = "Codec";
00040 
00041 
00042 // FissionReactor member functions
00043 
00044 void FissionReactor::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00045 {
00046     // first set config options for the Reactor base class
00047     ConfigWriteLock cfg_lock(*this);
00048     Reactor::setConfig(v, config_ptr);
00049 
00050     // get the input event type
00051     std::string config_str;
00052     if (! ConfigManager::getConfigOption(INPUT_EVENT_TYPE_ELEMENT_NAME, config_str, config_ptr))
00053         throw EmptyInputEventTypeException(getId());
00054 
00055     // find vocabulary term for input event type
00056     Vocabulary::TermRef term_ref = v.findTerm(config_str);
00057     if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00058         throw UnknownTermException(config_str);
00059     m_input_event_type = v[term_ref];
00060 
00061     // make sure that term is object/event type
00062     if (m_input_event_type.term_type != Vocabulary::TYPE_OBJECT)
00063         throw NotAnObjectException(config_str);
00064 
00065     // get the input event term
00066     if (! ConfigManager::getConfigOption(INPUT_EVENT_TERM_ELEMENT_NAME, config_str, config_ptr))
00067         throw EmptyInputEventTermException(getId());
00068 
00069     // find vocabulary term for input event term
00070     term_ref = v.findTerm(config_str);
00071     if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00072         throw UnknownTermException(config_str);
00073     m_input_event_term = v[term_ref];
00074 
00075     // only string types are currently supported for input event term
00076     switch (m_input_event_term.term_type) {
00077     case Vocabulary::TYPE_NULL:
00078     case Vocabulary::TYPE_OBJECT:
00079     case Vocabulary::TYPE_INT8:
00080     case Vocabulary::TYPE_INT16:
00081     case Vocabulary::TYPE_INT32:
00082     case Vocabulary::TYPE_UINT8:
00083     case Vocabulary::TYPE_UINT16:
00084     case Vocabulary::TYPE_UINT32:
00085     case Vocabulary::TYPE_INT64:
00086     case Vocabulary::TYPE_UINT64:
00087     case Vocabulary::TYPE_FLOAT:
00088     case Vocabulary::TYPE_DOUBLE:
00089     case Vocabulary::TYPE_LONG_DOUBLE:
00090     case Vocabulary::TYPE_DATE_TIME:
00091     case Vocabulary::TYPE_DATE:
00092     case Vocabulary::TYPE_TIME:
00093         throw TermNotStringException(config_str);
00094         break;
00095     case Vocabulary::TYPE_SHORT_STRING:
00096     case Vocabulary::TYPE_STRING:
00097     case Vocabulary::TYPE_LONG_STRING:
00098     case Vocabulary::TYPE_CHAR:
00099     case Vocabulary::TYPE_BLOB:
00100     case Vocabulary::TYPE_ZBLOB:
00101         break;  // these are all OK
00102     }
00103 
00104     // get the codec to use
00105     boost::mutex::scoped_lock codec_lock(m_codec_mutex);
00106     if (! ConfigManager::getConfigOption(CODEC_ELEMENT_NAME, m_codec_id, config_ptr))
00107         throw EmptyCodecException(getId());
00108     m_codec_ptr = getCodecFactory().getCodec(m_codec_id);   
00109     PION_ASSERT(m_codec_ptr);
00110     codec_lock.unlock();
00111 
00112     // check if we should copy all terms from original event
00113     m_copy_all_terms = false;
00114     std::string copy_all_terms_str;
00115     if (ConfigManager::getConfigOption(COPY_ALL_TERMS_ELEMENT_NAME,
00116                                        copy_all_terms_str, config_ptr))
00117     {
00118         if (copy_all_terms_str == "true")
00119             m_copy_all_terms = true;
00120     }
00121 
00122     // get list of terms to copy from original event
00123     m_copy_terms.clear();
00124     xmlNodePtr copy_term_node = config_ptr;
00125     while ((copy_term_node = ConfigManager::findConfigNodeByName(COPY_TERM_ELEMENT_NAME, copy_term_node)) != NULL) {
00126         xmlChar *xml_char_ptr = xmlNodeGetContent(copy_term_node);
00127         if (xml_char_ptr != NULL) {
00128             const std::string copy_term_str(reinterpret_cast<char*>(xml_char_ptr));
00129             xmlFree(xml_char_ptr);
00130             if (! copy_term_str.empty()) {
00131                 // find the term in the Vocabulary
00132                 term_ref = v.findTerm(copy_term_str);
00133                 if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00134                     throw UnknownTermException(copy_term_str);
00135 
00136                 // add it to the copy terms collection
00137                 m_copy_terms.push_back(v[term_ref]);
00138             }
00139         }
00140 
00141         // step to the next copy term
00142         copy_term_node = copy_term_node->next;
00143     }
00144 }
00145     
00146 void FissionReactor::updateVocabulary(const Vocabulary& v)
00147 {
00148     // first update anything in the Reactor base class that might be needed
00149     ConfigWriteLock cfg_lock(*this);
00150     Reactor::updateVocabulary(v);
00151 
00152     v.refreshTerm(m_input_event_type);
00153     v.refreshTerm(m_input_event_term);
00154 
00155     boost::mutex::scoped_lock codec_lock(m_codec_mutex);
00156     if (m_codec_ptr)
00157         m_codec_ptr->updateVocabulary(v);
00158 }
00159 
00160 void FissionReactor::updateCodecs(void)
00161 {
00162     // check if the codec was deleted (if so, stop now!)
00163     if (! getCodecFactory().hasPlugin(m_codec_id)) {
00164         stop();
00165         boost::mutex::scoped_lock codec_lock(m_codec_mutex);
00166         m_codec_ptr.reset();
00167     } else {
00168         // update the codec pointer
00169         boost::mutex::scoped_lock codec_lock(m_codec_mutex);
00170         m_codec_ptr = getCodecFactory().getCodec(m_codec_id);
00171     }
00172 }
00173     
00174 void FissionReactor::process(const EventPtr& e)
00175 {
00176     if (e->getType() == m_input_event_type.term_ref) {
00177         // used for generating new events
00178         EventFactory event_factory;
00179         EventPtr new_ptr;
00180         bool read_result;
00181 
00182         // iterate through each value defined for the input event term
00183         Event::ValuesRange range = e->equal_range(m_input_event_term.term_ref);
00184         for (Event::ConstIterator it = range.first; it != range.second; ++it) {
00185 
00186             // create an input stream based upon the term value
00187             const Event::BlobType& ss = boost::get<const Event::BlobType&>(it->value);
00188             boost::iostreams::stream<boost::iostreams::array_source> input_stream(ss.get(), ss.size());
00189 
00190             // read Event(s) from the input stream
00191             while ( isRunning() && !input_stream.eof() ) {
00192 
00193                 // only allow one thread to use the codec at a time
00194                 boost::mutex::scoped_lock codec_lock(m_codec_mutex);
00195                 event_factory.create(new_ptr, m_codec_ptr->getEventType());
00196                 read_result = m_codec_ptr->read(input_stream, *new_ptr);
00197                 codec_lock.unlock();
00198                 if (! read_result)
00199                     break;
00200 
00201                 // copy terms from original event ?
00202                 if (m_copy_all_terms) {
00203                     // copy all terms from original event
00204                     *new_ptr += *e;
00205                 } else if (! m_copy_terms.empty() ) {
00206                     // copy some terms from original event
00207                     for (TermVector::const_iterator term_it = m_copy_terms.begin();
00208                         term_it != m_copy_terms.end(); ++term_it)
00209                     {
00210                         new_ptr->copyValues(*e, term_it->term_ref);
00211                     }
00212                 }
00213 
00214                 // deliver the event
00215                 deliverEvent(new_ptr);
00216             }
00217         }
00218     }
00219 }
00220 
00221     
00222 }   // end namespace plugins
00223 }   // end namespace pion
00224 
00225 
00227 extern "C" PION_PLUGIN_API pion::platform::Reactor *pion_create_FissionReactor(void) {
00228     return new pion::plugins::FissionReactor();
00229 }
00230 
00232 extern "C" PION_PLUGIN_API void pion_destroy_FissionReactor(pion::plugins::FissionReactor *reactor_ptr) {
00233     delete reactor_ptr;
00234 }

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7