our current experimental Neutrino branch

git-svn-id: file:///home/bas/coolstream_public_svn/THIRDPARTY/applications/neutrino-experimental@27 e54a6e83-5905-42d5-8d5c-058d10e6a962
This commit is contained in:
mrcolor
2009-12-08 11:05:11 +00:00
commit bc5bd4154e
876 changed files with 193775 additions and 0 deletions

487
src/driver/streamts.cpp Normal file
View File

@@ -0,0 +1,487 @@
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netdb.h>
#include <poll.h>
#include <syscall.h>
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <ctype.h>
#include <string.h>
#include <dmx_cs.h>
#include <zapit/cam.h>
#include <zapit/channel.h>
extern CZapitChannel *channel;
extern CCam *cam0;
#define TS_SIZE 188
//#define IN_SIZE (2048 * TS_SIZE)
#define IN_SIZE (TS_SIZE * 362)
#define DMX_BUFFER_SIZE (3008 * 62)
/* maximum number of pes pids */
#define MAXPIDS 64
/* tcp packet data size */
//#define PACKET_SIZE 1448
#define PACKET_SIZE 7*TS_SIZE
//unsigned char * buf;
int demuxfd[MAXPIDS];
static unsigned char exit_flag = 0;
static unsigned int writebuf_size = 0;
static unsigned char writebuf[PACKET_SIZE];
static int
sync_byte_offset (const unsigned char * buf, const unsigned int len)
{
unsigned int i;
for (i = 0; i < len; i++)
if (buf[i] == 0x47)
return i;
return -1;
}
void packet_stdout (int fd, unsigned char * buf, int count, void * p)
{
unsigned int size;
unsigned char * bp;
ssize_t written;
//printf("packet_stdout count %d\n", count);
/* ensure, that there is always at least one complete
* packet inside of the send buffer */
while (writebuf_size + count >= PACKET_SIZE) {
/* how many bytes are to be sent from the input buffer? */
size = PACKET_SIZE - writebuf_size;
/* send buffer is not empty, so copy from
input buffer to get a complete packet */
if (writebuf_size) {
memcpy(writebuf + writebuf_size, buf, size);
bp = writebuf;
}
/* if send buffer is empty, then do not memcopy,
but send directly from input buffer */
else {
bp = buf;
}
/* write the packet, count the amount of really written bytes */
written = write(fd, bp, PACKET_SIZE);
/* exit on error */
if (written == -1) {
perror("write");
exit_flag = 1;
return;
}
/* if the packet could not be written completely, then
* how many bytes must be stored in the send buffer
* until the next packet is to be sent? */
writebuf_size = PACKET_SIZE - written;
/* move all bytes of the packet which were not sent
* to the beginning of the send buffer */
if (writebuf_size)
memmove(writebuf, bp + written, writebuf_size);
/* * advance in the input buffer */
buf += size;
/* * decrease the todo size */
count -= size;
}
/* if there are still some bytes left in the input buffer,
* then store them in the send buffer and increase send
* buffer size */
if (count) {
memcpy(writebuf + writebuf_size, buf, count);
writebuf_size += count;
}
}
int open_incoming_port (int port)
{
struct sockaddr_in socketAddr;
int socketOptActive = 1;
int handle;
if (!port)
return -1;
if ((handle = socket (AF_INET, SOCK_STREAM, 0)) < 0)
{
fprintf (stderr, "network port %u open: ", port);
perror ("socket");
return -1;
}
if (setsockopt (handle, SOL_SOCKET, SO_REUSEADDR, (const void *)&socketOptActive, sizeof (int)) < 0)
{
fprintf (stderr, "network port %u open: error setsockopt\n", port);
close (handle);
return -1;
}
socketAddr.sin_family = AF_INET;
socketAddr.sin_port = htons (port);
socketAddr.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind (handle, (struct sockaddr *) &socketAddr, sizeof (socketAddr)) < 0)
{
fprintf (stderr, "network port %u open: ", port);
perror ("bind");
close (handle);
return -1;
}
if (listen (handle, 5) < 0)
{
fprintf (stderr, "network port %u open: ", port);
perror ("listen");
close (handle);
return -1;
}
return handle;
}
void * streamts_live_thread(void *data);
int streamts_stop;
void streamts_main_thread(void *data)
{
struct sockaddr_in servaddr;
int clilen;
struct pollfd pfd[128];
int poll_cnt, tcnt;
int listenfd;
int connfd = -1;
int pollres;
int i;
pthread_t st = 0;
printf("Starting STREAM thread keeper, tid %ld\n", syscall(__NR_gettid));
listenfd = open_incoming_port(31339);
if(listenfd < 0) {
printf("Open incoming port failed\n");
return;
}
printf("listenfd %d\n", listenfd);
clilen = sizeof (servaddr);
pfd[0].fd = listenfd;
pfd[0].events = (POLLIN | POLLPRI);
pfd[0].revents = 0;
tcnt = 1;
streamts_stop = 0;
while (!streamts_stop) {
poll_cnt = tcnt;
//printf("polling, count= %d\n", poll_cnt);
pollres = poll (pfd, poll_cnt, 1000);
if (pollres < 0) {
perror("poll");
continue;
}
if(pollres == 0)
continue;
for (i = poll_cnt - 1; i >= 0; i--) {
if (pfd[i].revents & (POLLIN | POLLPRI | POLLHUP | POLLRDHUP)) {
printf("fd %d has events %x\n", pfd[i].fd, pfd[i].revents);
if (pfd[i].fd == listenfd) {
if(connfd >= 0)
close(connfd);
connfd = accept (listenfd, (struct sockaddr *) &servaddr, (socklen_t *) & clilen);
printf("new connection, fd %d\n", connfd);
if(connfd < 0) {
perror("accept");
continue;
}
if(st != 0) {
printf("New connection, stopping stream thread\n");
exit_flag = 1;
pthread_join(st, NULL);
tcnt --;
}
pfd[tcnt].fd = connfd;
pfd[tcnt].events = POLLRDHUP | POLLHUP;
pfd[tcnt].revents = 0;
tcnt++;
exit_flag = 0;
pthread_create (&st, NULL, streamts_live_thread, (void *) connfd);
} else {
if (pfd[i].revents & (POLLHUP | POLLRDHUP)) {
connfd = -1;
printf("Client disconnected, stopping stream thread\n");
exit_flag = 1;
if(st)
pthread_join(st, NULL);
st = 0;
tcnt --;
}
}
}
}
}
printf("Stopping STREAM thread keeper\n");
close(listenfd);
if(st != 0) {
printf("Stopping stream thread\n");
exit_flag = 1;
pthread_join(st, NULL);
close(connfd);
}
return;
}
void * streamts_live_thread(void *data)
{
unsigned char * buf;
int pid;
int pids[MAXPIDS];
char cbuf[512];
char *bp;
int fd = (int) data;
FILE * fp;
unsigned char demuxfd_count = 0;
printf("Starting LIVE STREAM thread, fd %d\n", fd);
fp = fdopen(fd, "r+");
if(fp == NULL) {
perror("fdopen");
return 0;
}
writebuf_size = 0;
cbuf[0] = 0;
bp = &cbuf[0];
/* read one line */
while (bp - &cbuf[0] < (int) sizeof(cbuf)) {
unsigned char c;
int res = read(fd, &c, 1);
if(res < 0) {
perror("read");
return 0;
}
if ((*bp++ = c) == '\n')
break;
}
*bp++ = 0;
bp = &cbuf[0];
printf("stream: got %s\n", cbuf);
/* send response to http client */
if (!strncmp(cbuf, "GET /", 5)) {
fprintf(fp, "HTTP/1.1 200 OK\r\nServer: streamts (%s)\r\n\r\n", "ts" /*&argv[1][1]*/);
fflush(fp);
bp += 5;
} else {
printf("Received garbage\n");
return 0;
}
/* parse stdin / url path, start dmx filters */
do {
int res = sscanf(bp, "%x", &pid);
if(res == 1) {
printf("New pid: 0x%x\n", pid);
pids[demuxfd_count++] = pid;
}
}
while ((bp = strchr(bp, ',')) && (bp++) && (demuxfd_count < MAXPIDS));
if(demuxfd_count == 0) {
printf("No pids!\n");
return 0;
}
buf = (unsigned char *) malloc(IN_SIZE);
if (buf == NULL) {
perror("malloc");
return 0;
}
cDemux * dmx = new cDemux(1);
dmx->Open(DMX_TP_CHANNEL, NULL, DMX_BUFFER_SIZE);
dmx->pesFilter(pids[0]);
for(int i = 1; i < demuxfd_count; i++)
dmx->addPid(pids[i]);
dmx->Start();
if(channel)
cam0->setCaPmt(channel->getCaPmt(), 0, 3, true); // demux 0 + 1, update
size_t pos;
ssize_t r;
ssize_t todo;
int offset;
while (!exit_flag) {
todo = IN_SIZE;
pos = 0;
while ((!exit_flag) && (todo)) {
r = dmx->Read(buf+pos, todo, 100);
if (r > 0) {
//printf("Read: %d\n", r);
pos += r;
todo -= r;
} else
usleep(1000);
}
if(!exit_flag) {
//packet_stdout(fd, buf, IN_SIZE, NULL);
/* make sure to start with a ts header */
offset = sync_byte_offset(buf, IN_SIZE);
if (offset == -1)
continue;
packet_stdout(fd, buf + offset, IN_SIZE - offset, NULL);
}
}
printf("Exiting LIVE STREAM thread, fd %d\n", fd);
if(channel)
cam0->setCaPmt(channel->getCaPmt(), 0, 1, true); // demux 0, update
delete dmx;
free(buf);
close(fd);
return 0;
}
void streamts_file_thread(void *data)
{
int dvrfd;
unsigned char * buf;
char cbuf[512];
char *bp;
unsigned char mode = 0;
char tsfile[IN_SIZE];
int tsfilelen = 0;
int fileslice = 0;
int i = 0;
int fd = (int) data;
buf = (unsigned char *) malloc(IN_SIZE);
if (buf == NULL) {
perror("malloc");
return;
}
bp = &cbuf[0];
/* read one line */
while (bp - &cbuf[0] < IN_SIZE) {
unsigned char c;
read(fd, &c, 1);
if ((*bp++ = c) == '\n')
break;
}
*bp++ = 0;
bp = &cbuf[0];
/* send response to http client */
if (!strncmp(cbuf, "GET /", 5)) {
printf("HTTP/1.1 200 OK\r\nServer: streamts (%s)\r\n\r\n", "ts" /*&argv[1][1]*/);
fflush(stdout);
bp += 5;
}
/* ts filename */
int j = 0;
i = 0;
while (i < (int) strlen(bp) - 3)
{
if ((bp[i] == '.') && (bp[i + 1] == 't') && (bp[i + 2] == 's'))
{
tsfile[j] = bp[i];
tsfile[j + 1] = bp[i + 1];
tsfile[j + 2] = bp[i + 2];
tsfile[j + 3] = '\0';
break;
}
else
if ((bp[i] == '%') && (bp[i + 1] == '2') && (bp[i + 2] == '0'))
{
tsfile[j++] = ' ';
i += 3;
}
else
tsfile[j++] = bp[i++];
}
tsfilelen = strlen(tsfile);
/* open ts file */
if ((dvrfd = open(tsfile, O_RDONLY)) < 0) {
free(buf);
return;
}
size_t pos;
ssize_t r;
while (!exit_flag) {
/* always read IN_SIZE bytes */
for (pos = 0; pos < IN_SIZE; pos += r) {
r = read(dvrfd, buf + pos, IN_SIZE - pos);
if (r == -1) {
/* Error */
exit_flag = 1;
break;
} else if (r == 0) {
/* End of file */
if (mode == 3) {
close(dvrfd);
sprintf(&tsfile[tsfilelen], ".%03d", ++fileslice);
dvrfd = open(tsfile, O_RDONLY);
}
if ((dvrfd == -1) || (mode != 3)) {
exit_flag = 1;
break;
}
}
}
packet_stdout(fd, buf, pos, NULL);
}
close(dvrfd);
free(buf);
return;
}