platform/reactors/PythonReactor.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2011 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_PYTHONREACTOR_HEADER__
00021 #define __PION_PYTHONREACTOR_HEADER__
00022 
00023 // NOTE: According to API docs, Python.h must be #include'd FIRST
00024 #include <Python.h>
00025 #include <string>
00026 #include <boost/thread/tss.hpp>
00027 #include <pion/PionConfig.hpp>
00028 #include <pion/PionLogger.hpp>
00029 #include <pion/PionHashMap.hpp>
00030 #include <pion/PionException.hpp>
00031 #include <pion/platform/Reactor.hpp>
00032 #include <pion/platform/ReactionEngine.hpp>
00033 
00034 
00035 namespace pion {        // begin namespace pion
00036 namespace plugins {     // begin namespace plugins
00037 
00038 
00042 class PythonReactor :
00043     public pion::platform::Reactor
00044 {
00045 public: 
00046     
00048     class UnknownTermException : public PionException {
00049     public:
00050         UnknownTermException(const std::string& term)
00051             : PionException("Unable to find required Vocabulary term: ", term) {}
00052     };
00053 
00055     class InternalPythonException : public PionException {
00056     public:
00057         InternalPythonException(const std::string& reactor_id)
00058             : PionException("PythonReactor internal API error: ", reactor_id) {}
00059     };
00060 
00062     class SourceFileNotFoundException : public PionException {
00063     public:
00064         SourceFileNotFoundException(const std::string& filename)
00065             : PionException("PythonReactor source code file not found: ", filename) {}
00066     };
00067 
00069     class ReadSourceFileException : public PionException {
00070     public:
00071         ReadSourceFileException(const std::string& filename)
00072             : PionException("PythonReactor unable to read source code from file: ", filename) {}
00073     };
00074 
00076     class FailedToCompileException : public PionException {
00077     public:
00078         FailedToCompileException(const std::string& filename)
00079             : PionException("PythonReactor compile failure: ", filename) {}
00080     };
00081 
00083     class NotCallableException : public PionException {
00084     public:
00085         NotCallableException(const std::string& name)
00086             : PionException("PythonReactor attribute defined is not callable: ", name) {}
00087     };
00088 
00090     class InitReactorObjectException : public PionException {
00091     public:
00092         InitReactorObjectException(const std::string& error_msg)
00093             : PionException("PythonReactor unable to initialize Reactor object: ", error_msg) {}
00094     };
00095 
00097     class EventConversionException : public PionException {
00098     public:
00099         EventConversionException(const std::string& error_msg)
00100             : PionException("PythonReactor event conversion error: ", error_msg) {}
00101     };
00102 
00103 
00105     PythonReactor(void);
00106     
00108     virtual ~PythonReactor();
00109     
00117     virtual void setConfig(const pion::platform::Vocabulary& v, const xmlNodePtr config_ptr);
00118     
00125     virtual void updateVocabulary(const pion::platform::Vocabulary& v);
00126     
00132     virtual void process(const pion::platform::EventPtr& e);
00133     
00143     virtual void query(std::ostream& out, const QueryBranches& branches,
00144         const QueryParams& qp);
00145 
00147     virtual void start(void);
00148     
00150     virtual void stop(void);
00151     
00159 
00160     bool deliverToConnections(PyObject *event_ptr);
00161 
00169     PyObject *getSession(PyObject *event_ptr);
00170 
00172     inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
00173 
00175     inline PionLogger getLogger(void) { return m_logger; }
00176 
00178     inline const pion::platform::Vocabulary& getVocabulary(void) const {
00179         PION_ASSERT(m_vocab_ptr);
00180         return *m_vocab_ptr;
00181     }
00182     
00184     static inline boost::uint64_t boost_msec_to_fsec(boost::uint64_t n);
00185 
00187     static inline boost::uint64_t boost_fsec_to_msec(boost::uint64_t n);
00188 
00189     
00190 protected:
00191 
00197     void updateTerms(const pion::platform::Vocabulary& v);
00198 
00200     std::size_t getNumSessions(void) const;
00201 
00203     void flushSessions(void);
00204 
00210     static PyThreadState * initThreadState(void);
00211 
00217     static void releaseThreadState(PyThreadState *ptr);
00218 
00227     PyObject *findPythonFunction(PyObject *module_ptr, const std::string& func_name);
00228 
00230     void resetPythonSymbols(void);
00231     
00233     void compilePythonSource(void);
00234     
00236     void initPythonModule(void);
00237     
00239     void callPythonStart(void);
00240     
00242     void callPythonStop(void);
00243     
00245     std::string getSourceCodeFromFile(void);
00246 
00248     std::string getPythonError(void);
00249 
00250     
00252     class PythonLock {
00253     public:
00254 
00256         PythonLock(bool inverse = false)
00257             : m_inversed(inverse), m_thr_state_ptr(NULL)
00258         {
00259             m_thr_state_ptr = PythonReactor::initThreadState();
00260             if (m_inversed) {
00261                 PyEval_ReleaseThread(m_thr_state_ptr);
00262             } else {
00263                 PyEval_AcquireThread(m_thr_state_ptr);
00264             }
00265         }
00266         
00268         ~PythonLock() {
00269             if (m_inversed) {
00270                 PyEval_AcquireThread(m_thr_state_ptr);
00271             } else {
00272                 PyEval_ReleaseThread(m_thr_state_ptr);
00273             }
00274         }
00275 
00276     private:
00277     
00279         bool            m_inversed;
00280 
00282         PyThreadState * m_thr_state_ptr;
00283     };
00284 
00285     
00286 private:
00287     
00289     typedef PION_HASH_MAP<pion::platform::Event::BlobType, PyObject*, HashPionIdBlob>   SessionMap;
00290 
00291 
00293     static const std::string        START_FUNCTION_NAME;
00294 
00296     static const std::string        STOP_FUNCTION_NAME;
00297 
00299     static const std::string        PROCESS_FUNCTION_NAME;
00300 
00302     static const std::string        FILENAME_ELEMENT_NAME;
00303 
00305     static const std::string        PYTHON_SOURCE_ELEMENT_NAME;
00306 
00308     static const std::string        OPEN_SESSIONS_ELEMENT_NAME;
00309 
00311     static boost::mutex             m_init_mutex;
00312     
00314     static boost::uint32_t          m_init_num;
00315 
00317     PionLogger                      m_logger;
00318 
00320     std::string                     m_source;
00321 
00323     std::string                     m_source_file;
00324 
00326     PyObject *                      m_byte_code;
00327     
00329     PyObject *                      m_module;
00330     
00332     PyObject *                      m_start_func;
00333     
00335     PyObject *                      m_stop_func;
00336     
00338     PyObject *                      m_process_func;
00339     
00341     PyObject *                      m_reactor_ptr;
00342     
00344     pion::platform::VocabularyPtr   m_vocab_ptr;
00345     
00347     mutable boost::mutex            m_sessions_mutex;
00348 
00350     SessionMap                      m_sessions;
00351 
00353     static PyInterpreterState *     m_interp_ptr;
00354 
00356     static boost::thread_specific_ptr<PyThreadState> *  m_state_ptr;
00357 
00359     static const std::string            VOCAB_CLICKSTREAM_SESSION_EVENT;
00360     pion::platform::Vocabulary::TermRef m_session_event_term_ref;
00361 
00363     static const std::string            VOCAB_CLICKSTREAM_SESSION_ID;
00364     pion::platform::Vocabulary::TermRef m_session_id_term_ref;
00365 };
00366 
00367 
00368 // inline functions for PythonReactor
00369 
00370 inline boost::uint64_t PythonReactor::boost_msec_to_fsec(boost::uint64_t n) {
00371     switch (boost::posix_time::time_duration::resolution()) {
00372     case boost::date_time::sec:
00373         n /= 1000000;
00374         break;
00375     case boost::date_time::tenth:
00376         n /= 100000;
00377         break;
00378     case boost::date_time::hundreth:
00379         n /= 10000;
00380         break;
00381     case boost::date_time::milli:
00382         n /= 1000;
00383         break;
00384     case boost::date_time::ten_thousandth:
00385         n /= 100;
00386         break;
00387     case boost::date_time::micro:
00388         // good to go
00389         break;
00390     case boost::date_time::nano:
00391         n *= 1000;
00392         break;
00393     case boost::date_time::NumResolutions:
00394         // shouldn't happen
00395         break;
00396     }
00397     return n;
00398 }
00399 
00400 inline boost::uint64_t PythonReactor::boost_fsec_to_msec(boost::uint64_t n) {
00401     switch (boost::posix_time::time_duration::resolution()) {
00402     case boost::date_time::sec:
00403         n *= 1000000;
00404         break;
00405     case boost::date_time::tenth:
00406         n *= 100000;
00407         break;
00408     case boost::date_time::hundreth:
00409         n *= 10000;
00410         break;
00411     case boost::date_time::milli:
00412         n *= 1000;
00413         break;
00414     case boost::date_time::ten_thousandth:
00415         n *= 100;
00416         break;
00417     case boost::date_time::micro:
00418         // good to go
00419         break;
00420     case boost::date_time::nano:
00421         n /= 1000;
00422         break;
00423     case boost::date_time::NumResolutions:
00424         // shouldn't happen
00425         break;
00426     }
00427     return n;
00428 }
00429 
00430 
00431 }   // end namespace plugins
00432 }   // end namespace pion
00433 
00434 #endif

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7