From: Kostya Shishkov Date: Sun, 16 Mar 2025 17:51:50 +0000 (+0100) Subject: improve the process of queueing and retrieving packets X-Git-Url: https://git.nihav.org/?a=commitdiff_plain;h=69f7e674cd26ef988dac9ce6f76b47c3b38bfa50;p=nihav-encoder.git improve the process of queueing and retrieving packets This should make it queue all packets (without losing them) and stop at the right position without sending flushed packets after cut-off to the muxer. Additionally it should emprove encoding PCM audio. --- diff --git a/src/main.rs b/src/main.rs index c83b4bf..8c456b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -110,6 +110,25 @@ macro_rules! parse_id { } } +fn retrieve_packets(transcoder: &mut Transcoder, mux: &mut Muxer, vdata_size: &mut usize, adata_size: &mut usize) -> bool { + while let Some(pkt) = transcoder.queue.get_packet() { + if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { + return false; + } + let pkt_size = pkt.get_buffer().len(); + match pkt.get_stream().get_media_type() { + StreamType::Video => { *vdata_size += pkt_size; }, + StreamType::Audio => { *adata_size += pkt_size; }, + _ => {}, + }; + if mux.mux_frame(pkt).is_err() { + println!("error muxing packet"); + return false; + } + } + true +} + #[allow(clippy::single_match)] fn main() { let args: Vec<_> = env::args().collect(); @@ -461,6 +480,7 @@ fn main() { if mux_quirks.is_fixed_duration() { transcoder.calc_len = true; } + transcoder.fixed_rate = mux_quirks.is_fixed_rate(); transcoder.queue.set_sync(force_sync || !mux_quirks.is_unsync()); if transcoder.calc_len { @@ -601,19 +621,9 @@ fn main() { let frm = ret.unwrap(); dec_ctx.reorderer.add_frame(frm); while let Some(frm) = dec_ctx.reorderer.get_frame() { - if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts) { + if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts, &mut transcoder.queue) { break; } - while let Ok(Some(pkt)) = encoder.get_packet() { - if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'main_loop; } - let pkt_size = pkt.get_buffer().len(); - match pkt.get_stream().get_media_type() { - StreamType::Video => { vdata_size += pkt_size; }, - StreamType::Audio => { adata_size += pkt_size; }, - _ => {}, - }; - transcoder.queue.queue_packet(pkt); - } } } else { println!("no decoder for stream {}", src_id); @@ -622,25 +632,18 @@ fn main() { }, }; - while let Some(pkt) = transcoder.queue.get_packet() { - if mux.mux_frame(pkt).is_err() { - println!("error muxing packet"); - break; - } + if !retrieve_packets(&mut transcoder, &mut mux, &mut vdata_size, &mut adata_size) { + break; } } - 'reord_flush_loop: for stream in ism.iter() { + /*'reord_flush_loop:*/ for stream in ism.iter() { let src_id = stream.get_num(); if let OutputMode::Encode(dst_id, ref mut encoder) = transcoder.encoders[src_id] { if let Some(ref mut dec_ctx) = transcoder.decoders[src_id] { while let Some(frm) = dec_ctx.reorderer.get_last_frames() { - if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts) { + if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts, &mut transcoder.queue) { break; } - while let Ok(Some(pkt)) = encoder.get_packet() { - if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'reord_flush_loop; } - transcoder.queue.queue_packet(pkt); - } } } } @@ -658,12 +661,7 @@ fn main() { }; } - while let Some(pkt) = transcoder.queue.get_packet() { - if mux.mux_frame(pkt).is_err() { - println!("error muxing packet"); - break; - } - } + retrieve_packets(&mut transcoder, &mut mux, &mut vdata_size, &mut adata_size); if transcoder.verbose > 0 { println!(); diff --git a/src/transcoder.rs b/src/transcoder.rs index 32eb2b5..aef0004 100644 --- a/src/transcoder.rs +++ b/src/transcoder.rs @@ -121,7 +121,7 @@ pub struct DecodeContext { } pub trait EncoderInterface { - fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)]) -> bool; + fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)], queue: &mut OutputQueue) -> bool; fn flush(&mut self, queue: &mut OutputQueue) -> EncoderResult<()>; fn get_packet(&mut self) -> EncoderResult>; } @@ -132,7 +132,7 @@ pub struct AudioEncodeContext { } impl EncoderInterface for AudioEncodeContext { - fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, _scale_opts: &[(String, String)]) -> bool { + fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, _scale_opts: &[(String, String)], queue: &mut OutputQueue) -> bool { let buf = frm.get_buffer(); let cbuf = if let NABufferType::None = buf { buf @@ -146,6 +146,9 @@ impl EncoderInterface for AudioEncodeContext { if self.encoder.encode(&ofrm).is_err() { return false; } + while let Ok(Some(pkt)) = self.encoder.get_packet() { + queue.queue_packet(pkt); + } } return true; @@ -154,6 +157,9 @@ impl EncoderInterface for AudioEncodeContext { }; let cfrm = NAFrame::new(frm.get_time_information(), frm.frame_type, frm.key, frm.get_info(), cbuf); self.encoder.encode(&cfrm).unwrap(); + while let Ok(Some(pkt)) = self.encoder.get_packet() { + queue.queue_packet(pkt); + } true } fn flush(&mut self, queue: &mut OutputQueue) -> EncoderResult<()> { @@ -175,7 +181,7 @@ pub struct VideoEncodeContext { } impl EncoderInterface for VideoEncodeContext { - fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)]) -> bool { + fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)], queue: &mut OutputQueue) -> bool { let buf = frm.get_buffer(); let cbuf = if let NABufferType::None = buf { if (self.encoder.get_capabilities() & ENC_CAPS_SKIPFRAME) == 0 { @@ -211,6 +217,9 @@ impl EncoderInterface for VideoEncodeContext { }; let cfrm = NAFrame::new(frm.get_time_information(), frm.frame_type, frm.key, frm.get_info(), cbuf); self.encoder.encode(&cfrm).unwrap(); + while let Ok(Some(pkt)) = self.encoder.get_packet() { + queue.queue_packet(pkt); + } true } fn flush(&mut self, queue: &mut OutputQueue) -> EncoderResult<()> { @@ -320,6 +329,7 @@ pub struct Transcoder { pub global_tb: (u32, u32), pub queue: OutputQueue, + pub fixed_rate: bool, } impl Transcoder { @@ -718,8 +728,10 @@ impl Transcoder { parse_and_apply_options!(encoder, &oopts.enc_opts, name); + let enc_stream = ret.unwrap(); + let real_fmt = enc_stream.get_info().get_properties(); //todo check for params mismatch - let enc_ctx: Box = match (&iformat, &ret_eparams.format) { + let enc_ctx: Box = match (&iformat, &real_fmt) { (NACodecTypeInfo::Video(svinfo), NACodecTypeInfo::Video(dvinfo)) => { if svinfo == dvinfo && !forced_out { Box::new(VideoEncodeContext { encoder, scaler: None, scaler_buf: NABufferType::None }) @@ -760,7 +772,7 @@ println!("can't generate default channel map for {} channels", dainfo.channels); }, _ => unreachable!(), }; - out_sm.add_stream_ref(ret.unwrap()); + out_sm.add_stream_ref(enc_stream); self.encoders.push(OutputMode::Encode(out_id, enc_ctx)); } else { println!("encoder {} is not supported by output (expected {})", istr.id, istr.get_info().get_name());