]> git.nihav.org Git - nihav-encoder.git/commitdiff
prepare framework for syncing output packets
authorKostya Shishkov <kostya.shishkov@gmail.com>
Sat, 15 Mar 2025 12:44:34 +0000 (13:44 +0100)
committerKostya Shishkov <kostya.shishkov@gmail.com>
Sat, 15 Mar 2025 12:44:34 +0000 (13:44 +0100)
src/main.rs
src/transcoder.rs

index e1eba18ae24975945c299714d51710c3b787aa25..24a395ee77a4443f2877e18f5bcb48a9e2cc71eb 100644 (file)
@@ -201,6 +201,7 @@ fn main() {
 
     let mut arg_idx = 1;
     let mut printed_info = false;
+    let mut force_sync = false;
     while arg_idx < args.len() {
         match args[arg_idx].as_str() {
             "--list-decoders" => {
@@ -339,6 +340,9 @@ fn main() {
                     return;
                 }
             },
+            "--sync" => {
+                force_sync = true;
+            },
             "--muxer-options" => {
                 next_arg!(args, arg_idx);
                 if !transcoder.parse_muxer_options(&args[arg_idx], &full_reg.mux_reg) {
@@ -508,6 +512,7 @@ fn main() {
     if mux_quirks.is_fixed_duration() {
         transcoder.calc_len = true;
     }
+    transcoder.queue.set_sync(force_sync || !mux_quirks.is_unsync());
 
     if transcoder.calc_len {
         let mut sids = Vec::new();
@@ -562,6 +567,8 @@ fn main() {
         println!(" #{}: {} {}", ostr.get_num(), ostr, ostr.get_info().get_name());
     }
 
+    transcoder.queue.reserve_streams(mux.get_num_streams());
+
     let mut time = Instant::now();
     let show_interval = Duration::from_millis(100);
     let mut adata_size = 0;
@@ -630,10 +637,7 @@ fn main() {
                     StreamType::Audio => { adata_size += pkt_size; },
                     _ => {},
                 };
-                if mux.mux_frame(pkt).is_err() {
-                    println!("error muxing packet");
-                    break;
-                }
+                transcoder.queue.queue_packet(pkt);
             },
             OutputMode::Encode(dst_id, ref mut encoder, ref mut cvt) => {
                 if let Some((ref mut dsupp, ref mut decoder, ref mut reorderer)) = transcoder.decoders[src_id] {
@@ -660,7 +664,7 @@ fn main() {
                                 StreamType::Audio => { adata_size += pkt_size; },
                                 _ => {},
                             };
-                            mux.mux_frame(pkt).unwrap();
+                            transcoder.queue.queue_packet(pkt);
                         }
                     }
                     if let OutputConvert::Audio(ref mut acvt) = cvt {
@@ -672,7 +676,7 @@ fn main() {
                                 if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'main_loop; }
                                 let pkt_size = pkt.get_buffer().len();
                                 adata_size += pkt_size;
-                                mux.mux_frame(pkt).unwrap();
+                                transcoder.queue.queue_packet(pkt);
                             }
                         }
                     }
@@ -682,6 +686,13 @@ fn main() {
                 }
             },
         };
+
+        while let Some(pkt) = transcoder.queue.get_packet() {
+            if mux.mux_frame(pkt).is_err() {
+                println!("error muxing packet");
+                break;
+            }
+        }
     }
     'reord_flush_loop: for stream in ism.iter() {
         let src_id = stream.get_num();
@@ -693,13 +704,13 @@ fn main() {
                     }
                     while let Ok(Some(pkt)) = encoder.get_packet() {
                         if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'reord_flush_loop; }
-                        mux.mux_frame(pkt).unwrap();
+                        transcoder.queue.queue_packet(pkt);
                     }
                 }
             }
         }
     }
-    'flush_loop: for enc in transcoder.encoders.iter_mut() {
+    /*'flush_loop:*/ for enc in transcoder.encoders.iter_mut() {
         match enc {
             OutputMode::Encode(str_id, ref mut encoder, _) => {
                 let ret = encoder.flush();
@@ -708,16 +719,21 @@ fn main() {
                     break;
                 } else {
                     while let Ok(Some(pkt)) = encoder.get_packet() {
-                        if mux.mux_frame(pkt).is_err() {
-                            println!("error muxing packet");
-                            break 'flush_loop;
-                        }
+                        transcoder.queue.queue_packet(pkt);
                     }
                 }
             },
             _ => {},
         };
     }
+
+    while let Some(pkt) = transcoder.queue.get_packet() {
+        if mux.mux_frame(pkt).is_err() {
+            println!("error muxing packet");
+            break;
+        }
+    }
+
     if transcoder.verbose > 0 {
         println!();
     }
index b6ba295cd29bf0f22724361196cd21c95a3d95ed..f57d45e8ca523ccc43305f1ab56361809eb62275 100644 (file)
@@ -1,6 +1,7 @@
 use std::fs::File;
 use std::io::BufReader;
 use std::pin::Pin;
+use std::collections::VecDeque;
 use nihav_core::frame::*;
 use nihav_core::options::*;
 use nihav_core::codecs::*;
@@ -126,6 +127,70 @@ pub enum OutputMode {
     Encode(u32, Box<dyn NAEncoder>, OutputConvert),
 }
 
+#[derive(Default)]
+pub struct OutputQueue {
+    packets:        Vec<VecDeque<NAPacket>>,
+    sync:           bool,
+    cur_stream:     usize,
+    had_warning:    bool,
+}
+
+impl OutputQueue {
+    pub fn set_sync(&mut self, sync: bool) { self.sync = sync; }
+    pub fn reserve_streams(&mut self, nstreams: usize) {
+        while self.packets.len() < nstreams {
+            self.packets.push(VecDeque::new());
+        }
+    }
+    pub fn queue_packet(&mut self, pkt: NAPacket) {
+        let stream_no = pkt.get_stream().get_num();
+        if stream_no >= self.packets.len() {
+            panic!("wrong packet stream");
+        }
+        self.packets[stream_no].push_back(pkt);
+    }
+    pub fn get_packet(&mut self) -> Option<NAPacket> {
+        if self.sync {
+            for queue in self.packets.iter() {
+                if queue.is_empty() {
+                    if !self.had_warning && self.packets.len() > 1 {
+                        for q in self.packets.iter() {
+                            if q.len() > 64 {
+                                self.had_warning = true;
+                                println!("Too many packets in one of the queues, synchronisation is not possible!");
+                                self.sync = false;
+                                break;
+                            }
+                        }
+                    }
+                    return None;
+                }
+            }
+// todo return packet with minimum timestamp
+            self.get_last_packet()
+        } else {
+            self.get_last_packet()
+        }
+    }
+    pub fn get_last_packet(&mut self) -> Option<NAPacket> {
+        let last_str = self.cur_stream;
+        loop {
+            let idx = self.cur_stream;
+            self.cur_stream += 1;
+            if self.cur_stream >= self.packets.len() {
+                self.cur_stream = 0;
+            }
+            if let Some(pkt) = self.packets[idx].pop_front() {
+                return Some(pkt);
+            }
+            if self.cur_stream == last_str {
+                break;
+            }
+        }
+        None
+    }
+}
+
 #[derive(Default)]
 #[allow(clippy::type_complexity)]
 pub struct Transcoder {
@@ -149,6 +214,8 @@ pub struct Transcoder {
     pub calc_len:       bool,
     pub nframes:        Vec<usize>,
     pub global_tb:      (u32, u32),
+
+    pub queue:          OutputQueue,
 }
 
 impl Transcoder {