00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef __PION_PYTHONREACTOR_HEADER__
00021 #define __PION_PYTHONREACTOR_HEADER__
00022
00023
00024 #include <Python.h>
00025 #include <string>
00026 #include <boost/thread/tss.hpp>
00027 #include <pion/PionConfig.hpp>
00028 #include <pion/PionLogger.hpp>
00029 #include <pion/PionHashMap.hpp>
00030 #include <pion/PionException.hpp>
00031 #include <pion/platform/Reactor.hpp>
00032 #include <pion/platform/ReactionEngine.hpp>
00033
00034
00035 namespace pion {
00036 namespace plugins {
00037
00038
00042 class PythonReactor :
00043 public pion::platform::Reactor
00044 {
00045 public:
00046
00048 class UnknownTermException : public PionException {
00049 public:
00050 UnknownTermException(const std::string& term)
00051 : PionException("Unable to find required Vocabulary term: ", term) {}
00052 };
00053
00055 class InternalPythonException : public PionException {
00056 public:
00057 InternalPythonException(const std::string& reactor_id)
00058 : PionException("PythonReactor internal API error: ", reactor_id) {}
00059 };
00060
00062 class SourceFileNotFoundException : public PionException {
00063 public:
00064 SourceFileNotFoundException(const std::string& filename)
00065 : PionException("PythonReactor source code file not found: ", filename) {}
00066 };
00067
00069 class ReadSourceFileException : public PionException {
00070 public:
00071 ReadSourceFileException(const std::string& filename)
00072 : PionException("PythonReactor unable to read source code from file: ", filename) {}
00073 };
00074
00076 class FailedToCompileException : public PionException {
00077 public:
00078 FailedToCompileException(const std::string& filename)
00079 : PionException("PythonReactor compile failure: ", filename) {}
00080 };
00081
00083 class NotCallableException : public PionException {
00084 public:
00085 NotCallableException(const std::string& name)
00086 : PionException("PythonReactor attribute defined is not callable: ", name) {}
00087 };
00088
00090 class InitReactorObjectException : public PionException {
00091 public:
00092 InitReactorObjectException(const std::string& error_msg)
00093 : PionException("PythonReactor unable to initialize Reactor object: ", error_msg) {}
00094 };
00095
00097 class EventConversionException : public PionException {
00098 public:
00099 EventConversionException(const std::string& error_msg)
00100 : PionException("PythonReactor event conversion error: ", error_msg) {}
00101 };
00102
00103
00105 PythonReactor(void);
00106
00108 virtual ~PythonReactor();
00109
00117 virtual void setConfig(const pion::platform::Vocabulary& v, const xmlNodePtr config_ptr);
00118
00125 virtual void updateVocabulary(const pion::platform::Vocabulary& v);
00126
00132 virtual void process(const pion::platform::EventPtr& e);
00133
00143 virtual void query(std::ostream& out, const QueryBranches& branches,
00144 const QueryParams& qp);
00145
00147 virtual void start(void);
00148
00150 virtual void stop(void);
00151
00159
00160 bool deliverToConnections(PyObject *event_ptr);
00161
00169 PyObject *getSession(PyObject *event_ptr);
00170
00172 inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
00173
00175 inline PionLogger getLogger(void) { return m_logger; }
00176
00178 inline const pion::platform::Vocabulary& getVocabulary(void) const {
00179 PION_ASSERT(m_vocab_ptr);
00180 return *m_vocab_ptr;
00181 }
00182
00184 static inline boost::uint64_t boost_msec_to_fsec(boost::uint64_t n);
00185
00187 static inline boost::uint64_t boost_fsec_to_msec(boost::uint64_t n);
00188
00189
00190 protected:
00191
00197 void updateTerms(const pion::platform::Vocabulary& v);
00198
00200 std::size_t getNumSessions(void) const;
00201
00203 void flushSessions(void);
00204
00210 static PyThreadState * initThreadState(void);
00211
00217 static void releaseThreadState(PyThreadState *ptr);
00218
00227 PyObject *findPythonFunction(PyObject *module_ptr, const std::string& func_name);
00228
00230 void resetPythonSymbols(void);
00231
00233 void compilePythonSource(void);
00234
00236 void initPythonModule(void);
00237
00239 void callPythonStart(void);
00240
00242 void callPythonStop(void);
00243
00245 std::string getSourceCodeFromFile(void);
00246
00248 std::string getPythonError(void);
00249
00250
00252 class PythonLock {
00253 public:
00254
00256 PythonLock(bool inverse = false)
00257 : m_inversed(inverse), m_thr_state_ptr(NULL)
00258 {
00259 m_thr_state_ptr = PythonReactor::initThreadState();
00260 if (m_inversed) {
00261 PyEval_ReleaseThread(m_thr_state_ptr);
00262 } else {
00263 PyEval_AcquireThread(m_thr_state_ptr);
00264 }
00265 }
00266
00268 ~PythonLock() {
00269 if (m_inversed) {
00270 PyEval_AcquireThread(m_thr_state_ptr);
00271 } else {
00272 PyEval_ReleaseThread(m_thr_state_ptr);
00273 }
00274 }
00275
00276 private:
00277
00279 bool m_inversed;
00280
00282 PyThreadState * m_thr_state_ptr;
00283 };
00284
00285
00286 private:
00287
00289 typedef PION_HASH_MAP<pion::platform::Event::BlobType, PyObject*, HashPionIdBlob> SessionMap;
00290
00291
00293 static const std::string START_FUNCTION_NAME;
00294
00296 static const std::string STOP_FUNCTION_NAME;
00297
00299 static const std::string PROCESS_FUNCTION_NAME;
00300
00302 static const std::string FILENAME_ELEMENT_NAME;
00303
00305 static const std::string PYTHON_SOURCE_ELEMENT_NAME;
00306
00308 static const std::string OPEN_SESSIONS_ELEMENT_NAME;
00309
00311 static boost::mutex m_init_mutex;
00312
00314 static boost::uint32_t m_init_num;
00315
00317 PionLogger m_logger;
00318
00320 std::string m_source;
00321
00323 std::string m_source_file;
00324
00326 PyObject * m_byte_code;
00327
00329 PyObject * m_module;
00330
00332 PyObject * m_start_func;
00333
00335 PyObject * m_stop_func;
00336
00338 PyObject * m_process_func;
00339
00341 PyObject * m_reactor_ptr;
00342
00344 pion::platform::VocabularyPtr m_vocab_ptr;
00345
00347 mutable boost::mutex m_sessions_mutex;
00348
00350 SessionMap m_sessions;
00351
00353 static PyInterpreterState * m_interp_ptr;
00354
00356 static boost::thread_specific_ptr<PyThreadState> * m_state_ptr;
00357
00359 static const std::string VOCAB_CLICKSTREAM_SESSION_EVENT;
00360 pion::platform::Vocabulary::TermRef m_session_event_term_ref;
00361
00363 static const std::string VOCAB_CLICKSTREAM_SESSION_ID;
00364 pion::platform::Vocabulary::TermRef m_session_id_term_ref;
00365 };
00366
00367
00368
00369
00370 inline boost::uint64_t PythonReactor::boost_msec_to_fsec(boost::uint64_t n) {
00371 switch (boost::posix_time::time_duration::resolution()) {
00372 case boost::date_time::sec:
00373 n /= 1000000;
00374 break;
00375 case boost::date_time::tenth:
00376 n /= 100000;
00377 break;
00378 case boost::date_time::hundreth:
00379 n /= 10000;
00380 break;
00381 case boost::date_time::milli:
00382 n /= 1000;
00383 break;
00384 case boost::date_time::ten_thousandth:
00385 n /= 100;
00386 break;
00387 case boost::date_time::micro:
00388
00389 break;
00390 case boost::date_time::nano:
00391 n *= 1000;
00392 break;
00393 case boost::date_time::NumResolutions:
00394
00395 break;
00396 }
00397 return n;
00398 }
00399
00400 inline boost::uint64_t PythonReactor::boost_fsec_to_msec(boost::uint64_t n) {
00401 switch (boost::posix_time::time_duration::resolution()) {
00402 case boost::date_time::sec:
00403 n *= 1000000;
00404 break;
00405 case boost::date_time::tenth:
00406 n *= 100000;
00407 break;
00408 case boost::date_time::hundreth:
00409 n *= 10000;
00410 break;
00411 case boost::date_time::milli:
00412 n *= 1000;
00413 break;
00414 case boost::date_time::ten_thousandth:
00415 n *= 100;
00416 break;
00417 case boost::date_time::micro:
00418
00419 break;
00420 case boost::date_time::nano:
00421 n /= 1000;
00422 break;
00423 case boost::date_time::NumResolutions:
00424
00425 break;
00426 }
00427 return n;
00428 }
00429
00430
00431 }
00432 }
00433
00434 #endif