driver/streamts.cpp: add webtv channels streaming for compatible streams

This commit is contained in:
[CST] Focus
2016-02-05 18:57:36 +03:00
parent ecb5fa2201
commit c13eb939fe
2 changed files with 224 additions and 21 deletions

View File

@@ -56,6 +56,7 @@
#include <driver/streamts.h>
#include <driver/record.h>
#include <driver/genpsi.h>
#include <cs_api.h>
/* experimental mode:
* stream not possible, if record running
@@ -111,7 +112,7 @@ bool CStreamInstance::Stop()
return (OpenThreads::Thread::join() == 0);
}
bool CStreamInstance::Send(ssize_t r)
bool CStreamInstance::Send(ssize_t r, unsigned char * _buf)
{
//OpenThreads::ScopedLock<OpenThreads::Mutex> m_lock(mutex);
stream_fds_t cfds;
@@ -123,7 +124,7 @@ bool CStreamInstance::Send(ssize_t r)
flags = MSG_DONTWAIT;
for (stream_fds_t::iterator it = cfds.begin(); it != cfds.end(); ++it) {
int i = 10;
unsigned char *b = buf;
unsigned char *b = _buf ? _buf : buf;
ssize_t count = r;
do {
int ret = send(*it, b, count, flags);
@@ -160,17 +161,20 @@ void CStreamInstance::RemoveClient(int clientfd)
printf("CStreamInstance::RemoveClient: %d (count %d)\n", clientfd, fds.size());
}
bool CStreamInstance::Open()
{
CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(channel_id);
if (!tmpchan)
return false;
dmx = new cDemux(tmpchan->getRecordDemux());//FIXME
return dmx->Open(DMX_TP_CHANNEL, NULL, DMX_BUFFER_SIZE);
}
void CStreamInstance::run()
{
printf("CStreamInstance::run: %llx\n", channel_id);
CZapitChannel * tmpchan = CServiceManager::getInstance()->FindChannel(channel_id);
if (!tmpchan)
return;
dmx = new cDemux(tmpchan->getRecordDemux());//FIXME
dmx->Open(DMX_TP_CHANNEL, NULL, DMX_BUFFER_SIZE);
/* pids here cannot be empty */
stream_pids_t::iterator it = pids.begin();
printf("CStreamInstance::run: add pid %x\n", *it);
@@ -414,7 +418,7 @@ bool CStreamManager::Parse(int fd, stream_pids_t &pids, t_channel_id &chid, CFro
printf("CStreamManager::Parse: channel_id %llx [%s]\n", chid, channel->getName().c_str());
if (IS_WEBTV(chid))
return false;
return true;
frontend = FindFrontend(channel);
if (!frontend) {
@@ -497,13 +501,18 @@ bool CStreamManager::AddClient(int connfd)
if (it != streams.end()) {
it->second->AddClient(connfd);
} else {
CStreamInstance * stream = new CStreamInstance(connfd, channel_id, pids);
stream->frontend = frontend;
CStreamInstance * stream;
if (IS_WEBTV(channel_id)) {
stream = new CStreamStream(connfd, channel_id, pids);
} else {
stream = new CStreamInstance(connfd, channel_id, pids);
stream->frontend = frontend;
}
int sendsize = 10*IN_SIZE;
unsigned int m = sizeof(sendsize);
setsockopt(connfd, SOL_SOCKET, SO_SNDBUF, (void *)&sendsize, m);
if (stream->Start())
if (stream->Open() && stream->Start())
streams.insert(streammap_pair_t(channel_id, stream));
else
delete stream;
@@ -693,3 +702,169 @@ _error:
close (listenfd);
return false;
}
CStreamStream::CStreamStream(int clientfd, t_channel_id chid, stream_pids_t &_pids)
: CStreamInstance(clientfd, chid, _pids)
{
ifcx = NULL;
ofcx = NULL;
avio_ctx = NULL;
stopped = true;
interrupt = false;
}
CStreamStream::~CStreamStream()
{
Stop();
Close();
}
int CStreamStream::write_packet(void *opaque, uint8_t *buffer, int buf_size)
{
CStreamStream * st = (CStreamStream *) opaque;
st->Send(buf_size, buffer);
return buf_size;
}
int CStreamStream::Interrupt(void * data)
{
CStreamStream * sr = (CStreamStream*) data;
if (sr->interrupt)
return 1;
return 0;
}
void CStreamStream::Close()
{
if (ifcx)
avformat_close_input(&ifcx);
if (ofcx)
avformat_free_context(ofcx);
if (buf)
av_freep(&buf);
if (avio_ctx)
av_free(avio_ctx);
ifcx = NULL;
ofcx = NULL;
avio_ctx = NULL;
}
bool CStreamStream::Open()
{
CZapitChannel * channel = CServiceManager::getInstance()->FindChannel(channel_id);
if (!channel)
return false;
std::string url = channel->getUrl();
if (url.empty())
return false;
printf("%s: Open input [%s]....\n", __FUNCTION__, url.c_str());
AVDictionary *options = NULL;
if (avformat_open_input(&ifcx, url.c_str(), NULL, &options) != 0) {
printf("%s: Cannot open input [%s]!\n", __FUNCTION__, channel->getUrl().c_str());
return false;
}
if (avformat_find_stream_info(ifcx, NULL) < 0) {
printf("%s: Cannot find stream info [%s]!\n", __FUNCTION__, channel->getUrl().c_str());
return false;
}
if (!strstr(ifcx->iformat->name, "applehttp") && !strstr(ifcx->iformat->name, "mpegts")) {
printf("%s: not supported format [%s]!\n", __FUNCTION__, ifcx->iformat->name);
return false;
}
AVIOInterruptCB int_cb = { Interrupt, this };
ifcx->interrupt_callback = int_cb;
snprintf(ifcx->filename, sizeof(ifcx->filename), "%s", channel->getUrl().c_str());
av_dump_format(ifcx, 0, ifcx->filename, 0);
buf = (unsigned char *) av_malloc(IN_SIZE);
if (buf == NULL) {
perror("CStreamStream::Open: buf");
return false;
}
avio_ctx = avio_alloc_context(buf, IN_SIZE, 1, this, NULL, &write_packet, NULL);
if (!avio_ctx) {
printf("%s: avio_alloc_context failed\n", __FUNCTION__);
return false;
}
if (avformat_alloc_output_context2(&ofcx, NULL, "mpegts", NULL) < 0) {
printf("%s: avformat_alloc_output_context2 failed\n", __FUNCTION__);
return false;
}
ofcx->pb = avio_ctx;
av_dict_copy(&ofcx->metadata, ifcx->metadata, 0);
int stid = 0x200;
for (unsigned i = 0; i < ifcx->nb_streams; i++) {
AVCodecContext * iccx = ifcx->streams[i]->codec;
AVStream *ost = avformat_new_stream(ofcx, iccx->codec);
avcodec_copy_context(ost->codec, iccx);
av_dict_copy(&ost->metadata, ifcx->streams[i]->metadata, 0);
ost->time_base = iccx->time_base;
ost->id = stid++;
}
av_log_set_level(AV_LOG_VERBOSE);
av_dump_format(ofcx, 0, ofcx->filename, 1);
av_log_set_level(AV_LOG_WARNING);
return true;
}
bool CStreamStream::Start()
{
if (!stopped)
return false;
printf("%s: Starting...\n", __FUNCTION__);
stopped = false;
int ret = start();
return (ret == 0);
}
bool CStreamStream::Stop()
{
if (stopped)
return false;
printf("%s: Stopping...\n", __FUNCTION__);
interrupt = true;
stopped = true;
int ret = join();
interrupt = false;
return (ret == 0);
}
void CStreamStream::run()
{
AVPacket pkt;
printf("%s: Started.\n", __FUNCTION__);
if (avformat_write_header(ofcx, NULL) < 0) {
printf("%s: avformat_write_header failed\n", __FUNCTION__);
return;
}
while (!stopped) {
av_init_packet(&pkt);
if (av_read_frame(ifcx, &pkt) < 0)
break;
av_write_frame(ofcx, &pkt);
av_free_packet(&pkt);
}
av_read_pause(ifcx);
av_write_trailer(ofcx);
printf("%s: Stopped.\n", __FUNCTION__);
}

View File

@@ -31,12 +31,16 @@
#include <set>
#include <map>
extern "C" {
#include <libavformat/avformat.h>
}
typedef std::set<int> stream_pids_t;
typedef std::set<int> stream_fds_t;
class CStreamInstance : public OpenThreads::Thread
{
private:
protected:
bool running;
cDemux * dmx;
CFrontend * frontend;
@@ -47,15 +51,16 @@ class CStreamInstance : public OpenThreads::Thread
stream_pids_t pids;
stream_fds_t fds;
bool Send(ssize_t r);
void Close();
void run();
virtual bool Send(ssize_t r, unsigned char * _buf = NULL);
virtual void Close();
virtual void run();
friend class CStreamManager;
public:
CStreamInstance(int clientfd, t_channel_id chid, stream_pids_t &pids);
~CStreamInstance();
bool Start();
bool Stop();
virtual ~CStreamInstance();
virtual bool Open();
virtual bool Start();
virtual bool Stop();
void AddClient(int clientfd);
void RemoveClient(int clientfd);
bool HasFd(int fd);
@@ -63,6 +68,30 @@ class CStreamInstance : public OpenThreads::Thread
t_channel_id GetChannelId() { return channel_id; }
};
class CStreamStream : public CStreamInstance
{
private:
AVFormatContext *ifcx;
AVFormatContext *ofcx;
AVIOContext *avio_ctx;
bool stopped;
bool interrupt;
void run();
void Close();
public:
CStreamStream(int clientfd, t_channel_id chid, stream_pids_t &pids);
~CStreamStream();
bool Open();
bool Start();
bool Stop();
static int Interrupt(void * data);
static int write_packet(void *opaque, uint8_t *buf, int buf_size);
};
typedef std::pair<t_channel_id, CStreamInstance*> streammap_pair_t;
typedef std::map<t_channel_id, CStreamInstance*> streammap_t;
typedef streammap_t::iterator streammap_iterator_t;
@@ -102,5 +131,4 @@ class CStreamManager : public OpenThreads::Thread
int GetPort() { return port; }
bool AddClient(int fd);
};
#endif