make video player use multi-threaded decoders if possible
authorKostya Shishkov <kostya.shishkov@gmail.com>
Wed, 14 Jun 2023 17:00:07 +0000 (19:00 +0200)
committerKostya Shishkov <kostya.shishkov@gmail.com>
Wed, 14 Jun 2023 17:00:07 +0000 (19:00 +0200)
videoplayer/src/audiodec.rs
videoplayer/src/main.rs
videoplayer/src/videodec.rs

index 304519294c487b805397e62e752c5dcc1d0c8d1b..e9da970ad577d3a3c518c8f100134cff23286b2a 100644 (file)
@@ -12,7 +12,7 @@ use nihav_core::formats::*;
 use nihav_core::codecs::*;
 use nihav_core::soundcvt::*;
 
-use super::{DecoderStuff, PktSendEvent};
+use super::{DecoderStuff, DecoderType, PktSendEvent};
 
 static SKIP_ADECODING: AtomicBool = AtomicBool::new(false);
 static AUDIO_VOLUME: AtomicUsize = AtomicUsize::new(100);
@@ -181,6 +181,7 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_
     }
     let adevice = ret.unwrap();
     (Some(adevice), std::thread::Builder::new().name("acontrol".to_string()).spawn(move ||{
+            let adec = if let DecoderType::Audio(ref mut dec) = audio_dec.dec { dec } else { panic!("not an audio decoder!"); };
             let mut samplepos = 0usize;
             let dst_chmap = if dst_info.channels == 2 {
                     NAChannelMap::from_str("L,R").unwrap()
@@ -199,7 +200,7 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_
                             std::thread::sleep(Duration::from_millis(100));
                         }
                         if !SKIP_ADECODING.load(Ordering::Relaxed) {
-                            if let Ok(frm) = audio_dec.dec.decode(&mut audio_dec.dsupp, &pkt) {
+                            if let Ok(frm) = adec.decode(&mut audio_dec.dsupp, &pkt) {
                                 let buf = frm.get_buffer();
                                 if let Some(pts) = frm.get_pts() {
                                     samplepos = NATimeInfo::ts_to_time(pts, u64::from(dst_info.sample_rate), frm.ts.tb_num, frm.ts.tb_den) as usize;
@@ -223,8 +224,9 @@ fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_
                             }
                         }
                     },
+                    Ok(PktSendEvent::GetFrames) => {},
                     Ok(PktSendEvent::Flush) => {
-                        audio_dec.dec.flush();
+                        adec.flush();
                         let mut qdata = queue.lock().unwrap();
                         qdata.flush();
                         SKIP_ADECODING.store(false, Ordering::Relaxed);
index 25ddcabe245f7141ac0dced820b9f9a99e808984..dcd4d0a3538fa51e2a15372f506c11cfbf032137 100644 (file)
@@ -44,16 +44,22 @@ macro_rules! debug_log {
 
 pub enum PktSendEvent {
     Packet(NAPacket),
+    GetFrames,
     Flush,
     End,
     ImmediateEnd,
     HurryUp,
 }
 
+pub enum DecoderType {
+    Audio(Box<dyn NADecoder + Send>),
+    Video(Box<dyn NADecoder + Send>, Box<dyn FrameReorderer + Send>),
+    VideoMT(Box<dyn NADecoderMT + Send>, MTFrameReorderer),
+}
+
 pub struct DecoderStuff {
     pub dsupp:  Box<NADecoderSupport>,
-    pub dec:    Box<dyn NADecoder + Send>,
-    pub reord:  Box<dyn FrameReorderer + Send>,
+    pub dec:    DecoderType,
 }
 
 fn format_time(ms: u64) -> String {
@@ -221,6 +227,9 @@ struct Player {
     video_str:      u32,
     audio_str:      u32,
 
+    vthreads:       usize,
+    use_mt:         bool,
+
     paused:         bool,
     mute:           bool,
     volume:         usize,
@@ -254,6 +263,9 @@ impl Player {
             video_str:      0,
             audio_str:      0,
 
+            vthreads:       3,
+            use_mt:         true,
+
             paused:         false,
             mute:           false,
             volume:         100,
@@ -341,6 +353,7 @@ impl Player {
                 self.vcontrol.fill(disp_queue);
                 std::thread::sleep(Duration::from_millis(10));
             }
+            self.vcontrol.wait_for_frames();
             self.vcontrol.fill(disp_queue);
         }
         debug_log!(self; {format!(" prefilling done, frames {}-{} audio {}", disp_queue.start, disp_queue.end, self.acontrol.get_fill())});
@@ -444,6 +457,10 @@ impl Player {
         nihav_register_all_demuxers(&mut dmx_reg);
         let mut dec_reg = RegisteredDecoders::new();
         nihav_register_all_decoders(&mut dec_reg);
+        let mut mtdec_reg = RegisteredMTDecoders::new();
+        if self.use_mt {
+            nihav_register_all_mt_decoders(&mut mtdec_reg);
+        }
 
         let ret = dmx_reg.find_demuxer(dmx_name);
         if ret.is_none() {
@@ -484,11 +501,32 @@ impl Player {
             let s = dmx.get_stream(i).unwrap();
             let info = s.get_info();
             let decfunc = dec_reg.find_decoder(info.get_name());
+            let decfunc_mt = mtdec_reg.find_decoder(info.get_name());
             println!("stream {} - {} {}", i, s, info.get_name());
             debug_log!(self; {format!(" stream {} - {} {}", i, s, info.get_name())});
             let str_id = s.get_id();
             if info.is_video() {
                 if video_dec.is_none() && self.play_video {
+                    if let Some(decfunc) = decfunc_mt {
+                        let mut dec = (decfunc)();
+                        let mut dsupp = Box::new(NADecoderSupport::new());
+                        let props = info.get_properties().get_video_info().unwrap();
+                        if props.get_width() != 0 {
+                            width  = props.get_width();
+                            height = props.get_height();
+                        }
+                        if dec.init(&mut dsupp, info.clone(), self.vthreads).is_ok() {
+                            video_dec = Some(DecoderStuff{ dsupp, dec: DecoderType::VideoMT(dec, MTFrameReorderer::new()) });
+                            self.video_str = str_id;
+                            let (tbn, tbd) = s.get_timebase();
+                            tb_num = tbn;
+                            tb_den = tbd;
+                            self.has_video = true;
+                            continue;
+                        } else {
+                            println!("failed to create multi-threaded decoder, falling back");
+                        }
+                    }
                     if let Some(decfunc) = decfunc {
                         let mut dec = (decfunc)();
                         let mut dsupp = Box::new(NADecoderSupport::new());
@@ -509,7 +547,7 @@ impl Player {
                         dsupp.pool_u16 = NAVideoBufferPool::new(reorder_depth);
                         dsupp.pool_u32 = NAVideoBufferPool::new(reorder_depth);
                         dec.init(&mut dsupp, info).unwrap();
-                        video_dec = Some(DecoderStuff{ dsupp, dec, reord });
+                        video_dec = Some(DecoderStuff{ dsupp, dec: DecoderType::Video(dec, reord) });
                         self.video_str = str_id;
                         let (tbn, tbd) = s.get_timebase();
                         tb_num = tbn;
@@ -526,8 +564,7 @@ impl Player {
                         let mut dsupp = Box::new(NADecoderSupport::new());
                         ainfo = info.get_properties().get_audio_info();
                         dec.init(&mut dsupp, info).unwrap();
-                        let reord = Box::new(NoReorderer::new());
-                        audio_dec = Some(DecoderStuff{ dsupp, dec, reord });
+                        audio_dec = Some(DecoderStuff{ dsupp, dec: DecoderType::Audio(dec) });
                         self.audio_str = str_id;
                         self.has_audio = true;
                     } else {
@@ -725,6 +762,21 @@ fn main() {
             "-nodebug" => {
                 player.debug = false;
             },
+            "-mt" => {
+                player.use_mt = true;
+            },
+            "-nomt" => {
+                player.use_mt = false;
+            },
+            "-threads" => {
+                if let Some(arg) = aiter.next() {
+                    if let Ok(val) = arg.parse::<usize>() {
+                        player.vthreads = val.max(1);
+                    } else {
+                        println!("wrong number of threads");
+                    }
+                }
+            },
             _ => {
                 player.play(arg, seek_time);
                 if player.end { break; }
index 0a1dcd59220737411ef82e68a5c58f6f46a6327c..ac60466be8fa4827f78b964bcf40f14cb5d1a814 100644 (file)
@@ -10,10 +10,11 @@ use nihav_core::formats::*;
 use nihav_core::codecs::*;
 use nihav_core::scale::*;
 
-use super::{DecoderStuff, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN};
+use super::{DecoderStuff, DecoderType, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN};
 
 static SKIP_VDECODING: AtomicBool = AtomicBool::new(false);
 static VIDEO_END: AtomicBool = AtomicBool::new(false);
+static GET_FRAMES_END: AtomicBool = AtomicBool::new(false);
 
 pub const FRAME_QUEUE_SIZE: usize = 25;
 
@@ -91,29 +92,134 @@ impl VideoDecoder {
         Some((opic, time))
     }
     pub fn next_frame(&mut self, pkt: &NAPacket) -> Option<FrameRecord> {
-        if let Ok(frm) = self.dec.dec.decode(&mut self.dec.dsupp, pkt) {
-            self.dec.reord.add_frame(frm);
-            while let Some(frm) = self.dec.reord.get_frame() {
-                let bt = frm.get_buffer();
-                if let NABufferType::None = bt { continue; }
-                let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
-                return self.convert_buf(bt, ts);
-            }
-        }
+        match self.dec.dec {
+            DecoderType::Video(ref mut vdec, ref mut reord) => {
+                if let Ok(frm) = vdec.decode(&mut self.dec.dsupp, pkt) {
+                    reord.add_frame(frm);
+                    while let Some(frm) = reord.get_frame() {
+                        let bt = frm.get_buffer();
+                        if let NABufferType::None = bt { continue; }
+                        let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+                        return self.convert_buf(bt, ts);
+                    }
+                }
+            },
+            DecoderType::VideoMT(ref mut vdec, ref mut reord) => {
+                let queue_id = reord.register_frame();
+                match vdec.queue_pkt(&mut self.dec.dsupp, &pkt, queue_id) {
+                    Ok(true) => {},
+                    Ok(false) => {
+                        while !vdec.can_take_input() || vdec.has_output() {
+                            match vdec.get_frame() {
+                                (Ok(frm), id) => {
+                                    reord.add_frame(frm, id);
+                                },
+                                (Err(err), id) => {
+                                    reord.drop_frame(id);
+                                    panic!("frame {} decoding error {:?}", id, err);
+                                },
+                            };
+                        }
+                        match vdec.queue_pkt(&mut self.dec.dsupp, &pkt, queue_id) {
+                            Ok(true) => {},
+                            Ok(false) => panic!("still can't queue frame!"),
+                            Err(err) => panic!("queueing error {:?}", err),
+                        };
+                    },
+                    Err(err) => panic!("queueing error {:?}", err),
+                };
+                while let Some(frm) = reord.get_frame() {
+                    let bt = frm.get_buffer();
+                    if let NABufferType::None = bt { continue; }
+                    let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+                    return self.convert_buf(bt, ts);
+                }
+            },
+            _ => panic!("not a video decoder!"),
+        };
+        None
+    }
+    pub fn more_frames(&mut self, do_not_wait: bool) -> Option<FrameRecord> {
+        match self.dec.dec {
+            DecoderType::Video(ref mut _dec, ref mut reord) => {
+                while let Some(frm) = reord.get_frame() {
+                    let bt = frm.get_buffer();
+                    if let NABufferType::None = bt { continue; }
+                    let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+                    return self.convert_buf(bt, ts);
+                }
+            },
+            DecoderType::VideoMT(ref mut vdec, ref mut reord) => {
+                let mut got_some = false;
+                while vdec.has_output() {
+                    match vdec.get_frame() {
+                        (Ok(frm), id) => {
+                            reord.add_frame(frm, id);
+                            got_some = true;
+                        },
+                        (Err(err), id) => {
+                            reord.drop_frame(id);
+                            panic!("frame {} decoding error {:?}", id, err);
+                        },
+                    };
+                }
+                if !got_some && !do_not_wait {
+                    match vdec.get_frame() {
+                        (Ok(frm), id) => {
+                            reord.add_frame(frm, id);
+                        },
+                        (Err(DecoderError::NoFrame), _) => {},
+                        (Err(err), id) => {
+                            reord.drop_frame(id);
+                            panic!("frame {} decoding error {:?}", id, err);
+                        },
+                    };
+                }
+                while let Some(frm) = reord.get_frame() {
+                    let bt = frm.get_buffer();
+                    if let NABufferType::None = bt { continue; }
+                    let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+                    return self.convert_buf(bt, ts);
+                }
+            },
+            _ => {},
+        };
         None
     }
     pub fn last_frame(&mut self) -> Option<FrameRecord> {
-        while let Some(frm) = self.dec.reord.get_last_frames() {
-            let bt = frm.get_buffer();
-            if let NABufferType::None = bt { continue; }
-            let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
-            return self.convert_buf(bt, ts);
-        }
+        match self.dec.dec {
+            DecoderType::Video(ref mut _dec, ref mut reord) => {
+                while let Some(frm) = reord.get_last_frames() {
+                    let bt = frm.get_buffer();
+                    if let NABufferType::None = bt { continue; }
+                    let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+                    return self.convert_buf(bt, ts);
+                }
+            },
+            DecoderType::VideoMT(ref mut _dec, ref mut reord) => {
+                while let Some(frm) = reord.get_last_frames() {
+                    let bt = frm.get_buffer();
+                    if let NABufferType::None = bt { continue; }
+                    let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+                    return self.convert_buf(bt, ts);
+                }
+            },
+            _ => {},
+        };
         None
     }
     pub fn flush(&mut self) {
-        self.dec.dec.flush();
-        self.dec.reord.flush();
+        match self.dec.dec {
+            DecoderType::Video(ref mut dec, ref mut reord) => {
+                dec.flush();
+                reord.flush();
+            },
+            DecoderType::VideoMT(ref mut dec, ref mut reord) => {
+                dec.flush();
+                reord.flush();
+            },
+            _ => {},
+        };
     }
 }
 
@@ -129,7 +235,16 @@ fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, v
                             if let Some((buf, time)) = vdec.next_frame(&pkt) {
                                 vfsend.send((buf, time)).unwrap();
                             }
+                            while let Some((buf, time)) = vdec.more_frames(true) {
+                                vfsend.send((buf, time)).unwrap();
+                            }
+                        }
+                    },
+                    Ok(PktSendEvent::GetFrames) => {
+                        while let Some((buf, time)) = vdec.more_frames(false) {
+                            vfsend.send((buf, time)).unwrap();
                         }
+                        GET_FRAMES_END.store(true, Ordering::Relaxed);
                     },
                     Ok(PktSendEvent::Flush) => {
                         vdec.flush();
@@ -152,10 +267,12 @@ fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, v
                     },
                     Ok(PktSendEvent::HurryUp) => {
                         skip_mode = skip_mode.advance();
-                        vdec.dec.dec.set_options(&[NAOption{
+                        if let DecoderType::Video(ref mut dec, ref mut _reord) = vdec.dec.dec {
+                            dec.set_options(&[NAOption{
                                 name: FRAME_SKIP_OPTION,
                                 value: NAValue::String(skip_mode.to_string()),
                             }]);
+                        }
                     },
                     Err(_) => {
                         break;
@@ -286,6 +403,15 @@ impl VideoControl {
     pub fn is_video_end(&self) -> bool {
         VIDEO_END.load(Ordering::Relaxed)
     }
+    pub fn wait_for_frames(&mut self) {
+        GET_FRAMES_END.store(false, Ordering::Relaxed);
+        self.try_send_event(PktSendEvent::GetFrames);
+        while !self.try_send_queued() {
+        }
+        while !GET_FRAMES_END.load(Ordering::Relaxed) {
+            thread::yield_now();
+        }
+    }
 
     pub fn is_yuv(&self) -> bool { self.do_yuv }