From 450fc1966a38b628d1e6bf719abc38f9191b8f3a Mon Sep 17 00:00:00 2001 From: martii Date: Sat, 21 Jul 2012 16:32:00 +0200 Subject: [PATCH] libspark/record.cpp: user configurable buffer sizes --- libspark/record.cpp | 61 ++++++++++++++++++++++++++++++++++--------- libspark/record_lib.h | 11 ++++++++ 2 files changed, 60 insertions(+), 12 deletions(-) diff --git a/libspark/record.cpp b/libspark/record.cpp index c6aaf8e..30d7180 100644 --- a/libspark/record.cpp +++ b/libspark/record.cpp @@ -3,6 +3,9 @@ #include #include #include +#ifdef MARTII +#include +#endif #include #include #include @@ -22,13 +25,23 @@ void *execute_record_thread(void *c) return NULL; } +#ifdef MARTII +cRecord::cRecord(int /*num*/, int bs_dmx, int bs) +#else cRecord::cRecord(int /*num*/) +#endif { lt_info("%s\n", __func__); dmx = NULL; record_thread_running = false; file_fd = -1; exit_flag = RECORD_STOPPED; +#ifdef MARTII + bufsize = bs; + bufsize_dmx = bs_dmx; + failureCallback = NULL; + failureData = NULL; +#endif } cRecord::~cRecord() @@ -53,11 +66,6 @@ void cRecord::Close(void) } #endif -#ifdef MARTII -#define TS_SIZE 188 -#define READSIZE (100 * 188 * 1024) -#define BUFSIZE READSIZE -#endif bool cRecord::Start(int fd, unsigned short vpid, unsigned short * apids, int numpids) { lt_info("%s: fd %d, vpid 0x%03x\n", __func__, fd, vpid); @@ -67,7 +75,7 @@ bool cRecord::Start(int fd, unsigned short vpid, unsigned short * apids, int num dmx = new cDemux(1); #ifdef MARTII - dmx->Open(DMX_TP_CHANNEL, NULL, READSIZE); + dmx->Open(DMX_TP_CHANNEL, NULL, bufsize_dmx); #else dmx->Open(DMX_TP_CHANNEL, NULL, 512*1024); #endif @@ -180,7 +188,13 @@ bool cRecord::AddPid(unsigned short pid) void cRecord::RecordThread() { lt_info("%s: begin\n", __func__); -#ifndef MARTII +#ifdef MARTII + char threadname[17]; + strncpy(threadname, "RecordThread", sizeof(threadname)); + threadname[16] = 0; + prctl (PR_SET_NAME, (unsigned long)&threadname); + int readsize = bufsize/16; +#else #define BUFSIZE (1 << 20) /* 1MB */ #define READSIZE (BUFSIZE / 16) #endif @@ -190,11 +204,21 @@ void cRecord::RecordThread() uint8_t *buf; struct aiocb a; +#ifdef MARTII + buf = (uint8_t *)malloc(bufsize); +#else buf = (uint8_t *)malloc(BUFSIZE); +#endif if (!buf) { exit_flag = RECORD_FAILED_MEMORY; lt_info("%s: unable to allocate buffer! (out of memory)\n", __func__); +#ifdef MARTII + if (failureCallback) + failureCallback(failureData); + lt_info("%s: end\n", __func__); + pthread_exit(NULL); +#endif } int val = fcntl(file_fd, F_GETFL); @@ -207,27 +231,36 @@ void cRecord::RecordThread() dmx->Start(); #ifdef MARTII - int dmxfd = dmx->getFD(); - fcntl(dmxfd, F_SETFL, fcntl(dmxfd, F_GETFL) | O_NONBLOCK); int overflow_count = 0; #endif bool overflow = false; while (exit_flag == RECORD_RUNNING) { +#ifdef MARTII + if (buf_pos < bufsize) +#else if (buf_pos < BUFSIZE) +#endif { #ifdef MARTII if (overflow_count) { lt_info("%s: Overflow cleared after %d iterations\n", __func__, overflow_count); overflow_count = 0; } -#endif + int toread = bufsize - buf_pos; + if (toread > readsize) + toread = readsize; + r = dmx->Read(buf + buf_pos, toread, 50); + lt_debug("%s: buf_pos %6d r %6d / %6d\n", __func__, + buf_pos, (int)r, bufsize - buf_pos); +#else int toread = BUFSIZE - buf_pos; if (toread > READSIZE) toread = READSIZE; r = dmx->Read(buf + buf_pos, toread, 50); lt_debug("%s: buf_pos %6d r %6d / %6d\n", __func__, buf_pos, (int)r, BUFSIZE - buf_pos); +#endif if (r < 0) { if (errno != EAGAIN && (errno != EOVERFLOW || !overflow)) @@ -264,7 +297,7 @@ void cRecord::RecordThread() if (r == EINPROGRESS) { #ifdef MARTII - lt_debug("%s: aio in progress, free: %d\n", __func__, BUFSIZE - buf_pos); + lt_debug("%s: aio in progress, free: %d\n", __func__, bufsize - buf_pos); #else lt_debug("%s: aio in progress...\n", __func__); if (overflow) /* rate-limit the message */ @@ -290,7 +323,7 @@ void cRecord::RecordThread() } #ifdef MARTII else - lt_debug("%s: aio_return = %d, free: %d\n", __func__, r, BUFSIZE - buf_pos); + lt_debug("%s: aio_return = %d, free: %d\n", __func__, r, bufsize - buf_pos); #else lt_debug("%s: buf_pos %6d w %6d\n", __func__, buf_pos, (int)queued); #endif @@ -362,6 +395,10 @@ void cRecord::RecordThread() printf("[stream2file]: pthreads exit code: %i, dir: '%s', filename: '%s' myfilename: '%s'\n", exit_flag, s.dir, s.filename, myfilename); #endif +#ifdef MARTII + if ((exit_flag != RECORD_STOPPED) && failureCallback) + failureCallback(failureData); +#endif lt_info("%s: end\n", __func__); pthread_exit(NULL); } diff --git a/libspark/record_lib.h b/libspark/record_lib.h index 8b882d6..7628068 100644 --- a/libspark/record_lib.h +++ b/libspark/record_lib.h @@ -21,8 +21,19 @@ class cRecord pthread_t record_thread; bool record_thread_running; record_state_t exit_flag; +#ifdef MARTII + int bufsize; + int bufsize_dmx; + void (*failureCallback)(void *); + void *failureData; +#endif public: +#ifdef MARTII + cRecord(int num = 0, int bs_dmx = 100 * 188 * 1024, int bs = 100 * 188 * 1024); + void setFailureCallback(void (*f)(void *), void *d) { failureCallback = f; failureData = d; } +#else cRecord(int num = 0); +#endif ~cRecord(); bool Open();