Linux DVB output for STBs based on Broadcom - replace active pooling by select (ready to write)

Signed-off-by: max_10 <max_10@gmx.de>
This commit is contained in:
samsamsam
2018-04-02 23:59:24 +02:00
committed by Thilo Graf
parent 2878ec69bc
commit 93f61a89f6
6 changed files with 178 additions and 22 deletions

View File

@@ -25,6 +25,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <memory.h>
#include <string.h>
#include <errno.h>
@@ -98,6 +99,7 @@ static uint32_t bufferingDataSize = 0;
static int videofd = -1;
static int audiofd = -1;
static int g_pfd[2] = {-1, -1};
/* ***************************** */
/* Prototypes */
@@ -106,14 +108,46 @@ static int audiofd = -1;
/* ***************************** */
/* MISC Functions */
/* ***************************** */
static void WriteWakeUp()
{
write(g_pfd[1], "x", 1);
}
/* **************************** */
/* Worker Thread */
/* **************************** */
static void LinuxDvbBuffThread(Context_t *context)
{
int flags = 0;
static BufferingNode_t *nodePtr = NULL;
buff_printf(20, "ENTER\n");
if (pipe(g_pfd) == -1)
{
buff_err("critical error\n");
}
/* Make read and write ends of pipe nonblocking */
if ((flags = fcntl(g_pfd[0], F_GETFL)) == -1)
{
buff_err("critical error\n");
}
/* Make read end nonblocking */
flags |= O_NONBLOCK;
if (fcntl(g_pfd[0], F_SETFL, flags) == -1)
{
buff_err("critical error\n");
}
if ((flags = fcntl(g_pfd[1], F_GETFL)) == -1)
{
buff_err("critical error\n");
}
/* Make write end nonblocking */
flags |= O_NONBLOCK;
if (fcntl(g_pfd[1], F_SETFL, flags) == -1)
{
buff_err("critical error\n");
}
PlaybackDieNowRegisterCallback(WriteWakeUp);
while (0 == PlaybackDieNow(0))
{
pthread_mutex_lock(&bufferingMtx);
@@ -165,9 +199,9 @@ static void LinuxDvbBuffThread(Context_t *context)
/* Write data to valid output */
uint8_t *dataPtr = (uint8_t *)nodePtr + sizeof(BufferingNode_t);
int fd = nodePtr->dataType == OUTPUT_VIDEO ? videofd : audiofd;
if (0 != write_with_retry(fd, dataPtr, nodePtr->dataSize))
if (0 != WriteWithRetry(context, g_pfd[0], fd, dataPtr, nodePtr->dataSize))
{
printf("Something is WRONG\n");
buff_err("Something is WRONG\n");
}
}
}
@@ -178,6 +212,10 @@ static void LinuxDvbBuffThread(Context_t *context)
buff_printf(20, "EXIT\n");
hasBufferingThreadStarted = false;
close(g_pfd[0]);
close(g_pfd[1]);
g_pfd[0] = -1;
g_pfd[1] = -1;
}
int32_t WriteSetBufferingSize(const uint32_t bufferSize)
@@ -240,7 +278,7 @@ int32_t LinuxDvbBuffOpen(Context_t *context, char *type, int outfd)
return ret;
}
int32_t LinuxDvbBuffClose(Context_t *context)
int32_t LinuxDvbBuffClose(Context_t *context __attribute__((unused)))
{
int32_t ret = 0;
@@ -251,6 +289,10 @@ int32_t LinuxDvbBuffClose(Context_t *context)
if (hasBufferingThreadStarted)
{
struct timespec max_wait = {0, 0};
/* WakeUp if we are waiting in the write */
WriteWakeUp();
pthread_mutex_lock(&bufferingMtx);
/* wait for thread end */
clock_gettime(CLOCK_REALTIME, &max_wait);
@@ -278,10 +320,14 @@ int32_t LinuxDvbBuffClose(Context_t *context)
return ret;
}
int32_t LinuxDvbBuffFlush(Context_t *context)
int32_t LinuxDvbBuffFlush(Context_t *context __attribute__((unused)))
{
static BufferingNode_t *nodePtr = NULL;
buff_printf(40, "ENTER bufferingQueueHead[%p]\n", bufferingQueueHead);
/* signal if we are waiting for write to DVB decoders */
WriteWakeUp();
pthread_mutex_lock(&bufferingMtx);
while (bufferingQueueHead)
{

View File

@@ -29,6 +29,7 @@
#include "misc.h"
#include "writer.h"
#include "common.h"
/* ***************************** */
/* Makros/Constants */
@@ -100,6 +101,69 @@ static Writer_t *AvailableWriter[] =
/* Functions */
/* ***************************** */
ssize_t WriteWithRetry(Context_t *context, int pipefd, int fd, const void *buf, size_t size)
{
fd_set rfds;
fd_set wfds;
ssize_t ret;
int retval = -1;
int maxFd = pipefd > fd ? pipefd : fd;
while(size > 0 && 0 == PlaybackDieNow(0) && !context->playback->isSeeking)
{
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_SET(pipefd, &rfds);
FD_SET(fd, &wfds);
retval = select(maxFd + 1, &rfds, &wfds, NULL, NULL);
if (retval < 0)
{
break;
}
if(FD_ISSET(pipefd, &rfds))
{
char tmp;
/* flush pipefd pipe */
while(1 == read(pipefd, &tmp, 1));
break;
}
if(FD_ISSET(fd, &wfds))
{
ret = write(fd, buf, size);
if (ret < 0)
{
switch(errno)
{
case EINTR:
case EAGAIN:
continue;
default:
retval = -3;
break;
}
if (retval < 0)
{
break;
}
}
if (ret < 0)
{
return ret;
}
size -= ret;
buf += ret;
}
}
return 0;
}
ssize_t write_with_retry(int fd, const void *buf, size_t size)
{
ssize_t ret;