platform/reactors/ScriptReactor.cpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2009 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #include <iostream>
00021 #include <boost/version.hpp>
00022 #include <boost/scoped_array.hpp>
00023 #include <pion/platform/CodecFactory.hpp>
00024 #include "ScriptReactor.hpp"
00025 
00026 
// CLOSE_ON_EXIT_FLAGS is the "close the descriptor when done" argument that
// openPipe() passes when it wraps a pipe descriptor in a stream buffer.
// Its spelling depends on the Boost release being compiled against.
#if ( (BOOST_VERSION / 100000) <= 1 && (BOOST_VERSION / 100 % 1000) < 44 )
    // Boost.IOStreams 1.43 and earlier
    #define CLOSE_ON_EXIT_FLAGS true
#else
    // Boost.IOStreams 1.44 and later
    #define CLOSE_ON_EXIT_FLAGS boost::iostreams::close_handle
#endif
00034 
00035 
00036 using namespace pion::platform;
00037 
00038 
00039 namespace pion {        // begin namespace pion
00040 namespace plugins {     // begin namespace plugins
00041 
00042 
00043 // static members of ScriptReactor
00044 
00045 const std::string       ScriptReactor::INPUT_CODEC_ELEMENT_NAME = "InputCodec";
00046 const std::string       ScriptReactor::OUTPUT_CODEC_ELEMENT_NAME = "OutputCodec";
00047 const std::string       ScriptReactor::COMMAND_ELEMENT_NAME = "Command";
00048 
00049 
00050 // ScriptReactor member functions
00051 
00052 void ScriptReactor::start(void)
00053 {
00054     ConfigWriteLock cfg_lock(*this);
00055     if (! m_is_running) {
00056         // open pipe to script
00057         openPipe();
00058         m_is_running = true;
00059 
00060         // spawn a new thread that will be used to read events from the script
00061         PION_LOG_DEBUG(m_logger, "Starting reader thread: " << getId());
00062         m_thread_ptr.reset(new boost::thread(boost::bind(&ScriptReactor::readEvents, this)));
00063     }
00064 }
00065 
00066 bool ScriptReactor::stopIfRunning(void)
00067 {
00068     ConfigWriteLock cfg_lock(*this);
00069     if (m_is_running) {
00070         PION_LOG_DEBUG(m_logger, "Waiting for reader thread: " << getId());
00071         FILE_DESC_TYPE input_pipe = m_input_pipe;
00072 
00073         // let the reader thread know we are intending to stop it...
00074         m_input_pipe = INVALID_DESCRIPTOR;
00075         // this should break the reader thread out of any blocking operations
00076         CLOSE_DESCRIPTOR(input_pipe);
00077 
00078         m_thread_ptr->join();
00079         PION_LOG_DEBUG(m_logger, "Cleaned up reader thread: " << getId());
00080         m_thread_ptr.reset();
00081 
00082         // close pipe to script
00083         closePipe();
00084         m_is_running = false;
00085         return true;
00086     }
00087 
00088     return false;
00089 }
00090 
00091 /* ORIGINAL VERSION
00092 bool ScriptReactor::stopIfRunning(void)
00093 {
00094     bool do_stop = false;
00095     {
00096         ConfigWriteLock cfg_lock(*this);
00097         if (m_is_running && m_input_pipe != INVALID_DESCRIPTOR) {
00098             PION_LOG_DEBUG(m_logger, "Waiting for reader thread: " << getId());
00099             FILE_DESC_TYPE input_pipe = m_input_pipe;
00100 
00101             // let the reader thread know we are intending to stop it...
00102             m_input_pipe = INVALID_DESCRIPTOR;
00103             // this should break the reader thread out of any blocking operations
00104             CLOSE_DESCRIPTOR(input_pipe);
00105 
00106             do_stop = true;
00107         }
00108     }
00109 
00110     if (do_stop) {
00111         // we have to release the write lock before joining, otherwise things deadlock =(
00112         m_thread_ptr->join();
00113 
00114         ConfigWriteLock cfg_lock(*this);
00115         PION_LOG_DEBUG(m_logger, "Cleaned up reader thread: " << getId());
00116         m_thread_ptr.reset();
00117 
00118         // close pipe to script
00119         closePipe();
00120         m_is_running = false;
00121     }
00122 
00123     return do_stop;
00124 }
00125 */
00126 
00127 void ScriptReactor::setConfig(const Vocabulary& v, const xmlNodePtr config_ptr)
00128 {
00129     // make sure it's not running (it needs to be restarted in case Command changes)
00130     const bool was_running = stopIfRunning();
00131 
00132     {
00133         // first set config options for the Reactor base class
00134         ConfigWriteLock cfg_lock(*this);
00135         Reactor::setConfig(v, config_ptr);
00136 
00137         // get the Input Codec that the Reactor should use
00138         if (! ConfigManager::getConfigOption(INPUT_CODEC_ELEMENT_NAME, m_input_codec_id, config_ptr))
00139             throw EmptyInputCodecException(getId());
00140         m_input_codec_ptr = getCodecFactory().getCodec(m_input_codec_id);
00141         PION_ASSERT(m_input_codec_ptr);
00142 
00143         // get the Output Codec that the Reactor should use
00144         if (! ConfigManager::getConfigOption(OUTPUT_CODEC_ELEMENT_NAME, m_output_codec_id, config_ptr))
00145             throw EmptyOutputCodecException(getId());
00146         m_output_codec_ptr = getCodecFactory().getCodec(m_output_codec_id);
00147         PION_ASSERT(m_output_codec_ptr);
00148 
00149         // get the script or program command to execute
00150         if (! ConfigManager::getConfigOption(COMMAND_ELEMENT_NAME, m_command, config_ptr))
00151             throw EmptyCommandException(getId());
00152 
00153         // break command string into a vector of arguments
00154         parseArguments();
00155     }
00156 
00157     // restart if the reactor was running
00158     if (was_running)
00159         start();
00160 }
00161 
00162 void ScriptReactor::updateVocabulary(const Vocabulary& v)
00163 {
00164     // first update anything in the Reactor base class that might be needed
00165     ConfigWriteLock cfg_lock(*this);
00166     Reactor::updateVocabulary(v);
00167 
00168     if (m_input_codec_ptr)
00169         m_input_codec_ptr->updateVocabulary(v);
00170 
00171     if (m_output_codec_ptr)
00172         m_output_codec_ptr->updateVocabulary(v);
00173 }
00174 
00175 void ScriptReactor::updateCodecs(void)
00176 {
00177     // check if a codec was deleted (if so, stop now!)
00178     if (! getCodecFactory().hasPlugin(m_input_codec_id)
00179         || ! getCodecFactory().hasPlugin(m_output_codec_id))
00180     {
00181         stop();
00182     } else {
00183         // update the codec pointer
00184         ConfigWriteLock cfg_lock(*this);
00185         m_input_codec_ptr = getCodecFactory().getCodec(m_input_codec_id);
00186         m_output_codec_ptr = getCodecFactory().getCodec(m_output_codec_id);
00187     }
00188 }
00189 
00190 void ScriptReactor::process(const EventPtr& e)
00191 {
00192     PION_ASSERT(m_input_stream_ptr);
00193     PION_ASSERT(m_input_codec_ptr);
00194 
00195     // lock mutex to ensure that only one Event may be written at a time
00196     boost::mutex::scoped_lock write_lock(m_write_mutex);
00197 
00198     // write the Event to the pipe
00199     m_input_codec_ptr->write(*m_input_stream_ptr, *e);
00200     if (! *m_input_stream_ptr)
00201         throw WriteToPipeException(getId());
00202     m_input_stream_ptr->flush();
00203 
00204     // unlock mutex after writing to pipe
00205     write_lock.unlock();
00206 
00207     // note: delivery to other reactors is handled by readEvents()
00208 }
00209 
00210 void ScriptReactor::readEvents(void)
00211 {
00212     PION_LOG_DEBUG(m_logger, "Reader thread is running: " << getId());
00213 
00214     try {
00215         const Event::EventType event_type(m_output_codec_ptr->getEventType());
00216         EventFactory event_factory;
00217         EventPtr event_ptr;
00218 
00219         PION_ASSERT(m_output_stream_ptr);
00220         PION_ASSERT(m_output_codec_ptr);
00221 
00222         while ( isRunning() ) {
00223             // we don't need this lock, because only setConfig can change the config,
00224             // and the first thing it does is stop the reactor...and, if we did grab
00225             // this lock, we'd have a deadlock if/when the codec-read blocks...
00226             //ConfigReadLock cfg_lock(*this);
00227 
00228             // get a new event from the EventFactory
00229             event_factory.create(event_ptr, event_type);
00230             // read an event using the output codec
00231             if (m_output_codec_ptr->read(*m_output_stream_ptr, *event_ptr)) {
00232                 // deliver the Event to other Reactors
00233                 deliverEvent(event_ptr);
00234             } else if (!m_output_stream_ptr->eof()) {
00235                 // error (other than EOF) reading event
00236                 throw ReadFromPipeException(getId());
00237             }
00238             // if we've reached EOF, we're done
00239             if (m_output_stream_ptr->eof())
00240                 break;
00241         }
00242     } catch (std::exception& e) {
00243         PION_LOG_FATAL(m_logger, e.what());
00244     }
00245 
00246     // wait for a stop to happen...
00247     PION_LOG_DEBUG(m_logger, "Reader thread is terminal: " << getId());
00248     while (m_input_pipe != INVALID_DESCRIPTOR)
00249         PionScheduler::sleep(0, 250000000);
00250 
00251     PION_LOG_DEBUG(m_logger, "Reader thread is exiting: " << getId());
00252 }
00253 
00254 void ScriptReactor::openPipe(void)
00255 {
00256     // close first if already open
00257     closePipe();
00258 
00259     PION_LOG_DEBUG(m_logger, "Opening pipe to command: " << m_command);
00260 
00261     PION_ASSERT(! m_args.empty());
00262 
00263 #ifdef _MSC_VER
00264 
00265     // create child process code adapted from:
00266     // http://msdn.microsoft.com/en-us/library/ms682499(VS.85).aspx
00267 
00268     // prepare security attributes to use for creating pipes
00269     SECURITY_ATTRIBUTES sa_attr;
00270     sa_attr.nLength = sizeof(SECURITY_ATTRIBUTES);
00271     sa_attr.bInheritHandle = TRUE;
00272     sa_attr.lpSecurityDescriptor = NULL;
00273 
00274     // from int_tmain():
00275     HANDLE child_stdout_read = INVALID_HANDLE_VALUE;
00276     HANDLE child_stdout_write = INVALID_HANDLE_VALUE;
00277     HANDLE child_stdin_read = INVALID_HANDLE_VALUE;
00278     HANDLE child_stdin_write = INVALID_HANDLE_VALUE;
00279 
00280     // create pipe for child stdout
00281     if ( ! CreatePipe(&child_stdout_read, &child_stdout_write, &sa_attr, 0) )
00282         throw OpenPipeException(m_command);
00283     // ensure read handle for child stdout is not inherited
00284     if ( ! SetHandleInformation(child_stdout_read, HANDLE_FLAG_INHERIT, 0) ) {
00285         CloseHandle(child_stdout_read); CloseHandle(child_stdout_write);
00286         throw OpenPipeException(m_command);
00287     }
00288     // create pipe for child stdin
00289     if ( ! CreatePipe(&child_stdin_read, &child_stdin_write, &sa_attr, 0) ) {
00290         CloseHandle(child_stdout_read); CloseHandle(child_stdout_write);
00291         throw OpenPipeException(m_command);
00292     }
00293     // ensure write handle for child stdin is not inherited
00294     if ( ! SetHandleInformation(child_stdin_write, HANDLE_FLAG_INHERIT, 0) ) {
00295         CloseHandle(child_stdout_read); CloseHandle(child_stdout_write);
00296         CloseHandle(child_stdin_read); CloseHandle(child_stdin_write);
00297         throw OpenPipeException(m_command);
00298     }
00299 
00300     // from CreateChildProcess(), with some liberties...:
00301     char *command = _strdup(m_command.c_str());
00302     PROCESS_INFORMATION proc_info;
00303     ZeroMemory( &proc_info, sizeof(PROCESS_INFORMATION) );
00304 
00305     // prepare startup information for child process
00306     STARTUPINFO start_info;
00307     ZeroMemory( &start_info, sizeof(STARTUPINFO) );
00308     start_info.cb = sizeof(STARTUPINFO);
00309     start_info.hStdInput = child_stdin_read;
00310     start_info.hStdOutput = child_stdout_write;
00311     start_info.hStdError = INVALID_HANDLE_VALUE;
00312     start_info.dwFlags |= STARTF_USESTDHANDLES;
00313 
00314     // create the child process
00315     BOOL result = CreateProcess(NULL,
00316         command,        // command line
00317         NULL,           // process security attributes
00318         NULL,           // primary thread security attributes
00319         TRUE,           // handles are inherited
00320         DETACHED_PROCESS,   // creation flags
00321         NULL,           // use parent's environment
00322         NULL,           // use parent's current directory
00323         &start_info,    // STARTUPINFO pointer
00324         &proc_info);    // receives PROCESS_INFORMATION
00325 
00326     // check result of creating child process
00327     free(command);
00328     if (result) {
00329         CloseHandle(child_stdin_read);
00330         CloseHandle(child_stdout_write);
00331         m_input_pipe = child_stdin_write;
00332         m_output_pipe = child_stdout_read;
00333         m_child = (LPPROCESS_INFORMATION)malloc(sizeof(PROCESS_INFORMATION));
00334         CopyMemory(m_child, &proc_info, sizeof(PROCESS_INFORMATION));
00335     } else {
00336         // close all pipes
00337         CloseHandle(child_stdout_read); CloseHandle(child_stdout_write);
00338         CloseHandle(child_stdin_read); CloseHandle(child_stdin_write);
00339         throw OpenPipeException(m_command);
00340     }
00341 
00342 #else
00343 
00344     // initialize pipes for reading and writing to script
00345     int r_pipes[2];
00346     if ( ::pipe(r_pipes) != 0 )
00347         throw OpenPipeException(m_command);
00348     int w_pipes[2];
00349     if ( ::pipe(w_pipes) != 0) {
00350         ::close(r_pipes[0]); ::close(r_pipes[1]);
00351         throw OpenPipeException(m_command);
00352     }
00353 
00354     // fork child process
00355     m_child = ::fork();
00356     if (m_child == -1) {
00357         // inside parent process
00358         // failed to fork child process
00359         ::close(r_pipes[0]); ::close(r_pipes[1]);
00360         ::close(w_pipes[0]); ::close(w_pipes[1]);
00361         throw OpenPipeException(m_command);
00362     } else if (m_child == 0) {
00363         // inside child process
00364         // bind ends of pipes used by child to STDIN/STDOUT
00365         ::dup2(w_pipes[0], 0);  // STDIN
00366         ::dup2(r_pipes[1], 1);  // STDOUT
00367 
00368         // "blackhole" STDERR, and close all others
00369         ::dup2(::open("/dev/null", O_WRONLY, 0), 2);
00370         for (int fd = ::getdtablesize() - 1; fd > 2; fd--)
00371             ::close(fd);
00372 
00373         // convert argument string vector into an array of char pointers
00374         boost::scoped_array<char*> arg_ptr(new char*[m_args.size() + 1]);
00375         for (std::size_t n = 0; n < m_args.size(); ++n) {
00376             arg_ptr[n] = const_cast<char*>(m_args[n].c_str());
00377         }
00378         arg_ptr[ m_args.size() ] = NULL;
00379 
00380         // execute command (ignore return since we're in a new process anyway)
00381         // note: execvp() is wrapper for execve() that also searches the paths
00382         ::execvp(m_args[0].c_str(), arg_ptr.get());
00383         _exit(127);
00384     } else {
00385         // inside parent process
00386         // close ends of pipes used by child
00387         ::close(w_pipes[0]);
00388         ::close(r_pipes[1]);
00389 
00390         // set file descriptors for reading/writing
00391         m_input_pipe = w_pipes[1];
00392         m_output_pipe = r_pipes[0];
00393     }
00394 
00395 #endif
00396 
00397     // prepare c++ streams to use pipes for reading and writing events
00398     m_input_streambuf_ptr.reset(new OStreamBuffer( m_input_pipe, CLOSE_ON_EXIT_FLAGS ));
00399     m_input_stream_ptr.reset(new std::ostream(m_input_streambuf_ptr.get()));
00400     m_output_streambuf_ptr.reset(new IStreamBuffer( m_output_pipe, CLOSE_ON_EXIT_FLAGS ));
00401     m_output_stream_ptr.reset(new std::istream(m_output_streambuf_ptr.get()));
00402 }
00403 
00404 void ScriptReactor::closePipe(void)
00405 {
00406     if (m_input_pipe != INVALID_DESCRIPTOR || m_output_pipe != INVALID_DESCRIPTOR) {
00407         PION_LOG_DEBUG(m_logger, "Closing pipe to command: " << m_command);
00408 
00409         // note: file descriptors are closed by the iostreams
00410 
00411         // close child process
00412 #ifdef _MSC_VER
00413         if (m_child) {
00414             WaitForSingleObject(m_child->hProcess, INFINITE);
00415             // Perhaps, after a delay...terminate it?
00416             //TerminateProcess(m_child->hProcess, 0);
00417             CloseHandle(m_child->hProcess);
00418             CloseHandle(m_child->hThread);
00419             free(m_child);
00420         }
00421 #else
00422         if (m_child > 0) {
00423             ::waitpid(m_child, 0, 0);
00424             // Perhaps, after a delay...terminate it?
00425             //::kill(m_child, SIGTERM);
00426             // Perhaps, after some more delay...kill it?
00427             //::kill(m_child, SIGKILL);
00428         }
00429 #endif
00430 
00431         // close iostreams  
00432         m_input_stream_ptr.reset();
00433         m_output_stream_ptr.reset();
00434         m_input_streambuf_ptr.reset();
00435         m_output_streambuf_ptr.reset();
00436 
00437         // reset pipe handles and child
00438         m_input_pipe = m_output_pipe = INVALID_DESCRIPTOR;
00439         m_child = INVALID_PROCESS;
00440     }
00441 }
00442 
00443 void ScriptReactor::parseArguments(void)
00444 {
00445     std::string arg;            // current argument string
00446     std::size_t next_pos = 0;   // next parsing position
00447     char next_delimiter = ' ';  // next argument delimiter
00448 
00449     PION_LOG_DEBUG(m_logger, "Parsing out command string: " << m_command);
00450 
00451     // start by clearing the arguments vector
00452     m_args.clear();
00453 
00454     // step through each character in the command string
00455     while (next_pos < m_command.size()) {
00456         switch ( m_command[next_pos] ) {
00457         case '\\':
00458             if ( next_pos + 1 < m_command.size() &&
00459                 (m_command[next_pos+1]==' ' || m_command[next_pos+1]=='\"'
00460                 || m_command[next_pos+1]=='\'') )
00461             {
00462                 // escape next character
00463                 arg.push_back( m_command[++next_pos] );
00464             } else {
00465                 // append slash
00466                 arg.push_back( m_command[next_pos] );
00467             }
00468             break;
00469 
00470         case ' ':
00471             if (next_delimiter == ' ') {
00472                 if (! arg.empty()) {    // skip leading whitespace
00473                     // finished with space-delimited argument
00474                     m_args.push_back(arg);
00475                     arg.clear();
00476                 }
00477             } else {
00478                 // append space -- not space-delimited
00479                 arg.push_back(' ');
00480             }
00481             break;
00482 
00483         case '\'':
00484         case '\"':
00485             if (next_delimiter == m_command[next_pos]) {
00486                 // finished with quote/tick-delimited argument
00487                 m_args.push_back(arg);
00488                 arg.clear();
00489                 next_delimiter = ' ';
00490             } else if (arg.empty()) {
00491                 // first char in argument: set delimiter
00492                 next_delimiter = m_command[next_pos];
00493             } else {
00494                 // append quote/tick -- not the delimiter or first character
00495                 arg.push_back( m_command[next_pos] );
00496             }
00497             break;
00498 
00499         default:
00500             // append character
00501             arg.push_back( m_command[next_pos] );
00502             break;
00503         }
00504 
00505         ++next_pos;
00506     }
00507 
00508     // push final argument if ending delimiter not found
00509     if (!arg.empty())
00510         m_args.push_back(arg);
00511 
00512     // sanity check: args should never be empty
00513     if (m_args.empty())
00514         throw CommandParsingException(m_command);
00515 
00516     // log each argument parsed for debugging
00517     for (std::vector<std::string>::const_iterator it = m_args.begin(); it != m_args.end(); ++it) {
00518         PION_LOG_DEBUG(m_logger, "Command string argument: " << *it);
00519     }
00520 }
00521 
00522 }   // end namespace plugins
00523 }   // end namespace pion
00524 
00525 
00527 extern "C" PION_PLUGIN_API pion::platform::Reactor *pion_create_ScriptReactor(void) {
00528     return new pion::plugins::ScriptReactor();
00529 }
00530 
00532 extern "C" PION_PLUGIN_API void pion_destroy_ScriptReactor(pion::plugins::ScriptReactor *reactor_ptr) {
00533     delete reactor_ptr;
00534 }

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7