eitd/sectionsd.cpp: convert SDT thread, add debug defines

This commit is contained in:
[CST] Focus
2012-02-22 16:54:50 +04:00
parent 1930809471
commit effe71d995
2 changed files with 134 additions and 148 deletions

View File

@@ -64,22 +64,25 @@
//#define ENABLE_SDT //FIXME
// 60 Minuten Zyklus...
#define TIME_EIT_SCHEDULED_PAUSE 60 * 60
// -- 5 Minutes max. pause should improve behavior (rasc, 2005-05-02)
// #define TIME_EIT_SCHEDULED_PAUSE 5* 60
// Zeit die fuer die gewartet wird, bevor der Filter weitergeschaltet wird, falls es automatisch nicht klappt
#define TIME_EIT_SKIPPING 90
//#define DEBUG_SDT_THREAD
//#define DEBUG_EIT_THREAD
#define DEBUG_CN_THREAD
#ifdef ENABLE_FREESATEPG
#endif
/* period to restart EIT reading */
#define TIME_EIT_SCHEDULED_PAUSE 60 * 60
/* force EIT thread to change filter after, seconds */
#define TIME_EIT_SKIPPING 90
// a little more time for freesat epg
#define TIME_FSEIT_SKIPPING 240
static bool sectionsd_ready = false;
/*static*/ bool reader_ready = true;
static unsigned int max_events;
/* period to remove old events */
//#define HOUSEKEEPING_SLEEP (5 * 60) // sleep 5 minutes
#define HOUSEKEEPING_SLEEP (30) // FIXME 1 min for testing
/* period to clean cached sections and force restart sections read */
#define META_HOUSEKEEPING (24 * 60 * 60) / HOUSEKEEPING_SLEEP // meta housekeeping after XX housekeepings - every 24h -
// Timeout bei tcp/ip connections in ms
@@ -169,17 +172,16 @@ static CEitThread threadEIT;
static CCNThread threadCN;
#ifdef ENABLE_FREESATEPG
// a little more time for freesat epg
#define TIME_FSEIT_SKIPPING 240
static DMX dmxFSEIT(3842, 320);
#endif
#ifdef ENABLE_SDT
#define TIME_SDT_NONEWDATA 5
#define TIME_SDT_NONEWDATA 15
#define RESTART_DMX_AFTER_TIMEOUTS 5
#define TIME_SDT_SCHEDULED_PAUSE 2* 60* 60
static DMX dmxSDT(0x11, 512, true, 0);
CSdtThread threadSDT;
#endif
int sectionsd_stop = 0;
static bool slow_addevent = true;
@@ -919,17 +921,15 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng
dmxFSEIT.change(0);
#endif
#ifdef ENABLE_SDT
dmxSDT.change(0);
threadSDT.change(0);
#endif
}
struct sectionsd::msgResponseHeader msgResponse;
msgResponse.dataLength = 0;
writeNbytes(connfd, (const char *)&msgResponse, sizeof(msgResponse), WRITE_TIMEOUT_IN_SECONDS);
return ;
return;
}
static void commandserviceChanged(int connfd, char *data, const unsigned dataLength)
@@ -1017,7 +1017,7 @@ xprintf("[sectionsd] commandserviceChanged: Service changed to " PRINTF_CHANNEL_
dmxFSEIT.setCurrentService(messaging_current_servicekey);
#endif
#ifdef ENABLE_SDT
dmxSDT.setCurrentService(messaging_current_servicekey);
threadSDT.setCurrentService(messaging_current_servicekey);
#endif
}
else
@@ -1228,8 +1228,8 @@ static void deleteSIexceptEPG()
threadEIT.dropCachedSectionIDs();
threadEIT.change(0);
#ifdef ENABLE_SDT
dmxSDT.dropCachedSectionIDs();
dmxSDT.change(0);
threadSDT.dropCachedSectionIDs();
threadSDT.change(0);
#endif
#ifdef ENABLE_FREESATEPG
dmxFSEIT.setCurrentService(messaging_current_servicekey);
@@ -1242,7 +1242,7 @@ static void commandFreeMemory(int connfd, char * /*data*/, const unsigned /*data
deleteSIexceptEPG();
writeLockEvents();
showProfiling("commandFreeMemory start");
#ifndef USE_BOOST_SHARED_PTR
std::set<SIeventPtr> allevents;
@@ -1259,13 +1259,12 @@ showProfiling("commandFreeMemory start");
for(std::set<SIeventPtr>::iterator ait = allevents.begin(); ait != allevents.end(); ++ait)
delete (*ait);
showProfiling("commandFreeMemory end1");
#endif
mySIeventsOrderFirstEndTimeServiceIDEventUniqueKey.clear();
mySIeventsOrderServiceUniqueKeyFirstStartTimeEventUniqueKey.clear();
mySIeventsOrderUniqueKey.clear();
mySIeventsNVODorderUniqueKey.clear();
showProfiling("commandFreeMemory end2");
unlockEvents();
//FIXME debug
@@ -1876,8 +1875,10 @@ void CEitThread::run()
while (running) {
if(sendToSleepNow || !scanning || channel_is_blacklisted) {
#ifdef DEBUG_EIT_THREAD
xprintf("%s: going to sleep %d seconds, running %d scanning %d blacklisted %d events %d\n",
name.c_str(), TIME_EIT_SCHEDULED_PAUSE, running, scanning, channel_is_blacklisted, event_count);
#endif
event_count = 0;
writeLockMessaging();
@@ -1888,7 +1889,9 @@ void CEitThread::run()
do {
real_pause();
rs = Sleep(TIME_EIT_SCHEDULED_PAUSE);
#ifdef DEBUG_EIT_THREAD
xprintf("%s: wakeup, running %d scanning %d blacklisted %d reason %d\n\n", name.c_str(), running, scanning, channel_is_blacklisted, rs);
#endif
} while(running && (!scanning || channel_is_blacklisted));
if(!running)
@@ -1907,12 +1910,16 @@ void CEitThread::run()
bool need_change = false;
if(timeoutsDMX < 0 || timeoutsDMX >= CHECK_RESTART_DMX_AFTER_TIMEOUTS) {
#ifdef DEBUG_EIT_THREAD
xprintf("%s: skipping to next filter %d from %d (timeouts %d)\n", name.c_str(), filter_index+1, filters.size(), timeoutsDMX);
#endif
timeoutsDMX = 0;
need_change = true;
}
if (zeit > lastChanged + TIME_EIT_SKIPPING) {
#ifdef DEBUG_EIT_THREAD
xprintf("%s: skipping to next filter %d from %d (TIME_EIT_SKIPPING)\n", name.c_str(), filter_index+1, filters.size());
#endif
need_change = true;
}
@@ -1957,7 +1964,9 @@ void CCNThread::run()
unlockMessaging();
waitForTimeset();
#ifdef DEBUG_CN_THREAD
xprintf("CCNThread::run:: time set..\n");
#endif
DMX::start(); // -> unlock
@@ -1966,8 +1975,10 @@ void CCNThread::run()
while(running)
{
if(sendToSleepNow || !scanning || channel_is_blacklisted) {
#ifdef DEBUG_CN_THREAD
xprintf("%s: going to sleep, running %d scanning %d blacklisted %d events %d\n", name.c_str(), running, scanning, channel_is_blacklisted, event_count);
//xprintf("%s: eit_version %02x messaging_need_eit_version %d rc %d\n", name.c_str(), get_eit_version(), messaging_need_eit_version, rc);
#endif
writeLockMessaging();
messaging_eit_is_busy = false;
@@ -1982,8 +1993,9 @@ void CCNThread::run()
rs = pthread_cond_wait(&change_cond, &start_stop_mutex);
eit_stop_update_filter(&eit_update_fd);
pthread_mutex_unlock(&start_stop_mutex);
//rs = Sleep(TIME_EIT_SCHEDULED_PAUSE);
#ifdef DEBUG_CN_THREAD
xprintf("%s: wakeup, running %d scanning %d blacklisted %d reason %d\n\n", name.c_str(), running, scanning, channel_is_blacklisted, rs);
#endif
} while(running && (!scanning || channel_is_blacklisted));
if(!running)
@@ -2050,8 +2062,10 @@ void CCNThread::run()
writeLockMessaging();
messaging_have_CN = messaging_got_CN;
unlockMessaging();
#ifdef DEBUG_CN_THREAD
xprintf("%s: have CN: timeoutsDMX %d messaging_have_CN %x messaging_got_CN %x\n\n",
name.c_str(), timeoutsDMX, messaging_have_CN, messaging_got_CN);
#endif
dprintf("[cnThread] got current_next (0x%x) - sending event!\n", messaging_have_CN);
eventServer->sendEvent(CSectionsdClient::EVT_GOT_CN_EPG,
CEventServer::INITID_SECTIONSD,
@@ -2079,14 +2093,18 @@ void CCNThread::run()
#endif
#if 1
if(timeoutsDMX < 0) {
#ifdef DEBUG_CN_THREAD
xprintf("%s: timeoutsDMX %d messaging_got_CN %x messaging_have_CN %x sid %016llx\n\n",
name.c_str(), timeoutsDMX, messaging_got_CN, messaging_have_CN, messaging_current_servicekey);
#endif
timeoutsDMX = 0;
sendToSleepNow = true;
}
#endif
if (zeit > lastChanged + TIME_EIT_VERSION_WAIT) {
#ifdef DEBUG_CN_THREAD
xprintf("%s: zeit > lastChanged + TIME_EIT_VERSION_WAIT\n", name.c_str());
#endif
sendToSleepNow = true;
}
@@ -2183,7 +2201,7 @@ void CCNThread::run()
#endif
} // for
delete[] static_buf;
printf("[sectionsd] cnThread ended\n");
printf("[sectionsd] %s ended\n", name.c_str());
pthread_exit(NULL);
}
@@ -2199,7 +2217,6 @@ static bool addService(const SIservice &s, const int is_actual)
unlockServices();
if ( (!already_exists) || ((is_actual & 7) && (!si->second->is_actual)) ) {
if (already_exists)
{
writeLockServices();
@@ -2241,146 +2258,108 @@ static bool addService(const SIservice &s, const int is_actual)
}
is_new = true;
}
return is_new;
}
static void *sdtThread(void *)
void CSdtThread::run()
{
const unsigned timeoutInMSeconds = 2500;
t_transponder_id tid = 0;
time_t lastData = 0;
time_t zeit = 0;
int rs = 0;
int is_actual = 0;
name = "sdtThread";
xprintf("%s::run:: starting, pid %d (%lu)\n", name.c_str(), getpid(), pthread_self());
pID = 0x11;
dmx_num = 0;
cache = false;
//FIXME correct mask
dmxSDT.addfilter(0x42, 0xf3 ); //SDT actual = 0x42 + SDT other = 0x46 + BAT = 0x4A
timeoutInMSeconds = EIT_READ_TIMEOUT;
dprintf("[%sThread] pid %d (%lu) start\n", "sdt", getpid(), pthread_self());
bool sendToSleepNow = false;
int timeoutsDMX = 0;
uint8_t *static_buf = new uint8_t[MAX_SECTION_LENGTH];
int rc;
bool startup = true;
//addfilter(0x42, 0xf3 ); //SDT actual = 0x42 + SDT other = 0x46 + BAT = 0x4A
addfilter(0x42, 0xfb ); //SDT actual = 0x42 + SDT other = 0x46
waitForTimeset();
dmxSDT.start(); // -> unlock
DMX::start(); // -> unlock
while (!sectionsd_stop) {
while (!scanning) {
if(sectionsd_stop)
break;
sleep(1);
}
zeit = time_monotonic();
time_t lastData = time_monotonic();
if(sectionsd_stop)
break;
while (running) {
if(sendToSleepNow || !scanning) {
#ifdef DEBUG_SDT_THREAD
xprintf("%s: going to sleep %d seconds, running %d scanning %d services %d\n",
name.c_str(), TIME_SDT_SCHEDULED_PAUSE, running, scanning, event_count);
#endif
event_count = 0;
readLockMessaging();
if (messaging_zap_detected)
startup = true;
unlockMessaging();
int rs = 0;
do {
real_pause();
rs = Sleep(TIME_SDT_SCHEDULED_PAUSE);
#ifdef DEBUG_SDT_THREAD
xprintf("%s: wakeup, running %d scanning %d reason %d\n\n", name.c_str(), running, scanning, rs);
#endif
} while(running && !scanning);
if ((zeit > lastData + TIME_SDT_NONEWDATA) || (startup))
{
struct timespec abs_wait;
struct timeval now;
gettimeofday(&now, NULL);
TIMEVAL_TO_TIMESPEC(&now, &abs_wait);
abs_wait.tv_sec += (TIME_SDT_SCHEDULED_PAUSE);
dmxSDT.real_pause();
/* this is the "last" thread. Means: if this one goes to sleep, sectionsd
* sleeps mostly. Worth printing. */
printdate_ms(stdout);
printf("sdtThread: going to sleep...\n");
writeLockMessaging();
messaging_zap_detected = false;
unlockMessaging();
pthread_mutex_lock( &dmxSDT.start_stop_mutex );
rs = pthread_cond_timedwait( &dmxSDT.change_cond, &dmxSDT.start_stop_mutex, &abs_wait );
pthread_mutex_unlock( &dmxSDT.start_stop_mutex );
if(sectionsd_stop)
if(!running)
break;
if (rs == ETIMEDOUT)
{
dprintf("dmxSDT: waking up again - looking for new services :)\n");
dmxSDT.change( 0 ); // -> restart
}
else if (rs == 0)
{
dprintf("dmxSDT: waking up again - requested from .change()\n");
}
else
{
dprintf("dmxSDT: waking up again - unknown reason?!\n");
dmxSDT.real_unpause();
}
// update zeit after sleep
change(0); // -> restart
startup = false;
zeit = time_monotonic();
timeoutsDMX = 0;
lastData = zeit;
sendToSleepNow = false;
lastData = time_monotonic();
}
if (timeoutsDMX >= RESTART_DMX_AFTER_TIMEOUTS && scanning)
{
timeoutsDMX = 0;
dmxSDT.stop();
dmxSDT.start(); // leaves unlocked
dputs("\n !!! dmxSDT restarted !!!\n");
}
rc = dmxSDT.getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
if (rc < 0)
continue;
LongSection sec(static_buf);
uint8_t table_id = sec.getTableId();
if ((table_id == 0x42) || (table_id == 0x46))
{
SIsectionSDT sdt(static_buf);
is_actual = (sdt.getTableId() == 0x42) ? 1 : 0;
if (is_actual && !sdt.getLastSectionNumber())
is_actual = 2;
bool is_new = false;
is_actual = (is_actual | 8);
for (SIservices::iterator s = sdt.services().begin(); s != sdt.services().end(); ++s) {
if (addService(*s, is_actual)) {
is_new = true;
tid = CREATE_TRANSPONDER_ID_FROM_ORIGINALNETWORK_TRANSPORTSTREAM_ID(s->original_network_id,
s->transport_stream_id);
}
}
if (is_new) {
int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
if(rc > 0) {
if(addServices())
lastData = time_monotonic();
dprintf("[sdtThread] added %d services [table 0x%x TID: %08x]\n",
sdt.services().size(), table_id, tid);
}
}
} // for
time_t zeit = time_monotonic();
if(timeoutsDMX < 0 || timeoutsDMX >= CHECK_RESTART_DMX_AFTER_TIMEOUTS) {
#ifdef DEBUG_SDT_THREAD
xprintf("%s: timeouts %d\n", name.c_str(), timeoutsDMX);
#endif
timeoutsDMX = 0;
sendToSleepNow = true;
}
if (zeit > (lastData + TIME_SDT_NONEWDATA)) {
#ifdef DEBUG_SDT_THREAD
xprintf("%s: no new services for %d seconds\n", name.c_str(), (int) (zeit - lastData));
#endif
sendToSleepNow = true;
}
} // while running
delete[] static_buf;
printf("[sectionsd] sdt-thread ended\n");
printf("[sectionsd] %s ended\n", name.c_str());
pthread_exit(NULL);
}
bool CSdtThread::addServices()
{
bool is_new = false;
LongSection sec(static_buf);
uint8_t table_id = sec.getTableId();
if ((table_id == 0x42) || (table_id == 0x46)) {
SIsectionSDT sdt(static_buf);
bool is_actual = (sdt.getTableId() == 0x42) ? 1 : 0;
if (is_actual && !sdt.getLastSectionNumber())
is_actual = 2;
is_actual = (is_actual | 8);
for (SIservices::iterator s = sdt.services().begin(); s != sdt.services().end(); ++s) {
if (addService(*s, is_actual))
is_new = true;
event_count++;
}
}
return is_new;
}
#endif
/* helper function for the housekeeping-thread */
@@ -2482,7 +2461,7 @@ void sectionsd_main_thread(void * /*data*/)
pthread_t threadFSEIT;
#endif
#ifdef ENABLE_SDT
pthread_t threadSDT;
//pthread_t threadSDT;
#endif
int rc;
@@ -2563,6 +2542,7 @@ printf("SIevent size: %d\n", sizeof(SIevent));
}
#endif
#ifdef ENABLE_SDT
#if 0
printf("\n\n\n[sectionsd] starting SDT thread\n");
rc = pthread_create(&threadSDT, 0, sdtThread, 0);
@@ -2571,6 +2551,8 @@ printf("SIevent size: %d\n", sizeof(SIevent));
return;
}
#endif
threadSDT.Start();
#endif
// housekeeping-Thread starten
rc = pthread_create(&threadHouseKeeping, 0, houseKeepingThread, 0);
@@ -2642,9 +2624,11 @@ printf("SIevent size: %d\n", sizeof(SIevent));
#endif
#ifdef ENABLE_SDT
#if 0
pthread_mutex_lock(&dmxSDT.start_stop_mutex);
pthread_cond_broadcast(&dmxSDT.change_cond);
pthread_mutex_unlock(&dmxSDT.start_stop_mutex);
#endif
#endif
printf("pausing...\n");
@@ -2656,7 +2640,7 @@ printf("SIevent size: %d\n", sizeof(SIevent));
dmxFSEIT.request_pause();
#endif
#ifdef ENABLE_SDT
dmxSDT.request_pause();
//dmxSDT.request_pause();
#endif
pthread_cancel(threadHouseKeeping);
@@ -2664,19 +2648,20 @@ printf("SIevent size: %d\n", sizeof(SIevent));
//pthread_cancel(threadTOT);
printf("join 1\n");
printf("join TOT\n");
pthread_join(threadTOT, NULL);
if(dmxUTC) delete dmxUTC;
printf("join 2\n");
printf("join EIT\n");
threadEIT.Stop();
printf("join 3\n");
printf("join CN\n");
threadCN.Stop();
#ifdef ENABLE_SDT
printf("join 4\n");
pthread_join(threadSDT, NULL);
printf("join SDT\n");
threadSDT.Stop();
//pthread_join(threadSDT, NULL);
#endif
eit_stop_update_filter(&eit_update_fd);
@@ -2694,7 +2679,7 @@ printf("SIevent size: %d\n", sizeof(SIevent));
dmxFSEIT.close();
#endif
#ifdef ENABLE_SDT
dmxSDT.close();
//dmxSDT.close();
#endif
printf("[sectionsd] ended\n");