eitd: converting timeThread, WIP

This commit is contained in:
[CST] Focus
2012-03-02 19:30:57 +04:00
parent 0a6177a4d9
commit 95e61ba402
2 changed files with 161 additions and 94 deletions

View File

@@ -96,8 +96,6 @@ typedef std::map<t_channel_id, SIservicePtr, std::less<t_channel_id> > MySIservi
#include <OpenThreads/Thread> #include <OpenThreads/Thread>
#include "dmx.h" #include "dmx.h"
#define MAX_SECTION_LENGTH (0x0fff + 3)
/* abstract section reading class */ /* abstract section reading class */
class CSectionThread : public OpenThreads::Thread, public DMX class CSectionThread : public OpenThreads::Thread, public DMX
{ {
@@ -139,7 +137,7 @@ class CSectionThread : public OpenThreads::Thread, public DMX
virtual void afterWait() {}; virtual void afterWait() {};
/* process section after getSection */ /* process section after getSection */
virtual void processSection(int rc) { if(rc < 0) return; }; virtual void processSection() {};
/* cleanup before exit */ /* cleanup before exit */
virtual void cleanup() {}; virtual void cleanup() {};
@@ -207,7 +205,7 @@ class CEventsThread : public CSectionThread
/* default hooks */ /* default hooks */
bool shouldSleep(); bool shouldSleep();
bool checkSleep(); bool checkSleep();
void processSection(int rc); void processSection();
/* EIT-specific */ /* EIT-specific */
bool addEvents(); bool addEvents();
@@ -245,7 +243,7 @@ class CCNThread : public CEventsThread
void beforeSleep(); void beforeSleep();
void beforeWait(); void beforeWait();
void afterWait(); void afterWait();
void processSection(int rc); void processSection();
void cleanup(); void cleanup();
/* CN-specific */ /* CN-specific */
@@ -265,7 +263,7 @@ class CSdtThread : public CSectionThread
void addFilters(); void addFilters();
bool shouldSleep(); bool shouldSleep();
bool checkSleep(); bool checkSleep();
void processSection(int rc); void processSection();
/* SDT-specific */ /* SDT-specific */
bool addServices(); bool addServices();
@@ -276,7 +274,18 @@ class CSdtThread : public CSectionThread
class CTimeThread : public CSectionThread class CTimeThread : public CSectionThread
{ {
private: private:
/* overloaded hooks */
void addFilters();
/* specific */
bool time_ntp;
bool first_time;
void sendTimeEvent(bool dvb, time_t tim = 0);
void setSystemTime(time_t tim);
void run(); void run();
public:
CTimeThread();
}; };
class CEitManager class CEitManager

View File

@@ -61,10 +61,12 @@
#include "edvbstring.h" #include "edvbstring.h"
#include "xmlutil.h" #include "xmlutil.h"
#define ENABLE_SDT //FIXME //#define ENABLE_SDT //FIXME
//#define DEBUG_SDT_THREAD //#define DEBUG_SDT_THREAD
#define DEBUG_SECTION_THREAD #define DEBUG_TIME_THREAD
#define DEBUG_SECTION_THREADS
#define DEBUG_CN_THREAD #define DEBUG_CN_THREAD
static bool sectionsd_ready = false; static bool sectionsd_ready = false;
@@ -138,6 +140,7 @@ static pthread_rwlock_t messagingLock = PTHREAD_RWLOCK_INITIALIZER;
static pthread_cond_t timeThreadSleepCond = PTHREAD_COND_INITIALIZER; static pthread_cond_t timeThreadSleepCond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t timeThreadSleepMutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t timeThreadSleepMutex = PTHREAD_MUTEX_INITIALIZER;
static CTimeThread threadTIME;
static CEitThread threadEIT; static CEitThread threadEIT;
static CCNThread threadCN; static CCNThread threadCN;
@@ -888,6 +891,8 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng
pthread_mutex_lock(&timeThreadSleepMutex); pthread_mutex_lock(&timeThreadSleepMutex);
pthread_cond_broadcast(&timeThreadSleepCond); pthread_cond_broadcast(&timeThreadSleepCond);
pthread_mutex_unlock(&timeThreadSleepMutex); pthread_mutex_unlock(&timeThreadSleepMutex);
threadTIME.change(0);
} }
threadCN.change(0); threadCN.change(0);
@@ -952,6 +957,8 @@ xprintf("[sectionsd] commandserviceChanged: Service change to " PRINTF_CHANNEL_I
pthread_mutex_lock(&timeThreadSleepMutex); pthread_mutex_lock(&timeThreadSleepMutex);
pthread_cond_broadcast(&timeThreadSleepCond); pthread_cond_broadcast(&timeThreadSleepCond);
pthread_mutex_unlock(&timeThreadSleepMutex); pthread_mutex_unlock(&timeThreadSleepMutex);
threadTIME.setCurrentService(messaging_current_servicekey);
} }
} }
else else
@@ -1105,7 +1112,7 @@ static void commandSetConfig(int connfd, char *data, const unsigned /*dataLength
time_wakeup = true; time_wakeup = true;
} }
if(time_wakeup) { if (time_wakeup) {
pthread_mutex_lock(&timeThreadSleepMutex); pthread_mutex_lock(&timeThreadSleepMutex);
ntpserver = (std::string)&data[sizeof(struct sectionsd::commandSetConfig)]; ntpserver = (std::string)&data[sizeof(struct sectionsd::commandSetConfig)];
@@ -1286,7 +1293,7 @@ bool sectionsd_parse_command(CBasicMessage::Header &rmsg, int connfd)
if (rc == true) if (rc == true)
{ {
dprintf("%s\n", connectionCommands[header.command].sCmd.c_str()); dprintf("%s\n", connectionCommands[header.command].sCmd.c_str());
if(connectionCommands[header.command].cmd == sendEmptyResponse) if (connectionCommands[header.command].cmd == sendEmptyResponse)
printf("sectionsd_parse_command: UNUSED cmd used: %d (%s)\n", header.command, connectionCommands[header.command].sCmd.c_str()); printf("sectionsd_parse_command: UNUSED cmd used: %d (%s)\n", header.command, connectionCommands[header.command].sCmd.c_str());
connectionCommands[header.command].cmd(connfd, data, header.dataLength); connectionCommands[header.command].cmd(connfd, data, header.dataLength);
} }
@@ -1314,10 +1321,21 @@ static void dump_sched_info(std::string label)
// updates system time according TOT every 30 minutes // updates system time according TOT every 30 minutes
//--------------------------------------------------------------------- //---------------------------------------------------------------------
static void sendTimeEvent(bool dvb, time_t tim = 0) CTimeThread::CTimeThread()
: CSectionThread("timeThread", 0x14)
{
timeoutInMSeconds = 36000;
cache = false;
wait_for_time = false;
first_time = true;
time_ntp = false;
};
void CTimeThread::sendTimeEvent(bool dvb, time_t tim)
{ {
time_t actTime = time(NULL); time_t actTime = time(NULL);
if(dvb) { if (dvb) {
struct tm *tmTime = localtime(&actTime); struct tm *tmTime = localtime(&actTime);
xprintf("[%sThread] - current: %02d.%02d.%04d %02d:%02d:%02d, dvb: %s", "time", xprintf("[%sThread] - current: %02d.%02d.%04d %02d:%02d:%02d, dvb: %s", "time",
tmTime->tm_mday, tmTime->tm_mon+1, tmTime->tm_year+1900, tmTime->tm_hour, tmTime->tm_min, tmTime->tm_sec, ctime(&tim)); tmTime->tm_mday, tmTime->tm_mon+1, tmTime->tm_year+1900, tmTime->tm_hour, tmTime->tm_min, tmTime->tm_sec, ctime(&tim));
@@ -1330,89 +1348,119 @@ static void sendTimeEvent(bool dvb, time_t tim = 0)
eventServer->sendEvent(CSectionsdClient::EVT_TIMESET, CEventServer::INITID_SECTIONSD, &actTime, sizeof(actTime) ); eventServer->sendEvent(CSectionsdClient::EVT_TIMESET, CEventServer::INITID_SECTIONSD, &actTime, sizeof(actTime) );
} }
static void *timeThread(void *) void CTimeThread::setSystemTime(time_t tim)
{ {
UTC_t UTC; struct timeval tv;
time_t tim; if ((!messaging_neutrino_sets_time) && (geteuid() == 0)) {
unsigned int seconds; tv.tv_sec = tim;
bool first_time = true; /* we don't sleep the first time (we try to get a TOT header) */ tv.tv_usec = 0;
struct timespec restartWait; if (settimeofday(&tv, NULL) < 0)
struct timeval now; perror("[sectionsd] settimeofday");
bool time_ntp = false; }
bool success = true; }
dprintf("[%sThread] pid %d (%lu) start\n", "time", getpid(), pthread_self()); void CTimeThread::addFilters()
{
addfilter(0x70, 0xff);
addfilter(0x73, 0xff);
}
while(!sectionsd_stop) void CTimeThread::run()
{ {
while (!scanning || !reader_ready) { //struct timespec restartWait;
if(sectionsd_stop) //struct timeval now;
time_t dvb_time = 0;
xprintf("[%sThread] pid %d (%lu) start\n", "time", getpid(), pthread_self());
addFilters();
DMX::start();
while(running) {
if (sendToSleepNow) {
#ifdef DEBUG_TIME_THREAD
xprintf("%s: going to sleep %d seconds, running %d scanning %d\n",
name.c_str(), sleep_time, running, scanning);
#endif
real_pause();
int rs = Sleep();
#ifdef DEBUG_TIME_THREAD
xprintf("%s: wakeup, running %d scanning %d reason %d\n",
name.c_str(), running, scanning, rs);
#endif
if (!running)
break; break;
sleep(1);
sendToSleepNow = false;
} }
if ( ntpenable && system( ntp_system_cmd.c_str() ) == 0) bool success = false;
{ time_ntp = false;
first_time = false; dvb_time = 0;
if (ntpenable && system( ntp_system_cmd.c_str() ) == 0) {
time_ntp = true; time_ntp = true;
sendTimeEvent(false); success = true;
} else { } else if (dvb_time_update) {
if (dvb_time_update) { xprintf("timeThread: getting time\n");
xprintf("timeThread: getting UTC\n"); if(!first_time)
success = getUTC(&UTC, first_time); // for first time, get TDT, then TOT change(1);
xprintf("timeThread: getting UTC done : %d\n", success);
if (success)
{
tim = changeUTCtoCtime((const unsigned char *) &UTC);
time_ntp = false;
sendTimeEvent(true, tim);
if (tim) { int rc = dmx->Read(static_buf, MAX_SECTION_LENGTH, timeoutInMSeconds);
if ((!messaging_neutrino_sets_time) && (geteuid() == 0)) { xprintf("timeThread: getting time done : %d messaging_neutrino_sets_time %d\n", rc, messaging_neutrino_sets_time);
struct timeval tv; if (rc > 0) {
tv.tv_sec = tim; SIsectionTIME st(static_buf);
tv.tv_usec = 0; if (st.is_parsed()) {
if (settimeofday(&tv, NULL) < 0) { dvb_time = st.getTime();
perror("[sectionsd] settimeofday"); success = true;
pthread_exit(NULL);
}
}
}
} }
} }
} }
/* default sleep time */
sleep_time = ntprefresh * 60;
if(success) {
if(dvb_time) {
setSystemTime(dvb_time);
if(first_time)
sleep_time = 5; /* retry a second time immediately */
}
sendTimeEvent(time_ntp, dvb_time);
xprintf("%s: Time set via %s, going to sleep for %d seconds.\n", name.c_str(),
time_ntp ? "NTP" : first_time ? "DVB (TDT)" : "DVB (TOT)", sleep_time);
first_time = false;
} else {
xprintf("%s: Time set FAILED", name.c_str());
}
#if 0
if (timeset && dvb_time_update) { if (timeset && dvb_time_update) {
if (first_time) if (!first_time)
seconds = 5; /* retry a second time immediately */ sleep_time = ntprefresh * 60;
else else
seconds = ntprefresh * 60; sleep_time = 5; /* retry a second time immediately */
if(time_ntp) { if (time_ntp) {
xprintf("[%sThread] Time set via NTP, going to sleep for %d seconds.\n", "time", seconds); xprintf("[%sThread] Time set via NTP, going to sleep for %d seconds.\n", "time", sleep_time);
} }
else { else {
xprintf("[%sThread] Time %sset via DVB(%s), going to sleep for %d seconds.\n", xprintf("[%sThread] Time %sset via DVB(%s), going to sleep for %d seconds.\n",
"time", success?"":"not ", first_time?"TDT":"TOT", seconds); "time", success?"":"not ", first_time?"TDT":"TOT", sleep_time);
} }
first_time = false; first_time = false;
} }
else { else {
if (!first_time) { if (!first_time) {
/* time was already set, no need to do it again soon when DVB time-blocked channel is tuned */ /* time was already set, no need to do it again soon when DVB time-blocked channel is tuned */
seconds = ntprefresh * 60; sleep_time = ntprefresh * 60;
} } else {
else if (!scanning) { sleep_time = 1;
seconds = 60;
}
else {
seconds = 1;
} }
if (!dvb_time_update && !first_time) { if (!dvb_time_update && !first_time) {
xprintf("[%sThread] Time NOT set via DVB due to blocked channel, going to sleep for %d seconds.\n", "time", seconds); xprintf("[%sThread] Time NOT set via DVB due to blocked channel, going to sleep for %d seconds.\n", "time", sleep_time);
} }
} }
if(sectionsd_stop) #endif
#if 0
if (sectionsd_stop)
break; break;
xprintf("timeThread: going to sleep for %d sec\n\n", seconds); xprintf("timeThread: going to sleep for %d sec\n\n", seconds);
@@ -1430,6 +1478,7 @@ xprintf("timeThread: going to sleep for %d sec\n\n", seconds);
{ {
dprintf("TDT-Thread sleeping interrupted\n"); dprintf("TDT-Thread sleeping interrupted\n");
} }
#endif
} }
printf("[sectionsd] timeThread ended\n"); printf("[sectionsd] timeThread ended\n");
@@ -1446,7 +1495,7 @@ int CSectionThread::Sleep()
struct timespec abs_wait; struct timespec abs_wait;
struct timeval now; struct timeval now;
if(sleep_time) { if (sleep_time) {
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
TIMEVAL_TO_TIMESPEC(&now, &abs_wait); TIMEVAL_TO_TIMESPEC(&now, &abs_wait);
abs_wait.tv_sec += sleep_time; abs_wait.tv_sec += sleep_time;
@@ -1455,7 +1504,7 @@ int CSectionThread::Sleep()
pthread_mutex_lock(&start_stop_mutex); pthread_mutex_lock(&start_stop_mutex);
beforeWait(); beforeWait();
if(sleep_time) if (sleep_time)
rs = pthread_cond_timedwait( &change_cond, &start_stop_mutex, &abs_wait ); rs = pthread_cond_timedwait( &change_cond, &start_stop_mutex, &abs_wait );
else else
rs = pthread_cond_wait(&change_cond, &start_stop_mutex); rs = pthread_cond_wait(&change_cond, &start_stop_mutex);
@@ -1484,7 +1533,7 @@ void CSectionThread::run()
while (running) { while (running) {
if (shouldSleep()) { if (shouldSleep()) {
#ifdef DEBUG_SECTION_THREAD #ifdef DEBUG_SECTION_THREADS
xprintf("%s: going to sleep %d seconds, running %d scanning %d blacklisted %d events %d\n", xprintf("%s: going to sleep %d seconds, running %d scanning %d blacklisted %d events %d\n",
name.c_str(), sleep_time, running, scanning, channel_is_blacklisted, event_count); name.c_str(), sleep_time, running, scanning, channel_is_blacklisted, event_count);
#endif #endif
@@ -1495,13 +1544,13 @@ void CSectionThread::run()
do { do {
real_pause(); real_pause();
rs = Sleep(); rs = Sleep();
#ifdef DEBUG_SECTION_THREAD #ifdef DEBUG_SECTION_THREADS
xprintf("%s: wakeup, running %d scanning %d blacklisted %d reason %d\n\n", xprintf("%s: wakeup, running %d scanning %d blacklisted %d reason %d\n\n",
name.c_str(), running, scanning, channel_is_blacklisted, rs); name.c_str(), running, scanning, channel_is_blacklisted, rs);
#endif #endif
} while (checkSleep()); } while (checkSleep());
if(!running) if (!running)
break; break;
afterSleep(); afterSleep();
@@ -1512,15 +1561,17 @@ void CSectionThread::run()
sendToSleepNow = false; sendToSleepNow = false;
} }
#if 0
int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX); int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
processSection(rc); processSection(rc);
#endif
processSection();
time_t zeit = time_monotonic(); time_t zeit = time_monotonic();
bool need_change = false; bool need_change = false;
if(timeoutsDMX < 0 || timeoutsDMX >= skipTimeouts) { if (timeoutsDMX < 0 || timeoutsDMX >= skipTimeouts) {
#ifdef DEBUG_SECTION_THREAD #ifdef DEBUG_SECTION_THREADS
xprintf("%s: skipping to next filter %d from %d (timeouts %d)\n", xprintf("%s: skipping to next filter %d from %d (timeouts %d)\n",
name.c_str(), filter_index+1, filters.size(), timeoutsDMX); name.c_str(), filter_index+1, filters.size(), timeoutsDMX);
#endif #endif
@@ -1528,13 +1579,13 @@ void CSectionThread::run()
need_change = true; need_change = true;
} }
if (zeit > lastChanged + skipTime) { if (zeit > lastChanged + skipTime) {
#ifdef DEBUG_SECTION_THREAD #ifdef DEBUG_SECTION_THREADS
xprintf("%s: skipping to next filter %d from %d (seconds %d)\n", xprintf("%s: skipping to next filter %d from %d (seconds %d)\n",
name.c_str(), filter_index+1, filters.size(), (int) (zeit - lastChanged)); name.c_str(), filter_index+1, filters.size(), (int) (zeit - lastChanged));
#endif #endif
need_change = true; need_change = true;
} }
if(running && need_change && scanning) { if (running && need_change && scanning) {
readLockMessaging(); readLockMessaging();
if (!next_filter()) if (!next_filter())
sendToSleepNow = true; sendToSleepNow = true;
@@ -1602,9 +1653,10 @@ bool CEventsThread::checkSleep()
} }
/* default section process */ /* default section process */
void CEventsThread::processSection(int rc) void CEventsThread::processSection()
{ {
if(rc <= 0) int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
if (rc <= 0)
return; return;
addEvents(); addEvents();
} }
@@ -1709,7 +1761,7 @@ void CCNThread::beforeWait()
void CCNThread::afterWait() void CCNThread::afterWait()
{ {
xprintf("%s: stop eit update filter (%s)\n", name.c_str(), updating ? "active" : "not active"); xprintf("%s: stop eit update filter (%s)\n", name.c_str(), updating ? "active" : "not active");
if(updating) { if (updating) {
updating = false; updating = false;
eitDmx->Stop(); eitDmx->Stop();
} }
@@ -1723,9 +1775,10 @@ void CCNThread::beforeSleep()
} }
} }
void CCNThread::processSection(int rc) void CCNThread::processSection()
{ {
if(rc <= 0) int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
if (rc <= 0)
return; return;
addEvents(); addEvents();
@@ -1751,7 +1804,7 @@ void CCNThread::processSection(int rc)
/* CN specific functions */ /* CN specific functions */
bool CCNThread::checkUpdate() bool CCNThread::checkUpdate()
{ {
if(!updating) if (!updating)
return false; return false;
unsigned char buf[MAX_SECTION_LENGTH]; unsigned char buf[MAX_SECTION_LENGTH];
@@ -1872,12 +1925,13 @@ bool CSdtThread::checkSleep()
return (running && !scanning); return (running && !scanning);
} }
void CSdtThread::processSection(int rc) void CSdtThread::processSection()
{ {
if(rc <= 0) int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
if (rc <= 0)
return; return;
if(addServices()) if (addServices())
lastChanged = time_monotonic(); lastChanged = time_monotonic();
} }
@@ -1946,7 +2000,7 @@ static void *houseKeepingThread(void *)
while (!scanning) { while (!scanning) {
sleep(1); // wait for streaming to end... sleep(1); // wait for streaming to end...
if(sectionsd_stop) if (sectionsd_stop)
break; break;
} }
@@ -1997,11 +2051,11 @@ static void *houseKeepingThread(void *)
pthread_exit(NULL); pthread_exit(NULL);
} }
extern cDemux * dmxUTC; //extern cDemux * dmxUTC;
void sectionsd_main_thread(void * /*data*/) void sectionsd_main_thread(void * /*data*/)
{ {
pthread_t threadTOT, threadHouseKeeping; pthread_t /*threadTOT,*/ threadHouseKeeping;
int rc; int rc;
printf("$Id: sectionsd.cpp,v 1.305 2009/07/30 12:41:39 seife Exp $\n"); printf("$Id: sectionsd.cpp,v 1.305 2009/07/30 12:41:39 seife Exp $\n");
@@ -2059,6 +2113,7 @@ printf("SIevent size: %d\n", sizeof(SIevent));
eventServer = new CEventServer; eventServer = new CEventServer;
#if 0
// time-Thread starten // time-Thread starten
rc = pthread_create(&threadTOT, 0, timeThread, 0); rc = pthread_create(&threadTOT, 0, timeThread, 0);
@@ -2066,7 +2121,8 @@ printf("SIevent size: %d\n", sizeof(SIevent));
fprintf(stderr, "[sectionsd] failed to create time-thread (rc=%d)\n", rc); fprintf(stderr, "[sectionsd] failed to create time-thread (rc=%d)\n", rc);
return; return;
} }
#endif
threadTIME.Start();
threadEIT.Start(); threadEIT.Start();
threadCN.Start(); threadCN.Start();
@@ -2093,7 +2149,7 @@ printf("SIevent size: %d\n", sizeof(SIevent));
while (!sectionsd_stop && sectionsd_server.run(sectionsd_parse_command, sectionsd::ACTVERSION, true)) { while (!sectionsd_stop && sectionsd_server.run(sectionsd_parse_command, sectionsd::ACTVERSION, true)) {
sched_yield(); sched_yield();
if(threadCN.checkUpdate()) { if (threadCN.checkUpdate()) {
sched_yield(); sched_yield();
threadCN.change(0); threadCN.change(0);
sched_yield(); sched_yield();
@@ -2121,12 +2177,14 @@ printf("SIevent size: %d\n", sizeof(SIevent));
pthread_cancel(threadHouseKeeping); pthread_cancel(threadHouseKeeping);
if(dmxUTC) dmxUTC->Stop(); //if (dmxUTC) dmxUTC->Stop();
//pthread_cancel(threadTOT); //pthread_cancel(threadTOT);
printf("join TOT\n"); printf("join TOT\n");
pthread_join(threadTOT, NULL); //pthread_join(threadTOT, NULL);
if(dmxUTC) delete dmxUTC; threadTIME.Stop();
//if (dmxUTC) delete dmxUTC;
printf("join EIT\n"); printf("join EIT\n");
threadEIT.Stop(); threadEIT.Stop();