00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <sstream>
00021 #include <boost/bind.hpp>
00022 #include <pion/PionId.hpp>
00023 #include <pion/net/HTTPResponse.hpp>
00024 #include <pion/net/HTTPResponseWriter.hpp>
00025 #include "PlatformConfig.hpp"
00026 #include "FeedService.hpp"
00027
00028 using namespace pion::net;
00029 using namespace pion::server;
00030 using namespace pion::platform;
00031
00032
00033 namespace pion {
00034 namespace plugins {
00035
00036
00037
00038
00039 FeedHandler::FeedHandler(pion::platform::ReactionEngine &reaction_engine,
00040 const std::string& reactor_id, pion::platform::CodecPtr& codec_ptr,
00041 pion::net::TCPConnectionPtr& tcp_conn)
00042 : m_reaction_engine(reaction_engine),
00043 m_logger(PION_GET_LOGGER("pion.FeedService.FeedHandler")),
00044 m_connection_id(PionId().to_string()),
00045 m_connection_info(createConnectionInfo(tcp_conn)),
00046 m_reactor_id(reactor_id), m_codec_ptr(codec_ptr),
00047 m_tcp_conn(tcp_conn), m_tcp_stream(m_tcp_conn)
00048 {}
00049
00050 bool FeedHandler::sendResponse(void)
00051 {
00052
00053 std::stringstream ss;
00054 m_reaction_engine.writeConnectionsXML(ss, getConnectionId());
00055
00056
00057 HTTPResponse http_response;
00058 http_response.setStatusCode(HTTPTypes::RESPONSE_CODE_CREATED);
00059 http_response.setStatusMessage(HTTPTypes::RESPONSE_MESSAGE_CREATED);
00060 http_response.setContentType(HTTPTypes::CONTENT_TYPE_XML);
00061 http_response.setContentLength(ss.str().size());
00062 char *content_ptr = http_response.createContentBuffer();
00063 strcpy(content_ptr, ss.str().c_str());
00064
00065
00066 m_tcp_conn->setLifecycle(TCPConnection::LIFECYCLE_KEEPALIVE);
00067
00068
00069 boost::system::error_code ec;
00070 http_response.send(*m_tcp_conn, ec);
00071
00072
00073 m_tcp_conn->setLifecycle(TCPConnection::LIFECYCLE_CLOSE);
00074
00075 return !ec;
00076 }
00077
00078 std::string FeedHandler::createConnectionInfo(pion::net::TCPConnectionPtr& tcp_conn)
00079 {
00080 std::stringstream ss;
00081 ss << tcp_conn->getRemoteIp() << ':' << tcp_conn->getRemotePort();
00082 return ss.str();
00083 }
00084
00085
00086
00087
00088 FeedWriter::FeedWriter(pion::platform::ReactionEngine &reaction_engine,
00089 const std::string& reactor_id, pion::platform::CodecPtr& codec_ptr,
00090 pion::net::TCPConnectionPtr& tcp_conn)
00091 : FeedHandler(reaction_engine, reactor_id, codec_ptr, tcp_conn)
00092 {}
00093
00094 FeedWriter::~FeedWriter()
00095 {
00096 PION_LOG_INFO(m_logger, "Closing output feed to " << getConnectionInfo()
00097 << " (" << getConnectionId() << ')');
00098 }
00099
00100 void FeedWriter::writeEvent(EventPtr& e)
00101 {
00102 PION_LOG_DEBUG(m_logger, "Sending event to " << getConnectionInfo()
00103 << " (" << getConnectionId() << ')');
00104
00105 boost::mutex::scoped_lock send_lock(m_mutex);
00106 if (e.get() == NULL) {
00107
00108 m_tcp_stream.close();
00109
00110 } else if (!m_tcp_stream || !m_tcp_conn->is_open()) {
00111 PION_LOG_DEBUG(m_logger, "Lost connection to " << getConnectionInfo()
00112 << " (" << getConnectionId() << ')');
00113
00114 m_reaction_engine.post(boost::bind(&ReactionEngine::removeTempConnection,
00115 &m_reaction_engine, getConnectionId()));
00116 } else {
00117 try {
00118
00119 m_codec_ptr->write(m_tcp_stream, *e);
00120 } catch (std::exception& ex) {
00121
00122 PION_LOG_WARN(m_logger, "Error sending event to " << getConnectionInfo()
00123 << " (" << getConnectionId() << "):" << ex.what());
00124 m_reaction_engine.post(boost::bind(&ReactionEngine::removeTempConnection,
00125 &m_reaction_engine, getConnectionId()));
00126 }
00127 }
00128 }
00129
00130 void FeedWriter::start(void)
00131 {
00132
00133 boost::mutex::scoped_lock send_lock(m_mutex);
00134
00135
00136 Reactor::EventHandler event_handler(boost::bind(&FeedWriter::writeEvent,
00137 shared_from_this(), _1));
00138 m_reaction_engine.addTempConnectionOut(getReactorId(), getConnectionId(),
00139 getConnectionInfo(),
00140 event_handler);
00141
00142
00143 if (! sendResponse())
00144 return;
00145
00146 PION_LOG_INFO(m_logger, "Opened new output feed to " << getConnectionInfo()
00147 << " (" << getConnectionId() << ')');
00148 }
00149
00150
00151
00152
00153 FeedReader::FeedReader(pion::platform::ReactionEngine &reaction_engine,
00154 const std::string& reactor_id, pion::platform::CodecPtr& codec_ptr,
00155 pion::net::TCPConnectionPtr& tcp_conn)
00156 : FeedHandler(reaction_engine, reactor_id, codec_ptr, tcp_conn),
00157 m_reactor_ptr(NULL)
00158 {}
00159
00160 FeedReader::~FeedReader()
00161 {
00162 PION_LOG_INFO(m_logger, "Closing input feed from " << getConnectionInfo()
00163 << " (" << getConnectionId() << ')');
00164 }
00165
00166 void FeedReader::reactorWasRemoved(void)
00167 {
00168
00169 boost::mutex::scoped_lock pointer_lock(m_mutex);
00170 m_reactor_ptr = NULL;
00171
00172 m_tcp_stream.close();
00173 }
00174
00175 void FeedReader::start(void)
00176 {
00177
00178 m_reactor_ptr =
00179 m_reaction_engine.addTempConnectionIn(getReactorId(), getConnectionId(),
00180 getConnectionInfo(),
00181 boost::bind(&FeedReader::reactorWasRemoved,
00182 shared_from_this()));
00183
00184
00185 if (! sendResponse())
00186 return;
00187
00188 PION_LOG_INFO(m_logger, "Opened new input feed from " << getConnectionInfo()
00189 << " (" << getConnectionId() << ')');
00190
00191
00192 try {
00193
00194 const Event::EventType event_type(m_codec_ptr->getEventType());
00195 EventFactory event_factory;
00196 EventPtr event_ptr;
00197 event_factory.create(event_ptr, event_type);
00198 while (m_tcp_stream.is_open() && m_codec_ptr->read(m_tcp_stream, *event_ptr)) {
00199 PION_LOG_DEBUG(m_logger, "Read new event from " << getConnectionInfo()
00200 << " (" << getConnectionId() << ')');
00201 boost::mutex::scoped_lock pointer_lock(m_mutex);
00202
00203 if (m_reactor_ptr == NULL)
00204 return;
00205
00206 if (m_reactor_ptr->isRunning())
00207 (*m_reactor_ptr)(event_ptr);
00208 pointer_lock.unlock();
00209 event_factory.create(event_ptr, event_type);
00210 }
00211 } catch (std::exception&) {}
00212
00213
00214
00215 boost::mutex::scoped_lock pointer_lock(m_mutex);
00216 if (m_reactor_ptr != NULL)
00217 m_reaction_engine.removeTempConnection(getConnectionId());
00218 }
00219
00220
00221
00222
00223 void FeedService::operator()(HTTPRequestPtr& request, TCPConnectionPtr& tcp_conn)
00224 {
00225
00226 PathBranches branches;
00227 splitPathBranches(branches, request->getResource());
00228
00229
00230 if (branches.size() != 2) {
00231
00232 handleNotFoundRequest(request, tcp_conn);
00233 return;
00234 }
00235
00236
00237 const std::string reactor_id(branches[0]);
00238 if (reactor_id.empty() || !getConfig().getReactionEngine().hasPlugin(reactor_id)) {
00239
00240 handleNotFoundRequest(request, tcp_conn);
00241 return;
00242 }
00243
00244
00245 bool reactor_allowed = getConfig().getUserManagerPtr()->accessAllowed(request->getUser(), getConfig().getReactionEngine(), reactor_id);
00246 if (! reactor_allowed) {
00247
00248 std::string error_msg = "User doesn't have permission for Reactor " + reactor_id + ".";
00249 handleForbiddenRequest(request, tcp_conn, error_msg);
00250 return;
00251 }
00252
00253
00254 const std::string codec_id(branches[1]);
00255 CodecPtr codec_ptr(getConfig().getCodecFactory().getCodec(codec_id));
00256 if (codec_id.empty() || !codec_ptr) {
00257
00258 handleNotFoundRequest(request, tcp_conn);
00259 return;
00260 }
00261
00262
00263 if (request->getMethod() == HTTPTypes::REQUEST_METHOD_GET) {
00264
00265
00266
00267
00268 FeedWriterPtr writer_ptr(new FeedWriter(getConfig().getReactionEngine(),
00269 reactor_id, codec_ptr, tcp_conn));
00270 writer_ptr->start();
00271
00272 } else if (request->getMethod() == HTTPTypes::REQUEST_METHOD_PUT
00273 || request->getMethod() == HTTPTypes::REQUEST_METHOD_POST)
00274 {
00275
00276
00277
00278
00279 FeedReaderPtr reader_ptr(new FeedReader(getConfig().getReactionEngine(),
00280 reactor_id, codec_ptr, tcp_conn));
00281
00282
00283 getConfig().getServiceManager().post(boost::bind(&FeedReader::start,
00284 reader_ptr));
00285
00286 } else if (request->getMethod() == HTTPTypes::REQUEST_METHOD_HEAD) {
00287
00288
00289 HTTPResponseWriterPtr response_writer(HTTPResponseWriter::create(tcp_conn, *request,
00290 boost::bind(&TCPConnection::finish, tcp_conn)));
00291 response_writer->send();
00292
00293 } else {
00294
00295 handleMethodNotAllowed(request, tcp_conn, "GET, POST, PUT, HEAD");
00296 return;
00297 }
00298 }
00299
00300 }
00301 }
00302
00303
00305 extern "C" PION_PLUGIN_API pion::server::PlatformService *pion_create_FeedService(void) {
00306 return new pion::plugins::FeedService();
00307 }
00308
00310 extern "C" PION_PLUGIN_API void pion_destroy_FeedService(pion::plugins::FeedService *service_ptr) {
00311 delete service_ptr;
00312 }