try to improve state handling in decoding threads
authorKostya Shishkov <kostya.shishkov@gmail.com>
Sun, 18 Jun 2023 13:28:45 +0000 (15:28 +0200)
committerKostya Shishkov <kostya.shishkov@gmail.com>
Sun, 18 Jun 2023 13:28:45 +0000 (15:28 +0200)
videoplayer/src/audiodec.rs
videoplayer/src/main.rs
videoplayer/src/videodec.rs

index e9da970ad577d3a3c518c8f100134cff23286b2a..3ed639881d56c241f9fab18b6e3f2e36d81466e3 100644 (file)
@@ -12,11 +12,10 @@ use nihav_core::formats::*;
 use nihav_core::codecs::*;
 use nihav_core::soundcvt::*;
 
-use super::{DecoderStuff, DecoderType, PktSendEvent};
+use super::{DecoderStuff, DecoderType, DecoderState, DecodingState, PktSendEvent};
 
-static SKIP_ADECODING: AtomicBool = AtomicBool::new(false);
+static ADEC_STATE: DecoderState = DecoderState::new();
 static AUDIO_VOLUME: AtomicUsize = AtomicUsize::new(100);
-static AUDIO_END: AtomicBool = AtomicBool::new(false);
 static CUR_QUEUE_FILL: AtomicUsize = AtomicUsize::new(0);
 static SAMPLE_RATE: AtomicUsize = AtomicUsize::new(0);
 static CHANNELS: AtomicUsize = AtomicUsize::new(0);
@@ -188,18 +187,18 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_
                 } else {
                     NAChannelMap::from_str("C").unwrap()
                 };
-            SKIP_ADECODING.store(false, Ordering::Relaxed);
+            ADEC_STATE.set_state(DecodingState::Normal);
             loop {
                 match aprecv.recv() {
                     Ok(PktSendEvent::Packet(pkt)) => {
                         loop {
                             if CUR_QUEUE_FILL.load(Ordering::Relaxed)
- < QUEUE_REFILL_LIMIT || SKIP_ADECODING.load(Ordering::Relaxed) {
+ < QUEUE_REFILL_LIMIT || ADEC_STATE.is_flushing() {
                                 break;
                             }
                             std::thread::sleep(Duration::from_millis(100));
                         }
-                        if !SKIP_ADECODING.load(Ordering::Relaxed) {
+                        if !ADEC_STATE.is_flushing() {
                             if let Ok(frm) = adec.decode(&mut audio_dec.dsupp, &pkt) {
                                 let buf = frm.get_buffer();
                                 if let Some(pts) = frm.get_pts() {
@@ -229,7 +228,7 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_
                         adec.flush();
                         let mut qdata = queue.lock().unwrap();
                         qdata.flush();
-                        SKIP_ADECODING.store(false, Ordering::Relaxed);
+                        ADEC_STATE.set_state(DecodingState::Waiting);
                     },
                     Ok(PktSendEvent::End) => break,
                     Ok(PktSendEvent::ImmediateEnd) => {
@@ -245,11 +244,11 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_
             }
             loop {
                 let qdata = queue.lock().unwrap();
-                if qdata.fill() == 0 || SKIP_ADECODING.load(Ordering::Relaxed) {
+                if qdata.fill() == 0 || ADEC_STATE.is_flushing() {
                     break;
                 }
             }
-            AUDIO_END.store(true, Ordering::Relaxed);
+            ADEC_STATE.set_state(DecodingState::End);
         }).unwrap())
 }
 
@@ -268,7 +267,7 @@ impl AudioControl {
             } else {
                 (None, dummy_audio_thread(aprecv))
             };
-        AUDIO_END.store(false, Ordering::Relaxed);
+        ADEC_STATE.set_state(DecodingState::Normal);
 
         Self {
             aqueue:     Vec::new(),
@@ -295,7 +294,7 @@ impl AudioControl {
         AUDIO_VOLUME.load(Ordering::Relaxed)
     }
     pub fn is_audio_end(&self) -> bool {
-        AUDIO_END.load(Ordering::Relaxed)
+        matches!(ADEC_STATE.get_state(), DecodingState::End | DecodingState::Error)
     }
     pub fn get_fill(&self) -> usize { CUR_QUEUE_FILL.load(Ordering::Relaxed) }
     pub fn get_time(&self) -> Option<u64> {
@@ -346,12 +345,12 @@ impl AudioControl {
     pub fn flush(&mut self) {
         self.pause();
         self.aqueue.clear();
-        SKIP_ADECODING.store(true, Ordering::Release);
+        ADEC_STATE.set_state(DecodingState::Flush);
         CURRENT_TIME_SET.store(false, Ordering::Release);
         let _ = self.apsend.send(PktSendEvent::Flush);
     }
     pub fn finish(self) {
-        SKIP_ADECODING.store(true, Ordering::Release);
+        ADEC_STATE.set_state(DecodingState::Flush);
         let _ = self.apsend.send(PktSendEvent::ImmediateEnd);
         self.athread.join().unwrap();
     }
index 08b43cb55044dc11f2acf6fe50aafa69112efb50..fa2c4d5bb143bdd9d1b788c33a3f0e0ade6dcdcf 100644 (file)
@@ -9,6 +9,7 @@ use std::io::Write;
 use std::path::Path;
 use std::time::{Duration, Instant};
 use std::thread;
+use std::sync::atomic::{AtomicU8, Ordering};
 
 use sdl2::event::{Event, WindowEvent};
 use sdl2::keyboard::Keycode;
@@ -33,6 +34,55 @@ use videodec::*;
 mod osd;
 use osd::*;
 
+#[repr(u8)]
+#[derive(Clone,Copy,Debug,PartialEq)]
+enum DecodingState {
+    Normal,
+    Waiting,
+    Flush,
+    Prefetch,
+    Error,
+    End,
+}
+
+impl Default for DecodingState {
+    fn default() -> Self { DecodingState::Normal }
+}
+
+impl From<u8> for DecodingState {
+    fn from(val: u8) -> Self {
+        match val {
+            0 => DecodingState::Normal,
+            1 => DecodingState::Waiting,
+            2 => DecodingState::Flush,
+            3 => DecodingState::Prefetch,
+            4 => DecodingState::End,
+            _ => DecodingState::Error,
+        }
+    }
+}
+
+struct DecoderState {
+    state:  AtomicU8
+}
+
+impl DecoderState {
+    const fn new() -> Self {
+        Self {
+            state:  AtomicU8::new(DecodingState::Normal as u8)
+        }
+    }
+    fn set_state(&self, state: DecodingState) {
+        self.state.store(state as u8, Ordering::Release);
+    }
+    fn get_state(&self) -> DecodingState {
+        self.state.load(Ordering::Acquire).into()
+    }
+    fn is_flushing(&self) -> bool {
+        matches!(self.get_state(), DecodingState::Flush | DecodingState::Error)
+    }
+}
+
 #[cfg(feature="debug")]
 macro_rules! debug_log {
     ($log: expr; $blk: block) => {
@@ -315,7 +365,7 @@ impl Player {
             logfile:        File::create("debug.log").unwrap(),
         }
     }
-    fn seek(&mut self, off: u64, fwd: bool, dmx: &mut Demuxer, disp_queue: &mut DispQueue) {
+    fn seek(&mut self, off: u64, fwd: bool, dmx: &mut Demuxer, disp_queue: &mut DispQueue) -> Result<(), ()> {
         let cur_time = self.tkeep.get_cur_time();
         let seektime = if fwd { cur_time + off * 1000 } else {
                 cur_time.saturating_sub(off * 1000) };
@@ -324,7 +374,7 @@ impl Player {
         let ret = dmx.seek(NATimePoint::Milliseconds(seektime));
         if ret.is_err() {
             println!(" seek error");
-            return;
+            return Ok(()); //TODO: not ignore some of seek errors?
         }
 
         self.acontrol.flush();
@@ -332,7 +382,7 @@ impl Player {
         disp_queue.flush();
 
         self.tkeep.reset_ts();
-        self.prefill(dmx, disp_queue);
+        self.prefill(dmx, disp_queue)?;
         if !disp_queue.is_empty() {
             self.tkeep.reset_all(disp_queue.first_ts);
         } else {
@@ -351,8 +401,9 @@ impl Player {
         if !self.paused {
             self.acontrol.resume();
         }
+        Ok(())
     }
-    fn prefill(&mut self, dmx: &mut Demuxer, disp_queue: &mut DispQueue) {
+    fn prefill(&mut self, dmx: &mut Demuxer, disp_queue: &mut DispQueue) -> Result<(), ()> {
         debug_log!(self; {" prefilling"});
         while self.vcontrol.get_queue_size() < FRAME_QUEUE_LEN {
             let mut try_send = self.acontrol.get_queue_size() < FRAME_QUEUE_LEN && (!self.has_video || (!self.vcontrol.is_filled(FRAME_QUEUE_LEN) && !disp_queue.is_full()));
@@ -389,10 +440,11 @@ impl Player {
                 self.vcontrol.fill(disp_queue);
                 std::thread::sleep(Duration::from_millis(10));
             }
-            self.vcontrol.wait_for_frames();
+            self.vcontrol.wait_for_frames()?;
             self.vcontrol.fill(disp_queue);
         }
         debug_log!(self; {format!(" prefilling done, frames {}-{} audio {}", disp_queue.start, disp_queue.end, self.acontrol.get_fill())});
+        Ok(())
     }
     fn toggle_pause(&mut self) {
         self.paused = !self.paused;
@@ -409,12 +461,12 @@ impl Player {
             self.acontrol.resume();
         }
     }
-    fn handle_events(&mut self, event_pump: &mut sdl2::EventPump, canvas: &mut Canvas<Window>, dmx: &mut Demuxer, disp_queue: &mut DispQueue) -> bool {
+    fn handle_events(&mut self, event_pump: &mut sdl2::EventPump, canvas: &mut Canvas<Window>, dmx: &mut Demuxer, disp_queue: &mut DispQueue) -> Result<bool, ()> {
         for event in event_pump.poll_iter() {
             if let Event::Quit {..} = event {
                 self.end = true;
                 println!();
-                return true;
+                return Ok(true);
             }
             if let Event::Window {win_event: WindowEvent::Exposed, ..} = event {
                 canvas.clear();
@@ -429,15 +481,15 @@ impl Player {
                     Keycode::Escape | Keycode::Q => {
                         self.end = true;
                         println!();
-                        return true;
+                        return Ok(true);
                     },
-                    Keycode::Return => return true,
-                    Keycode::Right      => { self.seek(10, true,  dmx, disp_queue); },
-                    Keycode::Left       => { self.seek(10, false, dmx, disp_queue); },
-                    Keycode::Up         => { self.seek(60, true,  dmx, disp_queue); },
-                    Keycode::Down       => { self.seek(60, false, dmx, disp_queue); },
-                    Keycode::PageUp     => { self.seek(600, true,  dmx, disp_queue); },
-                    Keycode::PageDown   => { self.seek(600, false, dmx, disp_queue); },
+                    Keycode::Return => return Ok(true),
+                    Keycode::Right      => { self.seek(10, true,  dmx, disp_queue)?; },
+                    Keycode::Left       => { self.seek(10, false, dmx, disp_queue)?; },
+                    Keycode::Up         => { self.seek(60, true,  dmx, disp_queue)?; },
+                    Keycode::Down       => { self.seek(60, false, dmx, disp_queue)?; },
+                    Keycode::PageUp     => { self.seek(600, true,  dmx, disp_queue)?; },
+                    Keycode::PageDown   => { self.seek(600, false, dmx, disp_queue)?; },
                     Keycode::Space => { self.toggle_pause(); },
                     Keycode::Plus | Keycode::KpPlus => {
                         self.volume = (self.volume + 10).min(MAX_VOLUME);
@@ -476,7 +528,7 @@ impl Player {
                 }
             }
         }
-        false
+        Ok(false)
     }
     fn play(&mut self, name: &str, start_time: NATimePoint) {
         debug_log!(self; {format!("Playing {}", name)});
@@ -671,7 +723,13 @@ impl Player {
         }
 
         // play
-        self.prefill(&mut dmx, &mut disp_q);
+        if self.prefill(&mut dmx, &mut disp_q).is_err() {
+            std::mem::swap(&mut self.vcontrol, &mut new_vcontrol);
+            new_vcontrol.finish();
+            std::mem::swap(&mut self.acontrol, &mut new_acontrol);
+            new_acontrol.finish();
+            return;
+        }
         self.tkeep.reset_all(if !disp_q.is_empty() { disp_q.first_ts } else { 0 });
         if !self.paused {
             self.acontrol.resume();
@@ -680,7 +738,8 @@ impl Player {
         let mut last_disp = Instant::now();
         let mut has_data = true;
         'main: loop {
-            if self.handle_events(&mut event_pump, &mut canvas, &mut dmx, &mut disp_q) {
+            let ret = self.handle_events(&mut event_pump, &mut canvas, &mut dmx, &mut disp_q);
+            if matches!(ret, Ok(true) | Err(_)) {
                 println!();
                 break 'main;
             }
index ac60466be8fa4827f78b964bcf40f14cb5d1a814..1a34aad80d221bf691d0ef3b7730453fbb386f51 100644 (file)
@@ -1,5 +1,4 @@
 use std::thread::JoinHandle;
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{Receiver, SyncSender, TrySendError};
 use std::thread;
 
@@ -10,11 +9,9 @@ use nihav_core::formats::*;
 use nihav_core::codecs::*;
 use nihav_core::scale::*;
 
-use super::{DecoderStuff, DecoderType, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN};
+use super::{DecoderStuff, DecoderType, DecoderState, DecodingState, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN};
 
-static SKIP_VDECODING: AtomicBool = AtomicBool::new(false);
-static VIDEO_END: AtomicBool = AtomicBool::new(false);
-static GET_FRAMES_END: AtomicBool = AtomicBool::new(false);
+static VDEC_STATE: DecoderState = DecoderState::new();
 
 pub const FRAME_QUEUE_SIZE: usize = 25;
 
@@ -69,7 +66,7 @@ impl VideoDecoder {
         let mut opic = if let ColorModel::YUV(_) = self.ifmt.get_format().get_model() {
                 self.yuv_pool.prealloc_video(self.oinfo_yuv, 2).unwrap();
                 while self.yuv_pool.get_free().is_none() {
-                    if SKIP_VDECODING.load(Ordering::Relaxed) {
+                    if VDEC_STATE.is_flushing() {
                         return None;
                     }
                     std::thread::yield_now();
@@ -78,7 +75,7 @@ impl VideoDecoder {
             } else {
                 self.rgb_pool.prealloc_video(self.oinfo_rgb, 0).unwrap();
                 while self.rgb_pool.get_free().is_none() {
-                    if SKIP_VDECODING.load(Ordering::Relaxed) {
+                    if VDEC_STATE.is_flushing() {
                         return None;
                     }
                     std::thread::yield_now();
@@ -225,13 +222,13 @@ impl VideoDecoder {
 
 fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, video_dec: DecoderStuff, vprecv: Receiver<PktSendEvent>, vfsend: SyncSender<(NABufferType, u64)>) -> JoinHandle<()> {
     std::thread::Builder::new().name("vdecoder".to_string()).spawn(move ||{
-            SKIP_VDECODING.store(false, Ordering::Relaxed);
+            VDEC_STATE.set_state(DecodingState::Waiting);
             let mut vdec = VideoDecoder::new(width, height, tb_num, tb_den, video_dec);
             let mut skip_mode = FrameSkipMode::None;
             loop {
                 match vprecv.recv() {
                     Ok(PktSendEvent::Packet(pkt)) => {
-                        if !SKIP_VDECODING.load(Ordering::Relaxed) {
+                        if !VDEC_STATE.is_flushing() {
                             if let Some((buf, time)) = vdec.next_frame(&pkt) {
                                 vfsend.send((buf, time)).unwrap();
                             }
@@ -244,11 +241,11 @@ fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, v
                         while let Some((buf, time)) = vdec.more_frames(false) {
                             vfsend.send((buf, time)).unwrap();
                         }
-                        GET_FRAMES_END.store(true, Ordering::Relaxed);
+                        VDEC_STATE.set_state(DecodingState::Waiting);
                     },
                     Ok(PktSendEvent::Flush) => {
                         vdec.flush();
-                        SKIP_VDECODING.store(false, Ordering::Relaxed);
+                        VDEC_STATE.set_state(DecodingState::Waiting);
                     },
                     Ok(PktSendEvent::End) => {
                         while vdec.yuv_pool.get_free().is_some() && vdec.rgb_pool.get_free().is_some() {
@@ -258,11 +255,11 @@ fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, v
                             }
                             vfsend.send(ret.unwrap()).unwrap();
                         }
-                        VIDEO_END.store(true, Ordering::Relaxed);
+                        VDEC_STATE.set_state(DecodingState::End);
                         break;
                     },
                     Ok(PktSendEvent::ImmediateEnd) => {
-                        VIDEO_END.store(true, Ordering::Relaxed);
+                        VDEC_STATE.set_state(DecodingState::End);
                         break;
                     },
                     Ok(PktSendEvent::HurryUp) => {
@@ -335,7 +332,7 @@ impl VideoControl {
         let (vpsend, vprecv) = std::sync::mpsc::sync_channel::<PktSendEvent>(0);
         let (vfsend, vfrecv) = std::sync::mpsc::sync_channel::<FrameRecord>(FRAME_QUEUE_SIZE - 1);
 
-        VIDEO_END.store(false, Ordering::Relaxed);
+        VDEC_STATE.set_state(DecodingState::Normal);
 
         let vthread = if let Some(video_dec) = video_dec {
                 start_video_decoding(width, height, tb_num, tb_den, video_dec, vprecv, vfsend)
@@ -351,6 +348,7 @@ impl VideoControl {
                                 _ => {},
                             };
                         }
+                        VDEC_STATE.set_state(DecodingState::End);
                     }).unwrap()
             };
 
@@ -364,7 +362,7 @@ impl VideoControl {
     }
     pub fn flush(&mut self) {
         self.vqueue.clear();
-        SKIP_VDECODING.store(true, Ordering::Release);
+        VDEC_STATE.set_state(DecodingState::Flush);
         for _ in 0..8 {
             let _ = self.vfrecv.try_recv();
         }
@@ -401,15 +399,22 @@ impl VideoControl {
         true
     }
     pub fn is_video_end(&self) -> bool {
-        VIDEO_END.load(Ordering::Relaxed)
+        matches!(VDEC_STATE.get_state(), DecodingState::End | DecodingState::Error)
     }
-    pub fn wait_for_frames(&mut self) {
-        GET_FRAMES_END.store(false, Ordering::Relaxed);
+    pub fn wait_for_frames(&mut self) -> Result<(), ()> {
+        VDEC_STATE.set_state(DecodingState::Prefetch);
         self.try_send_event(PktSendEvent::GetFrames);
         while !self.try_send_queued() {
         }
-        while !GET_FRAMES_END.load(Ordering::Relaxed) {
-            thread::yield_now();
+        loop {
+            match VDEC_STATE.get_state() {
+                DecodingState::Waiting => {
+                    VDEC_STATE.set_state(DecodingState::Normal);
+                    return Ok(());
+                },
+                DecodingState::Prefetch => thread::yield_now(),
+                _ => return Err(()),
+            };
         }
     }
 
@@ -451,7 +456,7 @@ impl VideoControl {
     }
 
     pub fn finish(self) {
-        SKIP_VDECODING.store(true, Ordering::Release);
+        VDEC_STATE.set_state(DecodingState::Flush);
         for _ in 0..8 {
             let _ = self.vfrecv.try_recv();
         }