rework streaming part1

Origin commit data
------------------
Commit: 878418d988
Author: TangoCash <eric@loxat.de>
Date: 2022-10-05 (Wed, 05 Oct 2022)
This commit is contained in:
TangoCash
2022-10-05 21:48:04 +02:00
committed by vanhofen
parent 578748ad59
commit f2e22001d7
6 changed files with 118 additions and 84 deletions

View File

@@ -70,7 +70,7 @@
#define DMX_BUFFER_SIZE (2048*TS_SIZE)
#define IN_SIZE (250*TS_SIZE)
CStreamInstance::CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &_pids, bool _send_raw)
CStreamInstance::CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &_pids)
{
printf("CStreamInstance:: new channel %" PRIx64 " fd %d\n", chid, clientfd);
fds.insert(clientfd);
@@ -80,7 +80,7 @@ CStreamInstance::CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t
dmx = NULL;
buf = NULL;
frontend = NULL;
send_raw = _send_raw;
is_e2_stream = false;
}
CStreamInstance::~CStreamInstance()
@@ -172,7 +172,7 @@ bool CStreamInstance::Open()
if (!tmpchan)
return false;
dmx = new cDemux(tmpchan->getRecordDemux());//FIXME
dmx = new cDemux(tmpchan->getStreamDemux());//FIXME
if(!dmx)
return false;
return dmx->Open(DMX_TP_CHANNEL, NULL, DMX_BUFFER_SIZE);
@@ -195,7 +195,7 @@ void CStreamInstance::run()
dmx->Start(true);
if (!send_raw)
//if (!g_settings.stream_raw && is_e2_stream)
CCamManager::getInstance()->Start(channel_id, CCamManager::STREAM);
#if HAVE_ARM_HARDWARE || HAVE_MIPS_HARDWARE
@@ -212,7 +212,7 @@ void CStreamInstance::run()
Send(r);
}
if (!send_raw)
//if (!g_settings.stream_raw && is_e2_stream)
CCamManager::getInstance()->Stop(channel_id, CCamManager::STREAM);
#if HAVE_ARM_HARDWARE || HAVE_MIPS_HARDWARE
@@ -307,6 +307,7 @@ CFrontend * CStreamManager::FindFrontend(CZapitChannel * channel)
{
std::set<CFrontend*> frontends;
CFrontend * frontend = NULL;
CFEManager::getInstance()->Open();
t_channel_id chid = channel->getChannelID();
if (CRecordManager::getInstance()->RecordingStatus(chid)) {
@@ -314,8 +315,13 @@ CFrontend * CStreamManager::FindFrontend(CZapitChannel * channel)
return frontend;
}
t_channel_id live_channel_id = CZapit::getInstance()->GetCurrentChannelID();
CFrontend *live_fe = CZapit::getInstance()->GetLiveFrontend();
t_channel_id live_channel_id = 0;
CFrontend *live_fe = NULL;
if (CNeutrinoApp::getInstance()->getMode() != NeutrinoModes::mode_standby) {
live_channel_id = CZapit::getInstance()->GetCurrentChannelID();
live_fe = CZapit::getInstance()->GetLiveFrontend();
}
if (live_channel_id == chid)
return live_fe;
@@ -325,7 +331,8 @@ CFrontend * CStreamManager::FindFrontend(CZapitChannel * channel)
bool unlock = false;
if (!IS_WEBCHAN(live_channel_id)) {
unlock = true;
CFEManager::getInstance()->lockFrontend(live_fe);
if (live_fe)
CFEManager::getInstance()->lockFrontend(live_fe);
}
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
@@ -339,14 +346,15 @@ CFrontend * CStreamManager::FindFrontend(CZapitChannel * channel)
if (unlock && frontend == NULL) {
unlock = false;
CFEManager::getInstance()->unlockFrontend(live_fe);
if (live_fe)
CFEManager::getInstance()->unlockFrontend(live_fe);
frontend = CFEManager::getInstance()->allocateFE(channel, true);
}
CFEManager::getInstance()->Unlock();
if (frontend) {
bool found = (live_fe != frontend) || IS_WEBCHAN(live_channel_id) || SAME_TRANSPONDER(live_channel_id, chid);
bool found = (live_fe != NULL && live_fe != frontend) || IS_WEBCHAN(live_channel_id) || SAME_TRANSPONDER(live_channel_id, chid);
bool ret = false;
if (found)
ret = zapit.zapTo_record(chid) > 0;
@@ -357,7 +365,7 @@ CFrontend * CStreamManager::FindFrontend(CZapitChannel * channel)
#if ENABLE_PIP
/* FIXME until proper demux management */
t_channel_id pip_channel_id = CZapit::getInstance()->GetPipChannelID();
if ((pip_channel_id == chid) && (channel->getRecordDemux() == channel->getPipDemux()))
if ((pip_channel_id == chid) && (channel->getStreamDemux() == channel->getPipDemux()))
zapit.stopPip();
#endif
} else {
@@ -370,9 +378,9 @@ CFrontend * CStreamManager::FindFrontend(CZapitChannel * channel)
CFEManager::getInstance()->unlockFrontend(*ft);
#if HAVE_ARM_HARDWARE || HAVE_MIPS_HARDWARE
if (unlock && !frontend)
if (unlock && !frontend && live_fe)
#else
if (unlock)
if (unlock && live_fe)
#endif
CFEManager::getInstance()->unlockFrontend(live_fe);
@@ -380,7 +388,7 @@ CFrontend * CStreamManager::FindFrontend(CZapitChannel * channel)
return frontend;
}
bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFrontend * &frontend, bool &send_raw)
bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFrontend * &frontend, bool &is_e2)
{
char cbuf[512];
char *bp;
@@ -421,49 +429,51 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFro
return false;
}
chid = CZapit::getInstance()->GetCurrentChannelID();
CZapitChannel * channel = CZapit::getInstance()->GetCurrentChannel();
CFrontend *live_fe = NULL;
CZapitChannel * channel = NULL;
if (CNeutrinoApp::getInstance()->getMode() != NeutrinoModes::mode_standby) {
chid = CZapit::getInstance()->GetCurrentChannelID();
channel = CZapit::getInstance()->GetCurrentChannel();
}
t_channel_id tmpid = 0;
CZapitChannel *tmp_channel = NULL;
bp = &cbuf[5];
if (sscanf(bp, "id=%" SCNx64, &tmpid) == 1) {
channel = CServiceManager::getInstance()->FindChannel(tmpid);
chid = tmpid;
}
int tmpraw = 0;
bp = &cbuf[25];
if (sscanf(bp, "raw=%d", &tmpraw) == 1)
if (strstr(bp,"id="))
{
send_raw = (tmpraw > 0);
if (sscanf(bp, "id=%" SCNx64, &tmpid) == 1) {
tmp_channel = CServiceManager::getInstance()->FindChannel(tmpid);
if (tmp_channel)
{
printf("CStreamManager::Parse:N: channel_id %" PRIx64 " [%s] \n", tmp_channel->getChannelID(), tmp_channel->getName().c_str());
channel = tmp_channel;
chid = tmp_channel->getChannelID();
}
}
}
if (!tmpid)
else
{
bp = &cbuf[5];
u_int service;
u_int i1, i2, i3, i4, satpos;
if (sscanf(bp, "%X:0:%X:%X:%X:%X:%X:0:0:0:", &service, &i1, &i2, &i3, &i4, &satpos) == 6)
{
t_channel_id tmpchid = (((t_channel_id)i3) << 32) | (((t_channel_id)i4) << 16) | (t_channel_id)i2;
CZapitChannel *tmp_channel = CServiceManager::getInstance()->FindChannel48(tmpchid);
tmpid = (((t_channel_id)i3) << 32) | (((t_channel_id)i4) << 16) | (t_channel_id)i2;
tmp_channel = CServiceManager::getInstance()->FindChannel48(tmpid);
if (tmp_channel)
{
printf("e2 -> n chid:%" SCNx64 "\n", tmp_channel->getChannelID());
printf("CStreamManager::Parse:E: channel_id %" PRIx64 " [%s] \n", tmp_channel->getChannelID(), tmp_channel->getName().c_str());
channel = tmp_channel;
chid = tmp_channel->getChannelID();
send_raw = true;
is_e2 = true;
}
}
}
if (!channel)
return false;
printf("CStreamManager::Parse: channel_id %" PRIx64 " [%s] send %s\n", chid, channel->getName().c_str(), send_raw ? "raw" : "decrypted");
streammap_iterator_t it = streams.find(chid);
if (it != streams.end())
{
@@ -480,53 +490,54 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFro
return false;
}
AddPids(fd, channel, pids, send_raw);
PreparePids(channel, pids);
return !pids.empty();
}
void CStreamManager::AddPids(int fd, CZapitChannel *channel, stream_pids_t &pids, bool send_raw)
void CStreamManager::PreparePids(CZapitChannel *channel, stream_pids_t &pids)
{
// avoid compiler warning
(void) fd;
pids.clear();
if (pids.empty()) {
printf("CStreamManager::AddPids: searching channel %" PRIx64 " pids\n", channel->getChannelID());
if (channel->getVideoPid())
{
pids.insert(channel->getVideoPid());
printf("CStreamManager::AddPids: vpid 0x%04x \n", channel->getVideoPid());
}
for (int i = 0; i < channel->getAudioChannelCount(); i++)
{
pids.insert(channel->getAudioChannel(i)->pid);
printf("CStreamManager::AddPids: apid 0x%04x \n", channel->getAudioChannel(i)->pid);
}
if (!channel->capids.empty() && send_raw)
{
for(casys_pids_iterator_t it = channel->capids.begin(); it != channel->capids.end(); ++it)
{
pids.insert((*it)); //all ECM Pids
printf("CStreamManager::AddPids: capid 0x%04x \n", (*it));
}
}
pids.insert(0); //PAT
printf("CStreamManager::AddPids: PATpid 0x%04x \n", 0);
pids.insert(channel->getPmtPid()); //PMT
printf("CStreamManager::AddPids: PMTpid 0x%04x \n", channel->getPmtPid());
pids.insert(0x14); //TDT
printf("CStreamManager::AddPids: TDTpid 0x%04x \n", 0x14);
pids.insert(0); //PAT
printf("CStreamManager::PreparePids: PATpid 0x%04x \n", 0);
pids.insert(channel->getPmtPid()); //PMT
printf("CStreamManager::PreparePids: PMTpid 0x%04x \n", channel->getPmtPid());
pids.insert(0x14); //TDT
printf("CStreamManager::PreparePids: TDTpid 0x%04x \n", 0x14);
printf("CStreamManager::PreparePids: searching channel %" PRIx64 " pids\n", channel->getChannelID());
if (channel->getVideoPid())
{
pids.insert(channel->getVideoPid());
printf("CStreamManager::PreparePids: vpid 0x%04x \n", channel->getVideoPid());
}
for (int i = 0; i < channel->getAudioChannelCount(); i++)
{
pids.insert(channel->getAudioChannel(i)->pid);
printf("CStreamManager::PreparePids: apid 0x%04x \n", channel->getAudioChannel(i)->pid);
}
#if 0
if (!channel->capids.empty() && g_settings.stream_raw && is_e2_stream)
{
for(casys_pids_iterator_t it = channel->capids.begin(); it != channel->capids.end(); ++it)
{
pids.insert((*it)); //all ECM Pids
printf("CStreamManager::PreparePids: capid 0x%04x \n", (*it));
}
}
#endif
//add pcr pid
if (channel->getPcrPid() && (channel->getPcrPid() != channel->getVideoPid())) {
pids.insert(channel->getPcrPid());
printf("CStreamManager::AddPids: PCRpid 0x%04x \n", channel->getPcrPid());
printf("CStreamManager::PreparePids: PCRpid 0x%04x \n", channel->getPcrPid());
}
//add teletext pid
if (channel->getTeletextPid() != 0) {
pids.insert(channel->getTeletextPid());
printf("CStreamManager::AddPids: Teletext pid 0x%04x \n", channel->getTeletextPid());
printf("CStreamManager::PreparePids: Teletext pid 0x%04x \n", channel->getTeletextPid());
}
//add dvb sub pid
if ((int)channel->getSubtitleCount() > 0) {
@@ -535,7 +546,7 @@ void CStreamManager::AddPids(int fd, CZapitChannel *channel, stream_pids_t &pids
if (s->thisSubType == CZapitAbsSub::DVB) {
CZapitDVBSub* sd = reinterpret_cast<CZapitDVBSub*>(s);
pids.insert(sd->pId);
printf("CStreamManager::AddPids: Subtitle pid 0x%04x \n", sd->pId);
printf("CStreamManager::PreparePids: Subtitle pid 0x%04x \n", sd->pId);
}
}
}
@@ -545,13 +556,13 @@ void CStreamManager::AddPids(int fd, CZapitChannel *channel, stream_pids_t &pids
bool CStreamManager::AddClient(int connfd)
{
stream_pids_t pids;
t_channel_id channel_id;
CFrontend *frontend;
bool send_raw;
pids.clear(); // to catch and stream all pids later !
pids.clear();
if (Parse(connfd, pids, channel_id, frontend, send_raw)) {
t_channel_id channel_id = 0;
CFrontend *frontend = NULL;
bool is_e2 = false;
if (Parse(connfd, pids, channel_id, frontend, is_e2)) {
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
streammap_iterator_t it = streams.find(channel_id);
if (it != streams.end()) {
@@ -561,8 +572,9 @@ bool CStreamManager::AddClient(int connfd)
if (IS_WEBCHAN(channel_id)) {
stream = new CStreamStream(connfd, channel_id, pids);
} else {
stream = new CStreamInstance(connfd, channel_id, pids, send_raw);
stream = new CStreamInstance(connfd, channel_id, pids);
stream->frontend = frontend;
stream->is_e2_stream = is_e2;
}
int sendsize = 10*IN_SIZE;
@@ -640,13 +652,12 @@ void CStreamManager::run()
continue;
}
g_RCInput->postMsg(NeutrinoMessages::EVT_STREAM_START, 0);
if (!AddClient(connfd))
{
close(connfd);
g_RCInput->postMsg(NeutrinoMessages::EVT_STREAM_STOP, 0);
}
else
g_RCInput->postMsg(NeutrinoMessages::EVT_STREAM_START, 0);
poll_timeout = 1000;
} else {
if (pfd[i].revents & (POLLHUP | POLLRDHUP)) {
@@ -764,7 +775,7 @@ _error:
}
CStreamStream::CStreamStream(int clientfd, t_channel_id chid, stream_pids_t &_pids)
: CStreamInstance(clientfd, chid, _pids, false)
: CStreamInstance(clientfd, chid, _pids)
{
ifcx = NULL;
ofcx = NULL;