diff --git a/src/driver/streamts.cpp b/src/driver/streamts.cpp index 798f47dcc..df6a9a7a8 100644 --- a/src/driver/streamts.cpp +++ b/src/driver/streamts.cpp @@ -56,6 +56,7 @@ #include #include #include +#include /* experimental mode: * stream not possible, if record running @@ -111,7 +112,7 @@ bool CStreamInstance::Stop() return (OpenThreads::Thread::join() == 0); } -bool CStreamInstance::Send(ssize_t r) +bool CStreamInstance::Send(ssize_t r, unsigned char * _buf) { //OpenThreads::ScopedLock m_lock(mutex); stream_fds_t cfds; @@ -123,7 +124,7 @@ bool CStreamInstance::Send(ssize_t r) flags = MSG_DONTWAIT; for (stream_fds_t::iterator it = cfds.begin(); it != cfds.end(); ++it) { int i = 10; - unsigned char *b = buf; + unsigned char *b = _buf ? _buf : buf; ssize_t count = r; do { int ret = send(*it, b, count, flags); @@ -160,17 +161,20 @@ void CStreamInstance::RemoveClient(int clientfd) printf("CStreamInstance::RemoveClient: %d (count %d)\n", clientfd, fds.size()); } +bool CStreamInstance::Open() +{ + CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(channel_id); + if (!tmpchan) + return false; + + dmx = new cDemux(tmpchan->getRecordDemux());//FIXME + return dmx->Open(DMX_TP_CHANNEL, NULL, DMX_BUFFER_SIZE); +} + void CStreamInstance::run() { printf("CStreamInstance::run: %llx\n", channel_id); - CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(channel_id); - if (!tmpchan) - return; - - dmx = new cDemux(tmpchan->getRecordDemux());//FIXME - dmx->Open(DMX_TP_CHANNEL, NULL, DMX_BUFFER_SIZE); - /* pids here cannot be empty */ stream_pids_t::iterator it = pids.begin(); printf("CStreamInstance::run: add pid %x\n", *it); @@ -414,7 +418,7 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFro printf("CStreamManager::Parse: channel_id %llx [%s]\n", chid, channel->getName().c_str()); if (IS_WEBTV(chid)) - return false; + return true; frontend = FindFrontend(channel); if (!frontend) { @@ -497,13 +501,18 @@ bool CStreamManager::AddClient(int connfd) if (it != streams.end()) { it->second->AddClient(connfd); } else { - CStreamInstance * stream = new CStreamInstance(connfd, channel_id, pids); - stream->frontend = frontend; + CStreamInstance * stream; + if (IS_WEBTV(channel_id)) { + stream = new CStreamStream(connfd, channel_id, pids); + } else { + stream = new CStreamInstance(connfd, channel_id, pids); + stream->frontend = frontend; + } int sendsize = 10*IN_SIZE; unsigned int m = sizeof(sendsize); setsockopt(connfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, m); - if (stream->Start()) + if (stream->Open() && stream->Start()) streams.insert(streammap_pair_t(channel_id, stream)); else delete stream; @@ -693,3 +702,169 @@ _error: close (listenfd); return false; } + +CStreamStream::CStreamStream(int clientfd, t_channel_id chid, stream_pids_t &_pids) + : CStreamInstance(clientfd, chid, _pids) +{ + ifcx = NULL; + ofcx = NULL; + avio_ctx = NULL; + stopped = true; + interrupt = false; +} + +CStreamStream::~CStreamStream() +{ + Stop(); + Close(); +} + +int CStreamStream::write_packet(void *opaque, uint8_t *buffer, int buf_size) +{ + CStreamStream * st = (CStreamStream *) opaque; + st->Send(buf_size, buffer); + return buf_size; +} + +int CStreamStream::Interrupt(void * data) +{ + CStreamStream * sr = (CStreamStream*) data; + if (sr->interrupt) + return 1; + return 0; +} + +void CStreamStream::Close() +{ + if (ifcx) + avformat_close_input(&ifcx); + + if (ofcx) + avformat_free_context(ofcx); + + if (buf) + av_freep(&buf); + + if (avio_ctx) + av_free(avio_ctx); + + ifcx = NULL; + ofcx = NULL; + avio_ctx = NULL; +} + +bool CStreamStream::Open() +{ + CZapitChannel * channel = CServiceManager::getInstance()->FindChannel(channel_id); + if (!channel) + return false; + + std::string url = channel->getUrl(); + + if (url.empty()) + return false; + + printf("%s: Open input [%s]....\n", __FUNCTION__, url.c_str()); + + AVDictionary *options = NULL; + if (avformat_open_input(&ifcx, url.c_str(), NULL, &options) != 0) { + printf("%s: Cannot open input [%s]!\n", __FUNCTION__, channel->getUrl().c_str()); + return false; + } + + if (avformat_find_stream_info(ifcx, NULL) < 0) { + printf("%s: Cannot find stream info [%s]!\n", __FUNCTION__, channel->getUrl().c_str()); + return false; + } + if (!strstr(ifcx->iformat->name, "applehttp") && !strstr(ifcx->iformat->name, "mpegts")) { + printf("%s: not supported format [%s]!\n", __FUNCTION__, ifcx->iformat->name); + return false; + } + + AVIOInterruptCB int_cb = { Interrupt, this }; + ifcx->interrupt_callback = int_cb; + + snprintf(ifcx->filename, sizeof(ifcx->filename), "%s", channel->getUrl().c_str()); + av_dump_format(ifcx, 0, ifcx->filename, 0); + + buf = (unsigned char *) av_malloc(IN_SIZE); + if (buf == NULL) { + perror("CStreamStream::Open: buf"); + return false; + } + avio_ctx = avio_alloc_context(buf, IN_SIZE, 1, this, NULL, &write_packet, NULL); + if (!avio_ctx) { + printf("%s: avio_alloc_context failed\n", __FUNCTION__); + return false; + } + + if (avformat_alloc_output_context2(&ofcx, NULL, "mpegts", NULL) < 0) { + printf("%s: avformat_alloc_output_context2 failed\n", __FUNCTION__); + return false; + } + ofcx->pb = avio_ctx; + + av_dict_copy(&ofcx->metadata, ifcx->metadata, 0); + int stid = 0x200; + for (unsigned i = 0; i < ifcx->nb_streams; i++) { + AVCodecContext * iccx = ifcx->streams[i]->codec; + + AVStream *ost = avformat_new_stream(ofcx, iccx->codec); + avcodec_copy_context(ost->codec, iccx); + av_dict_copy(&ost->metadata, ifcx->streams[i]->metadata, 0); + ost->time_base = iccx->time_base; + ost->id = stid++; + } + av_log_set_level(AV_LOG_VERBOSE); + av_dump_format(ofcx, 0, ofcx->filename, 1); + av_log_set_level(AV_LOG_WARNING); + + return true; +} + +bool CStreamStream::Start() +{ + if (!stopped) + return false; + + printf("%s: Starting...\n", __FUNCTION__); + stopped = false; + int ret = start(); + return (ret == 0); +} + +bool CStreamStream::Stop() +{ + if (stopped) + return false; + + printf("%s: Stopping...\n", __FUNCTION__); + interrupt = true; + stopped = true; + int ret = join(); + interrupt = false; + return (ret == 0); +} + +void CStreamStream::run() +{ + AVPacket pkt; + + printf("%s: Started.\n", __FUNCTION__); + if (avformat_write_header(ofcx, NULL) < 0) { + printf("%s: avformat_write_header failed\n", __FUNCTION__); + return; + } + + while (!stopped) { + av_init_packet(&pkt); + if (av_read_frame(ifcx, &pkt) < 0) + break; + av_write_frame(ofcx, &pkt); + av_free_packet(&pkt); + } + + av_read_pause(ifcx); + av_write_trailer(ofcx); + printf("%s: Stopped.\n", __FUNCTION__); +} diff --git a/src/driver/streamts.h b/src/driver/streamts.h index 23f74c33d..6c6d7dd72 100644 --- a/src/driver/streamts.h +++ b/src/driver/streamts.h @@ -31,12 +31,16 @@ #include #include +extern "C" { +#include +} + typedef std::set stream_pids_t; typedef std::set stream_fds_t; class CStreamInstance : public OpenThreads::Thread { - private: + protected: bool running; cDemux * dmx; CFrontend * frontend; @@ -47,15 +51,16 @@ class CStreamInstance : public OpenThreads::Thread stream_pids_t pids; stream_fds_t fds; - bool Send(ssize_t r); - void Close(); - void run(); + virtual bool Send(ssize_t r, unsigned char * _buf = NULL); + virtual void Close(); + virtual void run(); friend class CStreamManager; public: CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &pids); - ~CStreamInstance(); - bool Start(); - bool Stop(); + virtual ~CStreamInstance(); + virtual bool Open(); + virtual bool Start(); + virtual bool Stop(); void AddClient(int clientfd); void RemoveClient(int clientfd); bool HasFd(int fd); @@ -63,6 +68,30 @@ class CStreamInstance : public OpenThreads::Thread t_channel_id GetChannelId() { return channel_id; } }; +class CStreamStream : public CStreamInstance +{ + private: + AVFormatContext *ifcx; + AVFormatContext *ofcx; + AVIOContext *avio_ctx; + + bool stopped; + bool interrupt; + void run(); + void Close(); + + public: + CStreamStream(int clientfd, t_channel_id chid, stream_pids_t &pids); + ~CStreamStream(); + + bool Open(); + bool Start(); + bool Stop(); + + static int Interrupt(void * data); + static int write_packet(void *opaque, uint8_t *buf, int buf_size); +}; + typedef std::pair streammap_pair_t; typedef std::map streammap_t; typedef streammap_t::iterator streammap_iterator_t; @@ -102,5 +131,4 @@ class CStreamManager : public OpenThreads::Thread int GetPort() { return port; } bool AddClient(int fd); }; - #endif