From 37f130a74415deaf920b04209e1c334a8876c381 Mon Sep 17 00:00:00 2001 From: Kostya Shishkov Date: Wed, 14 Jun 2023 19:00:07 +0200 Subject: [PATCH] make video player use multi-threaded decoders if possible --- videoplayer/src/audiodec.rs | 8 +- videoplayer/src/main.rs | 62 ++++++++++++-- videoplayer/src/videodec.rs | 164 +++++++++++++++++++++++++++++++----- 3 files changed, 207 insertions(+), 27 deletions(-) diff --git a/videoplayer/src/audiodec.rs b/videoplayer/src/audiodec.rs index 3045192..e9da970 100644 --- a/videoplayer/src/audiodec.rs +++ b/videoplayer/src/audiodec.rs @@ -12,7 +12,7 @@ use nihav_core::formats::*; use nihav_core::codecs::*; use nihav_core::soundcvt::*; -use super::{DecoderStuff, PktSendEvent}; +use super::{DecoderStuff, DecoderType, PktSendEvent}; static SKIP_ADECODING: AtomicBool = AtomicBool::new(false); static AUDIO_VOLUME: AtomicUsize = AtomicUsize::new(100); @@ -181,6 +181,7 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_ } let adevice = ret.unwrap(); (Some(adevice), std::thread::Builder::new().name("acontrol".to_string()).spawn(move ||{ + let adec = if let DecoderType::Audio(ref mut dec) = audio_dec.dec { dec } else { panic!("not an audio decoder!"); }; let mut samplepos = 0usize; let dst_chmap = if dst_info.channels == 2 { NAChannelMap::from_str("L,R").unwrap() @@ -199,7 +200,7 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_ std::thread::sleep(Duration::from_millis(100)); } if !SKIP_ADECODING.load(Ordering::Relaxed) { - if let Ok(frm) = audio_dec.dec.decode(&mut audio_dec.dsupp, &pkt) { + if let Ok(frm) = adec.decode(&mut audio_dec.dsupp, &pkt) { let buf = frm.get_buffer(); if let Some(pts) = frm.get_pts() { samplepos = NATimeInfo::ts_to_time(pts, u64::from(dst_info.sample_rate), frm.ts.tb_num, frm.ts.tb_den) as usize; @@ -223,8 +224,9 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_ } } }, + Ok(PktSendEvent::GetFrames) => {}, Ok(PktSendEvent::Flush) => { - audio_dec.dec.flush(); + adec.flush(); let mut qdata = queue.lock().unwrap(); qdata.flush(); SKIP_ADECODING.store(false, Ordering::Relaxed); diff --git a/videoplayer/src/main.rs b/videoplayer/src/main.rs index 25ddcab..dcd4d0a 100644 --- a/videoplayer/src/main.rs +++ b/videoplayer/src/main.rs @@ -44,16 +44,22 @@ macro_rules! debug_log { pub enum PktSendEvent { Packet(NAPacket), + GetFrames, Flush, End, ImmediateEnd, HurryUp, } +pub enum DecoderType { + Audio(Box), + Video(Box, Box), + VideoMT(Box, MTFrameReorderer), +} + pub struct DecoderStuff { pub dsupp: Box, - pub dec: Box, - pub reord: Box, + pub dec: DecoderType, } fn format_time(ms: u64) -> String { @@ -221,6 +227,9 @@ struct Player { video_str: u32, audio_str: u32, + vthreads: usize, + use_mt: bool, + paused: bool, mute: bool, volume: usize, @@ -254,6 +263,9 @@ impl Player { video_str: 0, audio_str: 0, + vthreads: 3, + use_mt: true, + paused: false, mute: false, volume: 100, @@ -341,6 +353,7 @@ impl Player { self.vcontrol.fill(disp_queue); std::thread::sleep(Duration::from_millis(10)); } + self.vcontrol.wait_for_frames(); self.vcontrol.fill(disp_queue); } debug_log!(self; {format!(" prefilling done, frames {}-{} audio {}", disp_queue.start, disp_queue.end, self.acontrol.get_fill())}); @@ -444,6 +457,10 @@ impl Player { nihav_register_all_demuxers(&mut dmx_reg); let mut dec_reg = RegisteredDecoders::new(); nihav_register_all_decoders(&mut dec_reg); + let mut mtdec_reg = RegisteredMTDecoders::new(); + if self.use_mt { + nihav_register_all_mt_decoders(&mut mtdec_reg); + } let ret = dmx_reg.find_demuxer(dmx_name); if ret.is_none() { @@ -484,11 +501,32 @@ impl Player { let s = dmx.get_stream(i).unwrap(); let info = s.get_info(); let decfunc = dec_reg.find_decoder(info.get_name()); + let decfunc_mt = mtdec_reg.find_decoder(info.get_name()); println!("stream {} - {} {}", i, s, info.get_name()); debug_log!(self; {format!(" stream {} - {} {}", i, s, info.get_name())}); let str_id = s.get_id(); if info.is_video() { if video_dec.is_none() && self.play_video { + if let Some(decfunc) = decfunc_mt { + let mut dec = (decfunc)(); + let mut dsupp = Box::new(NADecoderSupport::new()); + let props = info.get_properties().get_video_info().unwrap(); + if props.get_width() != 0 { + width = props.get_width(); + height = props.get_height(); + } + if dec.init(&mut dsupp, info.clone(), self.vthreads).is_ok() { + video_dec = Some(DecoderStuff{ dsupp, dec: DecoderType::VideoMT(dec, MTFrameReorderer::new()) }); + self.video_str = str_id; + let (tbn, tbd) = s.get_timebase(); + tb_num = tbn; + tb_den = tbd; + self.has_video = true; + continue; + } else { + println!("failed to create multi-threaded decoder, falling back"); + } + } if let Some(decfunc) = decfunc { let mut dec = (decfunc)(); let mut dsupp = Box::new(NADecoderSupport::new()); @@ -509,7 +547,7 @@ impl Player { dsupp.pool_u16 = NAVideoBufferPool::new(reorder_depth); dsupp.pool_u32 = NAVideoBufferPool::new(reorder_depth); dec.init(&mut dsupp, info).unwrap(); - video_dec = Some(DecoderStuff{ dsupp, dec, reord }); + video_dec = Some(DecoderStuff{ dsupp, dec: DecoderType::Video(dec, reord) }); self.video_str = str_id; let (tbn, tbd) = s.get_timebase(); tb_num = tbn; @@ -526,8 +564,7 @@ impl Player { let mut dsupp = Box::new(NADecoderSupport::new()); ainfo = info.get_properties().get_audio_info(); dec.init(&mut dsupp, info).unwrap(); - let reord = Box::new(NoReorderer::new()); - audio_dec = Some(DecoderStuff{ dsupp, dec, reord }); + audio_dec = Some(DecoderStuff{ dsupp, dec: DecoderType::Audio(dec) }); self.audio_str = str_id; self.has_audio = true; } else { @@ -725,6 +762,21 @@ fn main() { "-nodebug" => { player.debug = false; }, + "-mt" => { + player.use_mt = true; + }, + "-nomt" => { + player.use_mt = false; + }, + "-threads" => { + if let Some(arg) = aiter.next() { + if let Ok(val) = arg.parse::() { + player.vthreads = val.max(1); + } else { + println!("wrong number of threads"); + } + } + }, _ => { player.play(arg, seek_time); if player.end { break; } diff --git a/videoplayer/src/videodec.rs b/videoplayer/src/videodec.rs index 0a1dcd5..ac60466 100644 --- a/videoplayer/src/videodec.rs +++ b/videoplayer/src/videodec.rs @@ -10,10 +10,11 @@ use nihav_core::formats::*; use nihav_core::codecs::*; use nihav_core::scale::*; -use super::{DecoderStuff, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN}; +use super::{DecoderStuff, DecoderType, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN}; static SKIP_VDECODING: AtomicBool = AtomicBool::new(false); static VIDEO_END: AtomicBool = AtomicBool::new(false); +static GET_FRAMES_END: AtomicBool = AtomicBool::new(false); pub const FRAME_QUEUE_SIZE: usize = 25; @@ -91,29 +92,134 @@ impl VideoDecoder { Some((opic, time)) } pub fn next_frame(&mut self, pkt: &NAPacket) -> Option { - if let Ok(frm) = self.dec.dec.decode(&mut self.dec.dsupp, pkt) { - self.dec.reord.add_frame(frm); - while let Some(frm) = self.dec.reord.get_frame() { - let bt = frm.get_buffer(); - if let NABufferType::None = bt { continue; } - let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0)); - return self.convert_buf(bt, ts); - } - } + match self.dec.dec { + DecoderType::Video(ref mut vdec, ref mut reord) => { + if let Ok(frm) = vdec.decode(&mut self.dec.dsupp, pkt) { + reord.add_frame(frm); + while let Some(frm) = reord.get_frame() { + let bt = frm.get_buffer(); + if let NABufferType::None = bt { continue; } + let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0)); + return self.convert_buf(bt, ts); + } + } + }, + DecoderType::VideoMT(ref mut vdec, ref mut reord) => { + let queue_id = reord.register_frame(); + match vdec.queue_pkt(&mut self.dec.dsupp, &pkt, queue_id) { + Ok(true) => {}, + Ok(false) => { + while !vdec.can_take_input() || vdec.has_output() { + match vdec.get_frame() { + (Ok(frm), id) => { + reord.add_frame(frm, id); + }, + (Err(err), id) => { + reord.drop_frame(id); + panic!("frame {} decoding error {:?}", id, err); + }, + }; + } + match vdec.queue_pkt(&mut self.dec.dsupp, &pkt, queue_id) { + Ok(true) => {}, + Ok(false) => panic!("still can't queue frame!"), + Err(err) => panic!("queueing error {:?}", err), + }; + }, + Err(err) => panic!("queueing error {:?}", err), + }; + while let Some(frm) = reord.get_frame() { + let bt = frm.get_buffer(); + if let NABufferType::None = bt { continue; } + let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0)); + return self.convert_buf(bt, ts); + } + }, + _ => panic!("not a video decoder!"), + }; + None + } + pub fn more_frames(&mut self, do_not_wait: bool) -> Option { + match self.dec.dec { + DecoderType::Video(ref mut _dec, ref mut reord) => { + while let Some(frm) = reord.get_frame() { + let bt = frm.get_buffer(); + if let NABufferType::None = bt { continue; } + let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0)); + return self.convert_buf(bt, ts); + } + }, + DecoderType::VideoMT(ref mut vdec, ref mut reord) => { + let mut got_some = false; + while vdec.has_output() { + match vdec.get_frame() { + (Ok(frm), id) => { + reord.add_frame(frm, id); + got_some = true; + }, + (Err(err), id) => { + reord.drop_frame(id); + panic!("frame {} decoding error {:?}", id, err); + }, + }; + } + if !got_some && !do_not_wait { + match vdec.get_frame() { + (Ok(frm), id) => { + reord.add_frame(frm, id); + }, + (Err(DecoderError::NoFrame), _) => {}, + (Err(err), id) => { + reord.drop_frame(id); + panic!("frame {} decoding error {:?}", id, err); + }, + }; + } + while let Some(frm) = reord.get_frame() { + let bt = frm.get_buffer(); + if let NABufferType::None = bt { continue; } + let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0)); + return self.convert_buf(bt, ts); + } + }, + _ => {}, + }; None } pub fn last_frame(&mut self) -> Option { - while let Some(frm) = self.dec.reord.get_last_frames() { - let bt = frm.get_buffer(); - if let NABufferType::None = bt { continue; } - let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0)); - return self.convert_buf(bt, ts); - } + match self.dec.dec { + DecoderType::Video(ref mut _dec, ref mut reord) => { + while let Some(frm) = reord.get_last_frames() { + let bt = frm.get_buffer(); + if let NABufferType::None = bt { continue; } + let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0)); + return self.convert_buf(bt, ts); + } + }, + DecoderType::VideoMT(ref mut _dec, ref mut reord) => { + while let Some(frm) = reord.get_last_frames() { + let bt = frm.get_buffer(); + if let NABufferType::None = bt { continue; } + let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0)); + return self.convert_buf(bt, ts); + } + }, + _ => {}, + }; None } pub fn flush(&mut self) { - self.dec.dec.flush(); - self.dec.reord.flush(); + match self.dec.dec { + DecoderType::Video(ref mut dec, ref mut reord) => { + dec.flush(); + reord.flush(); + }, + DecoderType::VideoMT(ref mut dec, ref mut reord) => { + dec.flush(); + reord.flush(); + }, + _ => {}, + }; } } @@ -129,7 +235,16 @@ fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, v if let Some((buf, time)) = vdec.next_frame(&pkt) { vfsend.send((buf, time)).unwrap(); } + while let Some((buf, time)) = vdec.more_frames(true) { + vfsend.send((buf, time)).unwrap(); + } + } + }, + Ok(PktSendEvent::GetFrames) => { + while let Some((buf, time)) = vdec.more_frames(false) { + vfsend.send((buf, time)).unwrap(); } + GET_FRAMES_END.store(true, Ordering::Relaxed); }, Ok(PktSendEvent::Flush) => { vdec.flush(); @@ -152,10 +267,12 @@ fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, v }, Ok(PktSendEvent::HurryUp) => { skip_mode = skip_mode.advance(); - vdec.dec.dec.set_options(&[NAOption{ + if let DecoderType::Video(ref mut dec, ref mut _reord) = vdec.dec.dec { + dec.set_options(&[NAOption{ name: FRAME_SKIP_OPTION, value: NAValue::String(skip_mode.to_string()), }]); + } }, Err(_) => { break; @@ -286,6 +403,15 @@ impl VideoControl { pub fn is_video_end(&self) -> bool { VIDEO_END.load(Ordering::Relaxed) } + pub fn wait_for_frames(&mut self) { + GET_FRAMES_END.store(false, Ordering::Relaxed); + self.try_send_event(PktSendEvent::GetFrames); + while !self.try_send_queued() { + } + while !GET_FRAMES_END.load(Ordering::Relaxed) { + thread::yield_now(); + } + } pub fn is_yuv(&self) -> bool { self.do_yuv } -- 2.30.2