driver/streamts.cpp: replace old code with class,

support multi-client and multi-channel,
experimental / testing - please read comments at the top of driver/streamts.cpp
This commit is contained in:
[CST] Focus
2012-11-08 19:10:37 +04:00
parent 2b0f9efbc8
commit ae6cbc9479
2 changed files with 555 additions and 361 deletions

View File

@@ -1,3 +1,29 @@
/*
Neutrino-GUI - DBoxII-Project
Copyright (C) 2011-2012 CoolStream International Ltd
based on code which is
Copyright (C) 2002 Andreas Oberritter <obi@tuxbox.org>
Copyright (C) 2001 TripleDES
Copyright (C) 2000, 2001 Marcus Metzler <marcus@convergence.de>
License: GPLv2
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation;
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
@@ -13,10 +39,8 @@
#include <poll.h>
#include <syscall.h>
/* work around for building with old kernel headers */
#ifndef POLLRDHUP
#define POLLRDHUP 0
#endif
#include <global.h>
#include <neutrino.h>
#ifdef HAVE_CONFIG_H
#include <config.h>
@@ -28,248 +52,231 @@
#include <dmx.h>
#include <zapit/capmt.h>
#include <zapit/zapit.h>
#include <zapit/pat.h>
#include <driver/streamts.h>
#include <driver/record.h>
#include <driver/genpsi.h>
/* experimental mode:
* stream not possible, if record running
* pids in url ignored, and added from channel, with fake PAT/PMT
* different channels supported, only from the same transponder - no zap is done,
* with url like http://coolstream:31339/id=c32400030070283e (channel id)
* TODO: multi-tuner support
*/
#define ENABLE_MULTI_CHANNEL
#define TS_SIZE 188
#define IN_SIZE (2048 * TS_SIZE)
//#define IN_SIZE (TS_SIZE * 362)
#define DMX_BUFFER_SIZE (2048*TS_SIZE)
#define IN_SIZE (250*TS_SIZE)
#define DMX_BUFFER_SIZE (2 * 3008 * 62)
/* maximum number of pes pids */
#define MAXPIDS 64
/* tcp packet data size */
//#define PACKET_SIZE 1448
#define PACKET_SIZE 7*TS_SIZE
//unsigned char * buf;
extern CCam *cam0;
//int demuxfd[MAXPIDS];
static unsigned char exit_flag = 0;
static unsigned int writebuf_size = 0;
static unsigned char writebuf[PACKET_SIZE];
void packet_stdout (int fd, unsigned char * buf, int count, void * /*p*/)
CStreamInstance::CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &_pids)
{
unsigned int size;
unsigned char * bp;
ssize_t written;
//printf("packet_stdout count %d\n", count);
/* ensure, that there is always at least one complete
* packet inside of the send buffer */
while (writebuf_size + count >= PACKET_SIZE) {
/* how many bytes are to be sent from the input buffer? */
size = PACKET_SIZE - writebuf_size;
/* send buffer is not empty, so copy from
input buffer to get a complete packet */
if (writebuf_size) {
memmove(writebuf + writebuf_size, buf, size);
bp = writebuf;
printf("CStreamInstance:: new channel %llx fd %d\n", chid, clientfd);
fds.insert(clientfd);
pids = _pids;
channel_id = chid;
running = false;
dmx = NULL;
buf = NULL;
}
/* if send buffer is empty, then do not memcopy,
but send directly from input buffer */
else {
bp = buf;
}
/* write the packet, count the amount of really written bytes */
written = write(fd, bp, PACKET_SIZE);
/* exit on error */
if (written == -1) {
perror("write");
exit_flag = 1;
return;
}
/* if the packet could not be written completely, then
* how many bytes must be stored in the send buffer
* until the next packet is to be sent? */
writebuf_size = PACKET_SIZE - written;
/* move all bytes of the packet which were not sent
* to the beginning of the send buffer */
if (writebuf_size)
memmove(writebuf, bp + written, writebuf_size);
/* * advance in the input buffer */
buf += size;
/* * decrease the todo size */
count -= size;
}
/* if there are still some bytes left in the input buffer,
* then store them in the send buffer and increase send
* buffer size */
if (count) {
memmove(writebuf + writebuf_size, buf, count);
writebuf_size += count;
}
}
int open_incoming_port (int port)
CStreamInstance::~CStreamInstance()
{
struct sockaddr_in socketAddr;
int socketOptActive = 1;
int handle;
Stop();
Close();
}
if (!port)
return -1;
if ((handle = socket (AF_INET, SOCK_STREAM, 0)) < 0)
bool CStreamInstance::Start()
{
fprintf (stderr, "network port %u open: ", port);
perror ("socket");
return -1;
if (running)
return false;
buf = new unsigned char [IN_SIZE];
if (buf == NULL) {
perror("CStreamInstance::Start: buf");
return false;
}
running = true;
printf("CStreamInstance::Start: %llx\n", channel_id);
return (OpenThreads::Thread::start() == 0);
}
if (setsockopt (handle, SOL_SOCKET, SO_REUSEADDR, (const void *)&socketOptActive, sizeof (int)) < 0)
bool CStreamInstance::Stop()
{
fprintf (stderr, "network port %u open: error setsockopt\n", port);
close (handle);
return -1;
if (!running)
return false;
printf("CStreamInstance::Stop: %llx\n", channel_id);
running = false;
return (OpenThreads::Thread::join() == 0);
}
socketAddr.sin_family = AF_INET;
socketAddr.sin_port = htons (port);
socketAddr.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind (handle, (struct sockaddr *) &socketAddr, sizeof (socketAddr)) < 0)
bool CStreamInstance::Send(ssize_t r)
{
fprintf (stderr, "network port %u open: ", port);
perror ("bind");
close (handle);
return -1;
mutex.lock();
for (stream_fds_t::iterator it = fds.begin(); it != fds.end(); ++it) {
int ret, i = 10;
do {
ret = send(*it, buf, r, MSG_DONTWAIT);
#if 0
if (ret != r)
usleep(100);
#endif
} while ((ret != r) && (i-- > 0));
if (ret != r) {
if (r < 0)
perror("send");
printf("send err, fd %d: %d\n", *it, r);
}
}
mutex.unlock();
return true;
}
if (listen (handle, 5) < 0)
void CStreamInstance::Close()
{
fprintf (stderr, "network port %u open: ", port);
perror ("listen");
close (handle);
return -1;
}
return handle;
for (stream_fds_t::iterator fit = fds.begin(); fit != fds.end(); ++fit)
close(*fit);
fds.clear();
}
void * streamts_live_thread(void *data);
int streamts_stop;
void streamts_main_thread(void * /*data*/)
void CStreamInstance::AddClient(int clientfd)
{
struct sockaddr_in servaddr;
int clilen;
mutex.lock();
fds.insert(clientfd);
printf("CStreamInstance::AddClient: %d (count %d)\n", clientfd, fds.size());
mutex.unlock();
}
struct pollfd pfd[128];
int poll_cnt, tcnt;
int listenfd;
int connfd = -1;
int pollres;
int i;
pthread_t st = 0;
void CStreamInstance::RemoveClient(int clientfd)
{
mutex.lock();
fds.erase(clientfd);
close(clientfd);
printf("CStreamInstance::RemoveClient: %d (count %d)\n", clientfd, fds.size());
mutex.unlock();
}
printf("Starting STREAM thread keeper, tid %ld\n", syscall(__NR_gettid));
void CStreamInstance::run()
{
printf("CStreamInstance::run: %llx\n", channel_id);
listenfd = open_incoming_port(31339);
if(listenfd < 0) {
printf("Open incoming port failed\n");
return;
}
printf("listenfd %d\n", listenfd);
dmx = new cDemux(STREAM_DEMUX);//FIXME
clilen = sizeof (servaddr);
pfd[0].fd = listenfd;
pfd[0].events = (POLLIN | POLLPRI);
pfd[0].revents = 0;
tcnt = 1;
streamts_stop = 0;
dmx->Open(DMX_TP_CHANNEL, NULL, DMX_BUFFER_SIZE);
while (!streamts_stop) {
poll_cnt = tcnt;
//printf("polling, count= %d\n", poll_cnt);
pollres = poll (pfd, poll_cnt, 1000);
if (pollres < 0) {
perror("streamts_main_thread poll");
continue;
/* pids here cannot be empty */
stream_pids_t::iterator it = pids.begin();
printf("CStreamInstance::run: add pid %x\n", *it);
dmx->pesFilter(*it);
++it;
for (; it != pids.end(); ++it) {
printf("CStreamInstance::run: add pid %x\n", *it);
dmx->addPid(*it);
}
if(pollres == 0)
continue;
for (i = poll_cnt - 1; i >= 0; i--) {
if (pfd[i].revents & (POLLIN | POLLPRI | POLLHUP | POLLRDHUP)) {
printf("fd %d has events %x\n", pfd[i].fd, pfd[i].revents);
if (pfd[i].fd == listenfd) {
connfd = accept (listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen);
printf("new connection, fd %d\n", connfd);
if(connfd < 0) {
perror("accept");
continue;
#ifdef ENABLE_MULTI_CHANNEL
dmx->Start();//FIXME
#else
dmx->Start(true);//FIXME
#endif
CCamManager::getInstance()->Start(channel_id, CCamManager::STREAM);
while (running) {
ssize_t r = dmx->Read(buf, IN_SIZE, 100);
if(r > 0)
Send(r);
}
if(st != 0) {
printf("New connection, stopping stream thread\n");
exit_flag = 1;
pthread_join(st, NULL);
tcnt --;
CCamManager::getInstance()->Stop(channel_id, CCamManager::STREAM);
printf("CStreamInstance::run: exiting %llx (%d fds)\n", channel_id, fds.size());
Close();
delete dmx;
delete []buf;
}
pfd[tcnt].fd = connfd;
pfd[tcnt].events = POLLRDHUP | POLLHUP;
pfd[tcnt].revents = 0;
tcnt++;
exit_flag = 0;
pthread_create (&st, NULL, streamts_live_thread, (void *) connfd);
} else {
if (pfd[i].revents & (POLLHUP | POLLRDHUP)) {
connfd = -1;
printf("Client disconnected, stopping stream thread\n");
exit_flag = 1;
if(st)
pthread_join(st, NULL);
st = 0;
tcnt --;
bool CStreamInstance::HasFd(int fd)
{
if (fds.find(fd) != fds.end())
return true;
return false;
}
/************************************************************************/
CStreamManager *CStreamManager::sm = NULL;
CStreamManager::CStreamManager()
{
enabled = true;
running = false;
listenfd = -1;
port = 31339;
}
CStreamManager::~CStreamManager()
{
Stop();
}
CStreamManager * CStreamManager::getInstance()
{
if (sm == NULL)
sm = new CStreamManager();
return sm;
}
bool CStreamManager::Start(int _port)
{
if (running)
return false;
if (_port)
port = _port;
if (!Listen())
return false;
running = true;
return (OpenThreads::Thread::start() == 0);
}
printf("Stopping STREAM thread keeper\n");
bool CStreamManager::Stop()
{
if (!running)
return false;
running = false;
return (OpenThreads::Thread::join() == 0);
}
bool CStreamManager::SetPort(int newport)
{
bool ret = false;
if (port != newport) {
port = newport;
#if 0
Stop();
ret = Start(newport);
#endif
mutex.lock();
if (listenfd >= 0)
close(listenfd);
if(st != 0) {
printf("Stopping stream thread\n");
exit_flag = 1;
pthread_join(st, NULL);
close(connfd);
ret = Listen();
mutex.unlock();
}
return;
return ret;
}
void * streamts_live_thread(void *data)
bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid)
{
unsigned char * buf;
int pid;
int pids[MAXPIDS];
char cbuf[512];
char *bp;
int fd = (int) data;
unsigned char demuxfd_count = 0;
printf("Starting LIVE STREAM thread, fd %d\n", fd);
FILE * fp = fdopen(fd, "r+");
if(fp == NULL) {
perror("fdopen");
return 0;
return false;
}
writebuf_size = 0;
cbuf[0] = 0;
bp = &cbuf[0];
@@ -279,7 +286,7 @@ void * streamts_live_thread(void *data)
int res = read(fd, &c, 1);
if(res < 0) {
perror("read");
return 0;
return false;
}
if ((*bp++ = c) == '\n')
break;
@@ -288,7 +295,7 @@ void * streamts_live_thread(void *data)
*bp++ = 0;
bp = &cbuf[0];
printf("stream: got %s\n", cbuf);
printf("CStreamManager::Parse: got %s\n", cbuf);
/* send response to http client */
if (!strncmp(cbuf, "GET /", 5)) {
@@ -297,165 +304,256 @@ void * streamts_live_thread(void *data)
bp += 5;
} else {
printf("Received garbage\n");
return 0;
return false;
}
#ifndef ENABLE_MULTI_CHANNEL
/* parse stdin / url path, start dmx filters */
do {
int pid;
int res = sscanf(bp, "%x", &pid);
if(res == 1) {
printf("New pid: 0x%x\n", pid);
pids[demuxfd_count++] = pid;
pids.insert(pid);
}
}
while ((bp = strchr(bp, ',')) && (bp++) && (demuxfd_count < MAXPIDS));
while ((bp = strchr(bp, ',')) && (bp++));
#endif
if(demuxfd_count == 0) {
printf("No pids!\n");
chid = CZapit::getInstance()->GetCurrentChannelID();
CZapitChannel * channel = CZapit::getInstance()->GetCurrentChannel();
if(!channel)
return 0;
pids[demuxfd_count++] = 0;
pids[demuxfd_count++] = channel->getPmtPid();
pids[demuxfd_count++] = channel->getVideoPid();
for (int i = 0; i < channel->getAudioChannelCount(); i++)
pids[demuxfd_count++] = channel->getAudioChannel(i)->pid;
int mode = CNeutrinoApp::getInstance()->getMode();
if (mode == NeutrinoMessages::mode_standby && streams.empty()) {
printf("CStreamManager::Parse: wakeup zapit..\n");
g_Zapit->setStandby(false);
g_Zapit->getMode();
}
if(pids.empty()) {
#ifdef ENABLE_MULTI_CHANNEL
t_channel_id tmpid;
bp = &cbuf[5];
if (sscanf(bp, "id=%llx", &tmpid) == 1) {
printf("############################# channel_id %llx\n", tmpid);
buf = new unsigned char [IN_SIZE];
if (buf == NULL) {
perror("NEW");
return 0;
}
cDemux * dmx = new cDemux(STREAM_DEMUX);//FIXME
dmx->Open(DMX_TP_CHANNEL, NULL, DMX_BUFFER_SIZE);
dmx->pesFilter(pids[0]);
for(int i = 1; i < demuxfd_count; i++)
dmx->addPid(pids[i]);
dmx->Start(true);//FIXME
CCamManager::getInstance()->Start(CZapit::getInstance()->GetCurrentChannelID(), CCamManager::STREAM);
ssize_t r;
while (!exit_flag) {
r = dmx->Read(buf, IN_SIZE, 100);
if(r > 0)
packet_stdout(fd, buf, r, NULL);
}
printf("Exiting LIVE STREAM thread, fd %d\n", fd);
CCamManager::getInstance()->Stop(CZapit::getInstance()->GetCurrentChannelID(), CCamManager::STREAM);
delete dmx;
delete []buf;
close(fd);
return 0;
}
#if 0
//never used
void streamts_file_thread(void *data)
{
int dvrfd;
unsigned char * buf;
char cbuf[512];
char *bp;
unsigned char mode = 0;
char tsfile[IN_SIZE];
int tsfilelen = 0;
int fileslice = 0;
int i = 0;
int fd = (int) data;
buf = (unsigned char *) malloc(IN_SIZE);
if (buf == NULL) {
perror("malloc");
return;
}
bp = &cbuf[0];
/* read one line */
while (bp - &cbuf[0] < IN_SIZE) {
unsigned char c;
read(fd, &c, 1);
if ((*bp++ = c) == '\n')
break;
}
*bp++ = 0;
bp = &cbuf[0];
/* send response to http client */
if (!strncmp(cbuf, "GET /", 5)) {
printf("HTTP/1.1 200 OK\r\nServer: streamts (%s)\r\n\r\n", "ts" /*&argv[1][1]*/);
fflush(stdout);
bp += 5;
}
/* ts filename */
int j = 0;
i = 0;
while (i < (int) strlen(bp) - 3)
{
if ((bp[i] == '.') && (bp[i + 1] == 't') && (bp[i + 2] == 's'))
{
tsfile[j] = bp[i];
tsfile[j + 1] = bp[i + 1];
tsfile[j + 2] = bp[i + 2];
tsfile[j + 3] = '\0';
break;
}
else
if ((bp[i] == '%') && (bp[i + 1] == '2') && (bp[i + 2] == '0'))
{
tsfile[j++] = ' ';
i += 3;
}
else
tsfile[j++] = bp[i++];
}
tsfilelen = strlen(tsfile);
/* open ts file */
if ((dvrfd = open(tsfile, O_RDONLY)) < 0) {
free(buf);
return;
}
size_t pos;
ssize_t r;
while (!exit_flag) {
/* always read IN_SIZE bytes */
for (pos = 0; pos < IN_SIZE; pos += r) {
r = read(dvrfd, buf + pos, IN_SIZE - pos);
if (r == -1) {
/* Error */
exit_flag = 1;
break;
} else if (r == 0) {
/* End of file */
if (mode == 3) {
close(dvrfd);
sprintf(&tsfile[tsfilelen], ".%03d", ++fileslice);
dvrfd = open(tsfile, O_RDONLY);
}
if ((dvrfd == -1) || (mode != 3)) {
exit_flag = 1;
break;
CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(tmpid);
if (tmpchan && (tmpid != chid) && SAME_TRANSPONDER(tmpid, chid)) {
printf("############################# channel_id %llx -> zap\n", tmpid);
bool ret = g_Zapit->zapTo_record(tmpid) > 0;
if (ret) {
channel = tmpchan;
chid = tmpid;
}
}
}
packet_stdout(fd, buf, pos, NULL);
}
close(dvrfd);
free(buf);
return;
if(CRecordManager::getInstance()->RecordingStatus(tmpid)) {
printf("CStreamManager::Parse: channel %llx recorded, aborting..\n", tmpid);
return false;
}
#endif
printf("CStreamManager::Parse: no pids in url, using channel %llx pids\n", chid);
if(!channel)
return false;
//pids.insert(0);
//pids.insert(channel->getPmtPid());
pids.insert(channel->getVideoPid());
for (int i = 0; i < channel->getAudioChannelCount(); i++)
pids.insert(channel->getAudioChannel(i)->pid);
}
CGenPsi psi;
for (stream_pids_t::iterator it = pids.begin(); it != pids.end(); ++it) {
if (*it == channel->getVideoPid()) {
printf("CStreamManager::Parse: genpsi vpid %x (%d)\n", *it, channel->type);
psi.addPid(*it, channel->type ? EN_TYPE_AVC : EN_TYPE_VIDEO, 0);
} else {
for (int i = 0; i < channel->getAudioChannelCount(); i++) {
if (*it == channel->getAudioChannel(i)->pid) {
CZapitAudioChannel::ZapitAudioChannelType atype = channel->getAudioChannel(i)->audioChannelType;
printf("CStreamManager::Parse: genpsi apid %x (%d)\n", *it, atype);
psi.addPid(*it, EN_TYPE_AUDIO, atype);
}
}
}
}
psi.genpsi(fd);
return !pids.empty();
}
void CStreamManager::run()
{
struct sockaddr_in servaddr;
int clilen = sizeof(servaddr);;
struct pollfd pfd[128];
int poll_cnt;
printf("Starting STREAM thread keeper, tid %ld\n", syscall(__NR_gettid));
while (running) {
mutex.lock();
pfd[0].fd = listenfd;
pfd[0].events = (POLLIN | POLLPRI);
pfd[0].revents = 0;
poll_cnt = 1;
for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) {
stream_fds_t fds = it->second->GetFds();
for (stream_fds_t::iterator fit = fds.begin(); fit != fds.end(); ++fit) {
pfd[poll_cnt].fd = *fit;
pfd[poll_cnt].events = POLLRDHUP | POLLHUP;
pfd[poll_cnt].revents = 0;
poll_cnt++;
}
}
mutex.unlock();
//printf("polling, count= %d\n", poll_cnt);
int pollres = poll (pfd, poll_cnt, 1000);
if (pollres < 0) {
perror("CStreamManager::run(): poll");
continue;
}
if(pollres == 0)
continue;
for (int i = poll_cnt - 1; i >= 0; i--) {
if (pfd[i].revents & (POLLIN | POLLPRI | POLLHUP | POLLRDHUP)) {
printf("fd %d has events %x\n", pfd[i].fd, pfd[i].revents);
if (pfd[i].fd == listenfd) {
int connfd = accept (listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen);
printf("CStreamManager::run(): connection, fd %d\n", connfd);
if(connfd < 0) {
perror("CStreamManager::run(): accept");
continue;
}
stream_pids_t pids;
t_channel_id channel_id;
if (Parse(connfd, pids, channel_id)) {
mutex.lock();
streammap_iterator_t it = streams.find(channel_id);
if (it != streams.end()) {
it->second->AddClient(connfd);
} else {
CStreamInstance * stream = new CStreamInstance(connfd, channel_id, pids);
if (stream->Start())
streams.insert(streammap_pair_t(channel_id, stream));
else
delete stream;
}
mutex.unlock();
} else {
close(connfd);
}
} else {
if (pfd[i].revents & (POLLHUP | POLLRDHUP)) {
printf("CStreamManager::run(): POLLHUP, fd %d\n", pfd[i].fd);
mutex.lock();
for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) {
if (it->second->HasFd(pfd[i].fd)) {
CStreamInstance *stream = it->second;
stream->RemoveClient(pfd[i].fd);
if (stream->GetFds().empty()) {
streams.erase(stream->GetChannelId());
delete stream;
}
break;
}
}
mutex.unlock();
}
}
}
}
}
printf("CStreamManager::run: stopping...\n");
close(listenfd);
listenfd = -1;
StopAll();
}
bool CStreamManager::StopAll()
{
bool ret = !streams.empty();
for (streammap_iterator_t it = streams.begin(); it != streams.end(); ++it) {
it->second->Stop();
delete it->second;
}
streams.clear();
return ret;
}
bool CStreamManager::StopStream(t_channel_id channel_id)
{
bool ret = false;
mutex.lock();
if (channel_id) {
streammap_iterator_t it = streams.find(channel_id);
if (it != streams.end()) {
delete it->second;
streams.erase(channel_id);
ret = true;
}
} else {
ret = StopAll();
}
mutex.unlock();
return ret;
}
bool CStreamManager::StreamStatus(t_channel_id channel_id)
{
bool ret;
mutex.lock();
if (channel_id)
ret = (streams.find(channel_id) != streams.end());
else
ret = !streams.empty();
mutex.unlock();
return ret;
}
bool CStreamManager::Listen()
{
struct sockaddr_in socketAddr;
int socketOptActive = 1;
int sendsize = 10*IN_SIZE;
unsigned int m = sizeof(sendsize);
if ((listenfd = socket (AF_INET, SOCK_STREAM, 0)) < 0) {
fprintf (stderr, "network port %u open: ", port);
perror ("socket");
return false;
}
if (setsockopt (listenfd, SOL_SOCKET, SO_REUSEADDR, (const void *)&socketOptActive, sizeof (int)) < 0) {
fprintf (stderr, "network port %u open: error setsockopt\n", port);
perror ("setsockopt");
goto _error;
}
socketAddr.sin_family = AF_INET;
socketAddr.sin_port = htons (port);
socketAddr.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind (listenfd, (struct sockaddr *) &socketAddr, sizeof (socketAddr)) < 0) {
fprintf (stderr, "network port %u open: ", port);
perror ("bind");
goto _error;
}
if (listen (listenfd, 5) < 0) {
fprintf (stderr, "network port %u open: ", port);
perror ("listen");
goto _error;
}
#if 1
setsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, m);
sendsize = 0;
getsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, &m);
printf("CStreamManager::Listen: on %d, fd %d (%d)\n", port, listenfd, sendsize);
#endif
return true;
_error:
close (listenfd);
return false;
}

96
src/driver/streamts.h Normal file
View File

@@ -0,0 +1,96 @@
/*
Neutrino-GUI - DBoxII-Project
Copyright (C) 2011-2012 CoolStream International Ltd
License: GPLv2
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation;
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#ifndef __streamts_h__
#define __streamts_h__
#include <OpenThreads/Thread>
#include <OpenThreads/Condition>
#include <dmx.h>
#include <zapit/client/zapittypes.h>
#include <set>
#include <map>
typedef std::set<int> stream_pids_t;
typedef std::set<int> stream_fds_t;
class CStreamInstance : public OpenThreads::Thread
{
private:
bool running;
cDemux * dmx;
OpenThreads::Mutex mutex;
unsigned char * buf;
t_channel_id channel_id;
stream_pids_t pids;
stream_fds_t fds;
bool Send(ssize_t r);
void Close();
void run();
public:
CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &pids);
~CStreamInstance();
bool Start();
bool Stop();
void AddClient(int clientfd);
void RemoveClient(int clientfd);
bool HasFd(int fd);
stream_fds_t & GetFds() { return fds; }
t_channel_id GetChannelId() { return channel_id; }
};
typedef std::pair<t_channel_id, CStreamInstance*> streammap_pair_t;
typedef std::map<t_channel_id, CStreamInstance*> streammap_t;
typedef streammap_t::iterator streammap_iterator_t;
class CStreamManager : public OpenThreads::Thread
{
private:
bool enabled;
bool running;
int listenfd;
int port;
OpenThreads::Mutex mutex;
static CStreamManager * sm;
streammap_t streams;
bool Listen();
bool Parse(int fd, stream_pids_t &pids, t_channel_id &chid);
bool StopAll();
void run();
CStreamManager();
public:
~CStreamManager();
static CStreamManager * getInstance();
bool Start(int port = 0);
bool Stop();
bool StopStream(t_channel_id channel_id = 0);
bool StreamStatus(t_channel_id channel_id = 0);
bool SetPort(int newport);
int GetPort() { return port; }
};
#endif