Menu

EventLayerClientStub.h

Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2008-2010 Michael Schulze <mschulze@ivs.cs.uni-magdeburg.de>
00004  *                         Philipp Werner <philipp.werner@st.ovgu.de>
00005  * All rights reserved.
00006  *
00007  *    Redistribution and use in source and binary forms, with or without
00008  *    modification, are permitted provided that the following conditions
00009  *    are met:
00010  *
00011  *    * Redistributions of source code must retain the above copyright
00012  *      notice, this list of conditions and the following disclaimer.
00013  *
00014  *    * Redistributions in binary form must reproduce the above copyright
00015  *      notice, this list of conditions and the following disclaimer in
00016  *      the documentation and/or other materials provided with the
00017  *      distribution.
00018  *
00019  *    * Neither the name of the copyright holders nor the names of
00020  *      contributors may be used to endorse or promote products derived
00021  *      from this software without specific prior written permission.
00022  *
00023  *
00024  *    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00025  *    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00026  *    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00027  *    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
00028  *    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00029  *    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00030  *    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00031  *    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00032  *    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00033  *    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00034  *    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00035  *
00036  *
00037  * $Id$
00038  *
00039  ******************************************************************************/
00040 
00041 #ifndef __EventLayerClientStub_h__
00042 #define __EventLayerClientStub_h__
00043 
00044 #include <boost/asio/ip/tcp.hpp>
00045 #include <boost/asio/read.hpp>
00046 #include <boost/asio/write.hpp>
00047 #include <boost/thread.hpp>
00048 #include <boost/bind.hpp>
00049 #include <boost/shared_ptr.hpp>
00050 #include <boost/enable_shared_from_this.hpp>
00051 
00052 #include "mw/api/SubscriberEventChannel.h"
00053 #include "mw/api/PublisherEventChannel.h"
00054 
00055 #include "mw/el/LocalConnectionParameters.h"
00056 #include "util/endianness.h"
00057 #include "case/Delegate.h"
00058 #include "util/ios.h"
00059 #include "debug.h"
00060 
00061 #include <map>
00062 
00063 namespace famouso {
00064     namespace mw {
00065         namespace el {
00066 
00067             template< class T >
00068             class NotifyWorkerThread {
00069 
00070                     T &sec;
00071                 public:
00072                     void action() {
00073                         uint8_t recvBuffer[BUFSIZE];
00074                         int recvMsgSize;
00075                         try {
00076                             while ((recvMsgSize = boost::asio::read(*sec.snn(), boost::asio::buffer(recvBuffer, 13))) > 0) {
00077                                 // ermitteln der Laenge des Events
00078                                 unsigned int len = ntohl(*(uint32_t *) & (recvBuffer[9]));
00079                                 // und den Rest aus dem Socket holen
00080                                 if ((recvMsgSize = boost::asio::read(*sec.snn(), boost::asio::buffer(recvBuffer, len))) > 0) {
00081                                     // Event aufbauen und veroeffentlichen
00082                                     Event e(sec.subject());
00083                                     e.length = len;
00084                                     e.data = (uint8_t *) recvBuffer;
00085                                     sec.callback(e);
00086                                 }
00087                             }
00088                         } catch (...) {
00089                             // unsubscribe will disallow receive -> exception to stop thread
00090                         }
00091                     }
00092                 public:
00093                     NotifyWorkerThread(T &ec) : sec(ec) {
00094                     }
00095             };
00096 
00097             class EventLayerClientStub {
00098 
00099                     typedef famouso::mw::api::SubscriberEventChannel<EventLayerClientStub>  SEC;
00100                     typedef std::pair<boost::thread *, NotifyWorkerThread<SEC> *> NotifyThreadData;
00101                     typedef std::map<SEC *, NotifyThreadData> NotifyThreadMap;
00102                     NotifyThreadMap notify_threads;
00103                     boost::mutex notify_threads_mutex;
00104 
00105                     void do_connection_socket(famouso::mw::api::EventChannel<EventLayerClientStub> &ec) {
00106                         ec.snn() = new boost::asio::ip::tcp::socket(famouso::util::ios::instance());
00107                         // Establish connection with the ech
00108                         boost::asio::ip::tcp::endpoint endpoint(
00109                             boost::asio::ip::address::from_string(servAddress), ServPort);
00110 
00111                         try {
00112                             ec.snn()->connect(endpoint);
00113                         } catch (...) {
00114                             ::logging::log::emit< ::logging::Error>()
00115                                 << "An error occurred while connecting to the ech"
00116                                 << ::logging::log::endl;
00117                             abort();
00118                         }
00119                     }
00120                 public:
00121                     typedef boost::asio::ip::tcp::socket *SNN;
00122 
00123                     void init() {
00124                         famouso::util::impl::start_ios();
00125                     }
00126 
00127                     EventLayerClientStub() {
00128                         init();
00129                     }
00130 
00131                     // announce legt hier nur einen Socket an und meldet sich
00132                     // bei localen EventChannelHandler an
00133                     void announce(famouso::mw::api::PublisherEventChannel<EventLayerClientStub> &ec) {
00134                         TRACE_FUNCTION;
00135                         do_connection_socket(ec);
00136                         uint8_t transferBuffer[9] = {FAMOUSO::ANNOUNCE};
00137                         for (uint8_t i = 0;i < 8;++i)
00138                             transferBuffer[i+1] = ec.subject().tab()[i];
00139                         // Send the announcement to the ech
00140                         boost::asio::write(*ec.snn(), boost::asio::buffer(transferBuffer, sizeof(transferBuffer)));
00141                     }
00142 
00143                     // Publish uebermittelt die Daten
00144                     // an den localen ECH
00145                     void publish(famouso::mw::api::PublisherEventChannel<EventLayerClientStub> &ec, const Event &e) {
00146                         TRACE_FUNCTION;
00147                         ::logging::log::emit< ::logging::Info>() << "Publish channel "
00148                                  << ::logging::log::hex << ec.select() << ::logging::log::endl;
00149                         // Hier koennte der Test erfolgen, ob die Subjects vom Event
00150                         // und vom EventChannel gleich sind, weil nur dann
00151                         // das Event in diesen Kanal gehoert
00152                         //
00153                         // Ist mit einem assert zu machen und auch ob das Subject des
00154                         // Kanals ueberhaupt gebunden ist, aber dies ist einfach, weil sonst
00155                         // keine Verbindung zum ech besteht und das send fehlschlaegt
00156                         uint8_t transferBuffer[13] = {FAMOUSO::PUBLISH};
00157                         for (uint8_t i = 0;i < 8;++i)
00158                             transferBuffer[i+1] = ec.subject().tab()[i];
00159                         uint32_t *len = (uint32_t *) & transferBuffer[9];
00160                         *len = htonl(e.length);
00161                         // Send the announcement to the ech
00162                         boost::asio::write(*ec.snn(), boost::asio::buffer(transferBuffer, sizeof(transferBuffer)));
00163                         boost::asio::write(*ec.snn(), boost::asio::buffer(e.data, e.length));
00164                     }
00165 
00166                     // Verbindung  zum  ECH oeffnen und Subject subscribieren
00167                     void subscribe(famouso::mw::api::SubscriberEventChannel<EventLayerClientStub> &ec) {
00168                         TRACE_FUNCTION;
00169                         ::logging::log::emit< ::logging::Info>() << "Subscribe channel "
00170                                  << ::logging::log::hex << ec.select() << ::logging::log::endl;
00171                         do_connection_socket(ec);
00172                         // create subscribe message
00173                         uint8_t transferBuffer[9] = {FAMOUSO::SUBSCRIBE};
00174                         for (uint8_t i = 0;i < 8;++i)
00175                             transferBuffer[i+1] = ec.subject().tab()[i];
00176                         // Send the announcement to the ech
00177                         boost::asio::write(*ec.snn(), boost::asio::buffer(transferBuffer, sizeof(transferBuffer)));
00178                         // create a thread that gets the ec and if a messages arrives at the
00179                         // socket connection the ec is called back
00180                         NotifyWorkerThread<SEC> * nwt = new NotifyWorkerThread<SEC>(ec);
00181                         NotifyThreadData t (new boost::thread(boost::bind(&NotifyWorkerThread<SEC>::action, nwt)), nwt);
00182                         notify_threads_mutex.lock();
00183                         notify_threads[&ec] = t;
00184                         notify_threads_mutex.unlock();
00185                         ::logging::log::emit< ::logging::Info>()
00186                             << "Generate Thread and Connect to local ECH"
00187                             << ::logging::log::endl;
00188                     }
00189 
00190                     // Verbindung schliessen, sollte reichen
00191                     void unsubscribe(famouso::mw::api::SubscriberEventChannel<EventLayerClientStub> &ec) {
00192                         TRACE_FUNCTION;
00193                         ::logging::log::emit< ::logging::Info>() << "close connection" << ::logging::log::endl;
00194 
00195                         // Only return when notify thread terminated to prevent the thread to
00196                         // access already deleted data structures resulting in undefined behaviour
00197                         notify_threads_mutex.lock();
00198                         NotifyThreadMap::iterator it = notify_threads.find(&ec);
00199                         if (it != notify_threads.end()) {
00200                             NotifyThreadData t = it->second;
00201                             ec.snn()->shutdown(boost::asio::ip::tcp::socket::shutdown_receive);
00202                             t.first->join();
00203                             ec.snn()->close();
00204                             delete t.first;
00205                             delete t.second;
00206                             notify_threads.erase(&ec);
00207                         }
00208                         notify_threads_mutex.unlock();
00209                     }
00210 
00211                     // Verbindung schliessen sollte reichen
00212                     void unannounce(famouso::mw::api::PublisherEventChannel<EventLayerClientStub> &ec) {
00213                         TRACE_FUNCTION;
00214                         ::logging::log::emit< ::logging::Info>() << "close connection" << ::logging::log::endl;
00215                         ec.snn()->close();
00216                     }
00217             };
00218         } // namespace el
00219     } // namespace mw
00220 } //namespace famouso
00221 
00222 #endif