platform/reactors/ScriptReactor.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2009 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_SCRIPTREACTOR_HEADER__
00021 #define __PION_SCRIPTREACTOR_HEADER__
00022 
00023 #ifdef _MSC_VER
00024     #include <windows.h>
00025     #include <string.h>
00026 #else
00027     #include <errno.h>
00028     #include <unistd.h>
00029     #include <sys/wait.h>
00030 #endif
00031 #include <iosfwd>
00032 #include <string>
00033 #include <vector>
00034 #include <boost/scoped_ptr.hpp>
00035 #include <boost/thread/thread.hpp>
00036 #include <boost/iostreams/stream_buffer.hpp>
00037 #include <boost/iostreams/device/file_descriptor.hpp>
00038 #include <pion/PionConfig.hpp>
00039 #include <pion/PionLogger.hpp>
00040 #include <pion/PionException.hpp>
00041 #include <pion/PionScheduler.hpp>
00042 #include <pion/platform/Codec.hpp>
00043 #include <pion/platform/Reactor.hpp>
00044 
// Platform abstraction macros for the pipe descriptors and the child process
// identifier stored by ScriptReactor. When _MSC_VER is defined the Win32
// types are used; otherwise the POSIX ones are used.
//
//   FILE_DESC_TYPE      - type of a pipe descriptor (HANDLE or int)
//   INVALID_DESCRIPTOR  - sentinel value meaning "no descriptor"
//   CLOSE_DESCRIPTOR(x) - closes descriptor x (CloseHandle or ::close)
//   PROCESS_INFO_TYPE   - type identifying the child process
//                         (LPPROCESS_INFORMATION or pid_t)
//   INVALID_PROCESS     - sentinel value meaning "no child process"
//
// NOTE(review): these are unprefixed macros defined in a header and are never
// #undef'd here, so they leak into every translation unit that includes it.
00045 #ifdef _MSC_VER
00046     #define FILE_DESC_TYPE          HANDLE
00047     #define INVALID_DESCRIPTOR      INVALID_HANDLE_VALUE
00048     #define CLOSE_DESCRIPTOR(x)     CloseHandle(x)
00049     #define PROCESS_INFO_TYPE       LPPROCESS_INFORMATION
00050     #define INVALID_PROCESS         NULL
00051 #else
00052     #define FILE_DESC_TYPE          int
00053     #define INVALID_DESCRIPTOR      -1
00054     #define CLOSE_DESCRIPTOR(x)     ::close(x)
00055     #define PROCESS_INFO_TYPE       pid_t
00056     #define INVALID_PROCESS         -1
00057 #endif
00058 
00059 
00060 namespace pion {        // begin namespace pion
00061 namespace plugins {     // begin namespace plugins
00062 
00063 
00067 class ScriptReactor :
00068     public pion::platform::Reactor
00069 {
00070 public: 
00071 
00073     class EmptyInputCodecException : public PionException {
00074     public:
00075         EmptyInputCodecException(const std::string& reactor_id)
00076             : PionException("ScriptReactor configuration is missing a required InputCodec parameter: ", reactor_id) {}
00077     };
00078 
00080     class EmptyOutputCodecException : public PionException {
00081     public:
00082         EmptyOutputCodecException(const std::string& reactor_id)
00083             : PionException("ScriptReactor configuration is missing a required OutputCodec parameter: ", reactor_id) {}
00084     };
00085 
00087     class EmptyCommandException : public PionException {
00088     public:
00089         EmptyCommandException(const std::string& reactor_id)
00090             : PionException("ScriptReactor configuration is missing a required Command parameter: ", reactor_id) {}
00091     };
00092 
00094     class CommandParsingException : public PionException {
00095     public:
00096         CommandParsingException(const std::string& command)
00097             : PionException("ScriptReactor was unable to parse arguments out of command string: ", command) {}
00098     };
00099 
00101     class OpenPipeException : public PionException {
00102     public:
00103         OpenPipeException(const std::string& command)
00104             : PionException("ScriptReactor failed to open pipe: ", command) {}
00105     };
00106 
00108     class ReadFromPipeException : public PionException {
00109     public:
00110         ReadFromPipeException(const std::string& reactor_id)
00111             : PionException("ScriptReactor failed reading an event from pipe: ", reactor_id) {}
00112     };
00113 
00115     class WriteToPipeException : public PionException {
00116     public:
00117         WriteToPipeException(const std::string& reactor_id)
00118             : PionException("ScriptReactor failed writing an event to pipe: ", reactor_id) {}
00119     };
00120 
00121 
00123     ScriptReactor(void)
00124         : Reactor(TYPE_PROCESSING),
00125         m_logger(PION_GET_LOGGER("pion.ScriptReactor")),
00126         m_input_pipe(INVALID_DESCRIPTOR), m_output_pipe(INVALID_DESCRIPTOR), m_child(INVALID_PROCESS)
00127     {
00128     }
00129 
00131     virtual ~ScriptReactor() { stop(); }
00132 
00134     virtual void start(void);
00135 
00137     virtual void stop(void) { stopIfRunning(); }
00138 
00146     virtual void setConfig(const pion::platform::Vocabulary& v, const xmlNodePtr config_ptr);
00147 
00154     virtual void updateVocabulary(const pion::platform::Vocabulary& v);
00155 
00160     virtual void updateCodecs(void);
00161 
00167     virtual void process(const pion::platform::EventPtr& e);
00168 
00170     inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
00171 
00173     inline PionLogger getLogger(void) { return m_logger; }
00174 
00175 
00176 private:
00177 
00179     bool stopIfRunning(void);
00180 
00182     void readEvents(void);
00183 
00185     void openPipe(void);
00186 
00188     void closePipe(void);
00189 
00191     void parseArguments(void);
00192 
00193 
00194 #ifdef BOOST_IOSTREAMS_WINDOWS
00195     struct winpipe_handle_source : private boost::iostreams::file_descriptor {
00196         typedef char char_type;
00197         struct category : boost::iostreams::source_tag, boost::iostreams::closable_tag { };
00198         std::streamsize read(char_type* s, std::streamsize n) {
00199             DWORD result = 0, error = 0;
00200             if (!::ReadFile(handle(), s, n, &result, NULL) && (error = GetLastError()) != ERROR_BROKEN_PIPE)
00201                 throw boost::iostreams::detail::bad_read();
00202             return error == ERROR_BROKEN_PIPE ? -1 : static_cast<std::streamsize>(result);
00203         }
00204         using boost::iostreams::file_descriptor::close;
00205         using boost::iostreams::file_descriptor::handle;
00206         template <typename FLAG_TYPE>
00207         winpipe_handle_source(HANDLE h, FLAG_TYPE f) : boost::iostreams::file_descriptor(h, f) { }
00208         winpipe_handle_source(const winpipe_handle_source& w) :
00209             boost::iostreams::file_descriptor(static_cast<const boost::iostreams::file_descriptor&>(w)) { }
00210     };
00211 
00212     struct winpipe_handle_sink : private boost::iostreams::file_descriptor {
00213         typedef char char_type;
00214         struct category : boost::iostreams::sink_tag, boost::iostreams::closable_tag { };
00215         std::streamsize write(const char_type* s, std::streamsize n) {
00216             DWORD ignore = 0;
00217             if (!::WriteFile(handle(), s, n, &ignore, NULL))
00218                 throw boost::iostreams::detail::bad_write();
00219             return n;
00220         }
00221         using boost::iostreams::file_descriptor::close;
00222         using boost::iostreams::file_descriptor::handle;
00223         template <typename FLAG_TYPE>
00224         winpipe_handle_sink(HANDLE h, FLAG_TYPE f) : boost::iostreams::file_descriptor(h, f) { }
00225         winpipe_handle_sink(const winpipe_handle_sink& w) :
00226             boost::iostreams::file_descriptor(static_cast<const boost::iostreams::file_descriptor&>(w)) { }
00227     };
00228 
00230     typedef boost::iostreams::stream_buffer<winpipe_handle_source>  IStreamBuffer;
00231     typedef boost::iostreams::stream_buffer<winpipe_handle_sink>    OStreamBuffer;
00232 #else
00234     typedef boost::iostreams::stream_buffer<boost::iostreams::file_descriptor_source>   IStreamBuffer;
00235     typedef boost::iostreams::stream_buffer<boost::iostreams::file_descriptor_sink>     OStreamBuffer;
00236 #endif
00237 
00238 
00240     static const std::string            INPUT_CODEC_ELEMENT_NAME;
00241 
00243     static const std::string            OUTPUT_CODEC_ELEMENT_NAME;
00244 
00246     static const std::string            COMMAND_ELEMENT_NAME;
00247 
00248 
00250     PionLogger                          m_logger;
00251 
00253     pion::platform::CodecPtr            m_input_codec_ptr;
00254 
00256     pion::platform::CodecPtr            m_output_codec_ptr;
00257 
00259     std::string                         m_input_codec_id;
00260 
00262     std::string                         m_output_codec_id;
00263 
00265     std::string                         m_command;
00266 
00268     std::vector<std::string>            m_args;
00269 
00271     FILE_DESC_TYPE                      m_input_pipe;
00272 
00274     FILE_DESC_TYPE                      m_output_pipe;
00275 
00277     PROCESS_INFO_TYPE                   m_child;
00278 
00280     boost::scoped_ptr<OStreamBuffer>    m_input_streambuf_ptr;
00281 
00283     boost::scoped_ptr<IStreamBuffer>    m_output_streambuf_ptr;
00284 
00286     boost::scoped_ptr<std::ostream>     m_input_stream_ptr;
00287 
00289     boost::scoped_ptr<std::istream>     m_output_stream_ptr;
00290 
00292     boost::scoped_ptr<boost::thread>    m_thread_ptr;
00293 
00295     boost::mutex                        m_write_mutex;
00296 };
00297 
00298 
00299 }   // end namespace plugins
00300 }   // end namespace pion
00301 
00302 #endif

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7