eitd/sectionsd.cpp: WIP. commented most checks in CN thread;

comment request_pause/unpause;
move add events code to class member;
This commit is contained in:
[CST] Focus
2012-02-21 19:59:45 +04:00
parent 2942a55854
commit 443706b06f

View File

@@ -316,27 +316,18 @@ static bool deleteEvent(const event_id_t uniqueKey)
// only if it is the current channel... and if we don't have them already.
if (evt.get_channel_id() == messaging_current_servicekey &&
(messaging_got_CN != 0x03)) {
xprintf("addEvent: current %016llx event %016llx running %d messaging_got_CN %d\n", messaging_current_servicekey, evt.get_channel_id(), evt.runningStatus(), messaging_got_CN);
unlockMessaging();
SIevent *eptr = new SIevent(evt);
if (!eptr)
{
printf("[sectionsd::addEvent] new SIevent1 failed.\n");
return;
}
//FIXME is ptr needed here ?
SIeventPtr e(eptr);
writeLockEvents();
if (e->runningStatus() > 2) { // paused or currently running
if (evt.runningStatus() > 2) { // paused or currently running
//TODO myCurrentEvent/myNextEvent without pointers.
if (!myCurrentEvent || (myCurrentEvent && (*myCurrentEvent).uniqueKey() != e->uniqueKey())) {
if (myCurrentEvent)
delete myCurrentEvent;
if (!myCurrentEvent || (myCurrentEvent && (*myCurrentEvent).uniqueKey() != evt.uniqueKey())) {
delete myCurrentEvent;
myCurrentEvent = new SIevent(evt);
writeLockMessaging();
messaging_got_CN |= 0x01;
if (myNextEvent && (*myNextEvent).uniqueKey() == e->uniqueKey()) {
if (myNextEvent && (*myNextEvent).uniqueKey() == evt.uniqueKey()) {
dprintf("addevent-cn: removing next-event\n");
/* next got "promoted" to current => trigger re-read */
delete myNextEvent;
@@ -345,28 +336,27 @@ static bool deleteEvent(const event_id_t uniqueKey)
}
unlockMessaging();
dprintf("addevent-cn: added running (%d) event 0x%04x '%s'\n",
e->runningStatus(), e->eventID, e->getName().c_str());
evt.runningStatus(), evt.eventID, evt.getName().c_str());
} else {
writeLockMessaging();
messaging_got_CN |= 0x01;
unlockMessaging();
dprintf("addevent-cn: not add runn. (%d) event 0x%04x '%s'\n",
e->runningStatus(), e->eventID, e->getName().c_str());
evt.runningStatus(), evt.eventID, evt.getName().c_str());
}
} else {
if ((!myNextEvent || (myNextEvent && (*myNextEvent).uniqueKey() != e->uniqueKey() && (*myNextEvent).times.begin()->startzeit < e->times.begin()->startzeit)) &&
(!myCurrentEvent || (myCurrentEvent && (*myCurrentEvent).uniqueKey() != e->uniqueKey()))) {
if (myNextEvent)
delete myNextEvent;
if ((!myNextEvent || (myNextEvent && (*myNextEvent).uniqueKey() != evt.uniqueKey() && (*myNextEvent).times.begin()->startzeit < evt.times.begin()->startzeit)) &&
(!myCurrentEvent || (myCurrentEvent && (*myCurrentEvent).uniqueKey() != evt.uniqueKey()))) {
delete myNextEvent;
myNextEvent = new SIevent(evt);
writeLockMessaging();
messaging_got_CN |= 0x02;
unlockMessaging();
dprintf("addevent-cn: added next (%d) event 0x%04x '%s'\n",
e->runningStatus(), e->eventID, e->getName().c_str());
evt.runningStatus(), evt.eventID, evt.getName().c_str());
} else {
dprintf("addevent-cn: not added next(%d) event 0x%04x '%s'\n",
e->runningStatus(), e->eventID, e->getName().c_str());
evt.runningStatus(), evt.eventID, evt.getName().c_str());
writeLockMessaging();
messaging_got_CN |= 0x02;
unlockMessaging();
@@ -870,54 +860,51 @@ inline bool writeNbytes(int fd, const char *buf, const size_t numberOfBytes, co
static void commandPauseScanning(int connfd, char *data, const unsigned dataLength)
{
if (dataLength != 4)
return ;
if (dataLength != sizeof(int))
return;
int pause = *(int *)data;
if (pause && pause != 1)
return ;
dprintf("Request of %s scanning.\n", pause ? "stop" : "continue" );
xprintf("Request of %s scanning (now %s).\n", pause ? "stop" : "continue", scanning ? "scanning" : "idle");
if (scanning && pause)
{
#if 0
threadCN.request_pause();
//dmxEIT.request_pause();
threadEIT.request_pause();
#ifdef ENABLE_FREESATEPG
dmxFSEIT.request_pause();
#endif
#ifdef ENABLE_SDT
dmxSDT.request_pause();
#endif
#endif
scanning = 0;
}
else if (!pause && !scanning)
{
#if 0
threadCN.request_unpause();
//dmxEIT.request_unpause();
threadEIT.request_unpause();
#ifdef ENABLE_FREESATEPG
dmxFSEIT.request_unpause();
#endif
#ifdef ENABLE_SDT
dmxSDT.request_unpause();
#endif
#endif
writeLockEvents();
if (myCurrentEvent) {
delete myCurrentEvent;
myCurrentEvent = NULL;
}
if (myNextEvent) {
delete myNextEvent;
myNextEvent = NULL;
}
delete myCurrentEvent;
myCurrentEvent = NULL;
delete myNextEvent;
myNextEvent = NULL;
unlockEvents();
writeLockMessaging();
messaging_have_CN = 0x00;
messaging_got_CN = 0x00;
unlockMessaging();
scanning = 1;
if (!bTimeCorrect && !ntpenable)
{
@@ -926,9 +913,7 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng
pthread_mutex_unlock(&timeThreadSleepMutex);
}
scanning = 1;
threadCN.change(0);
//dmxEIT.change(0);
threadEIT.change(0);
#ifdef ENABLE_FREESATEPG
dmxFSEIT.change(0);
@@ -947,6 +932,106 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng
return ;
}
static void commandserviceChanged(int connfd, char *data, const unsigned dataLength)
{
t_channel_id uniqueServiceKey;
if (dataLength != sizeof(sectionsd::commandSetServiceChanged))
goto out;
uniqueServiceKey = (((sectionsd::commandSetServiceChanged *)data)->channel_id);
uniqueServiceKey &= 0xFFFFFFFFFFFFULL;
dprintf("[sectionsd] commandserviceChanged: Service changed to " PRINTF_CHANNEL_ID_TYPE "\n", uniqueServiceKey);
xprintf("[sectionsd] commandserviceChanged: Service changed to " PRINTF_CHANNEL_ID_TYPE "\n\n", uniqueServiceKey);
messaging_last_requested = time_monotonic();
#if 0
if(checkBlacklist(uniqueServiceKey))
{
if (!channel_is_blacklisted) {
channel_is_blacklisted = true;
threadCN.request_pause();
threadEIT.request_pause();
#ifdef ENABLE_SDT
dmxSDT.request_pause();
#endif
}
xprintf("[sectionsd] commandserviceChanged: service is filtered!\n");
}
else
{
if (channel_is_blacklisted) {
channel_is_blacklisted = false;
threadCN.request_unpause();
threadEIT.request_unpause();
#ifdef ENABLE_SDT
dmxSDT.request_unpause();
#endif
xprintf("[sectionsd] commandserviceChanged: service is no longer filtered!\n");
}
}
#endif
#if 0
if(checkNoDVBTimelist(uniqueServiceKey))
{
if (dvb_time_update) {
dvb_time_update = false;
}
xprintf("[sectionsd] commandserviceChanged: DVB time update is blocked!\n");
}
else
{
if (!dvb_time_update) {
dvb_time_update = true;
xprintf("[sectionsd] commandserviceChanged: DVB time update is allowed!\n");
}
}
#endif
if (uniqueServiceKey && messaging_current_servicekey != uniqueServiceKey)
{
dvb_time_update = !checkNoDVBTimelist(uniqueServiceKey);
dprintf("[sectionsd] commandserviceChanged: DVB time update is %s\n", dvb_time_update ? "allowed" : "blocked!");
channel_is_blacklisted = checkBlacklist(uniqueServiceKey);
dprintf("[sectionsd] commandserviceChanged: service is %s\n", channel_is_blacklisted ? "filtered!" : "not filtered");
writeLockEvents();
delete myCurrentEvent;
myCurrentEvent = NULL;
delete myNextEvent;
myNextEvent = NULL;
unlockEvents();
writeLockMessaging();
messaging_current_servicekey = uniqueServiceKey;
messaging_have_CN = 0x00;
messaging_got_CN = 0x00;
messaging_zap_detected = true;
messaging_need_eit_version = false;
unlockMessaging();
threadCN.setCurrentService(messaging_current_servicekey);
threadEIT.setCurrentService(messaging_current_servicekey);
#ifdef ENABLE_FREESATEPG
dmxFSEIT.setCurrentService(messaging_current_servicekey);
#endif
#ifdef ENABLE_SDT
dmxSDT.setCurrentService(messaging_current_servicekey);
#endif
}
else
dprintf("[sectionsd] commandserviceChanged: no change...\n");
out:
struct sectionsd::msgResponseHeader msgResponse;
msgResponse.dataLength = 0;
writeNbytes(connfd, (const char *)&msgResponse, sizeof(msgResponse), WRITE_TIMEOUT_IN_SECONDS);
dprintf("[sectionsd] commandserviceChanged: END!!\n");
return ;
}
static void commandGetIsScanningActive(int connfd, char* /*data*/, const unsigned /*dataLength*/)
{
struct sectionsd::msgResponseHeader responseHeader;
@@ -1024,103 +1109,6 @@ static void commandDumpStatusInformation(int /*connfd*/, char* /*data*/, const u
}
static void commandserviceChanged(int connfd, char *data, const unsigned dataLength)
{
t_channel_id *uniqueServiceKey;
if (dataLength != sizeof(sectionsd::commandSetServiceChanged))
goto out;
uniqueServiceKey = &(((sectionsd::commandSetServiceChanged *)data)->channel_id);
dprintf("[sectionsd] commandserviceChanged: Service changed to " PRINTF_CHANNEL_ID_TYPE "\n", *uniqueServiceKey);
messaging_last_requested = time_monotonic();
if(checkBlacklist(*uniqueServiceKey))
{
if (!channel_is_blacklisted) {
channel_is_blacklisted = true;
threadCN.request_pause();
//dmxEIT.request_pause();
threadEIT.request_pause();
#ifdef ENABLE_SDT
dmxSDT.request_pause();
#endif
}
xprintf("[sectionsd] commandserviceChanged: service is filtered!\n");
}
else
{
if (channel_is_blacklisted) {
channel_is_blacklisted = false;
threadCN.request_unpause();
//dmxEIT.request_unpause();
threadEIT.request_unpause();
#ifdef ENABLE_SDT
dmxSDT.request_unpause();
#endif
xprintf("[sectionsd] commandserviceChanged: service is no longer filtered!\n");
}
}
//TODO move check to time thread
if(checkNoDVBTimelist(*uniqueServiceKey))
{
if (dvb_time_update) {
dvb_time_update = false;
}
xprintf("[sectionsd] commandserviceChanged: DVB time update is blocked!\n");
}
else
{
if (!dvb_time_update) {
dvb_time_update = true;
xprintf("[sectionsd] commandserviceChanged: DVB time update is allowed!\n");
}
}
if (messaging_current_servicekey != *uniqueServiceKey)
{
//if (debug) showProfiling("[sectionsd] commandserviceChanged: before events lock");
writeLockEvents();
//if (debug) showProfiling("[sectionsd] commandserviceChanged: after events lock");
if (myCurrentEvent) {
delete myCurrentEvent;
myCurrentEvent = NULL;
}
if (myNextEvent) {
delete myNextEvent;
myNextEvent = NULL;
}
unlockEvents();
writeLockMessaging();
messaging_current_servicekey = *uniqueServiceKey;
messaging_have_CN = 0x00;
messaging_got_CN = 0x00;
messaging_zap_detected = true;
messaging_need_eit_version = false;
unlockMessaging();
threadCN.setCurrentService(messaging_current_servicekey);
threadEIT.setCurrentService(messaging_current_servicekey);
#ifdef ENABLE_FREESATEPG
dmxFSEIT.setCurrentService(messaging_current_servicekey);
#endif
#ifdef ENABLE_SDT
dmxSDT.setCurrentService(messaging_current_servicekey);
#endif
}
else
dprintf("[sectionsd] commandserviceChanged: no change...\n");
out:
struct sectionsd::msgResponseHeader msgResponse;
msgResponse.dataLength = 0;
writeNbytes(connfd, (const char *)&msgResponse, sizeof(msgResponse), WRITE_TIMEOUT_IN_SECONDS);
dprintf("[sectionsd] commandserviceChanged: END!!\n");
return ;
}
static void commandGetIsTimeSet(int connfd, char* /*data*/, const unsigned /*dataLength*/)
{
sectionsd::responseIsTimeSet rmsg;
@@ -1235,11 +1223,17 @@ static void commandSetConfig(int connfd, char *data, const unsigned /*dataLength
static void deleteSIexceptEPG()
{
writeLockServices();
mySIservicesOrderUniqueKey.clear();
unlockServices();
//dmxEIT.dropCachedSectionIDs();
threadEIT.dropCachedSectionIDs();
threadEIT.change(0);
#ifdef ENABLE_SDT
dmxSDT.dropCachedSectionIDs();
dmxSDT.change(0);
#endif
#ifdef ENABLE_FREESATEPG
dmxFSEIT.setCurrentService(messaging_current_servicekey);
dmxFSEIT.change(0);
#endif
}
@@ -1286,7 +1280,6 @@ showProfiling("commandFreeMemory end2");
return ;
}
void *insertEventsfromFile(void * data);
static void commandReadSIfromXML(int connfd, char *data, const unsigned dataLength)
{
pthread_t thrInsert;
@@ -1317,7 +1310,6 @@ static void commandReadSIfromXML(int connfd, char *data, const unsigned dataLeng
return ;
}
void writeEventsToFile(char *epgdir);
static void commandWriteSI2XML(int connfd, char *data, const unsigned dataLength)
{
char epgdir[100] = "";
@@ -1458,6 +1450,8 @@ static void *timeThread(void *)
bool time_ntp = false;
bool success = true;
//t_channel_id time_trigger_last = 0;
pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, 0);
dprintf("[%sThread] pid %d (%lu) start\n", "time", getpid(), pthread_self());
@@ -1469,6 +1463,13 @@ static void *timeThread(void *)
break;
sleep(1);
}
#if 0
t_channel_id new_transponder = (messaging_current_servicekey & 0xFFFFFFFF0000ULL);
if (time_trigger_last != new_transponder)
{
time_trigger_last = new_transponder;
}
#endif
if (bTimeCorrect == true) { // sectionsd started with parameter "-tc"
if (first_time == true) { // only do this once!
time_t actTime;
@@ -1783,44 +1784,7 @@ static void *fseitThread(void *)
unlockMessaging();
}
SIsectionEIT eit(static_buf);
// Houdini: if section is not parsed (too short) -> no need to check events
if (!eit.is_parsed())
continue;
//dprintf("[eitThread] adding %d events [table 0x%x] (begin)\n", eit.events().size(), eit.getTableId());
zeit = time(NULL);
// Nicht alle Events speichern
for (SIevents::iterator e = eit.events().begin(); e != eit.events().end(); ++e)
{
if (!(e->times.empty()))
{
if ( ( e->times.begin()->startzeit < zeit + secondsToCache ) &&
( ( e->times.begin()->startzeit + (long)e->times.begin()->dauer ) > zeit - oldEventsAre ) )
{
addEvent(*e, zeit);
}
}
else
{
// pruefen ob nvod event
readLockServices();
MySIservicesNVODorderUniqueKey::iterator si = mySIservicesNVODorderUniqueKey.find(e->get_channel_id());
if (si != mySIservicesNVODorderUniqueKey.end())
{
// Ist ein nvod-event
writeLockEvents();
for (SInvodReferences::iterator i = si->second->nvods.begin(); i != si->second->nvods.end(); ++i)
mySIeventUniqueKeysMetaOrderServiceUniqueKey.insert(std::make_pair(i->uniqueKey(), e->uniqueKey()));
unlockEvents();
addNVODevent(*e);
}
unlockServices();
}
} // for
addEvents();
//dprintf("[eitThread] added %d events (end)\n", eit.events().size());
} // for
delete[] static_buf;
@@ -1830,6 +1794,45 @@ static void *fseitThread(void *)
}
#endif
bool CSectionThread::addEvents()
{
SIsectionEIT eit(static_buf);
if (!eit.is_parsed())
return false;
dprintf("[eitThread] adding %d events [table 0x%x] (begin)\n", eit.events().size(), eit.getTableId());
time_t zeit = time(NULL);
for (SIevents::iterator e = eit.events().begin(); e != eit.events().end(); ++e) {
if (!(e->times.empty())) {
if ( ( e->times.begin()->startzeit < zeit + secondsToCache ) &&
( ( e->times.begin()->startzeit + (long)e->times.begin()->dauer ) > zeit - oldEventsAre ) )
{
addEvent(*e, zeit, e->table_id == 0x4e);
event_count++;
}
} else {
// pruefen ob nvod event
readLockServices();
MySIservicesNVODorderUniqueKey::iterator si = mySIservicesNVODorderUniqueKey.find(e->get_channel_id());
if (si != mySIservicesNVODorderUniqueKey.end()) {
// Ist ein nvod-event
writeLockEvents();
for (SInvodReferences::iterator i = si->second->nvods.begin(); i != si->second->nvods.end(); ++i)
mySIeventUniqueKeysMetaOrderServiceUniqueKey.insert(std::make_pair(i->uniqueKey(), e->uniqueKey()));
unlockEvents();
addNVODevent(*e);
}
unlockServices();
}
} // for
return true;
}
//---------------------------------------------------------------------
// EIT-thread
// reads EPG-datas
@@ -1837,16 +1840,14 @@ static void *fseitThread(void *)
void CEitThread::run()
{
xprintf("CEitThread::run:: starting..\n");
name = "eitThread";
xprintf("%s::run:: starting, pid %d (%lu)\n", name.c_str(), getpid(), pthread_self());
pID = 0x12;
timeoutInMSeconds = EIT_READ_TIMEOUT;
bool sendToSleepNow = false;
pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, 0);
/* These filters are a bit tricky (index numbers):
- 0 Dummy filter, to make this thread sleep for some seconds
- 1 then get other TS's current/next (this TS's cur/next are
@@ -1855,7 +1856,6 @@ void CEitThread::run()
- 4 then get the other TS's scheduled events,
- 4ab (in two steps to reduce the POLLERRs on the DMX device)
*/
// -- set EIT filter 0x4e-0x6F
addfilter(0x00, 0x00); //0 dummy filter
addfilter(0x50, 0xf0); //1 current TS, scheduled
addfilter(0x4f, 0xff); //2 other TS, current/next
@@ -1869,160 +1869,63 @@ void CEitThread::run()
if (sections_debug)
dump_sched_info("eitThread");
dprintf("[%sThread] pid %d (%lu) start\n", "eit", getpid(), pthread_self());
int rc;
if (static_buf == NULL)
{
xprintf("%s: could not allocate static_buf\n", __FUNCTION__);
pthread_exit(NULL);
}
waitForTimeset();
xprintf("%s::run:: time set.\n", name.c_str());
DMX::start(); // -> unlock
if (!scanning)
request_pause();
waitForTimeset();
lastChanged = time_monotonic();
int cnt = 0;
while (running) {
while (!scanning) {
if(sectionsd_stop)
break;
sleep(1);
}
if(sectionsd_stop)
break;
time_t zeit = time_monotonic();
rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
if(sectionsd_stop)
break;
if (timeoutsDMX < 0 && !channel_is_blacklisted)
{
dprintf("[eitThread] skipping to next filter(%d) (timeouts %d > %s)\n", filter_index+1, timeoutsDMX,
timeoutsDMX == -1 ? "HAS_ALL_SECTIONS_SKIPPING" : timeoutsDMX == -2 ? "HAS_ALL_CURRENT_SECTIONS_SKIPPING" : "UNKNOWN");
timeoutsDMX = 0;
if (!next_filter())
sendToSleepNow = true;
}
if (timeoutsDMX >= CHECK_RESTART_DMX_AFTER_TIMEOUTS && scanning && !channel_is_blacklisted)
{
dprintf("[eitThread] skipping to next filter(%d) (> DMX_TIMEOUT_SKIPPING %d)\n", filter_index+1, timeoutsDMX);
timeoutsDMX = 0;
if (!next_filter())
sendToSleepNow = true;
}
if (sendToSleepNow || channel_is_blacklisted)
{
sendToSleepNow = false;
real_pause();
if(sendToSleepNow || !scanning || channel_is_blacklisted) {
xprintf("%s: going to sleep %d seconds, running %d scanning %d blacklisted %d events %d\n",
name.c_str(), TIME_EIT_SCHEDULED_PAUSE, running, scanning, channel_is_blacklisted, event_count);
event_count = 0;
writeLockMessaging();
messaging_zap_detected = false;
unlockMessaging();
xprintf("dmxEIT: going to sleep for %d seconds, added %d events\n\n", TIME_EIT_SCHEDULED_PAUSE, cnt);
cnt = 0;
int rs = 0;
do {
real_pause();
rs = Sleep(TIME_EIT_SCHEDULED_PAUSE);
} while (channel_is_blacklisted);
xprintf("%s: wakeup, running %d scanning %d blacklisted %d reason %d\n\n", name.c_str(), running, scanning, channel_is_blacklisted, rs);
} while(running && (!scanning || channel_is_blacklisted));
if(!running)
break;
if (rs == ETIMEDOUT)
{
dprintf("dmxEIT: waking up again - timed out\n");
dprintf("New Filterindex: %d (ges. %d)\n", 2, (signed) filters.size() );
change(1); // -> restart
}
else if (rs == 0)
{
dprintf("dmxEIT: waking up again - requested from .change()\n");
}
else
{
dprintf("dmxEIT: waking up again - unknown reason %d\n",rs);
real_unpause();
}
// update zeit after sleep
zeit = time_monotonic();
sendToSleepNow = false;
}
else if (zeit > lastChanged + TIME_EIT_SKIPPING )
{
int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
if (rc > 0)
addEvents();
time_t zeit = time_monotonic();
bool need_change = false;
if(timeoutsDMX < 0 || timeoutsDMX >= CHECK_RESTART_DMX_AFTER_TIMEOUTS) {
xprintf("%s: skipping to next filter %d from %d (timeouts %d)\n", name.c_str(), filter_index+1, filters.size(), timeoutsDMX);
timeoutsDMX = 0;
need_change = true;
}
if (zeit > lastChanged + TIME_EIT_SKIPPING) {
xprintf("%s: skipping to next filter %d from %d (TIME_EIT_SKIPPING)\n", name.c_str(), filter_index+1, filters.size());
need_change = true;
}
if(running && need_change && scanning && !channel_is_blacklisted) {
readLockMessaging();
dprintf("[eitThread] skipping to next filter(%d) (> TIME_EIT_SKIPPING)\n", filter_index+1 );
if (!next_filter())
sendToSleepNow = true;
unlockMessaging();
}
} // while running
if (rc < 0)
continue;
if(sectionsd_stop)
break;
SIsectionEIT eit(static_buf);
if (!eit.is_parsed())
continue;
dprintf("[eitThread] adding %d events [table 0x%x] (begin)\n", eit.events().size(), eit.getTableId());
zeit = time(NULL);
for (SIevents::iterator e = eit.events().begin(); e != eit.events().end(); ++e)
{
if (!(e->times.empty()))
{
if ( ( e->times.begin()->startzeit < zeit + secondsToCache ) &&
( ( e->times.begin()->startzeit + (long)e->times.begin()->dauer ) > zeit - oldEventsAre ) )
{
//printf("Adding event 0x%llx table %x version %x running %d\n", e->uniqueKey(), eit.getTableId(), eit.getVersionNumber(), e->runningStatus());
addEvent(*e, zeit);
cnt++;
}
}
else
{
// pruefen ob nvod event
readLockServices();
MySIservicesNVODorderUniqueKey::iterator si = mySIservicesNVODorderUniqueKey.find(e->get_channel_id());
if (si != mySIservicesNVODorderUniqueKey.end())
{
// Ist ein nvod-event
writeLockEvents();
for (SInvodReferences::iterator i = si->second->nvods.begin(); i != si->second->nvods.end(); ++i)
mySIeventUniqueKeysMetaOrderServiceUniqueKey.insert(std::make_pair(i->uniqueKey(), e->uniqueKey()));
unlockEvents();
addNVODevent(*e);
}
unlockServices();
}
} // for
//dprintf("[eitThread] added %d events (end)\n", eit.events().size());
} // for
delete[] static_buf;
printf("[sectionsd] eitThread ended\n");
printf("[sectionsd] %s ended\n", name.c_str());
pthread_exit(NULL);
}
@@ -2032,7 +1935,8 @@ cnt = 0;
void CCNThread::run()
{
xprintf("CCNThread::run:: starting..\n");
name = "cnThread";
xprintf("%s::run:: starting, pid %d (%lu)\n", name.c_str(), getpid(), pthread_self());
pID = 0x12;
dmx_num = 1;
cache = false;
@@ -2041,18 +1945,11 @@ void CCNThread::run()
make it too long... */
timeoutInMSeconds = EIT_READ_TIMEOUT;
bool sendToSleepNow = false;
pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, 0);
// -- set EIT filter 0x4e
addfilter(0x4e, 0xff); //0 current TS, current/next
dprintf("[%sThread] pid %d (%lu) start\n", "cn", getpid(), pthread_self());
t_channel_id time_trigger_last = 0;
int rc;
DMX::start(); // -> unlock
if (!scanning)
request_pause();
//t_channel_id time_trigger_last = 0;
writeLockMessaging();
messaging_eit_is_busy = true;
@@ -2062,24 +1959,57 @@ void CCNThread::run()
waitForTimeset();
xprintf("CCNThread::run:: time set..\n");
time_t eit_waiting_since = time_monotonic();
lastChanged = eit_waiting_since;
DMX::start(); // -> unlock
//time_t eit_waiting_since = time_monotonic();
int cnt = 0;
while(running)
{
while (!scanning) {
sleep(1);
if(sectionsd_stop)
break;
}
if(sectionsd_stop)
break;
if(sendToSleepNow || !scanning || channel_is_blacklisted) {
xprintf("%s: going to sleep, running %d scanning %d blacklisted %d events %d\n", name.c_str(), running, scanning, channel_is_blacklisted, event_count);
//xprintf("%s: eit_version %02x messaging_need_eit_version %d rc %d\n", name.c_str(), get_eit_version(), messaging_need_eit_version, rc);
rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
writeLockMessaging();
messaging_eit_is_busy = false;
unlockMessaging();
int rs = 0;
do {
real_pause();
pthread_mutex_lock( &start_stop_mutex );
if (!channel_is_blacklisted)
eit_set_update_filter(&eit_update_fd);
rs = pthread_cond_wait(&change_cond, &start_stop_mutex);
eit_stop_update_filter(&eit_update_fd);
pthread_mutex_unlock(&start_stop_mutex);
//rs = Sleep(TIME_EIT_SCHEDULED_PAUSE);
xprintf("%s: wakeup, running %d scanning %d blacklisted %d reason %d\n\n", name.c_str(), running, scanning, channel_is_blacklisted, rs);
} while(running && (!scanning || channel_is_blacklisted));
if(!running)
break;
sendToSleepNow = false;
writeLockMessaging();
messaging_need_eit_version = false;
messaging_eit_is_busy = true;
unlockMessaging();
#if HAVE_IPBOX_HARDWARE
if (rs == 0)
change(0);
#endif
}
int rc = getSection(static_buf, timeoutInMSeconds, timeoutsDMX);
if (rc > 0)
addEvents();
time_t zeit = time_monotonic();
if (update_eit) {
#if 0
xprintf("%s: eit_version %02x messaging_need_eit_version %d rc %d\n", name.c_str(), get_eit_version(), messaging_need_eit_version, rc);
if (get_eit_version() != 0xff) {
writeLockMessaging();
messaging_need_eit_version = false;
@@ -2110,47 +2040,72 @@ int cnt = 0;
sendToSleepNow = true;
}
}
#endif
} // if (update_eit)
readLockMessaging();
if (messaging_got_CN != messaging_have_CN) // timeoutsDMX < -1)
if (messaging_got_CN != messaging_have_CN)
{
xprintf("dmxCN: have CN: timeoutsDMX %d messaging_got_CN %x messaging_have_CN %x\n\n", timeoutsDMX, messaging_got_CN, messaging_have_CN);
unlockMessaging();
writeLockMessaging();
messaging_have_CN = messaging_got_CN;
unlockMessaging();
xprintf("%s: have CN: timeoutsDMX %d messaging_have_CN %x messaging_got_CN %x\n\n",
name.c_str(), timeoutsDMX, messaging_have_CN, messaging_got_CN);
dprintf("[cnThread] got current_next (0x%x) - sending event!\n", messaging_have_CN);
eventServer->sendEvent(CSectionsdClient::EVT_GOT_CN_EPG,
CEventServer::INITID_SECTIONSD,
&messaging_current_servicekey,
sizeof(messaging_current_servicekey));
/* we received an event => reset timeout timer... */
eit_waiting_since = zeit;
//eit_waiting_since = zeit;
lastChanged = zeit; /* this is ugly - needs somehting better */
readLockMessaging();
//readLockMessaging();
}
else
unlockMessaging();
#if 0
/* TODO ? change() from setServiceChanged trigger before service tuned,
* check for complete cannot be used yet */
if(timeoutsDMX < 0) {
xprintf("dmxCN: timeoutsDMX %d messaging_got_CN %x messaging_have_CN %x sid %016llx\n\n\n", timeoutsDMX, messaging_got_CN, messaging_have_CN, messaging_current_servicekey);
timeoutsDMX = 0;
sendToSleepNow = true;
}
#endif
if (messaging_have_CN == 0x03) // current + next
{
xprintf("dmxCN: have all CN timeoutsDMX %d messaging_got_CN %x messaging_have_CN %x\n\n", timeoutsDMX, messaging_got_CN, messaging_have_CN);
xprintf("%s: have all CN: timeoutsDMX %d messaging_have_CN %x messaging_got_CN %x\n\n",
name.c_str(), timeoutsDMX, messaging_have_CN, messaging_got_CN);
unlockMessaging();
sendToSleepNow = true;
//sendToSleepNow = true;
//timeoutsDMX = 0;
}
else {
unlockMessaging();
}
#endif
#if 1
if(timeoutsDMX < 0) {
xprintf("%s: timeoutsDMX %d messaging_got_CN %x messaging_have_CN %x sid %016llx\n\n",
name.c_str(), timeoutsDMX, messaging_got_CN, messaging_have_CN, messaging_current_servicekey);
timeoutsDMX = 0;
sendToSleepNow = true;
}
#endif
if (zeit > lastChanged + TIME_EIT_VERSION_WAIT) {
xprintf("%s: zeit > lastChanged + TIME_EIT_VERSION_WAIT\n", name.c_str());
sendToSleepNow = true;
}
if (sendToSleepNow && messaging_have_CN == 0x00) {
/* send a "no epg" event anyway before going to sleep */
eventServer->sendEvent(CSectionsdClient::EVT_GOT_CN_EPG,
CEventServer::INITID_SECTIONSD,
&messaging_current_servicekey,
sizeof(messaging_current_servicekey));
}
#if 0
/* ignore sleep if channel not blacklisted and messaging_need_eit_version == false */
if(!channel_is_blacklisted && messaging_need_eit_version) {
xprintf("%s: sendToSleepNow ignored: messaging_need_eit_version %d\n", name.c_str(), messaging_need_eit_version);
sendToSleepNow = false;
}
#endif
#if 0
/* sleep if channel blacklisted OR sleep requested and messaging_need_eit_version == false */
if ((sendToSleepNow && !messaging_need_eit_version) || channel_is_blacklisted)
{
sendToSleepNow = false;
@@ -2178,8 +2133,8 @@ xprintf("dmxCN: have all CN timeoutsDMX %d messaging_got_CN %x messaging_have_CN
pthread_mutex_unlock(&timeThreadSleepMutex);
}
xprintf("\n\ndmxCN: going to sleep, added %d events\n\n", cnt);
cnt = 0;
xprintf("dmxCN: going to sleep, processed %d events\n\n", event_count);
event_count = 0;
int rs;
do {
@@ -2225,32 +2180,10 @@ cnt = 0;
sizeof(messaging_current_servicekey));
continue;
}
if (rc < 0)
continue;
SIsectionEIT eit(static_buf);
if (!eit.is_parsed())
continue;
//dprintf("[cnThread] adding %d events [table 0x%x] (begin)\n", eit.events().size(), eit.getTableId());
zeit = time(NULL);
//xprintf("dmxCN: add %d events, timeouts %d\n", eit.events().size(), timeoutsDMX);
for (SIevents::iterator e = eit.events().begin(); e != eit.events().end(); ++e)
{
if (!(e->times.empty()))
{
addEvent(*e, zeit, true); /* cn = true => fill in current / next event */
cnt++;
}
}
//dprintf("[cnThread] added %d events (end)\n", eit.events().size());
#endif
} // for
delete[] static_buf;
printf("[sectionsd] cnThread ended\n");
pthread_exit(NULL);
}
@@ -2330,20 +2263,10 @@ static void *sdtThread(void *)
uint8_t *static_buf = new uint8_t[MAX_SECTION_LENGTH];
int rc;
if (static_buf == NULL)
{
xprintf("%s: could not allocate static_buf\n", __FUNCTION__);
pthread_exit(NULL);
}
dmxSDT.start(); // -> unlock
if (!scanning)
dmxSDT.request_pause();
bool startup = true;
waitForTimeset();
dmxSDT.start(); // -> unlock
while (!sectionsd_stop) {
while (!scanning) {
@@ -2695,7 +2618,8 @@ printf("SIevent size: %d\n", sizeof(SIevent));
}
printf("[sectionsd] stopping...\n");
scanning = 0;
//scanning = 0;
timeset = true;
printf("broadcasting...\n");
pthread_mutex_lock(&timeIsSetMutex);
@@ -3032,10 +2956,10 @@ void sectionsd_getCurrentNextServiceKey(t_channel_id uniqueServiceKey, CSections
unlockEvents();
//dprintf("change: %s, messaging_eit_busy: %s, last_request: %d\n", change?"true":"false", messaging_eit_is_busy?"true":"false",(time_monotonic() - messaging_last_requested));
xprintf("change: %s, messaging_eit_busy: %s, last_request: %d\n", change?"true":"false", messaging_eit_is_busy?"true":"false",(int) (time_monotonic() - messaging_last_requested));
if (change && !messaging_eit_is_busy && (time_monotonic() - messaging_last_requested) < 11) {
/* restart dmxCN, but only if it is not already running, and only for 10 seconds */
dprintf("change && !messaging_eit_is_busy => dmxCN.change(0)\n");
xprintf("change && !messaging_eit_is_busy => dmxCN.change(0)\n");
threadCN.change(0);
}
}