diff --git a/src/driver/streamts.cpp b/src/driver/streamts.cpp index 8df0b0580..22d569094 100644 --- a/src/driver/streamts.cpp +++ b/src/driver/streamts.cpp @@ -1,7 +1,7 @@ /* Neutrino-GUI - DBoxII-Project - Copyright (C) 2011-2012 CoolStream International Ltd + Copyright (C) 2011-2014 CoolStream International Ltd based on code which is Copyright (C) 2002 Andreas Oberritter @@ -60,9 +60,8 @@ /* experimental mode: * stream not possible, if record running * pids in url ignored, and added from channel, with fake PAT/PMT - * different channels supported, only from the same transponder - no zap is done, + * different channels supported, * with url like http://coolstream:31339/id=c32400030070283e (channel id) - * TODO: multi-tuner support */ #define ENABLE_MULTI_CHANNEL @@ -114,23 +113,24 @@ bool CStreamInstance::Stop() bool CStreamInstance::Send(ssize_t r) { - mutex.lock(); + OpenThreads::ScopedLock m_lock(mutex); + int flags = 0; + if (fds.size() > 1) + flags = MSG_DONTWAIT; for (stream_fds_t::iterator it = fds.begin(); it != fds.end(); ++it) { - int ret, i = 10; + int i = 10; + unsigned char *b = buf; + ssize_t count = r; do { - ret = send(*it, buf, r, MSG_DONTWAIT); -#if 0 - if (ret != r) - usleep(100); -#endif - } while ((ret != r) && (i-- > 0)); - if (ret != r) { - if (r < 0) - perror("send"); - printf("send err, fd %d: (%d from %d)\n", *it, ret, r); - } + int ret = send(*it, b, count, flags); + if (ret > 0) { + b += ret; + count -= ret; + } + } while ((count > 0) && (i-- > 0)); + if (count) + printf("send err, fd %d: (%d from %d)\n", *it, r-count, r); } - mutex.unlock(); return true; } @@ -143,28 +143,23 @@ void CStreamInstance::Close() void CStreamInstance::AddClient(int clientfd) { - mutex.lock(); + OpenThreads::ScopedLock m_lock(mutex); fds.insert(clientfd); printf("CStreamInstance::AddClient: %d (count %d)\n", clientfd, fds.size()); - mutex.unlock(); } void CStreamInstance::RemoveClient(int clientfd) { - mutex.lock(); + OpenThreads::ScopedLock m_lock(mutex); fds.erase(clientfd); close(clientfd); printf("CStreamInstance::RemoveClient: %d (count %d)\n", clientfd, fds.size()); - mutex.unlock(); } void CStreamInstance::run() { printf("CStreamInstance::run: %llx\n", channel_id); -#if 0 - dmx = new cDemux(STREAM_DEMUX);//FIXME -#endif CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(channel_id); if (!tmpchan) return; @@ -191,7 +186,7 @@ void CStreamInstance::run() while (running) { ssize_t r = dmx->Read(buf, IN_SIZE, 100); - if(r > 0) + if (r > 0) Send(r); } @@ -273,13 +268,83 @@ bool CStreamManager::SetPort(int newport) return ret; } -bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid) +CFrontend * CStreamManager::FindFrontend(CZapitChannel * channel) +{ + std::set frontends; + CFrontend * frontend = NULL; + + t_channel_id chid = channel->getChannelID(); + if (CRecordManager::getInstance()->RecordingStatus(chid)) { + printf("CStreamManager::Parse: channel %llx recorded, aborting..\n", chid); + return frontend; + } + + t_channel_id live_channel_id = CZapit::getInstance()->GetCurrentChannelID(); + CFrontend *live_fe = CZapit::getInstance()->GetLiveFrontend(); + + if (live_channel_id == chid) + return live_fe; + + CFEManager::getInstance()->Lock(); + + bool unlock = true; + CFEManager::getInstance()->lockFrontend(live_fe); + + OpenThreads::ScopedLock m_lock(mutex); + for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) + frontends.insert(it->second->frontend); + + for (std::set::iterator ft = frontends.begin(); ft != frontends.end(); ft++) + CFEManager::getInstance()->lockFrontend(*ft); + + frontend = CFEManager::getInstance()->allocateFE(channel, true); + + if (frontend == NULL) { + unlock = false; + CFEManager::getInstance()->unlockFrontend(live_fe); + frontend = CFEManager::getInstance()->allocateFE(channel, true); + } + + CFEManager::getInstance()->Unlock(); + + if (frontend) { + bool found = (live_fe != frontend) || SAME_TRANSPONDER(live_channel_id, chid); + bool ret = false; + if (found) + ret = zapit.zapTo_record(chid) > 0; + else + ret = zapit.zapTo_serviceID(chid) > 0; + + if (ret) { +#ifdef ENABLE_PIP + /* FIXME until proper demux management */ + t_channel_id pip_channel_id = CZapit::getInstance()->GetPipChannelID(); + if ((pip_channel_id == chid) && (channel->getRecordDemux() == channel->getPipDemux())) + zapit.stopPip(); +#endif + } else { + frontend = NULL; + } + } + + CFEManager::getInstance()->Lock(); + for (std::set::iterator ft = frontends.begin(); ft != frontends.end(); ft++) + CFEManager::getInstance()->unlockFrontend(*ft); + + if (unlock) + CFEManager::getInstance()->unlockFrontend(live_fe); + + CFEManager::getInstance()->Unlock(); + return frontend; +} + +bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFrontend * &frontend) { char cbuf[512]; char *bp; FILE * fp = fdopen(fd, "r+"); - if(fp == NULL) { + if (fp == NULL) { perror("fdopen"); return false; } @@ -290,7 +355,7 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid) while (bp - &cbuf[0] < (int) sizeof(cbuf)) { unsigned char c; int res = read(fd, &c, 1); - if(res < 0) { + if (res < 0) { perror("read"); return false; } @@ -313,68 +378,54 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid) return false; } + chid = CZapit::getInstance()->GetCurrentChannelID(); + CZapitChannel * channel = CZapit::getInstance()->GetCurrentChannel(); + #ifndef ENABLE_MULTI_CHANNEL /* parse stdin / url path, start dmx filters */ do { int pid; int res = sscanf(bp, "%x", &pid); - if(res == 1) { - printf("New pid: 0x%x\n", pid); + if (res == 1) { + printf("CStreamManager::Parse: pid: 0x%x\n", pid); pids.insert(pid); } + } while ((bp = strchr(bp, ',')) && (bp++)); +#else + t_channel_id tmpid; + bp = &cbuf[5]; + if (sscanf(bp, "id=%llx", &tmpid) == 1) { + channel = CServiceManager::getInstance()->FindChannel(tmpid); + chid = tmpid; } - while ((bp = strchr(bp, ',')) && (bp++)); #endif + if (!channel) + return false; - chid = CZapit::getInstance()->GetCurrentChannelID(); - CZapitChannel * channel = CZapit::getInstance()->GetCurrentChannel(); + printf("CStreamManager::Parse: channel_id %llx [%s]\n", chid, channel->getName().c_str()); - int mode = CNeutrinoApp::getInstance()->getMode(); - if (mode == NeutrinoMessages::mode_standby && streams.empty()) { - printf("CStreamManager::Parse: wakeup zapit..\n"); - g_Zapit->setStandby(false); - g_Zapit->getMode(); + frontend = FindFrontend(channel); + if (!frontend) { + printf("CStreamManager::Parse: no free frontend\n"); + return false; } - if(pids.empty()) { -#ifdef ENABLE_MULTI_CHANNEL - t_channel_id tmpid; - bp = &cbuf[5]; - if (sscanf(bp, "id=%llx", &tmpid) == 1) { - printf("############################# channel_id %llx\n", tmpid); - CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(tmpid); - if (tmpchan && (tmpid != chid) && SAME_TRANSPONDER(tmpid, chid)) { - printf("############################# channel_id %llx -> zap\n", tmpid); - bool ret = g_Zapit->zapTo_record(tmpid) > 0; - if (ret) { - channel = tmpchan; - chid = tmpid; - } - } - } - if(CRecordManager::getInstance()->RecordingStatus(chid)) { - printf("CStreamManager::Parse: channel %llx recorded, aborting..\n", chid); - return false; - } -#ifdef ENABLE_PIP - t_channel_id pip_channel_id = CZapit::getInstance()->GetPipChannelID(); - if ((chid == pip_channel_id) && (channel->getRecordDemux() == channel->getPipDemux())) { - printf("CStreamManager::Parse: channel %llx used for pip, aborting..\n", chid); - return false; - } -#endif -#endif + AddPids(fd, channel, pids); - printf("CStreamManager::Parse: no pids in url, using channel %llx pids\n", chid); - if(!channel) - return false; - //pids.insert(0); - //pids.insert(channel->getPmtPid()); - pids.insert(channel->getVideoPid()); + return !pids.empty(); +} + +void CStreamManager::AddPids(int fd, CZapitChannel *channel, stream_pids_t &pids) +{ + if (pids.empty()) { + printf("CStreamManager::AddPids: no pids in url, using channel %llx pids\n", channel->getChannelID()); + if (channel->getVideoPid()) + pids.insert(channel->getVideoPid()); for (int i = 0; i < channel->getAudioChannelCount(); i++) pids.insert(channel->getAudioChannel(i)->pid); } + CGenPsi psi; for (stream_pids_t::iterator it = pids.begin(); it != pids.end(); ++it) { if (*it == channel->getVideoPid()) { @@ -385,9 +436,9 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid) if (*it == channel->getAudioChannel(i)->pid) { CZapitAudioChannel::ZapitAudioChannelType atype = channel->getAudioChannel(i)->audioChannelType; printf("CStreamManager::Parse: genpsi apid %x (%d)\n", *it, atype); - if(channel->getAudioChannel(i)->audioChannelType == CZapitAudioChannel::EAC3){ + if (channel->getAudioChannel(i)->audioChannelType == CZapitAudioChannel::EAC3) { psi.addPid(*it, EN_TYPE_AUDIO_EAC3, atype, channel->getAudioChannel(i)->description.c_str()); - }else{ + } else { psi.addPid(*it, EN_TYPE_AUDIO, atype, channel->getAudioChannel(i)->description.c_str()); } } @@ -395,40 +446,81 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid) } } //add pcr pid - if(channel->getPcrPid() != channel->getVideoPid()){ + if (channel->getPcrPid() && (channel->getPcrPid() != channel->getVideoPid())) { pids.insert(channel->getPcrPid()); psi.addPid(channel->getPcrPid(), EN_TYPE_PCR, 0); } //add teletext pid - if (g_settings.recording_stream_vtxt_pid && channel->getTeletextPid() != 0){ + if (g_settings.recording_stream_vtxt_pid && channel->getTeletextPid() != 0) { pids.insert(channel->getTeletextPid()); psi.addPid(channel->getTeletextPid(), EN_TYPE_TELTEX, 0, channel->getTeletextLang()); } //add dvb sub pid - if (g_settings.recording_stream_subtitle_pids){ + if (g_settings.recording_stream_subtitle_pids) { for (int i = 0 ; i < (int)channel->getSubtitleCount() ; ++i) { CZapitAbsSub* s = channel->getChannelSub(i); if (s->thisSubType == CZapitAbsSub::DVB) { - if(i>9)//max sub pids + if (i>9)//max sub pids break; CZapitDVBSub* sd = reinterpret_cast(s); pids.insert(sd->pId); - psi.addPid( sd->pId, EN_TYPE_DVBSUB, 0, sd->ISO639_language_code.c_str() ); + psi.addPid(sd->pId, EN_TYPE_DVBSUB, 0, sd->ISO639_language_code.c_str()); } } - } psi.genpsi(fd); +} - return !pids.empty(); +bool CStreamManager::AddClient(int connfd) +{ + stream_pids_t pids; + t_channel_id channel_id; + CFrontend *frontend; + + if (Parse(connfd, pids, channel_id, frontend)) { + OpenThreads::ScopedLock m_lock(mutex); + streammap_iterator_t it = streams.find(channel_id); + if (it != streams.end()) { + it->second->AddClient(connfd); + } else { + CStreamInstance * stream = new CStreamInstance(connfd, channel_id, pids); + stream->frontend = frontend; + + int sendsize = 10*IN_SIZE; + unsigned int m = sizeof(sendsize); + setsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, m); + if (stream->Start()) + streams.insert(streammap_pair_t(channel_id, stream)); + else + delete stream; + } + return true; + } + return false; +} + +void CStreamManager::RemoveClient(int fd) +{ + OpenThreads::ScopedLock m_lock(mutex); + for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) { + if (it->second->HasFd(fd)) { + CStreamInstance *stream = it->second; + stream->RemoveClient(fd); + if (stream->GetFds().empty()) { + streams.erase(stream->GetChannelId()); + delete stream; + } + break; + } + } } void CStreamManager::run() { struct sockaddr_in servaddr; - int clilen = sizeof(servaddr);; + int clilen = sizeof(servaddr); struct pollfd pfd[128]; int poll_cnt; @@ -452,57 +544,33 @@ void CStreamManager::run() } mutex.unlock(); //printf("polling, count= %d\n", poll_cnt); - int pollres = poll (pfd, poll_cnt, 1000); - if (pollres < 0) { - perror("CStreamManager::run(): poll"); + int pollres = poll (pfd, poll_cnt, 10000); + if (pollres <= 0) { + if (pollres < 0) + perror("CStreamManager::run(): poll"); continue; } - if(pollres == 0) - continue; for (int i = poll_cnt - 1; i >= 0; i--) { if (pfd[i].revents & (POLLIN | POLLPRI | POLLHUP | POLLRDHUP)) { printf("fd %d has events %x\n", pfd[i].fd, pfd[i].revents); if (pfd[i].fd == listenfd) { - int connfd = accept (listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen); + int connfd = accept(listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen); printf("CStreamManager::run(): connection, fd %d\n", connfd); - if(connfd < 0) { + if (connfd < 0) { perror("CStreamManager::run(): accept"); continue; } - stream_pids_t pids; - t_channel_id channel_id; - if (Parse(connfd, pids, channel_id)) { - mutex.lock(); - streammap_iterator_t it = streams.find(channel_id); - if (it != streams.end()) { - it->second->AddClient(connfd); - } else { - CStreamInstance * stream = new CStreamInstance(connfd, channel_id, pids); - if (stream->Start()) - streams.insert(streammap_pair_t(channel_id, stream)); - else - delete stream; - } - mutex.unlock(); - } else { +#if 0 + if (!AddClient(connfd)) close(connfd); - } +#endif + g_RCInput->postMsg(NeutrinoMessages::EVT_STREAM_START, connfd); } else { if (pfd[i].revents & (POLLHUP | POLLRDHUP)) { printf("CStreamManager::run(): POLLHUP, fd %d\n", pfd[i].fd); - mutex.lock(); - for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) { - if (it->second->HasFd(pfd[i].fd)) { - CStreamInstance *stream = it->second; - stream->RemoveClient(pfd[i].fd); - if (stream->GetFds().empty()) { - streams.erase(stream->GetChannelId()); - delete stream; - } - break; - } - } - mutex.unlock(); + RemoveClient(pfd[i].fd); + if (streams.empty()) + g_RCInput->postMsg(NeutrinoMessages::EVT_STREAM_STOP, 0); } } } @@ -559,8 +627,6 @@ bool CStreamManager::Listen() { struct sockaddr_in socketAddr; int socketOptActive = 1; - int sendsize = 10*IN_SIZE; - unsigned int m = sizeof(sendsize); if ((listenfd = socket (AF_INET, SOCK_STREAM, 0)) < 0) { fprintf (stderr, "network port %u open: ", port); @@ -590,12 +656,7 @@ bool CStreamManager::Listen() goto _error; } -#if 1 - setsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, m); - sendsize = 0; - getsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, &m); - printf("CStreamManager::Listen: on %d, fd %d (%d)\n", port, listenfd, sendsize); -#endif + printf("CStreamManager::Listen: on %d, fd %d\n", port, listenfd); return true; _error: close (listenfd); diff --git a/src/driver/streamts.h b/src/driver/streamts.h index 9b10ddeca..e1a6915a1 100644 --- a/src/driver/streamts.h +++ b/src/driver/streamts.h @@ -27,6 +27,7 @@ #include #include +#include #include #include @@ -38,6 +39,7 @@ class CStreamInstance : public OpenThreads::Thread private: bool running; cDemux * dmx; + CFrontend * frontend; OpenThreads::Mutex mutex; unsigned char * buf; @@ -48,6 +50,7 @@ class CStreamInstance : public OpenThreads::Thread bool Send(ssize_t r); void Close(); void run(); + friend class CStreamManager; public: CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &pids); ~CStreamInstance(); @@ -74,12 +77,17 @@ class CStreamManager : public OpenThreads::Thread OpenThreads::Mutex mutex; static CStreamManager * sm; + CZapitClient zapit; streammap_t streams; bool Listen(); - bool Parse(int fd, stream_pids_t &pids, t_channel_id &chid); + bool Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFrontend * &frontend); + void AddPids(int fd, CZapitChannel * channel, stream_pids_t &pids); + void CheckStandby(bool enter); + CFrontend * FindFrontend(CZapitChannel * channel); bool StopAll(); + void RemoveClient(int fd); void run(); CStreamManager(); public: @@ -91,6 +99,7 @@ class CStreamManager : public OpenThreads::Thread bool StreamStatus(t_channel_id channel_id = 0); bool SetPort(int newport); int GetPort() { return port; } + bool AddClient(int fd); }; #endif