From: Kostya Shishkov Date: Sun, 18 Jun 2023 13:28:45 +0000 (+0200) Subject: try to improve state handling in decoding threads X-Git-Url: https://git.nihav.org/?p=nihav-player.git;a=commitdiff_plain;h=4e72c04a50ab02f6be479799d5b48161f043ebd2 try to improve state handling in decoding threads --- diff --git a/videoplayer/src/audiodec.rs b/videoplayer/src/audiodec.rs index e9da970..3ed6398 100644 --- a/videoplayer/src/audiodec.rs +++ b/videoplayer/src/audiodec.rs @@ -12,11 +12,10 @@ use nihav_core::formats::*; use nihav_core::codecs::*; use nihav_core::soundcvt::*; -use super::{DecoderStuff, DecoderType, PktSendEvent}; +use super::{DecoderStuff, DecoderType, DecoderState, DecodingState, PktSendEvent}; -static SKIP_ADECODING: AtomicBool = AtomicBool::new(false); +static ADEC_STATE: DecoderState = DecoderState::new(); static AUDIO_VOLUME: AtomicUsize = AtomicUsize::new(100); -static AUDIO_END: AtomicBool = AtomicBool::new(false); static CUR_QUEUE_FILL: AtomicUsize = AtomicUsize::new(0); static SAMPLE_RATE: AtomicUsize = AtomicUsize::new(0); static CHANNELS: AtomicUsize = AtomicUsize::new(0); @@ -188,18 +187,18 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_ } else { NAChannelMap::from_str("C").unwrap() }; - SKIP_ADECODING.store(false, Ordering::Relaxed); + ADEC_STATE.set_state(DecodingState::Normal); loop { match aprecv.recv() { Ok(PktSendEvent::Packet(pkt)) => { loop { if CUR_QUEUE_FILL.load(Ordering::Relaxed) - < QUEUE_REFILL_LIMIT || SKIP_ADECODING.load(Ordering::Relaxed) { + < QUEUE_REFILL_LIMIT || ADEC_STATE.is_flushing() { break; } std::thread::sleep(Duration::from_millis(100)); } - if !SKIP_ADECODING.load(Ordering::Relaxed) { + if !ADEC_STATE.is_flushing() { if let Ok(frm) = adec.decode(&mut audio_dec.dsupp, &pkt) { let buf = frm.get_buffer(); if let Some(pts) = frm.get_pts() { @@ -229,7 +228,7 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_ adec.flush(); let mut qdata = queue.lock().unwrap(); qdata.flush(); - SKIP_ADECODING.store(false, Ordering::Relaxed); + ADEC_STATE.set_state(DecodingState::Waiting); }, Ok(PktSendEvent::End) => break, Ok(PktSendEvent::ImmediateEnd) => { @@ -245,11 +244,11 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_ } loop { let qdata = queue.lock().unwrap(); - if qdata.fill() == 0 || SKIP_ADECODING.load(Ordering::Relaxed) { + if qdata.fill() == 0 || ADEC_STATE.is_flushing() { break; } } - AUDIO_END.store(true, Ordering::Relaxed); + ADEC_STATE.set_state(DecodingState::End); }).unwrap()) } @@ -268,7 +267,7 @@ impl AudioControl { } else { (None, dummy_audio_thread(aprecv)) }; - AUDIO_END.store(false, Ordering::Relaxed); + ADEC_STATE.set_state(DecodingState::Normal); Self { aqueue: Vec::new(), @@ -295,7 +294,7 @@ impl AudioControl { AUDIO_VOLUME.load(Ordering::Relaxed) } pub fn is_audio_end(&self) -> bool { - AUDIO_END.load(Ordering::Relaxed) + matches!(ADEC_STATE.get_state(), DecodingState::End | DecodingState::Error) } pub fn get_fill(&self) -> usize { CUR_QUEUE_FILL.load(Ordering::Relaxed) } pub fn get_time(&self) -> Option { @@ -346,12 +345,12 @@ impl AudioControl { pub fn flush(&mut self) { self.pause(); self.aqueue.clear(); - SKIP_ADECODING.store(true, Ordering::Release); + ADEC_STATE.set_state(DecodingState::Flush); CURRENT_TIME_SET.store(false, Ordering::Release); let _ = self.apsend.send(PktSendEvent::Flush); } pub fn finish(self) { - SKIP_ADECODING.store(true, Ordering::Release); + ADEC_STATE.set_state(DecodingState::Flush); let _ = self.apsend.send(PktSendEvent::ImmediateEnd); self.athread.join().unwrap(); } diff --git a/videoplayer/src/main.rs b/videoplayer/src/main.rs index 08b43cb..fa2c4d5 100644 --- a/videoplayer/src/main.rs +++ b/videoplayer/src/main.rs @@ -9,6 +9,7 @@ use std::io::Write; use std::path::Path; use std::time::{Duration, Instant}; use std::thread; +use std::sync::atomic::{AtomicU8, Ordering}; use sdl2::event::{Event, WindowEvent}; use sdl2::keyboard::Keycode; @@ -33,6 +34,55 @@ use videodec::*; mod osd; use osd::*; +#[repr(u8)] +#[derive(Clone,Copy,Debug,PartialEq)] +enum DecodingState { + Normal, + Waiting, + Flush, + Prefetch, + Error, + End, +} + +impl Default for DecodingState { + fn default() -> Self { DecodingState::Normal } +} + +impl From for DecodingState { + fn from(val: u8) -> Self { + match val { + 0 => DecodingState::Normal, + 1 => DecodingState::Waiting, + 2 => DecodingState::Flush, + 3 => DecodingState::Prefetch, + 4 => DecodingState::End, + _ => DecodingState::Error, + } + } +} + +struct DecoderState { + state: AtomicU8 +} + +impl DecoderState { + const fn new() -> Self { + Self { + state: AtomicU8::new(DecodingState::Normal as u8) + } + } + fn set_state(&self, state: DecodingState) { + self.state.store(state as u8, Ordering::Release); + } + fn get_state(&self) -> DecodingState { + self.state.load(Ordering::Acquire).into() + } + fn is_flushing(&self) -> bool { + matches!(self.get_state(), DecodingState::Flush | DecodingState::Error) + } +} + #[cfg(feature="debug")] macro_rules! debug_log { ($log: expr; $blk: block) => { @@ -315,7 +365,7 @@ impl Player { logfile: File::create("debug.log").unwrap(), } } - fn seek(&mut self, off: u64, fwd: bool, dmx: &mut Demuxer, disp_queue: &mut DispQueue) { + fn seek(&mut self, off: u64, fwd: bool, dmx: &mut Demuxer, disp_queue: &mut DispQueue) -> Result<(), ()> { let cur_time = self.tkeep.get_cur_time(); let seektime = if fwd { cur_time + off * 1000 } else { cur_time.saturating_sub(off * 1000) }; @@ -324,7 +374,7 @@ impl Player { let ret = dmx.seek(NATimePoint::Milliseconds(seektime)); if ret.is_err() { println!(" seek error"); - return; + return Ok(()); //TODO: not ignore some of seek errors? } self.acontrol.flush(); @@ -332,7 +382,7 @@ impl Player { disp_queue.flush(); self.tkeep.reset_ts(); - self.prefill(dmx, disp_queue); + self.prefill(dmx, disp_queue)?; if !disp_queue.is_empty() { self.tkeep.reset_all(disp_queue.first_ts); } else { @@ -351,8 +401,9 @@ impl Player { if !self.paused { self.acontrol.resume(); } + Ok(()) } - fn prefill(&mut self, dmx: &mut Demuxer, disp_queue: &mut DispQueue) { + fn prefill(&mut self, dmx: &mut Demuxer, disp_queue: &mut DispQueue) -> Result<(), ()> { debug_log!(self; {" prefilling"}); while self.vcontrol.get_queue_size() < FRAME_QUEUE_LEN { let mut try_send = self.acontrol.get_queue_size() < FRAME_QUEUE_LEN && (!self.has_video || (!self.vcontrol.is_filled(FRAME_QUEUE_LEN) && !disp_queue.is_full())); @@ -389,10 +440,11 @@ impl Player { self.vcontrol.fill(disp_queue); std::thread::sleep(Duration::from_millis(10)); } - self.vcontrol.wait_for_frames(); + self.vcontrol.wait_for_frames()?; self.vcontrol.fill(disp_queue); } debug_log!(self; {format!(" prefilling done, frames {}-{} audio {}", disp_queue.start, disp_queue.end, self.acontrol.get_fill())}); + Ok(()) } fn toggle_pause(&mut self) { self.paused = !self.paused; @@ -409,12 +461,12 @@ impl Player { self.acontrol.resume(); } } - fn handle_events(&mut self, event_pump: &mut sdl2::EventPump, canvas: &mut Canvas, dmx: &mut Demuxer, disp_queue: &mut DispQueue) -> bool { + fn handle_events(&mut self, event_pump: &mut sdl2::EventPump, canvas: &mut Canvas, dmx: &mut Demuxer, disp_queue: &mut DispQueue) -> Result { for event in event_pump.poll_iter() { if let Event::Quit {..} = event { self.end = true; println!(); - return true; + return Ok(true); } if let Event::Window {win_event: WindowEvent::Exposed, ..} = event { canvas.clear(); @@ -429,15 +481,15 @@ impl Player { Keycode::Escape | Keycode::Q => { self.end = true; println!(); - return true; + return Ok(true); }, - Keycode::Return => return true, - Keycode::Right => { self.seek(10, true, dmx, disp_queue); }, - Keycode::Left => { self.seek(10, false, dmx, disp_queue); }, - Keycode::Up => { self.seek(60, true, dmx, disp_queue); }, - Keycode::Down => { self.seek(60, false, dmx, disp_queue); }, - Keycode::PageUp => { self.seek(600, true, dmx, disp_queue); }, - Keycode::PageDown => { self.seek(600, false, dmx, disp_queue); }, + Keycode::Return => return Ok(true), + Keycode::Right => { self.seek(10, true, dmx, disp_queue)?; }, + Keycode::Left => { self.seek(10, false, dmx, disp_queue)?; }, + Keycode::Up => { self.seek(60, true, dmx, disp_queue)?; }, + Keycode::Down => { self.seek(60, false, dmx, disp_queue)?; }, + Keycode::PageUp => { self.seek(600, true, dmx, disp_queue)?; }, + Keycode::PageDown => { self.seek(600, false, dmx, disp_queue)?; }, Keycode::Space => { self.toggle_pause(); }, Keycode::Plus | Keycode::KpPlus => { self.volume = (self.volume + 10).min(MAX_VOLUME); @@ -476,7 +528,7 @@ impl Player { } } } - false + Ok(false) } fn play(&mut self, name: &str, start_time: NATimePoint) { debug_log!(self; {format!("Playing {}", name)}); @@ -671,7 +723,13 @@ impl Player { } // play - self.prefill(&mut dmx, &mut disp_q); + if self.prefill(&mut dmx, &mut disp_q).is_err() { + std::mem::swap(&mut self.vcontrol, &mut new_vcontrol); + new_vcontrol.finish(); + std::mem::swap(&mut self.acontrol, &mut new_acontrol); + new_acontrol.finish(); + return; + } self.tkeep.reset_all(if !disp_q.is_empty() { disp_q.first_ts } else { 0 }); if !self.paused { self.acontrol.resume(); @@ -680,7 +738,8 @@ impl Player { let mut last_disp = Instant::now(); let mut has_data = true; 'main: loop { - if self.handle_events(&mut event_pump, &mut canvas, &mut dmx, &mut disp_q) { + let ret = self.handle_events(&mut event_pump, &mut canvas, &mut dmx, &mut disp_q); + if matches!(ret, Ok(true) | Err(_)) { println!(); break 'main; } diff --git a/videoplayer/src/videodec.rs b/videoplayer/src/videodec.rs index ac60466..1a34aad 100644 --- a/videoplayer/src/videodec.rs +++ b/videoplayer/src/videodec.rs @@ -1,5 +1,4 @@ use std::thread::JoinHandle; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, SyncSender, TrySendError}; use std::thread; @@ -10,11 +9,9 @@ use nihav_core::formats::*; use nihav_core::codecs::*; use nihav_core::scale::*; -use super::{DecoderStuff, DecoderType, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN}; +use super::{DecoderStuff, DecoderType, DecoderState, DecodingState, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN}; -static SKIP_VDECODING: AtomicBool = AtomicBool::new(false); -static VIDEO_END: AtomicBool = AtomicBool::new(false); -static GET_FRAMES_END: AtomicBool = AtomicBool::new(false); +static VDEC_STATE: DecoderState = DecoderState::new(); pub const FRAME_QUEUE_SIZE: usize = 25; @@ -69,7 +66,7 @@ impl VideoDecoder { let mut opic = if let ColorModel::YUV(_) = self.ifmt.get_format().get_model() { self.yuv_pool.prealloc_video(self.oinfo_yuv, 2).unwrap(); while self.yuv_pool.get_free().is_none() { - if SKIP_VDECODING.load(Ordering::Relaxed) { + if VDEC_STATE.is_flushing() { return None; } std::thread::yield_now(); @@ -78,7 +75,7 @@ impl VideoDecoder { } else { self.rgb_pool.prealloc_video(self.oinfo_rgb, 0).unwrap(); while self.rgb_pool.get_free().is_none() { - if SKIP_VDECODING.load(Ordering::Relaxed) { + if VDEC_STATE.is_flushing() { return None; } std::thread::yield_now(); @@ -225,13 +222,13 @@ impl VideoDecoder { fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, video_dec: DecoderStuff, vprecv: Receiver, vfsend: SyncSender<(NABufferType, u64)>) -> JoinHandle<()> { std::thread::Builder::new().name("vdecoder".to_string()).spawn(move ||{ - SKIP_VDECODING.store(false, Ordering::Relaxed); + VDEC_STATE.set_state(DecodingState::Waiting); let mut vdec = VideoDecoder::new(width, height, tb_num, tb_den, video_dec); let mut skip_mode = FrameSkipMode::None; loop { match vprecv.recv() { Ok(PktSendEvent::Packet(pkt)) => { - if !SKIP_VDECODING.load(Ordering::Relaxed) { + if !VDEC_STATE.is_flushing() { if let Some((buf, time)) = vdec.next_frame(&pkt) { vfsend.send((buf, time)).unwrap(); } @@ -244,11 +241,11 @@ fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, v while let Some((buf, time)) = vdec.more_frames(false) { vfsend.send((buf, time)).unwrap(); } - GET_FRAMES_END.store(true, Ordering::Relaxed); + VDEC_STATE.set_state(DecodingState::Waiting); }, Ok(PktSendEvent::Flush) => { vdec.flush(); - SKIP_VDECODING.store(false, Ordering::Relaxed); + VDEC_STATE.set_state(DecodingState::Waiting); }, Ok(PktSendEvent::End) => { while vdec.yuv_pool.get_free().is_some() && vdec.rgb_pool.get_free().is_some() { @@ -258,11 +255,11 @@ fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, v } vfsend.send(ret.unwrap()).unwrap(); } - VIDEO_END.store(true, Ordering::Relaxed); + VDEC_STATE.set_state(DecodingState::End); break; }, Ok(PktSendEvent::ImmediateEnd) => { - VIDEO_END.store(true, Ordering::Relaxed); + VDEC_STATE.set_state(DecodingState::End); break; }, Ok(PktSendEvent::HurryUp) => { @@ -335,7 +332,7 @@ impl VideoControl { let (vpsend, vprecv) = std::sync::mpsc::sync_channel::(0); let (vfsend, vfrecv) = std::sync::mpsc::sync_channel::(FRAME_QUEUE_SIZE - 1); - VIDEO_END.store(false, Ordering::Relaxed); + VDEC_STATE.set_state(DecodingState::Normal); let vthread = if let Some(video_dec) = video_dec { start_video_decoding(width, height, tb_num, tb_den, video_dec, vprecv, vfsend) @@ -351,6 +348,7 @@ impl VideoControl { _ => {}, }; } + VDEC_STATE.set_state(DecodingState::End); }).unwrap() }; @@ -364,7 +362,7 @@ impl VideoControl { } pub fn flush(&mut self) { self.vqueue.clear(); - SKIP_VDECODING.store(true, Ordering::Release); + VDEC_STATE.set_state(DecodingState::Flush); for _ in 0..8 { let _ = self.vfrecv.try_recv(); } @@ -401,15 +399,22 @@ impl VideoControl { true } pub fn is_video_end(&self) -> bool { - VIDEO_END.load(Ordering::Relaxed) + matches!(VDEC_STATE.get_state(), DecodingState::End | DecodingState::Error) } - pub fn wait_for_frames(&mut self) { - GET_FRAMES_END.store(false, Ordering::Relaxed); + pub fn wait_for_frames(&mut self) -> Result<(), ()> { + VDEC_STATE.set_state(DecodingState::Prefetch); self.try_send_event(PktSendEvent::GetFrames); while !self.try_send_queued() { } - while !GET_FRAMES_END.load(Ordering::Relaxed) { - thread::yield_now(); + loop { + match VDEC_STATE.get_state() { + DecodingState::Waiting => { + VDEC_STATE.set_state(DecodingState::Normal); + return Ok(()); + }, + DecodingState::Prefetch => thread::yield_now(), + _ => return Err(()), + }; } } @@ -451,7 +456,7 @@ impl VideoControl { } pub fn finish(self) { - SKIP_VDECODING.store(true, Ordering::Release); + VDEC_STATE.set_state(DecodingState::Flush); for _ in 0..8 { let _ = self.vfrecv.try_recv(); }