platform/services/FeedService.cpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #include <sstream>
00021 #include <boost/bind.hpp>
00022 #include <pion/PionId.hpp>
00023 #include <pion/net/HTTPResponse.hpp>
00024 #include <pion/net/HTTPResponseWriter.hpp>
00025 #include "PlatformConfig.hpp"
00026 #include "FeedService.hpp"
00027 
00028 using namespace pion::net;
00029 using namespace pion::server;
00030 using namespace pion::platform;
00031 
00032 
00033 namespace pion {        // begin namespace pion
00034 namespace plugins {     // begin namespace plugins
00035 
00036 
00037 // FeedHandler member functions
00038     
00039 FeedHandler::FeedHandler(pion::platform::ReactionEngine &reaction_engine,
00040                          const std::string& reactor_id, pion::platform::CodecPtr& codec_ptr,
00041                          pion::net::TCPConnectionPtr& tcp_conn)
00042     : m_reaction_engine(reaction_engine),
00043     m_logger(PION_GET_LOGGER("pion.FeedService.FeedHandler")),
00044     m_connection_id(PionId().to_string()),
00045     m_connection_info(createConnectionInfo(tcp_conn)),
00046     m_reactor_id(reactor_id), m_codec_ptr(codec_ptr),
00047     m_tcp_conn(tcp_conn), m_tcp_stream(m_tcp_conn)
00048 {}
00049 
00050 bool FeedHandler::sendResponse(void)
00051 {
00052     // build some XML response content for the request
00053     std::stringstream ss;
00054     m_reaction_engine.writeConnectionsXML(ss, getConnectionId());
00055     
00056     // prepare a 201 (created) HTTP response
00057     HTTPResponse http_response;
00058     http_response.setStatusCode(HTTPTypes::RESPONSE_CODE_CREATED);
00059     http_response.setStatusMessage(HTTPTypes::RESPONSE_MESSAGE_CREATED);
00060     http_response.setContentType(HTTPTypes::CONTENT_TYPE_XML);
00061     http_response.setContentLength(ss.str().size());
00062     char *content_ptr = http_response.createContentBuffer();
00063     strcpy(content_ptr, ss.str().c_str());
00064     
00065     // notify peer that the connection will remain open
00066     m_tcp_conn->setLifecycle(TCPConnection::LIFECYCLE_KEEPALIVE);
00067 
00068     // send the HTTP response
00069     boost::system::error_code ec;
00070     http_response.send(*m_tcp_conn, ec);
00071     
00072     // make sure the connection will get closed when we are done
00073     m_tcp_conn->setLifecycle(TCPConnection::LIFECYCLE_CLOSE);
00074     
00075     return !ec;
00076 }
00077     
00078 std::string FeedHandler::createConnectionInfo(pion::net::TCPConnectionPtr& tcp_conn)
00079 {
00080     std::stringstream ss;
00081     ss << tcp_conn->getRemoteIp() << ':' << tcp_conn->getRemotePort();
00082     return ss.str();
00083 }
00084     
00085     
00086 // FeedWriter member functions
00087 
00088 FeedWriter::FeedWriter(pion::platform::ReactionEngine &reaction_engine,
00089                        const std::string& reactor_id, pion::platform::CodecPtr& codec_ptr,
00090                        pion::net::TCPConnectionPtr& tcp_conn)
00091     : FeedHandler(reaction_engine, reactor_id, codec_ptr, tcp_conn)
00092 {}
00093     
00094 FeedWriter::~FeedWriter()
00095 {
00096     PION_LOG_INFO(m_logger, "Closing output feed to " << getConnectionInfo()
00097                   << " (" << getConnectionId() << ')');
00098 }
00099     
00100 void FeedWriter::writeEvent(EventPtr& e)
00101 {
00102     PION_LOG_DEBUG(m_logger, "Sending event to " << getConnectionInfo()
00103                    << " (" << getConnectionId() << ')');
00104     // lock the mutex to ensure that only one Event is sent at a time
00105     boost::mutex::scoped_lock send_lock(m_mutex);
00106     if (e.get() == NULL) {
00107         // Reactor is being removed -> close the connection
00108         m_tcp_stream.close();
00109         // note that the ReactionEngine will remove the connection for us
00110     } else if (!m_tcp_stream || !m_tcp_conn->is_open()) {
00111         PION_LOG_DEBUG(m_logger, "Lost connection to " << getConnectionInfo()
00112                       << " (" << getConnectionId() << ')');
00113         // connection was lost -> tell ReactionEngine to remove the connection
00114         m_reaction_engine.post(boost::bind(&ReactionEngine::removeTempConnection,
00115                                            &m_reaction_engine, getConnectionId()));
00116     } else {
00117         try {
00118             // send the Event using the codec
00119             m_codec_ptr->write(m_tcp_stream, *e);
00120         } catch (std::exception& ex) {
00121             // stop sending Events if we encounter an exception
00122             PION_LOG_WARN(m_logger, "Error sending event to " << getConnectionInfo()
00123                           << " (" << getConnectionId() << "):" << ex.what());
00124             m_reaction_engine.post(boost::bind(&ReactionEngine::removeTempConnection,
00125                                                &m_reaction_engine, getConnectionId()));
00126         }
00127     }
00128 }
00129     
00130 void FeedWriter::start(void)
00131 {
00132     // lock the mutex to ensure that the HTTP response is sent first
00133     boost::mutex::scoped_lock send_lock(m_mutex);
00134 
00135     // tell the ReactionEngine to start sending us Events
00136     Reactor::EventHandler event_handler(boost::bind(&FeedWriter::writeEvent,
00137                                                     shared_from_this(), _1));
00138     m_reaction_engine.addTempConnectionOut(getReactorId(), getConnectionId(),
00139                                            getConnectionInfo(),
00140                                            event_handler);
00141 
00142     // send the HTTP response after the connection is successfully created
00143     if (! sendResponse())
00144         return;
00145     
00146     PION_LOG_INFO(m_logger, "Opened new output feed to " << getConnectionInfo()
00147                   << " (" << getConnectionId() << ')');
00148 }
00149     
00150     
00151 // FeedReader member functions
00152 
00153 FeedReader::FeedReader(pion::platform::ReactionEngine &reaction_engine,
00154                        const std::string& reactor_id, pion::platform::CodecPtr& codec_ptr,
00155                        pion::net::TCPConnectionPtr& tcp_conn)
00156     : FeedHandler(reaction_engine, reactor_id, codec_ptr, tcp_conn),
00157     m_reactor_ptr(NULL)
00158 {}
00159     
00160 FeedReader::~FeedReader()
00161 {
00162     PION_LOG_INFO(m_logger, "Closing input feed from " << getConnectionInfo()
00163                   << " (" << getConnectionId() << ')');
00164 }
00165         
00166 void FeedReader::reactorWasRemoved(void)
00167 {
00168     // set the Reactor pointer to null so that we know to stop sending Events
00169     boost::mutex::scoped_lock pointer_lock(m_mutex);
00170     m_reactor_ptr = NULL;
00171     // close the TCP stream to force it to stop blocking for input
00172     m_tcp_stream.close();
00173 }
00174 
00175 void FeedReader::start(void)
00176 {
00177     // initialize the Reactor pointer by registering a connection through ReactionEngine
00178     m_reactor_ptr =
00179         m_reaction_engine.addTempConnectionIn(getReactorId(), getConnectionId(),
00180                                               getConnectionInfo(),
00181                                               boost::bind(&FeedReader::reactorWasRemoved,
00182                                                           shared_from_this()));
00183     
00184     // send the HTTP response after the connection is created
00185     if (! sendResponse())
00186         return;
00187 
00188     PION_LOG_INFO(m_logger, "Opened new input feed from " << getConnectionInfo()
00189                   << " (" << getConnectionId() << ')');
00190 
00191     // just stop gracefully if exception is thrown
00192     try {
00193         // read Events form the TCP connection until it is closed or an error occurs
00194         const Event::EventType event_type(m_codec_ptr->getEventType());
00195         EventFactory event_factory;
00196         EventPtr event_ptr;
00197         event_factory.create(event_ptr, event_type);
00198         while (m_tcp_stream.is_open() && m_codec_ptr->read(m_tcp_stream, *event_ptr)) {
00199             PION_LOG_DEBUG(m_logger, "Read new event from " << getConnectionInfo()
00200                            << " (" << getConnectionId() << ')');
00201             boost::mutex::scoped_lock pointer_lock(m_mutex);
00202             // stop reading Events if the Reactor was removed
00203             if (m_reactor_ptr == NULL)
00204                 return;
00205             // discard the event if the Reactor is not running
00206             if (m_reactor_ptr->isRunning())
00207                 (*m_reactor_ptr)(event_ptr);
00208             pointer_lock.unlock();
00209             event_factory.create(event_ptr, event_type);
00210         }
00211     } catch (std::exception&) {}
00212 
00213     // un-register the connection before exiting
00214     // only if the Reactor pointer is not NULL (to prevent deadlock)
00215     boost::mutex::scoped_lock pointer_lock(m_mutex);
00216     if (m_reactor_ptr != NULL)
00217         m_reaction_engine.removeTempConnection(getConnectionId());
00218 }
00219     
00220     
00221 // FeedService member functions
00222 
00223 void FeedService::operator()(HTTPRequestPtr& request, TCPConnectionPtr& tcp_conn)
00224 {
00225     // split out the path branches from the HTTP request
00226     PathBranches branches;
00227     splitPathBranches(branches, request->getResource());
00228     
00229     // make sure that there are two extra path branches in the request
00230     if (branches.size() != 2) {
00231         // Log an error and send a 404 (Not Found) response.
00232         handleNotFoundRequest(request, tcp_conn);
00233         return;
00234     }
00235     
00236     // get the reactor_id from the first path branche
00237     const std::string reactor_id(branches[0]);
00238     if (reactor_id.empty() || !getConfig().getReactionEngine().hasPlugin(reactor_id)) {
00239         // Log an error and send a 404 (Not Found) response.
00240         handleNotFoundRequest(request, tcp_conn);
00241         return;
00242     }
00243 
00244     // Check whether the User has permission for this Reactor.
00245     bool reactor_allowed = getConfig().getUserManagerPtr()->accessAllowed(request->getUser(), getConfig().getReactionEngine(), reactor_id);
00246     if (! reactor_allowed) {
00247         // Log an error and send a 403 (Forbidden) response.
00248         std::string error_msg = "User doesn't have permission for Reactor " + reactor_id + ".";
00249         handleForbiddenRequest(request, tcp_conn, error_msg);
00250         return;
00251     }
00252 
00253     // get the codec_id from the second path branch
00254     const std::string codec_id(branches[1]);
00255     CodecPtr codec_ptr(getConfig().getCodecFactory().getCodec(codec_id));
00256     if (codec_id.empty() || !codec_ptr) {
00257         // Log an error and send a 404 (Not Found) response.
00258         handleNotFoundRequest(request, tcp_conn);
00259         return;
00260     }
00261     
00262     // check the request method to determine if we should read or write Events
00263     if (request->getMethod() == HTTPTypes::REQUEST_METHOD_GET) {
00264 
00265         // request made to receive a stream of Events
00266         
00267         // create a FeedWriter object that will be used to send Events
00268         FeedWriterPtr writer_ptr(new FeedWriter(getConfig().getReactionEngine(),
00269                                                 reactor_id, codec_ptr, tcp_conn));
00270         writer_ptr->start();
00271         
00272     } else if (request->getMethod() == HTTPTypes::REQUEST_METHOD_PUT
00273                || request->getMethod() == HTTPTypes::REQUEST_METHOD_POST)
00274     {
00275 
00276         // request made to send a stream of Events to a Reactor
00277 
00278         // create a FeedReader object that will be used to send Events
00279         FeedReaderPtr reader_ptr(new FeedReader(getConfig().getReactionEngine(),
00280                                                 reactor_id, codec_ptr, tcp_conn)); 
00281 
00282         // schedule another thread to start reading events
00283         getConfig().getServiceManager().post(boost::bind(&FeedReader::start,
00284                                                          reader_ptr));
00285         
00286     } else if (request->getMethod() == HTTPTypes::REQUEST_METHOD_HEAD) {
00287         
00288         // request is just checking if the reactor is valid -> return OK
00289         HTTPResponseWriterPtr response_writer(HTTPResponseWriter::create(tcp_conn, *request,
00290                                               boost::bind(&TCPConnection::finish, tcp_conn)));
00291         response_writer->send();
00292 
00293     } else {
00294         // Log an error and send a 405 (Method Not Allowed) response.
00295         handleMethodNotAllowed(request, tcp_conn, "GET, POST, PUT, HEAD");
00296         return;
00297     }   
00298 }
00299 
00300 }   // end namespace plugins
00301 }   // end namespace pion
00302 
00303 
00305 extern "C" PION_PLUGIN_API pion::server::PlatformService *pion_create_FeedService(void) {
00306     return new pion::plugins::FeedService();
00307 }
00308 
00310 extern "C" PION_PLUGIN_API void pion_destroy_FeedService(pion::plugins::FeedService *service_ptr) {
00311     delete service_ptr;
00312 }

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7