diff --git a/src/driver/streamts.cpp b/src/driver/streamts.cpp index afeebd388..0e3142c5c 100644 --- a/src/driver/streamts.cpp +++ b/src/driver/streamts.cpp @@ -1,3 +1,29 @@ +/* + Neutrino-GUI - DBoxII-Project + + Copyright (C) 2011-2012 CoolStream International Ltd + + based on code which is + Copyright (C) 2002 Andreas Oberritter + Copyright (C) 2001 TripleDES + Copyright (C) 2000, 2001 Marcus Metzler + + License: GPLv2 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + #include #include #include @@ -13,10 +39,8 @@ #include #include -/* work around for building with old kernel headers */ -#ifndef POLLRDHUP -#define POLLRDHUP 0 -#endif +#include +#include #ifdef HAVE_CONFIG_H #include @@ -28,248 +52,231 @@ #include #include #include +#include +#include +#include +#include +/* experimental mode: + * stream not possible, if record running + * pids in url ignored, and added from channel, with fake PAT/PMT + * different channels supported, only from the same transponder - no zap is done, + * with url like http://coolstream:31339/id=c32400030070283e (channel id) + * TODO: multi-tuner support + */ +#define ENABLE_MULTI_CHANNEL #define TS_SIZE 188 -#define IN_SIZE (2048 * TS_SIZE) -//#define IN_SIZE (TS_SIZE * 362) +#define DMX_BUFFER_SIZE (2048*TS_SIZE) +#define IN_SIZE (250*TS_SIZE) -#define DMX_BUFFER_SIZE (2 * 3008 * 62) - -/* maximum number of pes pids */ -#define MAXPIDS 64 - -/* tcp packet data size */ -//#define PACKET_SIZE 1448 -#define PACKET_SIZE 7*TS_SIZE - -//unsigned char * buf; - -extern CCam *cam0; - -//int demuxfd[MAXPIDS]; - -static unsigned char exit_flag = 0; -static unsigned int writebuf_size = 0; -static unsigned char writebuf[PACKET_SIZE]; - -void packet_stdout (int fd, unsigned char * buf, int count, void * /*p*/) +CStreamInstance::CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &_pids) { - - unsigned int size; - unsigned char * bp; - ssize_t written; - -//printf("packet_stdout count %d\n", count); - /* ensure, that there is always at least one complete - * packet inside of the send buffer */ - while (writebuf_size + count >= PACKET_SIZE) { - - /* how many bytes are to be sent from the input buffer? */ - size = PACKET_SIZE - writebuf_size; - - /* send buffer is not empty, so copy from - input buffer to get a complete packet */ - if (writebuf_size) { - memmove(writebuf + writebuf_size, buf, size); - bp = writebuf; - } - - /* if send buffer is empty, then do not memcopy, - but send directly from input buffer */ - else { - bp = buf; - } - - /* write the packet, count the amount of really written bytes */ - written = write(fd, bp, PACKET_SIZE); - - /* exit on error */ - if (written == -1) { - perror("write"); - exit_flag = 1; - return; - } - - /* if the packet could not be written completely, then - * how many bytes must be stored in the send buffer - * until the next packet is to be sent? */ - writebuf_size = PACKET_SIZE - written; - - /* move all bytes of the packet which were not sent - * to the beginning of the send buffer */ - if (writebuf_size) - memmove(writebuf, bp + written, writebuf_size); - - /* * advance in the input buffer */ - buf += size; - - /* * decrease the todo size */ - count -= size; - } - - /* if there are still some bytes left in the input buffer, - * then store them in the send buffer and increase send - * buffer size */ - if (count) { - memmove(writebuf + writebuf_size, buf, count); - writebuf_size += count; - } + printf("CStreamInstance:: new channel %llx fd %d\n", chid, clientfd); + fds.insert(clientfd); + pids = _pids; + channel_id = chid; + running = false; + dmx = NULL; + buf = NULL; } -int open_incoming_port (int port) +CStreamInstance::~CStreamInstance() { - struct sockaddr_in socketAddr; - int socketOptActive = 1; - int handle; - - if (!port) - return -1; - - if ((handle = socket (AF_INET, SOCK_STREAM, 0)) < 0) - { - fprintf (stderr, "network port %u open: ", port); - perror ("socket"); - return -1; - } - - if (setsockopt (handle, SOL_SOCKET, SO_REUSEADDR, (const void *)&socketOptActive, sizeof (int)) < 0) - { - fprintf (stderr, "network port %u open: error setsockopt\n", port); - close (handle); - return -1; - } - - socketAddr.sin_family = AF_INET; - socketAddr.sin_port = htons (port); - socketAddr.sin_addr.s_addr = htonl (INADDR_ANY); - - if (bind (handle, (struct sockaddr *) &socketAddr, sizeof (socketAddr)) < 0) - { - fprintf (stderr, "network port %u open: ", port); - perror ("bind"); - close (handle); - return -1; - } - - if (listen (handle, 5) < 0) - { - fprintf (stderr, "network port %u open: ", port); - perror ("listen"); - close (handle); - return -1; - } - return handle; + Stop(); + Close(); } -void * streamts_live_thread(void *data); -int streamts_stop; - -void streamts_main_thread(void * /*data*/) +bool CStreamInstance::Start() { - struct sockaddr_in servaddr; - int clilen; + if (running) + return false; - struct pollfd pfd[128]; - int poll_cnt, tcnt; - int listenfd; - int connfd = -1; - int pollres; - int i; - pthread_t st = 0; - - printf("Starting STREAM thread keeper, tid %ld\n", syscall(__NR_gettid)); - - listenfd = open_incoming_port(31339); - if(listenfd < 0) { - printf("Open incoming port failed\n"); - return; + buf = new unsigned char [IN_SIZE]; + if (buf == NULL) { + perror("CStreamInstance::Start: buf"); + return false; } - printf("listenfd %d\n", listenfd); - - clilen = sizeof (servaddr); - pfd[0].fd = listenfd; - pfd[0].events = (POLLIN | POLLPRI); - pfd[0].revents = 0; - tcnt = 1; - streamts_stop = 0; - - while (!streamts_stop) { - poll_cnt = tcnt; -//printf("polling, count= %d\n", poll_cnt); - pollres = poll (pfd, poll_cnt, 1000); - if (pollres < 0) { - perror("streamts_main_thread poll"); - continue; - } - if(pollres == 0) - continue; - for (i = poll_cnt - 1; i >= 0; i--) { - if (pfd[i].revents & (POLLIN | POLLPRI | POLLHUP | POLLRDHUP)) { - printf("fd %d has events %x\n", pfd[i].fd, pfd[i].revents); - if (pfd[i].fd == listenfd) { - connfd = accept (listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen); - printf("new connection, fd %d\n", connfd); - if(connfd < 0) { - perror("accept"); - continue; - } - if(st != 0) { - printf("New connection, stopping stream thread\n"); - exit_flag = 1; - pthread_join(st, NULL); - tcnt --; - } - pfd[tcnt].fd = connfd; - pfd[tcnt].events = POLLRDHUP | POLLHUP; - pfd[tcnt].revents = 0; - tcnt++; - exit_flag = 0; - pthread_create (&st, NULL, streamts_live_thread, (void *) connfd); - } else { - if (pfd[i].revents & (POLLHUP | POLLRDHUP)) { - connfd = -1; - printf("Client disconnected, stopping stream thread\n"); - exit_flag = 1; - if(st) - pthread_join(st, NULL); - st = 0; - tcnt --; - } - } - } - } - } - printf("Stopping STREAM thread keeper\n"); - close(listenfd); - if(st != 0) { - printf("Stopping stream thread\n"); - exit_flag = 1; - pthread_join(st, NULL); - close(connfd); - } - return; + running = true; + printf("CStreamInstance::Start: %llx\n", channel_id); + return (OpenThreads::Thread::start() == 0); } -void * streamts_live_thread(void *data) +bool CStreamInstance::Stop() +{ + if (!running) + return false; + + printf("CStreamInstance::Stop: %llx\n", channel_id); + running = false; + return (OpenThreads::Thread::join() == 0); +} + +bool CStreamInstance::Send(ssize_t r) +{ + mutex.lock(); + for (stream_fds_t::iterator it = fds.begin(); it != fds.end(); ++it) { + int ret, i = 10; + do { + ret = send(*it, buf, r, MSG_DONTWAIT); +#if 0 + if (ret != r) + usleep(100); +#endif + } while ((ret != r) && (i-- > 0)); + if (ret != r) { + if (r < 0) + perror("send"); + printf("send err, fd %d: %d\n", *it, r); + } + } + mutex.unlock(); + return true; +} + +void CStreamInstance::Close() +{ + for (stream_fds_t::iterator fit = fds.begin(); fit != fds.end(); ++fit) + close(*fit); + fds.clear(); +} + +void CStreamInstance::AddClient(int clientfd) +{ + mutex.lock(); + fds.insert(clientfd); + printf("CStreamInstance::AddClient: %d (count %d)\n", clientfd, fds.size()); + mutex.unlock(); +} + +void CStreamInstance::RemoveClient(int clientfd) +{ + mutex.lock(); + fds.erase(clientfd); + close(clientfd); + printf("CStreamInstance::RemoveClient: %d (count %d)\n", clientfd, fds.size()); + mutex.unlock(); +} + +void CStreamInstance::run() +{ + printf("CStreamInstance::run: %llx\n", channel_id); + + dmx = new cDemux(STREAM_DEMUX);//FIXME + + dmx->Open(DMX_TP_CHANNEL, NULL, DMX_BUFFER_SIZE); + + /* pids here cannot be empty */ + stream_pids_t::iterator it = pids.begin(); + printf("CStreamInstance::run: add pid %x\n", *it); + dmx->pesFilter(*it); + ++it; + for (; it != pids.end(); ++it) { + printf("CStreamInstance::run: add pid %x\n", *it); + dmx->addPid(*it); + } +#ifdef ENABLE_MULTI_CHANNEL + dmx->Start();//FIXME +#else + dmx->Start(true);//FIXME +#endif + + CCamManager::getInstance()->Start(channel_id, CCamManager::STREAM); + + while (running) { + ssize_t r = dmx->Read(buf, IN_SIZE, 100); + if(r > 0) + Send(r); + } + + CCamManager::getInstance()->Stop(channel_id, CCamManager::STREAM); + + printf("CStreamInstance::run: exiting %llx (%d fds)\n", channel_id, fds.size()); + + Close(); + delete dmx; + delete []buf; +} + +bool CStreamInstance::HasFd(int fd) +{ + if (fds.find(fd) != fds.end()) + return true; + return false; +} + +/************************************************************************/ +CStreamManager *CStreamManager::sm = NULL; +CStreamManager::CStreamManager() +{ + enabled = true; + running = false; + listenfd = -1; + port = 31339; +} + +CStreamManager::~CStreamManager() +{ + Stop(); +} + +CStreamManager * CStreamManager::getInstance() +{ + if (sm == NULL) + sm = new CStreamManager(); + return sm; +} + +bool CStreamManager::Start(int _port) +{ + if (running) + return false; + + if (_port) + port = _port; + if (!Listen()) + return false; + + running = true; + return (OpenThreads::Thread::start() == 0); +} + +bool CStreamManager::Stop() +{ + if (!running) + return false; + running = false; + return (OpenThreads::Thread::join() == 0); +} + +bool CStreamManager::SetPort(int newport) +{ + bool ret = false; + if (port != newport) { + port = newport; +#if 0 + Stop(); + ret = Start(newport); +#endif + mutex.lock(); + if (listenfd >= 0) + close(listenfd); + ret = Listen(); + mutex.unlock(); + } + return ret; +} + +bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid) { - unsigned char * buf; - int pid; - int pids[MAXPIDS]; char cbuf[512]; char *bp; - int fd = (int) data; - unsigned char demuxfd_count = 0; - printf("Starting LIVE STREAM thread, fd %d\n", fd); FILE * fp = fdopen(fd, "r+"); if(fp == NULL) { perror("fdopen"); - return 0; + return false; } - - writebuf_size = 0; - cbuf[0] = 0; bp = &cbuf[0]; @@ -279,7 +286,7 @@ void * streamts_live_thread(void *data) int res = read(fd, &c, 1); if(res < 0) { perror("read"); - return 0; + return false; } if ((*bp++ = c) == '\n') break; @@ -288,7 +295,7 @@ void * streamts_live_thread(void *data) *bp++ = 0; bp = &cbuf[0]; - printf("stream: got %s\n", cbuf); + printf("CStreamManager::Parse: got %s\n", cbuf); /* send response to http client */ if (!strncmp(cbuf, "GET /", 5)) { @@ -297,165 +304,256 @@ void * streamts_live_thread(void *data) bp += 5; } else { printf("Received garbage\n"); - return 0; + return false; } +#ifndef ENABLE_MULTI_CHANNEL /* parse stdin / url path, start dmx filters */ do { + int pid; int res = sscanf(bp, "%x", &pid); if(res == 1) { printf("New pid: 0x%x\n", pid); - pids[demuxfd_count++] = pid; + pids.insert(pid); } } - while ((bp = strchr(bp, ',')) && (bp++) && (demuxfd_count < MAXPIDS)); + while ((bp = strchr(bp, ',')) && (bp++)); +#endif - if(demuxfd_count == 0) { - printf("No pids!\n"); - CZapitChannel * channel = CZapit::getInstance()->GetCurrentChannel(); + chid = CZapit::getInstance()->GetCurrentChannelID(); + CZapitChannel * channel = CZapit::getInstance()->GetCurrentChannel(); + + int mode = CNeutrinoApp::getInstance()->getMode(); + if (mode == NeutrinoMessages::mode_standby && streams.empty()) { + printf("CStreamManager::Parse: wakeup zapit..\n"); + g_Zapit->setStandby(false); + g_Zapit->getMode(); + } + if(pids.empty()) { +#ifdef ENABLE_MULTI_CHANNEL + t_channel_id tmpid; + bp = &cbuf[5]; + if (sscanf(bp, "id=%llx", &tmpid) == 1) { + printf("############################# channel_id %llx\n", tmpid); + + CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(tmpid); + if (tmpchan && (tmpid != chid) && SAME_TRANSPONDER(tmpid, chid)) { + printf("############################# channel_id %llx -> zap\n", tmpid); + bool ret = g_Zapit->zapTo_record(tmpid) > 0; + if (ret) { + channel = tmpchan; + chid = tmpid; + } + } + } + if(CRecordManager::getInstance()->RecordingStatus(tmpid)) { + printf("CStreamManager::Parse: channel %llx recorded, aborting..\n", tmpid); + return false; + } +#endif + + printf("CStreamManager::Parse: no pids in url, using channel %llx pids\n", chid); if(!channel) - return 0; - pids[demuxfd_count++] = 0; - pids[demuxfd_count++] = channel->getPmtPid(); - pids[demuxfd_count++] = channel->getVideoPid(); + return false; + //pids.insert(0); + //pids.insert(channel->getPmtPid()); + pids.insert(channel->getVideoPid()); for (int i = 0; i < channel->getAudioChannelCount(); i++) - pids[demuxfd_count++] = channel->getAudioChannel(i)->pid; + pids.insert(channel->getAudioChannel(i)->pid); } - - buf = new unsigned char [IN_SIZE]; - if (buf == NULL) { - perror("NEW"); - return 0; + CGenPsi psi; + for (stream_pids_t::iterator it = pids.begin(); it != pids.end(); ++it) { + if (*it == channel->getVideoPid()) { + printf("CStreamManager::Parse: genpsi vpid %x (%d)\n", *it, channel->type); + psi.addPid(*it, channel->type ? EN_TYPE_AVC : EN_TYPE_VIDEO, 0); + } else { + for (int i = 0; i < channel->getAudioChannelCount(); i++) { + if (*it == channel->getAudioChannel(i)->pid) { + CZapitAudioChannel::ZapitAudioChannelType atype = channel->getAudioChannel(i)->audioChannelType; + printf("CStreamManager::Parse: genpsi apid %x (%d)\n", *it, atype); + psi.addPid(*it, EN_TYPE_AUDIO, atype); + } + } + } } + psi.genpsi(fd); - cDemux * dmx = new cDemux(STREAM_DEMUX);//FIXME - - dmx->Open(DMX_TP_CHANNEL, NULL, DMX_BUFFER_SIZE); - - dmx->pesFilter(pids[0]); - for(int i = 1; i < demuxfd_count; i++) - dmx->addPid(pids[i]); - - dmx->Start(true);//FIXME - - CCamManager::getInstance()->Start(CZapit::getInstance()->GetCurrentChannelID(), CCamManager::STREAM); - ssize_t r; - - while (!exit_flag) { - r = dmx->Read(buf, IN_SIZE, 100); - if(r > 0) - packet_stdout(fd, buf, r, NULL); - } - - printf("Exiting LIVE STREAM thread, fd %d\n", fd); - - CCamManager::getInstance()->Stop(CZapit::getInstance()->GetCurrentChannelID(), CCamManager::STREAM); - - delete dmx; - delete []buf; - close(fd); - return 0; + return !pids.empty(); } -#if 0 -//never used -void streamts_file_thread(void *data) + +void CStreamManager::run() { - int dvrfd; - unsigned char * buf; - char cbuf[512]; - char *bp; - unsigned char mode = 0; - char tsfile[IN_SIZE]; - int tsfilelen = 0; - int fileslice = 0; - int i = 0; - int fd = (int) data; + struct sockaddr_in servaddr; + int clilen = sizeof(servaddr);; - buf = (unsigned char *) malloc(IN_SIZE); + struct pollfd pfd[128]; + int poll_cnt; - if (buf == NULL) { - perror("malloc"); - return; - } + printf("Starting STREAM thread keeper, tid %ld\n", syscall(__NR_gettid)); - bp = &cbuf[0]; - - /* read one line */ - while (bp - &cbuf[0] < IN_SIZE) { - unsigned char c; - read(fd, &c, 1); - if ((*bp++ = c) == '\n') - break; - } - - *bp++ = 0; - bp = &cbuf[0]; - - /* send response to http client */ - if (!strncmp(cbuf, "GET /", 5)) { - printf("HTTP/1.1 200 OK\r\nServer: streamts (%s)\r\n\r\n", "ts" /*&argv[1][1]*/); - fflush(stdout); - bp += 5; - } - - /* ts filename */ - int j = 0; - i = 0; - while (i < (int) strlen(bp) - 3) - { - if ((bp[i] == '.') && (bp[i + 1] == 't') && (bp[i + 2] == 's')) - { - tsfile[j] = bp[i]; - tsfile[j + 1] = bp[i + 1]; - tsfile[j + 2] = bp[i + 2]; - tsfile[j + 3] = '\0'; - break; - } - else - if ((bp[i] == '%') && (bp[i + 1] == '2') && (bp[i + 2] == '0')) - { - tsfile[j++] = ' '; - i += 3; + while (running) { + mutex.lock(); + pfd[0].fd = listenfd; + pfd[0].events = (POLLIN | POLLPRI); + pfd[0].revents = 0; + poll_cnt = 1; + for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) { + stream_fds_t fds = it->second->GetFds(); + for (stream_fds_t::iterator fit = fds.begin(); fit != fds.end(); ++fit) { + pfd[poll_cnt].fd = *fit; + pfd[poll_cnt].events = POLLRDHUP | POLLHUP; + pfd[poll_cnt].revents = 0; + poll_cnt++; } - else - tsfile[j++] = bp[i++]; - } - tsfilelen = strlen(tsfile); - /* open ts file */ - if ((dvrfd = open(tsfile, O_RDONLY)) < 0) { - free(buf); - return; - } - - size_t pos; - ssize_t r; - - while (!exit_flag) { - /* always read IN_SIZE bytes */ - for (pos = 0; pos < IN_SIZE; pos += r) { - r = read(dvrfd, buf + pos, IN_SIZE - pos); - if (r == -1) { - /* Error */ - exit_flag = 1; - break; - } else if (r == 0) { - /* End of file */ - if (mode == 3) { - close(dvrfd); - sprintf(&tsfile[tsfilelen], ".%03d", ++fileslice); - dvrfd = open(tsfile, O_RDONLY); - } - if ((dvrfd == -1) || (mode != 3)) { - exit_flag = 1; - break; + } + mutex.unlock(); +//printf("polling, count= %d\n", poll_cnt); + int pollres = poll (pfd, poll_cnt, 1000); + if (pollres < 0) { + perror("CStreamManager::run(): poll"); + continue; + } + if(pollres == 0) + continue; + for (int i = poll_cnt - 1; i >= 0; i--) { + if (pfd[i].revents & (POLLIN | POLLPRI | POLLHUP | POLLRDHUP)) { + printf("fd %d has events %x\n", pfd[i].fd, pfd[i].revents); + if (pfd[i].fd == listenfd) { + int connfd = accept (listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen); + printf("CStreamManager::run(): connection, fd %d\n", connfd); + if(connfd < 0) { + perror("CStreamManager::run(): accept"); + continue; + } + stream_pids_t pids; + t_channel_id channel_id; + if (Parse(connfd, pids, channel_id)) { + mutex.lock(); + streammap_iterator_t it = streams.find(channel_id); + if (it != streams.end()) { + it->second->AddClient(connfd); + } else { + CStreamInstance * stream = new CStreamInstance(connfd, channel_id, pids); + if (stream->Start()) + streams.insert(streammap_pair_t(channel_id, stream)); + else + delete stream; + } + mutex.unlock(); + } else { + close(connfd); + } + } else { + if (pfd[i].revents & (POLLHUP | POLLRDHUP)) { + printf("CStreamManager::run(): POLLHUP, fd %d\n", pfd[i].fd); + mutex.lock(); + for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) { + if (it->second->HasFd(pfd[i].fd)) { + CStreamInstance *stream = it->second; + stream->RemoveClient(pfd[i].fd); + if (stream->GetFds().empty()) { + streams.erase(stream->GetChannelId()); + delete stream; + } + break; + } + } + mutex.unlock(); + } } } } - packet_stdout(fd, buf, pos, NULL); } - close(dvrfd); - free(buf); - - return; + printf("CStreamManager::run: stopping...\n"); + close(listenfd); + listenfd = -1; + StopAll(); +} + +bool CStreamManager::StopAll() +{ + bool ret = !streams.empty(); + for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) { + it->second->Stop(); + delete it->second; + } + streams.clear(); + return ret; +} + +bool CStreamManager::StopStream(t_channel_id channel_id) +{ + bool ret = false; + mutex.lock(); + if (channel_id) { + streammap_iterator_t it = streams.find(channel_id); + if (it != streams.end()) { + delete it->second; + streams.erase(channel_id); + ret = true; + } + } else { + ret = StopAll(); + } + mutex.unlock(); + return ret; +} + +bool CStreamManager::StreamStatus(t_channel_id channel_id) +{ + bool ret; + mutex.lock(); + if (channel_id) + ret = (streams.find(channel_id) != streams.end()); + else + ret = !streams.empty(); + mutex.unlock(); + return ret; +} + +bool CStreamManager::Listen() +{ + struct sockaddr_in socketAddr; + int socketOptActive = 1; + int sendsize = 10*IN_SIZE; + unsigned int m = sizeof(sendsize); + + if ((listenfd = socket (AF_INET, SOCK_STREAM, 0)) < 0) { + fprintf (stderr, "network port %u open: ", port); + perror ("socket"); + return false; + } + + if (setsockopt (listenfd, SOL_SOCKET, SO_REUSEADDR, (const void *)&socketOptActive, sizeof (int)) < 0) { + fprintf (stderr, "network port %u open: error setsockopt\n", port); + perror ("setsockopt"); + goto _error; + } + + socketAddr.sin_family = AF_INET; + socketAddr.sin_port = htons (port); + socketAddr.sin_addr.s_addr = htonl (INADDR_ANY); + + if (bind (listenfd, (struct sockaddr *) &socketAddr, sizeof (socketAddr)) < 0) { + fprintf (stderr, "network port %u open: ", port); + perror ("bind"); + goto _error; + } + + if (listen (listenfd, 5) < 0) { + fprintf (stderr, "network port %u open: ", port); + perror ("listen"); + goto _error; + } + +#if 1 + setsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, m); + sendsize = 0; + getsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, &m); + printf("CStreamManager::Listen: on %d, fd %d (%d)\n", port, listenfd, sendsize); +#endif + return true; +_error: + close (listenfd); + return false; } -#endif \ No newline at end of file diff --git a/src/driver/streamts.h b/src/driver/streamts.h new file mode 100644 index 000000000..9b10ddeca --- /dev/null +++ b/src/driver/streamts.h @@ -0,0 +1,96 @@ +/* + Neutrino-GUI - DBoxII-Project + + Copyright (C) 2011-2012 CoolStream International Ltd + + License: GPLv2 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +#ifndef __streamts_h__ +#define __streamts_h__ + +#include +#include + +#include +#include +#include +#include + +typedef std::set stream_pids_t; +typedef std::set stream_fds_t; + +class CStreamInstance : public OpenThreads::Thread +{ + private: + bool running; + cDemux * dmx; + OpenThreads::Mutex mutex; + unsigned char * buf; + + t_channel_id channel_id; + stream_pids_t pids; + stream_fds_t fds; + + bool Send(ssize_t r); + void Close(); + void run(); + public: + CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &pids); + ~CStreamInstance(); + bool Start(); + bool Stop(); + void AddClient(int clientfd); + void RemoveClient(int clientfd); + bool HasFd(int fd); + stream_fds_t & GetFds() { return fds; } + t_channel_id GetChannelId() { return channel_id; } +}; + +typedef std::pair streammap_pair_t; +typedef std::map streammap_t; +typedef streammap_t::iterator streammap_iterator_t; + +class CStreamManager : public OpenThreads::Thread +{ + private: + bool enabled; + bool running; + int listenfd; + int port; + + OpenThreads::Mutex mutex; + static CStreamManager * sm; + + streammap_t streams; + + bool Listen(); + bool Parse(int fd, stream_pids_t &pids, t_channel_id &chid); + bool StopAll(); + void run(); + CStreamManager(); + public: + ~CStreamManager(); + static CStreamManager * getInstance(); + bool Start(int port = 0); + bool Stop(); + bool StopStream(t_channel_id channel_id = 0); + bool StreamStatus(t_channel_id channel_id = 0); + bool SetPort(int newport); + int GetPort() { return port; } +}; + +#endif