eitd: SDT thread re-added, as is but without BAT.

TODO: unify SDT thread algo with other threads
This commit is contained in:
[CST] Focus
2012-02-09 20:46:40 +04:00
parent 018c1c868a
commit b026b6b9a7
5 changed files with 380 additions and 155 deletions

View File

@@ -61,6 +61,8 @@
#include "eitd.h"
#include "edvbstring.h"
#define ENABLE_SDT //FIXME
// 60 Minuten Zyklus...
#define TIME_EIT_SCHEDULED_PAUSE 60 * 60
// -- 5 Minutes max. pause should improve behavior (rasc, 2005-05-02)
@@ -68,10 +70,7 @@
// Zeit die fuer die gewartet wird, bevor der Filter weitergeschaltet wird, falls es automatisch nicht klappt
#define TIME_EIT_SKIPPING 90
#define ENABLE_FREESATEPG // FIXME
#ifdef ENABLE_FREESATEPG
// a little more time for freesat epg
#define TIME_FSEIT_SKIPPING 240
#endif
static bool sectionsd_ready = false;
@@ -86,9 +85,6 @@ static unsigned int max_events;
#define READ_TIMEOUT_IN_SECONDS 2
#define WRITE_TIMEOUT_IN_SECONDS 2
// Gibt die Anzahl Timeouts an, nach der die Verbindung zum DMX neu gestartet wird (wegen evtl. buffer overflow)
// for NIT and SDT threads...
#define RESTART_DMX_AFTER_TIMEOUTS 5
// Timeout in ms for reading from dmx in EIT threads. Dont make this too long
// since we are holding the start_stop lock during this read!
@@ -154,10 +150,20 @@ static pthread_mutex_t timeThreadSleepMutex = PTHREAD_MUTEX_INITIALIZER;
/* no matter how big the buffer, we will receive spurious POLLERR's in table 0x60,
but those are not a big deal, so let's save some memory */
static DMX dmxEIT(0x12, 3000 /*320*/);
static DMX dmxCN(0x12, 512, false, 1);
#ifdef ENABLE_FREESATEPG
// a little more time for freesat epg
#define TIME_FSEIT_SKIPPING 240
static DMX dmxFSEIT(3842, 320);
#endif
static DMX dmxCN(0x12, 512, false, 1);
#ifdef ENABLE_SDT
#define TIME_SDT_NONEWDATA 5
#define RESTART_DMX_AFTER_TIMEOUTS 5
#define TIME_SDT_SCHEDULED_PAUSE 2* 60* 60
static DMX dmxSDT(0x11, 512, true, 0);
#endif
int sectionsd_stop = 0;
static bool slow_addevent = true;
@@ -1081,6 +1087,9 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng
dmxEIT.request_pause();
#ifdef ENABLE_FREESATEPG
dmxFSEIT.request_pause();
#endif
#ifdef ENABLE_SDT
dmxSDT.request_pause();
#endif
scanning = 0;
}
@@ -1090,6 +1099,9 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng
dmxEIT.request_unpause();
#ifdef ENABLE_FREESATEPG
dmxFSEIT.request_unpause();
#endif
#ifdef ENABLE_SDT
dmxSDT.request_unpause();
#endif
writeLockEvents();
if (myCurrentEvent) {
@@ -1118,6 +1130,9 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng
dmxEIT.change(0);
#ifdef ENABLE_FREESATEPG
dmxFSEIT.change(0);
#endif
#ifdef ENABLE_SDT
dmxSDT.change(0);
#endif
}
@@ -1595,6 +1610,9 @@ static void commandserviceChanged(int connfd, char *data, const unsigned dataLen
channel_is_blacklisted = true;
dmxCN.request_pause();
dmxEIT.request_pause();
#ifdef ENABLE_SDT
dmxSDT.request_pause();
#endif
}
xprintf("[sectionsd] commandserviceChanged: service is filtered!\n");
}
@@ -1604,6 +1622,9 @@ static void commandserviceChanged(int connfd, char *data, const unsigned dataLen
channel_is_blacklisted = false;
dmxCN.request_unpause();
dmxEIT.request_unpause();
#ifdef ENABLE_SDT
dmxSDT.request_unpause();
#endif
xprintf("[sectionsd] commandserviceChanged: service is no longer filtered!\n");
}
}
@@ -1648,6 +1669,9 @@ static void commandserviceChanged(int connfd, char *data, const unsigned dataLen
dmxEIT.setCurrentService(messaging_current_servicekey & 0xffff);
#ifdef ENABLE_FREESATEPG
dmxFSEIT.setCurrentService(messaging_current_servicekey & 0xffff);
#endif
#ifdef ENABLE_SDT
dmxSDT.setCurrentService(messaging_current_servicekey & 0xffff);
#endif
}
else
@@ -2516,6 +2540,9 @@ static void deleteSIexceptEPG()
writeLockServices();
unlockServices();
dmxEIT.dropCachedSectionIDs();
#ifdef ENABLE_SDT
dmxSDT.dropCachedSectionIDs();
#endif
}
static void commandFreeMemory(int connfd, char * /*data*/, const unsigned /*dataLength*/)
@@ -2587,11 +2614,13 @@ static void commandWriteSI2XML(int connfd, char *data, const unsigned dataLength
return;
}
#if 0
/* dummy1: do not send back anything */
static void commandDummy1(int, char *, const unsigned)
{
return;
}
#endif
/* dummy2: send back an empty response */
static void commandDummy2(int connfd, char *, const unsigned)
@@ -3534,6 +3563,212 @@ static void *cnThread(void *)
pthread_exit(NULL);
}
#ifdef ENABLE_SDT
static bool addService(const SIservice &s, const int is_actual)
{
bool already_exists;
bool is_new = false;
readLockServices();
MySIservicesOrderUniqueKey::iterator si = mySIservicesOrderUniqueKey.find(s.uniqueKey());
already_exists = (si != mySIservicesOrderUniqueKey.end());
unlockServices();
if ( (!already_exists) || ((is_actual & 7) && (!si->second->is_actual)) ) {
if (already_exists)
{
writeLockServices();
mySIservicesOrderUniqueKey.erase(s.uniqueKey());
unlockServices();
}
SIservice *sp = new SIservice(s);
if (!sp)
{
printf("[sectionsd::addService] new SIservice failed.\n");
return false;
}
SIservicePtr sptr(sp);
#if 0
#define MAX_SIZE_SERVICENAME 50
char servicename[MAX_SIZE_SERVICENAME];
if (sptr->serviceName.empty()) {
sprintf(servicename, "%04x", sptr->service_id);
servicename[sizeof(servicename) - 1] = 0;
sptr->serviceName = servicename;
}
#endif
sptr->is_actual = is_actual;
writeLockServices();
mySIservicesOrderUniqueKey.insert(std::make_pair(sptr->uniqueKey(), sptr));
unlockServices();
if (sptr->nvods.size())
{
writeLockServices();
mySIservicesNVODorderUniqueKey.insert(std::make_pair(sptr->uniqueKey(), sptr));
unlockServices();
}
is_new = true;
}
return is_new;
}
static void *sdtThread(void *)
{
const unsigned timeoutInMSeconds = 2500;
t_transponder_id tid = 0;
time_t lastData = 0;
time_t zeit = 0;
int rs = 0;
int is_actual = 0;
//FIXME
dmxSDT.addfilter(0x42, 0xf3 ); //SDT actual = 0x42 + SDT other = 0x46 + BAT = 0x4A
dprintf("[%sThread] pid %d (%lu) start\n", "sdt", getpid(), pthread_self());
int timeoutsDMX = 0;
uint8_t *static_buf = new uint8_t[MAX_SECTION_LENGTH];
int rc;
if (static_buf == NULL)
{
xprintf("%s: could not allocate static_buf\n", __FUNCTION__);
pthread_exit(NULL);
}
dmxSDT.start(); // -> unlock
if (!scanning)
dmxSDT.request_pause();
bool startup = true;
waitForTimeset();
while (!sectionsd_stop) {
while (!scanning) {
if(sectionsd_stop)
break;
sleep(1);
}
zeit = time_monotonic();
if(sectionsd_stop)
break;
readLockMessaging();
if (messaging_zap_detected)
startup = true;
unlockMessaging();
if ((zeit > lastData + TIME_SDT_NONEWDATA) || (startup))
{
struct timespec abs_wait;
struct timeval now;
gettimeofday(&now, NULL);
TIMEVAL_TO_TIMESPEC(&now, &abs_wait);
abs_wait.tv_sec += (TIME_SDT_SCHEDULED_PAUSE);
dmxSDT.real_pause();
/* this is the "last" thread. Means: if this one goes to sleep, sectionsd
* sleeps mostly. Worth printing. */
printdate_ms(stdout);
printf("sdtThread: going to sleep...\n");
writeLockMessaging();
messaging_zap_detected = false;
unlockMessaging();
pthread_mutex_lock( &dmxSDT.start_stop_mutex );
rs = pthread_cond_timedwait( &dmxSDT.change_cond, &dmxSDT.start_stop_mutex, &abs_wait );
pthread_mutex_unlock( &dmxSDT.start_stop_mutex );
if(sectionsd_stop)
break;
if (rs == ETIMEDOUT)
{
dprintf("dmxSDT: waking up again - looking for new services :)\n");
dmxSDT.change( 0 ); // -> restart
}
else if (rs == 0)
{
dprintf("dmxSDT: waking up again - requested from .change()\n");
}
else
{
dprintf("dmxSDT: waking up again - unknown reason?!\n");
dmxSDT.real_unpause();
}
// update zeit after sleep
startup = false;
zeit = time_monotonic();
timeoutsDMX = 0;
lastData = zeit;
}
if (timeoutsDMX >= RESTART_DMX_AFTER_TIMEOUTS && scanning)
{
timeoutsDMX = 0;
dmxSDT.stop();
dmxSDT.start(); // leaves unlocked
dputs("\n !!! dmxSDT restarted !!!\n");
}
rc = dmxSDT.getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
if (rc < 0)
continue;
LongSection sec(static_buf);
uint8_t table_id = sec.getTableId();
if ((table_id == 0x42) || (table_id == 0x46))
{
SIsectionSDT sdt(static_buf);
is_actual = (sdt.getTableId() == 0x42) ? 1 : 0;
if (is_actual && !sdt.getLastSectionNumber())
is_actual = 2;
bool is_new = false;
is_actual = (is_actual | 8);
for (SIservices::iterator s = sdt.services().begin(); s != sdt.services().end(); s++) {
if (addService(*s, is_actual)) {
is_new = true;
tid = CREATE_TRANSPONDER_ID_FROM_ORIGINALNETWORK_TRANSPORTSTREAM_ID(s->original_network_id,
s->transport_stream_id);
}
}
if (is_new) {
lastData = time_monotonic();
dprintf("[sdtThread] added %d services [table 0x%x TID: %08x]\n",
sdt.services().size(), table_id, tid);
}
}
} // for
delete[] static_buf;
printf("[sectionsd] sdt-thread ended\n");
pthread_exit(NULL);
}
#endif
/* helper function for the housekeeping-thread */
static void print_meminfo(void)
{
@@ -3692,11 +3927,14 @@ static void readDVBTimeFilter(void)
extern cDemux * dmxUTC;
void sectionsd_main_thread(void */*data*/)
void sectionsd_main_thread(void * /*data*/)
{
pthread_t threadTOT, threadEIT, threadCN, threadHouseKeeping;
#ifdef ENABLE_FREESATEPG
pthread_t threadFSEIT;
#endif
#ifdef ENABLE_SDT
pthread_t threadSDT;
#endif
int rc;
@@ -3704,6 +3942,7 @@ void sectionsd_main_thread(void */*data*/)
printf("$Id: sectionsd.cpp,v 1.305 2009/07/30 12:41:39 seife Exp $\n");
printf("SIevent size: %d\n", sizeof(SIevent));
/* "export NO_SLOW_ADDEVENT=true" to disable this */
slow_addevent = (getenv("NO_SLOW_ADDEVENT") == NULL);
if (slow_addevent)
@@ -3789,6 +4028,15 @@ printf("SIevent size: %d\n", sizeof(SIevent));
return;
}
#endif
#ifdef ENABLE_SDT
printf("\n\n\n[sectionsd] starting SDT thread\n");
rc = pthread_create(&threadSDT, 0, sdtThread, 0);
if (rc) {
fprintf(stderr, "[sectionsd] failed to create sdt-thread (rc=%d)\n", rc);
return;
}
#endif
// housekeeping-Thread starten
rc = pthread_create(&threadHouseKeeping, 0, houseKeepingThread, 0);
@@ -3816,14 +4064,13 @@ printf("SIevent size: %d\n", sizeof(SIevent));
printdate_ms(stdout);
printf("EIT Update Filter: new version 0x%x, Activate cnThread\n", ((SI_section_header*)buf)->version_number);
writeLockMessaging();
// messaging_skipped_sections_ID[0].clear();
// messaging_sections_max_ID[0] = -1;
// messaging_sections_got_all[0] = false;
messaging_have_CN = 0x00;
messaging_got_CN = 0x00;
messaging_last_requested = time_monotonic();
unlockMessaging();
sched_yield();
dmxCN.change(0);
sched_yield();
@@ -3854,11 +4101,20 @@ printf("SIevent size: %d\n", sizeof(SIevent));
pthread_mutex_lock(&dmxCN.start_stop_mutex);
pthread_cond_broadcast(&dmxCN.change_cond);
pthread_mutex_unlock(&dmxCN.start_stop_mutex);
#ifdef ENABLE_SDT
pthread_mutex_lock(&dmxSDT.start_stop_mutex);
pthread_cond_broadcast(&dmxSDT.change_cond);
pthread_mutex_unlock(&dmxSDT.start_stop_mutex);
#endif
printf("pausing...\n");
dmxEIT.request_pause();
dmxCN.request_pause();
#ifdef ENABLE_FREESATEPG
dmxFSEIT.request_pause();
#endif
#ifdef ENABLE_SDT
dmxSDT.request_pause();
#endif
pthread_cancel(threadHouseKeeping);
@@ -3873,6 +4129,10 @@ printf("SIevent size: %d\n", sizeof(SIevent));
pthread_join(threadEIT, NULL);
printf("join 3\n");
pthread_join(threadCN, NULL);
#ifdef ENABLE_SDT
printf("join 4\n");
pthread_join(threadSDT, NULL);
#endif
eit_stop_update_filter(&eit_update_fd);
if(eitDmx)
@@ -3884,6 +4144,9 @@ printf("SIevent size: %d\n", sizeof(SIevent));
dmxCN.close();
#ifdef ENABLE_FREESATEPG
dmxFSEIT.close();
#endif
#ifdef ENABLE_SDT
dmxSDT.close();
#endif
printf("[sectionsd] ended\n");