From f3d93833e1d1c05011e99b322737027f43212a37 Mon Sep 17 00:00:00 2001 From: Stefan Seyfried Date: Sun, 8 Apr 2012 13:26:51 +0200 Subject: [PATCH] spark: make cRecord more error resilient * use aio to improve things for slow recording media (heavily loaded NFS server for example) * in case of a buffer overflow, don't stop the recording but simply drop a buffer A proper aio implementation would involve multiple buffers that could be queued in paralled, but that's much more complex, so let's see if the current code works out well enough. --- libspark/record.cpp | 114 +++++++++++++++++++++++++++++++------------- 1 file changed, 80 insertions(+), 34 deletions(-) diff --git a/libspark/record.cpp b/libspark/record.cpp index 974e673..0af74b3 100644 --- a/libspark/record.cpp +++ b/libspark/record.cpp @@ -6,6 +6,9 @@ #include #include #include + +#include + #include "record_lib.h" #include "lt_debug.h" #define lt_debug(args...) _lt_debug(TRIPLE_DEBUG_RECORD, this, args) @@ -168,76 +171,119 @@ bool cRecord::AddPid(unsigned short pid) void cRecord::RecordThread() { lt_info("%s: begin\n", __func__); -#define BUFSIZE (1 << 19) /* 512 kB */ +#define BUFSIZE (1 << 20) /* 1MB */ +#define READSIZE (BUFSIZE / 16) ssize_t r = 0; int buf_pos = 0; + int queued = 0; uint8_t *buf; - buf = (uint8_t *)malloc(BUFSIZE); + struct aiocb a; + buf = (uint8_t *)malloc(BUFSIZE); if (!buf) { exit_flag = RECORD_FAILED_MEMORY; lt_info("%s: unable to allocate buffer! (out of memory)\n", __func__); } + int val = fcntl(file_fd, F_GETFL); + if (fcntl(file_fd, F_SETFL, val|O_APPEND)) + lt_info("%s: O_APPEND? (%m)\n", __func__); + + memset(&a, 0, sizeof(a)); + a.aio_fildes = file_fd; + a.aio_sigevent.sigev_notify = SIGEV_NONE; + dmx->Start(); + bool overflow = false; while (exit_flag == RECORD_RUNNING) { if (buf_pos < BUFSIZE) { - r = dmx->Read(buf + buf_pos, BUFSIZE - 1 - buf_pos, 100); + int toread = BUFSIZE - buf_pos; + if (toread > READSIZE) + toread = READSIZE; + r = dmx->Read(buf + buf_pos, toread, 50); lt_debug("%s: buf_pos %6d r %6d / %6d\n", __func__, - buf_pos, (int)r, BUFSIZE - 1 - buf_pos); + buf_pos, (int)r, BUFSIZE - buf_pos); if (r < 0) { - if (errno != EAGAIN) + if (errno != EAGAIN && (errno != EOVERFLOW || !overflow)) { lt_info("%s: read failed: %m\n", __func__); exit_flag = RECORD_FAILED_READ; break; } - lt_info("%s: EAGAIN\n", __func__); + lt_info("%s: %s\n", __func__, errno == EOVERFLOW ? "EOVERFLOW" : "EAGAIN"); } else + { + overflow = false; buf_pos += r; + } } else - lt_info("%s: buffer full! Overflow?\n", __func__); - if (buf_pos > (BUFSIZE / 3)) /* start writeout */ { - size_t towrite = BUFSIZE / 2; - if (buf_pos < BUFSIZE / 2) - towrite = buf_pos; - r = write(file_fd, buf, towrite); - if (r < 0) - { - exit_flag = RECORD_FAILED_FILE; - lt_info("%s: write error: %m\n", __func__); - break; - } - buf_pos -= r; - memmove(buf, buf + r, buf_pos); - lt_debug("%s: buf_pos %6d w %6d / %6d\n", __func__, buf_pos, (int)r, (int)towrite); -#if 0 - if (fdatasync(file_fd)) - perror("cRecord::FileThread() fdatasync"); -#endif - if (posix_fadvise(file_fd, 0, 0, POSIX_FADV_DONTNEED)) - perror("posix_fadvise"); + overflow = true; + lt_info("%s: buffer full! Overflow?\n", __func__); + } + r = aio_error(&a); + if (r == EINPROGRESS) + { + lt_debug("%s: aio in progress...\n", __func__); + if (overflow) /* rate-limit the message */ + usleep(100000); + continue; + } + if (r) + { + exit_flag = RECORD_FAILED_FILE; + lt_info("%s: aio_error != EINPROGRESS: %d (%m)\n", __func__, r); + break; + } + lt_debug("%s: buf_pos %6d w %6d\n", __func__, buf_pos, (int)queued); + if (posix_fadvise(file_fd, 0, 0, POSIX_FADV_DONTNEED)) + perror("posix_fadvise"); + if (queued) + { + memmove(buf, buf + queued, buf_pos - queued); + buf_pos -= queued; + } + queued = buf_pos; + a.aio_buf = buf; + a.aio_nbytes = queued; + r = aio_write(&a); + if (r) + { + lt_info("%s: aio_write %d (%m)\n", __func__, r); + exit_flag = RECORD_FAILED_FILE; + break; } } dmx->Stop(); - while (buf_pos > 0) /* write out the unwritten buffer content */ + while (true) /* write out the unwritten buffer content */ { - r = write(file_fd, buf, buf_pos); - if (r < 0) + lt_debug("%s: run-out write, buf_pos %d\n", __func__, buf_pos); + r = aio_error(&a); + if (r == EINPROGRESS) + { + usleep(50000); + continue; + } + if (r) { exit_flag = RECORD_FAILED_FILE; - lt_info("%s: write error: %m\n", __func__); + lt_info("%s: aio_error != EINPROGRESS: %d (%m)\n", __func__, r); break; } - buf_pos -= r; - memmove(buf, buf + r, buf_pos); + if (!queued) + break; + memmove(buf, buf + queued, buf_pos - queued); + buf_pos -= queued; + queued = buf_pos; + a.aio_buf = buf; + a.aio_nbytes = queued; + r = aio_write(&a); } free(buf); @@ -255,7 +301,7 @@ void cRecord::RecordThread() printf("[stream2file]: pthreads exit code: %i, dir: '%s', filename: '%s' myfilename: '%s'\n", exit_flag, s.dir, s.filename, myfilename); #endif - lt_info("%s: end", __func__); + lt_info("%s: end\n", __func__); pthread_exit(NULL); }