[PATCH] rtmp: WIP: check retcodes of net IO operations
Samuel, while you are enhancing the RTMP protocol code, could you please also add checks on the return codes of
network connection I/O operations? I had planned to do it myself, but since you are already preparing a patchset,
it makes more sense to ask you to include this.
The motivation for this patch is the case where the RTMP server is shut down while we are publishing a stream to it.
Currently the application is simply killed by SIGPIPE. I have an environment for testing this issue.
Thanks in advance, Samuel.
---8<---
---
libavformat/rtmppkt.c | 9 +++-
libavformat/rtmpproto.c | 103 ++++++++++++++++++++++++++++++++--------------
2 files changed, 77 insertions(+), 35 deletions(-)
diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
index 61e159b..8e8ca26 100644
--- a/libavformat/rtmppkt.c
+++ b/libavformat/rtmppkt.c
@@ -216,15 +216,18 @@ int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt,
}
prev_pkt[pkt->channel_id].extra = pkt->extra;
- ffurl_write(h, pkt_hdr, p-pkt_hdr);
+ if (ffurl_write(h, pkt_hdr, p-pkt_hdr) < 0)
+ return AVERROR(EIO);
size = p - pkt_hdr + pkt->data_size;
while (off < pkt->data_size) {
int towrite = FFMIN(chunk_size, pkt->data_size - off);
- ffurl_write(h, pkt->data + off, towrite);
+ if (ffurl_write(h, pkt->data + off, towrite) < 0)
+ return AVERROR(EIO);
off += towrite;
if (off < pkt->data_size) {
uint8_t marker = 0xC0 | pkt->channel_id;
- ffurl_write(h, &marker, 1);
+ if (ffurl_write(h, &marker, 1) < 0)
+ return AVERROR(EIO);
size++;
}
}
diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
index a2c7b5d..6304895 100644
--- a/libavformat/rtmpproto.c
+++ b/libavformat/rtmpproto.c
@@ -104,9 +104,10 @@ static const uint8_t rtmp_server_key[] = {
/**
* Generate 'connect' call and send it to the server.
*/
-static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto,
+static int gen_connect(URLContext *s, RTMPContext *rt, const char *proto,
const char *host, int port)
{
+ int ret;
RTMPPacket pkt;
uint8_t ver[64], *p;
char tcurl[512];
@@ -149,16 +150,18 @@ static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto,
pkt.data_size = p - pkt.data;
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ return ret;
}
/**
* Generate 'releaseStream' call and send it to the server. It should make
* the server release some channel for media streams.
*/
-static void gen_release_stream(URLContext *s, RTMPContext *rt)
+static int gen_release_stream(URLContext *s, RTMPContext *rt)
{
+ int ret;
RTMPPacket pkt;
uint8_t *p;
@@ -172,16 +175,18 @@ static void gen_release_stream(URLContext *s, RTMPContext *rt)
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ return ret;
}
/**
* Generate 'FCPublish' call and send it to the server. It should make
* the server preapare for receiving media streams.
*/
-static void gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
+static int gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
{
+ int ret;
RTMPPacket pkt;
uint8_t *p;
@@ -195,16 +200,18 @@ static void gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ return ret;
}
/**
* Generate 'FCUnpublish' call and send it to the server. It should make
* the server destroy stream.
*/
-static void gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
+static int gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
{
+ int ret;
RTMPPacket pkt;
uint8_t *p;
@@ -218,16 +225,18 @@ static void gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ return ret;
}
/**
* Generate 'createStream' call and send it to the server. It should make
* the server allocate some channel for media streams.
*/
-static void gen_create_stream(URLContext *s, RTMPContext *rt)
+static int gen_create_stream(URLContext *s, RTMPContext *rt)
{
+ int ret;
RTMPPacket pkt;
uint8_t *p;
@@ -240,8 +249,9 @@ static void gen_create_stream(URLContext *s, RTMPContext *rt)
ff_amf_write_null(&p);
rt->create_stream_invoke = rt->nb_invokes;
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ return ret;
}
@@ -249,8 +259,9 @@ static void gen_create_stream(URLContext *s, RTMPContext *rt)
* Generate 'deleteStream' call and send it to the server. It should make
* the server remove some channel for media streams.
*/
-static void gen_delete_stream(URLContext *s, RTMPContext *rt)
+static int gen_delete_stream(URLContext *s, RTMPContext *rt)
{
+ int ret;
RTMPPacket pkt;
uint8_t *p;
@@ -263,16 +274,18 @@ static void gen_delete_stream(URLContext *s, RTMPContext *rt)
ff_amf_write_null(&p);
ff_amf_write_number(&p, rt->main_channel_id);
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ return ret;
}
/**
* Generate 'play' call and send it to the server, then ping the server
* to start actual playing.
*/
-static void gen_play(URLContext *s, RTMPContext *rt)
+static int gen_play(URLContext *s, RTMPContext *rt)
{
+ int ret;
RTMPPacket pkt;
uint8_t *p;
@@ -287,8 +300,10 @@ static void gen_play(URLContext *s, RTMPContext *rt)
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ if (ret)
+ return ret;
// set client buffer time disguised in ping packet
ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING, 1, 10);
@@ -298,15 +313,17 @@ static void gen_play(URLContext *s, RTMPContext *rt)
bytestream_put_be32(&p, 1);
bytestream_put_be32(&p, 256); //TODO: what is a good value here?
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ return ret;
}
/**
* Generate 'publish' call and send it to the server.
*/
-static void gen_publish(URLContext *s, RTMPContext *rt)
+static int gen_publish(URLContext *s, RTMPContext *rt)
{
+ int ret;
RTMPPacket pkt;
uint8_t *p;
@@ -322,15 +339,17 @@ static void gen_publish(URLContext *s, RTMPContext *rt)
ff_amf_write_string(&p, rt->playpath);
ff_amf_write_string(&p, "live");
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ return ret;
}
/**
* Generate ping reply and send it to the server.
*/
-static void gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
+static int gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
{
+ int ret;
RTMPPacket pkt;
uint8_t *p;
@@ -338,23 +357,26 @@ static void gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
p = pkt.data;
bytestream_put_be16(&p, 7);
bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ return ret;
}
/**
* Generate report on bytes read so far and send it to the server.
*/
-static void gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts)
+static int gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts)
{
+ int ret;
RTMPPacket pkt;
uint8_t *p;
ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_BYTES_READ, ts, 4);
p = pkt.data;
bytestream_put_be32(&p, rt->bytes_read);
- ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
+ return ret;
}
//TODO: Move HMAC code somewhere. Eventually.
@@ -548,6 +570,7 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt)
*/
static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
{
+ int ret;
int i, t;
const uint8_t *data_end = pkt->data + pkt->data_size;
@@ -563,7 +586,8 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
return -1;
}
if (!rt->is_input)
- ff_rtmp_packet_write(rt->stream, pkt, rt->chunk_size, rt->prev_pkt[1]);
+ if (ret = ff_rtmp_packet_write(rt->stream, pkt, rt->chunk_size, rt->prev_pkt[1]))
+ return ret;
rt->chunk_size = AV_RB32(pkt->data);
if (rt->chunk_size <= 0) {
av_log(s, AV_LOG_ERROR, "Incorrect chunk size %d\n", rt->chunk_size);
@@ -574,7 +598,8 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
case RTMP_PT_PING:
t = AV_RB16(pkt->data);
if (t == 6)
- gen_pong(s, rt, pkt);
+ if (ret = gen_pong(s, rt, pkt))
+ return ret;
break;
case RTMP_PT_CLIENT_BW:
if (pkt->data_size < 4) {
@@ -599,13 +624,16 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
switch (rt->state) {
case STATE_HANDSHAKED:
if (!rt->is_input) {
- gen_release_stream(s, rt);
- gen_fcpublish_stream(s, rt);
+ if (ret = gen_release_stream(s, rt))
+ return ret;
+ if (ret = gen_fcpublish_stream(s, rt))
+ return ret;
rt->state = STATE_RELEASING;
} else {
rt->state = STATE_CONNECTING;
}
- gen_create_stream(s, rt);
+ if (ret = gen_create_stream(s, rt))
+ return ret;
break;
case STATE_FCPUBLISH:
rt->state = STATE_CONNECTING;
@@ -629,9 +657,11 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21));
}
if (rt->is_input) {
- gen_play(s, rt);
+ if (ret = gen_play(s, rt))
+ return ret;
} else {
- gen_publish(s, rt);
+ if (ret = gen_publish(s, rt))
+ return ret;
}
rt->state = STATE_READY;
break;
@@ -702,7 +732,8 @@ static int get_packet(URLContext *s, int for_header)
rt->bytes_read += ret;
if (rt->bytes_read - rt->last_bytes_read > rt->client_report_size) {
av_log(s, AV_LOG_DEBUG, "Sending bytes read report\n");
- gen_bytes_read(s, rt, rpkt.timestamp + 1);
+ if (ret = gen_bytes_read(s, rt, rpkt.timestamp + 1))
+ return ret;
rt->last_bytes_read = rt->bytes_read;
}
@@ -772,14 +803,17 @@ static int get_packet(URLContext *s, int for_header)
static int rtmp_close(URLContext *h)
{
+ int ret;
RTMPContext *rt = h->priv_data;
if (!rt->is_input) {
rt->flv_data = NULL;
if (rt->out_pkt.data_size)
ff_rtmp_packet_destroy(&rt->out_pkt);
- if (rt->state > STATE_FCPUBLISH)
- gen_fcunpublish_stream(h, rt);
+ if (rt->state > STATE_FCPUBLISH) {
+ if (ret = gen_fcunpublish_stream(h, rt))
+ return ret;
+ }
}
if (rt->state > STATE_HANDSHAKED)
gen_delete_stream(h, rt);
@@ -867,6 +901,7 @@ static int rtmp_open(URLContext *s, const char *uri, int flags)
do {
ret = get_packet(s, 1);
+ av_log(s, AV_LOG_DEBUG, "get_packet ret %d\n", ret);
} while (ret == EAGAIN);
if (ret < 0)
goto fail;
@@ -889,6 +924,7 @@ static int rtmp_open(URLContext *s, const char *uri, int flags)
return 0;
fail:
+ av_log(s, AV_LOG_ERROR, "RTMP open fail\n");
rtmp_close(s);
return AVERROR(EIO);
}
@@ -922,6 +958,7 @@ static int rtmp_read(URLContext *s, uint8_t *buf, int size)
static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
{
+ int ret;
RTMPContext *rt = s->priv_data;
int size_temp = size;
int pktsize, pkttype;
@@ -983,8 +1020,10 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
if (rt->flv_off == rt->flv_size) {
rt->skip_bytes = 4;
- ff_rtmp_packet_write(rt->stream, &rt->out_pkt, rt->chunk_size, rt->prev_pkt[1]);
+ ret = ff_rtmp_packet_write(rt->stream, &rt->out_pkt, rt->chunk_size, rt->prev_pkt[1]);
ff_rtmp_packet_destroy(&rt->out_pkt);
+ if (ret)
+ return ret;
rt->flv_size = 0;
rt->flv_off = 0;
rt->flv_header_bytes = 0;
--
--
1.7.3.4