Menu

MultiSourceDemux.h

Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2009-2010 Philipp Werner <philipp.werner@st.ovgu.de>
00004  * All rights reserved.
00005  *
00006  *    Redistribution and use in source and binary forms, with or without
00007  *    modification, are permitted provided that the following conditions
00008  *    are met:
00009  *
00010  *    * Redistributions of source code must retain the above copyright
00011  *      notice, this list of conditions and the following disclaimer.
00012  *
00013  *    * Redistributions in binary form must reproduce the above copyright
00014  *      notice, this list of conditions and the following disclaimer in
00015  *      the documentation and/or other materials provided with the
00016  *      distribution.
00017  *
00018  *    * Neither the name of the copyright holders nor the names of
00019  *      contributors may be used to endorse or promote products derived
00020  *      from this software without specific prior written permission.
00021  *
00022  *
00023  *    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00024  *    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00025  *    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00026  *    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
00027  *    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00028  *    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00029  *    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00030  *    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00031  *    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00032  *    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00033  *    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00034  *
00035  *
00036  * $Id$
00037  *
00038  ******************************************************************************/
00039 
00040 
00041 #ifndef __MULTISOURCEDEMUX_H_2A8B3BD61E7528__
00042 #define __MULTISOURCEDEMUX_H_2A8B3BD61E7528__
00043 
00044 
00045 #include "debug.h"
00046 
00047 #include "mw/afp/defrag/Defragmenter.h"
00048 #include "mw/afp/defrag/detail/PointerMap.h"
00049 #include "mw/afp/defrag/NoEventSeqHeaderSupport.h"
00050 
00051 
00052 namespace famouso {
00053     namespace mw {
00054         namespace afp {
00055             namespace defrag {
00056 
00057 
00061                 template <typename DCP, class Subject_t>
00062                 struct SubjectDemuxKey {
00063                     const Subject_t subject;
00064 
00065                     enum { uses_subject = 1 };
00066 
00067                     bool operator<(const SubjectDemuxKey & v2) const {
00068                         return subject < v2.subject;
00069                     }
00070 
00071                     bool operator==(const SubjectDemuxKey & v2) const {
00072                         return subject == v2.subject;
00073                     }
00074 
00075                     SubjectDemuxKey(const Headers<DCP> & header, const Subject_t & subj) :
00076                             subject(subj) {
00077                     }
00078                 };
00079 
00080 
00081 
00082                 // TODO: Testing!!!
00102                 template <class DCP>
00103                 class MultiSourceDemux {
00104 
00105                         typedef typename DCP::SizeProp::elen_t   elen_t;
00106                         typedef typename DCP::SizeProp::flen_t   flen_t;
00107                         typedef typename DCP::SizeProp::fcount_t fcount_t;
00108 
00109                         typedef typename DCP::EventDemuxKey KeyType;
00110                         typedef typename DCP::Allocator Allocator;
00111 
00112                     public:
00113 
00114                         typedef NoEventSeqHeaderSupport<DCP> EventSeqHeaderPolicy;
00115 
00116                     private:
00117 
00122                         template <class KeyType>
00123                         class Event {
00124 
00125                                 typedef typename DCP::SizeProp::flen_t   flen_t;
00126 
00127                             public:
00129                                 Defragmenter<DCP> def;
00130 
00132                                 KeyType key;
00133 
00135                                 Event(flen_t max_payload, const KeyType & key)
00136                                         : def(max_payload), key(key) {
00137                                 }
00138 
00140                                 ~Event() {
00141                                 }
00142 
00144                                 const KeyType & get_key() {
00145                                     return key;
00146                                 }
00147 
00149                                 void * to_handle() {
00150                                     return reinterpret_cast<void *>(this);
00151                                 }
00152 
00154                                 static Event * from_handle(void * handle) {
00155                                     return reinterpret_cast<Event *>(handle);
00156                                 }
00157                         };
00158 
00160                         flen_t mtu;
00161 
00162                         typedef detail::PointerMap < KeyType, Event<KeyType>, DCP::old_event_ids > EventMap;
00163 
00165                         EventMap events;
00166 
00167                     public:
00168 
00170                         MultiSourceDemux(flen_t mtu)
00171                                 : mtu(mtu) {
00172                         }
00173 
00175                         ~MultiSourceDemux() {
00176                             typename EventMap::iterator it = events.begin();
00177                             Event<KeyType> * event;
00178                             for (; it != events.end(); ++it) {
00179                                 event = *it;
00180                                 Allocator::destroy(event);
00181                             }
00182                         }
00183 
00190                         void * get_defragmenter_handle(const Headers<DCP> & header, const KeyType & event_key) {
00191                             typename EventMap::iterator it = events.find(event_key);
00192                             Event<KeyType> * event;
00193 
00194                             // Start new defragmentation only with first fragment.
00195                             if (header.first_fragment) {
00196                                 // First fragment!
00197 
00198                                 // Improve robustness:
00199                                 // Channel is assumed to be ideal, so old defragmenters should be
00200                                 // freed when a new first fragment arrives. But in case there are
00201                                 // packet loss or reordering and current event is not complete while
00202                                 // we get first fragment of the next event drop incomplete event.
00203                                 if (it != events.end()) {
00204                                     free_defragmenter((*it)->to_handle());
00205                                 }
00206 
00207                                 // Unknown or recently freed key
00208                                 // -> create new event defragmenter
00209                                 FAMOUSO_ASSERT(mtu > header.length());
00210                                 event = new (Allocator()) Event<KeyType>(mtu - header.ext_length(), event_key);
00211 
00212                                 if (!event)
00213                                     goto out_of_mem_error;
00214                                 if (!events.insert(event))
00215                                     goto out_of_mem_error;
00216 
00217                                 ::logging::log::emit< ::logging::Info>()
00218                                    << PROGMEMSTRING("AFP: defrag fragment ")
00219                                    << ::logging::log::dec << (unsigned int)header.fseq
00220                                    << PROGMEMSTRING(" of NEW event")
00221                                    << ::logging::log::endl;
00222                             } else {
00223                                 // Not first fragment: If we found a defragmenter fitting the key,
00224                                 // return it. Otherwise (missing frist fragment) drop this fragment.
00225 
00226                                 // Improve robustness:
00227                                 // Channel is assumed to be ideal, so first fragment should arrive
00228                                 // first. If it did not (event defragmenter not found), drop other
00229                                 // fragments.
00230                                 if (it == events.end())
00231                                     return 0;
00232 
00233                                 event = *it;
00234 
00235                                 ::logging::log::emit< ::logging::Info>()
00236                                     << PROGMEMSTRING("AFP: defrag fragment ")
00237                                     << ::logging::log::dec << (unsigned int)header.fseq
00238                                     << PROGMEMSTRING(" of event")
00239                                     << ::logging::log::endl;
00240                             }
00241 
00242                             return event->to_handle();
00243 
00244                         out_of_mem_error:
00245                             ::logging::log::emit< ::logging::Warning>()
00246                                 << PROGMEMSTRING("AFP: Out of memory -> drop")
00247                                 << ::logging::log::endl;
00248                             return 0;
00249                         }
00250 
00252                         Defragmenter<DCP> * get_defragmenter(void * handle) {
00253                             return & Event<KeyType>::from_handle(handle)->def;
00254                         }
00255 
00260                         void free_defragmenter(void * handle) {
00261                             Event<KeyType> * e = Event<KeyType>::from_handle(handle);
00262                             events.erase(e->get_key());
00263                             Allocator::destroy(e);
00264                         }
00265                 };
00266 
00267 
00268             } // namespace defrag
00269         } // namespace afp
00270     } // namespace mw
00271 } // namespace famouso
00272 
00273 
00274 #endif // __MULTISOURCEDEMUX_H_2A8B3BD61E7528__
00275