eitd/sectionsd.cpp: rework thread stop code, testing;

move time-related mutex, condition and some code to CTimeThread
This commit is contained in:
[CST] Focus
2012-03-05 16:49:34 +04:00
parent dec2ed2588
commit 289bf2c4b2
2 changed files with 134 additions and 53 deletions

View File

@@ -28,6 +28,10 @@
#include <sys/time.h> #include <sys/time.h>
#include <OpenThreads/Thread>
#include <OpenThreads/Condition>
#include "dmx.h"
#include "SIutils.hpp" #include "SIutils.hpp"
#include "SIservices.hpp" #include "SIservices.hpp"
#include "SIevents.hpp" #include "SIevents.hpp"
@@ -93,8 +97,6 @@ typedef std::map<t_channel_id, event_id_t, std::less<t_channel_id> > MySIeventUn
typedef std::map<t_channel_id, SIservicePtr, std::less<t_channel_id> > MySIservicesOrderUniqueKey; typedef std::map<t_channel_id, SIservicePtr, std::less<t_channel_id> > MySIservicesOrderUniqueKey;
typedef std::map<t_channel_id, SIservicePtr, std::less<t_channel_id> > MySIservicesNVODorderUniqueKey; typedef std::map<t_channel_id, SIservicePtr, std::less<t_channel_id> > MySIservicesNVODorderUniqueKey;
#include <OpenThreads/Thread>
#include "dmx.h"
/* abstract section reading class */ /* abstract section reading class */
class CSectionThread : public OpenThreads::Thread, public DMX class CSectionThread : public OpenThreads::Thread, public DMX
@@ -177,23 +179,44 @@ class CSectionThread : public OpenThreads::Thread, public DMX
running = true; running = true;
return (OpenThreads::Thread::start() == 0); return (OpenThreads::Thread::start() == 0);
} }
bool Stop() void StopRun() {
{ xprintf("%s::StopRun: to lock\n", name.c_str());
if(!running)
return false;
printf("%s::Stop: to lock\n", name.c_str());
lock(); lock();
running = false; running = false;
printf("%s::Stop: to broadcast\n", name.c_str()); real_pauseCounter = 1;
pthread_cond_broadcast(&change_cond); xprintf("%s::StopRun: to closefd\n", name.c_str());
printf("%s::Stop: to unlock\n", name.c_str());
unlock();
printf("%s::Stop: to closefd\n", name.c_str());
DMX::closefd(); DMX::closefd();
printf("%s::Stop: to join\n", name.c_str()); xprintf("%s::StopRun: to unlock\n", name.c_str());
unlock();
}
void Wakeup() { pthread_cond_broadcast(&change_cond); }
bool Stop()
{
xprintf("%s::Stop: to broadcast\n", name.c_str());
pthread_cond_broadcast(&change_cond);
xprintf("%s::Stop: to join\n", name.c_str());
int ret = (OpenThreads::Thread::join() == 0); int ret = (OpenThreads::Thread::join() == 0);
printf("%s::Stop: to close\n", name.c_str()); xprintf("%s::Stop: to close\n", name.c_str());
DMX::close(); DMX::close();
#if 0
if(!running)
return false;
xprintf("%s::Stop: to lock\n", name.c_str());
lock();
running = false;
xprintf("%s::Stop: to broadcast\n", name.c_str());
pthread_cond_broadcast(&change_cond);
xprintf("%s::Stop: to unlock\n", name.c_str());
unlock();
#if 1
xprintf("%s::Stop: to closefd\n", name.c_str());
DMX::closefd();
#endif
xprintf("%s::Stop: to join\n", name.c_str());
int ret = (OpenThreads::Thread::join() == 0);
xprintf("%s::Stop: to close\n", name.c_str());
DMX::close();
#endif
return ret; return ret;
} }
}; };
@@ -281,11 +304,18 @@ class CTimeThread : public CSectionThread
bool time_ntp; bool time_ntp;
bool first_time; bool first_time;
int64_t timediff;
OpenThreads::Mutex time_mutex;
OpenThreads::Condition time_cond;
void sendTimeEvent(bool dvb, time_t tim = 0); void sendTimeEvent(bool dvb, time_t tim = 0);
void setSystemTime(time_t tim); void setSystemTime(time_t tim);
void run(); void run();
public: public:
CTimeThread(); CTimeThread();
void waitForTimeset();
void setTimeSet();
}; };
class CEitManager class CEitManager

View File

@@ -123,8 +123,8 @@ static t_channel_id messaging_current_servicekey = 0;
static bool channel_is_blacklisted = false; static bool channel_is_blacklisted = false;
bool timeset = false; bool timeset = false;
pthread_cond_t timeIsSetCond = PTHREAD_COND_INITIALIZER; //pthread_cond_t timeIsSetCond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t timeIsSetMutex = PTHREAD_MUTEX_INITIALIZER; //pthread_mutex_t timeIsSetMutex = PTHREAD_MUTEX_INITIALIZER;
static int messaging_have_CN = 0x00; // 0x01 = CURRENT, 0x02 = NEXT static int messaging_have_CN = 0x00; // 0x01 = CURRENT, 0x02 = NEXT
static int messaging_got_CN = 0x00; // 0x01 = CURRENT, 0x02 = NEXT static int messaging_got_CN = 0x00; // 0x01 = CURRENT, 0x02 = NEXT
@@ -137,8 +137,8 @@ static CEventServer *eventServer;
static pthread_rwlock_t servicesLock = PTHREAD_RWLOCK_INITIALIZER; // Unsere (fast-)mutex, damit nicht gleichzeitig in die Menge services geschrieben und gelesen wird static pthread_rwlock_t servicesLock = PTHREAD_RWLOCK_INITIALIZER; // Unsere (fast-)mutex, damit nicht gleichzeitig in die Menge services geschrieben und gelesen wird
static pthread_rwlock_t messagingLock = PTHREAD_RWLOCK_INITIALIZER; static pthread_rwlock_t messagingLock = PTHREAD_RWLOCK_INITIALIZER;
static pthread_cond_t timeThreadSleepCond = PTHREAD_COND_INITIALIZER; //static pthread_cond_t timeThreadSleepCond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t timeThreadSleepMutex = PTHREAD_MUTEX_INITIALIZER; //static pthread_mutex_t timeThreadSleepMutex = PTHREAD_MUTEX_INITIALIZER;
static CTimeThread threadTIME; static CTimeThread threadTIME;
static CEitThread threadEIT; static CEitThread threadEIT;
@@ -204,6 +204,7 @@ inline void unlockEvents(void)
pthread_rwlock_unlock(&eventsLock); pthread_rwlock_unlock(&eventsLock);
} }
#if 0
inline bool waitForTimeset(void) inline bool waitForTimeset(void)
{ {
pthread_mutex_lock(&timeIsSetMutex); pthread_mutex_lock(&timeIsSetMutex);
@@ -217,6 +218,7 @@ inline bool waitForTimeset(void)
sleep(1); sleep(1);
return true; return true;
} }
#endif
static const SIevent nullEvt; // Null-Event static const SIevent nullEvt; // Null-Event
@@ -886,12 +888,13 @@ static void commandPauseScanning(int connfd, char *data, const unsigned dataLeng
unlockMessaging(); unlockMessaging();
scanning = 1; scanning = 1;
if (!ntpenable) if (!ntpenable) //FIXME flag if ntp update was good ?
{ {
#if 0
pthread_mutex_lock(&timeThreadSleepMutex); pthread_mutex_lock(&timeThreadSleepMutex);
pthread_cond_broadcast(&timeThreadSleepCond); pthread_cond_broadcast(&timeThreadSleepCond);
pthread_mutex_unlock(&timeThreadSleepMutex); pthread_mutex_unlock(&timeThreadSleepMutex);
#endif
threadTIME.change(0); threadTIME.change(0);
} }
@@ -954,10 +957,11 @@ xprintf("[sectionsd] commandserviceChanged: Service change to " PRINTF_CHANNEL_I
#endif #endif
if (time_trigger_last != (messaging_current_servicekey & 0xFFFFFFFF0000ULL)) { if (time_trigger_last != (messaging_current_servicekey & 0xFFFFFFFF0000ULL)) {
time_trigger_last = messaging_current_servicekey & 0xFFFFFFFF0000ULL; time_trigger_last = messaging_current_servicekey & 0xFFFFFFFF0000ULL;
#if 0
pthread_mutex_lock(&timeThreadSleepMutex); pthread_mutex_lock(&timeThreadSleepMutex);
pthread_cond_broadcast(&timeThreadSleepCond); pthread_cond_broadcast(&timeThreadSleepCond);
pthread_mutex_unlock(&timeThreadSleepMutex); pthread_mutex_unlock(&timeThreadSleepMutex);
#endif
threadTIME.setCurrentService(messaging_current_servicekey); threadTIME.setCurrentService(messaging_current_servicekey);
} }
} }
@@ -1113,18 +1117,22 @@ static void commandSetConfig(int connfd, char *data, const unsigned /*dataLength
} }
if (time_wakeup) { if (time_wakeup) {
pthread_mutex_lock(&timeThreadSleepMutex); //pthread_mutex_lock(&timeThreadSleepMutex);
ntpserver = (std::string)&data[sizeof(struct sectionsd::commandSetConfig)]; ntpserver = (std::string)&data[sizeof(struct sectionsd::commandSetConfig)];
dprintf("new network_ntpserver = %s\n", ntpserver.c_str()); dprintf("new network_ntpserver = %s\n", ntpserver.c_str());
ntp_system_cmd = ntp_system_cmd_prefix + ntpserver; ntp_system_cmd = ntp_system_cmd_prefix + ntpserver;
ntprefresh = pmsg->network_ntprefresh; ntprefresh = pmsg->network_ntprefresh;
ntpenable = (pmsg->network_ntpenable == 1); ntpenable = (pmsg->network_ntpenable == 1);
#if 0
if (timeset) { if (timeset) {
// wake up time thread // wake up time thread
pthread_cond_broadcast(&timeThreadSleepCond); pthread_cond_broadcast(&timeThreadSleepCond);
} }
pthread_mutex_unlock(&timeThreadSleepMutex); pthread_mutex_unlock(&timeThreadSleepMutex);
#endif
if(timeset)
threadTIME.change(1);
} }
epg_dir= (std::string)&data[sizeof(struct sectionsd::commandSetConfig) + strlen(&data[sizeof(struct sectionsd::commandSetConfig)]) + 1]; epg_dir= (std::string)&data[sizeof(struct sectionsd::commandSetConfig) + strlen(&data[sizeof(struct sectionsd::commandSetConfig)]) + 1];
@@ -1332,26 +1340,50 @@ CTimeThread::CTimeThread()
time_ntp = false; time_ntp = false;
}; };
void CTimeThread::sendTimeEvent(bool dvb, time_t tim) void CTimeThread::sendTimeEvent(bool ntp, time_t tim)
{ {
time_t actTime = time(NULL); time_t actTime = time(NULL);
if (dvb) { if (!ntp) {
struct tm *tmTime = localtime(&actTime); struct tm *tmTime = localtime(&actTime);
xprintf("[%sThread] - current: %02d.%02d.%04d %02d:%02d:%02d, dvb: %s", "time", xprintf("%s: current: %02d.%02d.%04d %02d:%02d:%02d, dvb: %s", name.c_str(),
tmTime->tm_mday, tmTime->tm_mon+1, tmTime->tm_year+1900, tmTime->tm_hour, tmTime->tm_min, tmTime->tm_sec, ctime(&tim)); tmTime->tm_mday, tmTime->tm_mon+1, tmTime->tm_year+1900, tmTime->tm_hour, tmTime->tm_min, tmTime->tm_sec, ctime(&tim));
actTime = tim; actTime = tim;
} }
pthread_mutex_lock(&timeIsSetMutex);
timeset = true;
pthread_cond_broadcast(&timeIsSetCond);
pthread_mutex_unlock(&timeIsSetMutex );
eventServer->sendEvent(CSectionsdClient::EVT_TIMESET, CEventServer::INITID_SECTIONSD, &actTime, sizeof(actTime) ); eventServer->sendEvent(CSectionsdClient::EVT_TIMESET, CEventServer::INITID_SECTIONSD, &actTime, sizeof(actTime) );
setTimeSet();
}
void CTimeThread::setTimeSet()
{
time_mutex.lock();
timeset = true;
time_cond.broadcast();
time_mutex.unlock();
}
void CTimeThread::waitForTimeset(void)
{
time_mutex.lock();
while(!timeset)
time_cond.wait(&time_mutex);
time_mutex.unlock();
} }
void CTimeThread::setSystemTime(time_t tim) void CTimeThread::setSystemTime(time_t tim)
{ {
struct timeval tv; struct timeval tv;
if ((!messaging_neutrino_sets_time) && (geteuid() == 0)) {
time_t now = time(NULL);
struct tm *tmTime = localtime(&now);
gettimeofday(&tv, NULL);
timediff = tim * (int64_t)1000000 - (tv.tv_usec + tv.tv_sec * (int64_t)1000000);
xprintf("%s: timediff %lld, current: %02d.%02d.%04d %02d:%02d:%02d, dvb: %s", name.c_str(), timediff,
tmTime->tm_mday, tmTime->tm_mon+1, tmTime->tm_year+1900,
tmTime->tm_hour, tmTime->tm_min, tmTime->tm_sec, ctime(&tim));
if (!messaging_neutrino_sets_time) {
tv.tv_sec = tim; tv.tv_sec = tim;
tv.tv_usec = 0; tv.tv_usec = 0;
if (settimeofday(&tv, NULL) < 0) if (settimeofday(&tv, NULL) < 0)
@@ -1367,10 +1399,8 @@ void CTimeThread::addFilters()
void CTimeThread::run() void CTimeThread::run()
{ {
//struct timespec restartWait;
//struct timeval now;
time_t dvb_time = 0; time_t dvb_time = 0;
xprintf("[%sThread] pid %d (%lu) start\n", "time", getpid(), pthread_self()); xprintf("%s::run:: starting, pid %d (%lu)\n", name.c_str(), getpid(), pthread_self());
addFilters(); addFilters();
DMX::start(); DMX::start();
@@ -1389,23 +1419,24 @@ void CTimeThread::run()
#endif #endif
if (!running) if (!running)
break; break;
sendToSleepNow = false;
} }
bool success = false; bool success = false;
time_ntp = false; time_ntp = false;
dvb_time = 0; dvb_time = 0;
timediff = 0;
if (ntpenable && system( ntp_system_cmd.c_str() ) == 0) { if (ntpenable && system( ntp_system_cmd.c_str() ) == 0) {
time_ntp = true; time_ntp = true;
success = true; success = true;
} else if (dvb_time_update) { } else if (dvb_time_update) {
xprintf("timeThread: getting time\n");
if(!first_time) if(!first_time)
change(1); change(1);
else
change(0);
xprintf("timeThread: getting DVB time (isOpen %d)\n", isOpen());
int rc = dmx->Read(static_buf, MAX_SECTION_LENGTH, timeoutInMSeconds); int rc = dmx->Read(static_buf, MAX_SECTION_LENGTH, timeoutInMSeconds);
xprintf("timeThread: getting time done : %d messaging_neutrino_sets_time %d\n", rc, messaging_neutrino_sets_time); xprintf("timeThread: getting DVB time done : %d messaging_neutrino_sets_time %d\n", rc, messaging_neutrino_sets_time);
if (rc > 0) { if (rc > 0) {
SIsectionTIME st(static_buf); SIsectionTIME st(static_buf);
if (st.is_parsed()) { if (st.is_parsed()) {
@@ -1419,8 +1450,9 @@ void CTimeThread::run()
if(success) { if(success) {
if(dvb_time) { if(dvb_time) {
setSystemTime(dvb_time); setSystemTime(dvb_time);
/* retry a second time immediately after start, to get TOT ? */
if(first_time) if(first_time)
sleep_time = 5; /* retry a second time immediately */ sleep_time = 5;
} }
sendTimeEvent(time_ntp, dvb_time); sendTimeEvent(time_ntp, dvb_time);
@@ -1428,8 +1460,11 @@ void CTimeThread::run()
time_ntp ? "NTP" : first_time ? "DVB (TDT)" : "DVB (TOT)", sleep_time); time_ntp ? "NTP" : first_time ? "DVB (TDT)" : "DVB (TOT)", sleep_time);
first_time = false; first_time = false;
} else { } else {
xprintf("%s: Time set FAILED", name.c_str()); xprintf("%s: Time set FAILED (enabled: ntp %d dvb %d)\n", name.c_str(), ntpenable, dvb_time_update);
if(!timeset && first_time)
sleep_time = 1;
} }
sendToSleepNow = true;
#if 0 #if 0
if (timeset && dvb_time_update) { if (timeset && dvb_time_update) {
@@ -1464,6 +1499,9 @@ void CTimeThread::run()
break; break;
xprintf("timeThread: going to sleep for %d sec\n\n", seconds); xprintf("timeThread: going to sleep for %d sec\n\n", seconds);
struct timespec restartWait;
struct timeval now;
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
TIMEVAL_TO_TIMESPEC(&now, &restartWait); TIMEVAL_TO_TIMESPEC(&now, &restartWait);
restartWait.tv_sec += seconds; restartWait.tv_sec += seconds;
@@ -1525,8 +1563,9 @@ void CSectionThread::run()
addFilters(); addFilters();
if (wait_for_time) { if (wait_for_time) {
waitForTimeset(); threadTIME.waitForTimeset();
xprintf("%s::run:: time set.\n", name.c_str()); time_t now = time(NULL);
xprintf("%s::run:: time set: %s", name.c_str(), ctime(&now));
} }
DMX::start(); DMX::start();
@@ -2164,40 +2203,52 @@ printf("SIevent size: %d\n", sizeof(SIevent));
printf("[sectionsd] stopping...\n"); printf("[sectionsd] stopping...\n");
//scanning = 0; //scanning = 0;
threadEIT.StopRun();
threadCN.StopRun();
threadTIME.StopRun();
xprintf("broadcasting...\n");
#if 0
timeset = true; timeset = true;
printf("broadcasting...\n");
pthread_mutex_lock(&timeIsSetMutex); pthread_mutex_lock(&timeIsSetMutex);
pthread_cond_broadcast(&timeIsSetCond); pthread_cond_broadcast(&timeIsSetCond);
pthread_mutex_unlock(&timeIsSetMutex); pthread_mutex_unlock(&timeIsSetMutex);
#endif
threadTIME.setTimeSet();
#if 0
pthread_mutex_lock(&timeThreadSleepMutex); pthread_mutex_lock(&timeThreadSleepMutex);
pthread_cond_broadcast(&timeThreadSleepCond); pthread_cond_broadcast(&timeThreadSleepCond);
pthread_mutex_unlock(&timeThreadSleepMutex); pthread_mutex_unlock(&timeThreadSleepMutex);
#endif
printf("pausing...\n"); xprintf("pausing...\n");
pthread_cancel(threadHouseKeeping); pthread_cancel(threadHouseKeeping);
//if (dmxUTC) dmxUTC->Stop(); //if (dmxUTC) dmxUTC->Stop();
#if 0
xprintf("cancel TOT\n");
//threadTIME.cancel();
//pthread_cancel(threadTOT); //pthread_cancel(threadTOT);
printf("join TOT\n"); #endif
xprintf("join TOT\n");
//pthread_join(threadTOT, NULL); //pthread_join(threadTOT, NULL);
threadTIME.Stop(); threadTIME.Stop();
//if (dmxUTC) delete dmxUTC; //if (dmxUTC) delete dmxUTC;
printf("join EIT\n"); xprintf("join EIT\n");
threadEIT.Stop(); threadEIT.Stop();
printf("join CN\n"); xprintf("join CN\n");
threadCN.Stop(); threadCN.Stop();
#ifdef ENABLE_SDT #ifdef ENABLE_SDT
printf("join SDT\n"); xprintf("join SDT\n");
threadSDT.Stop(); threadSDT.Stop();
#endif #endif
#ifdef ENABLE_FREESATEPG #ifdef ENABLE_FREESATEPG
printf("join FSEIT\n"); xprintf("join FSEIT\n");
threadFSEIT.Stop(); threadFSEIT.Stop();
#endif #endif
printf("[sectionsd] ended\n"); printf("[sectionsd] ended\n");