let mut arg_idx = 1;
let mut printed_info = false;
+ let mut force_sync = false;
while arg_idx < args.len() {
match args[arg_idx].as_str() {
"--list-decoders" => {
return;
}
},
+ "--sync" => {
+ force_sync = true;
+ },
"--muxer-options" => {
next_arg!(args, arg_idx);
if !transcoder.parse_muxer_options(&args[arg_idx], &full_reg.mux_reg) {
if mux_quirks.is_fixed_duration() {
transcoder.calc_len = true;
}
+ transcoder.queue.set_sync(force_sync || !mux_quirks.is_unsync());
if transcoder.calc_len {
let mut sids = Vec::new();
println!(" #{}: {} {}", ostr.get_num(), ostr, ostr.get_info().get_name());
}
+ transcoder.queue.reserve_streams(mux.get_num_streams());
+
let mut time = Instant::now();
let show_interval = Duration::from_millis(100);
let mut adata_size = 0;
StreamType::Audio => { adata_size += pkt_size; },
_ => {},
};
- if mux.mux_frame(pkt).is_err() {
- println!("error muxing packet");
- break;
- }
+ transcoder.queue.queue_packet(pkt);
},
OutputMode::Encode(dst_id, ref mut encoder, ref mut cvt) => {
if let Some((ref mut dsupp, ref mut decoder, ref mut reorderer)) = transcoder.decoders[src_id] {
StreamType::Audio => { adata_size += pkt_size; },
_ => {},
};
- mux.mux_frame(pkt).unwrap();
+ transcoder.queue.queue_packet(pkt);
}
}
if let OutputConvert::Audio(ref mut acvt) = cvt {
if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'main_loop; }
let pkt_size = pkt.get_buffer().len();
adata_size += pkt_size;
- mux.mux_frame(pkt).unwrap();
+ transcoder.queue.queue_packet(pkt);
}
}
}
}
},
};
+
+ while let Some(pkt) = transcoder.queue.get_packet() {
+ if mux.mux_frame(pkt).is_err() {
+ println!("error muxing packet");
+ break;
+ }
+ }
}
'reord_flush_loop: for stream in ism.iter() {
let src_id = stream.get_num();
}
while let Ok(Some(pkt)) = encoder.get_packet() {
if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'reord_flush_loop; }
- mux.mux_frame(pkt).unwrap();
+ transcoder.queue.queue_packet(pkt);
}
}
}
}
}
- 'flush_loop: for enc in transcoder.encoders.iter_mut() {
+ /*'flush_loop:*/ for enc in transcoder.encoders.iter_mut() {
match enc {
OutputMode::Encode(str_id, ref mut encoder, _) => {
let ret = encoder.flush();
break;
} else {
while let Ok(Some(pkt)) = encoder.get_packet() {
- if mux.mux_frame(pkt).is_err() {
- println!("error muxing packet");
- break 'flush_loop;
- }
+ transcoder.queue.queue_packet(pkt);
}
}
},
_ => {},
};
}
+
+ while let Some(pkt) = transcoder.queue.get_packet() {
+ if mux.mux_frame(pkt).is_err() {
+ println!("error muxing packet");
+ break;
+ }
+ }
+
if transcoder.verbose > 0 {
println!();
}
use std::fs::File;
use std::io::BufReader;
use std::pin::Pin;
+use std::collections::VecDeque;
use nihav_core::frame::*;
use nihav_core::options::*;
use nihav_core::codecs::*;
Encode(u32, Box<dyn NAEncoder>, OutputConvert),
}
+#[derive(Default)]
+pub struct OutputQueue {
+ packets: Vec<VecDeque<NAPacket>>,
+ sync: bool,
+ cur_stream: usize,
+ had_warning: bool,
+}
+
+impl OutputQueue {
+ pub fn set_sync(&mut self, sync: bool) { self.sync = sync; }
+ pub fn reserve_streams(&mut self, nstreams: usize) {
+ while self.packets.len() < nstreams {
+ self.packets.push(VecDeque::new());
+ }
+ }
+ pub fn queue_packet(&mut self, pkt: NAPacket) {
+ let stream_no = pkt.get_stream().get_num();
+ if stream_no >= self.packets.len() {
+ panic!("wrong packet stream");
+ }
+ self.packets[stream_no].push_back(pkt);
+ }
+ pub fn get_packet(&mut self) -> Option<NAPacket> {
+ if self.sync {
+ for queue in self.packets.iter() {
+ if queue.is_empty() {
+ if !self.had_warning && self.packets.len() > 1 {
+ for q in self.packets.iter() {
+ if q.len() > 64 {
+ self.had_warning = true;
+ println!("Too many packets in one of the queues, synchronisation is not possible!");
+ self.sync = false;
+ break;
+ }
+ }
+ }
+ return None;
+ }
+ }
+// todo return packet with minimum timestamp
+ self.get_last_packet()
+ } else {
+ self.get_last_packet()
+ }
+ }
+ pub fn get_last_packet(&mut self) -> Option<NAPacket> {
+ let last_str = self.cur_stream;
+ loop {
+ let idx = self.cur_stream;
+ self.cur_stream += 1;
+ if self.cur_stream >= self.packets.len() {
+ self.cur_stream = 0;
+ }
+ if let Some(pkt) = self.packets[idx].pop_front() {
+ return Some(pkt);
+ }
+ if self.cur_stream == last_str {
+ break;
+ }
+ }
+ None
+ }
+}
+
#[derive(Default)]
#[allow(clippy::type_complexity)]
pub struct Transcoder {
pub calc_len: bool,
pub nframes: Vec<usize>,
pub global_tb: (u32, u32),
+
+ pub queue: OutputQueue,
}
impl Transcoder {