From: Kostya Shishkov Date: Sat, 10 Jun 2023 10:29:48 +0000 (+0200) Subject: generic frame reorderer for multi-threaded decoders X-Git-Url: https://git.nihav.org/?p=nihav.git;a=commitdiff_plain;h=4213ad5979331cc06dfef79c00b7f0615ffed0d9 generic frame reorderer for multi-threaded decoders --- diff --git a/nihav-core/src/reorder.rs b/nihav-core/src/reorder.rs index ebd7bdf..e8ef622 100644 --- a/nihav-core/src/reorder.rs +++ b/nihav-core/src/reorder.rs @@ -6,6 +6,7 @@ //! //! You can find out required reorderer by quering codec properties using `nihav_core::register` module. use std::mem::swap; +use std::collections::VecDeque; pub use crate::frame::{FrameType, NAFrameRef}; /// A trait for frame reorderer. @@ -170,3 +171,122 @@ impl FrameReorderer for ComplexReorderer { } } +/// A generic reorderer for a multi-threaded decoder. +#[derive(Default)] +pub struct MTFrameReorderer { + ids: Vec, + frames: VecDeque<(u32, NAFrameRef)>, + cur_id: u32, + flush_mode: bool, + output_to: Option, + last_ts: Option, +} + +impl MTFrameReorderer { + /// Constructs a new instance of `MTFrameReorderer`. + pub fn new() -> Self { Self::default() } + /// Registers the fact that a new frame is queued for decoding and returns an internal ID for it. + pub fn register_frame(&mut self) -> u32 { + if self.flush_mode { + self.flush_mode = false; + } + let ret = self.cur_id; + self.cur_id += 1; + self.ids.push(ret); + ret + } + /// Puts a newly decoded frame into the internal queue. + pub fn add_frame(&mut self, frm: NAFrameRef, id: u32) { + //let ftype = frm.get_frame_type(); + let frm_id = if let Some(ts) = frm.ts.dts { ts } else { u64::from(id) }; + let mut idx = 0; + for (_, frm) in self.frames.iter() { + let cur_id = if let Some(ts) = frm.ts.dts { ts } else { frm.id as u64 }; + if frm_id < cur_id { + break; + } + idx += 1; + } + self.frames.insert(idx, (id, frm)); + /*if self.frames.len() > 48 { + for (id, frm) in self.frames.iter() { print!(" {}{}({})", frm.get_frame_type(), frm.get_dts().unwrap_or(0), *id); } println!(); + print!("reg IDs:"); for &id in self.ids.iter() { print!(" {}", id); } println!(); + panic!("too many frames in the queue"); + }*/ + } + /// Removes the registered frame (e.g. in case of a decoding error). + pub fn drop_frame(&mut self, id: u32) { + self.ids.retain(|&el| el != id); + } + fn get_first_frame(&mut self) -> Option { + let (id, frm) = self.frames.pop_front().unwrap(); + self.drop_frame(id); + self.last_ts = frm.get_dts(); + Some(frm) + } + /// Gets the next frame to be displayed (or `None` if that is not possible). + #[allow(clippy::collapsible_if)] + pub fn get_frame(&mut self) -> Option { + // check if we have consequent timestamps that we can output + if !self.frames.is_empty() { + if let Some(dts) = self.frames[0].1.get_dts() { + let last_ts = self.last_ts.unwrap_or(0); + if self.last_ts.is_none() || (dts == last_ts + 1) { + self.output_to = None; + return self.get_first_frame(); + } + } + } + if !self.flush_mode { + 'out_loop: loop { + if let Some(last_id) = self.output_to { + if self.frames[0].0 != last_id { + return self.get_first_frame(); + } else { + self.output_to = None; + } + } + for (pos, (id, frm)) in self.frames.iter().enumerate() { + if frm.is_keyframe() || (self.frames.len() > 32 && matches!(frm.get_frame_type(), FrameType::I | FrameType::P)) { + let kf_id = *id; + self.ids.sort(); + if pos == 0 && kf_id == self.ids[0] { + return self.get_first_frame(); + } + let end = self.ids.iter().position(|&id| id == kf_id).unwrap(); + for ref_id in self.ids[..end].iter() { + if self.frames.iter().position(|(id, _)| id == ref_id).is_none() { + return None; + } + } + self.output_to = if pos < self.frames.len() - 1 { + Some(self.frames[pos + 1].0) + } else { + Some(kf_id) + }; + continue 'out_loop; + } + } + return None; + } + } else { + if !self.frames.is_empty() { + Some(self.frames.pop_front().unwrap().1) + } else { + None + } + } + } + /// Retrieves the last frames stored by the reorderer. + pub fn get_last_frames(&mut self) -> Option { + self.flush_mode = true; + self.get_frame() + } + /// Clears all stored frames. + pub fn flush(&mut self) { + self.flush_mode = false; + self.frames.clear(); + self.output_to = None; + self.last_ts = None; + } +}