From: Kostya Shishkov Date: Fri, 12 Nov 2021 16:38:28 +0000 (+0100) Subject: add interfaces for raw data stream handling X-Git-Url: https://git.nihav.org/?a=commitdiff_plain;ds=inline;h=1ebf572dc8cdbf6e1440d90295b3e69eb76339df;p=nihav.git add interfaces for raw data stream handling --- diff --git a/nihav-core/src/codecs/mod.rs b/nihav-core/src/codecs/mod.rs index 2ae0c33..1072fc4 100644 --- a/nihav-core/src/codecs/mod.rs +++ b/nihav-core/src/codecs/mod.rs @@ -351,3 +351,69 @@ impl RegisteredEncoders { } } +/// Trait for packetisers (objects that form full packets from raw stream data). +pub trait NAPacketiser { + /// Queues new raw stream data for parsing. + /// + /// Returns false is the internal buffer grows too large. + fn add_data(&mut self, src: &[u8]) -> bool; + /// Tries to retrieve stream information from the data. + /// + /// Returns [`NAStream`] reference on success (with stream ID set to `id`), [`ShortData`] when there is not enough data to parse the headers and other errors in case there was an error parsing the data. + /// + /// [`NAStream`]: ../frame/struct.NAStream.html + /// [`ShortData`]: ./enum.DecoderError.html#variant.ShortData + fn parse_stream(&mut self, id: u32) -> DecoderResult; + /// Tries to discard junk data until the first possible packet header. + /// + /// Returns the number of bytes skipped. + fn skip_junk(&mut self) -> DecoderResult; + /// Tries to form full packet from the already queued data. + /// + /// The function should be called repeatedly until it returns nothing or an error. + fn get_packet(&mut self, stream: NAStreamRef) -> DecoderResult>; + /// Resets the internal buffer. + fn reset(&mut self); +} + +/// Decoder information used during creating a packetiser for requested codec. +#[derive(Clone,Copy)] +pub struct PacketiserInfo { + /// Short packetiser name. + pub name: &'static str, + /// The function that creates a packetiser instance. + pub get_packetiser: fn () -> Box, +} + +/// Structure for registering known packetisers. +/// +/// It is supposed to be filled using `register_all_packetisers()` from some decoders crate and then it can be used to create packetisers for the requested codecs. +#[derive(Default)] +pub struct RegisteredPacketisers { + packs: Vec, +} + +impl RegisteredPacketisers { + /// Constructs a new instance of `RegisteredPacketisers`. + pub fn new() -> Self { + Self { packs: Vec::new() } + } + /// Adds another packetiser to the registry. + pub fn add_packetiser(&mut self, pack: PacketiserInfo) { + self.packs.push(pack); + } + /// Searches for the packetiser for the provided name and returns a function for creating it on success. + pub fn find_packetiser(&self, name: &str) -> Option Box> { + for &pack in self.packs.iter() { + if pack.name == name { + return Some(pack.get_packetiser); + } + } + None + } + /// Provides an iterator over currently registered packetiser. + pub fn iter(&self) -> std::slice::Iter { + self.packs.iter() + } +} + diff --git a/nihav-core/src/demuxers/mod.rs b/nihav-core/src/demuxers/mod.rs index 9c7e46e..46e0617 100644 --- a/nihav-core/src/demuxers/mod.rs +++ b/nihav-core/src/demuxers/mod.rs @@ -500,3 +500,169 @@ impl RegisteredDemuxers { self.dmxs.iter() } } + +/// A trait for raw data demuxing operations. +pub trait RawDemuxCore<'a>: NAOptionHandler { + /// Opens the input stream, reads required headers and prepares everything for packet demuxing. + fn open(&mut self, strmgr: &mut StreamManager, seek_idx: &mut SeekIndex) -> DemuxerResult<()>; + /// Reads a piece of raw data. + fn get_data(&mut self, strmgr: &mut StreamManager) -> DemuxerResult; + /// Seeks to the requested time. + fn seek(&mut self, time: NATimePoint, seek_idx: &SeekIndex) -> DemuxerResult<()>; + /// Returns container duration in milliseconds (zero if not available). + fn get_duration(&self) -> u64; +} + +/// Demuxer structure with auxiliary data. +pub struct RawDemuxer<'a> { + dmx: Box + 'a>, + streams: StreamManager, + seek_idx: SeekIndex, +} + +impl<'a> RawDemuxer<'a> { + /// Constructs a new `Demuxer` instance. + fn new(dmx: Box + 'a>, strmgr: StreamManager, seek_idx: SeekIndex) -> Self { + Self { + dmx, + streams: strmgr, + seek_idx, + } + } + /// Returns a stream reference by its number. + pub fn get_stream(&self, idx: usize) -> Option { + self.streams.get_stream(idx) + } + /// Returns a stream reference by its ID. + pub fn get_stream_by_id(&self, id: u32) -> Option { + self.streams.get_stream_by_id(id) + } + /// Reports the total number of streams. + pub fn get_num_streams(&self) -> usize { + self.streams.get_num_streams() + } + /// Returns a reference to the internal stream manager. + pub fn get_stream_manager(&self) -> &StreamManager { + &self.streams + } + /// Returns an iterator over streams. + pub fn get_streams(&self) -> StreamIter { + self.streams.iter() + } + /// Returns 'ignored' marker for requested stream. + pub fn is_ignored_stream(&self, idx: usize) -> bool { + self.streams.is_ignored(idx) + } + /// Sets 'ignored' marker for requested stream. + pub fn set_ignored_stream(&mut self, idx: usize) { + self.streams.set_ignored(idx) + } + /// Clears 'ignored' marker for requested stream. + pub fn set_unignored_stream(&mut self, idx: usize) { + self.streams.set_unignored(idx) + } + + /// Demuxes a new piece of data from the container. + pub fn get_data(&mut self) -> DemuxerResult { + loop { + let res = self.dmx.get_data(&mut self.streams); + if self.streams.no_ign || res.is_err() { return res; } + let res = res.unwrap(); + let idx = res.get_stream().get_num(); + if !self.is_ignored_stream(idx) { + return Ok(res); + } + } + } + /// Seeks to the requested time if possible. + pub fn seek(&mut self, time: NATimePoint) -> DemuxerResult<()> { + if self.seek_idx.skip_index { + return Err(DemuxerError::NotPossible); + } + self.dmx.seek(time, &self.seek_idx) + } + /// Returns internal seek index. + pub fn get_seek_index(&self) -> &SeekIndex { + &self.seek_idx + } + /// Returns media duration reported by container or its streams. + /// + /// Duration is in milliseconds and set to zero when it is not available. + pub fn get_duration(&self) -> u64 { + let duration = self.dmx.get_duration(); + if duration != 0 { + return duration; + } + let mut duration = 0; + for stream in self.streams.iter() { + if stream.duration > 0 { + let dur = NATimeInfo::ts_to_time(stream.duration, 1000, stream.tb_num, stream.tb_den); + if duration < dur { + duration = dur; + } + } + } + duration + } +} + +impl<'a> NAOptionHandler for RawDemuxer<'a> { + fn get_supported_options(&self) -> &[NAOptionDefinition] { + self.dmx.get_supported_options() + } + fn set_options(&mut self, options: &[NAOption]) { + self.dmx.set_options(options); + } + fn query_option_value(&self, name: &str) -> Option { + self.dmx.query_option_value(name) + } +} + +/// The trait for creating raw data demuxers. +pub trait RawDemuxerCreator { + /// Creates new raw demuxer instance that will use `ByteReader` source as an input. + fn new_demuxer<'a>(&self, br: &'a mut ByteReader<'a>) -> Box + 'a>; + /// Tries to check whether the input can be demuxed with the demuxer. + fn check_format(&self, br: &mut ByteReader) -> bool; + /// Returns the name of current raw data demuxer creator (equal to the container name it can demux). + fn get_name(&self) -> &'static str; +} + +/// Creates raw data demuxer for a provided bytestream. +pub fn create_raw_demuxer<'a>(dmxcr: &dyn RawDemuxerCreator, br: &'a mut ByteReader<'a>) -> DemuxerResult> { + let mut dmx = dmxcr.new_demuxer(br); + let mut str = StreamManager::new(); + let mut seek_idx = SeekIndex::new(); + dmx.open(&mut str, &mut seek_idx)?; + Ok(RawDemuxer::new(dmx, str, seek_idx)) +} + +/// List of registered demuxers. +#[derive(Default)] +pub struct RegisteredRawDemuxers { + dmxs: Vec<&'static dyn RawDemuxerCreator>, +} + +impl RegisteredRawDemuxers { + /// Constructs a new `RegisteredDemuxers` instance. + pub fn new() -> Self { + Self { dmxs: Vec::new() } + } + /// Registers a new demuxer. + pub fn add_demuxer(&mut self, dmx: &'static dyn RawDemuxerCreator) { + self.dmxs.push(dmx); + } + /// Searches for a demuxer that supports requested container format. + pub fn find_demuxer(&self, name: &str) -> Option<&dyn RawDemuxerCreator> { + for &dmx in self.dmxs.iter() { + if dmx.get_name() == name { + return Some(dmx); + } + } + None + } + /// Provides an iterator over currently registered demuxers. + pub fn iter(&self) -> std::slice::Iter<&dyn RawDemuxerCreator> { + self.dmxs.iter() + } +} diff --git a/nihav-core/src/frame.rs b/nihav-core/src/frame.rs index f596dc9..70a054d 100644 --- a/nihav-core/src/frame.rs +++ b/nihav-core/src/frame.rs @@ -1444,6 +1444,37 @@ impl fmt::Display for NAPacket { } } +/// Packet with a piece of data for a raw stream. +pub struct NARawData { + stream: NAStreamRef, + buffer: NABufferRef>, +} + +impl NARawData { + /// Constructs a new `NARawData` instance. + pub fn new(stream: NAStreamRef, vec: Vec) -> Self { + Self { stream, buffer: NABufferRef::new(vec) } + } + /// Constructs a new `NARawData` instance reusing a buffer reference. + pub fn new_from_refbuf(stream: NAStreamRef, buffer: NABufferRef>) -> Self { + Self { stream, buffer } + } + /// Returns information about the stream this data belongs to. + pub fn get_stream(&self) -> NAStreamRef { self.stream.clone() } + /// Returns a reference to packet data. + pub fn get_buffer(&self) -> NABufferRef> { self.buffer.clone() } + /// Assigns raw data to a new stream. + pub fn reassign(&mut self, stream: NAStreamRef) { + self.stream = stream; + } +} + +impl fmt::Display for NARawData { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "[raw data for {} size {}]", self.stream, self.buffer.len()) + } +} + #[cfg(test)] mod test { use super::*;