diff --git a/src/eitd/eitd.h b/src/eitd/eitd.h index 01e57b445..62943e191 100644 --- a/src/eitd/eitd.h +++ b/src/eitd/eitd.h @@ -96,8 +96,6 @@ typedef std::map > MySIservi #include #include "dmx.h" -#define MAX_SECTION_LENGTH (0x0fff + 3) - /* abstract section reading class */ class CSectionThread : public OpenThreads::Thread, public DMX { @@ -139,7 +137,7 @@ class CSectionThread : public OpenThreads::Thread, public DMX virtual void afterWait() {}; /* process section after getSection */ - virtual void processSection(int rc) { if(rc < 0) return; }; + virtual void processSection() {}; /* cleanup before exit */ virtual void cleanup() {}; @@ -207,7 +205,7 @@ class CEventsThread : public CSectionThread /* default hooks */ bool shouldSleep(); bool checkSleep(); - void processSection(int rc); + void processSection(); /* EIT-specific */ bool addEvents(); @@ -245,7 +243,7 @@ class CCNThread : public CEventsThread void beforeSleep(); void beforeWait(); void afterWait(); - void processSection(int rc); + void processSection(); void cleanup(); /* CN-specific */ @@ -265,7 +263,7 @@ class CSdtThread : public CSectionThread void addFilters(); bool shouldSleep(); bool checkSleep(); - void processSection(int rc); + void processSection(); /* SDT-specific */ bool addServices(); @@ -276,7 +274,18 @@ class CSdtThread : public CSectionThread class CTimeThread : public CSectionThread { private: + /* overloaded hooks */ + void addFilters(); + + /* specific */ + bool time_ntp; + bool first_time; + + void sendTimeEvent(bool dvb, time_t tim = 0); + void setSystemTime(time_t tim); void run(); + public: + CTimeThread(); }; class CEitManager diff --git a/src/eitd/sectionsd.cpp b/src/eitd/sectionsd.cpp index f9b18afb8..840e8bf5b 100644 --- a/src/eitd/sectionsd.cpp +++ b/src/eitd/sectionsd.cpp @@ -61,10 +61,12 @@ #include "edvbstring.h" #include "xmlutil.h" -#define ENABLE_SDT //FIXME +//#define ENABLE_SDT //FIXME //#define DEBUG_SDT_THREAD -#define DEBUG_SECTION_THREAD +#define DEBUG_TIME_THREAD + +#define DEBUG_SECTION_THREADS #define DEBUG_CN_THREAD static bool sectionsd_ready = false; @@ -138,6 +140,7 @@ static pthread_rwlock_t messagingLock = PTHREAD_RWLOCK_INITIALIZER; static pthread_cond_t timeThreadSleepCond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t timeThreadSleepMutex = PTHREAD_MUTEX_INITIALIZER; +static CTimeThread threadTIME; static CEitThread threadEIT; static CCNThread threadCN; @@ -888,6 +891,8 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng pthread_mutex_lock(&timeThreadSleepMutex); pthread_cond_broadcast(&timeThreadSleepCond); pthread_mutex_unlock(&timeThreadSleepMutex); + + threadTIME.change(0); } threadCN.change(0); @@ -952,6 +957,8 @@ xprintf("[sectionsd] commandserviceChanged: Service change to " PRINTF_CHANNEL_I pthread_mutex_lock(&timeThreadSleepMutex); pthread_cond_broadcast(&timeThreadSleepCond); pthread_mutex_unlock(&timeThreadSleepMutex); + + threadTIME.setCurrentService(messaging_current_servicekey); } } else @@ -1105,7 +1112,7 @@ static void commandSetConfig(int connfd, char *data, const unsigned /*dataLength time_wakeup = true; } - if(time_wakeup) { + if (time_wakeup) { pthread_mutex_lock(&timeThreadSleepMutex); ntpserver = (std::string)&data[sizeof(struct sectionsd::commandSetConfig)]; @@ -1286,7 +1293,7 @@ bool sectionsd_parse_command(CBasicMessage::Header &rmsg, int connfd) if (rc == true) { dprintf("%s\n", connectionCommands[header.command].sCmd.c_str()); - if(connectionCommands[header.command].cmd == sendEmptyResponse) + if (connectionCommands[header.command].cmd == sendEmptyResponse) printf("sectionsd_parse_command: UNUSED cmd used: %d (%s)\n", header.command, connectionCommands[header.command].sCmd.c_str()); connectionCommands[header.command].cmd(connfd, data, header.dataLength); } @@ -1314,10 +1321,21 @@ static void dump_sched_info(std::string label) // updates system time according TOT every 30 minutes //--------------------------------------------------------------------- -static void sendTimeEvent(bool dvb, time_t tim = 0) +CTimeThread::CTimeThread() + : CSectionThread("timeThread", 0x14) +{ + timeoutInMSeconds = 36000; + cache = false; + wait_for_time = false; + + first_time = true; + time_ntp = false; +}; + +void CTimeThread::sendTimeEvent(bool dvb, time_t tim) { time_t actTime = time(NULL); - if(dvb) { + if (dvb) { struct tm *tmTime = localtime(&actTime); xprintf("[%sThread] - current: %02d.%02d.%04d %02d:%02d:%02d, dvb: %s", "time", tmTime->tm_mday, tmTime->tm_mon+1, tmTime->tm_year+1900, tmTime->tm_hour, tmTime->tm_min, tmTime->tm_sec, ctime(&tim)); @@ -1330,89 +1348,119 @@ static void sendTimeEvent(bool dvb, time_t tim = 0) eventServer->sendEvent(CSectionsdClient::EVT_TIMESET, CEventServer::INITID_SECTIONSD, &actTime, sizeof(actTime) ); } -static void *timeThread(void *) +void CTimeThread::setSystemTime(time_t tim) { - UTC_t UTC; - time_t tim; - unsigned int seconds; - bool first_time = true; /* we don't sleep the first time (we try to get a TOT header) */ - struct timespec restartWait; - struct timeval now; - bool time_ntp = false; - bool success = true; + struct timeval tv; + if ((!messaging_neutrino_sets_time) && (geteuid() == 0)) { + tv.tv_sec = tim; + tv.tv_usec = 0; + if (settimeofday(&tv, NULL) < 0) + perror("[sectionsd] settimeofday"); + } +} - dprintf("[%sThread] pid %d (%lu) start\n", "time", getpid(), pthread_self()); +void CTimeThread::addFilters() +{ + addfilter(0x70, 0xff); + addfilter(0x73, 0xff); +} - while(!sectionsd_stop) - { - while (!scanning || !reader_ready) { - if(sectionsd_stop) +void CTimeThread::run() +{ + //struct timespec restartWait; + //struct timeval now; + time_t dvb_time = 0; + xprintf("[%sThread] pid %d (%lu) start\n", "time", getpid(), pthread_self()); + + addFilters(); + DMX::start(); + + while(running) { + if (sendToSleepNow) { +#ifdef DEBUG_TIME_THREAD + xprintf("%s: going to sleep %d seconds, running %d scanning %d\n", + name.c_str(), sleep_time, running, scanning); +#endif + real_pause(); + int rs = Sleep(); +#ifdef DEBUG_TIME_THREAD + xprintf("%s: wakeup, running %d scanning %d reason %d\n", + name.c_str(), running, scanning, rs); +#endif + if (!running) break; - sleep(1); + + sendToSleepNow = false; } - if ( ntpenable && system( ntp_system_cmd.c_str() ) == 0) - { - first_time = false; + bool success = false; + time_ntp = false; + dvb_time = 0; + if (ntpenable && system( ntp_system_cmd.c_str() ) == 0) { time_ntp = true; - sendTimeEvent(false); - } else { - if (dvb_time_update) { -xprintf("timeThread: getting UTC\n"); - success = getUTC(&UTC, first_time); // for first time, get TDT, then TOT -xprintf("timeThread: getting UTC done : %d\n", success); - if (success) - { - tim = changeUTCtoCtime((const unsigned char *) &UTC); - time_ntp = false; - sendTimeEvent(true, tim); + success = true; + } else if (dvb_time_update) { + xprintf("timeThread: getting time\n"); + if(!first_time) + change(1); - if (tim) { - if ((!messaging_neutrino_sets_time) && (geteuid() == 0)) { - struct timeval tv; - tv.tv_sec = tim; - tv.tv_usec = 0; - if (settimeofday(&tv, NULL) < 0) { - perror("[sectionsd] settimeofday"); - pthread_exit(NULL); - } - } - } + int rc = dmx->Read(static_buf, MAX_SECTION_LENGTH, timeoutInMSeconds); + xprintf("timeThread: getting time done : %d messaging_neutrino_sets_time %d\n", rc, messaging_neutrino_sets_time); + if (rc > 0) { + SIsectionTIME st(static_buf); + if (st.is_parsed()) { + dvb_time = st.getTime(); + success = true; } } } + /* default sleep time */ + sleep_time = ntprefresh * 60; + if(success) { + if(dvb_time) { + setSystemTime(dvb_time); + if(first_time) + sleep_time = 5; /* retry a second time immediately */ + } + sendTimeEvent(time_ntp, dvb_time); + xprintf("%s: Time set via %s, going to sleep for %d seconds.\n", name.c_str(), + time_ntp ? "NTP" : first_time ? "DVB (TDT)" : "DVB (TOT)", sleep_time); + first_time = false; + } else { + xprintf("%s: Time set FAILED", name.c_str()); + } + +#if 0 if (timeset && dvb_time_update) { - if (first_time) - seconds = 5; /* retry a second time immediately */ + if (!first_time) + sleep_time = ntprefresh * 60; else - seconds = ntprefresh * 60; + sleep_time = 5; /* retry a second time immediately */ - if(time_ntp) { - xprintf("[%sThread] Time set via NTP, going to sleep for %d seconds.\n", "time", seconds); + if (time_ntp) { + xprintf("[%sThread] Time set via NTP, going to sleep for %d seconds.\n", "time", sleep_time); } else { xprintf("[%sThread] Time %sset via DVB(%s), going to sleep for %d seconds.\n", - "time", success?"":"not ", first_time?"TDT":"TOT", seconds); + "time", success?"":"not ", first_time?"TDT":"TOT", sleep_time); } first_time = false; } else { if (!first_time) { /* time was already set, no need to do it again soon when DVB time-blocked channel is tuned */ - seconds = ntprefresh * 60; - } - else if (!scanning) { - seconds = 60; - } - else { - seconds = 1; + sleep_time = ntprefresh * 60; + } else { + sleep_time = 1; } if (!dvb_time_update && !first_time) { - xprintf("[%sThread] Time NOT set via DVB due to blocked channel, going to sleep for %d seconds.\n", "time", seconds); + xprintf("[%sThread] Time NOT set via DVB due to blocked channel, going to sleep for %d seconds.\n", "time", sleep_time); } } - if(sectionsd_stop) +#endif +#if 0 + if (sectionsd_stop) break; xprintf("timeThread: going to sleep for %d sec\n\n", seconds); @@ -1430,6 +1478,7 @@ xprintf("timeThread: going to sleep for %d sec\n\n", seconds); { dprintf("TDT-Thread sleeping interrupted\n"); } +#endif } printf("[sectionsd] timeThread ended\n"); @@ -1446,7 +1495,7 @@ int CSectionThread::Sleep() struct timespec abs_wait; struct timeval now; - if(sleep_time) { + if (sleep_time) { gettimeofday(&now, NULL); TIMEVAL_TO_TIMESPEC(&now, &abs_wait); abs_wait.tv_sec += sleep_time; @@ -1455,7 +1504,7 @@ int CSectionThread::Sleep() pthread_mutex_lock(&start_stop_mutex); beforeWait(); - if(sleep_time) + if (sleep_time) rs = pthread_cond_timedwait( &change_cond, &start_stop_mutex, &abs_wait ); else rs = pthread_cond_wait(&change_cond, &start_stop_mutex); @@ -1484,7 +1533,7 @@ void CSectionThread::run() while (running) { if (shouldSleep()) { -#ifdef DEBUG_SECTION_THREAD +#ifdef DEBUG_SECTION_THREADS xprintf("%s: going to sleep %d seconds, running %d scanning %d blacklisted %d events %d\n", name.c_str(), sleep_time, running, scanning, channel_is_blacklisted, event_count); #endif @@ -1495,13 +1544,13 @@ void CSectionThread::run() do { real_pause(); rs = Sleep(); -#ifdef DEBUG_SECTION_THREAD +#ifdef DEBUG_SECTION_THREADS xprintf("%s: wakeup, running %d scanning %d blacklisted %d reason %d\n\n", name.c_str(), running, scanning, channel_is_blacklisted, rs); #endif } while (checkSleep()); - if(!running) + if (!running) break; afterSleep(); @@ -1512,15 +1561,17 @@ void CSectionThread::run() sendToSleepNow = false; } +#if 0 int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX); - processSection(rc); +#endif + processSection(); time_t zeit = time_monotonic(); bool need_change = false; - if(timeoutsDMX < 0 || timeoutsDMX >= skipTimeouts) { -#ifdef DEBUG_SECTION_THREAD + if (timeoutsDMX < 0 || timeoutsDMX >= skipTimeouts) { +#ifdef DEBUG_SECTION_THREADS xprintf("%s: skipping to next filter %d from %d (timeouts %d)\n", name.c_str(), filter_index+1, filters.size(), timeoutsDMX); #endif @@ -1528,13 +1579,13 @@ void CSectionThread::run() need_change = true; } if (zeit > lastChanged + skipTime) { -#ifdef DEBUG_SECTION_THREAD +#ifdef DEBUG_SECTION_THREADS xprintf("%s: skipping to next filter %d from %d (seconds %d)\n", name.c_str(), filter_index+1, filters.size(), (int) (zeit - lastChanged)); #endif need_change = true; } - if(running && need_change && scanning) { + if (running && need_change && scanning) { readLockMessaging(); if (!next_filter()) sendToSleepNow = true; @@ -1602,9 +1653,10 @@ bool CEventsThread::checkSleep() } /* default section process */ -void CEventsThread::processSection(int rc) +void CEventsThread::processSection() { - if(rc <= 0) + int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX); + if (rc <= 0) return; addEvents(); } @@ -1709,7 +1761,7 @@ void CCNThread::beforeWait() void CCNThread::afterWait() { xprintf("%s: stop eit update filter (%s)\n", name.c_str(), updating ? "active" : "not active"); - if(updating) { + if (updating) { updating = false; eitDmx->Stop(); } @@ -1723,9 +1775,10 @@ void CCNThread::beforeSleep() } } -void CCNThread::processSection(int rc) +void CCNThread::processSection() { - if(rc <= 0) + int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX); + if (rc <= 0) return; addEvents(); @@ -1751,7 +1804,7 @@ void CCNThread::processSection(int rc) /* CN specific functions */ bool CCNThread::checkUpdate() { - if(!updating) + if (!updating) return false; unsigned char buf[MAX_SECTION_LENGTH]; @@ -1872,12 +1925,13 @@ bool CSdtThread::checkSleep() return (running && !scanning); } -void CSdtThread::processSection(int rc) +void CSdtThread::processSection() { - if(rc <= 0) + int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX); + if (rc <= 0) return; - if(addServices()) + if (addServices()) lastChanged = time_monotonic(); } @@ -1946,7 +2000,7 @@ static void *houseKeepingThread(void *) while (!scanning) { sleep(1); // wait for streaming to end... - if(sectionsd_stop) + if (sectionsd_stop) break; } @@ -1997,11 +2051,11 @@ static void *houseKeepingThread(void *) pthread_exit(NULL); } -extern cDemux * dmxUTC; +//extern cDemux * dmxUTC; void sectionsd_main_thread(void * /*data*/) { - pthread_t threadTOT, threadHouseKeeping; + pthread_t /*threadTOT,*/ threadHouseKeeping; int rc; printf("$Id: sectionsd.cpp,v 1.305 2009/07/30 12:41:39 seife Exp $\n"); @@ -2059,6 +2113,7 @@ printf("SIevent size: %d\n", sizeof(SIevent)); eventServer = new CEventServer; +#if 0 // time-Thread starten rc = pthread_create(&threadTOT, 0, timeThread, 0); @@ -2066,7 +2121,8 @@ printf("SIevent size: %d\n", sizeof(SIevent)); fprintf(stderr, "[sectionsd] failed to create time-thread (rc=%d)\n", rc); return; } - +#endif + threadTIME.Start(); threadEIT.Start(); threadCN.Start(); @@ -2093,7 +2149,7 @@ printf("SIevent size: %d\n", sizeof(SIevent)); while (!sectionsd_stop && sectionsd_server.run(sectionsd_parse_command, sectionsd::ACTVERSION, true)) { sched_yield(); - if(threadCN.checkUpdate()) { + if (threadCN.checkUpdate()) { sched_yield(); threadCN.change(0); sched_yield(); @@ -2121,12 +2177,14 @@ printf("SIevent size: %d\n", sizeof(SIevent)); pthread_cancel(threadHouseKeeping); - if(dmxUTC) dmxUTC->Stop(); + //if (dmxUTC) dmxUTC->Stop(); //pthread_cancel(threadTOT); printf("join TOT\n"); - pthread_join(threadTOT, NULL); - if(dmxUTC) delete dmxUTC; + //pthread_join(threadTOT, NULL); + threadTIME.Stop(); + + //if (dmxUTC) delete dmxUTC; printf("join EIT\n"); threadEIT.Stop();