platform/codecs/JSONCodec.cpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #include <yajl/yajl_parse.h>
00021 #include <yajl/yajl_gen.h>
00022 #include <pion/platform/ConfigManager.hpp>
00023 #include <pion/platform/Event.hpp>
00024 #include <pion/platform/Vocabulary.hpp>
00025 #include "JSONCodec.hpp"
00026 
00027 using namespace pion::platform;
00028 
00029 
00030 namespace pion {        // begin namespace pion
00031 namespace plugins {     // begin namespace plugins
00032 
00033 
00034 // static members of JSONCodec
00035 const std::string           JSONCodec::CONTENT_TYPE = "text/json";
00036 const std::string           JSONCodec::FIELD_ELEMENT_NAME = "Field"; // TODO: Shouldn't this really be Codec::FIELD_ELEMENT_NAME?
00037 const std::string           JSONCodec::TERM_ATTRIBUTE_NAME = "term"; // TODO: Shouldn't this really be Codec::TERM_ATTRIBUTE_NAME?
00038 const unsigned int          JSONCodec::READ_BUFFER_SIZE = 4096;      // TODO: What should this be?
00039 std::string                 JSONCodec::INDENT_STRING = "\t";
00040 
00041 
00042 // JSONCodec member functions
00043 
00044 CodecPtr JSONCodec::clone(void) const
00045 {
00046     JSONCodec *new_codec(new JSONCodec());
00047     new_codec->copyCodec(*this);
00048     for (CurrentFormat::const_iterator i = m_format.begin(); i != m_format.end(); ++i) {
00049         new_codec->mapFieldToTerm((*i)->field_name, (*i)->term);
00050     }
00051     new_codec->m_JSON_field_ptr_map = m_JSON_field_ptr_map;
00052     new_codec->m_no_events_written = true;
00053     return CodecPtr(new_codec);
00054 }
00055 
00056 void JSONCodec::write(std::ostream& out, const Event& e)
00057 {
00058     if (m_no_events_written) {
00059         yajl_gen_config conf;
00060         conf.beautify = 1;
00061         conf.indentString = INDENT_STRING.c_str();
00062         m_yajl_generator = yajl_gen_alloc(&conf, NULL);
00063         yajl_gen_array_open(m_yajl_generator);
00064         m_no_events_written = false;
00065     }
00066 
00067     // output '{' to mark the beginning of the event
00068     yajl_gen_map_open(m_yajl_generator);
00069 
00070     // iterate through each field in the current format
00071     CurrentFormat::const_iterator i;
00072     std::string value_str;
00073     const pion::platform::Event::BlobType* ss;
00074     for (i = m_format.begin(); i != m_format.end(); ++i) {
00075         // get the range of values for the field Term
00076         pion::platform::Vocabulary::TermRef term_ref = (*i)->term.term_ref;
00077         Event::ValuesRange range = e.equal_range(term_ref);
00078 
00079         // generate a JSON name/value pair for each value
00080         for (Event::ConstIterator i2 = range.first; i2 != range.second; ++i2) {
00081             yajl_gen_string(m_yajl_generator, (unsigned char*)(*i)->field_name.c_str(), (*i)->field_name.size());
00082             switch ((*i)->term.term_type) {
00083                 case pion::platform::Vocabulary::TYPE_NULL:
00084                 case pion::platform::Vocabulary::TYPE_OBJECT:
00085                     // TODO: should we output an empty string instead of nothing?
00086                     break;
00087                 case pion::platform::Vocabulary::TYPE_INT8:
00088                 case pion::platform::Vocabulary::TYPE_INT16:
00089                 case pion::platform::Vocabulary::TYPE_INT32:
00090                     yajl_gen_integer(m_yajl_generator, boost::get<boost::int32_t>(i2->value));
00091                     break;
00092                 case pion::platform::Vocabulary::TYPE_INT64:
00093                     value_str = boost::lexical_cast<std::string>(boost::get<boost::int64_t>(i2->value));
00094                     yajl_gen_string(m_yajl_generator, (unsigned char*)value_str.c_str(), value_str.size());
00095                     break;
00096                 case pion::platform::Vocabulary::TYPE_UINT8:
00097                 case pion::platform::Vocabulary::TYPE_UINT16:
00098                 case pion::platform::Vocabulary::TYPE_UINT32:
00099                     yajl_gen_integer(m_yajl_generator, boost::get<boost::uint32_t>(i2->value));
00100                     break;
00101                 case pion::platform::Vocabulary::TYPE_UINT64:
00102                     value_str = boost::lexical_cast<std::string>(boost::get<boost::uint64_t>(i2->value));
00103                     yajl_gen_string(m_yajl_generator, (unsigned char*)value_str.c_str(), value_str.size());
00104                     break;
00105                 case pion::platform::Vocabulary::TYPE_FLOAT:
00106                     yajl_gen_double(m_yajl_generator, boost::get<float>(i2->value));
00107                     break;
00108                 case pion::platform::Vocabulary::TYPE_DOUBLE:
00109                     // using boost::lexical_cast<std::string> ensures precision appropriate to type double
00110                     value_str = boost::lexical_cast<std::string>(boost::get<double>(i2->value));
00111                     yajl_gen_string(m_yajl_generator, (unsigned char*)value_str.c_str(), value_str.size());
00112                     break;
00113                 case pion::platform::Vocabulary::TYPE_LONG_DOUBLE:
00114                     // using boost::lexical_cast<std::string> ensures precision appropriate to type long double
00115                     value_str = boost::lexical_cast<std::string>(boost::get<long double>(i2->value));
00116                     yajl_gen_string(m_yajl_generator, (unsigned char*)value_str.c_str(), value_str.size());
00117                     break;
00118                 case pion::platform::Vocabulary::TYPE_SHORT_STRING:
00119                 case pion::platform::Vocabulary::TYPE_STRING:
00120                 case pion::platform::Vocabulary::TYPE_LONG_STRING:
00121                 case pion::platform::Vocabulary::TYPE_BLOB:
00122                 case pion::platform::Vocabulary::TYPE_ZBLOB:
00123                     ss = &boost::get<const pion::platform::Event::BlobType&>(i2->value);
00124                     yajl_gen_string(m_yajl_generator, (unsigned char*)ss->get(), ss->size());
00125                     break;
00126                 case pion::platform::Vocabulary::TYPE_CHAR:
00127                     ss = &boost::get<const pion::platform::Event::BlobType&>(i2->value);
00128                     yajl_gen_string(m_yajl_generator, (unsigned char*)ss->get(),
00129                                     ss->size() < (*i)->term.term_size? ss->size() : (*i)->term.term_size);
00130                     break;
00131                 case pion::platform::Vocabulary::TYPE_DATE_TIME:
00132                 case pion::platform::Vocabulary::TYPE_DATE:
00133                 case pion::platform::Vocabulary::TYPE_TIME:
00134                     (*i)->time_facet.toString(value_str, boost::get<const PionDateTime&>(i2->value));
00135                     yajl_gen_string(m_yajl_generator, (unsigned char*)value_str.c_str(), value_str.size());
00136                     break;
00137             }
00138         }
00139     }
00140 
00141     // output '}' to mark the end of the event
00142     yajl_gen_map_close(m_yajl_generator);
00143 
00144     const unsigned char* buf;
00145     unsigned int len;
00146     yajl_gen_get_buf(m_yajl_generator, &buf, &len);
00147     out.write((char*)buf, len);
00148     yajl_gen_clear(m_yajl_generator);
00149 
00150     // flush the output stream
00151     if (m_flush_after_write)
00152         out.flush();
00153 }
00154 
00155 void JSONCodec::finish(std::ostream& out)
00156 {
00157     if (m_no_events_written)
00158         return;
00159 
00160     // write the JSON array end token ']'
00161     yajl_gen_array_close(m_yajl_generator);
00162     const unsigned char* buf;
00163     unsigned int len;
00164     yajl_gen_get_buf(m_yajl_generator, &buf, &len);
00165     out.write((char*)buf, len);
00166     yajl_gen_clear(m_yajl_generator);
00167 
00168     // we're done with the generator, so release it
00169     yajl_gen_free(m_yajl_generator);
00170     m_yajl_generator = NULL;
00171 }
00172 
00173 int number_handler(void* ctx, const char* number_char_ptr, unsigned int number_len)
00174 {
00175     Context* c = (Context*)ctx;
00176     JSONCodec::JSONObject& json_object = *c->json_object_ptr;
00177     json_object.insert(std::make_pair(c->term_ref, std::string(number_char_ptr, number_len)));
00178 
00179     return 1;
00180 }
00181 
00182 int string_handler(void* ctx, const unsigned char* string_val, unsigned int string_len)
00183 {
00184     Context* c = (Context*)ctx;
00185     JSONCodec::JSONObject& json_object = *c->json_object_ptr;
00186     json_object.insert(std::make_pair(c->term_ref, std::string((char*)string_val, string_len)));
00187 
00188     return 1;
00189 }
00190 
00191 int map_key_handler(void* ctx, const unsigned char* stringVal, unsigned int stringLen)
00192 {
00193     Context* c = (Context*)ctx;
00194     JSONCodec::FieldMap::const_iterator i = c->field_map.find(std::string((char*)stringVal, stringLen));
00195     c->term_ref = i->second->term.term_ref;
00196 
00197     return 1;
00198 }
00199 
00200 int start_map_handler(void* ctx)
00201 {
00202     Context* c = (Context*)ctx;
00203     c->json_object_ptr = JSONCodec::JSONObjectPtr(new JSONCodec::JSONObject);
00204     
00205     return 1;
00206 }
00207 
00208 int end_map_handler(void* ctx)
00209 {
00210     Context* c = (Context*)ctx;
00211     c->json_object_queue.push(c->json_object_ptr);
00212 
00213     return 1;
00214 }
00215 
00216 int start_array_handler(void* ctx)
00217 {
00218     Context* c = (Context*)ctx;
00219     c->m_array_started = true;
00220 
00221     return 1;
00222 }
00223 
00224 int end_array_handler(void* ctx)
00225 {
00226     Context* c = (Context*)ctx;
00227     c->m_array_ended = true;
00228 
00229     return 1;
00230 }
00231 
00232 static yajl_callbacks callbacks = {
00233     NULL,
00234     NULL,
00235     NULL,
00236     NULL,
00237     number_handler,
00238     string_handler,
00239     start_map_handler,
00240     map_key_handler,
00241     end_map_handler,
00242     start_array_handler,
00243     end_array_handler
00244 };
00245 
00246 
00247 bool JSONCodec::read(std::istream& in, Event& e)
00248 {
00249     if (e.getType() != getEventType())
00250         throw WrongEventTypeException();
00251 
00252     e.clear();
00253 
00254     if (m_first_read_attempt) {
00255         yajl_parser_config cfg = { 1, 1 };
00256         m_context = boost::shared_ptr<Context>(new Context(m_field_map, m_json_object_queue));
00257         m_yajl_handle = yajl_alloc(&callbacks, &cfg, NULL, (void*)m_context.get());
00258         m_first_read_attempt = false;
00259     }
00260 
00261     if (!m_context->m_array_started) {
00262 
00263         // consume white space
00264         streambuf_type* buf_ptr = in.rdbuf();
00265         int_type c = buf_ptr->sgetc();
00266         while (1) {
00267             if (traits_type::eq_int_type(c, traits_type::eof()))
00268                 return false;
00269             if (c == ' ' || c == '\t' || c == '\x0A' || c == '\x0D') {
00270                 c = buf_ptr->snextc();
00271             } else {
00272                 break;
00273             }
00274         }
00275 
00276         // first non-whitespace char must be '['
00277         if (c != '[')
00278             throw PionException("input stream is not a JSON array");
00279 
00280         // Since c has not been consumed, m_context->m_array_started will be set to true
00281         // the first time yajl_parse is called.
00282     }
00283 
00284     while (m_json_object_queue.empty()) {
00285         if (m_context->m_array_ended) {
00286             // Setting eofbit gives the caller a way to distinguish between the case where there is
00287             // currently not enough data in the stream to parse an Event and the case where the
00288             // end marker of the Event sequence has been reached (and no more Events are in the queue).
00289             in.setstate(std::ios::eofbit);
00290 
00291             return false;
00292         }
00293 
00294         // read up to READ_BUFFER_SIZE bytes into a buffer from the input stream
00295         char data[READ_BUFFER_SIZE];
00296         streambuf_type* buf_ptr = in.rdbuf();
00297         char* p = data;
00298         std::streamsize num_bytes_read;
00299         for (num_bytes_read = 0; num_bytes_read < static_cast<std::streamsize>(READ_BUFFER_SIZE); ++num_bytes_read) {
00300             *p = buf_ptr->sbumpc();
00301             if (traits_type::eq_int_type(*p, traits_type::eof()))
00302                 break;
00303             ++p;
00304         }
00305 
00306         if (num_bytes_read == 0)
00307             return false;
00308 
00309         yajl_status stat = yajl_parse(m_yajl_handle, (unsigned char*)data, num_bytes_read);
00310 
00311         if (stat == yajl_status_ok) {
00312             // The end of the JSON array was parsed.  Events might have been added to the queue
00313             // before the end was reached, so continue.
00314             PION_ASSERT(m_context->m_array_ended);
00315         } else if (stat == yajl_status_insufficient_data) {
00316             // The queue might or might not have an Event now, so continue.
00317             // If the queue is still empty, more data will be read in.
00318         } else {
00319             // TODO: handle yajl_status_client_canceled and yajl_status_error
00320             return false;
00321         }
00322     }
00323 
00324     JSONCodec::JSONObjectPtr json_object_ptr = m_json_object_queue.front();
00325     m_json_object_queue.pop();
00326     const JSONCodec::JSONObject& json_object = *json_object_ptr;
00327 
00328     for (JSONCodec::JSONObject::const_iterator i = json_object.begin(); i != json_object.end(); ++i) {
00329         const pion::platform::Vocabulary::TermRef term_ref = i->first;
00330         const std::string& value_str = i->second;
00331         const pion::platform::Vocabulary::Term& term = m_JSON_field_ptr_map[term_ref]->term;
00332         switch (term.term_type) {
00333             case pion::platform::Vocabulary::TYPE_NULL:
00334             case pion::platform::Vocabulary::TYPE_OBJECT:
00335                 // do nothing
00336                 break;
00337             case pion::platform::Vocabulary::TYPE_INT8:
00338             case pion::platform::Vocabulary::TYPE_INT16:
00339             case pion::platform::Vocabulary::TYPE_INT32:
00340                 e.setInt(term_ref, boost::lexical_cast<boost::int32_t>(value_str));
00341                 break;
00342             case pion::platform::Vocabulary::TYPE_INT64:
00343                 e.setBigInt(term_ref, boost::lexical_cast<boost::int64_t>(value_str));
00344                 break;
00345             case pion::platform::Vocabulary::TYPE_UINT8:
00346             case pion::platform::Vocabulary::TYPE_UINT16:
00347             case pion::platform::Vocabulary::TYPE_UINT32:
00348                 e.setUInt(term_ref, boost::lexical_cast<boost::uint32_t>(value_str));
00349                 break;
00350             case pion::platform::Vocabulary::TYPE_UINT64:
00351                 e.setUBigInt(term_ref, boost::lexical_cast<boost::uint64_t>(value_str));
00352                 break;
00353             case pion::platform::Vocabulary::TYPE_FLOAT:
00354                 e.setFloat(term_ref, boost::lexical_cast<float>(value_str));
00355                 break;
00356             case pion::platform::Vocabulary::TYPE_DOUBLE:
00357                 e.setDouble(term_ref, boost::lexical_cast<double>(value_str));
00358                 break;
00359             case pion::platform::Vocabulary::TYPE_LONG_DOUBLE:
00360                 e.setLongDouble(term_ref, boost::lexical_cast<long double>(value_str));
00361                 break;
00362             case pion::platform::Vocabulary::TYPE_SHORT_STRING:
00363             case pion::platform::Vocabulary::TYPE_STRING:
00364             case pion::platform::Vocabulary::TYPE_LONG_STRING:
00365             case pion::platform::Vocabulary::TYPE_BLOB:
00366             case pion::platform::Vocabulary::TYPE_ZBLOB:
00367                 e.setString(term_ref, value_str);
00368                 break;
00369             case pion::platform::Vocabulary::TYPE_CHAR:
00370                 if (value_str.size() > term.term_size) {
00371                     e.setString(term_ref, std::string(value_str, 0, term.term_size));
00372                 } else {
00373                     e.setString(term_ref, value_str);
00374                 }
00375                 break;
00376             case pion::platform::Vocabulary::TYPE_DATE_TIME:
00377             case pion::platform::Vocabulary::TYPE_DATE:
00378             case pion::platform::Vocabulary::TYPE_TIME:
00379             {
00380                 PionDateTime dt;
00381                 m_JSON_field_ptr_map[term_ref]->time_facet.fromString(value_str, dt);
00382                 e.setDateTime(term_ref, dt);
00383                 break;
00384             }
00385         }
00386     }
00387     return true;
00388 }
00389 
00390 void JSONCodec::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00391 {
00392     // first set config options for the Codec base class
00393     reset();
00394     Codec::setConfig(v, config_ptr);
00395 
00396     // TODO: options
00397 
00398     // next, map the fields to Terms
00399     xmlNodePtr codec_field_node = config_ptr;
00400     while ( (codec_field_node = ConfigManager::findConfigNodeByName(FIELD_ELEMENT_NAME, codec_field_node)) != NULL) {
00401         // parse new field mapping
00402 
00403         // start with the name of the field (element content)
00404         xmlChar *xml_char_ptr = xmlNodeGetContent(codec_field_node);
00405         if (xml_char_ptr == NULL || xml_char_ptr[0] == '\0') {
00406             if (xml_char_ptr != NULL)
00407                 xmlFree(xml_char_ptr);
00408             throw EmptyFieldException(getId());
00409         }
00410         const std::string field_name(reinterpret_cast<char*>(xml_char_ptr));
00411         xmlFree(xml_char_ptr);
00412 
00413         // next get the Term we want to map to
00414         xml_char_ptr = xmlGetProp(codec_field_node, reinterpret_cast<const xmlChar*>(TERM_ATTRIBUTE_NAME.c_str()));
00415         if (xml_char_ptr == NULL || xml_char_ptr[0] == '\0') {
00416             if (xml_char_ptr != NULL)
00417                 xmlFree(xml_char_ptr);
00418             throw EmptyTermException(getId());
00419         }
00420         const std::string term_id(reinterpret_cast<char*>(xml_char_ptr));
00421         xmlFree(xml_char_ptr);
00422 
00423         // make sure that the Term is valid
00424         const Vocabulary::TermRef term_ref = v.findTerm(term_id);
00425         if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00426             throw UnknownTermException(term_id);
00427 
00428         // add the field mapping
00429         mapFieldToTerm(field_name, v[term_ref]);
00430 
00431         // step to the next field mapping
00432         codec_field_node = codec_field_node->next;
00433     }
00434 
00435     m_JSON_field_ptr_map.clear();
00436     for (FieldMap::const_iterator i = m_field_map.begin(); i != m_field_map.end(); ++i) {
00437         if (m_JSON_field_ptr_map.find(i->second->term.term_ref) != m_JSON_field_ptr_map.end())
00438             throw PionException("Duplicate Field Term");
00439         m_JSON_field_ptr_map[i->second->term.term_ref] = i->second;
00440     }
00441 }
00442 
00443 void JSONCodec::updateVocabulary(const Vocabulary& v)
00444 {
00445     // first update anything in the Codec base class that might be needed
00446     Codec::updateVocabulary(v);
00447 
00449     for (CurrentFormat::iterator i = m_format.begin(); i != m_format.end(); ++i) {
00450         // refresh term 
00451         v.refreshTerm((*i)->term);
00452 
00453         // for date/time types, update time_facet
00454         switch ((*i)->term.term_type) {
00455             case pion::platform::Vocabulary::TYPE_DATE_TIME:
00456             case pion::platform::Vocabulary::TYPE_DATE:
00457             case pion::platform::Vocabulary::TYPE_TIME:
00458                 (*i)->time_facet.setFormat((*i)->term.term_format);
00459                 break;
00460             default:
00461                 break; // do nothing
00462         }
00463     }
00464 }
00465 
00466 
00467 }   // end namespace plugins
00468 }   // end namespace pion
00469 
00470 
00472 extern "C" PION_PLUGIN_API pion::platform::Codec *pion_create_JSONCodec(void) {
00473     return new pion::plugins::JSONCodec();
00474 }
00475 
00477 extern "C" PION_PLUGIN_API void pion_destroy_JSONCodec(pion::plugins::JSONCodec *codec_ptr) {
00478     delete codec_ptr;
00479 }

Generated on Wed Apr 13 16:38:33 2011 for pion-platform by  doxygen 1.4.7