00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #ifndef __EventLayerMiddlewareStub_h__
00042 #define __EventLayerMiddlewareStub_h__
00043
00044 #include <boost/bind.hpp>
00045 #include <boost/shared_ptr.hpp>
00046 #include <boost/enable_shared_from_this.hpp>
00047 #include <boost/pool/pool.hpp>
00048 #include <boost/asio/ip/tcp.hpp>
00049 #include <boost/asio/read.hpp>
00050 #include <boost/asio/write.hpp>
00051 #include <boost/asio/placeholders.hpp>
00052 #include <string.h>
00053 #include <queue>
00054
00055 #include "debug.h"
00056
00057 #include "mw/common/Event.h"
00058 #include "mw/api/PublisherEventChannel.h"
00059 #include "mw/api/SubscriberEventChannel.h"
00060
00061 #include "mw/el/LocalConnectionParameters.h"
00062 #include "mw/afp/shared/Time.h"
00063 #include "util/endianness.h"
00064 #include "util/ios.h"
00065
00066 namespace famouso {
00067 namespace mw {
00068 namespace el {
00069
00078 template <typename EL>
00079 class EventLayerMiddlewareStub {
00080
00084 class EventMemoryPool {
00085
00086
00087 typedef boost::pool<boost::default_user_allocator_malloc_free> pool_type;
00088
00089
00090 static pool_type & event_preamble() {
00091 static pool_type p(13); return p;
00092 }
00093 struct event_preamble_deleter {
00094 void operator()(uint8_t * p) {
00095 event_preamble().free(p);
00096 }
00097 };
00098
00099 static pool_type & event_data_4k() {
00100 static pool_type p(4*1024); return p;
00101 }
00102 struct event_data_4k_deleter {
00103 void operator()(uint8_t * p) {
00104 event_data_4k().free(p);
00105 }
00106 };
00107
00108 static pool_type & event_data_16k() {
00109 static pool_type p(16*1024); return p;
00110 }
00111 struct event_data_16k_deleter {
00112 void operator()(uint8_t * p) {
00113 event_data_16k().free(p);
00114 }
00115 };
00116
00117 static pool_type & event_data_64k() {
00118 static pool_type p(64*1024); return p;
00119 }
00120 struct event_data_64k_deleter {
00121 void operator()(uint8_t * p) {
00122 event_data_64k().free(p);
00123 }
00124 };
00125
00126 public:
00128 typedef boost::shared_ptr<uint8_t> pointer;
00129
00134 static pointer alloc(size_t size) {
00135 if (size <= 13) {
00136
00137 pointer p((uint8_t *)event_preamble().malloc(), event_preamble_deleter());
00138 return p;
00139 } else if (size <= 4*1024) {
00140 pointer p((uint8_t *)event_data_4k().malloc(), event_data_4k_deleter());
00141 return p;
00142 } else if (size <= 16*1024) {
00143 pointer p((uint8_t *)event_data_16k().malloc(), event_data_16k_deleter());
00144 return p;
00145 } else {
00146 BOOST_ASSERT(size <= 64*1024);
00147 pointer p((uint8_t *)event_data_64k().malloc(), event_data_64k_deleter());
00148 return p;
00149 }
00150 }
00151
00152 };
00153
00154
00158 template <typename lECH>
00159 class EventChannelConnection : public boost::enable_shared_from_this< EventChannelConnection<lECH> > {
00160 typedef famouso::mw::api::PublisherEventChannel<lECH> PEC;
00161 typedef famouso::mw::api::SubscriberEventChannel<lECH> SEC;
00162
00164 typedef typename EventMemoryPool::pointer data_ptr;
00165
00166
00168 struct AsyncWriteRequest {
00169
00170 AsyncWriteRequest(const data_ptr & preamble, const data_ptr & event_data, uint32_t event_length) :
00171 preamble(preamble), event_data(event_data), event_length(event_length) {
00172 }
00173
00174 AsyncWriteRequest(const AsyncWriteRequest & awr) {
00175 this->preamble = awr.preamble;
00176 this->event_data = awr.event_data;
00177 this->event_length = awr.event_length;
00178 }
00179
00180 data_ptr preamble;
00181 data_ptr event_data;
00182 uint32_t event_length;
00183 };
00184
00185
00187 static data_ptr & current_event_data() {
00188 static data_ptr data;
00189 return data;
00190 }
00191
00193 static data_ptr & current_preamble() {
00194 static data_ptr data;
00195 return data;
00196 }
00197
00198 public:
00202 typedef boost::shared_ptr<EventChannelConnection> pointer;
00203
00207 static pointer create() {
00208 return pointer(new EventChannelConnection());
00209 }
00210
00214 boost::asio::ip::tcp::socket& socket() {
00215 return socket_;
00216 }
00217
00221 void start() {
00222
00223
00224 async_read(socket(), boost::asio::buffer(event_head, event_head.size() - 4),
00225 boost::bind(&EventChannelConnection::handle_request, this->shared_from_this(),
00226 boost::asio::placeholders::error,
00227 boost::asio::placeholders::bytes_transferred));
00228 }
00229
00230 private:
00234 EventChannelConnection()
00235 : socket_(famouso::util::ios::instance()), incomplete_async_write_bytes(0),
00236 incomplete_async_writes(0) {
00237 }
00238
00244 void report(const famouso::mw::Subject &s, const char *const str) {
00245
00246 ::logging::log::emit() << str << ::logging::log::tab
00247 << ::logging::log::tab << "-- Subject [";
00248 for (uint8_t i = 0;i < 8;++i) {
00249 uint8_t c = s.tab()[i];
00250 if ((c < 32) || (c > 126)) c = 32;
00251 ::logging::log::emit() << c ;
00252 }
00253 ::logging::log::emit() << "] -> " << ::logging::log::hex
00254 << s.value() << ::logging::log::endl;
00255 }
00256
00263 void get_event_head(boost::shared_ptr<PEC> pec,
00264 const boost::system::error_code& error,
00265 size_t bytes_transferred) {
00266 if (!error) {
00267
00268 if (event_head[0] == FAMOUSO::PUBLISH) {
00269
00270 uint32_t event_data_size = ntohl(*(uint32_t *) & (event_head[9]));
00271 typename EventMemoryPool::pointer event_data = EventMemoryPool::alloc(event_data_size);
00272
00273
00274 boost::asio::async_read(socket(), boost::asio::buffer(event_data.get(), event_data_size),
00275 boost::bind(&EventChannelConnection::get_event_data, this->shared_from_this(),
00276 pec, event_data, boost::asio::placeholders::error,
00277 boost::asio::placeholders::bytes_transferred));
00278 return;
00279 }
00280 }
00281 report(pec->subject(), "Unannouncement");
00282 }
00283
00291 void get_event_data(boost::shared_ptr<PEC> pec,
00292 typename EventMemoryPool::pointer event_data,
00293 const boost::system::error_code& error,
00294 size_t bytes_transferred) {
00295 if (!error) {
00296
00297 famouso::mw::Event e(pec->subject());
00298 e.length = bytes_transferred;
00299 e.data = (uint8_t *) event_data.get();
00300
00301
00302 current_event_data() = event_data;
00303 current_preamble() = create_publish_preamble(e.subject, e.length);
00304 pec->publish(e);
00305 current_event_data().reset();
00306 current_preamble().reset();
00307
00308
00309 boost::asio::async_read(socket(), boost::asio::buffer(event_head, event_head.size()),
00310 boost::bind(&EventChannelConnection::get_event_head, this->shared_from_this(),
00311 pec, boost::asio::placeholders::error,
00312 boost::asio::placeholders::bytes_transferred));
00313 return;
00314 }
00315 report(pec->subject(), "Unannouncement");
00316 }
00317
00325 void unsubscribe(boost::shared_ptr<SEC> sec,
00326 const boost::system::error_code& error) {
00327 report(sec->subject(), "Unsubscription");
00328 }
00329
00340 void write_handler(data_ptr event_preamble,
00341 data_ptr event_data,
00342 const boost::system::error_code & error,
00343 size_t bytes_transferred) {
00344
00345 incomplete_async_write_bytes -= bytes_transferred;
00346 incomplete_async_writes--;
00347
00348
00349 if (!async_write_requests.empty()) {
00350 AsyncWriteRequest & awr = async_write_requests.front();
00351 post_async_write(awr.preamble, awr.event_data, awr.event_length);
00352 async_write_requests.pop();
00353 }
00354 }
00355
00364 void post_async_write(data_ptr event_preamble, data_ptr event_data, uint32_t event_length) {
00365 boost::array<boost::asio::const_buffer, 2> bufs = {{
00366 boost::asio::buffer(event_preamble.get(), 13),
00367 boost::asio::buffer(event_data.get(), event_length)
00368 }};
00369 boost::asio::async_write(socket(), bufs, boost::asio::transfer_all(),
00370 boost::bind(&EventChannelConnection::write_handler, this->shared_from_this(),
00371 event_preamble,
00372 event_data,
00373 boost::asio::placeholders::error,
00374 boost::asio::placeholders::bytes_transferred));
00375 }
00376
00382 data_ptr create_publish_preamble(const famouso::mw::Subject & subject, uint32_t event_length) {
00383 data_ptr sp_preamble = EventMemoryPool::alloc(13);
00384 uint8_t * preamble = sp_preamble.get();
00385 preamble[0] = FAMOUSO::PUBLISH;
00386 uint32_t *len = (uint32_t *) & preamble[9];
00387 for (uint8_t i = 0;i < 8;++i)
00388 preamble[i+1] = subject.tab()[i];
00389 *len = htonl(event_length);
00390 return sp_preamble;
00391 }
00392
00398 void cb(famouso::mw::api::SECCallBackData & cbd) {
00399
00400
00401
00402
00403
00404
00405 if (!incomplete_async_writes) {
00406
00407 drop_deadline = afp::shared::Time::current().add_sec(1);
00408 }
00409
00410 if (incomplete_async_write_bytes < 100000 || afp::shared::Time::current() < drop_deadline) {
00411
00412
00413
00414 data_ptr sp_preamble;
00415 data_ptr sp_event_data;
00416
00417 if (current_event_data()) {
00418
00419
00420 sp_event_data = current_event_data();
00421 sp_preamble = current_preamble();
00422 } else {
00423
00424
00425 sp_event_data = EventMemoryPool::alloc(cbd.length);
00426 memcpy(sp_event_data.get(), cbd.data, cbd.length);
00427 sp_preamble = create_publish_preamble(cbd.subject, cbd.length);
00428 }
00429
00430
00431 incomplete_async_write_bytes += cbd.length + 13;
00432 incomplete_async_writes++;
00433
00434 if (incomplete_async_writes == 1) {
00435
00436
00437 FAMOUSO_ASSERT(async_write_requests.empty());
00438 post_async_write(sp_preamble, sp_event_data, cbd.length);
00439 } else {
00440
00441
00442
00443 async_write_requests.push(AsyncWriteRequest(sp_preamble, sp_event_data, cbd.length));
00444 }
00445 } else {
00446
00447
00448
00449
00450 }
00451 }
00452
00459 void handle_request(const boost::system::error_code& error, size_t bytes_transferred) {
00460 if (!error && (bytes_transferred >= 9)) {
00461 switch (event_head[0]) {
00462 case FAMOUSO::SUBSCRIBE: {
00463
00464 boost::shared_ptr< SEC > sec(new SEC(famouso::mw::Subject(&event_head[1])));
00465
00466 sec->subscribe();
00467
00468 sec->callback.template bind< EventChannelConnection, &EventChannelConnection::cb >(this);
00469
00470
00471 boost::asio::async_read(socket(), boost::asio::buffer(event_head, event_head.size()),
00472 boost::bind(&EventChannelConnection::unsubscribe, this->shared_from_this(),
00473 sec, boost::asio::placeholders::error));
00474
00475 report(sec->subject(), "Subscription ");
00476 break;
00477 }
00478 case FAMOUSO::ANNOUNCE: {
00479
00480 boost::shared_ptr<PEC> pec(new PEC(famouso::mw::Subject(&event_head[1])));
00481
00482 pec->announce();
00483
00484
00485 boost::asio::async_read(socket(), boost::asio::buffer(event_head, event_head.size()),
00486 boost::bind(&EventChannelConnection::get_event_head, this->shared_from_this(),
00487 pec, boost::asio::placeholders::error,
00488 boost::asio::placeholders::bytes_transferred));
00489
00490 report(pec->subject(), "Announcement ");
00491 break;
00492 }
00493 default:
00494 ::logging::log::emit< ::logging::Error>() << "Wrong opcode:\t0x"
00495 << event_head[0] << ::logging::log::endl;
00496 }
00497 } else {
00498 ::logging::log::emit< ::logging::Error>() << "Wrong message format:"
00499 << ::logging::log::endl;
00500 }
00501 }
00502
00504 boost::asio::ip::tcp::socket socket_;
00505
00507 unsigned int incomplete_async_write_bytes;
00508
00510 unsigned int incomplete_async_writes;
00511
00513 afp::shared::Time drop_deadline;
00514
00516 std::queue<AsyncWriteRequest> async_write_requests;
00517
00519 boost::array<uint8_t, 13> event_head;
00520 };
00521
00522
00523 public:
00527 EventLayerMiddlewareStub() : acceptor_(famouso::util::ios::instance(),
00528 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),
00529 ServPort)) {
00530 start_accept();
00531
00532 famouso::util::impl::start_ios();
00533 }
00534
00535 private:
00539 void start_accept() {
00540 typename EventChannelConnection<EL>::pointer ecc = EventChannelConnection<EL>::create();
00541 acceptor_.async_accept(ecc->socket(),
00542 boost::bind(&EventLayerMiddlewareStub::handle_accept, this, ecc,
00543 boost::asio::placeholders::error));
00544 }
00545
00551 void handle_accept(typename EventChannelConnection<EL>::pointer ecc,
00552 const boost::system::error_code& error) {
00553 if (!error) {
00554 ecc->start();
00555 start_accept();
00556 } else {
00557 ::logging::log::emit< ::logging::Error>()
00558 << "Error in asynchronous acceptance of an incoming connection or CTRL^C"
00559 << ::logging::log::endl;
00560 }
00561 }
00562 boost::asio::ip::tcp::acceptor acceptor_;
00563 };
00564
00565 }
00566 }
00567 }
00568
00569 #endif