driver/streamts.cpp: add support to stream from different tuners

This commit is contained in:
[CST] Focus
2014-03-26 14:29:44 +04:00
parent 63ce8f2a69
commit ded947f05a
2 changed files with 198 additions and 128 deletions

View File

@@ -1,7 +1,7 @@
/* /*
Neutrino-GUI - DBoxII-Project Neutrino-GUI - DBoxII-Project
Copyright (C) 2011-2012 CoolStream International Ltd Copyright (C) 2011-2014 CoolStream International Ltd
based on code which is based on code which is
Copyright (C) 2002 Andreas Oberritter <obi@tuxbox.org> Copyright (C) 2002 Andreas Oberritter <obi@tuxbox.org>
@@ -60,9 +60,8 @@
/* experimental mode: /* experimental mode:
* stream not possible, if record running * stream not possible, if record running
* pids in url ignored, and added from channel, with fake PAT/PMT * pids in url ignored, and added from channel, with fake PAT/PMT
* different channels supported, only from the same transponder - no zap is done, * different channels supported,
* with url like http://coolstream:31339/id=c32400030070283e (channel id) * with url like http://coolstream:31339/id=c32400030070283e (channel id)
* TODO: multi-tuner support
*/ */
#define ENABLE_MULTI_CHANNEL #define ENABLE_MULTI_CHANNEL
@@ -114,23 +113,24 @@ bool CStreamInstance::Stop()
bool CStreamInstance::Send(ssize_t r) bool CStreamInstance::Send(ssize_t r)
{ {
mutex.lock(); OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
int flags = 0;
if (fds.size() > 1)
flags = MSG_DONTWAIT;
for (stream_fds_t::iterator it = fds.begin(); it != fds.end(); ++it) { for (stream_fds_t::iterator it = fds.begin(); it != fds.end(); ++it) {
int ret, i = 10; int i = 10;
unsigned char *b = buf;
ssize_t count = r;
do { do {
ret = send(*it, buf, r, MSG_DONTWAIT); int ret = send(*it, b, count, flags);
#if 0 if (ret > 0) {
if (ret != r) b += ret;
usleep(100); count -= ret;
#endif }
} while ((ret != r) && (i-- > 0)); } while ((count > 0) && (i-- > 0));
if (ret != r) { if (count)
if (r < 0) printf("send err, fd %d: (%d from %d)\n", *it, r-count, r);
perror("send");
printf("send err, fd %d: (%d from %d)\n", *it, ret, r);
}
} }
mutex.unlock();
return true; return true;
} }
@@ -143,28 +143,23 @@ void CStreamInstance::Close()
void CStreamInstance::AddClient(int clientfd) void CStreamInstance::AddClient(int clientfd)
{ {
mutex.lock(); OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
fds.insert(clientfd); fds.insert(clientfd);
printf("CStreamInstance::AddClient: %d (count %d)\n", clientfd, fds.size()); printf("CStreamInstance::AddClient: %d (count %d)\n", clientfd, fds.size());
mutex.unlock();
} }
void CStreamInstance::RemoveClient(int clientfd) void CStreamInstance::RemoveClient(int clientfd)
{ {
mutex.lock(); OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
fds.erase(clientfd); fds.erase(clientfd);
close(clientfd); close(clientfd);
printf("CStreamInstance::RemoveClient: %d (count %d)\n", clientfd, fds.size()); printf("CStreamInstance::RemoveClient: %d (count %d)\n", clientfd, fds.size());
mutex.unlock();
} }
void CStreamInstance::run() void CStreamInstance::run()
{ {
printf("CStreamInstance::run: %llx\n", channel_id); printf("CStreamInstance::run: %llx\n", channel_id);
#if 0
dmx = new cDemux(STREAM_DEMUX);//FIXME
#endif
CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(channel_id); CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(channel_id);
if (!tmpchan) if (!tmpchan)
return; return;
@@ -191,7 +186,7 @@ void CStreamInstance::run()
while (running) { while (running) {
ssize_t r = dmx->Read(buf, IN_SIZE, 100); ssize_t r = dmx->Read(buf, IN_SIZE, 100);
if(r > 0) if (r > 0)
Send(r); Send(r);
} }
@@ -273,13 +268,83 @@ bool CStreamManager::SetPort(int newport)
return ret; return ret;
} }
bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid) CFrontend * CStreamManager::FindFrontend(CZapitChannel * channel)
{
std::set<CFrontend*> frontends;
CFrontend * frontend = NULL;
t_channel_id chid = channel->getChannelID();
if (CRecordManager::getInstance()->RecordingStatus(chid)) {
printf("CStreamManager::Parse: channel %llx recorded, aborting..\n", chid);
return frontend;
}
t_channel_id live_channel_id = CZapit::getInstance()->GetCurrentChannelID();
CFrontend *live_fe = CZapit::getInstance()->GetLiveFrontend();
if (live_channel_id == chid)
return live_fe;
CFEManager::getInstance()->Lock();
bool unlock = true;
CFEManager::getInstance()->lockFrontend(live_fe);
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it)
frontends.insert(it->second->frontend);
for (std::set<CFrontend*>::iterator ft = frontends.begin(); ft != frontends.end(); ft++)
CFEManager::getInstance()->lockFrontend(*ft);
frontend = CFEManager::getInstance()->allocateFE(channel, true);
if (frontend == NULL) {
unlock = false;
CFEManager::getInstance()->unlockFrontend(live_fe);
frontend = CFEManager::getInstance()->allocateFE(channel, true);
}
CFEManager::getInstance()->Unlock();
if (frontend) {
bool found = (live_fe != frontend) || SAME_TRANSPONDER(live_channel_id, chid);
bool ret = false;
if (found)
ret = zapit.zapTo_record(chid) > 0;
else
ret = zapit.zapTo_serviceID(chid) > 0;
if (ret) {
#ifdef ENABLE_PIP
/* FIXME until proper demux management */
t_channel_id pip_channel_id = CZapit::getInstance()->GetPipChannelID();
if ((pip_channel_id == chid) && (channel->getRecordDemux() == channel->getPipDemux()))
zapit.stopPip();
#endif
} else {
frontend = NULL;
}
}
CFEManager::getInstance()->Lock();
for (std::set<CFrontend*>::iterator ft = frontends.begin(); ft != frontends.end(); ft++)
CFEManager::getInstance()->unlockFrontend(*ft);
if (unlock)
CFEManager::getInstance()->unlockFrontend(live_fe);
CFEManager::getInstance()->Unlock();
return frontend;
}
bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFrontend * &frontend)
{ {
char cbuf[512]; char cbuf[512];
char *bp; char *bp;
FILE * fp = fdopen(fd, "r+"); FILE * fp = fdopen(fd, "r+");
if(fp == NULL) { if (fp == NULL) {
perror("fdopen"); perror("fdopen");
return false; return false;
} }
@@ -290,7 +355,7 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid)
while (bp - &cbuf[0] < (int) sizeof(cbuf)) { while (bp - &cbuf[0] < (int) sizeof(cbuf)) {
unsigned char c; unsigned char c;
int res = read(fd, &c, 1); int res = read(fd, &c, 1);
if(res < 0) { if (res < 0) {
perror("read"); perror("read");
return false; return false;
} }
@@ -313,68 +378,54 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid)
return false; return false;
} }
chid = CZapit::getInstance()->GetCurrentChannelID();
CZapitChannel * channel = CZapit::getInstance()->GetCurrentChannel();
#ifndef ENABLE_MULTI_CHANNEL #ifndef ENABLE_MULTI_CHANNEL
/* parse stdin / url path, start dmx filters */ /* parse stdin / url path, start dmx filters */
do { do {
int pid; int pid;
int res = sscanf(bp, "%x", &pid); int res = sscanf(bp, "%x", &pid);
if(res == 1) { if (res == 1) {
printf("New pid: 0x%x\n", pid); printf("CStreamManager::Parse: pid: 0x%x\n", pid);
pids.insert(pid); pids.insert(pid);
} }
} while ((bp = strchr(bp, ',')) && (bp++));
#else
t_channel_id tmpid;
bp = &cbuf[5];
if (sscanf(bp, "id=%llx", &tmpid) == 1) {
channel = CServiceManager::getInstance()->FindChannel(tmpid);
chid = tmpid;
} }
while ((bp = strchr(bp, ',')) && (bp++));
#endif #endif
if (!channel)
return false;
chid = CZapit::getInstance()->GetCurrentChannelID(); printf("CStreamManager::Parse: channel_id %llx [%s]\n", chid, channel->getName().c_str());
CZapitChannel * channel = CZapit::getInstance()->GetCurrentChannel();
int mode = CNeutrinoApp::getInstance()->getMode(); frontend = FindFrontend(channel);
if (mode == NeutrinoMessages::mode_standby && streams.empty()) { if (!frontend) {
printf("CStreamManager::Parse: wakeup zapit..\n"); printf("CStreamManager::Parse: no free frontend\n");
g_Zapit->setStandby(false); return false;
g_Zapit->getMode();
} }
if(pids.empty()) {
#ifdef ENABLE_MULTI_CHANNEL
t_channel_id tmpid;
bp = &cbuf[5];
if (sscanf(bp, "id=%llx", &tmpid) == 1) {
printf("############################# channel_id %llx\n", tmpid);
CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(tmpid); AddPids(fd, channel, pids);
if (tmpchan && (tmpid != chid) && SAME_TRANSPONDER(tmpid, chid)) {
printf("############################# channel_id %llx -> zap\n", tmpid);
bool ret = g_Zapit->zapTo_record(tmpid) > 0;
if (ret) {
channel = tmpchan;
chid = tmpid;
}
}
}
if(CRecordManager::getInstance()->RecordingStatus(chid)) {
printf("CStreamManager::Parse: channel %llx recorded, aborting..\n", chid);
return false;
}
#ifdef ENABLE_PIP
t_channel_id pip_channel_id = CZapit::getInstance()->GetPipChannelID();
if ((chid == pip_channel_id) && (channel->getRecordDemux() == channel->getPipDemux())) {
printf("CStreamManager::Parse: channel %llx used for pip, aborting..\n", chid);
return false;
}
#endif
#endif
printf("CStreamManager::Parse: no pids in url, using channel %llx pids\n", chid); return !pids.empty();
if(!channel) }
return false;
//pids.insert(0); void CStreamManager::AddPids(int fd, CZapitChannel *channel, stream_pids_t &pids)
//pids.insert(channel->getPmtPid()); {
pids.insert(channel->getVideoPid()); if (pids.empty()) {
printf("CStreamManager::AddPids: no pids in url, using channel %llx pids\n", channel->getChannelID());
if (channel->getVideoPid())
pids.insert(channel->getVideoPid());
for (int i = 0; i < channel->getAudioChannelCount(); i++) for (int i = 0; i < channel->getAudioChannelCount(); i++)
pids.insert(channel->getAudioChannel(i)->pid); pids.insert(channel->getAudioChannel(i)->pid);
} }
CGenPsi psi; CGenPsi psi;
for (stream_pids_t::iterator it = pids.begin(); it != pids.end(); ++it) { for (stream_pids_t::iterator it = pids.begin(); it != pids.end(); ++it) {
if (*it == channel->getVideoPid()) { if (*it == channel->getVideoPid()) {
@@ -385,9 +436,9 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid)
if (*it == channel->getAudioChannel(i)->pid) { if (*it == channel->getAudioChannel(i)->pid) {
CZapitAudioChannel::ZapitAudioChannelType atype = channel->getAudioChannel(i)->audioChannelType; CZapitAudioChannel::ZapitAudioChannelType atype = channel->getAudioChannel(i)->audioChannelType;
printf("CStreamManager::Parse: genpsi apid %x (%d)\n", *it, atype); printf("CStreamManager::Parse: genpsi apid %x (%d)\n", *it, atype);
if(channel->getAudioChannel(i)->audioChannelType == CZapitAudioChannel::EAC3){ if (channel->getAudioChannel(i)->audioChannelType == CZapitAudioChannel::EAC3) {
psi.addPid(*it, EN_TYPE_AUDIO_EAC3, atype, channel->getAudioChannel(i)->description.c_str()); psi.addPid(*it, EN_TYPE_AUDIO_EAC3, atype, channel->getAudioChannel(i)->description.c_str());
}else{ } else {
psi.addPid(*it, EN_TYPE_AUDIO, atype, channel->getAudioChannel(i)->description.c_str()); psi.addPid(*it, EN_TYPE_AUDIO, atype, channel->getAudioChannel(i)->description.c_str());
} }
} }
@@ -395,40 +446,81 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid)
} }
} }
//add pcr pid //add pcr pid
if(channel->getPcrPid() != channel->getVideoPid()){ if (channel->getPcrPid() && (channel->getPcrPid() != channel->getVideoPid())) {
pids.insert(channel->getPcrPid()); pids.insert(channel->getPcrPid());
psi.addPid(channel->getPcrPid(), EN_TYPE_PCR, 0); psi.addPid(channel->getPcrPid(), EN_TYPE_PCR, 0);
} }
//add teletext pid //add teletext pid
if (g_settings.recording_stream_vtxt_pid && channel->getTeletextPid() != 0){ if (g_settings.recording_stream_vtxt_pid && channel->getTeletextPid() != 0) {
pids.insert(channel->getTeletextPid()); pids.insert(channel->getTeletextPid());
psi.addPid(channel->getTeletextPid(), EN_TYPE_TELTEX, 0, channel->getTeletextLang()); psi.addPid(channel->getTeletextPid(), EN_TYPE_TELTEX, 0, channel->getTeletextLang());
} }
//add dvb sub pid //add dvb sub pid
if (g_settings.recording_stream_subtitle_pids){ if (g_settings.recording_stream_subtitle_pids) {
for (int i = 0 ; i < (int)channel->getSubtitleCount() ; ++i) { for (int i = 0 ; i < (int)channel->getSubtitleCount() ; ++i) {
CZapitAbsSub* s = channel->getChannelSub(i); CZapitAbsSub* s = channel->getChannelSub(i);
if (s->thisSubType == CZapitAbsSub::DVB) { if (s->thisSubType == CZapitAbsSub::DVB) {
if(i>9)//max sub pids if (i>9)//max sub pids
break; break;
CZapitDVBSub* sd = reinterpret_cast<CZapitDVBSub*>(s); CZapitDVBSub* sd = reinterpret_cast<CZapitDVBSub*>(s);
pids.insert(sd->pId); pids.insert(sd->pId);
psi.addPid( sd->pId, EN_TYPE_DVBSUB, 0, sd->ISO639_language_code.c_str() ); psi.addPid(sd->pId, EN_TYPE_DVBSUB, 0, sd->ISO639_language_code.c_str());
} }
} }
} }
psi.genpsi(fd); psi.genpsi(fd);
}
return !pids.empty(); bool CStreamManager::AddClient(int connfd)
{
stream_pids_t pids;
t_channel_id channel_id;
CFrontend *frontend;
if (Parse(connfd, pids, channel_id, frontend)) {
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
streammap_iterator_t it = streams.find(channel_id);
if (it != streams.end()) {
it->second->AddClient(connfd);
} else {
CStreamInstance * stream = new CStreamInstance(connfd, channel_id, pids);
stream->frontend = frontend;
int sendsize = 10*IN_SIZE;
unsigned int m = sizeof(sendsize);
setsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, m);
if (stream->Start())
streams.insert(streammap_pair_t(channel_id, stream));
else
delete stream;
}
return true;
}
return false;
}
void CStreamManager::RemoveClient(int fd)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) {
if (it->second->HasFd(fd)) {
CStreamInstance *stream = it->second;
stream->RemoveClient(fd);
if (stream->GetFds().empty()) {
streams.erase(stream->GetChannelId());
delete stream;
}
break;
}
}
} }
void CStreamManager::run() void CStreamManager::run()
{ {
struct sockaddr_in servaddr; struct sockaddr_in servaddr;
int clilen = sizeof(servaddr);; int clilen = sizeof(servaddr);
struct pollfd pfd[128]; struct pollfd pfd[128];
int poll_cnt; int poll_cnt;
@@ -452,57 +544,33 @@ void CStreamManager::run()
} }
mutex.unlock(); mutex.unlock();
//printf("polling, count= %d\n", poll_cnt); //printf("polling, count= %d\n", poll_cnt);
int pollres = poll (pfd, poll_cnt, 1000); int pollres = poll (pfd, poll_cnt, 10000);
if (pollres < 0) { if (pollres <= 0) {
perror("CStreamManager::run(): poll"); if (pollres < 0)
perror("CStreamManager::run(): poll");
continue; continue;
} }
if(pollres == 0)
continue;
for (int i = poll_cnt - 1; i >= 0; i--) { for (int i = poll_cnt - 1; i >= 0; i--) {
if (pfd[i].revents & (POLLIN | POLLPRI | POLLHUP | POLLRDHUP)) { if (pfd[i].revents & (POLLIN | POLLPRI | POLLHUP | POLLRDHUP)) {
printf("fd %d has events %x\n", pfd[i].fd, pfd[i].revents); printf("fd %d has events %x\n", pfd[i].fd, pfd[i].revents);
if (pfd[i].fd == listenfd) { if (pfd[i].fd == listenfd) {
int connfd = accept (listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen); int connfd = accept(listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen);
printf("CStreamManager::run(): connection, fd %d\n", connfd); printf("CStreamManager::run(): connection, fd %d\n", connfd);
if(connfd < 0) { if (connfd < 0) {
perror("CStreamManager::run(): accept"); perror("CStreamManager::run(): accept");
continue; continue;
} }
stream_pids_t pids; #if 0
t_channel_id channel_id; if (!AddClient(connfd))
if (Parse(connfd, pids, channel_id)) {
mutex.lock();
streammap_iterator_t it = streams.find(channel_id);
if (it != streams.end()) {
it->second->AddClient(connfd);
} else {
CStreamInstance * stream = new CStreamInstance(connfd, channel_id, pids);
if (stream->Start())
streams.insert(streammap_pair_t(channel_id, stream));
else
delete stream;
}
mutex.unlock();
} else {
close(connfd); close(connfd);
} #endif
g_RCInput->postMsg(NeutrinoMessages::EVT_STREAM_START, connfd);
} else { } else {
if (pfd[i].revents & (POLLHUP | POLLRDHUP)) { if (pfd[i].revents & (POLLHUP | POLLRDHUP)) {
printf("CStreamManager::run(): POLLHUP, fd %d\n", pfd[i].fd); printf("CStreamManager::run(): POLLHUP, fd %d\n", pfd[i].fd);
mutex.lock(); RemoveClient(pfd[i].fd);
for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) { if (streams.empty())
if (it->second->HasFd(pfd[i].fd)) { g_RCInput->postMsg(NeutrinoMessages::EVT_STREAM_STOP, 0);
CStreamInstance *stream = it->second;
stream->RemoveClient(pfd[i].fd);
if (stream->GetFds().empty()) {
streams.erase(stream->GetChannelId());
delete stream;
}
break;
}
}
mutex.unlock();
} }
} }
} }
@@ -559,8 +627,6 @@ bool CStreamManager::Listen()
{ {
struct sockaddr_in socketAddr; struct sockaddr_in socketAddr;
int socketOptActive = 1; int socketOptActive = 1;
int sendsize = 10*IN_SIZE;
unsigned int m = sizeof(sendsize);
if ((listenfd = socket (AF_INET, SOCK_STREAM, 0)) < 0) { if ((listenfd = socket (AF_INET, SOCK_STREAM, 0)) < 0) {
fprintf (stderr, "network port %u open: ", port); fprintf (stderr, "network port %u open: ", port);
@@ -590,12 +656,7 @@ bool CStreamManager::Listen()
goto _error; goto _error;
} }
#if 1 printf("CStreamManager::Listen: on %d, fd %d\n", port, listenfd);
setsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, m);
sendsize = 0;
getsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, &m);
printf("CStreamManager::Listen: on %d, fd %d (%d)\n", port, listenfd, sendsize);
#endif
return true; return true;
_error: _error:
close (listenfd); close (listenfd);

View File

@@ -27,6 +27,7 @@
#include <dmx.h> #include <dmx.h>
#include <zapit/client/zapittypes.h> #include <zapit/client/zapittypes.h>
#include <zapit/femanager.h>
#include <set> #include <set>
#include <map> #include <map>
@@ -38,6 +39,7 @@ class CStreamInstance : public OpenThreads::Thread
private: private:
bool running; bool running;
cDemux * dmx; cDemux * dmx;
CFrontend * frontend;
OpenThreads::Mutex mutex; OpenThreads::Mutex mutex;
unsigned char * buf; unsigned char * buf;
@@ -48,6 +50,7 @@ class CStreamInstance : public OpenThreads::Thread
bool Send(ssize_t r); bool Send(ssize_t r);
void Close(); void Close();
void run(); void run();
friend class CStreamManager;
public: public:
CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &pids); CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &pids);
~CStreamInstance(); ~CStreamInstance();
@@ -74,12 +77,17 @@ class CStreamManager : public OpenThreads::Thread
OpenThreads::Mutex mutex; OpenThreads::Mutex mutex;
static CStreamManager * sm; static CStreamManager * sm;
CZapitClient zapit;
streammap_t streams; streammap_t streams;
bool Listen(); bool Listen();
bool Parse(int fd, stream_pids_t &pids, t_channel_id &chid); bool Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFrontend * &frontend);
void AddPids(int fd, CZapitChannel * channel, stream_pids_t &pids);
void CheckStandby(bool enter);
CFrontend * FindFrontend(CZapitChannel * channel);
bool StopAll(); bool StopAll();
void RemoveClient(int fd);
void run(); void run();
CStreamManager(); CStreamManager();
public: public:
@@ -91,6 +99,7 @@ class CStreamManager : public OpenThreads::Thread
bool StreamStatus(t_channel_id channel_id = 0); bool StreamStatus(t_channel_id channel_id = 0);
bool SetPort(int newport); bool SetPort(int newport);
int GetPort() { return port; } int GetPort() { return port; }
bool AddClient(int fd);
}; };
#endif #endif