From 69f7e674cd26ef988dac9ce6f76b47c3b38bfa50 Mon Sep 17 00:00:00 2001 From: Kostya Shishkov Date: Sun, 16 Mar 2025 18:51:50 +0100 Subject: [PATCH] improve the process of queueing and retrieving packets This should make it queue all packets (without losing them) and stop at the right position without sending flushed packets after cut-off to the muxer. Additionally it should emprove encoding PCM audio. --- src/main.rs | 54 +++++++++++++++++++++++------------------------ src/transcoder.rs | 22 ++++++++++++++----- 2 files changed, 43 insertions(+), 33 deletions(-) diff --git a/src/main.rs b/src/main.rs index c83b4bf..8c456b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -110,6 +110,25 @@ macro_rules! parse_id { } } +fn retrieve_packets(transcoder: &mut Transcoder, mux: &mut Muxer, vdata_size: &mut usize, adata_size: &mut usize) -> bool { + while let Some(pkt) = transcoder.queue.get_packet() { + if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { + return false; + } + let pkt_size = pkt.get_buffer().len(); + match pkt.get_stream().get_media_type() { + StreamType::Video => { *vdata_size += pkt_size; }, + StreamType::Audio => { *adata_size += pkt_size; }, + _ => {}, + }; + if mux.mux_frame(pkt).is_err() { + println!("error muxing packet"); + return false; + } + } + true +} + #[allow(clippy::single_match)] fn main() { let args: Vec<_> = env::args().collect(); @@ -461,6 +480,7 @@ fn main() { if mux_quirks.is_fixed_duration() { transcoder.calc_len = true; } + transcoder.fixed_rate = mux_quirks.is_fixed_rate(); transcoder.queue.set_sync(force_sync || !mux_quirks.is_unsync()); if transcoder.calc_len { @@ -601,19 +621,9 @@ fn main() { let frm = ret.unwrap(); dec_ctx.reorderer.add_frame(frm); while let Some(frm) = dec_ctx.reorderer.get_frame() { - if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts) { + if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts, &mut transcoder.queue) { break; } - while let Ok(Some(pkt)) = encoder.get_packet() { - if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'main_loop; } - let pkt_size = pkt.get_buffer().len(); - match pkt.get_stream().get_media_type() { - StreamType::Video => { vdata_size += pkt_size; }, - StreamType::Audio => { adata_size += pkt_size; }, - _ => {}, - }; - transcoder.queue.queue_packet(pkt); - } } } else { println!("no decoder for stream {}", src_id); @@ -622,25 +632,18 @@ fn main() { }, }; - while let Some(pkt) = transcoder.queue.get_packet() { - if mux.mux_frame(pkt).is_err() { - println!("error muxing packet"); - break; - } + if !retrieve_packets(&mut transcoder, &mut mux, &mut vdata_size, &mut adata_size) { + break; } } - 'reord_flush_loop: for stream in ism.iter() { + /*'reord_flush_loop:*/ for stream in ism.iter() { let src_id = stream.get_num(); if let OutputMode::Encode(dst_id, ref mut encoder) = transcoder.encoders[src_id] { if let Some(ref mut dec_ctx) = transcoder.decoders[src_id] { while let Some(frm) = dec_ctx.reorderer.get_last_frames() { - if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts) { + if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts, &mut transcoder.queue) { break; } - while let Ok(Some(pkt)) = encoder.get_packet() { - if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'reord_flush_loop; } - transcoder.queue.queue_packet(pkt); - } } } } @@ -658,12 +661,7 @@ fn main() { }; } - while let Some(pkt) = transcoder.queue.get_packet() { - if mux.mux_frame(pkt).is_err() { - println!("error muxing packet"); - break; - } - } + retrieve_packets(&mut transcoder, &mut mux, &mut vdata_size, &mut adata_size); if transcoder.verbose > 0 { println!(); diff --git a/src/transcoder.rs b/src/transcoder.rs index 32eb2b5..aef0004 100644 --- a/src/transcoder.rs +++ b/src/transcoder.rs @@ -121,7 +121,7 @@ pub struct DecodeContext { } pub trait EncoderInterface { - fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)]) -> bool; + fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)], queue: &mut OutputQueue) -> bool; fn flush(&mut self, queue: &mut OutputQueue) -> EncoderResult<()>; fn get_packet(&mut self) -> EncoderResult>; } @@ -132,7 +132,7 @@ pub struct AudioEncodeContext { } impl EncoderInterface for AudioEncodeContext { - fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, _scale_opts: &[(String, String)]) -> bool { + fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, _scale_opts: &[(String, String)], queue: &mut OutputQueue) -> bool { let buf = frm.get_buffer(); let cbuf = if let NABufferType::None = buf { buf @@ -146,6 +146,9 @@ impl EncoderInterface for AudioEncodeContext { if self.encoder.encode(&ofrm).is_err() { return false; } + while let Ok(Some(pkt)) = self.encoder.get_packet() { + queue.queue_packet(pkt); + } } return true; @@ -154,6 +157,9 @@ impl EncoderInterface for AudioEncodeContext { }; let cfrm = NAFrame::new(frm.get_time_information(), frm.frame_type, frm.key, frm.get_info(), cbuf); self.encoder.encode(&cfrm).unwrap(); + while let Ok(Some(pkt)) = self.encoder.get_packet() { + queue.queue_packet(pkt); + } true } fn flush(&mut self, queue: &mut OutputQueue) -> EncoderResult<()> { @@ -175,7 +181,7 @@ pub struct VideoEncodeContext { } impl EncoderInterface for VideoEncodeContext { - fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)]) -> bool { + fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)], queue: &mut OutputQueue) -> bool { let buf = frm.get_buffer(); let cbuf = if let NABufferType::None = buf { if (self.encoder.get_capabilities() & ENC_CAPS_SKIPFRAME) == 0 { @@ -211,6 +217,9 @@ impl EncoderInterface for VideoEncodeContext { }; let cfrm = NAFrame::new(frm.get_time_information(), frm.frame_type, frm.key, frm.get_info(), cbuf); self.encoder.encode(&cfrm).unwrap(); + while let Ok(Some(pkt)) = self.encoder.get_packet() { + queue.queue_packet(pkt); + } true } fn flush(&mut self, queue: &mut OutputQueue) -> EncoderResult<()> { @@ -320,6 +329,7 @@ pub struct Transcoder { pub global_tb: (u32, u32), pub queue: OutputQueue, + pub fixed_rate: bool, } impl Transcoder { @@ -718,8 +728,10 @@ impl Transcoder { parse_and_apply_options!(encoder, &oopts.enc_opts, name); + let enc_stream = ret.unwrap(); + let real_fmt = enc_stream.get_info().get_properties(); //todo check for params mismatch - let enc_ctx: Box = match (&iformat, &ret_eparams.format) { + let enc_ctx: Box = match (&iformat, &real_fmt) { (NACodecTypeInfo::Video(svinfo), NACodecTypeInfo::Video(dvinfo)) => { if svinfo == dvinfo && !forced_out { Box::new(VideoEncodeContext { encoder, scaler: None, scaler_buf: NABufferType::None }) @@ -760,7 +772,7 @@ println!("can't generate default channel map for {} channels", dainfo.channels); }, _ => unreachable!(), }; - out_sm.add_stream_ref(ret.unwrap()); + out_sm.add_stream_ref(enc_stream); self.encoders.push(OutputMode::Encode(out_id, enc_ctx)); } else { println!("encoder {} is not supported by output (expected {})", istr.id, istr.get_info().get_name()); -- 2.39.5