diff --git a/src/eitd/eitd.h b/src/eitd/eitd.h index 62943e191..7ce153dd7 100644 --- a/src/eitd/eitd.h +++ b/src/eitd/eitd.h @@ -28,6 +28,10 @@ #include +#include +#include +#include "dmx.h" + #include "SIutils.hpp" #include "SIservices.hpp" #include "SIevents.hpp" @@ -93,8 +97,6 @@ typedef std::map > MySIeventUn typedef std::map > MySIservicesOrderUniqueKey; typedef std::map > MySIservicesNVODorderUniqueKey; -#include -#include "dmx.h" /* abstract section reading class */ class CSectionThread : public OpenThreads::Thread, public DMX @@ -177,23 +179,44 @@ class CSectionThread : public OpenThreads::Thread, public DMX running = true; return (OpenThreads::Thread::start() == 0); } - bool Stop() - { - if(!running) - return false; -printf("%s::Stop: to lock\n", name.c_str()); + void StopRun() { + xprintf("%s::StopRun: to lock\n", name.c_str()); lock(); running = false; -printf("%s::Stop: to broadcast\n", name.c_str()); - pthread_cond_broadcast(&change_cond); -printf("%s::Stop: to unlock\n", name.c_str()); - unlock(); -printf("%s::Stop: to closefd\n", name.c_str()); + real_pauseCounter = 1; + xprintf("%s::StopRun: to closefd\n", name.c_str()); DMX::closefd(); -printf("%s::Stop: to join\n", name.c_str()); + xprintf("%s::StopRun: to unlock\n", name.c_str()); + unlock(); + } + void Wakeup() { pthread_cond_broadcast(&change_cond); } + bool Stop() + { + xprintf("%s::Stop: to broadcast\n", name.c_str()); + pthread_cond_broadcast(&change_cond); + xprintf("%s::Stop: to join\n", name.c_str()); int ret = (OpenThreads::Thread::join() == 0); -printf("%s::Stop: to close\n", name.c_str()); + xprintf("%s::Stop: to close\n", name.c_str()); DMX::close(); +#if 0 + if(!running) + return false; +xprintf("%s::Stop: to lock\n", name.c_str()); + lock(); + running = false; +xprintf("%s::Stop: to broadcast\n", name.c_str()); + pthread_cond_broadcast(&change_cond); +xprintf("%s::Stop: to unlock\n", name.c_str()); + unlock(); +#if 1 +xprintf("%s::Stop: to closefd\n", name.c_str()); + DMX::closefd(); +#endif +xprintf("%s::Stop: to join\n", name.c_str()); + int ret = (OpenThreads::Thread::join() == 0); +xprintf("%s::Stop: to close\n", name.c_str()); + DMX::close(); +#endif return ret; } }; @@ -281,11 +304,18 @@ class CTimeThread : public CSectionThread bool time_ntp; bool first_time; + int64_t timediff; + + OpenThreads::Mutex time_mutex; + OpenThreads::Condition time_cond; + void sendTimeEvent(bool dvb, time_t tim = 0); void setSystemTime(time_t tim); void run(); public: CTimeThread(); + void waitForTimeset(); + void setTimeSet(); }; class CEitManager diff --git a/src/eitd/sectionsd.cpp b/src/eitd/sectionsd.cpp index 840e8bf5b..07b03b0f9 100644 --- a/src/eitd/sectionsd.cpp +++ b/src/eitd/sectionsd.cpp @@ -123,8 +123,8 @@ static t_channel_id messaging_current_servicekey = 0; static bool channel_is_blacklisted = false; bool timeset = false; -pthread_cond_t timeIsSetCond = PTHREAD_COND_INITIALIZER; -pthread_mutex_t timeIsSetMutex = PTHREAD_MUTEX_INITIALIZER; +//pthread_cond_t timeIsSetCond = PTHREAD_COND_INITIALIZER; +//pthread_mutex_t timeIsSetMutex = PTHREAD_MUTEX_INITIALIZER; static int messaging_have_CN = 0x00; // 0x01 = CURRENT, 0x02 = NEXT static int messaging_got_CN = 0x00; // 0x01 = CURRENT, 0x02 = NEXT @@ -137,8 +137,8 @@ static CEventServer *eventServer; static pthread_rwlock_t servicesLock = PTHREAD_RWLOCK_INITIALIZER; // Unsere (fast-)mutex, damit nicht gleichzeitig in die Menge services geschrieben und gelesen wird static pthread_rwlock_t messagingLock = PTHREAD_RWLOCK_INITIALIZER; -static pthread_cond_t timeThreadSleepCond = PTHREAD_COND_INITIALIZER; -static pthread_mutex_t timeThreadSleepMutex = PTHREAD_MUTEX_INITIALIZER; +//static pthread_cond_t timeThreadSleepCond = PTHREAD_COND_INITIALIZER; +//static pthread_mutex_t timeThreadSleepMutex = PTHREAD_MUTEX_INITIALIZER; static CTimeThread threadTIME; static CEitThread threadEIT; @@ -204,6 +204,7 @@ inline void unlockEvents(void) pthread_rwlock_unlock(&eventsLock); } +#if 0 inline bool waitForTimeset(void) { pthread_mutex_lock(&timeIsSetMutex); @@ -217,6 +218,7 @@ inline bool waitForTimeset(void) sleep(1); return true; } +#endif static const SIevent nullEvt; // Null-Event @@ -886,12 +888,13 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng unlockMessaging(); scanning = 1; - if (!ntpenable) + if (!ntpenable) //FIXME flag if ntp update was good ? { +#if 0 pthread_mutex_lock(&timeThreadSleepMutex); pthread_cond_broadcast(&timeThreadSleepCond); pthread_mutex_unlock(&timeThreadSleepMutex); - +#endif threadTIME.change(0); } @@ -954,10 +957,11 @@ xprintf("[sectionsd] commandserviceChanged: Service change to " PRINTF_CHANNEL_I #endif if (time_trigger_last != (messaging_current_servicekey & 0xFFFFFFFF0000ULL)) { time_trigger_last = messaging_current_servicekey & 0xFFFFFFFF0000ULL; +#if 0 pthread_mutex_lock(&timeThreadSleepMutex); pthread_cond_broadcast(&timeThreadSleepCond); pthread_mutex_unlock(&timeThreadSleepMutex); - +#endif threadTIME.setCurrentService(messaging_current_servicekey); } } @@ -1113,18 +1117,22 @@ static void commandSetConfig(int connfd, char *data, const unsigned /*dataLength } if (time_wakeup) { - pthread_mutex_lock(&timeThreadSleepMutex); + //pthread_mutex_lock(&timeThreadSleepMutex); ntpserver = (std::string)&data[sizeof(struct sectionsd::commandSetConfig)]; dprintf("new network_ntpserver = %s\n", ntpserver.c_str()); ntp_system_cmd = ntp_system_cmd_prefix + ntpserver; ntprefresh = pmsg->network_ntprefresh; ntpenable = (pmsg->network_ntpenable == 1); +#if 0 if (timeset) { // wake up time thread pthread_cond_broadcast(&timeThreadSleepCond); } pthread_mutex_unlock(&timeThreadSleepMutex); +#endif + if(timeset) + threadTIME.change(1); } epg_dir= (std::string)&data[sizeof(struct sectionsd::commandSetConfig) + strlen(&data[sizeof(struct sectionsd::commandSetConfig)]) + 1]; @@ -1332,26 +1340,50 @@ CTimeThread::CTimeThread() time_ntp = false; }; -void CTimeThread::sendTimeEvent(bool dvb, time_t tim) +void CTimeThread::sendTimeEvent(bool ntp, time_t tim) { time_t actTime = time(NULL); - if (dvb) { + if (!ntp) { struct tm *tmTime = localtime(&actTime); - xprintf("[%sThread] - current: %02d.%02d.%04d %02d:%02d:%02d, dvb: %s", "time", + xprintf("%s: current: %02d.%02d.%04d %02d:%02d:%02d, dvb: %s", name.c_str(), tmTime->tm_mday, tmTime->tm_mon+1, tmTime->tm_year+1900, tmTime->tm_hour, tmTime->tm_min, tmTime->tm_sec, ctime(&tim)); actTime = tim; } - pthread_mutex_lock(&timeIsSetMutex); - timeset = true; - pthread_cond_broadcast(&timeIsSetCond); - pthread_mutex_unlock(&timeIsSetMutex ); eventServer->sendEvent(CSectionsdClient::EVT_TIMESET, CEventServer::INITID_SECTIONSD, &actTime, sizeof(actTime) ); + setTimeSet(); +} + +void CTimeThread::setTimeSet() +{ + time_mutex.lock(); + timeset = true; + time_cond.broadcast(); + time_mutex.unlock(); +} + +void CTimeThread::waitForTimeset(void) +{ + time_mutex.lock(); + while(!timeset) + time_cond.wait(&time_mutex); + time_mutex.unlock(); } void CTimeThread::setSystemTime(time_t tim) { struct timeval tv; - if ((!messaging_neutrino_sets_time) && (geteuid() == 0)) { + + time_t now = time(NULL); + struct tm *tmTime = localtime(&now); + + gettimeofday(&tv, NULL); + timediff = tim * (int64_t)1000000 - (tv.tv_usec + tv.tv_sec * (int64_t)1000000); + + xprintf("%s: timediff %lld, current: %02d.%02d.%04d %02d:%02d:%02d, dvb: %s", name.c_str(), timediff, + tmTime->tm_mday, tmTime->tm_mon+1, tmTime->tm_year+1900, + tmTime->tm_hour, tmTime->tm_min, tmTime->tm_sec, ctime(&tim)); + + if (!messaging_neutrino_sets_time) { tv.tv_sec = tim; tv.tv_usec = 0; if (settimeofday(&tv, NULL) < 0) @@ -1367,16 +1399,14 @@ void CTimeThread::addFilters() void CTimeThread::run() { - //struct timespec restartWait; - //struct timeval now; time_t dvb_time = 0; - xprintf("[%sThread] pid %d (%lu) start\n", "time", getpid(), pthread_self()); + xprintf("%s::run:: starting, pid %d (%lu)\n", name.c_str(), getpid(), pthread_self()); addFilters(); DMX::start(); while(running) { - if (sendToSleepNow) { + if(sendToSleepNow) { #ifdef DEBUG_TIME_THREAD xprintf("%s: going to sleep %d seconds, running %d scanning %d\n", name.c_str(), sleep_time, running, scanning); @@ -1389,23 +1419,24 @@ void CTimeThread::run() #endif if (!running) break; - - sendToSleepNow = false; } - bool success = false; time_ntp = false; dvb_time = 0; + timediff = 0; + if (ntpenable && system( ntp_system_cmd.c_str() ) == 0) { time_ntp = true; success = true; } else if (dvb_time_update) { - xprintf("timeThread: getting time\n"); if(!first_time) change(1); + else + change(0); + xprintf("timeThread: getting DVB time (isOpen %d)\n", isOpen()); int rc = dmx->Read(static_buf, MAX_SECTION_LENGTH, timeoutInMSeconds); - xprintf("timeThread: getting time done : %d messaging_neutrino_sets_time %d\n", rc, messaging_neutrino_sets_time); + xprintf("timeThread: getting DVB time done : %d messaging_neutrino_sets_time %d\n", rc, messaging_neutrino_sets_time); if (rc > 0) { SIsectionTIME st(static_buf); if (st.is_parsed()) { @@ -1419,8 +1450,9 @@ void CTimeThread::run() if(success) { if(dvb_time) { setSystemTime(dvb_time); + /* retry a second time immediately after start, to get TOT ? */ if(first_time) - sleep_time = 5; /* retry a second time immediately */ + sleep_time = 5; } sendTimeEvent(time_ntp, dvb_time); @@ -1428,8 +1460,11 @@ void CTimeThread::run() time_ntp ? "NTP" : first_time ? "DVB (TDT)" : "DVB (TOT)", sleep_time); first_time = false; } else { - xprintf("%s: Time set FAILED", name.c_str()); + xprintf("%s: Time set FAILED (enabled: ntp %d dvb %d)\n", name.c_str(), ntpenable, dvb_time_update); + if(!timeset && first_time) + sleep_time = 1; } + sendToSleepNow = true; #if 0 if (timeset && dvb_time_update) { @@ -1443,7 +1478,7 @@ void CTimeThread::run() } else { xprintf("[%sThread] Time %sset via DVB(%s), going to sleep for %d seconds.\n", - "time", success?"":"not ", first_time?"TDT":"TOT", sleep_time); + "time", success?"":"not ", first_time?"TDT":"TOT", sleep_time); } first_time = false; } @@ -1463,7 +1498,10 @@ void CTimeThread::run() if (sectionsd_stop) break; -xprintf("timeThread: going to sleep for %d sec\n\n", seconds); + xprintf("timeThread: going to sleep for %d sec\n\n", seconds); + + struct timespec restartWait; + struct timeval now; gettimeofday(&now, NULL); TIMEVAL_TO_TIMESPEC(&now, &restartWait); restartWait.tv_sec += seconds; @@ -1525,8 +1563,9 @@ void CSectionThread::run() addFilters(); if (wait_for_time) { - waitForTimeset(); - xprintf("%s::run:: time set.\n", name.c_str()); + threadTIME.waitForTimeset(); + time_t now = time(NULL); + xprintf("%s::run:: time set: %s", name.c_str(), ctime(&now)); } DMX::start(); @@ -2164,40 +2203,52 @@ printf("SIevent size: %d\n", sizeof(SIevent)); printf("[sectionsd] stopping...\n"); //scanning = 0; + threadEIT.StopRun(); + threadCN.StopRun(); + threadTIME.StopRun(); + + xprintf("broadcasting...\n"); +#if 0 timeset = true; - printf("broadcasting...\n"); pthread_mutex_lock(&timeIsSetMutex); pthread_cond_broadcast(&timeIsSetCond); pthread_mutex_unlock(&timeIsSetMutex); +#endif + threadTIME.setTimeSet(); +#if 0 pthread_mutex_lock(&timeThreadSleepMutex); pthread_cond_broadcast(&timeThreadSleepCond); pthread_mutex_unlock(&timeThreadSleepMutex); - - printf("pausing...\n"); +#endif + xprintf("pausing...\n"); pthread_cancel(threadHouseKeeping); //if (dmxUTC) dmxUTC->Stop(); +#if 0 + xprintf("cancel TOT\n"); + //threadTIME.cancel(); //pthread_cancel(threadTOT); - printf("join TOT\n"); +#endif + xprintf("join TOT\n"); //pthread_join(threadTOT, NULL); threadTIME.Stop(); //if (dmxUTC) delete dmxUTC; - printf("join EIT\n"); + xprintf("join EIT\n"); threadEIT.Stop(); - printf("join CN\n"); + xprintf("join CN\n"); threadCN.Stop(); #ifdef ENABLE_SDT - printf("join SDT\n"); + xprintf("join SDT\n"); threadSDT.Stop(); #endif #ifdef ENABLE_FREESATEPG - printf("join FSEIT\n"); + xprintf("join FSEIT\n"); threadFSEIT.Stop(); #endif printf("[sectionsd] ended\n");