00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <iostream>
00021 #include <boost/version.hpp>
00022 #include <boost/scoped_array.hpp>
00023 #include <pion/platform/CodecFactory.hpp>
00024 #include "ScriptReactor.hpp"
00025
00026
00027 #if ( (BOOST_VERSION / 100000) > 1 || (BOOST_VERSION / 100 % 1000) >= 44 )
00028
00029 #define CLOSE_ON_EXIT_FLAGS boost::iostreams::close_handle
00030 #else
00031
00032 #define CLOSE_ON_EXIT_FLAGS true
00033 #endif
00034
00035
00036 using namespace pion::platform;
00037
00038
00039 namespace pion {
00040 namespace plugins {
00041
00042
00043
00044
00045 const std::string ScriptReactor::INPUT_CODEC_ELEMENT_NAME = "InputCodec";
00046 const std::string ScriptReactor::OUTPUT_CODEC_ELEMENT_NAME = "OutputCodec";
00047 const std::string ScriptReactor::COMMAND_ELEMENT_NAME = "Command";
00048
00049
00050
00051
00052 void ScriptReactor::start(void)
00053 {
00054 ConfigWriteLock cfg_lock(*this);
00055 if (! m_is_running) {
00056
00057 openPipe();
00058 m_is_running = true;
00059
00060
00061 PION_LOG_DEBUG(m_logger, "Starting reader thread: " << getId());
00062 m_thread_ptr.reset(new boost::thread(boost::bind(&ScriptReactor::readEvents, this)));
00063 }
00064 }
00065
00066 bool ScriptReactor::stopIfRunning(void)
00067 {
00068 ConfigWriteLock cfg_lock(*this);
00069 if (m_is_running) {
00070 PION_LOG_DEBUG(m_logger, "Waiting for reader thread: " << getId());
00071 FILE_DESC_TYPE input_pipe = m_input_pipe;
00072
00073
00074 m_input_pipe = INVALID_DESCRIPTOR;
00075
00076 CLOSE_DESCRIPTOR(input_pipe);
00077
00078 m_thread_ptr->join();
00079 PION_LOG_DEBUG(m_logger, "Cleaned up reader thread: " << getId());
00080 m_thread_ptr.reset();
00081
00082
00083 closePipe();
00084 m_is_running = false;
00085 return true;
00086 }
00087
00088 return false;
00089 }
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127 void ScriptReactor::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00128 {
00129
00130 const bool was_running = stopIfRunning();
00131
00132 {
00133
00134 ConfigWriteLock cfg_lock(*this);
00135 Reactor::setConfig(v, config_ptr);
00136
00137
00138 if (! ConfigManager::getConfigOption(INPUT_CODEC_ELEMENT_NAME, m_input_codec_id, config_ptr))
00139 throw EmptyInputCodecException(getId());
00140 m_input_codec_ptr = getCodecFactory().getCodec(m_input_codec_id);
00141 PION_ASSERT(m_input_codec_ptr);
00142
00143
00144 if (! ConfigManager::getConfigOption(OUTPUT_CODEC_ELEMENT_NAME, m_output_codec_id, config_ptr))
00145 throw EmptyOutputCodecException(getId());
00146 m_output_codec_ptr = getCodecFactory().getCodec(m_output_codec_id);
00147 PION_ASSERT(m_output_codec_ptr);
00148
00149
00150 if (! ConfigManager::getConfigOption(COMMAND_ELEMENT_NAME, m_command, config_ptr))
00151 throw EmptyCommandException(getId());
00152
00153
00154 parseArguments();
00155 }
00156
00157
00158 if (was_running)
00159 start();
00160 }
00161
00162 void ScriptReactor::updateVocabulary(const Vocabulary& v)
00163 {
00164
00165 ConfigWriteLock cfg_lock(*this);
00166 Reactor::updateVocabulary(v);
00167
00168 if (m_input_codec_ptr)
00169 m_input_codec_ptr->updateVocabulary(v);
00170
00171 if (m_output_codec_ptr)
00172 m_output_codec_ptr->updateVocabulary(v);
00173 }
00174
00175 void ScriptReactor::updateCodecs(void)
00176 {
00177
00178 if (! getCodecFactory().hasPlugin(m_input_codec_id)
00179 || ! getCodecFactory().hasPlugin(m_output_codec_id))
00180 {
00181 stop();
00182 } else {
00183
00184 ConfigWriteLock cfg_lock(*this);
00185 m_input_codec_ptr = getCodecFactory().getCodec(m_input_codec_id);
00186 m_output_codec_ptr = getCodecFactory().getCodec(m_output_codec_id);
00187 }
00188 }
00189
00190 void ScriptReactor::process(const EventPtr& e)
00191 {
00192 PION_ASSERT(m_input_stream_ptr);
00193 PION_ASSERT(m_input_codec_ptr);
00194
00195
00196 boost::mutex::scoped_lock write_lock(m_write_mutex);
00197
00198
00199 m_input_codec_ptr->write(*m_input_stream_ptr, *e);
00200 if (! *m_input_stream_ptr)
00201 throw WriteToPipeException(getId());
00202 m_input_stream_ptr->flush();
00203
00204
00205 write_lock.unlock();
00206
00207
00208 }
00209
00210 void ScriptReactor::readEvents(void)
00211 {
00212 PION_LOG_DEBUG(m_logger, "Reader thread is running: " << getId());
00213
00214 try {
00215 const Event::EventType event_type(m_output_codec_ptr->getEventType());
00216 EventFactory event_factory;
00217 EventPtr event_ptr;
00218
00219 PION_ASSERT(m_output_stream_ptr);
00220 PION_ASSERT(m_output_codec_ptr);
00221
00222 while ( isRunning() ) {
00223
00224
00225
00226
00227
00228
00229 event_factory.create(event_ptr, event_type);
00230
00231 if (m_output_codec_ptr->read(*m_output_stream_ptr, *event_ptr)) {
00232
00233 deliverEvent(event_ptr);
00234 } else if (!m_output_stream_ptr->eof()) {
00235
00236 throw ReadFromPipeException(getId());
00237 }
00238
00239 if (m_output_stream_ptr->eof())
00240 break;
00241 }
00242 } catch (std::exception& e) {
00243 PION_LOG_FATAL(m_logger, e.what());
00244 }
00245
00246
00247 PION_LOG_DEBUG(m_logger, "Reader thread is terminal: " << getId());
00248 while (m_input_pipe != INVALID_DESCRIPTOR)
00249 PionScheduler::sleep(0, 250000000);
00250
00251 PION_LOG_DEBUG(m_logger, "Reader thread is exiting: " << getId());
00252 }
00253
00254 void ScriptReactor::openPipe(void)
00255 {
00256
00257 closePipe();
00258
00259 PION_LOG_DEBUG(m_logger, "Opening pipe to command: " << m_command);
00260
00261 PION_ASSERT(! m_args.empty());
00262
00263 #ifdef _MSC_VER
00264
00265
00266
00267
00268
00269 SECURITY_ATTRIBUTES sa_attr;
00270 sa_attr.nLength = sizeof(SECURITY_ATTRIBUTES);
00271 sa_attr.bInheritHandle = TRUE;
00272 sa_attr.lpSecurityDescriptor = NULL;
00273
00274
00275 HANDLE child_stdout_read = INVALID_HANDLE_VALUE;
00276 HANDLE child_stdout_write = INVALID_HANDLE_VALUE;
00277 HANDLE child_stdin_read = INVALID_HANDLE_VALUE;
00278 HANDLE child_stdin_write = INVALID_HANDLE_VALUE;
00279
00280
00281 if ( ! CreatePipe(&child_stdout_read, &child_stdout_write, &sa_attr, 0) )
00282 throw OpenPipeException(m_command);
00283
00284 if ( ! SetHandleInformation(child_stdout_read, HANDLE_FLAG_INHERIT, 0) ) {
00285 CloseHandle(child_stdout_read); CloseHandle(child_stdout_write);
00286 throw OpenPipeException(m_command);
00287 }
00288
00289 if ( ! CreatePipe(&child_stdin_read, &child_stdin_write, &sa_attr, 0) ) {
00290 CloseHandle(child_stdout_read); CloseHandle(child_stdout_write);
00291 throw OpenPipeException(m_command);
00292 }
00293
00294 if ( ! SetHandleInformation(child_stdin_write, HANDLE_FLAG_INHERIT, 0) ) {
00295 CloseHandle(child_stdout_read); CloseHandle(child_stdout_write);
00296 CloseHandle(child_stdin_read); CloseHandle(child_stdin_write);
00297 throw OpenPipeException(m_command);
00298 }
00299
00300
00301 char *command = _strdup(m_command.c_str());
00302 PROCESS_INFORMATION proc_info;
00303 ZeroMemory( &proc_info, sizeof(PROCESS_INFORMATION) );
00304
00305
00306 STARTUPINFO start_info;
00307 ZeroMemory( &start_info, sizeof(STARTUPINFO) );
00308 start_info.cb = sizeof(STARTUPINFO);
00309 start_info.hStdInput = child_stdin_read;
00310 start_info.hStdOutput = child_stdout_write;
00311 start_info.hStdError = INVALID_HANDLE_VALUE;
00312 start_info.dwFlags |= STARTF_USESTDHANDLES;
00313
00314
00315 BOOL result = CreateProcess(NULL,
00316 command,
00317 NULL,
00318 NULL,
00319 TRUE,
00320 DETACHED_PROCESS,
00321 NULL,
00322 NULL,
00323 &start_info,
00324 &proc_info);
00325
00326
00327 free(command);
00328 if (result) {
00329 CloseHandle(child_stdin_read);
00330 CloseHandle(child_stdout_write);
00331 m_input_pipe = child_stdin_write;
00332 m_output_pipe = child_stdout_read;
00333 m_child = (LPPROCESS_INFORMATION)malloc(sizeof(PROCESS_INFORMATION));
00334 CopyMemory(m_child, &proc_info, sizeof(PROCESS_INFORMATION));
00335 } else {
00336
00337 CloseHandle(child_stdout_read); CloseHandle(child_stdout_write);
00338 CloseHandle(child_stdin_read); CloseHandle(child_stdin_write);
00339 throw OpenPipeException(m_command);
00340 }
00341
00342 #else
00343
00344
00345 int r_pipes[2];
00346 if ( ::pipe(r_pipes) != 0 )
00347 throw OpenPipeException(m_command);
00348 int w_pipes[2];
00349 if ( ::pipe(w_pipes) != 0) {
00350 ::close(r_pipes[0]); ::close(r_pipes[1]);
00351 throw OpenPipeException(m_command);
00352 }
00353
00354
00355 m_child = ::fork();
00356 if (m_child == -1) {
00357
00358
00359 ::close(r_pipes[0]); ::close(r_pipes[1]);
00360 ::close(w_pipes[0]); ::close(w_pipes[1]);
00361 throw OpenPipeException(m_command);
00362 } else if (m_child == 0) {
00363
00364
00365 ::dup2(w_pipes[0], 0);
00366 ::dup2(r_pipes[1], 1);
00367
00368
00369 ::dup2(::open("/dev/null", O_WRONLY, 0), 2);
00370 for (int fd = ::getdtablesize() - 1; fd > 2; fd--)
00371 ::close(fd);
00372
00373
00374 boost::scoped_array<char*> arg_ptr(new char*[m_args.size() + 1]);
00375 for (std::size_t n = 0; n < m_args.size(); ++n) {
00376 arg_ptr[n] = const_cast<char*>(m_args[n].c_str());
00377 }
00378 arg_ptr[ m_args.size() ] = NULL;
00379
00380
00381
00382 ::execvp(m_args[0].c_str(), arg_ptr.get());
00383 _exit(127);
00384 } else {
00385
00386
00387 ::close(w_pipes[0]);
00388 ::close(r_pipes[1]);
00389
00390
00391 m_input_pipe = w_pipes[1];
00392 m_output_pipe = r_pipes[0];
00393 }
00394
00395 #endif
00396
00397
00398 m_input_streambuf_ptr.reset(new OStreamBuffer( m_input_pipe, CLOSE_ON_EXIT_FLAGS ));
00399 m_input_stream_ptr.reset(new std::ostream(m_input_streambuf_ptr.get()));
00400 m_output_streambuf_ptr.reset(new IStreamBuffer( m_output_pipe, CLOSE_ON_EXIT_FLAGS ));
00401 m_output_stream_ptr.reset(new std::istream(m_output_streambuf_ptr.get()));
00402 }
00403
00404 void ScriptReactor::closePipe(void)
00405 {
00406 if (m_input_pipe != INVALID_DESCRIPTOR || m_output_pipe != INVALID_DESCRIPTOR) {
00407 PION_LOG_DEBUG(m_logger, "Closing pipe to command: " << m_command);
00408
00409
00410
00411
00412 #ifdef _MSC_VER
00413 if (m_child) {
00414 WaitForSingleObject(m_child->hProcess, INFINITE);
00415
00416
00417 CloseHandle(m_child->hProcess);
00418 CloseHandle(m_child->hThread);
00419 free(m_child);
00420 }
00421 #else
00422 if (m_child > 0) {
00423 ::waitpid(m_child, 0, 0);
00424
00425
00426
00427
00428 }
00429 #endif
00430
00431
00432 m_input_stream_ptr.reset();
00433 m_output_stream_ptr.reset();
00434 m_input_streambuf_ptr.reset();
00435 m_output_streambuf_ptr.reset();
00436
00437
00438 m_input_pipe = m_output_pipe = INVALID_DESCRIPTOR;
00439 m_child = INVALID_PROCESS;
00440 }
00441 }
00442
00443 void ScriptReactor::parseArguments(void)
00444 {
00445 std::string arg;
00446 std::size_t next_pos = 0;
00447 char next_delimiter = ' ';
00448
00449 PION_LOG_DEBUG(m_logger, "Parsing out command string: " << m_command);
00450
00451
00452 m_args.clear();
00453
00454
00455 while (next_pos < m_command.size()) {
00456 switch ( m_command[next_pos] ) {
00457 case '\\':
00458 if ( next_pos + 1 < m_command.size() &&
00459 (m_command[next_pos+1]==' ' || m_command[next_pos+1]=='\"'
00460 || m_command[next_pos+1]=='\'') )
00461 {
00462
00463 arg.push_back( m_command[++next_pos] );
00464 } else {
00465
00466 arg.push_back( m_command[next_pos] );
00467 }
00468 break;
00469
00470 case ' ':
00471 if (next_delimiter == ' ') {
00472 if (! arg.empty()) {
00473
00474 m_args.push_back(arg);
00475 arg.clear();
00476 }
00477 } else {
00478
00479 arg.push_back(' ');
00480 }
00481 break;
00482
00483 case '\'':
00484 case '\"':
00485 if (next_delimiter == m_command[next_pos]) {
00486
00487 m_args.push_back(arg);
00488 arg.clear();
00489 next_delimiter = ' ';
00490 } else if (arg.empty()) {
00491
00492 next_delimiter = m_command[next_pos];
00493 } else {
00494
00495 arg.push_back( m_command[next_pos] );
00496 }
00497 break;
00498
00499 default:
00500
00501 arg.push_back( m_command[next_pos] );
00502 break;
00503 }
00504
00505 ++next_pos;
00506 }
00507
00508
00509 if (!arg.empty())
00510 m_args.push_back(arg);
00511
00512
00513 if (m_args.empty())
00514 throw CommandParsingException(m_command);
00515
00516
00517 for (std::vector<std::string>::const_iterator it = m_args.begin(); it != m_args.end(); ++it) {
00518 PION_LOG_DEBUG(m_logger, "Command string argument: " << *it);
00519 }
00520 }
00521
00522 }
00523 }
00524
00525
00527 extern "C" PION_PLUGIN_API pion::platform::Reactor *pion_create_ScriptReactor(void) {
00528 return new pion::plugins::ScriptReactor();
00529 }
00530
00532 extern "C" PION_PLUGIN_API void pion_destroy_ScriptReactor(pion::plugins::ScriptReactor *reactor_ptr) {
00533 delete reactor_ptr;
00534 }