Menu

EventLayerMiddlewareStub.h

Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2008-2010 Michael Schulze <mschulze@ivs.cs.uni-magdeburg.de>
00004  *                         Philipp Werner <philipp.werner@st.ovgu.de>
00005  * All rights reserved.
00006  *
00007  *    Redistribution and use in source and binary forms, with or without
00008  *    modification, are permitted provided that the following conditions
00009  *    are met:
00010  *
00011  *    * Redistributions of source code must retain the above copyright
00012  *      notice, this list of conditions and the following disclaimer.
00013  *
00014  *    * Redistributions in binary form must reproduce the above copyright
00015  *      notice, this list of conditions and the following disclaimer in
00016  *      the documentation and/or other materials provided with the
00017  *      distribution.
00018  *
00019  *    * Neither the name of the copyright holders nor the names of
00020  *      contributors may be used to endorse or promote products derived
00021  *      from this software without specific prior written permission.
00022  *
00023  *
00024  *    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00025  *    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00026  *    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00027  *    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
00028  *    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00029  *    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00030  *    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00031  *    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00032  *    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00033  *    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00034  *    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00035  *
00036  *
00037  * $Id$
00038  *
00039  ******************************************************************************/
00040 
00041 #ifndef __EventLayerMiddlewareStub_h__
00042 #define __EventLayerMiddlewareStub_h__
00043 
00044 #include <boost/bind.hpp>
00045 #include <boost/shared_ptr.hpp>
00046 #include <boost/enable_shared_from_this.hpp>
00047 #include <boost/pool/pool.hpp>
00048 #include <boost/asio/ip/tcp.hpp>
00049 #include <boost/asio/read.hpp>
00050 #include <boost/asio/write.hpp>
00051 #include <boost/asio/placeholders.hpp>
00052 #include <string.h>
00053 #include <queue>
00054 
00055 #include "debug.h"
00056 
00057 #include "mw/common/Event.h"
00058 #include "mw/api/PublisherEventChannel.h"
00059 #include "mw/api/SubscriberEventChannel.h"
00060 
00061 #include "mw/el/LocalConnectionParameters.h"
00062 #include "mw/afp/shared/Time.h"
00063 #include "util/endianness.h"
00064 #include "util/ios.h"
00065 
00066 namespace famouso {
00067     namespace mw {
00068         namespace el {
00069 
00078             template <typename EL>
00079             class EventLayerMiddlewareStub {
00080 
00084                     class EventMemoryPool {
00085 
00086                             // Change this to singleton_pool for thread-safe implementation.
00087                             typedef boost::pool<boost::default_user_allocator_malloc_free> pool_type;
00088 
00089                             // Use multiple pools of different size to save memory
00090                             static pool_type & event_preamble() {
00091                                 static pool_type p(13); return p;
00092                             }
00093                             struct event_preamble_deleter {
00094                                 void operator()(uint8_t * p) {
00095                                     event_preamble().free(p);
00096                                 }
00097                             };
00098 
00099                             static pool_type & event_data_4k() {
00100                                 static pool_type p(4*1024); return p;
00101                             }
00102                             struct event_data_4k_deleter {
00103                                 void operator()(uint8_t * p) {
00104                                     event_data_4k().free(p);
00105                                 }
00106                             };
00107 
00108                             static pool_type & event_data_16k() {
00109                                 static pool_type p(16*1024); return p;
00110                             }
00111                             struct event_data_16k_deleter {
00112                                 void operator()(uint8_t * p) {
00113                                     event_data_16k().free(p);
00114                                 }
00115                             };
00116 
00117                             static pool_type & event_data_64k() {
00118                                 static pool_type p(64*1024); return p;
00119                             }
00120                             struct event_data_64k_deleter {
00121                                 void operator()(uint8_t * p) {
00122                                     event_data_64k().free(p);
00123                                 }
00124                             };
00125 
00126                         public:
00128                             typedef boost::shared_ptr<uint8_t> pointer;
00129 
00134                             static pointer alloc(size_t size) {
00135                                 if (size <= 13) {
00136                                     // TODO: use allocate_shared for performance improvments
00137                                     pointer p((uint8_t *)event_preamble().malloc(), event_preamble_deleter());
00138                                     return p;
00139                                 } else if (size <= 4*1024) {
00140                                     pointer p((uint8_t *)event_data_4k().malloc(), event_data_4k_deleter());
00141                                     return p;
00142                                 } else if (size <= 16*1024) {
00143                                     pointer p((uint8_t *)event_data_16k().malloc(), event_data_16k_deleter());
00144                                     return p;
00145                                 } else {
00146                                     BOOST_ASSERT(size <= 64*1024);
00147                                     pointer p((uint8_t *)event_data_64k().malloc(), event_data_64k_deleter());
00148                                     return p;
00149                                 }
00150                             }
00151                             // maybe call release_memory() sometimes
00152                     };
00153 
00154 
00158                     template <typename lECH>
00159                     class EventChannelConnection : public boost::enable_shared_from_this< EventChannelConnection<lECH> > {
00160                             typedef famouso::mw::api::PublisherEventChannel<lECH> PEC;
00161                             typedef famouso::mw::api::SubscriberEventChannel<lECH> SEC;
00162 
00164                             typedef typename EventMemoryPool::pointer data_ptr;
00165 
00166 
00168                             struct AsyncWriteRequest {
00169 
00170                                 AsyncWriteRequest(const data_ptr & preamble, const data_ptr & event_data, uint32_t event_length) :
00171                                     preamble(preamble), event_data(event_data), event_length(event_length) {
00172                                 }
00173 
00174                                 AsyncWriteRequest(const AsyncWriteRequest & awr) {
00175                                     this->preamble = awr.preamble;
00176                                     this->event_data = awr.event_data;
00177                                     this->event_length = awr.event_length;
00178                                 }
00179 
00180                                 data_ptr preamble;
00181                                 data_ptr event_data;
00182                                 uint32_t event_length;
00183                             };
00184 
00185 
00187                             static data_ptr & current_event_data() {
00188                                 static data_ptr data;
00189                                 return data;
00190                             }
00191 
00193                             static data_ptr & current_preamble() {
00194                                 static data_ptr data;
00195                                 return data;
00196                             }
00197 
00198                         public:
00202                             typedef boost::shared_ptr<EventChannelConnection> pointer;
00203 
00207                             static pointer create() {
00208                                 return pointer(new EventChannelConnection());
00209                             }
00210 
00214                             boost::asio::ip::tcp::socket& socket() {
00215                                 return socket_;
00216                             }
00217 
00221                             void start() {
00222                                 // Connection accepted and established
00223                                 // Bind Request-Handler on socket and it fires if data arrive
00224                                 async_read(socket(), boost::asio::buffer(event_head, event_head.size() - 4),
00225                                            boost::bind(&EventChannelConnection::handle_request, this->shared_from_this(),
00226                                                        boost::asio::placeholders::error,
00227                                                        boost::asio::placeholders::bytes_transferred));
00228                             }
00229 
00230                         private:
00234                             EventChannelConnection()
00235                                     : socket_(famouso::util::ios::instance()), incomplete_async_write_bytes(0),
00236                                       incomplete_async_writes(0) {
00237                             }
00238 
00244                             void report(const famouso::mw::Subject &s, const char *const str) {
00245 
00246                                 ::logging::log::emit() << str << ::logging::log::tab
00247                                     << ::logging::log::tab << "-- Subject [";
00248                                 for (uint8_t i = 0;i < 8;++i) {
00249                                     uint8_t c = s.tab()[i];
00250                                     if ((c < 32) || (c > 126)) c = 32;   // only printable characters
00251                                     ::logging::log::emit() << c ;
00252                                 }
00253                                 ::logging::log::emit() << "] -> " << ::logging::log::hex
00254                                     << s.value() << ::logging::log::endl;
00255                             }
00256 
00263                             void get_event_head(boost::shared_ptr<PEC> pec,
00264                                                 const boost::system::error_code& error,
00265                                                 size_t bytes_transferred) {
00266                                 if (!error) {
00267                                     // If it is no publich event, then it have to be an unannouncement.
00268                                     if (event_head[0] == FAMOUSO::PUBLISH) {
00269                                         // Allocate self deleting buffer for event_data from EventMemoryPool
00270                                         uint32_t event_data_size = ntohl(*(uint32_t *) & (event_head[9]));
00271                                         typename EventMemoryPool::pointer event_data = EventMemoryPool::alloc(event_data_size);
00272                                         // bind correct handler to the socket in order to
00273                                         // receive the event_data on that socket
00274                                         boost::asio::async_read(socket(), boost::asio::buffer(event_data.get(), event_data_size),
00275                                                                 boost::bind(&EventChannelConnection::get_event_data, this->shared_from_this(),
00276                                                                             pec, event_data, boost::asio::placeholders::error,
00277                                                                             boost::asio::placeholders::bytes_transferred));
00278                                         return;
00279                                     }
00280                                 }
00281                                 report(pec->subject(), "Unannouncement");
00282                             }
00283 
00291                             void get_event_data(boost::shared_ptr<PEC> pec,
00292                                                 typename EventMemoryPool::pointer event_data,
00293                                                 const boost::system::error_code& error,
00294                                                 size_t bytes_transferred) {
00295                                 if (!error) {
00296                                     // now the Event is complete
00297                                     famouso::mw::Event e(pec->subject());
00298                                     e.length = bytes_transferred;
00299                                     e.data = (uint8_t *) event_data.get();
00300                                     // publish to FAMOUSO
00301                                     // Use static members to transport the shared pointers to cb().
00302                                     current_event_data() = event_data;
00303                                     current_preamble() = create_publish_preamble(e.subject, e.length);
00304                                     pec->publish(e);
00305                                     current_event_data().reset();
00306                                     current_preamble().reset();
00307                                     // bind correct handler to the socket in order to
00308                                     // receive a new event_head on that socket
00309                                     boost::asio::async_read(socket(), boost::asio::buffer(event_head, event_head.size()),
00310                                                             boost::bind(&EventChannelConnection::get_event_head, this->shared_from_this(),
00311                                                                         pec, boost::asio::placeholders::error,
00312                                                                         boost::asio::placeholders::bytes_transferred));
00313                                     return;
00314                                 }
00315                                 report(pec->subject(), "Unannouncement");
00316                             }
00317 
00325                             void unsubscribe(boost::shared_ptr<SEC> sec,
00326                                              const boost::system::error_code& error) {
00327                                 report(sec->subject(), "Unsubscription");
00328                             }
00329 
00340                             void write_handler(data_ptr event_preamble,
00341                                                data_ptr event_data,
00342                                                const boost::system::error_code & error,
00343                                                size_t bytes_transferred) {
00344                                 // Update counters
00345                                 incomplete_async_write_bytes -= bytes_transferred;
00346                                 incomplete_async_writes--;
00347 
00348                                 // Post next async write operation if there is one
00349                                 if (!async_write_requests.empty()) {
00350                                     AsyncWriteRequest & awr = async_write_requests.front();
00351                                     post_async_write(awr.preamble, awr.event_data, awr.event_length);
00352                                     async_write_requests.pop();
00353                                 }
00354                             }
00355 
00364                             void post_async_write(data_ptr event_preamble, data_ptr event_data, uint32_t event_length) {
00365                                 boost::array<boost::asio::const_buffer, 2> bufs = {{
00366                                     boost::asio::buffer(event_preamble.get(), 13),
00367                                     boost::asio::buffer(event_data.get(), event_length)
00368                                 }};
00369                                 boost::asio::async_write(socket(), bufs, boost::asio::transfer_all(),
00370                                                          boost::bind(&EventChannelConnection::write_handler, this->shared_from_this(),
00371                                                                      event_preamble,
00372                                                                      event_data,
00373                                                                      boost::asio::placeholders::error,
00374                                                                      boost::asio::placeholders::bytes_transferred));
00375                             }
00376 
00382                             data_ptr create_publish_preamble(const famouso::mw::Subject & subject, uint32_t event_length) {
00383                                 data_ptr sp_preamble = EventMemoryPool::alloc(13);
00384                                 uint8_t * preamble = sp_preamble.get();
00385                                 preamble[0] = FAMOUSO::PUBLISH;
00386                                 uint32_t *len = (uint32_t *) & preamble[9];
00387                                 for (uint8_t i = 0;i < 8;++i)
00388                                     preamble[i+1] = subject.tab()[i];
00389                                 *len = htonl(event_length);
00390                                 return sp_preamble;
00391                             }
00392 
00398                             void cb(famouso::mw::api::SECCallBackData & cbd) {
00399                                 // Crash detection and flow control:
00400                                 // If data is published in a higher rate than the subscriber
00401                                 // can process it, the async write operation queue will flood
00402                                 // the memory. To avoid dropping data in case of high peak throughput
00403                                 // nothing is discarded before a deadline was passed. The deadline
00404                                 // is given relative to the last time no async write was pending.
00405                                 if (!incomplete_async_writes) {
00406                                     // All async writes complete -> reset deadline
00407                                     drop_deadline = afp::shared::Time::current().add_sec(1);
00408                                 }
00409 
00410                                 if (incomplete_async_write_bytes < 100000 || afp::shared::Time::current() < drop_deadline) {
00411                                     // To enable asynchronous write operations (needed to avoid flow dependencies)
00412                                     // we use buffers that are managed by shared_ptr pointers and are deleted
00413                                     // automatically after async write is completed.
00414                                     data_ptr sp_preamble;
00415                                     data_ptr sp_event_data;
00416 
00417                                     if (current_event_data()) {
00418                                         // Event published by get_event_data() (published locally).
00419                                         // -> cbd.data owned by current_event_data()
00420                                         sp_event_data = current_event_data();
00421                                         sp_preamble = current_preamble();
00422                                     } else {
00423                                         // Event comes from network layer
00424                                         // -> copy data to enable async write
00425                                         sp_event_data = EventMemoryPool::alloc(cbd.length);
00426                                         memcpy(sp_event_data.get(), cbd.data, cbd.length);
00427                                         sp_preamble = create_publish_preamble(cbd.subject, cbd.length);
00428                                     }
00429 
00430                                     // Update counters
00431                                     incomplete_async_write_bytes += cbd.length + 13;
00432                                     incomplete_async_writes++;
00433 
00434                                     if (incomplete_async_writes == 1) {
00435                                         // No other async writes pending
00436                                         // -> post it directly
00437                                         FAMOUSO_ASSERT(async_write_requests.empty());
00438                                         post_async_write(sp_preamble, sp_event_data, cbd.length);
00439                                     } else {
00440                                         // Other async writes pending
00441                                         // -> asio does not support queueing of async_write() calls (would result in interleaving)
00442                                         // -> queueing in application
00443                                         async_write_requests.push(AsyncWriteRequest(sp_preamble, sp_event_data, cbd.length));
00444                                     }
00445                                 } else {
00446                                     // The client is crashed, blocking inside the callback or just to
00447                                     // slow while processing the data. Drop current event to prevent flooding
00448                                     // memory with async write operations which are likely to be never
00449                                     // completed.
00450                                 }
00451                             }
00452 
00459                             void handle_request(const boost::system::error_code& error, size_t bytes_transferred) {
00460                                 if (!error && (bytes_transferred >= 9)) {
00461                                     switch (event_head[0]) {
00462                                         case FAMOUSO::SUBSCRIBE: {
00463                                                 // allocate a new subscribe event channel
00464                                                 boost::shared_ptr< SEC > sec(new SEC(famouso::mw::Subject(&event_head[1])));
00465                                                 // announce it to FAMOUSO
00466                                                 sec->subscribe();
00467                                                 // set a specific callback
00468                                                 sec->callback.template bind< EventChannelConnection, &EventChannelConnection::cb >(this);
00469 
00470                                                 // bind the receive function, however an unsubscription can come only
00471                                                 boost::asio::async_read(socket(), boost::asio::buffer(event_head, event_head.size()),
00472                                                                         boost::bind(&EventChannelConnection::unsubscribe, this->shared_from_this(),
00473                                                                                     sec, boost::asio::placeholders::error));
00474 
00475                                                 report(sec->subject(), "Subscription  ");
00476                                                 break;
00477                                             }
00478                                         case FAMOUSO::ANNOUNCE: {
00479                                                 // allocate a new publish event channel
00480                                                 boost::shared_ptr<PEC> pec(new PEC(famouso::mw::Subject(&event_head[1])));
00481                                                 // announce it to FAMOUSO
00482                                                 pec->announce();
00483 
00484                                                 // bind the receive function for published events or for the unannouncement event
00485                                                 boost::asio::async_read(socket(), boost::asio::buffer(event_head, event_head.size()),
00486                                                                         boost::bind(&EventChannelConnection::get_event_head, this->shared_from_this(),
00487                                                                                     pec, boost::asio::placeholders::error,
00488                                                                                     boost::asio::placeholders::bytes_transferred));
00489 
00490                                                 report(pec->subject(), "Announcement  ");
00491                                                 break;
00492                                             }
00493                                         default:
00494                                             ::logging::log::emit< ::logging::Error>() << "Wrong opcode:\t0x"
00495                                                 << event_head[0] << ::logging::log::endl;
00496                                     }
00497                                 } else {
00498                                     ::logging::log::emit< ::logging::Error>() << "Wrong message format:"
00499                                         << ::logging::log::endl;
00500                                 }
00501                             }
00502 
                                  // TCP socket of this client connection
00504                             boost::asio::ip::tcp::socket socket_;
00505 
                                  // Bytes accepted by cb() for sending (preamble + payload) that
                                  // write_handler() has not yet accounted for as written
00507                             unsigned int incomplete_async_write_bytes;
00508 
                                  // Number of write requests accepted by cb() that have not been
                                  // completed yet (the one in flight plus the queued ones)
00510                             unsigned int incomplete_async_writes;
00511 
                                  // Point in time up to which cb() accepts events even if
                                  // 100000 or more bytes are pending
00513                             afp::shared::Time drop_deadline;
00514 
                                  // Write requests waiting until the write in flight has completed
00516                             std::queue<AsyncWriteRequest> async_write_requests;
00517 
                                  // Receive buffer for message headers: opcode at [0], subject at
                                  // [1..8], payload length (network byte order) at [9..12]
00519                             boost::array<uint8_t, 13> event_head;
00520                     };
00521 
00522 
00523                 public:
00527                     EventLayerMiddlewareStub() : acceptor_(famouso::util::ios::instance(),
00528                                                                    boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),
00529                                                                                                   ServPort)) {
00530                         start_accept();
00531                         // start io-service to ensure asynchrone acceptance of connections
00532                         famouso::util::impl::start_ios();
00533                     }
00534 
00535                 private:
00539                     void start_accept() {
00540                         typename EventChannelConnection<EL>::pointer ecc = EventChannelConnection<EL>::create();
00541                         acceptor_.async_accept(ecc->socket(),
00542                                                boost::bind(&EventLayerMiddlewareStub::handle_accept, this, ecc,
00543                                                            boost::asio::placeholders::error));
00544                     }
00545 
00551                     void handle_accept(typename EventChannelConnection<EL>::pointer ecc,
00552                                        const boost::system::error_code& error) {
00553                         if (!error) {
00554                             ecc->start();
00555                             start_accept();
00556                         } else {
00557                             ::logging::log::emit< ::logging::Error>()
00558                                 << "Error in asynchronous acceptance of an incoming connection or CTRL^C"
00559                                 << ::logging::log::endl;
00560                         }
00561                     }
                          // Acceptor for incoming client connections (IPv4, port ServPort)
00562                     boost::asio::ip::tcp::acceptor acceptor_;
00563             };
00564 
00565         } // namespace el
00566     } // namespace mw
00567 } // namespace famouso
00568 
00569 #endif /* __EventLayerMiddlewareStub_h__ */