EventLayerClientStub.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #ifndef __EventLayerClientStub_h__
00042 #define __EventLayerClientStub_h__
00043
00044 #include <boost/asio/ip/tcp.hpp>
00045 #include <boost/asio/read.hpp>
00046 #include <boost/asio/write.hpp>
00047 #include <boost/thread.hpp>
00048 #include <boost/bind.hpp>
00049 #include <boost/shared_ptr.hpp>
00050 #include <boost/enable_shared_from_this.hpp>
00051
00052 #include "mw/api/SubscriberEventChannel.h"
00053 #include "mw/api/PublisherEventChannel.h"
00054
00055 #include "mw/el/LocalConnectionParameters.h"
00056 #include "util/endianness.h"
00057 #include "case/Delegate.h"
00058 #include "util/ios.h"
00059 #include "debug.h"
00060
00061 #include <map>
00062
00063 namespace famouso {
00064 namespace mw {
00065 namespace el {
00066
00067 template< class T >
00068 class NotifyWorkerThread {
00069
00070 T &sec;
00071 public:
00072 void action() {
00073 uint8_t recvBuffer[BUFSIZE];
00074 int recvMsgSize;
00075 try {
00076 while ((recvMsgSize = boost::asio::read(*sec.snn(), boost::asio::buffer(recvBuffer, 13))) > 0) {
00077
00078 unsigned int len = ntohl(*(uint32_t *) & (recvBuffer[9]));
00079
00080 if ((recvMsgSize = boost::asio::read(*sec.snn(), boost::asio::buffer(recvBuffer, len))) > 0) {
00081
00082 Event e(sec.subject());
00083 e.length = len;
00084 e.data = (uint8_t *) recvBuffer;
00085 sec.callback(e);
00086 }
00087 }
00088 } catch (...) {
00089
00090 }
00091 }
00092 public:
00093 NotifyWorkerThread(T &ec) : sec(ec) {
00094 }
00095 };
00096
00097 class EventLayerClientStub {
00098
00099 typedef famouso::mw::api::SubscriberEventChannel<EventLayerClientStub> SEC;
00100 typedef std::pair<boost::thread *, NotifyWorkerThread<SEC> *> NotifyThreadData;
00101 typedef std::map<SEC *, NotifyThreadData> NotifyThreadMap;
00102 NotifyThreadMap notify_threads;
00103 boost::mutex notify_threads_mutex;
00104
00105 void do_connection_socket(famouso::mw::api::EventChannel<EventLayerClientStub> &ec) {
00106 ec.snn() = new boost::asio::ip::tcp::socket(famouso::util::ios::instance());
00107
00108 boost::asio::ip::tcp::endpoint endpoint(
00109 boost::asio::ip::address::from_string(servAddress), ServPort);
00110
00111 try {
00112 ec.snn()->connect(endpoint);
00113 } catch (...) {
00114 ::logging::log::emit< ::logging::Error>()
00115 << "An error occurred while connecting to the ech"
00116 << ::logging::log::endl;
00117 abort();
00118 }
00119 }
00120 public:
00121 typedef boost::asio::ip::tcp::socket *SNN;
00122
00123 void init() {
00124 famouso::util::impl::start_ios();
00125 }
00126
00127 EventLayerClientStub() {
00128 init();
00129 }
00130
00131
00132
00133 void announce(famouso::mw::api::PublisherEventChannel<EventLayerClientStub> &ec) {
00134 TRACE_FUNCTION;
00135 do_connection_socket(ec);
00136 uint8_t transferBuffer[9] = {FAMOUSO::ANNOUNCE};
00137 for (uint8_t i = 0;i < 8;++i)
00138 transferBuffer[i+1] = ec.subject().tab()[i];
00139
00140 boost::asio::write(*ec.snn(), boost::asio::buffer(transferBuffer, sizeof(transferBuffer)));
00141 }
00142
00143
00144
00145 void publish(famouso::mw::api::PublisherEventChannel<EventLayerClientStub> &ec, const Event &e) {
00146 TRACE_FUNCTION;
00147 ::logging::log::emit< ::logging::Info>() << "Publish channel "
00148 << ::logging::log::hex << ec.select() << ::logging::log::endl;
00149
00150
00151
00152
00153
00154
00155
00156 uint8_t transferBuffer[13] = {FAMOUSO::PUBLISH};
00157 for (uint8_t i = 0;i < 8;++i)
00158 transferBuffer[i+1] = ec.subject().tab()[i];
00159 uint32_t *len = (uint32_t *) & transferBuffer[9];
00160 *len = htonl(e.length);
00161
00162 boost::asio::write(*ec.snn(), boost::asio::buffer(transferBuffer, sizeof(transferBuffer)));
00163 boost::asio::write(*ec.snn(), boost::asio::buffer(e.data, e.length));
00164 }
00165
00166
00167 void subscribe(famouso::mw::api::SubscriberEventChannel<EventLayerClientStub> &ec) {
00168 TRACE_FUNCTION;
00169 ::logging::log::emit< ::logging::Info>() << "Subscribe channel "
00170 << ::logging::log::hex << ec.select() << ::logging::log::endl;
00171 do_connection_socket(ec);
00172
00173 uint8_t transferBuffer[9] = {FAMOUSO::SUBSCRIBE};
00174 for (uint8_t i = 0;i < 8;++i)
00175 transferBuffer[i+1] = ec.subject().tab()[i];
00176
00177 boost::asio::write(*ec.snn(), boost::asio::buffer(transferBuffer, sizeof(transferBuffer)));
00178
00179
00180 NotifyWorkerThread<SEC> * nwt = new NotifyWorkerThread<SEC>(ec);
00181 NotifyThreadData t (new boost::thread(boost::bind(&NotifyWorkerThread<SEC>::action, nwt)), nwt);
00182 notify_threads_mutex.lock();
00183 notify_threads[&ec] = t;
00184 notify_threads_mutex.unlock();
00185 ::logging::log::emit< ::logging::Info>()
00186 << "Generate Thread and Connect to local ECH"
00187 << ::logging::log::endl;
00188 }
00189
00190
00191 void unsubscribe(famouso::mw::api::SubscriberEventChannel<EventLayerClientStub> &ec) {
00192 TRACE_FUNCTION;
00193 ::logging::log::emit< ::logging::Info>() << "close connection" << ::logging::log::endl;
00194
00195
00196
00197 notify_threads_mutex.lock();
00198 NotifyThreadMap::iterator it = notify_threads.find(&ec);
00199 if (it != notify_threads.end()) {
00200 NotifyThreadData t = it->second;
00201 ec.snn()->shutdown(boost::asio::ip::tcp::socket::shutdown_receive);
00202 t.first->join();
00203 ec.snn()->close();
00204 delete t.first;
00205 delete t.second;
00206 notify_threads.erase(&ec);
00207 }
00208 notify_threads_mutex.unlock();
00209 }
00210
00211
00212 void unannounce(famouso::mw::api::PublisherEventChannel<EventLayerClientStub> &ec) {
00213 TRACE_FUNCTION;
00214 ::logging::log::emit< ::logging::Info>() << "close connection" << ::logging::log::endl;
00215 ec.snn()->close();
00216 }
00217 };
00218 }
00219 }
00220 }
00221
00222 #endif