spark: make cRecord more error resilient

* use aio to improve things for slow recording media (heavily loaded
  NFS server for example)
* in case of a buffer overflow, don't stop the recording but simply
  drop a buffer
A proper aio implementation would involve multiple buffers that could
be queued in paralled, but that's much more complex, so let's see if
the current code works out well enough.
This commit is contained in:
Stefan Seyfried
2012-04-08 13:26:51 +02:00
parent 56874d9721
commit f3d93833e1

View File

@@ -6,6 +6,9 @@
#include <inttypes.h>
#include <cstdio>
#include <cstring>
#include <aio.h>
#include "record_lib.h"
#include "lt_debug.h"
#define lt_debug(args...) _lt_debug(TRIPLE_DEBUG_RECORD, this, args)
@@ -168,76 +171,119 @@ bool cRecord::AddPid(unsigned short pid)
void cRecord::RecordThread()
{
lt_info("%s: begin\n", __func__);
#define BUFSIZE (1 << 19) /* 512 kB */
#define BUFSIZE (1 << 20) /* 1MB */
#define READSIZE (BUFSIZE / 16)
ssize_t r = 0;
int buf_pos = 0;
int queued = 0;
uint8_t *buf;
buf = (uint8_t *)malloc(BUFSIZE);
struct aiocb a;
buf = (uint8_t *)malloc(BUFSIZE);
if (!buf)
{
exit_flag = RECORD_FAILED_MEMORY;
lt_info("%s: unable to allocate buffer! (out of memory)\n", __func__);
}
int val = fcntl(file_fd, F_GETFL);
if (fcntl(file_fd, F_SETFL, val|O_APPEND))
lt_info("%s: O_APPEND? (%m)\n", __func__);
memset(&a, 0, sizeof(a));
a.aio_fildes = file_fd;
a.aio_sigevent.sigev_notify = SIGEV_NONE;
dmx->Start();
bool overflow = false;
while (exit_flag == RECORD_RUNNING)
{
if (buf_pos < BUFSIZE)
{
r = dmx->Read(buf + buf_pos, BUFSIZE - 1 - buf_pos, 100);
int toread = BUFSIZE - buf_pos;
if (toread > READSIZE)
toread = READSIZE;
r = dmx->Read(buf + buf_pos, toread, 50);
lt_debug("%s: buf_pos %6d r %6d / %6d\n", __func__,
buf_pos, (int)r, BUFSIZE - 1 - buf_pos);
buf_pos, (int)r, BUFSIZE - buf_pos);
if (r < 0)
{
if (errno != EAGAIN)
if (errno != EAGAIN && (errno != EOVERFLOW || !overflow))
{
lt_info("%s: read failed: %m\n", __func__);
exit_flag = RECORD_FAILED_READ;
break;
}
lt_info("%s: EAGAIN\n", __func__);
lt_info("%s: %s\n", __func__, errno == EOVERFLOW ? "EOVERFLOW" : "EAGAIN");
}
else
{
overflow = false;
buf_pos += r;
}
}
else
lt_info("%s: buffer full! Overflow?\n", __func__);
if (buf_pos > (BUFSIZE / 3)) /* start writeout */
{
size_t towrite = BUFSIZE / 2;
if (buf_pos < BUFSIZE / 2)
towrite = buf_pos;
r = write(file_fd, buf, towrite);
if (r < 0)
{
exit_flag = RECORD_FAILED_FILE;
lt_info("%s: write error: %m\n", __func__);
break;
}
buf_pos -= r;
memmove(buf, buf + r, buf_pos);
lt_debug("%s: buf_pos %6d w %6d / %6d\n", __func__, buf_pos, (int)r, (int)towrite);
#if 0
if (fdatasync(file_fd))
perror("cRecord::FileThread() fdatasync");
#endif
if (posix_fadvise(file_fd, 0, 0, POSIX_FADV_DONTNEED))
perror("posix_fadvise");
overflow = true;
lt_info("%s: buffer full! Overflow?\n", __func__);
}
r = aio_error(&a);
if (r == EINPROGRESS)
{
lt_debug("%s: aio in progress...\n", __func__);
if (overflow) /* rate-limit the message */
usleep(100000);
continue;
}
if (r)
{
exit_flag = RECORD_FAILED_FILE;
lt_info("%s: aio_error != EINPROGRESS: %d (%m)\n", __func__, r);
break;
}
lt_debug("%s: buf_pos %6d w %6d\n", __func__, buf_pos, (int)queued);
if (posix_fadvise(file_fd, 0, 0, POSIX_FADV_DONTNEED))
perror("posix_fadvise");
if (queued)
{
memmove(buf, buf + queued, buf_pos - queued);
buf_pos -= queued;
}
queued = buf_pos;
a.aio_buf = buf;
a.aio_nbytes = queued;
r = aio_write(&a);
if (r)
{
lt_info("%s: aio_write %d (%m)\n", __func__, r);
exit_flag = RECORD_FAILED_FILE;
break;
}
}
dmx->Stop();
while (buf_pos > 0) /* write out the unwritten buffer content */
while (true) /* write out the unwritten buffer content */
{
r = write(file_fd, buf, buf_pos);
if (r < 0)
lt_debug("%s: run-out write, buf_pos %d\n", __func__, buf_pos);
r = aio_error(&a);
if (r == EINPROGRESS)
{
usleep(50000);
continue;
}
if (r)
{
exit_flag = RECORD_FAILED_FILE;
lt_info("%s: write error: %m\n", __func__);
lt_info("%s: aio_error != EINPROGRESS: %d (%m)\n", __func__, r);
break;
}
buf_pos -= r;
memmove(buf, buf + r, buf_pos);
if (!queued)
break;
memmove(buf, buf + queued, buf_pos - queued);
buf_pos -= queued;
queued = buf_pos;
a.aio_buf = buf;
a.aio_nbytes = queued;
r = aio_write(&a);
}
free(buf);
@@ -255,7 +301,7 @@ void cRecord::RecordThread()
printf("[stream2file]: pthreads exit code: %i, dir: '%s', filename: '%s' myfilename: '%s'\n", exit_flag, s.dir, s.filename, myfilename);
#endif
lt_info("%s: end", __func__);
lt_info("%s: end\n", __func__);
pthread_exit(NULL);
}