platform/codecs/LogCodec.cpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #include "LogCodec.hpp"
00021 #include <pion/platform/ConfigManager.hpp>
00022 
00023 using namespace pion::platform;
00024 
00025 
00026 namespace pion {        // begin namespace pion
00027 namespace plugins {     // begin namespace plugins
00028 
00029 
00030 // static members of LogCodec
00031 const std::string           LogCodec::CONTENT_TYPE = "text/ascii";
00032 const std::string           LogCodec::FLUSH_ELEMENT_NAME = "Flush";
00033 const std::string           LogCodec::HEADERS_ELEMENT_NAME = "Headers"; // this means ELF
00034 const std::string           LogCodec::TIME_OFFSET_ELEMENT_NAME = "TimeOffset";
00035 const std::string           LogCodec::FIELD_ELEMENT_NAME = "Field";
00036 const std::string           LogCodec::TERM_ATTRIBUTE_NAME = "term";
00037 const std::string           LogCodec::START_ATTRIBUTE_NAME = "start";
00038 const std::string           LogCodec::END_ATTRIBUTE_NAME = "end";
00039 const std::string           LogCodec::OPTIONAL_ATTRIBUTE_NAME = "optional";
00040 const std::string           LogCodec::URLENCODE_ATTRIBUTE_NAME = "urlencode";
00041 const std::string           LogCodec::ESCAPE_ATTRIBUTE_NAME = "escape";
00042 const std::string           LogCodec::EMPTY_ATTRIBUTE_NAME = "empty";
00043 const std::string           LogCodec::EVENTS_ELEMENT_NAME = "Events";
00044 const std::string           LogCodec::FIELDS_ELEMENT_NAME = "Fields";
00045 const std::string           LogCodec::SPLIT_ATTRIBUTE_NAME = "split";
00046 const std::string           LogCodec::JOIN_ATTRIBUTE_NAME = "join";
00047 const std::string           LogCodec::COMMENT_ATTRIBUTE_NAME = "comment";
00048 const std::string           LogCodec::CONSUME_ATTRIBUTE_NAME = "consume";
00049 const unsigned int          LogCodec::READ_BUFFER_SIZE = 1024 * 128;    // 128KB
00050 
00051 // defaults for various settings (tuned for ELF-like formats)
00052 const std::string           LogCodec::EVENT_SPLIT_SET = "\r\n";
00053 const std::string           LogCodec::EVENT_JOIN_STRING = OSEOL;
00054 const std::string           LogCodec::COMMENT_CHAR_SET = "#";
00055 // NOTE: If Headers is true, these defaults CANNOT be overridden
00056 const std::string           LogCodec::FIELD_SPLIT_SET = " \t";
00057 const std::string           LogCodec::FIELD_JOIN_STRING = " ";
00058 const bool                  LogCodec::CONSUME_DELIMS_FLAG = true;
00059 
00060 // special support for ELF (when Headers is true)
00061 const std::string           LogCodec::VERSION_ELF_HEADER = "#Version:";
00062 const std::string           LogCodec::DATE_ELF_HEADER = "#Date:";
00063 const std::string           LogCodec::SOFTWARE_ELF_HEADER = "#Software:";
00064 const std::string           LogCodec::FIELDS_ELF_HEADER = "#Fields:";
00065 
00066 
00067 // LogCodec member functions
00068 
00069 CodecPtr LogCodec::clone(void) const
00070 {
00071     LogCodec *new_codec(new LogCodec());
00072     new_codec->copyCodec(*this);
00073     new_codec->m_flush_after_write = m_flush_after_write;
00074     new_codec->m_handle_elf_headers = m_handle_elf_headers;
00075     new_codec->m_wrote_elf_headers = false; // Important!
00076     new_codec->m_time_offset = m_time_offset;
00077     new_codec->m_event_split = m_event_split;
00078     new_codec->m_event_join = m_event_join;
00079     new_codec->m_comment_chars = m_comment_chars;
00080     new_codec->m_field_split = m_field_split;
00081     new_codec->m_field_join = m_field_join;
00082     new_codec->m_consume_delims = m_consume_delims;
00083     for (CurrentFormat::const_iterator i = m_format.begin(); i != m_format.end(); ++i) {
00084         new_codec->mapFieldToTerm((*i)->log_field, (*i)->log_term, (*i)->log_delim_start, (*i)->log_delim_end,
00085                                   (*i)->log_opt_delims, (*i)->log_urlencode,
00086                                   (*i)->log_escape_char, (*i)->log_empty_val,
00087                                   (*i)->log_do_time_offset, (*i)->log_time_offset);
00088     }
00089     return CodecPtr(new_codec);
00090 }
00091 
00092 void LogCodec::write(std::ostream& out, const Event& e)
00093 {
00094     const Event::ParameterValue *value_ptr;
00095 
00096     // write the ELF headers if necessary
00097     if (m_handle_elf_headers && !m_wrote_elf_headers) {
00098         writeELFHeaders(out);
00099         m_wrote_elf_headers = true;
00100     }
00101 
00102     // iterate through each field in the current format
00103     CurrentFormat::const_iterator i = m_format.begin();
00104     while (i != m_format.end()) {
00105         // get the value for the field
00106         value_ptr = e.getPointer((*i)->log_term.term_ref);
00107 
00108         // check if the value is undefined
00109         if (value_ptr == NULL)
00110             (*i)->writeEmptyValue(out);
00111         else
00112             (*i)->write(out, *value_ptr);
00113 
00114         // iterate to the next field
00115         ++i;
00116         // add field-join between all fields
00117         if (i != m_format.end())
00118             out << m_field_join;
00119     }
00120 
00121     // write event-join for each event record
00122     out << m_event_join;
00123 
00124     // flush the output stream
00125     if (m_flush_after_write)
00126         out.flush();
00127 }
00128 
00129 bool LogCodec::read(std::istream& input_stream, Event& e)
00130 {
00131     if (e.getType() != getEventType())
00132         throw WrongEventTypeException();
00133     e.clear();
00134 
00135     streambuf_type *buf_ptr = input_stream.rdbuf();
00136     // skip "empty events" (i.e. records with no data) and comments
00137     int_type c = consumeVoidsAndComments(buf_ptr);
00138     // if nothing left...punt with no event generated
00139     if (traits_type::eq_int_type(c, traits_type::eof())) {
00140         input_stream.setstate(std::ios::eofbit);
00141         return false;
00142     }
00143 
00144     // iterate through each field in the format
00145     CurrentFormat::const_iterator i;
00146     char delim_start;
00147     char delim_end;
00148     char escape_char;
00149     char * const read_buf = m_read_buf.get();
00150     char * read_ptr;
00151 
00152     for (i = m_format.begin(); !traits_type::eq_int_type(c, traits_type::eof()) &&
00153            m_event_split.find(c) == std::string::npos && i != m_format.end(); ++i)
00154     {
00155         delim_start = (*i)->log_delim_start;
00156         delim_end = (*i)->log_delim_end;
00157         escape_char = (*i)->log_escape_char;
00158 
00159         if (delim_start != '\0') {
00160             if (c == delim_start)
00161                 c = buf_ptr->snextc();          // skip over start-delimiter
00162             else if (!(*i)->log_opt_delims)
00163                 break;                          // missing start-delimiter is an error, gotta punt
00164             else
00165                 delim_start = delim_end = '\0'; // didn't find start-delimiter, treat as optional
00166         }
00167 
00168         // parse the field contents
00169         read_ptr = read_buf;
00170         do {
00171             if ((delim_end != '\0' && c == delim_end) && read_ptr > read_buf && read_ptr[-1] == escape_char)
00172                 --read_ptr; // escaped end-delimiter...overwrite the escape-character
00173             else if ((delim_end != '\0' && c == delim_end) ||
00174                      (delim_end == '\0' && (m_field_split.find(c) != std::string::npos ||
00175                                             m_event_split.find(c) != std::string::npos)))
00176             {
00177                 // we've reached the end of the field contents
00178                 if (delim_end != '\0' && c == delim_end)
00179                     c = buf_ptr->snextc();  // skip over end-delimiter
00180                 break;
00181             }
00182             if (read_ptr < m_read_end)
00183                 *(read_ptr++) = c;
00184             c = buf_ptr->snextc();
00185         } while (!traits_type::eq_int_type(c, traits_type::eof()));
00186         *read_ptr = '\0';
00187 
00188         // only parse-and-set values that are not null/empty
00189         if (read_ptr != read_buf && *read_buf != '\0' && read_buf != (*i)->log_empty_val)
00190             (*i)->read(read_buf, e);
00191 
00192         // if EOF or end-of-record or not-a-field-delim, gotta punt
00193         if (traits_type::eq_int_type(c, traits_type::eof()) ||
00194             m_event_split.find(c) != std::string::npos || m_field_split.find(c) == std::string::npos)
00195             break;
00196 
00197         do {
00198             // skip delimiter(s) between fields
00199             c = buf_ptr->snextc();
00200             if (!m_consume_delims)
00201                 break;
00202         } while (!traits_type::eq_int_type(c, traits_type::eof()) && m_field_split.find(c) != std::string::npos);
00203     }
00204 
00205     // skip the rest of the record...if there's something left
00206     while (!traits_type::eq_int_type(c, traits_type::eof())) {
00207         if (m_event_split.find(c) != std::string::npos)
00208             break;
00209         c = buf_ptr->snextc();
00210     }
00211 
00212     // skip "empty events" (i.e. records with no data)
00213     while (!traits_type::eq_int_type(c, traits_type::eof())) {
00214         if (m_event_split.find(c) == std::string::npos)
00215             break;
00216         c = buf_ptr->snextc();
00217     }
00218 
00219     if (traits_type::eq_int_type(c, traits_type::eof()))
00220         input_stream.setstate(std::ios::eofbit);
00221     return true;
00222 }
00223 
00224 void LogCodec::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00225 {
00226     // first set config options for the Codec base class
00227     reset();
00228     Codec::setConfig(v, config_ptr);
00229 
00230     // check if the Codec should flush the output stream after each write
00231     m_flush_after_write = false;
00232     std::string flush_option;
00233     if (ConfigManager::getConfigOption(FLUSH_ELEMENT_NAME, flush_option, config_ptr)) {
00234         if (flush_option == "true")
00235             m_flush_after_write = true;
00236     }
00237 
00238     // check if the Codec should include headers when writing output
00239     m_handle_elf_headers = false;
00240     std::string headers_option;
00241     if (ConfigManager::getConfigOption(HEADERS_ELEMENT_NAME, headers_option, config_ptr)) {
00242         if (headers_option == "true")
00243             m_handle_elf_headers = true;
00244     }
00245 
00246     // check if the Codec should apply an offset when reading and writing dates and times
00247     bool do_time_offset = false;
00248     PionDateTime::time_duration_type time_offset(0, 0, 0);
00249     std::string time_offset_option;
00250     if (ConfigManager::getConfigOption(TIME_OFFSET_ELEMENT_NAME, time_offset_option, config_ptr)) {
00251         m_time_offset = boost::lexical_cast<boost::int32_t>(time_offset_option);
00252         if (m_time_offset != 0) {
00253             do_time_offset = true;
00254             time_offset = PionDateTime::time_duration_type(0, m_time_offset, 0);
00255         }
00256     }
00257 
00258     // next, map the fields to Terms
00259     xmlNodePtr codec_field_node = config_ptr;
00260     while ((codec_field_node = ConfigManager::findConfigNodeByName(FIELD_ELEMENT_NAME, codec_field_node)) != NULL) {
00261         // start with the name of the field (element content)
00262         xmlChar *xml_char_ptr = xmlNodeGetContent(codec_field_node);
00263         if (xml_char_ptr == NULL || xml_char_ptr[0] == '\0') {
00264             if (xml_char_ptr != NULL)
00265                 xmlFree(xml_char_ptr);
00266             throw EmptyFieldException(getId());
00267         }
00268         const std::string field_name(reinterpret_cast<char*>(xml_char_ptr));
00269         xmlFree(xml_char_ptr);
00270 
00271         // next get the Term we want to map to
00272         xml_char_ptr = xmlGetProp(codec_field_node, reinterpret_cast<const xmlChar*>(TERM_ATTRIBUTE_NAME.c_str()));
00273         if (xml_char_ptr == NULL || xml_char_ptr[0] == '\0') {
00274             if (xml_char_ptr != NULL)
00275                 xmlFree(xml_char_ptr);
00276             throw EmptyTermException(getId());
00277         }
00278         const std::string term_id(reinterpret_cast<char*>(xml_char_ptr));
00279         xmlFree(xml_char_ptr);
00280 
00281         // make sure that the Term is valid
00282         const Vocabulary::TermRef term_ref = v.findTerm(term_id);
00283         if (term_ref == Vocabulary::UNDEFINED_TERM_REF)
00284             throw UnknownTermException(term_id);
00285 
00286         // get the starting delimiter (if any)
00287         char delim_start = '\0';
00288         xml_char_ptr = xmlGetProp(codec_field_node, reinterpret_cast<const xmlChar*>(START_ATTRIBUTE_NAME.c_str()));
00289         if (xml_char_ptr != NULL) {
00290             delim_start = cstyle(reinterpret_cast<char*>(xml_char_ptr))[0];
00291             xmlFree(xml_char_ptr);
00292         }
00293 
00294         // get the ending delimiter (if any)
00295         char delim_end = '\0';
00296         xml_char_ptr = xmlGetProp(codec_field_node, reinterpret_cast<const xmlChar*>(END_ATTRIBUTE_NAME.c_str()));
00297         if (xml_char_ptr != NULL) {
00298             delim_end = cstyle(reinterpret_cast<char*>(xml_char_ptr))[0];
00299             xmlFree(xml_char_ptr);
00300         }
00301 
00302         // if only one delimiter exists, use it for both
00303         if (delim_start == '\0' && delim_end != '\0')
00304             delim_start = delim_end;
00305         else if (delim_start != '\0' && delim_end == '\0')
00306             delim_end = delim_start;
00307 
00308         // check if start/end delimiters are optional
00309         // default is false
00310         bool opt_delims = false;
00311         xml_char_ptr = xmlGetProp(codec_field_node, reinterpret_cast<const xmlChar*>(OPTIONAL_ATTRIBUTE_NAME.c_str()));
00312         if (xml_char_ptr != NULL) {
00313             const std::string opt_option(reinterpret_cast<char*>(xml_char_ptr));
00314             if (opt_option == "true")
00315                 opt_delims = true;
00316             xmlFree(xml_char_ptr);
00317         }
00318         
00319         // check if field is urlencoded
00320         // default is false
00321         bool urlencode = false;
00322         xml_char_ptr = xmlGetProp(codec_field_node, reinterpret_cast<const xmlChar*>(URLENCODE_ATTRIBUTE_NAME.c_str()));
00323         if (xml_char_ptr != NULL) {
00324             const std::string urlencode_str(reinterpret_cast<char*>(xml_char_ptr));
00325             xmlFree(xml_char_ptr);
00326             if (urlencode_str == "true")
00327                 urlencode = true;
00328         }
00329 
00330         // get the escape character (if any)
00331         // default is "\"
00332         char escape_char = '\\';
00333         xml_char_ptr = xmlGetProp(codec_field_node, reinterpret_cast<const xmlChar*>(ESCAPE_ATTRIBUTE_NAME.c_str()));
00334         if (xml_char_ptr != NULL) {
00335             escape_char = cstyle(reinterpret_cast<char*>(xml_char_ptr))[0];
00336             xmlFree(xml_char_ptr);
00337         }
00338 
00339         // get the empty value (if any)
00340         // default is "-" if there are no delimiters
00341         std::string empty_val = (delim_start == '\0') ? "-" : "";
00342         xml_char_ptr = xmlGetProp(codec_field_node, reinterpret_cast<const xmlChar*>(EMPTY_ATTRIBUTE_NAME.c_str()));
00343         if (xml_char_ptr != NULL) {
00344             empty_val = cstyle(reinterpret_cast<char*>(xml_char_ptr));
00345             xmlFree(xml_char_ptr);
00346         }
00347 
00348         // add the field mapping
00349         mapFieldToTerm(field_name, v[term_ref], delim_start, delim_end, opt_delims, urlencode, escape_char, empty_val, do_time_offset, time_offset);
00350 
00351         // step to the next field mapping
00352         codec_field_node = codec_field_node->next;
00353     }
00354 
00355     // initialize event specifications
00356     m_event_split = EVENT_SPLIT_SET;
00357     m_event_join = EVENT_JOIN_STRING;
00358     m_comment_chars = COMMENT_CHAR_SET;
00359 
00360     // handle event specifications
00361     xmlNodePtr events_node = ConfigManager::findConfigNodeByName(EVENTS_ELEMENT_NAME, config_ptr);
00362     if (events_node != NULL) {
00363         xmlChar *xml_char_ptr;
00364         // get the split set (if any)
00365         xml_char_ptr = xmlGetProp(events_node, reinterpret_cast<const xmlChar*>(SPLIT_ATTRIBUTE_NAME.c_str()));
00366         if (xml_char_ptr != NULL) {
00367             if (cstyle(reinterpret_cast<char*>(xml_char_ptr))[0] != '\0')
00368                 m_event_split = reinterpret_cast<char*>(xml_char_ptr);
00369             xmlFree(xml_char_ptr);
00370         }
00371         // get the join string (if any)
00372         xml_char_ptr = xmlGetProp(events_node, reinterpret_cast<const xmlChar*>(JOIN_ATTRIBUTE_NAME.c_str()));
00373         if (xml_char_ptr != NULL) {
00374             if (cstyle(reinterpret_cast<char*>(xml_char_ptr))[0] != '\0')
00375                 m_event_join = reinterpret_cast<char*>(xml_char_ptr);
00376             xmlFree(xml_char_ptr);
00377         }
00378         // get the comment chars (if any)
00379         xml_char_ptr = xmlGetProp(events_node, reinterpret_cast<const xmlChar*>(COMMENT_ATTRIBUTE_NAME.c_str()));
00380         if (xml_char_ptr != NULL) {
00381             if (cstyle(reinterpret_cast<char*>(xml_char_ptr))[0] != '\0')
00382                 m_comment_chars = reinterpret_cast<char*>(xml_char_ptr);
00383             xmlFree(xml_char_ptr);
00384         }
00385     }
00386 
00387     // initialize field specifications
00388     m_field_split = FIELD_SPLIT_SET;
00389     m_field_join = FIELD_JOIN_STRING;
00390     m_consume_delims = CONSUME_DELIMS_FLAG;
00391 
00392     // if this is ELF data, use defaults only!
00393     if (m_handle_elf_headers)
00394         return;
00395 
00396     // handle field specifications
00397     xmlNodePtr fields_node = ConfigManager::findConfigNodeByName(FIELDS_ELEMENT_NAME, config_ptr);
00398     if (fields_node != NULL) {
00399         xmlChar *xml_char_ptr;
00400         // get the split set (if any)
00401         xml_char_ptr = xmlGetProp(fields_node, reinterpret_cast<const xmlChar*>(SPLIT_ATTRIBUTE_NAME.c_str()));
00402         if (xml_char_ptr != NULL) {
00403             if (cstyle(reinterpret_cast<char*>(xml_char_ptr))[0] != '\0')
00404                 m_field_split = reinterpret_cast<char*>(xml_char_ptr);
00405             xmlFree(xml_char_ptr);
00406         }
00407         // get the join string (if any)
00408         xml_char_ptr = xmlGetProp(fields_node, reinterpret_cast<const xmlChar*>(JOIN_ATTRIBUTE_NAME.c_str()));
00409         if (xml_char_ptr != NULL) {
00410             if (cstyle(reinterpret_cast<char*>(xml_char_ptr))[0] != '\0')
00411                 m_field_join = reinterpret_cast<char*>(xml_char_ptr);
00412             xmlFree(xml_char_ptr);
00413         }
00414         // check if the Codec should consume consecutive field delimiters
00415         xml_char_ptr = xmlGetProp(fields_node, reinterpret_cast<const xmlChar*>(CONSUME_ATTRIBUTE_NAME.c_str()));
00416         if (xml_char_ptr != NULL) {
00417             const std::string consume_option(reinterpret_cast<char*>(xml_char_ptr));
00418             if (consume_option == "false")
00419                 m_consume_delims = false;
00420             else if (consume_option == "true")
00421                 m_consume_delims = true;
00422             xmlFree(xml_char_ptr);
00423         }
00424     }
00425 }
00426 
00427 void LogCodec::updateVocabulary(const Vocabulary& v)
00428 {
00429     // first update anything in the Codec base class that might be needed
00430     Codec::updateVocabulary(v);
00431 
00433     for (CurrentFormat::iterator i = m_format.begin(); i != m_format.end(); ++i) {
00434         // refresh term 
00435         v.refreshTerm((*i)->log_term);
00436 
00437         // for date/time types, update log_time_facet
00438         switch ((*i)->log_term.term_type) {
00439             case pion::platform::Vocabulary::TYPE_DATE_TIME:
00440             case pion::platform::Vocabulary::TYPE_DATE:
00441             case pion::platform::Vocabulary::TYPE_TIME:
00442                 (*i)->log_time_facet.setFormat((*i)->log_term.term_format);
00443                 break;
00444             default:
00445                 break; // do nothing
00446         }
00447     }
00448 }
00449 
00450 }   // end namespace plugins
00451 }   // end namespace pion
00452 
00453 
00455 extern "C" PION_PLUGIN_API pion::platform::Codec *pion_create_LogCodec(void) {
00456     return new pion::plugins::LogCodec();
00457 }
00458 
00460 extern "C" PION_PLUGIN_API void pion_destroy_LogCodec(pion::plugins::LogCodec *codec_ptr) {
00461     delete codec_ptr;
00462 }

Generated on Wed Apr 13 16:38:33 2011 for pion-platform by  doxygen 1.4.7