00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef __PION_SCRIPTREACTOR_HEADER__
00021 #define __PION_SCRIPTREACTOR_HEADER__
00022
00023 #ifdef _MSC_VER
00024 #include <windows.h>
00025 #include <string.h>
00026 #else
00027 #include <errno.h>
00028 #include <unistd.h>
00029 #include <sys/wait.h>
00030 #endif
00031 #include <iosfwd>
00032 #include <string>
00033 #include <vector>
00034 #include <boost/scoped_ptr.hpp>
00035 #include <boost/thread/thread.hpp>
00036 #include <boost/iostreams/stream_buffer.hpp>
00037 #include <boost/iostreams/device/file_descriptor.hpp>
00038 #include <pion/PionConfig.hpp>
00039 #include <pion/PionLogger.hpp>
00040 #include <pion/PionException.hpp>
00041 #include <pion/PionScheduler.hpp>
00042 #include <pion/platform/Codec.hpp>
00043 #include <pion/platform/Reactor.hpp>
00044
00045 #ifdef _MSC_VER
00046 #define FILE_DESC_TYPE HANDLE
00047 #define INVALID_DESCRIPTOR INVALID_HANDLE_VALUE
00048 #define CLOSE_DESCRIPTOR(x) CloseHandle(x)
00049 #define PROCESS_INFO_TYPE LPPROCESS_INFORMATION
00050 #define INVALID_PROCESS NULL
00051 #else
00052 #define FILE_DESC_TYPE int
00053 #define INVALID_DESCRIPTOR -1
00054 #define CLOSE_DESCRIPTOR(x) ::close(x)
00055 #define PROCESS_INFO_TYPE pid_t
00056 #define INVALID_PROCESS -1
00057 #endif
00058
00059
00060 namespace pion {
00061 namespace plugins {
00062
00063
00067 class ScriptReactor :
00068 public pion::platform::Reactor
00069 {
00070 public:
00071
00073 class EmptyInputCodecException : public PionException {
00074 public:
00075 EmptyInputCodecException(const std::string& reactor_id)
00076 : PionException("ScriptReactor configuration is missing a required InputCodec parameter: ", reactor_id) {}
00077 };
00078
00080 class EmptyOutputCodecException : public PionException {
00081 public:
00082 EmptyOutputCodecException(const std::string& reactor_id)
00083 : PionException("ScriptReactor configuration is missing a required OutputCodec parameter: ", reactor_id) {}
00084 };
00085
00087 class EmptyCommandException : public PionException {
00088 public:
00089 EmptyCommandException(const std::string& reactor_id)
00090 : PionException("ScriptReactor configuration is missing a required Command parameter: ", reactor_id) {}
00091 };
00092
00094 class CommandParsingException : public PionException {
00095 public:
00096 CommandParsingException(const std::string& command)
00097 : PionException("ScriptReactor was unable to parse arguments out of command string: ", command) {}
00098 };
00099
00101 class OpenPipeException : public PionException {
00102 public:
00103 OpenPipeException(const std::string& command)
00104 : PionException("ScriptReactor failed to open pipe: ", command) {}
00105 };
00106
00108 class ReadFromPipeException : public PionException {
00109 public:
00110 ReadFromPipeException(const std::string& reactor_id)
00111 : PionException("ScriptReactor failed reading an event from pipe: ", reactor_id) {}
00112 };
00113
00115 class WriteToPipeException : public PionException {
00116 public:
00117 WriteToPipeException(const std::string& reactor_id)
00118 : PionException("ScriptReactor failed writing an event to pipe: ", reactor_id) {}
00119 };
00120
00121
00123 ScriptReactor(void)
00124 : Reactor(TYPE_PROCESSING),
00125 m_logger(PION_GET_LOGGER("pion.ScriptReactor")),
00126 m_input_pipe(INVALID_DESCRIPTOR), m_output_pipe(INVALID_DESCRIPTOR), m_child(INVALID_PROCESS)
00127 {
00128 }
00129
00131 virtual ~ScriptReactor() { stop(); }
00132
00134 virtual void start(void);
00135
00137 virtual void stop(void) { stopIfRunning(); }
00138
00146 virtual void setConfig(const pion::platform::Vocabulary& v, const xmlNodePtr config_ptr);
00147
00154 virtual void updateVocabulary(const pion::platform::Vocabulary& v);
00155
00160 virtual void updateCodecs(void);
00161
00167 virtual void process(const pion::platform::EventPtr& e);
00168
00170 inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
00171
00173 inline PionLogger getLogger(void) { return m_logger; }
00174
00175
00176 private:
00177
00179 bool stopIfRunning(void);
00180
00182 void readEvents(void);
00183
00185 void openPipe(void);
00186
00188 void closePipe(void);
00189
00191 void parseArguments(void);
00192
00193
00194 #ifdef BOOST_IOSTREAMS_WINDOWS
00195 struct winpipe_handle_source : private boost::iostreams::file_descriptor {
00196 typedef char char_type;
00197 struct category : boost::iostreams::source_tag, boost::iostreams::closable_tag { };
00198 std::streamsize read(char_type* s, std::streamsize n) {
00199 DWORD result = 0, error = 0;
00200 if (!::ReadFile(handle(), s, n, &result, NULL) && (error = GetLastError()) != ERROR_BROKEN_PIPE)
00201 throw boost::iostreams::detail::bad_read();
00202 return error == ERROR_BROKEN_PIPE ? -1 : static_cast<std::streamsize>(result);
00203 }
00204 using boost::iostreams::file_descriptor::close;
00205 using boost::iostreams::file_descriptor::handle;
00206 template <typename FLAG_TYPE>
00207 winpipe_handle_source(HANDLE h, FLAG_TYPE f) : boost::iostreams::file_descriptor(h, f) { }
00208 winpipe_handle_source(const winpipe_handle_source& w) :
00209 boost::iostreams::file_descriptor(static_cast<const boost::iostreams::file_descriptor&>(w)) { }
00210 };
00211
00212 struct winpipe_handle_sink : private boost::iostreams::file_descriptor {
00213 typedef char char_type;
00214 struct category : boost::iostreams::sink_tag, boost::iostreams::closable_tag { };
00215 std::streamsize write(const char_type* s, std::streamsize n) {
00216 DWORD ignore = 0;
00217 if (!::WriteFile(handle(), s, n, &ignore, NULL))
00218 throw boost::iostreams::detail::bad_write();
00219 return n;
00220 }
00221 using boost::iostreams::file_descriptor::close;
00222 using boost::iostreams::file_descriptor::handle;
00223 template <typename FLAG_TYPE>
00224 winpipe_handle_sink(HANDLE h, FLAG_TYPE f) : boost::iostreams::file_descriptor(h, f) { }
00225 winpipe_handle_sink(const winpipe_handle_sink& w) :
00226 boost::iostreams::file_descriptor(static_cast<const boost::iostreams::file_descriptor&>(w)) { }
00227 };
00228
00230 typedef boost::iostreams::stream_buffer<winpipe_handle_source> IStreamBuffer;
00231 typedef boost::iostreams::stream_buffer<winpipe_handle_sink> OStreamBuffer;
00232 #else
00234 typedef boost::iostreams::stream_buffer<boost::iostreams::file_descriptor_source> IStreamBuffer;
00235 typedef boost::iostreams::stream_buffer<boost::iostreams::file_descriptor_sink> OStreamBuffer;
00236 #endif
00237
00238
00240 static const std::string INPUT_CODEC_ELEMENT_NAME;
00241
00243 static const std::string OUTPUT_CODEC_ELEMENT_NAME;
00244
00246 static const std::string COMMAND_ELEMENT_NAME;
00247
00248
00250 PionLogger m_logger;
00251
00253 pion::platform::CodecPtr m_input_codec_ptr;
00254
00256 pion::platform::CodecPtr m_output_codec_ptr;
00257
00259 std::string m_input_codec_id;
00260
00262 std::string m_output_codec_id;
00263
00265 std::string m_command;
00266
00268 std::vector<std::string> m_args;
00269
00271 FILE_DESC_TYPE m_input_pipe;
00272
00274 FILE_DESC_TYPE m_output_pipe;
00275
00277 PROCESS_INFO_TYPE m_child;
00278
00280 boost::scoped_ptr<OStreamBuffer> m_input_streambuf_ptr;
00281
00283 boost::scoped_ptr<IStreamBuffer> m_output_streambuf_ptr;
00284
00286 boost::scoped_ptr<std::ostream> m_input_stream_ptr;
00287
00289 boost::scoped_ptr<std::istream> m_output_stream_ptr;
00290
00292 boost::scoped_ptr<boost::thread> m_thread_ptr;
00293
00295 boost::mutex m_write_mutex;
00296 };
00297
00298
00299 }
00300 }
00301
00302 #endif