From 1df6cfc16de6ce0250b05aeb13c02ecd1a6f6162 Mon Sep 17 00:00:00 2001 From: Stefan Seyfried Date: Sun, 22 Feb 2015 22:21:33 +0100 Subject: [PATCH] libspark: port a prior version of martii's cRecord This imports most of the changes of martii's cRecord https://gitorious.org/neutrino-hd/martiis-libstb-hal, commmit 23bea6147355e94c113ef5380c5cc8fc9b2e43a3 --- libspark/record.cpp | 48 ++++++++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/libspark/record.cpp b/libspark/record.cpp index 3ec7b0e..11d3eec 100644 --- a/libspark/record.cpp +++ b/libspark/record.cpp @@ -109,7 +109,7 @@ bool cRecord::Start(int fd, unsigned short vpid, unsigned short *apids, int nump if (!pd->dmx) pd->dmx = new cDemux(pd->dmx_num); - pd->dmx->Open(DMX_TP_CHANNEL, NULL, 512*1024); + pd->dmx->Open(DMX_TP_CHANNEL, NULL, 2*1024*1024); pd->dmx->pesFilter(vpid); for (i = 0; i < numpids; i++) @@ -221,14 +221,17 @@ bool cRecord::AddPid(unsigned short pid) void RecData::RecordThread() { lt_info("%s: begin\n", __func__); -#define BUFSIZE (1 << 20) /* 1MB */ + hal_set_threadname("hal:record"); +#define BUFSIZE (2 << 20) /* 2MB */ #define READSIZE (BUFSIZE / 16) + const int bufsize = BUFSIZE; + const int readsize = READSIZE; int buf_pos = 0; int queued = 0; uint8_t *buf; struct aiocb a; - buf = (uint8_t *)malloc(BUFSIZE); + buf = (uint8_t *)malloc(bufsize); if (!buf) { exit_flag = RECORD_FAILED_MEMORY; @@ -244,18 +247,23 @@ void RecData::RecordThread() a.aio_sigevent.sigev_notify = SIGEV_NONE; dmx->Start(); + int overflow_count = 0; bool overflow = false; int r = 0; while (exit_flag == RECORD_RUNNING) { - if (buf_pos < BUFSIZE) + if (buf_pos < bufsize) { - int toread = BUFSIZE - buf_pos; - if (toread > READSIZE) - toread = READSIZE; + if (overflow_count) { + lt_info("%s: Overflow cleared after %d iterations\n", __func__, overflow_count); + overflow_count = 0; + } + int toread = bufsize - buf_pos; + if (toread > readsize) + toread = readsize; ssize_t s = dmx->Read(buf + buf_pos, toread, 50); lt_debug("%s: buf_pos %6d s %6d / %6d\n", __func__, - buf_pos, (int)s, BUFSIZE - buf_pos); + buf_pos, (int)s, bufsize - buf_pos); if (s < 0) { if (errno != EAGAIN && (errno != EOVERFLOW || !overflow)) @@ -264,7 +272,6 @@ void RecData::RecordThread() exit_flag = RECORD_FAILED_READ; break; } - lt_info("%s: %s\n", __func__, errno == EOVERFLOW ? "EOVERFLOW" : "EAGAIN"); } else { @@ -274,24 +281,28 @@ void RecData::RecordThread() } else { + if (!overflow) + overflow_count = 0; overflow = true; - lt_info("%s: buffer full! Overflow?\n", __func__); + if (!(overflow_count % 10)) + lt_info("%s: buffer full! Overflow? (%d)\n", __func__, ++overflow_count); } r = aio_error(&a); if (r == EINPROGRESS) { - lt_debug("%s: aio in progress...\n", __func__); - if (overflow) /* rate-limit the message */ - usleep(100000); + lt_debug("%s: aio in progress, free: %d\n", __func__, bufsize - buf_pos); continue; } - if (r) + // not calling aio_return causes a memory leak --martii + r = aio_return(&a); + if (r < 0) { exit_flag = RECORD_FAILED_FILE; - lt_info("%s: aio_error != EINPROGRESS: %d (%m)\n", __func__, r); + lt_debug("%s: aio_return = %d (%m)\n", __func__, r); break; } - lt_debug("%s: buf_pos %6d w %6d\n", __func__, buf_pos, (int)queued); + else + lt_debug("%s: aio_return = %d, free: %d\n", __func__, r, bufsize - buf_pos); if (posix_fadvise(file_fd, 0, 0, POSIX_FADV_DONTNEED)) perror("posix_fadvise"); if (queued) @@ -320,10 +331,11 @@ void RecData::RecordThread() usleep(50000); continue; } - if (r) + r = aio_return(&a); + if (r < 0) { exit_flag = RECORD_FAILED_FILE; - lt_info("%s: aio_error != EINPROGRESS: %d (%m)\n", __func__, r); + lt_info("%s: aio_result: %d (%m)\n", __func__, r); break; } if (!queued)