Menu

EventSeqDemux.h

Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2009-2010 Philipp Werner <philipp.werner@st.ovgu.de>
00004  * All rights reserved.
00005  *
00006  *    Redistribution and use in source and binary forms, with or without
00007  *    modification, are permitted provided that the following conditions
00008  *    are met:
00009  *
00010  *    * Redistributions of source code must retain the above copyright
00011  *      notice, this list of conditions and the following disclaimer.
00012  *
00013  *    * Redistributions in binary form must reproduce the above copyright
00014  *      notice, this list of conditions and the following disclaimer in
00015  *      the documentation and/or other materials provided with the
00016  *      distribution.
00017  *
00018  *    * Neither the name of the copyright holders nor the names of
00019  *      contributors may be used to endorse or promote products derived
00020  *      from this software without specific prior written permission.
00021  *
00022  *
00023  *    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00024  *    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00025  *    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00026  *    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
00027  *    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00028  *    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00029  *    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00030  *    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00031  *    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00032  *    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00033  *    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00034  *
00035  *
00036  * $Id$
00037  *
00038  ******************************************************************************/
00039 
00040 
00041 #ifndef __EVENTSEQDEMUX_H_2A8B3BD61E7528__
00042 #define __EVENTSEQDEMUX_H_2A8B3BD61E7528__
00043 
00044 
00045 #include "debug.h"
00046 
00047 #include "mw/afp/defrag/Defragmenter.h"
00048 
00049 #include "mw/afp/shared/Time.h"
00050 #include "mw/afp/defrag/detail/PointerMap.h"
00051 #include "mw/afp/defrag/detail/Queue.h"
00052 
00053 #include "mw/afp/defrag/EventSeqHeaderSupport.h"
00054 
00055 
00056 
00057 namespace famouso {
00058     namespace mw {
00059         namespace afp {
00060             namespace defrag {
00061 
00062 
00066                 template <typename DCP, class Subject_t>
00067                 struct EseqDemuxKey {
00068                     const uint32_t eseq;
00069                     const bool eseq_header;      
00070 
00071                     enum { uses_subject = 0 };
00072 
00073                     bool operator<(const EseqDemuxKey & v2) const {
00074                         return eseq < v2.eseq;
00075                     }
00076 
00077                     bool operator==(const EseqDemuxKey & v2) const {
00078                         return eseq == v2.eseq;
00079                     }
00080 
00081                     EseqDemuxKey(const Headers<DCP> & header, const Subject_t & subj) :
00082                             eseq(header.eseq.occurs() ?
00083                                  header.eseq.get_eseq() :
00084                                  0xffffffff),
00085                             eseq_header(header.eseq.occurs()) {
00086                     }
00087                 };
00088 
00089 
00093                 template <typename DCP, class Subject_t>
00094                 struct EseqSubjectDemuxKey {
00095                     const uint32_t eseq;
00096                     const Subject_t subject;
00097                     const bool eseq_header;      
00098 
00099                     enum { uses_subject = 1 };
00100 
00101                     bool operator<(const EseqSubjectDemuxKey & v2) const {
00102                         if (eseq < v2.eseq)
00103                             return true;
00104                         else
00105                             return false;
00106                     }
00107 
00108                     bool operator==(const EseqSubjectDemuxKey & v2) const {
00109                         return eseq == v2.eseq && subject == v2.subject;
00110                     }
00111 
00112                     EseqSubjectDemuxKey(const Headers<DCP> & header, const Subject_t & subj) :
00113                             eseq(header.eseq.occurs() ?
00114                                  header.eseq.get_eseq() :
00115                                  0xffffffff),
00116                             subject(subj),
00117                             eseq_header(header.eseq.occurs()) {
00118                     }
00119                 };
00120 
00121 
00122 
00128                 template <class DCP>
00129                 class EventSeqDemux {
00130 
00131                         typedef typename DCP::SizeProp::elen_t   elen_t;
00132                         typedef typename DCP::SizeProp::flen_t   flen_t;
00133                         typedef typename DCP::SizeProp::fcount_t fcount_t;
00134 
00135                         typedef typename DCP::EventDemuxKey KeyType;
00136                         typedef typename DCP::Allocator Allocator;
00137 
00138                     public:
00139 
00140                         typedef EventSeqHeaderSupport<DCP> EventSeqHeaderPolicy;
00141 
00142                     private:
00143 
00148                         template <class KeyType>
00149                         class Event {
00150 
00151                                 typedef typename DCP::SizeProp::flen_t   flen_t;
00152 
00153                             public:
00155                                 // TODO: def nicht mehr mit new, sondern free_resources() funktion, bei keep event mit aufgehoben
00156                                 Defragmenter<DCP> * def;
00157 
00159                                 const KeyType key;
00160 
00162                                 enum {
00164                                     event_incomplete,
00166                                     event_outdated
00167                                 } status;
00168 
00177                                 shared::Time expire_time;
00178 
00179 
00181                                 Event(flen_t max_payload, const KeyType & key) :
00182                                         def(new (Allocator()) Defragmenter<DCP>(max_payload)),
00183                                         key(key), status(def ? event_incomplete : event_outdated) {
00184                                     touch();
00185                                 }
00186 
00188                                 ~Event() {
00189                                     if (def)
00190                                         Allocator::destroy(def);
00191                                 }
00192 
00196                                 void touch() {
00197                                     // Postpone time to drop incomplete fragment (3 seconds from now)
00198                                     shared::Time::get_current_time(expire_time);
00199                                     expire_time.add_sec(3);
00200                                 }
00201 
00203                                 const KeyType & get_key() {
00204                                     return key;
00205                                 }
00206 
00208                                 void * to_handle() {
00209                                     return reinterpret_cast<void *>(this);
00210                                 }
00211 
00213                                 static Event * from_handle(void * handle) {
00214                                     return reinterpret_cast<Event *>(handle);
00215                                 }
00216                         };
00217 
00219                         flen_t mtu;
00220 
00221                         typedef detail::PointerMap <
00222                                     KeyType,
00223                                     Event<KeyType>,
00224                                     (DCP::concurrent_events == (unsigned int)dynamic || DCP::old_event_ids == (unsigned int)dynamic ?
00225                                         dynamic :
00226                                         DCP::concurrent_events + DCP::old_event_ids)
00227                                 > EventMap;
00228 
00229                         typedef detail::Queue <
00230                                     Event<KeyType> *,
00231                                     DCP::old_event_ids
00232                                 > OutdatedQueue;
00233 
00235                         EventMap events;
00236 
00238                         OutdatedQueue outdated_events;
00239 
00245                         void set_event_outdated(Event<KeyType> * e) {
00246                             e->def = 0;
00247 
00248                             if (!e->key.eseq_header) {
00249                                 // Event without event sequence number -> no outdating, free immediately
00250                                 events.erase(e->key);
00251                                 Allocator::destroy(e);
00252                             } else {
00253                                 // Default:
00254                                 // Keep event sequence number for some time to detect late duplicates.
00255                                 e->status = Event<KeyType>::event_outdated;
00256 
00257                                 shared::Time::get_current_time(e->expire_time);
00258 
00259                                 // Use time for cleaning outdated events
00260                                 clean_outdated_events(e->expire_time);
00261 
00262                                 // If the queue is full delete the oldest entry (at least)
00263                                 if (outdated_events.full())
00264                                     clean_outdated_events(outdated_events.front()->expire_time);
00265 
00266                                 e->expire_time.add_sec(3);
00267                                 outdated_events.push_back(e);
00268                             }
00269                         }
00270 
00276                         void clean_outdated_events(const shared::Time & curr_time) {
00277                             while (!outdated_events.empty()) {
00278                                 Event<KeyType> * e = outdated_events.front();
00279 
00280                                 if (!(e->expire_time < curr_time))
00281                                     break;
00282 
00283                                 ::logging::log::emit< ::logging::Info>()
00284                                     << PROGMEMSTRING("AFP: clean outdated event ")
00285                                     << ::logging::log::dec
00286                                     << (unsigned int)e->key.eseq
00287                                     << PROGMEMSTRING(" (seq can occur again)")
00288                                     << ::logging::log::endl;
00289                                 events.erase(e->key);
00290                                 Allocator::destroy(e);
00291                                 outdated_events.pop();
00292                             }
00293                         }
00294 
00298                         void clean_incomplete_untouched_events() {
00299                             shared::Time curr_time;
00300                             Event<KeyType> * e;
00301 
00302                             shared::Time::get_current_time(curr_time);
00303 
00304                             typename EventMap::iterator it = events.begin();
00305                             while (it != events.end()) {
00306                                 e = *it;
00307                                 if (e->status == Event<KeyType>::event_incomplete && e->expire_time < curr_time) {
00308                                     ::logging::log::emit< ::logging::Info>()
00309                                         << PROGMEMSTRING("AFP: clean untouched event (timeout)")
00310                                         << ::logging::log::endl;
00311                                     events.erase(it++);
00312                                     Allocator::destroy(e);
00313                                 } else {
00314                                     ++it;
00315                                 }
00316                             }
00317                         }
00318 
00319                     public:
00320 
00322                         EventSeqDemux(flen_t mtu)
00323                                 : mtu(mtu) {
00324                         }
00325 
00327                         ~EventSeqDemux() {
00328                             typename EventMap::iterator it = events.begin();
00329                             for (; it != events.end(); ++it)
00330                                 Allocator::destroy(*it);
00331                         }
00332 
00336                         void * get_defragmenter_handle(const Headers<DCP> & header, const KeyType & event_key) {
00337                             // Call cleanup function for every 10th fragment (TODO: place this somewhere else... quite expensive)
00338                             static int call = 0;
00339                             if (++call % 10 == 0)
00340                                 clean_incomplete_untouched_events();
00341 
00342                             typename EventMap::iterator it = events.find(event_key);
00343                             Event<KeyType> * event;
00344 
00345                             if (it == events.end()) {
00346                                 // Unknown key
00347                                 // -> create new event defragmenter
00348                                 FAMOUSO_ASSERT(mtu > header.length());
00349                                 event = new (Allocator()) Event<KeyType>(mtu - header.ext_length(), event_key);
00350                                 if (!event || event->status == Event<KeyType>::event_outdated || !events.insert(event)) {
00351                                     ::logging::log::emit< ::logging::Warning>()
00352                                         << PROGMEMSTRING("AFP: Out of memory -> drop")
00353                                         << ::logging::log::endl;
00354                                     return 0;
00355                                 }
00356                                 ::logging::log::emit< ::logging::Info>()
00357                                     << PROGMEMSTRING("AFP: defrag fragment ")
00358                                     << ::logging::log::dec << (unsigned int)header.fseq
00359                                     << PROGMEMSTRING(" of NEW event ")
00360                                     << (unsigned int)event_key.eseq << ::logging::log::endl;
00361                                 return event->to_handle();
00362                             }
00363 
00364                             // Found event
00365                             event = *it;
00366                             FAMOUSO_ASSERT(event->key == event_key);
00367 
00368                             if (event->status == Event<KeyType>::event_outdated || event->def->is_event_complete()) {
00369                                 // Event was already processed, dropped or is
00370                                 // complete (late fragment of already dropped
00371                                 // event, duplicate or FEC redundancy fragment
00372                                 // not needed) -> drop fragment
00373                                 ::logging::log::emit< ::logging::Info>()
00374                                     << PROGMEMSTRING("AFP: dropping outdated fragment ")
00375                                     << ::logging::log::dec << (unsigned int)header.fseq
00376                                     << PROGMEMSTRING(" of event ")
00377                                     << (unsigned int)event_key.eseq << ::logging::log::endl;
00378                                 return 0;
00379                             }
00380 
00381                             // Incomplete event will be dropped if it is not been touched for 3 seconds
00382                             // Do not touch for every fragment to reduce frequency of expensive gettimeofday() system calls
00383                             if (header.eseq.occurs() && (header.eseq.get_eseq() & 3) == 0)
00384                                 event->touch();
00385 
00386                             ::logging::log::emit< ::logging::Info>()
00387                                 << PROGMEMSTRING("AFP: defrag fragment ")
00388                                 << ::logging::log::dec << (unsigned int)header.fseq
00389                                 << PROGMEMSTRING(" of event ")
00390                                 << (unsigned int)event_key.eseq << ::logging::log::endl;
00391                             return event->to_handle();
00392                         }
00393 
00395                         Defragmenter<DCP> * get_defragmenter(void * handle) {
00396                             return Event<KeyType>::from_handle(handle)->def;
00397                         }
00398 
00406                         void free_defragmenter(void * handle) {
00407                             Event<KeyType> * e = Event<KeyType>::from_handle(handle);
00408                             Allocator::destroy(e->def);
00409                             set_event_outdated(e);
00410                         }
00411 
00412 
00415                         enum { support_late_delivery };
00416 
00426                         void * keep_defragmenter(void * handle) {
00427                             // Keep Defragmenter instance seperated from Event marked as processed.
00428                             // It will be deleted in free_kept_defragmenter.
00429                             Event<KeyType> * e = Event<KeyType>::from_handle(handle);
00430                             Defragmenter<DCP> * def = e->def;
00431                             FAMOUSO_ASSERT(def->is_event_complete());
00432                             set_event_outdated(e);
00433                             return static_cast<void *>(def);
00434                         }
00435 
00441                         Defragmenter<DCP> * get_kept_defragmenter(void * handle) {
00442                             return static_cast<Defragmenter<DCP> *>(handle);
00443                         }
00444 
00452                         void free_kept_defragmenter(void * handle) {
00453                             Allocator::destroy(get_kept_defragmenter(handle));
00454                         }
00455 
00456                 };
00457 
00458 
00459             } // namespace defrag
00460         } // namespace afp
00461     } // namespace mw
00462 } // namespace famouso
00463 
00464 
00465 #endif // __EVENTSEQDEMUX_H_2A8B3BD61E7528__
00466