From 9ba5cad53d7bdde5c1fbbcf33bcb554a3bfa0b9d Mon Sep 17 00:00:00 2001 From: Kostya Shishkov Date: Fri, 19 Nov 2021 18:31:46 +0100 Subject: [PATCH] implement raw stream seeking --- src/demux.rs | 128 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 92 insertions(+), 36 deletions(-) diff --git a/src/demux.rs b/src/demux.rs index 084d9e7..8a58f2d 100644 --- a/src/demux.rs +++ b/src/demux.rs @@ -31,11 +31,62 @@ pub struct RawStreamCtx<'a> { pkt: Box, br: &'a mut ByteReader<'a>, pts: u64, + seek: SeekIndex, } impl<'a> RawStreamCtx<'a> { fn new(stream: NAStreamRef, packetiser: Box, br: &'a mut ByteReader<'a>) -> Self { - Self { stream, pkt: packetiser, br, pts: 0 } + let mut seek = SeekIndex::new(); + seek.add_stream(0); + Self { stream, pkt: packetiser, br, pts: 0, seek } + } + fn account_for_packet(&mut self, packet: &mut NAPacket) { + let pos = self.br.tell() - (self.pkt.bytes_left() as u64); + if packet.get_pts().is_none() && packet.get_duration().is_some() { + packet.ts.pts = Some(self.pts); + } + if packet.is_keyframe() { + let pts = packet.get_pts().unwrap_or(self.pts); + let time = NATimeInfo::ts_to_time(pts, 1000, self.stream.tb_num, self.stream.tb_den); + let in_range = if let Some(last) = self.seek.seek_info[0].entries.last() { + last.pts >= pts + } else { + false + }; + if !in_range { + self.seek.add_entry(0, SeekEntry { time, pts, pos }); + } + } + self.pts += packet.get_duration().unwrap_or(0); + } + fn get_frame(&mut self) -> DemuxerResult { + let mut buf = [0; 1048576]; + loop { + match self.pkt.get_packet(self.stream.clone()) { + Ok(Some(mut packet)) => { + self.account_for_packet(&mut packet); + return Ok(packet); + }, + Ok(None) => {}, + Err(DecoderError::ShortData) => {}, + _ => return Err(DemuxerError::InvalidData), + }; + match self.br.read_buf_some(&mut buf) { + Ok(size) => { + self.pkt.add_data(&buf[..size]); + }, + Err(_) => { + match self.pkt.get_packet(self.stream.clone()) { + Ok(Some(mut packet)) => { + self.account_for_packet(&mut packet); + return Ok(packet); + }, + Ok(None) | Err(DecoderError::ShortData) => return Err(DemuxerError::EOF), + _ => return Err(DemuxerError::InvalidData), + }; + }, + }; + } } } @@ -189,41 +240,7 @@ impl<'a> DemuxerObject<'a> { } } }, - DemuxerObject::RawStream(ref mut ctx) => { - let mut buf = [0; 1048576]; - loop { - match ctx.pkt.get_packet(ctx.stream.clone()) { - Ok(Some(mut packet)) => { - if packet.get_pts().is_none() && packet.get_duration().is_some() { - packet.ts.pts = Some(ctx.pts); - } - ctx.pts += packet.get_duration().unwrap_or(0); - return Ok(packet); - }, - Ok(None) => {}, - Err(DecoderError::ShortData) => {}, - _ => return Err(DemuxerError::InvalidData), - }; - match ctx.br.read_buf_some(&mut buf) { - Ok(size) => { - ctx.pkt.add_data(&buf[..size]); - }, - Err(_) => { - match ctx.pkt.get_packet(ctx.stream.clone()) { - Ok(Some(mut packet)) => { - if packet.get_pts().is_none() && packet.get_duration().is_some() { - packet.ts.pts = Some(ctx.pts); - } - ctx.pts += packet.get_duration().unwrap_or(0); - return Ok(packet); - }, - Ok(None) | Err(DecoderError::ShortData) => return Err(DemuxerError::EOF), - _ => return Err(DemuxerError::InvalidData), - }; - }, - }; - } - }, + DemuxerObject::RawStream(ref mut ctx) => ctx.get_frame(), _ => unreachable!(), } } @@ -231,6 +248,45 @@ impl<'a> DemuxerObject<'a> { match *self { DemuxerObject::Normal(ref mut dmx) => dmx.seek(seek_time), DemuxerObject::Raw(ref mut dmx, _, _) => dmx.seek(seek_time), + DemuxerObject::RawStream(ref mut ctx) => { + if seek_time == NATimePoint::None { + return Err(DemuxerError::SeekError); + } + if let Some(last) = ctx.seek.seek_info[0].entries.last() { + let in_index = match seek_time { + NATimePoint::None => unreachable!(), + NATimePoint::PTS(pts) => last.pts >= pts, + NATimePoint::Milliseconds(ms) => last.time >= ms, + }; + if in_index { + if let Some(result) = ctx.seek.find_pos(seek_time) { + ctx.br.seek(SeekFrom::Start(result.pos))?; + ctx.pts = result.pts; + ctx.pkt.reset(); + return Ok(()); + } + } + } + if let Some(last) = ctx.seek.seek_info[0].entries.last() { + ctx.br.seek(SeekFrom::Start(last.pos))?; + ctx.pts = last.pts; + ctx.pkt.reset(); + } + let mut key_pts = 0; + while let Ok(pkt) = ctx.get_frame() { + if !pkt.ts.less_than(seek_time) && !pkt.ts.equal(seek_time) { + break; + } + if pkt.is_keyframe() { + key_pts = pkt.get_pts().unwrap_or(0); + } + } + let result = ctx.seek.find_pos(NATimePoint::PTS(key_pts)).unwrap(); + ctx.br.seek(SeekFrom::Start(result.pos))?; + ctx.pts = result.pts; + ctx.pkt.reset(); + Ok(()) + }, _ => Err(DemuxerError::NotImplemented), } } -- 2.39.5