mirror of
https://github.com/tuxbox-fork-migrations/recycled-ni-neutrino.git
synced 2025-08-28 16:01:10 +02:00
driver/streamts.cpp: add support to stream from different tuners
Origin commit data
------------------
Commit: ded947f05a
Author: [CST] Focus <focus.cst@gmail.com>
Date: 2014-03-26 (Wed, 26 Mar 2014)
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
/*
|
||||
Neutrino-GUI - DBoxII-Project
|
||||
|
||||
Copyright (C) 2011-2012 CoolStream International Ltd
|
||||
Copyright (C) 2011-2014 CoolStream International Ltd
|
||||
|
||||
based on code which is
|
||||
Copyright (C) 2002 Andreas Oberritter <obi@tuxbox.org>
|
||||
@@ -60,9 +60,8 @@
|
||||
/* experimental mode:
|
||||
* stream not possible, if record running
|
||||
* pids in url ignored, and added from channel, with fake PAT/PMT
|
||||
* different channels supported, only from the same transponder - no zap is done,
|
||||
* different channels supported,
|
||||
* with url like http://coolstream:31339/id=c32400030070283e (channel id)
|
||||
* TODO: multi-tuner support
|
||||
*/
|
||||
#define ENABLE_MULTI_CHANNEL
|
||||
|
||||
@@ -114,23 +113,24 @@ bool CStreamInstance::Stop()
|
||||
|
||||
bool CStreamInstance::Send(ssize_t r)
|
||||
{
|
||||
mutex.lock();
|
||||
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
|
||||
int flags = 0;
|
||||
if (fds.size() > 1)
|
||||
flags = MSG_DONTWAIT;
|
||||
for (stream_fds_t::iterator it = fds.begin(); it != fds.end(); ++it) {
|
||||
int ret, i = 10;
|
||||
int i = 10;
|
||||
unsigned char *b = buf;
|
||||
ssize_t count = r;
|
||||
do {
|
||||
ret = send(*it, buf, r, MSG_DONTWAIT);
|
||||
#if 0
|
||||
if (ret != r)
|
||||
usleep(100);
|
||||
#endif
|
||||
} while ((ret != r) && (i-- > 0));
|
||||
if (ret != r) {
|
||||
if (r < 0)
|
||||
perror("send");
|
||||
printf("send err, fd %d: (%d from %d)\n", *it, ret, r);
|
||||
}
|
||||
int ret = send(*it, b, count, flags);
|
||||
if (ret > 0) {
|
||||
b += ret;
|
||||
count -= ret;
|
||||
}
|
||||
} while ((count > 0) && (i-- > 0));
|
||||
if (count)
|
||||
printf("send err, fd %d: (%d from %d)\n", *it, r-count, r);
|
||||
}
|
||||
mutex.unlock();
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -143,28 +143,23 @@ void CStreamInstance::Close()
|
||||
|
||||
void CStreamInstance::AddClient(int clientfd)
|
||||
{
|
||||
mutex.lock();
|
||||
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
|
||||
fds.insert(clientfd);
|
||||
printf("CStreamInstance::AddClient: %d (count %d)\n", clientfd, fds.size());
|
||||
mutex.unlock();
|
||||
}
|
||||
|
||||
void CStreamInstance::RemoveClient(int clientfd)
|
||||
{
|
||||
mutex.lock();
|
||||
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
|
||||
fds.erase(clientfd);
|
||||
close(clientfd);
|
||||
printf("CStreamInstance::RemoveClient: %d (count %d)\n", clientfd, fds.size());
|
||||
mutex.unlock();
|
||||
}
|
||||
|
||||
void CStreamInstance::run()
|
||||
{
|
||||
printf("CStreamInstance::run: %llx\n", channel_id);
|
||||
|
||||
#if 0
|
||||
dmx = new cDemux(STREAM_DEMUX);//FIXME
|
||||
#endif
|
||||
CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(channel_id);
|
||||
if (!tmpchan)
|
||||
return;
|
||||
@@ -191,7 +186,7 @@ void CStreamInstance::run()
|
||||
|
||||
while (running) {
|
||||
ssize_t r = dmx->Read(buf, IN_SIZE, 100);
|
||||
if(r > 0)
|
||||
if (r > 0)
|
||||
Send(r);
|
||||
}
|
||||
|
||||
@@ -273,13 +268,83 @@ bool CStreamManager::SetPort(int newport)
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid)
|
||||
CFrontend * CStreamManager::FindFrontend(CZapitChannel * channel)
|
||||
{
|
||||
std::set<CFrontend*> frontends;
|
||||
CFrontend * frontend = NULL;
|
||||
|
||||
t_channel_id chid = channel->getChannelID();
|
||||
if (CRecordManager::getInstance()->RecordingStatus(chid)) {
|
||||
printf("CStreamManager::Parse: channel %llx recorded, aborting..\n", chid);
|
||||
return frontend;
|
||||
}
|
||||
|
||||
t_channel_id live_channel_id = CZapit::getInstance()->GetCurrentChannelID();
|
||||
CFrontend *live_fe = CZapit::getInstance()->GetLiveFrontend();
|
||||
|
||||
if (live_channel_id == chid)
|
||||
return live_fe;
|
||||
|
||||
CFEManager::getInstance()->Lock();
|
||||
|
||||
bool unlock = true;
|
||||
CFEManager::getInstance()->lockFrontend(live_fe);
|
||||
|
||||
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
|
||||
for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it)
|
||||
frontends.insert(it->second->frontend);
|
||||
|
||||
for (std::set<CFrontend*>::iterator ft = frontends.begin(); ft != frontends.end(); ft++)
|
||||
CFEManager::getInstance()->lockFrontend(*ft);
|
||||
|
||||
frontend = CFEManager::getInstance()->allocateFE(channel, true);
|
||||
|
||||
if (frontend == NULL) {
|
||||
unlock = false;
|
||||
CFEManager::getInstance()->unlockFrontend(live_fe);
|
||||
frontend = CFEManager::getInstance()->allocateFE(channel, true);
|
||||
}
|
||||
|
||||
CFEManager::getInstance()->Unlock();
|
||||
|
||||
if (frontend) {
|
||||
bool found = (live_fe != frontend) || SAME_TRANSPONDER(live_channel_id, chid);
|
||||
bool ret = false;
|
||||
if (found)
|
||||
ret = zapit.zapTo_record(chid) > 0;
|
||||
else
|
||||
ret = zapit.zapTo_serviceID(chid) > 0;
|
||||
|
||||
if (ret) {
|
||||
#ifdef ENABLE_PIP
|
||||
/* FIXME until proper demux management */
|
||||
t_channel_id pip_channel_id = CZapit::getInstance()->GetPipChannelID();
|
||||
if ((pip_channel_id == chid) && (channel->getRecordDemux() == channel->getPipDemux()))
|
||||
zapit.stopPip();
|
||||
#endif
|
||||
} else {
|
||||
frontend = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
CFEManager::getInstance()->Lock();
|
||||
for (std::set<CFrontend*>::iterator ft = frontends.begin(); ft != frontends.end(); ft++)
|
||||
CFEManager::getInstance()->unlockFrontend(*ft);
|
||||
|
||||
if (unlock)
|
||||
CFEManager::getInstance()->unlockFrontend(live_fe);
|
||||
|
||||
CFEManager::getInstance()->Unlock();
|
||||
return frontend;
|
||||
}
|
||||
|
||||
bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFrontend * &frontend)
|
||||
{
|
||||
char cbuf[512];
|
||||
char *bp;
|
||||
|
||||
FILE * fp = fdopen(fd, "r+");
|
||||
if(fp == NULL) {
|
||||
if (fp == NULL) {
|
||||
perror("fdopen");
|
||||
return false;
|
||||
}
|
||||
@@ -290,7 +355,7 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid)
|
||||
while (bp - &cbuf[0] < (int) sizeof(cbuf)) {
|
||||
unsigned char c;
|
||||
int res = read(fd, &c, 1);
|
||||
if(res < 0) {
|
||||
if (res < 0) {
|
||||
perror("read");
|
||||
return false;
|
||||
}
|
||||
@@ -313,68 +378,54 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid)
|
||||
return false;
|
||||
}
|
||||
|
||||
chid = CZapit::getInstance()->GetCurrentChannelID();
|
||||
CZapitChannel * channel = CZapit::getInstance()->GetCurrentChannel();
|
||||
|
||||
#ifndef ENABLE_MULTI_CHANNEL
|
||||
/* parse stdin / url path, start dmx filters */
|
||||
do {
|
||||
int pid;
|
||||
int res = sscanf(bp, "%x", &pid);
|
||||
if(res == 1) {
|
||||
printf("New pid: 0x%x\n", pid);
|
||||
if (res == 1) {
|
||||
printf("CStreamManager::Parse: pid: 0x%x\n", pid);
|
||||
pids.insert(pid);
|
||||
}
|
||||
} while ((bp = strchr(bp, ',')) && (bp++));
|
||||
#else
|
||||
t_channel_id tmpid;
|
||||
bp = &cbuf[5];
|
||||
if (sscanf(bp, "id=%llx", &tmpid) == 1) {
|
||||
channel = CServiceManager::getInstance()->FindChannel(tmpid);
|
||||
chid = tmpid;
|
||||
}
|
||||
while ((bp = strchr(bp, ',')) && (bp++));
|
||||
#endif
|
||||
if (!channel)
|
||||
return false;
|
||||
|
||||
chid = CZapit::getInstance()->GetCurrentChannelID();
|
||||
CZapitChannel * channel = CZapit::getInstance()->GetCurrentChannel();
|
||||
printf("CStreamManager::Parse: channel_id %llx [%s]\n", chid, channel->getName().c_str());
|
||||
|
||||
int mode = CNeutrinoApp::getInstance()->getMode();
|
||||
if (mode == NeutrinoMessages::mode_standby && streams.empty()) {
|
||||
printf("CStreamManager::Parse: wakeup zapit..\n");
|
||||
g_Zapit->setStandby(false);
|
||||
g_Zapit->getMode();
|
||||
frontend = FindFrontend(channel);
|
||||
if (!frontend) {
|
||||
printf("CStreamManager::Parse: no free frontend\n");
|
||||
return false;
|
||||
}
|
||||
if(pids.empty()) {
|
||||
#ifdef ENABLE_MULTI_CHANNEL
|
||||
t_channel_id tmpid;
|
||||
bp = &cbuf[5];
|
||||
if (sscanf(bp, "id=%llx", &tmpid) == 1) {
|
||||
printf("############################# channel_id %llx\n", tmpid);
|
||||
|
||||
CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(tmpid);
|
||||
if (tmpchan && (tmpid != chid) && SAME_TRANSPONDER(tmpid, chid)) {
|
||||
printf("############################# channel_id %llx -> zap\n", tmpid);
|
||||
bool ret = g_Zapit->zapTo_record(tmpid) > 0;
|
||||
if (ret) {
|
||||
channel = tmpchan;
|
||||
chid = tmpid;
|
||||
}
|
||||
}
|
||||
}
|
||||
if(CRecordManager::getInstance()->RecordingStatus(chid)) {
|
||||
printf("CStreamManager::Parse: channel %llx recorded, aborting..\n", chid);
|
||||
return false;
|
||||
}
|
||||
#ifdef ENABLE_PIP
|
||||
t_channel_id pip_channel_id = CZapit::getInstance()->GetPipChannelID();
|
||||
if ((chid == pip_channel_id) && (channel->getRecordDemux() == channel->getPipDemux())) {
|
||||
printf("CStreamManager::Parse: channel %llx used for pip, aborting..\n", chid);
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
AddPids(fd, channel, pids);
|
||||
|
||||
printf("CStreamManager::Parse: no pids in url, using channel %llx pids\n", chid);
|
||||
if(!channel)
|
||||
return false;
|
||||
//pids.insert(0);
|
||||
//pids.insert(channel->getPmtPid());
|
||||
pids.insert(channel->getVideoPid());
|
||||
return !pids.empty();
|
||||
}
|
||||
|
||||
void CStreamManager::AddPids(int fd, CZapitChannel *channel, stream_pids_t &pids)
|
||||
{
|
||||
if (pids.empty()) {
|
||||
printf("CStreamManager::AddPids: no pids in url, using channel %llx pids\n", channel->getChannelID());
|
||||
if (channel->getVideoPid())
|
||||
pids.insert(channel->getVideoPid());
|
||||
for (int i = 0; i < channel->getAudioChannelCount(); i++)
|
||||
pids.insert(channel->getAudioChannel(i)->pid);
|
||||
|
||||
}
|
||||
|
||||
CGenPsi psi;
|
||||
for (stream_pids_t::iterator it = pids.begin(); it != pids.end(); ++it) {
|
||||
if (*it == channel->getVideoPid()) {
|
||||
@@ -385,9 +436,9 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid)
|
||||
if (*it == channel->getAudioChannel(i)->pid) {
|
||||
CZapitAudioChannel::ZapitAudioChannelType atype = channel->getAudioChannel(i)->audioChannelType;
|
||||
printf("CStreamManager::Parse: genpsi apid %x (%d)\n", *it, atype);
|
||||
if(channel->getAudioChannel(i)->audioChannelType == CZapitAudioChannel::EAC3){
|
||||
if (channel->getAudioChannel(i)->audioChannelType == CZapitAudioChannel::EAC3) {
|
||||
psi.addPid(*it, EN_TYPE_AUDIO_EAC3, atype, channel->getAudioChannel(i)->description.c_str());
|
||||
}else{
|
||||
} else {
|
||||
psi.addPid(*it, EN_TYPE_AUDIO, atype, channel->getAudioChannel(i)->description.c_str());
|
||||
}
|
||||
}
|
||||
@@ -395,40 +446,81 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid)
|
||||
}
|
||||
}
|
||||
//add pcr pid
|
||||
if(channel->getPcrPid() != channel->getVideoPid()){
|
||||
if (channel->getPcrPid() && (channel->getPcrPid() != channel->getVideoPid())) {
|
||||
pids.insert(channel->getPcrPid());
|
||||
psi.addPid(channel->getPcrPid(), EN_TYPE_PCR, 0);
|
||||
}
|
||||
//add teletext pid
|
||||
if (g_settings.recording_stream_vtxt_pid && channel->getTeletextPid() != 0){
|
||||
if (g_settings.recording_stream_vtxt_pid && channel->getTeletextPid() != 0) {
|
||||
pids.insert(channel->getTeletextPid());
|
||||
psi.addPid(channel->getTeletextPid(), EN_TYPE_TELTEX, 0, channel->getTeletextLang());
|
||||
}
|
||||
//add dvb sub pid
|
||||
if (g_settings.recording_stream_subtitle_pids){
|
||||
if (g_settings.recording_stream_subtitle_pids) {
|
||||
for (int i = 0 ; i < (int)channel->getSubtitleCount() ; ++i) {
|
||||
CZapitAbsSub* s = channel->getChannelSub(i);
|
||||
if (s->thisSubType == CZapitAbsSub::DVB) {
|
||||
if(i>9)//max sub pids
|
||||
if (i>9)//max sub pids
|
||||
break;
|
||||
|
||||
CZapitDVBSub* sd = reinterpret_cast<CZapitDVBSub*>(s);
|
||||
pids.insert(sd->pId);
|
||||
psi.addPid( sd->pId, EN_TYPE_DVBSUB, 0, sd->ISO639_language_code.c_str() );
|
||||
psi.addPid(sd->pId, EN_TYPE_DVBSUB, 0, sd->ISO639_language_code.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
psi.genpsi(fd);
|
||||
}
|
||||
|
||||
return !pids.empty();
|
||||
bool CStreamManager::AddClient(int connfd)
|
||||
{
|
||||
stream_pids_t pids;
|
||||
t_channel_id channel_id;
|
||||
CFrontend *frontend;
|
||||
|
||||
if (Parse(connfd, pids, channel_id, frontend)) {
|
||||
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
|
||||
streammap_iterator_t it = streams.find(channel_id);
|
||||
if (it != streams.end()) {
|
||||
it->second->AddClient(connfd);
|
||||
} else {
|
||||
CStreamInstance * stream = new CStreamInstance(connfd, channel_id, pids);
|
||||
stream->frontend = frontend;
|
||||
|
||||
int sendsize = 10*IN_SIZE;
|
||||
unsigned int m = sizeof(sendsize);
|
||||
setsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, m);
|
||||
if (stream->Start())
|
||||
streams.insert(streammap_pair_t(channel_id, stream));
|
||||
else
|
||||
delete stream;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void CStreamManager::RemoveClient(int fd)
|
||||
{
|
||||
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
|
||||
for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) {
|
||||
if (it->second->HasFd(fd)) {
|
||||
CStreamInstance *stream = it->second;
|
||||
stream->RemoveClient(fd);
|
||||
if (stream->GetFds().empty()) {
|
||||
streams.erase(stream->GetChannelId());
|
||||
delete stream;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CStreamManager::run()
|
||||
{
|
||||
struct sockaddr_in servaddr;
|
||||
int clilen = sizeof(servaddr);;
|
||||
int clilen = sizeof(servaddr);
|
||||
|
||||
struct pollfd pfd[128];
|
||||
int poll_cnt;
|
||||
@@ -452,57 +544,33 @@ void CStreamManager::run()
|
||||
}
|
||||
mutex.unlock();
|
||||
//printf("polling, count= %d\n", poll_cnt);
|
||||
int pollres = poll (pfd, poll_cnt, 1000);
|
||||
if (pollres < 0) {
|
||||
perror("CStreamManager::run(): poll");
|
||||
int pollres = poll (pfd, poll_cnt, 10000);
|
||||
if (pollres <= 0) {
|
||||
if (pollres < 0)
|
||||
perror("CStreamManager::run(): poll");
|
||||
continue;
|
||||
}
|
||||
if(pollres == 0)
|
||||
continue;
|
||||
for (int i = poll_cnt - 1; i >= 0; i--) {
|
||||
if (pfd[i].revents & (POLLIN | POLLPRI | POLLHUP | POLLRDHUP)) {
|
||||
printf("fd %d has events %x\n", pfd[i].fd, pfd[i].revents);
|
||||
if (pfd[i].fd == listenfd) {
|
||||
int connfd = accept (listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen);
|
||||
int connfd = accept(listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen);
|
||||
printf("CStreamManager::run(): connection, fd %d\n", connfd);
|
||||
if(connfd < 0) {
|
||||
if (connfd < 0) {
|
||||
perror("CStreamManager::run(): accept");
|
||||
continue;
|
||||
}
|
||||
stream_pids_t pids;
|
||||
t_channel_id channel_id;
|
||||
if (Parse(connfd, pids, channel_id)) {
|
||||
mutex.lock();
|
||||
streammap_iterator_t it = streams.find(channel_id);
|
||||
if (it != streams.end()) {
|
||||
it->second->AddClient(connfd);
|
||||
} else {
|
||||
CStreamInstance * stream = new CStreamInstance(connfd, channel_id, pids);
|
||||
if (stream->Start())
|
||||
streams.insert(streammap_pair_t(channel_id, stream));
|
||||
else
|
||||
delete stream;
|
||||
}
|
||||
mutex.unlock();
|
||||
} else {
|
||||
#if 0
|
||||
if (!AddClient(connfd))
|
||||
close(connfd);
|
||||
}
|
||||
#endif
|
||||
g_RCInput->postMsg(NeutrinoMessages::EVT_STREAM_START, connfd);
|
||||
} else {
|
||||
if (pfd[i].revents & (POLLHUP | POLLRDHUP)) {
|
||||
printf("CStreamManager::run(): POLLHUP, fd %d\n", pfd[i].fd);
|
||||
mutex.lock();
|
||||
for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) {
|
||||
if (it->second->HasFd(pfd[i].fd)) {
|
||||
CStreamInstance *stream = it->second;
|
||||
stream->RemoveClient(pfd[i].fd);
|
||||
if (stream->GetFds().empty()) {
|
||||
streams.erase(stream->GetChannelId());
|
||||
delete stream;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
mutex.unlock();
|
||||
RemoveClient(pfd[i].fd);
|
||||
if (streams.empty())
|
||||
g_RCInput->postMsg(NeutrinoMessages::EVT_STREAM_STOP, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -559,8 +627,6 @@ bool CStreamManager::Listen()
|
||||
{
|
||||
struct sockaddr_in socketAddr;
|
||||
int socketOptActive = 1;
|
||||
int sendsize = 10*IN_SIZE;
|
||||
unsigned int m = sizeof(sendsize);
|
||||
|
||||
if ((listenfd = socket (AF_INET, SOCK_STREAM, 0)) < 0) {
|
||||
fprintf (stderr, "network port %u open: ", port);
|
||||
@@ -590,12 +656,7 @@ bool CStreamManager::Listen()
|
||||
goto _error;
|
||||
}
|
||||
|
||||
#if 1
|
||||
setsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, m);
|
||||
sendsize = 0;
|
||||
getsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, &m);
|
||||
printf("CStreamManager::Listen: on %d, fd %d (%d)\n", port, listenfd, sendsize);
|
||||
#endif
|
||||
printf("CStreamManager::Listen: on %d, fd %d\n", port, listenfd);
|
||||
return true;
|
||||
_error:
|
||||
close (listenfd);
|
||||
|
Reference in New Issue
Block a user