00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <Python.h>
00022 #include <frameobject.h>
00023 #include <cstring>
00024 #include "structmember.h"
00025 #include "datetime.h"
00026 #include <sstream>
00027 #include <fstream>
00028 #include <boost/filesystem/operations.hpp>
00029 #include <pion/platform/ConfigManager.hpp>
00030 #include "PythonReactor.hpp"
00031
00032
00033 #if PY_VERSION_HEX < 0x02050000
00034 typedef int Py_ssize_t;
00035 typedef inquiry lenfunc;
00036 #define PY_SSIZE_T_MAX INT_MAX
00037 #define PY_SSIZE_T_MIN INT_MIN
00038 #endif
00039
00040 using namespace std;
00041 using namespace pion::platform;
00042
00043
00044 namespace pion {
00045 namespace plugins {
00046
00047
00048
00049
00050
00051 static bool
00052 Python_getTermRef(const Vocabulary& v, PyObject *obj, Vocabulary::TermRef& term_ref)
00053 {
00054 term_ref = Vocabulary::UNDEFINED_TERM_REF;
00055
00056 if (PyInt_Check(obj) || PyLong_Check(obj)) {
00057 term_ref = PyLong_AsUnsignedLong(obj);
00058 if (term_ref == Vocabulary::UNDEFINED_TERM_REF) {
00059 PyErr_SetString(PyExc_KeyError, "undefined term reference");
00060 } else if (term_ref > v.size()) {
00061 term_ref = Vocabulary::UNDEFINED_TERM_REF;
00062 PyErr_SetString(PyExc_KeyError, "out-of-range term reference");
00063 }
00064 } else if (PyString_Check(obj)) {
00065 const char *term_str = PyString_AsString(obj);
00066 term_ref = v.findTerm(term_str);
00067 if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00068 (void)PyErr_Format(PyExc_KeyError, "term '%s' not found", term_str);
00069 } else {
00070 PyErr_SetString(PyExc_TypeError, "invalid argument");
00071 }
00072
00073 return (term_ref != Vocabulary::UNDEFINED_TERM_REF);
00074 }
00075
00076
00077 static PyObject *Event_create(PyObject *reactor_ptr, const EventPtr& event_ptr, bool is_unique);
00078
00079
00080
00081
00082 typedef struct {
00083 PyObject_HEAD
00084 PyObject * dict;
00085 PythonReactor * __this;
00086 } PythonReactorObject;
00087
00088 static void
00089 Reactor_dealloc(PythonReactorObject* self)
00090 {
00091 Py_XDECREF(self->dict);
00092 self->ob_type->tp_free((PyObject*)self);
00093 }
00094
00095 static PyObject *
00096 Reactor_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
00097 {
00098 PythonReactorObject *self;
00099 self = (PythonReactorObject *)type->tp_alloc(type, 0);
00100 if (self != NULL) {
00101 self->dict = PyDict_New();
00102 if (self->dict == NULL) {
00103 Py_DECREF(self);
00104 return NULL;
00105 }
00106 self->__this = NULL;
00107 }
00108 return (PyObject *)self;
00109 }
00110
00111 static PyObject*
00112 Reactor_event(PythonReactorObject *self, PyObject *args)
00113 {
00114
00115 PyObject *term_id_obj;
00116
00117 if (! PyArg_ParseTuple(args, "O:reactor.event", &term_id_obj)) {
00118 PyErr_SetString(PyExc_TypeError, "missing required parameter");
00119 return NULL;
00120 }
00121
00122
00123 Vocabulary::TermRef event_type;
00124 PythonReactor *ptr = self->__this;
00125 PION_ASSERT(ptr);
00126 if (! Python_getTermRef(ptr->getVocabulary(), term_id_obj, event_type))
00127 return NULL;
00128
00129
00130 EventFactory f;
00131 EventPtr new_ptr;
00132 f.create(new_ptr, event_type);
00133
00134 return Event_create((PyObject*)self, new_ptr, true);
00135 }
00136
00137 static PyObject*
00138 Reactor_deliver(PythonReactorObject *self, PyObject *args)
00139 {
00140
00141 PyObject *event_ptr;
00142 if (! PyArg_ParseTuple(args, "O:reactor.deliver", &event_ptr)) {
00143 PyErr_SetString(PyExc_TypeError, "missing required parameter");
00144 return NULL;
00145 }
00146
00147
00148
00149
00150 PythonReactor *ptr = self->__this;
00151 PION_ASSERT(ptr);
00152 if (! ptr->deliverToConnections(event_ptr))
00153 return NULL;
00154
00155
00156 Py_INCREF(Py_None);
00157 return Py_None;
00158 }
00159
00160 static PyObject*
00161 Reactor_getsession(PythonReactorObject *self, PyObject *args)
00162 {
00163
00164 PyObject *event_ptr;
00165 if (! PyArg_ParseTuple(args, "O:reactor.getsession", &event_ptr)) {
00166 PyErr_SetString(PyExc_TypeError, "missing required parameter");
00167 return NULL;
00168 }
00169
00170
00171
00172 PythonReactor *ptr = self->__this;
00173 PION_ASSERT(ptr);
00174 return ptr->getSession(event_ptr);
00175 }
00176
00177 static PyObject*
00178 Reactor_getterm(PythonReactorObject *self, PyObject *args)
00179 {
00180
00181 PyObject *term_id_obj;
00182
00183 if (! PyArg_ParseTuple(args, "O:reactor.getterm", &term_id_obj)) {
00184 PyErr_SetString(PyExc_TypeError, "missing required parameter");
00185 return NULL;
00186 }
00187 char *term_id = PyString_AsString(term_id_obj);
00188 if (term_id == NULL || *term_id == '\0') {
00189 PyErr_SetString(PyExc_TypeError, "parameter must be a string");
00190 return NULL;
00191 }
00192 PION_ASSERT(self->__this);
00193 return PyLong_FromUnsignedLong(self->__this->getVocabulary().findTerm(term_id));
00194 }
00195
00196 static PyObject*
00197 Reactor_GetAttr(PyObject *obj, PyObject *attr_name)
00198 {
00199 PyObject *retval = PyObject_GenericGetAttr(obj, attr_name);
00200
00201 if (retval == NULL) {
00202 PyErr_Clear();
00203 PythonReactorObject *self = (PythonReactorObject*) obj;
00204 const char *attr_str = PyString_AsString(attr_name);
00205 PION_ASSERT(self->__this);
00206 if (strcmp(attr_str, "id") == 0) {
00207 retval = PyString_FromString(self->__this->getId().c_str());
00208 } else if (strcmp(attr_str, "name") == 0) {
00209 retval = PyString_FromString(self->__this->getName().c_str());
00210 } else {
00211
00212 retval = PyDict_GetItem(self->dict, attr_name);
00213 if (retval == NULL) {
00214 (void)PyErr_Format(PyExc_AttributeError, "'pion.reactor' object has no attribute '%s'", attr_str);
00215 } else {
00216 Py_INCREF(retval);
00217 }
00218 }
00219 }
00220
00221 return retval;
00222 }
00223
00224 static int
00225 Reactor_SetAttr(PyObject *obj, PyObject *attr_name, PyObject *value)
00226 {
00227 int retval = -1;
00228 char *attr_str = PyString_AsString(attr_name);
00229 PythonReactorObject *self = (PythonReactorObject*) obj;
00230
00231 if (strcmp(attr_str, "id") == 0 || strcmp(attr_str, "name") == 0) {
00232 (void)PyErr_Format(PyExc_AttributeError, "Read-only attribute: %s", attr_str);
00233 retval = -1;
00234 } else if (value == NULL) {
00235 retval = PyDict_DelItem(self->dict, attr_name);
00236 } else {
00237 retval = PyDict_SetItem(self->dict, attr_name, value);
00238 }
00239
00240 return retval;
00241 }
00242
00243 static PyMethodDef Reactor_methods[] = {
00244 {(char*)"event", (PyCFunction)Reactor_event, METH_VARARGS,
00245 (char*)"Constructs a new pion.event object for the given type."},
00246 {(char*)"deliver", (PyCFunction)Reactor_deliver, METH_VARARGS,
00247 (char*)"Delivers an event to the reactor's output connections."},
00248 {(char*)"getterm", (PyCFunction)Reactor_getterm, METH_VARARGS,
00249 (char*)"Returns a numeric term reference for the given identifer."},
00250 {(char*)"getsession", (PyCFunction)Reactor_getsession, METH_VARARGS,
00251 (char*)"Returns a unique object for the event's session."},
00252 {NULL}
00253 };
00254
00255 static PyMemberDef Reactor_members[] = {
00256 {NULL}
00257 };
00258
00259 static PyTypeObject PythonReactorType = {
00260 PyObject_HEAD_INIT(NULL)
00261 0,
00262 "pion.reactor",
00263 sizeof(PythonReactorObject),
00264 0,
00265 (destructor)Reactor_dealloc,
00266 0,
00267 0,
00268 0,
00269 0,
00270 0,
00271 0,
00272 0,
00273 0,
00274 0,
00275 0,
00276 0,
00277 Reactor_GetAttr,
00278 Reactor_SetAttr,
00279 0,
00280 Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
00281 "pion reactor objects",
00282 0,
00283 0,
00284 0,
00285 0,
00286 0,
00287 0,
00288 Reactor_methods,
00289 Reactor_members,
00290 0,
00291 0,
00292 0,
00293 0,
00294 0,
00295 0,
00296 0,
00297 0,
00298 Reactor_new,
00299 };
00300
00301 static PythonReactorObject *
00302 Reactor_create(PythonReactor *this_ptr)
00303 {
00304 PythonReactorObject *self;
00305 self = (PythonReactorObject *) PythonReactorType.tp_alloc(&PythonReactorType, 0);
00306 if (self != NULL) {
00307 self->dict = PyDict_New();
00308 self->__this = this_ptr;
00309 }
00310 return self;
00311 }
00312
00313
00314
00315
00316 typedef struct {
00317 PyObject_HEAD
00318 bool is_unique;
00319 EventPtr event_ptr;
00320 PythonReactorObject * reactor_ptr;
00321 } PythonEventObject;
00322
00323 static void
00324 Event_dealloc(PythonEventObject* self)
00325 {
00326 self->event_ptr.reset();
00327 self->ob_type->tp_free((PyObject*)self);
00328 }
00329
00330 static PyObject *
00331 Event_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
00332 {
00333 PythonEventObject *self;
00334 self = (PythonEventObject *)type->tp_alloc(type, 0);
00335 if (self != NULL) {
00336 self->is_unique = false;
00337 self->reactor_ptr = NULL;
00338 }
00339 return (PyObject *)self;
00340 }
00341
00342 static const Vocabulary&
00343 Event_getVocabulary(PythonEventObject *self)
00344 {
00345 PION_ASSERT(self->reactor_ptr);
00346 PION_ASSERT(self->reactor_ptr->__this);
00347 return self->reactor_ptr->__this->getVocabulary();
00348 }
00349
00350 static bool
00351 Event_getTermRef(PythonEventObject *self, PyObject *obj, Vocabulary::TermRef& term_ref)
00352 {
00353 return Python_getTermRef(Event_getVocabulary(self), obj, term_ref);
00354 }
00355
00356 static int
00357 Event_init(PythonEventObject *self, PyObject *args, PyObject *kwds)
00358 {
00359 PyObject *reactor_ptr=NULL;
00360 PyObject *type=NULL;
00361 static char *kwlist[] = {(char*)"reactor", (char*)"type", NULL};
00362
00363 if (! PyArg_ParseTupleAndKeywords(args, kwds, "OO:pion.event", kwlist, &reactor_ptr, &type))
00364 return -1;
00365
00366 EventFactory f;
00367 self->reactor_ptr = (PythonReactorObject*) reactor_ptr;
00368 PION_ASSERT(self->reactor_ptr);
00369 Event::EventType event_type;
00370 if (! Event_getTermRef(self, type, event_type))
00371 return -1;
00372
00373 f.create(self->event_ptr, event_type);
00374 self->is_unique = true;
00375
00376 return 0;
00377 }
00378
00379 static int
00380 Event_print(PyObject *obj, FILE *fp, int flags)
00381 {
00382 PythonEventObject *self = (PythonEventObject*) obj;
00383
00384 if (self->event_ptr.get() != NULL) {
00385 PION_ASSERT(self->reactor_ptr);
00386 const Event& e = *(self->event_ptr);
00387 std::string str;
00388
00389 fprintf(fp, "\npion.event (type=%s)\n===========================================================\n", Event_getVocabulary(self)[e.getType()].term_id.c_str());
00390
00391 for (Event::ConstIterator it = e.begin(); it != e.end(); ++it) {
00392 const Vocabulary::Term& term = Event_getVocabulary(self)[it->term_ref];
00393 Event::write(str, it->value, term);
00394 fprintf(fp, "--- %s = %s\n", term.term_id.c_str(), str.c_str());
00395 }
00396 } else {
00397 fprintf(fp, "pion.event (empty)\n");
00398 }
00399
00400 return 0;
00401 }
00402
00403 static PyObject*
00404 Event_getValue(const Event::ParameterValue& value, Vocabulary::DataType term_type)
00405 {
00406
00407 PyDateTime_IMPORT;
00408
00409
00410 PyObject *retval = NULL;
00411 switch(term_type) {
00412 case Vocabulary::TYPE_NULL:
00413 case Vocabulary::TYPE_OBJECT:
00414
00415 retval = PyInt_FromLong(1);
00416 break;
00417 case Vocabulary::TYPE_INT8:
00418 case Vocabulary::TYPE_INT16:
00419 case Vocabulary::TYPE_INT32:
00420 retval = PyInt_FromLong(boost::get<boost::int32_t>(value));
00421 break;
00422 case Vocabulary::TYPE_UINT8:
00423 case Vocabulary::TYPE_UINT16:
00424 retval = PyInt_FromLong(boost::get<boost::uint32_t>(value));
00425 break;
00426 case Vocabulary::TYPE_UINT32:
00427 retval = PyLong_FromUnsignedLong(boost::get<boost::uint32_t>(value));
00428 break;
00429 case Vocabulary::TYPE_INT64:
00430 retval = PyLong_FromLongLong(boost::get<boost::int64_t>(value));
00431 break;
00432 case Vocabulary::TYPE_UINT64:
00433 retval = PyLong_FromUnsignedLongLong(boost::get<boost::uint64_t>(value));
00434 break;
00435 case Vocabulary::TYPE_FLOAT:
00436 retval = PyFloat_FromDouble(boost::get<float>(value));
00437 break;
00438 case Vocabulary::TYPE_DOUBLE:
00439 retval = PyFloat_FromDouble(boost::get<double>(value));
00440 break;
00441 case Vocabulary::TYPE_LONG_DOUBLE:
00442 retval = PyFloat_FromDouble(boost::get<long double>(value));
00443 break;
00444 case Vocabulary::TYPE_SHORT_STRING:
00445 case Vocabulary::TYPE_STRING:
00446 case Vocabulary::TYPE_LONG_STRING:
00447 case Vocabulary::TYPE_CHAR:
00448 case Vocabulary::TYPE_BLOB:
00449 case Vocabulary::TYPE_ZBLOB:
00450 {
00451 const Event::BlobType& b = boost::get<const Event::BlobType&>(value);
00452 retval = PyString_FromStringAndSize(b.get(), b.size());
00453 break;
00454 }
00455 case Vocabulary::TYPE_DATE_TIME:
00456 {
00457 const PionDateTime& d = boost::get<const PionDateTime&>(value);
00458 retval = PyDateTime_FromDateAndTime(d.date().year(),
00459 d.date().month(), d.date().day(),
00460 d.time_of_day().hours(), d.time_of_day().minutes(),
00461 d.time_of_day().seconds(),
00462 d.time_of_day().total_microseconds() % 1000000UL);
00463 break;
00464 }
00465 case Vocabulary::TYPE_DATE:
00466 {
00467 const PionDateTime& d = boost::get<const PionDateTime&>(value);
00468 retval = PyDate_FromDate(d.date().year(), d.date().month(),
00469 d.date().day());
00470 break;
00471 }
00472 case Vocabulary::TYPE_TIME:
00473 {
00474 const PionDateTime& d = boost::get<const PionDateTime&>(value);
00475 retval = PyTime_FromTime(d.time_of_day().hours(),
00476 d.time_of_day().minutes(), d.time_of_day().seconds(),
00477 d.time_of_day().total_microseconds() % 1000000UL);
00478 break;
00479 }
00480 }
00481
00482 return retval;
00483 }
00484
00485 static PyObject*
00486 Event_getBase(PythonEventObject *self, PyObject *term, PyObject *default_value)
00487 {
00488 PyObject *retval = NULL;
00489 Vocabulary::TermRef term_ref;
00490 if (! Event_getTermRef(self, term, term_ref))
00491 return NULL;
00492
00493 PION_ASSERT(self->event_ptr.get());
00494
00495
00496 const Event::ParameterValue *param_ptr = self->event_ptr->getPointer(term_ref);
00497
00498 if (param_ptr == NULL) {
00499
00500 if (default_value) {
00501 Py_INCREF(default_value);
00502 retval = default_value;
00503 } else {
00504 Py_INCREF(Py_None);
00505 retval = Py_None;
00506 }
00507
00508 } else {
00509
00510 retval = Event_getValue(*param_ptr, Event_getVocabulary(self)[term_ref].term_type);
00511 if (retval == NULL) {
00512 PyErr_SetString(PyExc_RuntimeError, "event parameter conversion failed");
00513 }
00514 }
00515
00516 return retval;
00517 }
00518
00519 static PyObject*
00520 Event_getMap(PythonEventObject *self, PyObject *term)
00521 {
00522 return Event_getBase(self, term, NULL);
00523 }
00524
00525 static PyObject*
00526 Event_getFunc(PythonEventObject *self, PyObject *args)
00527 {
00528
00529 PyObject *term = NULL;
00530 PyObject *default_value = NULL;
00531
00532
00533 if (! PyArg_ParseTuple(args, "O|O:event.get", &term, &default_value)) {
00534 PyErr_SetString(PyExc_TypeError, "missing required parameter");
00535 return NULL;
00536 }
00537
00538 return Event_getBase(self, term, default_value);
00539 }
00540
00541 static PyObject*
00542 Event_getlist(PythonEventObject *self, PyObject *args)
00543 {
00544
00545 PyObject *term = NULL;
00546 PyObject *default_value = NULL;
00547
00548 if (! PyArg_ParseTuple(args, "O|O:event.getlist", &term, &default_value)) {
00549 PyErr_SetString(PyExc_TypeError, "missing required parameter");
00550 return NULL;
00551 }
00552
00553 PyObject *retval = NULL;
00554 Vocabulary::TermRef term_ref;
00555 if (! Event_getTermRef(self, term, term_ref))
00556 return NULL;
00557
00558
00559 Event::ValuesRange range = self->event_ptr->equal_range(term_ref);
00560
00561 if (range.first == range.second) {
00562
00563 if (default_value) {
00564 Py_INCREF(default_value);
00565 retval = default_value;
00566 } else {
00567 retval = PyList_New(0);
00568 }
00569
00570 } else {
00571
00572 const Vocabulary::DataType term_type = Event_getVocabulary(self)[term_ref].term_type;
00573 retval = PyList_New(0);
00574
00575 for (Event::ConstIterator it = range.first; it != range.second; ++it) {
00576 PyObject *param = Event_getValue(it->value, term_type);
00577 if (param == NULL) {
00578 PyErr_SetString(PyExc_RuntimeError, "event parameter conversion failed");
00579 Py_DECREF(retval);
00580 return NULL;
00581 }
00582 PyList_Append(retval, param);
00583 }
00584 }
00585
00586 return retval;
00587 }
00588
00589 static Py_ssize_t
00590 Event_conversion_error(const std::string& term_id, const char *msg)
00591 {
00592 std::string error_msg(msg);
00593 error_msg += " for ";
00594 error_msg += term_id;
00595 PyErr_SetString(PyExc_TypeError, error_msg.c_str());
00596 return -1;
00597 }
00598
00599 static Py_ssize_t
00600 Event_setTerm(PythonEventObject *self, Vocabulary::TermRef term_ref, PyObject *value)
00601 {
00602
00603 PyDateTime_IMPORT;
00604
00605 const Vocabulary::Term& t = Event_getVocabulary(self)[term_ref];
00606 Event& e = *self->event_ptr;
00607
00608
00609 switch( t.term_type ) {
00610 case Vocabulary::TYPE_NULL:
00611 case Vocabulary::TYPE_OBJECT:
00612
00613 e.setInt(term_ref, 1);
00614 break;
00615 case Vocabulary::TYPE_INT8:
00616 case Vocabulary::TYPE_INT16:
00617 case Vocabulary::TYPE_INT32:
00618 if (PyLong_Check(value))
00619 e.setInt(term_ref, PyLong_AsLong(value) );
00620 else if (PyInt_Check(value))
00621 e.setInt(term_ref, PyInt_AsLong(value) );
00622 else
00623 return Event_conversion_error(t.term_id, "int or long required");
00624 break;
00625 case Vocabulary::TYPE_UINT8:
00626 case Vocabulary::TYPE_UINT16:
00627 if (PyLong_Check(value))
00628 e.setUInt(term_ref, PyLong_AsUnsignedLong(value) );
00629 else if (PyInt_Check(value))
00630 e.setUInt(term_ref, PyInt_AsLong(value) );
00631 else
00632 return Event_conversion_error(t.term_id, "int or long required");
00633 break;
00634 case Vocabulary::TYPE_UINT32:
00635 if (PyLong_Check(value))
00636 e.setUInt(term_ref, PyLong_AsUnsignedLong(value) );
00637 else if (PyInt_Check(value))
00638 e.setUInt(term_ref, PyInt_AsUnsignedLongMask(value) );
00639 else
00640 return Event_conversion_error(t.term_id, "int or long required");
00641 break;
00642 case Vocabulary::TYPE_INT64:
00643 if (PyLong_Check(value))
00644 e.setBigInt(term_ref, PyLong_AsLongLong(value) );
00645 else if (PyInt_Check(value))
00646 e.setBigInt(term_ref, PyInt_AsLong(value) );
00647 else
00648 return Event_conversion_error(t.term_id, "int or long required");
00649 break;
00650 case Vocabulary::TYPE_UINT64:
00651 if (PyLong_Check(value))
00652 e.setUBigInt(term_ref, PyLong_AsUnsignedLongLong(value) );
00653 else if (PyInt_Check(value))
00654 e.setUBigInt(term_ref, PyInt_AsUnsignedLongLongMask(value) );
00655 else
00656 return Event_conversion_error(t.term_id, "int or long required");
00657 break;
00658 case Vocabulary::TYPE_FLOAT:
00659 if (PyFloat_Check(value))
00660 e.setFloat(term_ref, PyFloat_AsDouble(value) );
00661 else if (PyInt_Check(value))
00662 e.setFloat(term_ref, PyInt_AsLong(value) );
00663 else
00664 return Event_conversion_error(t.term_id, "int or float required");
00665 break;
00666 case Vocabulary::TYPE_DOUBLE:
00667 if (PyFloat_Check(value))
00668 e.setDouble(term_ref, PyFloat_AsDouble(value) );
00669 else if (PyInt_Check(value))
00670 e.setDouble(term_ref, PyInt_AsLong(value) );
00671 else
00672 return Event_conversion_error(t.term_id, "int or float required");
00673 break;
00674 case Vocabulary::TYPE_LONG_DOUBLE:
00675 if (PyFloat_Check(value))
00676 e.setLongDouble(term_ref, PyFloat_AsDouble(value) );
00677 else if (PyInt_Check(value))
00678 e.setLongDouble(term_ref, PyInt_AsLong(value) );
00679 else
00680 return Event_conversion_error(t.term_id, "int or float required");
00681 break;
00682 case Vocabulary::TYPE_SHORT_STRING:
00683 case Vocabulary::TYPE_STRING:
00684 case Vocabulary::TYPE_LONG_STRING:
00685 case Vocabulary::TYPE_CHAR:
00686 case Vocabulary::TYPE_BLOB:
00687 case Vocabulary::TYPE_ZBLOB:
00688 if (PyString_Check(value)) {
00689 char *buf = PyString_AsString(value);
00690 Py_ssize_t len = PyString_Size(value);
00691 e.setString(term_ref, buf, len);
00692 } else {
00693 return Event_conversion_error(t.term_id, "str required");
00694 }
00695 break;
00696 case Vocabulary::TYPE_DATE_TIME:
00697 {
00698 if (! PyDateTime_Check(value))
00699 return Event_conversion_error(t.term_id, "datetime required");
00700 boost::gregorian::date date(PyDateTime_GET_YEAR(value),
00701 PyDateTime_GET_MONTH(value), PyDateTime_GET_DAY(value));
00702 boost::posix_time::time_duration time(PyDateTime_DATE_GET_HOUR(value),
00703 PyDateTime_DATE_GET_MINUTE(value), PyDateTime_DATE_GET_SECOND(value),
00704 PythonReactor::boost_msec_to_fsec(PyDateTime_DATE_GET_MICROSECOND(value)) );
00705 PionDateTime d(date, time);
00706 e.setDateTime(term_ref, d);
00707 break;
00708 }
00709 case Vocabulary::TYPE_DATE:
00710 {
00711 if (PyDateTime_Check(value)) {
00712 boost::gregorian::date date(PyDateTime_GET_YEAR(value),
00713 PyDateTime_GET_MONTH(value), PyDateTime_GET_DAY(value));
00714 boost::posix_time::time_duration time(PyDateTime_DATE_GET_HOUR(value),
00715 PyDateTime_DATE_GET_MINUTE(value), PyDateTime_DATE_GET_SECOND(value),
00716 PythonReactor::boost_msec_to_fsec(PyDateTime_DATE_GET_MICROSECOND(value)) );
00717 PionDateTime d(date, time);
00718 e.setDateTime(term_ref, d);
00719 } else if (PyDate_Check(value)) {
00720 boost::gregorian::date date(PyDateTime_GET_YEAR(value),
00721 PyDateTime_GET_MONTH(value), PyDateTime_GET_DAY(value));
00722 PionDateTime d(date);
00723 e.setDateTime(term_ref, d);
00724 } else {
00725 return Event_conversion_error(t.term_id, "date or datetime required");
00726 }
00727 break;
00728 }
00729 case Vocabulary::TYPE_TIME:
00730 {
00731 if (PyDateTime_Check(value)) {
00732 boost::gregorian::date date(PyDateTime_GET_YEAR(value),
00733 PyDateTime_GET_MONTH(value), PyDateTime_GET_DAY(value));
00734 boost::posix_time::time_duration time(PyDateTime_DATE_GET_HOUR(value),
00735 PyDateTime_DATE_GET_MINUTE(value), PyDateTime_DATE_GET_SECOND(value),
00736 PythonReactor::boost_msec_to_fsec(PyDateTime_DATE_GET_MICROSECOND(value)) );
00737 PionDateTime d(date, time);
00738 e.setDateTime(term_ref, d);
00739 } else if (PyTime_Check(value)) {
00740 boost::gregorian::date date(1970, 1, 1);
00741 boost::posix_time::time_duration time(PyDateTime_TIME_GET_HOUR(value),
00742 PyDateTime_TIME_GET_MINUTE(value), PyDateTime_TIME_GET_SECOND(value),
00743 PythonReactor::boost_msec_to_fsec(PyDateTime_TIME_GET_MICROSECOND(value)) );
00744 PionDateTime d(date, time);
00745 e.setDateTime(term_ref, d);
00746 } else {
00747 return Event_conversion_error(t.term_id, "time or datetime required");
00748 }
00749 break;
00750 }
00751 }
00752
00753 return 0;
00754 }
00755
00756 static Py_ssize_t
00757 Event_setBase(PythonEventObject *self, PyObject *term, PyObject *obj, bool clear_first)
00758 {
00759 PION_ASSERT(self->event_ptr.get());
00760
00761
00762 if (! self->is_unique) {
00763
00764
00765
00766
00767 EventFactory f;
00768 EventPtr old_ptr(self->event_ptr);
00769 f.create(self->event_ptr, old_ptr->getType());
00770 *self->event_ptr += *old_ptr;
00771
00772 self->is_unique = true;
00773 }
00774
00775
00776 Vocabulary::TermRef term_ref;
00777 if (! Event_getTermRef(self, term, term_ref))
00778 return -1;
00779
00780
00781 if (clear_first)
00782 self->event_ptr->clear(term_ref);
00783
00784
00785 Py_ssize_t retval = 0;
00786 if (PySequence_Check(obj) && !PyString_Check(obj)) {
00787 Py_ssize_t size = PySequence_Size(obj);
00788 if (size > 0) {
00789 for (Py_ssize_t n = 0; n < size; ++n) {
00790 retval = Event_setTerm(self, term_ref, PySequence_GetItem(obj, n));
00791 if (retval != 0)
00792 break;
00793 }
00794 }
00795 } else {
00796 retval = Event_setTerm(self, term_ref, obj);
00797 }
00798
00799 return retval;
00800 }
00801
00802 static Py_ssize_t
00803 Event_setMap(PythonEventObject *self, PyObject *key, PyObject *value)
00804 {
00805 return Event_setBase(self, key, value, true);
00806 }
00807
00808 static PyObject *
00809 Event_setFunc(PythonEventObject *self, PyObject *args)
00810 {
00811
00812 PyObject *term;
00813 PyObject *value;
00814
00815 if (! PyArg_ParseTuple(args, "OO:event.set", &term, &value)) {
00816 PyErr_SetString(PyExc_TypeError, "missing required parameter");
00817 return NULL;
00818 }
00819
00820 PyObject *retval = NULL;
00821 Py_ssize_t result = Event_setBase(self, term, value, false);
00822 if (result == 0) {
00823 Py_INCREF(Py_None);
00824 retval = Py_None;
00825 }
00826
00827 return retval;
00828 }
00829
00830 static PyObject*
00831 Event_clear(PythonEventObject *self, PyObject *args)
00832 {
00833
00834 PyObject *term;
00835
00836 if (! PyArg_ParseTuple(args, "|O:event.clear", &term)) {
00837 PyErr_SetString(PyExc_TypeError, "error parsing arguments");
00838 return NULL;
00839 }
00840
00841 if (term) {
00842 Vocabulary::TermRef term_ref;
00843 if (! Event_getTermRef(self, term, term_ref))
00844 return NULL;
00845 if (self->event_ptr.get())
00846 self->event_ptr->clear(term_ref);
00847 } else {
00848 if (self->event_ptr.get())
00849 self->event_ptr->clear();
00850 }
00851
00852 Py_INCREF(Py_None);
00853 return Py_None;
00854 }
00855
00856 static PyObject*
00857 Event_empty(PythonEventObject *self)
00858 {
00859 PyObject *retval = NULL;
00860
00861 if (self->event_ptr.get() == NULL) {
00862 retval = Py_True;
00863 } else {
00864 retval = (self->event_ptr->empty() ? Py_True : Py_False);
00865 }
00866
00867 Py_INCREF(retval);
00868 return retval;
00869 }
00870
00871 static PyObject*
00872 Event_copy(PythonEventObject *self)
00873 {
00874
00875
00876
00877 EventFactory f;
00878 EventPtr new_ptr;
00879 f.create(new_ptr, self->event_ptr->getType());
00880 *new_ptr += *self->event_ptr;
00881
00882 return Event_create((PyObject*)self->reactor_ptr, new_ptr, true);
00883 }
00884
00885 static PyObject*
00886 Event_has_key(PythonEventObject *self, PyObject *args)
00887 {
00888
00889 PyObject *term;
00890
00891 if (! PyArg_ParseTuple(args, "O:event.has_key", &term)) {
00892 PyErr_SetString(PyExc_TypeError, "error parsing arguments");
00893 return NULL;
00894 }
00895
00896
00897 Vocabulary::TermRef term_ref;
00898 if (! Event_getTermRef(self, term, term_ref))
00899 return NULL;
00900
00901 PyObject *retval = Py_False;
00902 if (self->event_ptr.get()) {
00903 if (self->event_ptr->isDefined(term_ref))
00904 retval = Py_True;
00905 }
00906
00907 Py_INCREF(retval);
00908 return retval;
00909 }
00910
00911 static PyObject*
00912 Event_getReactor(PythonEventObject *self, void *closure)
00913 {
00914 PION_ASSERT(self->reactor_ptr);
00915 Py_INCREF(self->reactor_ptr);
00916 return (PyObject*) self->reactor_ptr;
00917 }
00918
00919 static PyObject *
00920 Event_getType(PythonEventObject *self, void *closure)
00921 {
00922 Event::EventType event_type = Vocabulary::UNDEFINED_TERM_REF;
00923 if (self->event_ptr.get() != NULL)
00924 event_type = self->event_ptr->getType();
00925 return PyLong_FromUnsignedLong(event_type);
00926 }
00927
00928 static PyObject *
00929 Event_getTypeString(PythonEventObject *self, void *closure)
00930 {
00931 Event::EventType event_type = Vocabulary::UNDEFINED_TERM_REF;
00932 if (self->event_ptr.get() != NULL)
00933 event_type = self->event_ptr->getType();
00934 return PyString_FromString(Event_getVocabulary(self)[event_type].term_id.c_str());
00935 }
00936
00937 static Py_ssize_t
00938 Event_size(PyObject *obj)
00939 {
00940
00941 PyErr_SetString(PyExc_NotImplementedError, "len(pion.event) not implemented");
00942 return -1;
00943 }
00944
00945 static PyMappingMethods Event_map_methods = {
00946 (lenfunc)Event_size,
00947 (binaryfunc)Event_getMap,
00948 (objobjargproc)Event_setMap,
00949 };
00950
00951 static PyMethodDef Event_methods[] = {
00952 {(char*)"getlist", (PyCFunction)Event_getlist, METH_VARARGS,
00953 (char*)"Returns a list of all values for a given term."},
00954 {(char*)"get", (PyCFunction)Event_getFunc, METH_VARARGS,
00955 (char*)"Returns the value of an Event parameter."},
00956 {(char*)"set", (PyCFunction)Event_setFunc, METH_VARARGS,
00957 (char*)"Sets the value of an Event parameter."},
00958 {(char*)"clear", (PyCFunction)Event_clear, METH_VARARGS,
00959 (char*)"Clears all items from the Event."},
00960 {(char*)"empty", (PyCFunction)Event_empty, METH_NOARGS,
00961 (char*)"Checks to see if the Event contains zero items."},
00962 {(char*)"copy", (PyCFunction)Event_copy, METH_NOARGS,
00963 (char*)"Creates and returns a unique copy of the event."},
00964 {(char*)"has_key", (PyCFunction)Event_has_key, METH_VARARGS,
00965 (char*)"Returns True if the event has one or more values for a given term."},
00966 {NULL}
00967 };
00968
00969 static PyMemberDef Event_members[] = {
00970 {NULL}
00971 };
00972
00973 static PyGetSetDef Event_getseters[] = {
00974 {(char*)"type",
00975 (getter)Event_getType, NULL,
00976 (char*)"numeric identifier for the type of event",
00977 NULL},
00978 {(char*)"typestr",
00979 (getter)Event_getTypeString, NULL,
00980 (char*)"string identifier for the type of event",
00981 NULL},
00982 {(char*)"reactor",
00983 (getter)Event_getReactor, NULL,
00984 (char*)"reactor associated with this event",
00985 NULL},
00986 {NULL}
00987 };
00988
00989 static PyTypeObject PythonEventType = {
00990 PyObject_HEAD_INIT(NULL)
00991 0,
00992 "pion.event",
00993 sizeof(PythonEventObject),
00994 0,
00995 (destructor)Event_dealloc,
00996 Event_print,
00997 0,
00998 0,
00999 0,
01000 0,
01001 0,
01002 0,
01003 &Event_map_methods,
01004 0,
01005 0,
01006 0,
01007 0,
01008 0,
01009 0,
01010 Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
01011 "pion event objects",
01012 0,
01013 0,
01014 0,
01015 0,
01016 0,
01017 0,
01018 Event_methods,
01019 Event_members,
01020 Event_getseters,
01021 0,
01022 0,
01023 0,
01024 0,
01025 0,
01026 (initproc)Event_init,
01027 0,
01028 Event_new,
01029 };
01030
01031 static PyObject *
01032 Event_create(PyObject *reactor_ptr, const EventPtr& event_ptr, bool is_unique)
01033 {
01034 PythonEventObject *self;
01035 self = (PythonEventObject *) PythonEventType.tp_alloc(&PythonEventType, 0);
01036 if (self != NULL) {
01037 self->is_unique = is_unique;
01038 self->event_ptr = event_ptr;
01039 self->reactor_ptr = (PythonReactorObject*) reactor_ptr;
01040 }
01041 return (PyObject*) self;
01042 }
01043
01044
01045
01046
01047 typedef struct {
01048 PyObject_HEAD
01049 PyObject *id;
01050 PyObject *dict;
01051 } PythonSessionObject;
01052
01053 static void
01054 Session_dealloc(PythonSessionObject* self)
01055 {
01056 Py_XDECREF(self->id);
01057 Py_XDECREF(self->dict);
01058 self->ob_type->tp_free((PyObject*)self);
01059 }
01060
01061 static PyObject *
01062 Session_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
01063 {
01064 PythonSessionObject *self = (PythonSessionObject *)type->tp_alloc(type, 0);
01065 if (self != NULL) {
01066 self->id = PyString_FromString("");
01067 if (self->id == NULL) {
01068 Py_DECREF(self);
01069 return NULL;
01070 }
01071 self->dict = PyDict_New();
01072 if (self->dict == NULL) {
01073 Py_DECREF(self->id);
01074 Py_DECREF(self);
01075 return NULL;
01076 }
01077 }
01078 return (PyObject *)self;
01079 }
01080
01081 static int
01082 Session_init(PythonSessionObject *self, PyObject *args, PyObject *kwds)
01083 {
01084 static char *kwlist[] = {(char*)"session_id", NULL};
01085 PyObject *session_id=NULL;
01086 PyObject *tmp=NULL;
01087
01088 if (! PyArg_ParseTupleAndKeywords(args, kwds, "O:pion.session", kwlist, &session_id))
01089 return -1;
01090
01091 if (session_id) {
01092 tmp = self->id;
01093 Py_INCREF(session_id);
01094 self->id = session_id;
01095 Py_XDECREF(tmp);
01096 }
01097
01098 return 0;
01099 }
01100
01101 static PyObject*
01102 Session_GetAttr(PyObject *obj, PyObject *attr_name)
01103 {
01104 PyObject *retval = PyObject_GenericGetAttr(obj, attr_name);
01105
01106 if (retval == NULL) {
01107 PyErr_Clear();
01108 PythonSessionObject *self = (PythonSessionObject*) obj;
01109 char *attr_str = PyString_AsString(attr_name);
01110 if (strcmp(attr_str, "id") == 0) {
01111 retval = self->id;
01112 Py_INCREF(retval);
01113 } else {
01114
01115 retval = PyDict_GetItem(self->dict, attr_name);
01116 if (retval == NULL) {
01117 (void)PyErr_Format(PyExc_AttributeError, "'pion.session' object has no attribute '%s'", attr_str);
01118 } else {
01119 Py_INCREF(retval);
01120 }
01121 }
01122 }
01123
01124 return retval;
01125 }
01126
01127 static int
01128 Session_SetAttr(PyObject *obj, PyObject *attr_name, PyObject *value)
01129 {
01130 int retval = -1;
01131 PythonSessionObject *self = (PythonSessionObject*) obj;
01132 char *attr_str = PyString_AsString(attr_name);
01133
01134 if (strcmp(attr_str, "id") == 0) {
01135 (void)PyErr_Format(PyExc_AttributeError, "Read-only attribute: %s", attr_str);
01136 retval = -1;
01137 } else if (value == NULL) {
01138 retval = PyDict_DelItem(self->dict, attr_name);
01139 } else {
01140 retval = PyDict_SetItem(self->dict, attr_name, value);
01141 }
01142
01143 return retval;
01144 }
01145
01146 static PyTypeObject PythonSessionType = {
01147 PyObject_HEAD_INIT(NULL)
01148 0,
01149 "pion.session",
01150 sizeof(PythonSessionObject),
01151 0,
01152 (destructor)Session_dealloc,
01153 0,
01154 0,
01155 0,
01156 0,
01157 0,
01158 0,
01159 0,
01160 0,
01161 0,
01162 0,
01163 0,
01164 Session_GetAttr,
01165 Session_SetAttr,
01166 0,
01167 Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
01168 "pion session objects",
01169 0,
01170 0,
01171 0,
01172 0,
01173 0,
01174 0,
01175 0,
01176 0,
01177 0,
01178 0,
01179 0,
01180 0,
01181 0,
01182 0,
01183 (initproc)Session_init,
01184 0,
01185 Session_new,
01186 };
01187
01188 static PythonSessionObject *
01189 Session_create(const Event::BlobType& session_id)
01190 {
01191 PythonSessionObject *self;
01192 self = (PythonSessionObject *) PythonSessionType.tp_alloc(&PythonSessionType, 0);
01193 if (self != NULL) {
01194 self->id = PyString_FromString(session_id.get());
01195 self->dict = PyDict_New();
01196 }
01197 return self;
01198 }
01199
01200
01201 static PyMethodDef PionPythonCallbackMethods[] = {
01202 {NULL, NULL, 0, NULL}
01203 };
01204
01205
01206
01207
01208 const string PythonReactor::START_FUNCTION_NAME = "start";
01209 const string PythonReactor::STOP_FUNCTION_NAME = "stop";
01210 const string PythonReactor::PROCESS_FUNCTION_NAME = "process";
01211 const string PythonReactor::FILENAME_ELEMENT_NAME = "Filename";
01212 const string PythonReactor::PYTHON_SOURCE_ELEMENT_NAME = "PythonSource";
01213 const string PythonReactor::OPEN_SESSIONS_ELEMENT_NAME = "OpenSessions";
01214 const string PythonReactor::VOCAB_CLICKSTREAM_SESSION_EVENT="urn:vocab:clickstream#session-event";
01215 const string PythonReactor::VOCAB_CLICKSTREAM_SESSION_ID="urn:vocab:clickstream#session-id";
01216 boost::mutex PythonReactor::m_init_mutex;
01217 boost::uint32_t PythonReactor::m_init_num = 0;
01218 PyInterpreterState * PythonReactor::m_interp_ptr = NULL;
01219 boost::thread_specific_ptr<PyThreadState> * PythonReactor::m_state_ptr = NULL;
01220
01221
01222
01223
01224 PythonReactor::PythonReactor(void)
01225 : Reactor(TYPE_PROCESSING),
01226 m_logger(PION_GET_LOGGER("pion.PythonReactor")),
01227 m_byte_code(NULL), m_module(NULL),
01228 m_start_func(NULL), m_stop_func(NULL), m_process_func(NULL),
01229 m_reactor_ptr(NULL),
01230 m_session_event_term_ref(Vocabulary::UNDEFINED_TERM_REF),
01231 m_session_id_term_ref(Vocabulary::UNDEFINED_TERM_REF)
01232 {
01233 boost::mutex::scoped_lock init_lock(m_init_mutex);
01234 if (++m_init_num == 1) {
01235 PION_LOG_DEBUG(m_logger, "Initializing Python interpreter");
01236
01237 m_state_ptr = new boost::thread_specific_ptr<PyThreadState>(&PythonReactor::releaseThreadState);
01238
01239 Py_OptimizeFlag = 2;
01240
01241 Py_Initialize();
01242
01243 PyObject *m = Py_InitModule("pion", PionPythonCallbackMethods);
01244 if (PyType_Ready(&PythonReactorType) < 0) {
01245 PION_LOG_ERROR(m_logger, "Error initializing pion.reactor data type");
01246 } else {
01247 Py_INCREF(&PythonReactorType);
01248 PyModule_AddObject(m, "reactor", (PyObject*) &PythonReactorType);
01249 }
01250
01251 if (PyType_Ready(&PythonEventType) < 0) {
01252 PION_LOG_ERROR(m_logger, "Error initializing pion.event data type");
01253 } else {
01254 Py_INCREF(&PythonEventType);
01255 PyModule_AddObject(m, "event", (PyObject*) &PythonEventType);
01256 }
01257
01258 if (PyType_Ready(&PythonSessionType) < 0) {
01259 PION_LOG_ERROR(m_logger, "Error initializing pion.session data type");
01260 } else {
01261 Py_INCREF(&PythonSessionType);
01262 PyModule_AddObject(m, "session", (PyObject*) &PythonSessionType);
01263 }
01264
01265 PyEval_InitThreads();
01266
01267 PyThreadState *thr_state_ptr = PyThreadState_Get();
01268 m_interp_ptr = thr_state_ptr->interp;
01269 PION_ASSERT(m_interp_ptr);
01270
01271 PyEval_ReleaseThread(thr_state_ptr);
01272
01273 m_state_ptr->reset(thr_state_ptr);
01274 }
01275 }
01276
01277 PythonReactor::~PythonReactor()
01278 {
01279 stop();
01280
01281
01282 PyThreadState *thr_state_ptr = PythonReactor::initThreadState();
01283 PyEval_AcquireThread(thr_state_ptr);
01284
01285 try {
01286
01287 Py_XDECREF(m_reactor_ptr);
01288 resetPythonSymbols();
01289
01290 boost::mutex::scoped_lock init_lock(m_init_mutex);
01291 if (--m_init_num == 0) {
01292
01293 PION_LOG_DEBUG(m_logger, "Releasing Python thread states");
01294
01295
01296 m_state_ptr->release();
01297
01298
01299 delete m_state_ptr;
01300 m_state_ptr = NULL;
01301 m_interp_ptr = NULL;
01302
01303 PION_LOG_DEBUG(m_logger, "Shutting down Python interpreter");
01304 Py_Finalize();
01305 } else {
01306 PyEval_ReleaseThread(thr_state_ptr);
01307 }
01308 } catch (std::exception& e) {
01309 PyEval_ReleaseThread(thr_state_ptr);
01310 PION_LOG_ERROR(m_logger, e.what());
01311 }
01312 }
01313
01314 void PythonReactor::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
01315 {
01316
01317 ConfigWriteLock cfg_lock(*this);
01318 Reactor::setConfig(v, config_ptr);
01319
01320
01321 m_source.clear();
01322 ConfigManager::getConfigOption(PYTHON_SOURCE_ELEMENT_NAME, m_source, config_ptr);
01323
01324
01325 m_source_file.clear();
01326 if (ConfigManager::getConfigOption(FILENAME_ELEMENT_NAME, m_source_file, config_ptr)) {
01327 PION_LOG_DEBUG(m_logger, "Loading Python source code from: " << m_source_file);
01328 m_source = getSourceCodeFromFile();
01329 }
01330
01331
01332 m_vocab_ptr.reset(new Vocabulary(v));
01333 updateTerms(*m_vocab_ptr);
01334
01335
01336 PythonLock py_lock;
01337
01338
01339
01340 if (isRunning())
01341 callPythonStop();
01342
01343
01344 Py_XDECREF(m_reactor_ptr);
01345 m_reactor_ptr = (PyObject*) Reactor_create(this);
01346 if (m_reactor_ptr == NULL)
01347 throw InitReactorObjectException(getPythonError());
01348
01349
01350 compilePythonSource();
01351
01352
01353 if (isRunning()) {
01354 initPythonModule();
01355 callPythonStart();
01356 }
01357 }
01358
01359 void PythonReactor::updateVocabulary(const Vocabulary& v)
01360 {
01361 ConfigWriteLock cfg_lock(*this);
01362 Reactor::updateVocabulary(v);
01363 m_vocab_ptr.reset(new Vocabulary(v));
01364 updateTerms(*m_vocab_ptr);
01365 }
01366
01367 void PythonReactor::updateTerms(const Vocabulary& v)
01368 {
01369 m_session_event_term_ref = v.findTerm(VOCAB_CLICKSTREAM_SESSION_EVENT);
01370 if (m_session_event_term_ref == Vocabulary::UNDEFINED_TERM_REF)
01371 throw UnknownTermException(VOCAB_CLICKSTREAM_SESSION_EVENT);
01372
01373 m_session_id_term_ref = v.findTerm(VOCAB_CLICKSTREAM_SESSION_ID);
01374 if (m_session_id_term_ref == Vocabulary::UNDEFINED_TERM_REF)
01375 throw UnknownTermException(VOCAB_CLICKSTREAM_SESSION_ID);
01376 }
01377
01378 void PythonReactor::start(void)
01379 {
01380 ConfigWriteLock cfg_lock(*this);
01381 if (! m_is_running) {
01382 PION_LOG_DEBUG(m_logger, "Starting reactor: " << getId());
01383
01384
01385 PythonLock py_lock;
01386
01387 if (! m_source_file.empty()) {
01388
01389 string src_code = getSourceCodeFromFile();
01390 if (src_code != m_source) {
01391 PION_LOG_DEBUG(m_logger, "Reloading Python source code from: " << m_source_file);
01392 m_source = src_code;
01393 compilePythonSource();
01394 }
01395 }
01396
01397
01398 initPythonModule();
01399 callPythonStart();
01400 m_is_running = true;
01401 }
01402 }
01403
01404 void PythonReactor::stop(void)
01405 {
01406 ConfigWriteLock cfg_lock(*this);
01407 if (m_is_running) {
01408 PION_LOG_DEBUG(m_logger, "Stopping reactor: " << getId());
01409
01410
01411 PythonLock py_lock;
01412
01413
01414 callPythonStop();
01415
01416
01417 flushSessions();
01418
01419
01420 Py_XDECREF(m_start_func);
01421 Py_XDECREF(m_stop_func);
01422 Py_XDECREF(m_process_func);
01423 Py_XDECREF(m_module);
01424
01425 m_start_func = m_stop_func = m_process_func = m_module = NULL;
01426
01427 m_is_running = false;
01428 }
01429 }
01430
01431 void PythonReactor::process(const EventPtr& e)
01432 {
01433 if (m_process_func) {
01434
01435
01436 PythonLock py_lock;
01437
01438
01439 PyObject *py_event = NULL;
01440 try {
01441 py_event = Event_create(m_reactor_ptr, e, false);
01442 } catch (...) {
01443 if (PyErr_Occurred())
01444 PION_LOG_ERROR(m_logger, getPythonError());
01445 throw;
01446 }
01447
01448
01449
01450
01451
01452 PyObject *python_args = PyTuple_New(2);
01453 if (! python_args) {
01454 Py_DECREF(py_event);
01455 throw InternalPythonException(getId());
01456 }
01457 Py_INCREF(m_reactor_ptr);
01458 PyTuple_SetItem(python_args, 0, m_reactor_ptr);
01459 PyTuple_SetItem(python_args, 1, py_event);
01460
01461
01462 PION_LOG_DEBUG(m_logger, "Calling Python process() function");
01463 PyObject *retval = PyObject_CallObject(m_process_func, python_args);
01464
01465
01466 if (retval == NULL && PyErr_Occurred()) {
01467 Py_DECREF(python_args);
01468 PION_LOG_ERROR(m_logger, "in process(): " << getPythonError());
01469 }
01470 Py_DECREF(python_args);
01471 Py_XDECREF(retval);
01472
01473 } else {
01474
01475 PION_LOG_DEBUG(m_logger, "Delivering pion event to connections");
01476 deliverEvent(e);
01477 }
01478
01479
01480 if (e->getType() == m_session_event_term_ref) {
01481
01482 const Event::ParameterValue *param_ptr = e->getPointer(m_session_id_term_ref);
01483 if (param_ptr != NULL) {
01484 const Event::BlobType session_id(boost::get<const Event::BlobType&>(*param_ptr));
01485 if (! session_id.empty()) {
01486 boost::mutex::scoped_lock sessions_lock(m_sessions_mutex);
01487 SessionMap::iterator it = m_sessions.find(session_id);
01488 if (it != m_sessions.end()) {
01489 Py_XDECREF(it->second);
01490 m_sessions.erase(it);
01491 PION_LOG_DEBUG(m_logger, "Removed completed session: " << session_id.get());
01492 }
01493 }
01494 }
01495 }
01496 }
01497
01498 void PythonReactor::query(std::ostream& out, const QueryBranches& branches,
01499 const QueryParams& qp)
01500 {
01501
01502 writeBeginReactorXML(out);
01503 writeStatsOnlyXML(out);
01504
01505
01506 out << '<' << OPEN_SESSIONS_ELEMENT_NAME << '>' << getNumSessions()
01507 << "</" << OPEN_SESSIONS_ELEMENT_NAME << '>' << std::endl;
01508
01509
01510 writeEndReactorXML(out);
01511 }
01512
01513 bool PythonReactor::deliverToConnections(PyObject *event_ptr)
01514 {
01515
01516
01517
01518 PION_LOG_DEBUG(m_logger, "Delivering python event to connections");
01519
01520
01521 if (! PyObject_IsInstance(event_ptr, (PyObject*) &PythonEventType)) {
01522 PyErr_SetString(PyExc_TypeError, "parameter must be a pion.event");
01523 return false;
01524 }
01525
01526
01527 try {
01528 PythonEventObject *event_obj_ptr = (PythonEventObject*) event_ptr;
01529
01530 event_obj_ptr->is_unique = false;
01531
01532 PythonLock py_lock(true);
01533 deliverEvent(event_obj_ptr->event_ptr);
01534 } catch (std::exception& e) {
01535 std::string error_msg(e.what());
01536 if (PyErr_Occurred()) {
01537 error_msg += " (";
01538 error_msg += getPythonError();
01539 error_msg += ')';
01540 }
01541 PION_LOG_ERROR(m_logger, error_msg);
01542 } catch (...) {
01543 if (PyErr_Occurred())
01544 PION_LOG_ERROR(m_logger, getPythonError());
01545 else
01546 PION_LOG_ERROR(m_logger, "caught unrecognized exception");
01547 }
01548
01549 return true;
01550 }
01551
01552 PyObject *PythonReactor::getSession(PyObject *event_ptr)
01553 {
01554
01555 if (! PyObject_IsInstance(event_ptr, (PyObject*) &PythonEventType)) {
01556 PyErr_SetString(PyExc_TypeError, "parameter must be a pion.event");
01557 return NULL;
01558 }
01559
01560
01561 PythonEventObject *event_obj_ptr = (PythonEventObject*) event_ptr;
01562 PION_ASSERT(event_obj_ptr->event_ptr.get());
01563 const Event& e = *(event_obj_ptr->event_ptr);
01564 const Event::ParameterValue *param_ptr = e.getPointer(m_session_id_term_ref);
01565 if (param_ptr == NULL) {
01566 PyErr_SetString(PyExc_TypeError, "event is missing session identifier");
01567 return NULL;
01568 }
01569 const Event::BlobType session_id(boost::get<const Event::BlobType&>(*param_ptr));
01570 if (session_id.empty()) {
01571 PyErr_SetString(PyExc_TypeError, "event has empty session identifier");
01572 return NULL;
01573 }
01574
01575
01576 PyObject *retval = NULL;
01577 boost::mutex::scoped_lock sessions_lock(m_sessions_mutex);
01578 SessionMap::iterator it = m_sessions.find(session_id);
01579 if (it == m_sessions.end()) {
01580 retval = (PyObject*) Session_create(session_id);
01581 m_sessions.insert(std::make_pair(session_id, retval));
01582 PION_LOG_DEBUG(m_logger, "Created new session object for " << session_id.get());
01583 } else {
01584 retval = it->second;
01585 PION_LOG_DEBUG(m_logger, "Using existing session object for " << session_id.get());
01586 }
01587
01588 Py_XINCREF(retval);
01589 return retval;
01590 }
01591
01592 std::size_t PythonReactor::getNumSessions(void) const
01593 {
01594 boost::mutex::scoped_lock sessions_lock(m_sessions_mutex);
01595 return m_sessions.size();
01596 }
01597
01598 void PythonReactor::flushSessions(void)
01599 {
01600 boost::mutex::scoped_lock sessions_lock(m_sessions_mutex);
01601 size_t num_sessions = m_sessions.size();
01602
01603 if (num_sessions > 0) {
01604 for (SessionMap::const_iterator it = m_sessions.begin();
01605 it != m_sessions.end(); ++it)
01606 {
01607 Py_XDECREF(it->second);
01608 }
01609 m_sessions.clear();
01610 PION_LOG_DEBUG(m_logger, "Flushing " << num_sessions << " session objects");
01611 }
01612 }
01613
01614 PyThreadState *PythonReactor::initThreadState(void)
01615 {
01616
01617 PyThreadState *thr_state_ptr = m_state_ptr->get();
01618 if (thr_state_ptr == NULL) {
01619
01620 thr_state_ptr = PyThreadState_New(m_interp_ptr);
01621 m_state_ptr->reset(thr_state_ptr);
01622 }
01623 return thr_state_ptr;
01624 }
01625
01626 void PythonReactor::releaseThreadState(PyThreadState *ptr)
01627 {
01628 PyThreadState_Clear(ptr);
01629 PyThreadState_Delete(ptr);
01630 }
01631
01632 PyObject *PythonReactor::findPythonFunction(PyObject *module_ptr, const std::string& func_name)
01633 {
01634 PyObject *func_ptr = PyObject_GetAttrString(module_ptr, const_cast<char*>(func_name.c_str()));
01635 if (func_ptr) {
01636 if (! PyCallable_Check(func_ptr)) {
01637 Py_DECREF(func_ptr);
01638 throw NotCallableException(func_name);
01639 }
01640 PION_LOG_DEBUG(m_logger, "Found " << func_name << "() function");
01641 } else {
01642 PyErr_Clear();
01643 PION_LOG_DEBUG(m_logger, "Unable to find " << func_name << "() function");
01644 }
01645 return func_ptr;
01646 }
01647
01648 void PythonReactor::resetPythonSymbols(void)
01649 {
01650
01651 PION_LOG_DEBUG(m_logger, "Resetting Python symbols");
01652 Py_XDECREF(m_start_func);
01653 Py_XDECREF(m_stop_func);
01654 Py_XDECREF(m_process_func);
01655 Py_XDECREF(m_module);
01656 Py_XDECREF(m_byte_code);
01657 m_start_func = m_stop_func = m_process_func = m_module = m_byte_code = NULL;
01658 }
01659
01660 void PythonReactor::compilePythonSource(void)
01661 {
01662
01663
01664
01665 resetPythonSymbols();
01666
01667 if (! m_source.empty()) {
01668
01669 const char * const py_path_ptr = Py_GetPath();
01670 std::string py_path_str(getReactionEngine().resolveRelativePath("pymodules"));
01671 if (py_path_ptr == NULL || strstr(py_path_ptr, py_path_str.c_str()) == NULL) {
01672 if (py_path_ptr && *py_path_ptr != '\0') {
01673 #ifdef _MSC_VER
01674 py_path_str += ';';
01675 #else
01676 py_path_str += ':';
01677 #endif
01678 py_path_str += py_path_ptr;
01679 }
01680 PySys_SetPath(const_cast<char*>(py_path_str.c_str()));
01681 }
01682
01683 PION_LOG_DEBUG(m_logger, "Compiling Python source code");
01684 m_byte_code = Py_CompileString(m_source.c_str(), m_source_file.c_str(), Py_file_input);
01685 if (m_byte_code == NULL) {
01686 throw FailedToCompileException(getPythonError());
01687 }
01688 }
01689 }
01690
01691 void PythonReactor::initPythonModule(void)
01692 {
01693
01694
01695 if (m_byte_code) {
01696 PION_LOG_DEBUG(m_logger, "Initializing Python module");
01697
01698 std::string modname = "pion." + getId();
01699
01700 m_module = PyImport_ExecCodeModule(const_cast<char*>(modname.c_str()), m_byte_code);
01701 if (m_module == NULL) {
01702 Py_DECREF(m_byte_code);
01703 m_byte_code = NULL;
01704 throw FailedToCompileException(getPythonError());
01705 }
01706
01707
01708 m_start_func = findPythonFunction(m_module, START_FUNCTION_NAME);
01709
01710
01711 m_stop_func = findPythonFunction(m_module, STOP_FUNCTION_NAME);
01712
01713
01714 m_process_func = findPythonFunction(m_module, PROCESS_FUNCTION_NAME);
01715 }
01716 }
01717
01718 void PythonReactor::callPythonStart(void)
01719 {
01720
01721 if (m_start_func) {
01722
01723 PION_LOG_DEBUG(m_logger, "Calling Python start() function");
01724 PyObject *python_args = PyTuple_New(1);
01725 if (! python_args)
01726 throw InternalPythonException(getId());
01727 Py_INCREF(m_reactor_ptr);
01728 PyTuple_SetItem(python_args, 0, m_reactor_ptr);
01729 PyObject *retval = PyObject_CallObject(m_start_func, python_args);
01730 Py_DECREF(python_args);
01731
01732
01733 if (retval == NULL && PyErr_Occurred()) {
01734 PION_LOG_ERROR(m_logger, "in start(): " << getPythonError());
01735 }
01736
01737 Py_XDECREF(retval);
01738 }
01739 }
01740
01741 void PythonReactor::callPythonStop(void)
01742 {
01743
01744 if (m_stop_func) {
01745
01746 PION_LOG_DEBUG(m_logger, "Calling Python stop() function");
01747 PyObject *python_args = PyTuple_New(1);
01748 if (! python_args)
01749 throw InternalPythonException(getId());
01750 Py_INCREF(m_reactor_ptr);
01751 PyTuple_SetItem(python_args, 0, m_reactor_ptr);
01752 PyObject *retval = PyObject_CallObject(m_stop_func, python_args);
01753 Py_DECREF(python_args);
01754
01755
01756 if (retval == NULL && PyErr_Occurred()) {
01757 PION_LOG_ERROR(m_logger, "in stop(): " << getPythonError());
01758 }
01759
01760 Py_XDECREF(retval);
01761 }
01762 }
01763
01764 std::string PythonReactor::getSourceCodeFromFile(void)
01765 {
01766
01767 string src_file = getReactionEngine().resolveRelativePath(m_source_file);
01768 if (! boost::filesystem::exists(src_file) )
01769 throw SourceFileNotFoundException(m_source_file);
01770
01771
01772 ifstream src_stream(src_file.c_str(), ios::in);
01773 if (! src_stream.is_open())
01774 throw ReadSourceFileException(m_source_file);
01775
01776
01777 ostringstream str_stream;
01778 try {
01779 str_stream << src_stream.rdbuf();
01780 src_stream.close();
01781 } catch (...) {
01782 throw ReadSourceFileException(m_source_file);
01783 }
01784
01785 return str_stream.str();
01786 }
01787
01788 std::string PythonReactor::getPythonError(void)
01789 {
01790
01791 std::string error_str;
01792 PyObject *ptype = NULL;
01793 PyObject *pvalue = NULL;
01794 PyObject *ptraceback = NULL;
01795 PyObject *psyntax = NULL;
01796
01797 PyErr_Fetch(&ptype, &pvalue, &ptraceback);
01798
01799 if (ptype) {
01800 PyTypeObject* type_obj = (PyTypeObject*) ptype;
01801 error_str += type_obj->tp_name;
01802 error_str += ": ";
01803 } else {
01804 error_str += "Exception: ";
01805 }
01806
01807 if (pvalue) {
01808 if (PyErr_GivenExceptionMatches(ptype, PyExc_SyntaxError)
01809 && PyTuple_Check(pvalue) && PyTuple_Size(pvalue) >= 2)
01810 {
01811 PyObject *str_obj = PyObject_Str(PyTuple_GetItem(pvalue, 0));
01812 if (str_obj) {
01813 error_str += PyString_AsString(str_obj);
01814 Py_DECREF(str_obj);
01815 }
01816 psyntax = PyTuple_GetItem(pvalue, 1);
01817 } else {
01818 PyObject *str_obj = PyObject_Str(pvalue);
01819 if (str_obj) {
01820 error_str += PyString_AsString(str_obj);
01821 Py_DECREF(str_obj);
01822 }
01823 }
01824 }
01825
01826 if (ptraceback) {
01827 PyTracebackObject* traceback = (PyTracebackObject*)ptraceback;
01828 while (traceback->tb_next != NULL)
01829 traceback = traceback->tb_next;
01830 error_str += " (";
01831 const char *cptr = PyString_AsString(traceback->tb_frame->f_code->co_filename);
01832 if (cptr && *cptr != '\0') {
01833 error_str += cptr;
01834 error_str += " ";
01835 }
01836 error_str += "line ";
01837 error_str += boost::lexical_cast<std::string>(traceback->tb_lineno);
01838 error_str += ")";
01839 } else if (psyntax) {
01840 PyObject *str_obj = PyObject_Str(psyntax);
01841 if (str_obj) {
01842 error_str += " ";
01843 error_str += PyString_AsString(str_obj);
01844 Py_DECREF(str_obj);
01845 }
01846 }
01847
01848 Py_XDECREF(ptype);
01849 Py_XDECREF(pvalue);
01850 Py_XDECREF(ptraceback);
01851
01852 return error_str;
01853 }
01854
01855
01856 }
01857 }
01858
01859
01861 extern "C" PION_PLUGIN_API pion::platform::Reactor *pion_create_PythonReactor(void) {
01862 return new pion::plugins::PythonReactor();
01863 }
01864
01866 extern "C" PION_PLUGIN_API void pion_destroy_PythonReactor(pion::plugins::PythonReactor *reactor_ptr) {
01867 delete reactor_ptr;
01868 }