]> git.nihav.org Git - nihav-encoder.git/commitdiff
improve the process of queueing and retrieving packets
authorKostya Shishkov <kostya.shishkov@gmail.com>
Sun, 16 Mar 2025 17:51:50 +0000 (18:51 +0100)
committerKostya Shishkov <kostya.shishkov@gmail.com>
Sun, 16 Mar 2025 17:51:50 +0000 (18:51 +0100)
This should make it queue all packets (without losing them) and stop
at the right position without sending flushed packets after cut-off
to the muxer.

Additionally it should emprove encoding PCM audio.

src/main.rs
src/transcoder.rs

index c83b4bf1c428da5caafeb431d85887210188caf6..8c456b363b6ac9bc4eb8af43abda2bf683a24392 100644 (file)
@@ -110,6 +110,25 @@ macro_rules! parse_id {
     }
 }
 
+fn retrieve_packets(transcoder: &mut Transcoder, mux: &mut Muxer, vdata_size: &mut usize, adata_size: &mut usize) -> bool {
+    while let Some(pkt) = transcoder.queue.get_packet() {
+        if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) {
+            return false;
+        }
+        let pkt_size = pkt.get_buffer().len();
+        match pkt.get_stream().get_media_type() {
+            StreamType::Video => { *vdata_size += pkt_size; },
+            StreamType::Audio => { *adata_size += pkt_size; },
+            _ => {},
+        };
+        if mux.mux_frame(pkt).is_err() {
+            println!("error muxing packet");
+            return false;
+        }
+    }
+    true
+}
+
 #[allow(clippy::single_match)]
 fn main() {
     let args: Vec<_> = env::args().collect();
@@ -461,6 +480,7 @@ fn main() {
     if mux_quirks.is_fixed_duration() {
         transcoder.calc_len = true;
     }
+    transcoder.fixed_rate = mux_quirks.is_fixed_rate();
     transcoder.queue.set_sync(force_sync || !mux_quirks.is_unsync());
 
     if transcoder.calc_len {
@@ -601,19 +621,9 @@ fn main() {
                     let frm = ret.unwrap();
                     dec_ctx.reorderer.add_frame(frm);
                     while let Some(frm) = dec_ctx.reorderer.get_frame() {
-                        if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts) {
+                        if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts, &mut transcoder.queue) {
                             break;
                         }
-                        while let Ok(Some(pkt)) = encoder.get_packet() {
-                            if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'main_loop; }
-                            let pkt_size = pkt.get_buffer().len();
-                            match pkt.get_stream().get_media_type() {
-                                StreamType::Video => { vdata_size += pkt_size; },
-                                StreamType::Audio => { adata_size += pkt_size; },
-                                _ => {},
-                            };
-                            transcoder.queue.queue_packet(pkt);
-                        }
                     }
                 } else {
                     println!("no decoder for stream {}", src_id);
@@ -622,25 +632,18 @@ fn main() {
             },
         };
 
-        while let Some(pkt) = transcoder.queue.get_packet() {
-            if mux.mux_frame(pkt).is_err() {
-                println!("error muxing packet");
-                break;
-            }
+        if !retrieve_packets(&mut transcoder, &mut mux, &mut vdata_size, &mut adata_size) {
+            break;
         }
     }
-    'reord_flush_loop: for stream in ism.iter() {
+    /*'reord_flush_loop:*/ for stream in ism.iter() {
         let src_id = stream.get_num();
         if let OutputMode::Encode(dst_id, ref mut encoder) = transcoder.encoders[src_id] {
             if let Some(ref mut dec_ctx) = transcoder.decoders[src_id] {
                 while let Some(frm) = dec_ctx.reorderer.get_last_frames() {
-                    if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts) {
+                    if !encoder.encode_frame(dst_id, frm, &transcoder.scale_opts, &mut transcoder.queue) {
                         break;
                     }
-                    while let Ok(Some(pkt)) = encoder.get_packet() {
-                        if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'reord_flush_loop; }
-                        transcoder.queue.queue_packet(pkt);
-                    }
                 }
             }
         }
@@ -658,12 +661,7 @@ fn main() {
         };
     }
 
-    while let Some(pkt) = transcoder.queue.get_packet() {
-        if mux.mux_frame(pkt).is_err() {
-            println!("error muxing packet");
-            break;
-        }
-    }
+    retrieve_packets(&mut transcoder, &mut mux, &mut vdata_size, &mut adata_size);
 
     if transcoder.verbose > 0 {
         println!();
index 32eb2b5663596a0bf98c0fa488a2f2272bb1a7ca..aef00046440266a98863db17984b46af61e99d72 100644 (file)
@@ -121,7 +121,7 @@ pub struct DecodeContext {
 }
 
 pub trait EncoderInterface {
-    fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)]) -> bool;
+    fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)], queue: &mut OutputQueue) -> bool;
     fn flush(&mut self, queue: &mut OutputQueue) -> EncoderResult<()>;
     fn get_packet(&mut self) -> EncoderResult<Option<NAPacket>>;
 }
@@ -132,7 +132,7 @@ pub struct AudioEncodeContext {
 }
 
 impl EncoderInterface for AudioEncodeContext {
-    fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, _scale_opts: &[(String, String)]) -> bool {
+    fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, _scale_opts: &[(String, String)], queue: &mut OutputQueue) -> bool {
         let buf = frm.get_buffer();
         let cbuf = if let NABufferType::None = buf {
                 buf
@@ -146,6 +146,9 @@ impl EncoderInterface for AudioEncodeContext {
                     if self.encoder.encode(&ofrm).is_err() {
                         return false;
                     }
+                    while let Ok(Some(pkt)) = self.encoder.get_packet() {
+                        queue.queue_packet(pkt);
+                    }
                 }
 
                 return true;
@@ -154,6 +157,9 @@ impl EncoderInterface for AudioEncodeContext {
             };
         let cfrm = NAFrame::new(frm.get_time_information(), frm.frame_type, frm.key, frm.get_info(), cbuf);
         self.encoder.encode(&cfrm).unwrap();
+        while let Ok(Some(pkt)) = self.encoder.get_packet() {
+            queue.queue_packet(pkt);
+        }
         true        
     }
     fn flush(&mut self, queue: &mut OutputQueue) -> EncoderResult<()> {
@@ -175,7 +181,7 @@ pub struct VideoEncodeContext {
 }
 
 impl EncoderInterface for VideoEncodeContext {
-    fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)]) -> bool {
+    fn encode_frame(&mut self, dst_id: u32, frm: NAFrameRef, scale_opts: &[(String, String)], queue: &mut OutputQueue) -> bool {
         let buf = frm.get_buffer();
         let cbuf = if let NABufferType::None = buf {
             if (self.encoder.get_capabilities() & ENC_CAPS_SKIPFRAME) == 0 {
@@ -211,6 +217,9 @@ impl EncoderInterface for VideoEncodeContext {
         };
         let cfrm = NAFrame::new(frm.get_time_information(), frm.frame_type, frm.key, frm.get_info(), cbuf);
         self.encoder.encode(&cfrm).unwrap();
+        while let Ok(Some(pkt)) = self.encoder.get_packet() {
+            queue.queue_packet(pkt);
+        }
         true
     }
     fn flush(&mut self, queue: &mut OutputQueue) -> EncoderResult<()> {
@@ -320,6 +329,7 @@ pub struct Transcoder {
     pub global_tb:      (u32, u32),
 
     pub queue:          OutputQueue,
+    pub fixed_rate:     bool,
 }
 
 impl Transcoder {
@@ -718,8 +728,10 @@ impl Transcoder {
 
                 parse_and_apply_options!(encoder, &oopts.enc_opts, name);
 
+                let enc_stream = ret.unwrap();
+                let real_fmt = enc_stream.get_info().get_properties();
 //todo check for params mismatch
-                let enc_ctx: Box<dyn EncoderInterface> = match (&iformat, &ret_eparams.format) {
+                let enc_ctx: Box<dyn EncoderInterface> = match (&iformat, &real_fmt) {
                         (NACodecTypeInfo::Video(svinfo), NACodecTypeInfo::Video(dvinfo)) => {
                             if svinfo == dvinfo && !forced_out {
                                 Box::new(VideoEncodeContext { encoder, scaler: None, scaler_buf: NABufferType::None })
@@ -760,7 +772,7 @@ println!("can't generate default channel map for {} channels", dainfo.channels);
                         },
                         _ => unreachable!(),
                     };
-                out_sm.add_stream_ref(ret.unwrap());
+                out_sm.add_stream_ref(enc_stream);
                 self.encoders.push(OutputMode::Encode(out_id, enc_ctx));
             } else {
 println!("encoder {} is not supported by output (expected {})", istr.id, istr.get_info().get_name());