From effe71d9957d3e6788da192c76f44bb2950c61e7 Mon Sep 17 00:00:00 2001 From: "[CST] Focus" Date: Wed, 22 Feb 2012 16:54:50 +0400 Subject: [PATCH] eitd/sectionsd.cpp: convert SDT thread, add debug defines --- src/eitd/eitd.h | 1 + src/eitd/sectionsd.cpp | 281 +++++++++++++++++++---------------------- 2 files changed, 134 insertions(+), 148 deletions(-) diff --git a/src/eitd/eitd.h b/src/eitd/eitd.h index 0a07fff37..9de0d5de1 100644 --- a/src/eitd/eitd.h +++ b/src/eitd/eitd.h @@ -164,6 +164,7 @@ class CCNThread : public CSectionThread class CSdtThread : public CSectionThread { private: + bool addServices(); void run(); }; diff --git a/src/eitd/sectionsd.cpp b/src/eitd/sectionsd.cpp index 4cf1863f0..71f1aa722 100644 --- a/src/eitd/sectionsd.cpp +++ b/src/eitd/sectionsd.cpp @@ -64,22 +64,25 @@ //#define ENABLE_SDT //FIXME -// 60 Minuten Zyklus... -#define TIME_EIT_SCHEDULED_PAUSE 60 * 60 -// -- 5 Minutes max. pause should improve behavior (rasc, 2005-05-02) -// #define TIME_EIT_SCHEDULED_PAUSE 5* 60 -// Zeit die fuer die gewartet wird, bevor der Filter weitergeschaltet wird, falls es automatisch nicht klappt -#define TIME_EIT_SKIPPING 90 +//#define DEBUG_SDT_THREAD +//#define DEBUG_EIT_THREAD +#define DEBUG_CN_THREAD -#ifdef ENABLE_FREESATEPG -#endif +/* period to restart EIT reading */ +#define TIME_EIT_SCHEDULED_PAUSE 60 * 60 +/* force EIT thread to change filter after, seconds */ +#define TIME_EIT_SKIPPING 90 +// a little more time for freesat epg +#define TIME_FSEIT_SKIPPING 240 static bool sectionsd_ready = false; /*static*/ bool reader_ready = true; static unsigned int max_events; +/* period to remove old events */ //#define HOUSEKEEPING_SLEEP (5 * 60) // sleep 5 minutes #define HOUSEKEEPING_SLEEP (30) // FIXME 1 min for testing +/* period to clean cached sections and force restart sections read */ #define META_HOUSEKEEPING (24 * 60 * 60) / HOUSEKEEPING_SLEEP // meta housekeeping after XX housekeepings - every 24h - // Timeout bei tcp/ip connections in ms @@ -169,17 +172,16 @@ static CEitThread threadEIT; static CCNThread threadCN; #ifdef ENABLE_FREESATEPG -// a little more time for freesat epg -#define TIME_FSEIT_SKIPPING 240 static DMX dmxFSEIT(3842, 320); #endif #ifdef ENABLE_SDT -#define TIME_SDT_NONEWDATA 5 +#define TIME_SDT_NONEWDATA 15 #define RESTART_DMX_AFTER_TIMEOUTS 5 #define TIME_SDT_SCHEDULED_PAUSE 2* 60* 60 -static DMX dmxSDT(0x11, 512, true, 0); +CSdtThread threadSDT; #endif + int sectionsd_stop = 0; static bool slow_addevent = true; @@ -919,17 +921,15 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng dmxFSEIT.change(0); #endif #ifdef ENABLE_SDT - dmxSDT.change(0); + threadSDT.change(0); #endif } struct sectionsd::msgResponseHeader msgResponse; - msgResponse.dataLength = 0; - writeNbytes(connfd, (const char *)&msgResponse, sizeof(msgResponse), WRITE_TIMEOUT_IN_SECONDS); - return ; + return; } static void commandserviceChanged(int connfd, char *data, const unsigned dataLength) @@ -1017,7 +1017,7 @@ xprintf("[sectionsd] commandserviceChanged: Service changed to " PRINTF_CHANNEL_ dmxFSEIT.setCurrentService(messaging_current_servicekey); #endif #ifdef ENABLE_SDT - dmxSDT.setCurrentService(messaging_current_servicekey); + threadSDT.setCurrentService(messaging_current_servicekey); #endif } else @@ -1228,8 +1228,8 @@ static void deleteSIexceptEPG() threadEIT.dropCachedSectionIDs(); threadEIT.change(0); #ifdef ENABLE_SDT - dmxSDT.dropCachedSectionIDs(); - dmxSDT.change(0); + threadSDT.dropCachedSectionIDs(); + threadSDT.change(0); #endif #ifdef ENABLE_FREESATEPG dmxFSEIT.setCurrentService(messaging_current_servicekey); @@ -1242,7 +1242,7 @@ static void commandFreeMemory(int connfd, char * /*data*/, const unsigned /*data deleteSIexceptEPG(); writeLockEvents(); -showProfiling("commandFreeMemory start"); + #ifndef USE_BOOST_SHARED_PTR std::set allevents; @@ -1259,13 +1259,12 @@ showProfiling("commandFreeMemory start"); for(std::set::iterator ait = allevents.begin(); ait != allevents.end(); ++ait) delete (*ait); -showProfiling("commandFreeMemory end1"); #endif mySIeventsOrderFirstEndTimeServiceIDEventUniqueKey.clear(); mySIeventsOrderServiceUniqueKeyFirstStartTimeEventUniqueKey.clear(); mySIeventsOrderUniqueKey.clear(); mySIeventsNVODorderUniqueKey.clear(); -showProfiling("commandFreeMemory end2"); + unlockEvents(); //FIXME debug @@ -1876,8 +1875,10 @@ void CEitThread::run() while (running) { if(sendToSleepNow || !scanning || channel_is_blacklisted) { +#ifdef DEBUG_EIT_THREAD xprintf("%s: going to sleep %d seconds, running %d scanning %d blacklisted %d events %d\n", name.c_str(), TIME_EIT_SCHEDULED_PAUSE, running, scanning, channel_is_blacklisted, event_count); +#endif event_count = 0; writeLockMessaging(); @@ -1888,7 +1889,9 @@ void CEitThread::run() do { real_pause(); rs = Sleep(TIME_EIT_SCHEDULED_PAUSE); +#ifdef DEBUG_EIT_THREAD xprintf("%s: wakeup, running %d scanning %d blacklisted %d reason %d\n\n", name.c_str(), running, scanning, channel_is_blacklisted, rs); +#endif } while(running && (!scanning || channel_is_blacklisted)); if(!running) @@ -1907,12 +1910,16 @@ void CEitThread::run() bool need_change = false; if(timeoutsDMX < 0 || timeoutsDMX >= CHECK_RESTART_DMX_AFTER_TIMEOUTS) { +#ifdef DEBUG_EIT_THREAD xprintf("%s: skipping to next filter %d from %d (timeouts %d)\n", name.c_str(), filter_index+1, filters.size(), timeoutsDMX); +#endif timeoutsDMX = 0; need_change = true; } if (zeit > lastChanged + TIME_EIT_SKIPPING) { +#ifdef DEBUG_EIT_THREAD xprintf("%s: skipping to next filter %d from %d (TIME_EIT_SKIPPING)\n", name.c_str(), filter_index+1, filters.size()); +#endif need_change = true; } @@ -1957,7 +1964,9 @@ void CCNThread::run() unlockMessaging(); waitForTimeset(); +#ifdef DEBUG_CN_THREAD xprintf("CCNThread::run:: time set..\n"); +#endif DMX::start(); // -> unlock @@ -1966,8 +1975,10 @@ void CCNThread::run() while(running) { if(sendToSleepNow || !scanning || channel_is_blacklisted) { +#ifdef DEBUG_CN_THREAD xprintf("%s: going to sleep, running %d scanning %d blacklisted %d events %d\n", name.c_str(), running, scanning, channel_is_blacklisted, event_count); //xprintf("%s: eit_version %02x messaging_need_eit_version %d rc %d\n", name.c_str(), get_eit_version(), messaging_need_eit_version, rc); +#endif writeLockMessaging(); messaging_eit_is_busy = false; @@ -1982,8 +1993,9 @@ void CCNThread::run() rs = pthread_cond_wait(&change_cond, &start_stop_mutex); eit_stop_update_filter(&eit_update_fd); pthread_mutex_unlock(&start_stop_mutex); - //rs = Sleep(TIME_EIT_SCHEDULED_PAUSE); +#ifdef DEBUG_CN_THREAD xprintf("%s: wakeup, running %d scanning %d blacklisted %d reason %d\n\n", name.c_str(), running, scanning, channel_is_blacklisted, rs); +#endif } while(running && (!scanning || channel_is_blacklisted)); if(!running) @@ -2050,8 +2062,10 @@ void CCNThread::run() writeLockMessaging(); messaging_have_CN = messaging_got_CN; unlockMessaging(); +#ifdef DEBUG_CN_THREAD xprintf("%s: have CN: timeoutsDMX %d messaging_have_CN %x messaging_got_CN %x\n\n", name.c_str(), timeoutsDMX, messaging_have_CN, messaging_got_CN); +#endif dprintf("[cnThread] got current_next (0x%x) - sending event!\n", messaging_have_CN); eventServer->sendEvent(CSectionsdClient::EVT_GOT_CN_EPG, CEventServer::INITID_SECTIONSD, @@ -2079,14 +2093,18 @@ void CCNThread::run() #endif #if 1 if(timeoutsDMX < 0) { +#ifdef DEBUG_CN_THREAD xprintf("%s: timeoutsDMX %d messaging_got_CN %x messaging_have_CN %x sid %016llx\n\n", name.c_str(), timeoutsDMX, messaging_got_CN, messaging_have_CN, messaging_current_servicekey); +#endif timeoutsDMX = 0; sendToSleepNow = true; } #endif if (zeit > lastChanged + TIME_EIT_VERSION_WAIT) { +#ifdef DEBUG_CN_THREAD xprintf("%s: zeit > lastChanged + TIME_EIT_VERSION_WAIT\n", name.c_str()); +#endif sendToSleepNow = true; } @@ -2183,7 +2201,7 @@ void CCNThread::run() #endif } // for delete[] static_buf; - printf("[sectionsd] cnThread ended\n"); + printf("[sectionsd] %s ended\n", name.c_str()); pthread_exit(NULL); } @@ -2199,7 +2217,6 @@ static bool addService(const SIservice &s, const int is_actual) unlockServices(); if ( (!already_exists) || ((is_actual & 7) && (!si->second->is_actual)) ) { - if (already_exists) { writeLockServices(); @@ -2241,146 +2258,108 @@ static bool addService(const SIservice &s, const int is_actual) } is_new = true; } - return is_new; } -static void *sdtThread(void *) +void CSdtThread::run() { - const unsigned timeoutInMSeconds = 2500; - t_transponder_id tid = 0; - time_t lastData = 0; - time_t zeit = 0; - int rs = 0; - int is_actual = 0; + name = "sdtThread"; + xprintf("%s::run:: starting, pid %d (%lu)\n", name.c_str(), getpid(), pthread_self()); + pID = 0x11; + dmx_num = 0; + cache = false; - //FIXME correct mask - dmxSDT.addfilter(0x42, 0xf3 ); //SDT actual = 0x42 + SDT other = 0x46 + BAT = 0x4A + timeoutInMSeconds = EIT_READ_TIMEOUT; - dprintf("[%sThread] pid %d (%lu) start\n", "sdt", getpid(), pthread_self()); + bool sendToSleepNow = false; - int timeoutsDMX = 0; - uint8_t *static_buf = new uint8_t[MAX_SECTION_LENGTH]; - int rc; - - bool startup = true; + //addfilter(0x42, 0xf3 ); //SDT actual = 0x42 + SDT other = 0x46 + BAT = 0x4A + addfilter(0x42, 0xfb ); //SDT actual = 0x42 + SDT other = 0x46 waitForTimeset(); - dmxSDT.start(); // -> unlock + DMX::start(); // -> unlock - while (!sectionsd_stop) { - while (!scanning) { - if(sectionsd_stop) - break; - sleep(1); - } - zeit = time_monotonic(); + time_t lastData = time_monotonic(); - if(sectionsd_stop) - break; + while (running) { + if(sendToSleepNow || !scanning) { +#ifdef DEBUG_SDT_THREAD + xprintf("%s: going to sleep %d seconds, running %d scanning %d services %d\n", + name.c_str(), TIME_SDT_SCHEDULED_PAUSE, running, scanning, event_count); +#endif + event_count = 0; - readLockMessaging(); - if (messaging_zap_detected) - startup = true; - unlockMessaging(); + int rs = 0; + do { + real_pause(); + rs = Sleep(TIME_SDT_SCHEDULED_PAUSE); +#ifdef DEBUG_SDT_THREAD + xprintf("%s: wakeup, running %d scanning %d reason %d\n\n", name.c_str(), running, scanning, rs); +#endif + } while(running && !scanning); - if ((zeit > lastData + TIME_SDT_NONEWDATA) || (startup)) - { - struct timespec abs_wait; - struct timeval now; - - gettimeofday(&now, NULL); - TIMEVAL_TO_TIMESPEC(&now, &abs_wait); - abs_wait.tv_sec += (TIME_SDT_SCHEDULED_PAUSE); - - dmxSDT.real_pause(); - /* this is the "last" thread. Means: if this one goes to sleep, sectionsd - * sleeps mostly. Worth printing. */ - printdate_ms(stdout); - printf("sdtThread: going to sleep...\n"); - - writeLockMessaging(); - messaging_zap_detected = false; - unlockMessaging(); - - pthread_mutex_lock( &dmxSDT.start_stop_mutex ); - rs = pthread_cond_timedwait( &dmxSDT.change_cond, &dmxSDT.start_stop_mutex, &abs_wait ); - pthread_mutex_unlock( &dmxSDT.start_stop_mutex ); - - if(sectionsd_stop) + if(!running) break; if (rs == ETIMEDOUT) - { - dprintf("dmxSDT: waking up again - looking for new services :)\n"); - dmxSDT.change( 0 ); // -> restart - } - else if (rs == 0) - { - dprintf("dmxSDT: waking up again - requested from .change()\n"); - } - else - { - dprintf("dmxSDT: waking up again - unknown reason?!\n"); - dmxSDT.real_unpause(); - } - // update zeit after sleep + change(0); // -> restart - startup = false; - zeit = time_monotonic(); - timeoutsDMX = 0; - lastData = zeit; + sendToSleepNow = false; + lastData = time_monotonic(); } - if (timeoutsDMX >= RESTART_DMX_AFTER_TIMEOUTS && scanning) - { - timeoutsDMX = 0; - dmxSDT.stop(); - dmxSDT.start(); // leaves unlocked - dputs("\n !!! dmxSDT restarted !!!\n"); - } - - rc = dmxSDT.getSection(static_buf, timeoutInMSeconds, timeoutsDMX); - - if (rc < 0) - continue; - - LongSection sec(static_buf); - uint8_t table_id = sec.getTableId(); - if ((table_id == 0x42) || (table_id == 0x46)) - { - SIsectionSDT sdt(static_buf); - - is_actual = (sdt.getTableId() == 0x42) ? 1 : 0; - - if (is_actual && !sdt.getLastSectionNumber()) - is_actual = 2; - - bool is_new = false; - is_actual = (is_actual | 8); - - for (SIservices::iterator s = sdt.services().begin(); s != sdt.services().end(); ++s) { - if (addService(*s, is_actual)) { - is_new = true; - tid = CREATE_TRANSPONDER_ID_FROM_ORIGINALNETWORK_TRANSPORTSTREAM_ID(s->original_network_id, - s->transport_stream_id); - } - } - - if (is_new) { + int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX); + if(rc > 0) { + if(addServices()) lastData = time_monotonic(); - - dprintf("[sdtThread] added %d services [table 0x%x TID: %08x]\n", - sdt.services().size(), table_id, tid); - } } - } // for + time_t zeit = time_monotonic(); + + if(timeoutsDMX < 0 || timeoutsDMX >= CHECK_RESTART_DMX_AFTER_TIMEOUTS) { +#ifdef DEBUG_SDT_THREAD + xprintf("%s: timeouts %d\n", name.c_str(), timeoutsDMX); +#endif + timeoutsDMX = 0; + sendToSleepNow = true; + } + + if (zeit > (lastData + TIME_SDT_NONEWDATA)) { +#ifdef DEBUG_SDT_THREAD + xprintf("%s: no new services for %d seconds\n", name.c_str(), (int) (zeit - lastData)); +#endif + sendToSleepNow = true; + } + } // while running delete[] static_buf; - printf("[sectionsd] sdt-thread ended\n"); - + printf("[sectionsd] %s ended\n", name.c_str()); pthread_exit(NULL); } + +bool CSdtThread::addServices() +{ + bool is_new = false; + + LongSection sec(static_buf); + uint8_t table_id = sec.getTableId(); + if ((table_id == 0x42) || (table_id == 0x46)) { + SIsectionSDT sdt(static_buf); + + bool is_actual = (sdt.getTableId() == 0x42) ? 1 : 0; + + if (is_actual && !sdt.getLastSectionNumber()) + is_actual = 2; + + is_actual = (is_actual | 8); + + for (SIservices::iterator s = sdt.services().begin(); s != sdt.services().end(); ++s) { + if (addService(*s, is_actual)) + is_new = true; + event_count++; + } + } + return is_new; +} #endif /* helper function for the housekeeping-thread */ @@ -2482,7 +2461,7 @@ void sectionsd_main_thread(void * /*data*/) pthread_t threadFSEIT; #endif #ifdef ENABLE_SDT - pthread_t threadSDT; + //pthread_t threadSDT; #endif int rc; @@ -2563,6 +2542,7 @@ printf("SIevent size: %d\n", sizeof(SIevent)); } #endif #ifdef ENABLE_SDT +#if 0 printf("\n\n\n[sectionsd] starting SDT thread\n"); rc = pthread_create(&threadSDT, 0, sdtThread, 0); @@ -2571,6 +2551,8 @@ printf("SIevent size: %d\n", sizeof(SIevent)); return; } #endif + threadSDT.Start(); +#endif // housekeeping-Thread starten rc = pthread_create(&threadHouseKeeping, 0, houseKeepingThread, 0); @@ -2642,9 +2624,11 @@ printf("SIevent size: %d\n", sizeof(SIevent)); #endif #ifdef ENABLE_SDT +#if 0 pthread_mutex_lock(&dmxSDT.start_stop_mutex); pthread_cond_broadcast(&dmxSDT.change_cond); pthread_mutex_unlock(&dmxSDT.start_stop_mutex); +#endif #endif printf("pausing...\n"); @@ -2656,7 +2640,7 @@ printf("SIevent size: %d\n", sizeof(SIevent)); dmxFSEIT.request_pause(); #endif #ifdef ENABLE_SDT - dmxSDT.request_pause(); + //dmxSDT.request_pause(); #endif pthread_cancel(threadHouseKeeping); @@ -2664,19 +2648,20 @@ printf("SIevent size: %d\n", sizeof(SIevent)); //pthread_cancel(threadTOT); - printf("join 1\n"); + printf("join TOT\n"); pthread_join(threadTOT, NULL); if(dmxUTC) delete dmxUTC; - printf("join 2\n"); + printf("join EIT\n"); threadEIT.Stop(); - printf("join 3\n"); + printf("join CN\n"); threadCN.Stop(); #ifdef ENABLE_SDT - printf("join 4\n"); - pthread_join(threadSDT, NULL); + printf("join SDT\n"); + threadSDT.Stop(); + //pthread_join(threadSDT, NULL); #endif eit_stop_update_filter(&eit_update_fd); @@ -2694,7 +2679,7 @@ printf("SIevent size: %d\n", sizeof(SIevent)); dmxFSEIT.close(); #endif #ifdef ENABLE_SDT - dmxSDT.close(); + //dmxSDT.close(); #endif printf("[sectionsd] ended\n");