00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <yajl/yajl_parse.h>
00021 #include <yajl/yajl_gen.h>
00022 #include <pion/platform/ConfigManager.hpp>
00023 #include <pion/platform/Event.hpp>
00024 #include <pion/platform/Vocabulary.hpp>
00025 #include "JSONCodec.hpp"
00026
00027 using namespace pion::platform;
00028
00029
00030 namespace pion {
00031 namespace plugins {
00032
00033
00034
00035 const std::string JSONCodec::CONTENT_TYPE = "text/json";
00036 const std::string JSONCodec::FIELD_ELEMENT_NAME = "Field";
00037 const std::string JSONCodec::TERM_ATTRIBUTE_NAME = "term";
00038 const unsigned int JSONCodec::READ_BUFFER_SIZE = 4096;
00039 std::string JSONCodec::INDENT_STRING = "\t";
00040
00041
00042
00043
00044 CodecPtr JSONCodec::clone(void) const
00045 {
00046 JSONCodec *new_codec(new JSONCodec());
00047 new_codec->copyCodec(*this);
00048 for (CurrentFormat::const_iterator i = m_format.begin(); i != m_format.end(); ++i) {
00049 new_codec->mapFieldToTerm((*i)->field_name, (*i)->term);
00050 }
00051 new_codec->m_JSON_field_ptr_map = m_JSON_field_ptr_map;
00052 new_codec->m_no_events_written = true;
00053 return CodecPtr(new_codec);
00054 }
00055
00056 void JSONCodec::write(std::ostream& out, const Event& e)
00057 {
00058 if (m_no_events_written) {
00059 yajl_gen_config conf;
00060 conf.beautify = 1;
00061 conf.indentString = INDENT_STRING.c_str();
00062 m_yajl_generator = yajl_gen_alloc(&conf, NULL);
00063 yajl_gen_array_open(m_yajl_generator);
00064 m_no_events_written = false;
00065 }
00066
00067
00068 yajl_gen_map_open(m_yajl_generator);
00069
00070
00071 CurrentFormat::const_iterator i;
00072 std::string value_str;
00073 const pion::platform::Event::BlobType* ss;
00074 for (i = m_format.begin(); i != m_format.end(); ++i) {
00075
00076 pion::platform::Vocabulary::TermRef term_ref = (*i)->term.term_ref;
00077 Event::ValuesRange range = e.equal_range(term_ref);
00078
00079
00080 for (Event::ConstIterator i2 = range.first; i2 != range.second; ++i2) {
00081 yajl_gen_string(m_yajl_generator, (unsigned char*)(*i)->field_name.c_str(), (*i)->field_name.size());
00082 switch ((*i)->term.term_type) {
00083 case pion::platform::Vocabulary::TYPE_NULL:
00084 case pion::platform::Vocabulary::TYPE_OBJECT:
00085
00086 break;
00087 case pion::platform::Vocabulary::TYPE_INT8:
00088 case pion::platform::Vocabulary::TYPE_INT16:
00089 case pion::platform::Vocabulary::TYPE_INT32:
00090 yajl_gen_integer(m_yajl_generator, boost::get<boost::int32_t>(i2->value));
00091 break;
00092 case pion::platform::Vocabulary::TYPE_INT64:
00093 value_str = boost::lexical_cast<std::string>(boost::get<boost::int64_t>(i2->value));
00094 yajl_gen_string(m_yajl_generator, (unsigned char*)value_str.c_str(), value_str.size());
00095 break;
00096 case pion::platform::Vocabulary::TYPE_UINT8:
00097 case pion::platform::Vocabulary::TYPE_UINT16:
00098 case pion::platform::Vocabulary::TYPE_UINT32:
00099 yajl_gen_integer(m_yajl_generator, boost::get<boost::uint32_t>(i2->value));
00100 break;
00101 case pion::platform::Vocabulary::TYPE_UINT64:
00102 value_str = boost::lexical_cast<std::string>(boost::get<boost::uint64_t>(i2->value));
00103 yajl_gen_string(m_yajl_generator, (unsigned char*)value_str.c_str(), value_str.size());
00104 break;
00105 case pion::platform::Vocabulary::TYPE_FLOAT:
00106 yajl_gen_double(m_yajl_generator, boost::get<float>(i2->value));
00107 break;
00108 case pion::platform::Vocabulary::TYPE_DOUBLE:
00109
00110 value_str = boost::lexical_cast<std::string>(boost::get<double>(i2->value));
00111 yajl_gen_string(m_yajl_generator, (unsigned char*)value_str.c_str(), value_str.size());
00112 break;
00113 case pion::platform::Vocabulary::TYPE_LONG_DOUBLE:
00114
00115 value_str = boost::lexical_cast<std::string>(boost::get<long double>(i2->value));
00116 yajl_gen_string(m_yajl_generator, (unsigned char*)value_str.c_str(), value_str.size());
00117 break;
00118 case pion::platform::Vocabulary::TYPE_SHORT_STRING:
00119 case pion::platform::Vocabulary::TYPE_STRING:
00120 case pion::platform::Vocabulary::TYPE_LONG_STRING:
00121 case pion::platform::Vocabulary::TYPE_BLOB:
00122 case pion::platform::Vocabulary::TYPE_ZBLOB:
00123 ss = &boost::get<const pion::platform::Event::BlobType&>(i2->value);
00124 yajl_gen_string(m_yajl_generator, (unsigned char*)ss->get(), ss->size());
00125 break;
00126 case pion::platform::Vocabulary::TYPE_CHAR:
00127 ss = &boost::get<const pion::platform::Event::BlobType&>(i2->value);
00128 yajl_gen_string(m_yajl_generator, (unsigned char*)ss->get(),
00129 ss->size() < (*i)->term.term_size? ss->size() : (*i)->term.term_size);
00130 break;
00131 case pion::platform::Vocabulary::TYPE_DATE_TIME:
00132 case pion::platform::Vocabulary::TYPE_DATE:
00133 case pion::platform::Vocabulary::TYPE_TIME:
00134 (*i)->time_facet.toString(value_str, boost::get<const PionDateTime&>(i2->value));
00135 yajl_gen_string(m_yajl_generator, (unsigned char*)value_str.c_str(), value_str.size());
00136 break;
00137 }
00138 }
00139 }
00140
00141
00142 yajl_gen_map_close(m_yajl_generator);
00143
00144 const unsigned char* buf;
00145 unsigned int len;
00146 yajl_gen_get_buf(m_yajl_generator, &buf, &len);
00147 out.write((char*)buf, len);
00148 yajl_gen_clear(m_yajl_generator);
00149
00150
00151 if (m_flush_after_write)
00152 out.flush();
00153 }
00154
00155 void JSONCodec::finish(std::ostream& out)
00156 {
00157 if (m_no_events_written)
00158 return;
00159
00160
00161 yajl_gen_array_close(m_yajl_generator);
00162 const unsigned char* buf;
00163 unsigned int len;
00164 yajl_gen_get_buf(m_yajl_generator, &buf, &len);
00165 out.write((char*)buf, len);
00166 yajl_gen_clear(m_yajl_generator);
00167
00168
00169 yajl_gen_free(m_yajl_generator);
00170 m_yajl_generator = NULL;
00171 }
00172
00173 int number_handler(void* ctx, const char* number_char_ptr, unsigned int number_len)
00174 {
00175 Context* c = (Context*)ctx;
00176 JSONCodec::JSONObject& json_object = *c->json_object_ptr;
00177 json_object.insert(std::make_pair(c->term_ref, std::string(number_char_ptr, number_len)));
00178
00179 return 1;
00180 }
00181
00182 int string_handler(void* ctx, const unsigned char* string_val, unsigned int string_len)
00183 {
00184 Context* c = (Context*)ctx;
00185 JSONCodec::JSONObject& json_object = *c->json_object_ptr;
00186 json_object.insert(std::make_pair(c->term_ref, std::string((char*)string_val, string_len)));
00187
00188 return 1;
00189 }
00190
00191 int map_key_handler(void* ctx, const unsigned char* stringVal, unsigned int stringLen)
00192 {
00193 Context* c = (Context*)ctx;
00194 JSONCodec::FieldMap::const_iterator i = c->field_map.find(std::string((char*)stringVal, stringLen));
00195 c->term_ref = i->second->term.term_ref;
00196
00197 return 1;
00198 }
00199
00200 int start_map_handler(void* ctx)
00201 {
00202 Context* c = (Context*)ctx;
00203 c->json_object_ptr = JSONCodec::JSONObjectPtr(new JSONCodec::JSONObject);
00204
00205 return 1;
00206 }
00207
00208 int end_map_handler(void* ctx)
00209 {
00210 Context* c = (Context*)ctx;
00211 c->json_object_queue.push(c->json_object_ptr);
00212
00213 return 1;
00214 }
00215
00216 int start_array_handler(void* ctx)
00217 {
00218 Context* c = (Context*)ctx;
00219 c->m_array_started = true;
00220
00221 return 1;
00222 }
00223
00224 int end_array_handler(void* ctx)
00225 {
00226 Context* c = (Context*)ctx;
00227 c->m_array_ended = true;
00228
00229 return 1;
00230 }
00231
00232 static yajl_callbacks callbacks = {
00233 NULL,
00234 NULL,
00235 NULL,
00236 NULL,
00237 number_handler,
00238 string_handler,
00239 start_map_handler,
00240 map_key_handler,
00241 end_map_handler,
00242 start_array_handler,
00243 end_array_handler
00244 };
00245
00246
00247 bool JSONCodec::read(std::istream& in, Event& e)
00248 {
00249 if (e.getType() != getEventType())
00250 throw WrongEventTypeException();
00251
00252 e.clear();
00253
00254 if (m_first_read_attempt) {
00255 yajl_parser_config cfg = { 1, 1 };
00256 m_context = boost::shared_ptr<Context>(new Context(m_field_map, m_json_object_queue));
00257 m_yajl_handle = yajl_alloc(&callbacks, &cfg, NULL, (void*)m_context.get());
00258 m_first_read_attempt = false;
00259 }
00260
00261 if (!m_context->m_array_started) {
00262
00263
00264 streambuf_type* buf_ptr = in.rdbuf();
00265 int_type c = buf_ptr->sgetc();
00266 while (1) {
00267 if (traits_type::eq_int_type(c, traits_type::eof()))
00268 return false;
00269 if (c == ' ' || c == '\t' || c == '\x0A' || c == '\x0D') {
00270 c = buf_ptr->snextc();
00271 } else {
00272 break;
00273 }
00274 }
00275
00276
00277 if (c != '[')
00278 throw PionException("input stream is not a JSON array");
00279
00280
00281
00282 }
00283
00284 while (m_json_object_queue.empty()) {
00285 if (m_context->m_array_ended) {
00286
00287
00288
00289 in.setstate(std::ios::eofbit);
00290
00291 return false;
00292 }
00293
00294
00295 char data[READ_BUFFER_SIZE];
00296 streambuf_type* buf_ptr = in.rdbuf();
00297 char* p = data;
00298 std::streamsize num_bytes_read;
00299 for (num_bytes_read = 0; num_bytes_read < static_cast<std::streamsize>(READ_BUFFER_SIZE); ++num_bytes_read) {
00300 *p = buf_ptr->sbumpc();
00301 if (traits_type::eq_int_type(*p, traits_type::eof()))
00302 break;
00303 ++p;
00304 }
00305
00306 if (num_bytes_read == 0)
00307 return false;
00308
00309 yajl_status stat = yajl_parse(m_yajl_handle, (unsigned char*)data, num_bytes_read);
00310
00311 if (stat == yajl_status_ok) {
00312
00313
00314 PION_ASSERT(m_context->m_array_ended);
00315 } else if (stat == yajl_status_insufficient_data) {
00316
00317
00318 } else {
00319
00320 return false;
00321 }
00322 }
00323
00324 JSONCodec::JSONObjectPtr json_object_ptr = m_json_object_queue.front();
00325 m_json_object_queue.pop();
00326 const JSONCodec::JSONObject& json_object = *json_object_ptr;
00327
00328 for (JSONCodec::JSONObject::const_iterator i = json_object.begin(); i != json_object.end(); ++i) {
00329 const pion::platform::Vocabulary::TermRef term_ref = i->first;
00330 const std::string& value_str = i->second;
00331 const pion::platform::Vocabulary::Term& term = m_JSON_field_ptr_map[term_ref]->term;
00332 switch (term.term_type) {
00333 case pion::platform::Vocabulary::TYPE_NULL:
00334 case pion::platform::Vocabulary::TYPE_OBJECT:
00335
00336 break;
00337 case pion::platform::Vocabulary::TYPE_INT8:
00338 case pion::platform::Vocabulary::TYPE_INT16:
00339 case pion::platform::Vocabulary::TYPE_INT32:
00340 e.setInt(term_ref, boost::lexical_cast<boost::int32_t>(value_str));
00341 break;
00342 case pion::platform::Vocabulary::TYPE_INT64:
00343 e.setBigInt(term_ref, boost::lexical_cast<boost::int64_t>(value_str));
00344 break;
00345 case pion::platform::Vocabulary::TYPE_UINT8:
00346 case pion::platform::Vocabulary::TYPE_UINT16:
00347 case pion::platform::Vocabulary::TYPE_UINT32:
00348 e.setUInt(term_ref, boost::lexical_cast<boost::uint32_t>(value_str));
00349 break;
00350 case pion::platform::Vocabulary::TYPE_UINT64:
00351 e.setUBigInt(term_ref, boost::lexical_cast<boost::uint64_t>(value_str));
00352 break;
00353 case pion::platform::Vocabulary::TYPE_FLOAT:
00354 e.setFloat(term_ref, boost::lexical_cast<float>(value_str));
00355 break;
00356 case pion::platform::Vocabulary::TYPE_DOUBLE:
00357 e.setDouble(term_ref, boost::lexical_cast<double>(value_str));
00358 break;
00359 case pion::platform::Vocabulary::TYPE_LONG_DOUBLE:
00360 e.setLongDouble(term_ref, boost::lexical_cast<long double>(value_str));
00361 break;
00362 case pion::platform::Vocabulary::TYPE_SHORT_STRING:
00363 case pion::platform::Vocabulary::TYPE_STRING:
00364 case pion::platform::Vocabulary::TYPE_LONG_STRING:
00365 case pion::platform::Vocabulary::TYPE_BLOB:
00366 case pion::platform::Vocabulary::TYPE_ZBLOB:
00367 e.setString(term_ref, value_str);
00368 break;
00369 case pion::platform::Vocabulary::TYPE_CHAR:
00370 if (value_str.size() > term.term_size) {
00371 e.setString(term_ref, std::string(value_str, 0, term.term_size));
00372 } else {
00373 e.setString(term_ref, value_str);
00374 }
00375 break;
00376 case pion::platform::Vocabulary::TYPE_DATE_TIME:
00377 case pion::platform::Vocabulary::TYPE_DATE:
00378 case pion::platform::Vocabulary::TYPE_TIME:
00379 {
00380 PionDateTime dt;
00381 m_JSON_field_ptr_map[term_ref]->time_facet.fromString(value_str, dt);
00382 e.setDateTime(term_ref, dt);
00383 break;
00384 }
00385 }
00386 }
00387 return true;
00388 }
00389
00390 void JSONCodec::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00391 {
00392
00393 reset();
00394 Codec::setConfig(v, config_ptr);
00395
00396
00397
00398
00399 xmlNodePtr codec_field_node = config_ptr;
00400 while ( (codec_field_node = ConfigManager::findConfigNodeByName(FIELD_ELEMENT_NAME, codec_field_node)) != NULL) {
00401
00402
00403
00404 xmlChar *xml_char_ptr = xmlNodeGetContent(codec_field_node);
00405 if (xml_char_ptr == NULL || xml_char_ptr[0] == '\0') {
00406 if (xml_char_ptr != NULL)
00407 xmlFree(xml_char_ptr);
00408 throw EmptyFieldException(getId());
00409 }
00410 const std::string field_name(reinterpret_cast<char*>(xml_char_ptr));
00411 xmlFree(xml_char_ptr);
00412
00413
00414 xml_char_ptr = xmlGetProp(codec_field_node, reinterpret_cast<const xmlChar*>(TERM_ATTRIBUTE_NAME.c_str()));
00415 if (xml_char_ptr == NULL || xml_char_ptr[0] == '\0') {
00416 if (xml_char_ptr != NULL)
00417 xmlFree(xml_char_ptr);
00418 throw EmptyTermException(getId());
00419 }
00420 const std::string term_id(reinterpret_cast<char*>(xml_char_ptr));
00421 xmlFree(xml_char_ptr);
00422
00423
00424 const Vocabulary::TermRef term_ref = v.findTerm(term_id);
00425 if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00426 throw UnknownTermException(term_id);
00427
00428
00429 mapFieldToTerm(field_name, v[term_ref]);
00430
00431
00432 codec_field_node = codec_field_node->next;
00433 }
00434
00435 m_JSON_field_ptr_map.clear();
00436 for (FieldMap::const_iterator i = m_field_map.begin(); i != m_field_map.end(); ++i) {
00437 if (m_JSON_field_ptr_map.find(i->second->term.term_ref) != m_JSON_field_ptr_map.end())
00438 throw PionException("Duplicate Field Term");
00439 m_JSON_field_ptr_map[i->second->term.term_ref] = i->second;
00440 }
00441 }
00442
00443 void JSONCodec::updateVocabulary(const Vocabulary& v)
00444 {
00445
00446 Codec::updateVocabulary(v);
00447
00449 for (CurrentFormat::iterator i = m_format.begin(); i != m_format.end(); ++i) {
00450
00451 v.refreshTerm((*i)->term);
00452
00453
00454 switch ((*i)->term.term_type) {
00455 case pion::platform::Vocabulary::TYPE_DATE_TIME:
00456 case pion::platform::Vocabulary::TYPE_DATE:
00457 case pion::platform::Vocabulary::TYPE_TIME:
00458 (*i)->time_facet.setFormat((*i)->term.term_format);
00459 break;
00460 default:
00461 break;
00462 }
00463 }
00464 }
00465
00466
00467 }
00468 }
00469
00470
00472 extern "C" PION_PLUGIN_API pion::platform::Codec *pion_create_JSONCodec(void) {
00473 return new pion::plugins::JSONCodec();
00474 }
00475
00477 extern "C" PION_PLUGIN_API void pion_destroy_JSONCodec(pion::plugins::JSONCodec *codec_ptr) {
00478 delete codec_ptr;
00479 }