From: Kostya Shishkov Date: Sat, 15 Mar 2025 12:44:34 +0000 (+0100) Subject: prepare framework for syncing output packets X-Git-Url: https://git.nihav.org/?a=commitdiff_plain;h=df5974a12d66488b6552b39dc2d5844fc6654d01;p=nihav-encoder.git prepare framework for syncing output packets --- diff --git a/src/main.rs b/src/main.rs index e1eba18..24a395e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -201,6 +201,7 @@ fn main() { let mut arg_idx = 1; let mut printed_info = false; + let mut force_sync = false; while arg_idx < args.len() { match args[arg_idx].as_str() { "--list-decoders" => { @@ -339,6 +340,9 @@ fn main() { return; } }, + "--sync" => { + force_sync = true; + }, "--muxer-options" => { next_arg!(args, arg_idx); if !transcoder.parse_muxer_options(&args[arg_idx], &full_reg.mux_reg) { @@ -508,6 +512,7 @@ fn main() { if mux_quirks.is_fixed_duration() { transcoder.calc_len = true; } + transcoder.queue.set_sync(force_sync || !mux_quirks.is_unsync()); if transcoder.calc_len { let mut sids = Vec::new(); @@ -562,6 +567,8 @@ fn main() { println!(" #{}: {} {}", ostr.get_num(), ostr, ostr.get_info().get_name()); } + transcoder.queue.reserve_streams(mux.get_num_streams()); + let mut time = Instant::now(); let show_interval = Duration::from_millis(100); let mut adata_size = 0; @@ -630,10 +637,7 @@ fn main() { StreamType::Audio => { adata_size += pkt_size; }, _ => {}, }; - if mux.mux_frame(pkt).is_err() { - println!("error muxing packet"); - break; - } + transcoder.queue.queue_packet(pkt); }, OutputMode::Encode(dst_id, ref mut encoder, ref mut cvt) => { if let Some((ref mut dsupp, ref mut decoder, ref mut reorderer)) = transcoder.decoders[src_id] { @@ -660,7 +664,7 @@ fn main() { StreamType::Audio => { adata_size += pkt_size; }, _ => {}, }; - mux.mux_frame(pkt).unwrap(); + transcoder.queue.queue_packet(pkt); } } if let OutputConvert::Audio(ref mut acvt) = cvt { @@ -672,7 +676,7 @@ fn main() { if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'main_loop; } let pkt_size = pkt.get_buffer().len(); adata_size += pkt_size; - mux.mux_frame(pkt).unwrap(); + transcoder.queue.queue_packet(pkt); } } } @@ -682,6 +686,13 @@ fn main() { } }, }; + + while let Some(pkt) = transcoder.queue.get_packet() { + if mux.mux_frame(pkt).is_err() { + println!("error muxing packet"); + break; + } + } } 'reord_flush_loop: for stream in ism.iter() { let src_id = stream.get_num(); @@ -693,13 +704,13 @@ fn main() { } while let Ok(Some(pkt)) = encoder.get_packet() { if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'reord_flush_loop; } - mux.mux_frame(pkt).unwrap(); + transcoder.queue.queue_packet(pkt); } } } } } - 'flush_loop: for enc in transcoder.encoders.iter_mut() { + /*'flush_loop:*/ for enc in transcoder.encoders.iter_mut() { match enc { OutputMode::Encode(str_id, ref mut encoder, _) => { let ret = encoder.flush(); @@ -708,16 +719,21 @@ fn main() { break; } else { while let Ok(Some(pkt)) = encoder.get_packet() { - if mux.mux_frame(pkt).is_err() { - println!("error muxing packet"); - break 'flush_loop; - } + transcoder.queue.queue_packet(pkt); } } }, _ => {}, }; } + + while let Some(pkt) = transcoder.queue.get_packet() { + if mux.mux_frame(pkt).is_err() { + println!("error muxing packet"); + break; + } + } + if transcoder.verbose > 0 { println!(); } diff --git a/src/transcoder.rs b/src/transcoder.rs index b6ba295..f57d45e 100644 --- a/src/transcoder.rs +++ b/src/transcoder.rs @@ -1,6 +1,7 @@ use std::fs::File; use std::io::BufReader; use std::pin::Pin; +use std::collections::VecDeque; use nihav_core::frame::*; use nihav_core::options::*; use nihav_core::codecs::*; @@ -126,6 +127,70 @@ pub enum OutputMode { Encode(u32, Box, OutputConvert), } +#[derive(Default)] +pub struct OutputQueue { + packets: Vec>, + sync: bool, + cur_stream: usize, + had_warning: bool, +} + +impl OutputQueue { + pub fn set_sync(&mut self, sync: bool) { self.sync = sync; } + pub fn reserve_streams(&mut self, nstreams: usize) { + while self.packets.len() < nstreams { + self.packets.push(VecDeque::new()); + } + } + pub fn queue_packet(&mut self, pkt: NAPacket) { + let stream_no = pkt.get_stream().get_num(); + if stream_no >= self.packets.len() { + panic!("wrong packet stream"); + } + self.packets[stream_no].push_back(pkt); + } + pub fn get_packet(&mut self) -> Option { + if self.sync { + for queue in self.packets.iter() { + if queue.is_empty() { + if !self.had_warning && self.packets.len() > 1 { + for q in self.packets.iter() { + if q.len() > 64 { + self.had_warning = true; + println!("Too many packets in one of the queues, synchronisation is not possible!"); + self.sync = false; + break; + } + } + } + return None; + } + } +// todo return packet with minimum timestamp + self.get_last_packet() + } else { + self.get_last_packet() + } + } + pub fn get_last_packet(&mut self) -> Option { + let last_str = self.cur_stream; + loop { + let idx = self.cur_stream; + self.cur_stream += 1; + if self.cur_stream >= self.packets.len() { + self.cur_stream = 0; + } + if let Some(pkt) = self.packets[idx].pop_front() { + return Some(pkt); + } + if self.cur_stream == last_str { + break; + } + } + None + } +} + #[derive(Default)] #[allow(clippy::type_complexity)] pub struct Transcoder { @@ -149,6 +214,8 @@ pub struct Transcoder { pub calc_len: bool, pub nframes: Vec, pub global_tb: (u32, u32), + + pub queue: OutputQueue, } impl Transcoder {