From 8ef9807b87ece6d1cdd5472a768134e6fae89824 Mon Sep 17 00:00:00 2001 From: Stefan Seyfried Date: Sun, 8 Apr 2012 13:26:51 +0200 Subject: [PATCH] spark: make cRecord more error resilient * use aio to improve things for slow recording media (heavily loaded NFS server for example) * in case of a buffer overflow, don't stop the recording but simply drop a buffer A proper aio implementation would involve multiple buffers that could be queued in paralled, but that's much more complex, so let's see if the current code works out well enough. Origin commit data ------------------ Branch: master Commit: https://github.com/neutrino-images/ni-libstb-hal/commit/f3d93833e1d1c05011e99b322737027f43212a37 Author: Stefan Seyfried Date: 2012-04-08 (Sun, 08 Apr 2012) Origin message was: ------------------ spark: make cRecord more error resilient * use aio to improve things for slow recording media (heavily loaded NFS server for example) * in case of a buffer overflow, don't stop the recording but simply drop a buffer A proper aio implementation would involve multiple buffers that could be queued in paralled, but that's much more complex, so let's see if the current code works out well enough. ------------------ This commit was generated by Migit --- libspark/record.cpp | 114 +++++++++++++++++++++++++++++++------------- 1 file changed, 80 insertions(+), 34 deletions(-) diff --git a/libspark/record.cpp b/libspark/record.cpp index 974e673..0af74b3 100644 --- a/libspark/record.cpp +++ b/libspark/record.cpp @@ -6,6 +6,9 @@ #include #include #include + +#include + #include "record_lib.h" #include "lt_debug.h" #define lt_debug(args...) _lt_debug(TRIPLE_DEBUG_RECORD, this, args) @@ -168,76 +171,119 @@ bool cRecord::AddPid(unsigned short pid) void cRecord::RecordThread() { lt_info("%s: begin\n", __func__); -#define BUFSIZE (1 << 19) /* 512 kB */ +#define BUFSIZE (1 << 20) /* 1MB */ +#define READSIZE (BUFSIZE / 16) ssize_t r = 0; int buf_pos = 0; + int queued = 0; uint8_t *buf; - buf = (uint8_t *)malloc(BUFSIZE); + struct aiocb a; + buf = (uint8_t *)malloc(BUFSIZE); if (!buf) { exit_flag = RECORD_FAILED_MEMORY; lt_info("%s: unable to allocate buffer! (out of memory)\n", __func__); } + int val = fcntl(file_fd, F_GETFL); + if (fcntl(file_fd, F_SETFL, val|O_APPEND)) + lt_info("%s: O_APPEND? (%m)\n", __func__); + + memset(&a, 0, sizeof(a)); + a.aio_fildes = file_fd; + a.aio_sigevent.sigev_notify = SIGEV_NONE; + dmx->Start(); + bool overflow = false; while (exit_flag == RECORD_RUNNING) { if (buf_pos < BUFSIZE) { - r = dmx->Read(buf + buf_pos, BUFSIZE - 1 - buf_pos, 100); + int toread = BUFSIZE - buf_pos; + if (toread > READSIZE) + toread = READSIZE; + r = dmx->Read(buf + buf_pos, toread, 50); lt_debug("%s: buf_pos %6d r %6d / %6d\n", __func__, - buf_pos, (int)r, BUFSIZE - 1 - buf_pos); + buf_pos, (int)r, BUFSIZE - buf_pos); if (r < 0) { - if (errno != EAGAIN) + if (errno != EAGAIN && (errno != EOVERFLOW || !overflow)) { lt_info("%s: read failed: %m\n", __func__); exit_flag = RECORD_FAILED_READ; break; } - lt_info("%s: EAGAIN\n", __func__); + lt_info("%s: %s\n", __func__, errno == EOVERFLOW ? "EOVERFLOW" : "EAGAIN"); } else + { + overflow = false; buf_pos += r; + } } else - lt_info("%s: buffer full! Overflow?\n", __func__); - if (buf_pos > (BUFSIZE / 3)) /* start writeout */ { - size_t towrite = BUFSIZE / 2; - if (buf_pos < BUFSIZE / 2) - towrite = buf_pos; - r = write(file_fd, buf, towrite); - if (r < 0) - { - exit_flag = RECORD_FAILED_FILE; - lt_info("%s: write error: %m\n", __func__); - break; - } - buf_pos -= r; - memmove(buf, buf + r, buf_pos); - lt_debug("%s: buf_pos %6d w %6d / %6d\n", __func__, buf_pos, (int)r, (int)towrite); -#if 0 - if (fdatasync(file_fd)) - perror("cRecord::FileThread() fdatasync"); -#endif - if (posix_fadvise(file_fd, 0, 0, POSIX_FADV_DONTNEED)) - perror("posix_fadvise"); + overflow = true; + lt_info("%s: buffer full! Overflow?\n", __func__); + } + r = aio_error(&a); + if (r == EINPROGRESS) + { + lt_debug("%s: aio in progress...\n", __func__); + if (overflow) /* rate-limit the message */ + usleep(100000); + continue; + } + if (r) + { + exit_flag = RECORD_FAILED_FILE; + lt_info("%s: aio_error != EINPROGRESS: %d (%m)\n", __func__, r); + break; + } + lt_debug("%s: buf_pos %6d w %6d\n", __func__, buf_pos, (int)queued); + if (posix_fadvise(file_fd, 0, 0, POSIX_FADV_DONTNEED)) + perror("posix_fadvise"); + if (queued) + { + memmove(buf, buf + queued, buf_pos - queued); + buf_pos -= queued; + } + queued = buf_pos; + a.aio_buf = buf; + a.aio_nbytes = queued; + r = aio_write(&a); + if (r) + { + lt_info("%s: aio_write %d (%m)\n", __func__, r); + exit_flag = RECORD_FAILED_FILE; + break; } } dmx->Stop(); - while (buf_pos > 0) /* write out the unwritten buffer content */ + while (true) /* write out the unwritten buffer content */ { - r = write(file_fd, buf, buf_pos); - if (r < 0) + lt_debug("%s: run-out write, buf_pos %d\n", __func__, buf_pos); + r = aio_error(&a); + if (r == EINPROGRESS) + { + usleep(50000); + continue; + } + if (r) { exit_flag = RECORD_FAILED_FILE; - lt_info("%s: write error: %m\n", __func__); + lt_info("%s: aio_error != EINPROGRESS: %d (%m)\n", __func__, r); break; } - buf_pos -= r; - memmove(buf, buf + r, buf_pos); + if (!queued) + break; + memmove(buf, buf + queued, buf_pos - queued); + buf_pos -= queued; + queued = buf_pos; + a.aio_buf = buf; + a.aio_nbytes = queued; + r = aio_write(&a); } free(buf); @@ -255,7 +301,7 @@ void cRecord::RecordThread() printf("[stream2file]: pthreads exit code: %i, dir: '%s', filename: '%s' myfilename: '%s'\n", exit_flag, s.dir, s.filename, myfilename); #endif - lt_info("%s: end", __func__); + lt_info("%s: end\n", __func__); pthread_exit(NULL); }