eitd/dmx.cpp: comment request_pause/unpause;

move global myDMXOrderUniqueKey to DMX class;
remove wakeup from dropCachedSectionIDs;
add seen_section flag to prevent change filter on timeouts - until skipped loop or complete;
enable check complete for CN thread
This commit is contained in:
[CST] Focus
2012-02-21 19:56:45 +04:00
parent a304b9562b
commit 2942a55854
2 changed files with 70 additions and 37 deletions

View File

@@ -46,10 +46,11 @@
/* /*
#define DEBUG_MUTEX 1 #define DEBUG_MUTEX 1
#define DEBUG_CACHED_SECTIONS 1 #define DEBUG_CACHED_SECTIONS 1
#define DEBUG_COMPLETE 1 #define DEBUG_COMPLETE_SECTIONS 1
*/ */
#define DEBUG_COMPLETE 1
static MyDMXOrderUniqueKey myDMXOrderUniqueKey; //static MyDMXOrderUniqueKey myDMXOrderUniqueKey;
DMX::DMX(const unsigned short p, const unsigned short bufferSizeInKB, const bool c, int dmx_source) DMX::DMX(const unsigned short p, const unsigned short bufferSizeInKB, const bool c, int dmx_source)
{ {
@@ -92,6 +93,7 @@ void DMX::init()
pthread_mutex_init(&start_stop_mutex, NULL); // default = fast mutex pthread_mutex_init(&start_stop_mutex, NULL); // default = fast mutex
#endif #endif
pthread_cond_init (&change_cond, NULL); pthread_cond_init (&change_cond, NULL);
seen_section = false;
} }
DMX::~DMX() DMX::~DMX()
@@ -113,6 +115,7 @@ void DMX::close(void)
void DMX::closefd(void) void DMX::closefd(void)
{ {
xprintf(" %s: DMX::closefd, isOpen %d\n", name.c_str(), isOpen());
if (isOpen()) if (isOpen())
{ {
//close(fd); //close(fd);
@@ -219,20 +222,39 @@ bool DMX::cache_section(sections_id_t s_id, uint8_t number, uint8_t last, uint8_
{ {
if ( i == number ) if ( i == number )
{ {
for (int x=i; x <= segment_last; ++x) for (int x = i; x <= segment_last; ++x)
calcedSections.insert((sections_id_t) tmpval | (sections_id_t) (x&0xFF)); calcedSections.insert((sections_id_t) tmpval | (sections_id_t) (x&0xFF));
} }
else else
calcedSections.insert((sections_id_t) tmpval | (sections_id_t)(i&0xFF)); calcedSections.insert((sections_id_t) tmpval | (sections_id_t)(i&0xFF));
} }
#ifdef DEBUG_COMPLETE #ifdef DEBUG_COMPLETE_SECTIONS
printf("[cache] section for table 0x%02x sid 0x%04x section 0x%02x last 0x%02x slast 0x%02x seen %d calc %d\n", printf(" [%s cache] new section for table 0x%02x sid 0x%04x section 0x%02x last 0x%02x slast 0x%02x seen %d calc %d\n", name.c_str(),
(int)(s_id >> 56), (int) ((s_id >> 40) & 0xFFFF), (int)(s_id & 0xFF), last, (int)(s_id >> 56), (int) ((s_id >> 40) & 0xFFFF), (int)(s_id & 0xFF), last,
segment_last, seenSections.size(), calcedSections.size()); segment_last, seenSections.size(), calcedSections.size());
#endif #endif
} }
#ifdef DEBUG_COMPLETE_SECTIONS
else {
printf(" [%s cache] old section for table 0x%02x sid 0x%04x section 0x%02x last 0x%02x slast 0x%02x seen %d calc %d\n", name.c_str(),
(int)(s_id >> 56), (int) ((s_id >> 40) & 0xFFFF), (int)(s_id & 0xFF), last,
segment_last, seenSections.size(), calcedSections.size());
}
#endif
if(seenSections == calcedSections) { if(seenSections == calcedSections) {
printf("[sectionsd] cache %02x complete: %d\n", filters[filter_index].filter, seenSections.size()); #ifdef DEBUG_COMPLETE
xcprintf(" %s cache %02x complete: %d", name.c_str(), filters[filter_index].filter, seenSections.size());
#endif
/* FIXME this algo fail sometimes:
* [cnThread cache] new section for table 0x4e sid 0x0a39 section 0x00 last 0x00 slast 0x00 seen 1 calc 1
* 16:18:25.136 cnThread cache 4e complete: 1
* [cnThread cache] new section for table 0x4e sid 0x0a29 section 0x00 last 0x00 slast 0x00 seen 2 calc 2
* 16:18:25.214 cnThread cache 4e complete: 2
* [cnThread cache] new section for table 0x4e sid 0x0a2a section 0x00 last 0x00 slast 0x00 seen 3 calc 3
* 16:18:25.220 cnThread cache 4e complete: 3
* [cnThread cache] new section for table 0x4e sid 0x0a2b section 0x00 last 0x00 slast 0x00 seen 4 calc 4
*/
if(seenSections.size() > 10)
ret = true; ret = true;
} }
unlock(); unlock();
@@ -271,6 +293,7 @@ int DMX::getSection(uint8_t *buf, const unsigned timeoutInMSeconds, int &timeout
if (rc <= 0) if (rc <= 0)
{ {
dprintf("dmx.read timeout - filter: %x - timeout# %d\n", filters[filter_index].filter, timeouts); dprintf("dmx.read timeout - filter: %x - timeout# %d\n", filters[filter_index].filter, timeouts);
if(!seen_section)
timeouts++; timeouts++;
} }
else else
@@ -298,7 +321,7 @@ int DMX::getSection(uint8_t *buf, const unsigned timeoutInMSeconds, int &timeout
if (rc != section_length + 3) if (rc != section_length + 3)
{ {
xprintf("rc != section_length + 3 (%d != %d + 3)\n", rc, section_length); xprintf(" %s: rc != section_length + 3 (%d != %d + 3)\n", name.c_str(), rc, section_length);
unlock(); unlock();
// DMX restart required? This should never happen anyway. // DMX restart required? This should never happen anyway.
real_pause(); real_pause();
@@ -310,13 +333,14 @@ int DMX::getSection(uint8_t *buf, const unsigned timeoutInMSeconds, int &timeout
// check if the filter worked correctly // check if the filter worked correctly
if (((table_id ^ filters[filter_index].filter) & filters[filter_index].mask) != 0) if (((table_id ^ filters[filter_index].filter) & filters[filter_index].mask) != 0)
{ {
printf("[sectionsd] filter 0x%x mask 0x%x -> skip sections for table 0x%x\n", filters[filter_index].filter, filters[filter_index].mask, table_id); xprintf(" %s: filter 0x%x mask 0x%x -> skip sections for table 0x%x\n", name.c_str(), filters[filter_index].filter, filters[filter_index].mask, table_id);
unlock(); unlock();
real_pause(); real_pause();
real_unpause(); real_unpause();
return -1; return -1;
} }
/* there are channels with very low rate, neutrino change filter on timeouts while data not complete */
seen_section = true;
unlock(); unlock();
// skip sections which are too short // skip sections which are too short
if ((section_length < 5) || if ((section_length < 5) ||
@@ -333,6 +357,7 @@ int DMX::getSection(uint8_t *buf, const unsigned timeoutInMSeconds, int &timeout
uint16_t eh_tbl_extension_id = section.getTableIdExtension(); uint16_t eh_tbl_extension_id = section.getTableIdExtension();
uint8_t version_number = section.getVersionNumber(); uint8_t version_number = section.getVersionNumber();
#if 0
/* if we are not caching the already read sections (CN-thread), check EIT version and get out */ /* if we are not caching the already read sections (CN-thread), check EIT version and get out */
if (!cache) if (!cache)
{ {
@@ -344,7 +369,8 @@ int DMX::getSection(uint8_t *buf, const unsigned timeoutInMSeconds, int &timeout
} }
return rc; return rc;
} }
#endif
MyDMXOrderUniqueKey::iterator di;
unsigned short current_onid = 0; unsigned short current_onid = 0;
unsigned short current_tsid = 0; unsigned short current_tsid = 0;
@@ -363,8 +389,15 @@ int DMX::getSection(uint8_t *buf, const unsigned timeoutInMSeconds, int &timeout
bool complete = cache_section(s_id, section_number, last_section_number, eit_extended_header->segment_last_section_number); bool complete = cache_section(s_id, section_number, last_section_number, eit_extended_header->segment_last_section_number);
/* if we are not caching the already read sections (CN-thread), check EIT version and get out */
if (table_id == 0x4e &&
eh_tbl_extension_id == (current_service & 0xFFFF) &&
version_number != eit_version) {
dprintf("EIT old: %d new version: %d\n", eit_version, version_number);
eit_version = version_number;
}
//find current section in list //find current section in list
MyDMXOrderUniqueKey::iterator di = myDMXOrderUniqueKey.find(s_id); di = myDMXOrderUniqueKey.find(s_id);
if (di != myDMXOrderUniqueKey.end()) if (di != myDMXOrderUniqueKey.end())
{ {
//the current section was read before //the current section was read before
@@ -406,8 +439,9 @@ int DMX::getSection(uint8_t *buf, const unsigned timeoutInMSeconds, int &timeout
} }
//debug //debug
if(timeouts == -1) { if(timeouts == -1) {
printf("\n\n[sectionsd] skipped looped\n\n"); xcprintf(" %s: skipped looped", name.c_str());
} }
if(complete) { if(complete) {
lock(); lock();
seenSections.clear(); seenSections.clear();
@@ -415,7 +449,7 @@ int DMX::getSection(uint8_t *buf, const unsigned timeoutInMSeconds, int &timeout
timeouts = -2; timeouts = -2;
unlock(); unlock();
} }
//if control comes to here the sections skipped counter must be restarted if(rc > 0)
first_skipped = 0; first_skipped = 0;
return rc; return rc;
@@ -423,9 +457,10 @@ int DMX::getSection(uint8_t *buf, const unsigned timeoutInMSeconds, int &timeout
int DMX::immediate_start(void) int DMX::immediate_start(void)
{ {
xprintf(" %s: DMX::immediate_start, isOpen %d\n", name.c_str(), isOpen());
if (isOpen()) if (isOpen())
{ {
xprintf(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>DMX::imediate_start: isOpen()<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n"); xprintf(" %s: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>DMX::imediate_start: isOpen()<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", name.c_str());
closefd(); closefd();
} }
@@ -469,6 +504,8 @@ int DMX::start(void)
lock(); lock();
lastChanged = time_monotonic();
rc = immediate_start(); rc = immediate_start();
unlock(); unlock();
@@ -514,6 +551,7 @@ int DMX::real_unpause(void)
return 0; return 0;
} }
#if 0
int DMX::request_pause(void) int DMX::request_pause(void)
{ {
real_pause(); // unlocked real_pause(); // unlocked
@@ -542,6 +580,7 @@ int DMX::request_unpause(void)
return 0; return 0;
} }
#endif
bool DMX::next_filter() bool DMX::next_filter()
{ {
@@ -572,25 +611,17 @@ int DMX::change(const int new_filter_index, const t_channel_id new_current_servi
filter_index = new_filter_index; filter_index = new_filter_index;
first_skipped = 0; first_skipped = 0;
#if 0
/* i have to think about this. This #if 0 now makes .change() automatically unpause the
* demux. No idea if there are negative side effects - we will find out :) -- seife
*/
if (!isOpen())
{
pthread_cond_signal(&change_cond);
unlock();
dprintf("DMX::change(%d): not open!\n",new_filter_index);
return 1;
}
#endif
eit_version = 0xff; eit_version = 0xff;
seenSections.clear(); seenSections.clear();
calcedSections.clear(); calcedSections.clear();
seen_section = false;
if(!cache)
myDMXOrderUniqueKey.clear();
if (new_current_service) if (new_current_service)
current_service = new_current_service; current_service = new_current_service;
xprintf("DMX::change: filter %02x current_service %016llx\n\n", filters[new_filter_index].filter, current_service);
xprintf(" %s: filter %02x current_service %016llx\n\n", name.c_str(), filters[filter_index].filter, current_service);
if (real_pauseCounter > 0) if (real_pauseCounter > 0)
{ {
@@ -682,11 +713,7 @@ int DMX::setCurrentService(t_channel_id new_current_service)
int DMX::dropCachedSectionIDs() int DMX::dropCachedSectionIDs()
{ {
lock(); lock();
myDMXOrderUniqueKey.clear(); myDMXOrderUniqueKey.clear();
pthread_cond_signal(&change_cond);
unlock(); unlock();
return 0; return 0;

View File

@@ -39,6 +39,7 @@
#include <zapit/client/zapittypes.h> #include <zapit/client/zapittypes.h>
#include <set> #include <set>
#include <map> #include <map>
#include <string>
typedef uint64_t sections_id_t; typedef uint64_t sections_id_t;
typedef unsigned char version_number_t; typedef unsigned char version_number_t;
@@ -60,6 +61,10 @@ protected:
unsigned char eit_version; unsigned char eit_version;
bool cache; /* should read sections be cached? true for all but dmxCN */ bool cache; /* should read sections be cached? true for all but dmxCN */
int real_pauseCounter; int real_pauseCounter;
std::string name;
/* flag to check if there was section for that table, if yes, dont increase timeouts */
bool seen_section;
inline bool isOpen(void) { inline bool isOpen(void) {
return (fd != -1); return (fd != -1);
@@ -73,6 +78,7 @@ protected:
section_map_t seenSections; section_map_t seenSections;
section_map_t calcedSections; section_map_t calcedSections;
MyDMXOrderUniqueKey myDMXOrderUniqueKey;
bool cache_section(sections_id_t sectionNo, uint8_t number, uint8_t last, uint8_t segment_last); bool cache_section(sections_id_t sectionNo, uint8_t number, uint8_t last, uint8_t segment_last);
public: public:
struct s_filters struct s_filters
@@ -101,10 +107,10 @@ public:
int real_pause(void); int real_pause(void);
int real_unpause(void); int real_unpause(void);
#if 0
int request_pause(void); int request_pause(void);
int request_unpause(void); int request_unpause(void);
#endif
int change(const int new_filter_index, const t_channel_id new_current_service = 0); // locks while changing int change(const int new_filter_index, const t_channel_id new_current_service = 0); // locks while changing
void lock(void); void lock(void);