use std::thread::JoinHandle;
-use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, SyncSender, TrySendError};
use std::thread;
use nihav_core::codecs::*;
use nihav_core::scale::*;
-use super::{DecoderStuff, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN};
+use super::{DecoderStuff, DecoderType, DecoderState, DecodingState, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN};
-static SKIP_VDECODING: AtomicBool = AtomicBool::new(false);
-static VIDEO_END: AtomicBool = AtomicBool::new(false);
+static VDEC_STATE: DecoderState = DecoderState::new();
pub const FRAME_QUEUE_SIZE: usize = 25;
rgb_pool: NAVideoBufferPool::new(FRAME_QUEUE_SIZE),
tb_num, tb_den,
dec, ofmt_yuv, ofmt_rgb, oinfo_yuv, oinfo_rgb,
- scaler: NAScale::new(ofmt_rgb, ofmt_rgb).unwrap(),
+ scaler: NAScale::new(ofmt_rgb, ofmt_rgb).expect("creating scaler failed"),
ifmt: NAVideoInfo { width: 0, height: 0, flipped: false, format: SDL_RGB_FMT, bits: 24 },
}
}
// Converts a freshly decoded video buffer into a display-ready frame record,
// re-creating the scaler when the input format changes and drawing the output
// picture from the YUV or RGB frame pool.  Returns None when playback is being
// flushed (so the pool wait is abandoned) or when scaling fails.
// NOTE(review): this span is a diff hunk — '-' lines are removed, '+' lines
// added.  The change swaps bare unwrap() for expect() with messages and the
// old SKIP_VDECODING atomic for the shared VDEC_STATE state machine.
fn convert_buf(&mut self, bt: NABufferType, ts: u64) -> Option<FrameRecord> {
- let vinfo = bt.get_video_info().unwrap();
+ let vinfo = bt.get_video_info().expect("this should be a video buffer");
// Rebuild the scaler whenever the incoming frame's geometry or pixel format
// differs from the last seen input format.  NOTE(review): self.ifmt is
// presumably updated to vinfo in context lines omitted from this hunk —
// confirm against the full file.
if self.ifmt.get_width() != vinfo.get_width() ||
self.ifmt.get_height() != vinfo.get_height() ||
self.ifmt.get_format() != vinfo.get_format() {
let sc_ifmt = ScaleInfo { width: self.ifmt.get_width(), height: self.ifmt.get_height(), fmt: self.ifmt.get_format() };
// YUV sources are scaled into the YUV output format, everything else into RGB.
let do_yuv = if let ColorModel::YUV(_) = self.ifmt.get_format().get_model() { true } else { false };
let ofmt = if do_yuv { self.ofmt_yuv } else { self.ofmt_rgb };
- self.scaler = NAScale::new(sc_ifmt, ofmt).unwrap();
+ self.scaler = NAScale::new(sc_ifmt, ofmt).expect("scaling should not fail");
}
// Pick a destination buffer from the matching pool, spin-waiting until one
// frees up; bail out early if a flush was requested meanwhile.
let mut opic = if let ColorModel::YUV(_) = self.ifmt.get_format().get_model() {
- self.yuv_pool.prealloc_video(self.oinfo_yuv, 2).unwrap();
+ self.yuv_pool.prealloc_video(self.oinfo_yuv, 2).expect("video frame pool allocation failure");
while self.yuv_pool.get_free().is_none() {
- if SKIP_VDECODING.load(Ordering::Relaxed) {
+ if VDEC_STATE.is_flushing() {
return None;
}
std::thread::yield_now();
}
- NABufferType::Video(self.yuv_pool.get_free().unwrap())
+ NABufferType::Video(self.yuv_pool.get_free().expect("video frame pool should have a free frame"))
} else {
- self.rgb_pool.prealloc_video(self.oinfo_rgb, 0).unwrap();
+ self.rgb_pool.prealloc_video(self.oinfo_rgb, 0).expect("video frame pool allocation failure");
while self.rgb_pool.get_free().is_none() {
- if SKIP_VDECODING.load(Ordering::Relaxed) {
+ if VDEC_STATE.is_flushing() {
return None;
}
std::thread::yield_now();
}
- NABufferType::VideoPacked(self.rgb_pool.get_free().unwrap())
+ NABufferType::VideoPacked(self.rgb_pool.get_free().expect("video frame pool should have a free frame"))
};
let ret = self.scaler.convert(&bt, &mut opic);
if ret.is_err() { println!(" scaler error {:?}", ret.err()); return None; }
// NOTE(review): `time` is not defined in the visible lines — presumably it is
// derived from `ts` (timestamp-to-milliseconds conversion) in context lines
// omitted from this hunk; verify against the full file.
Some((opic, time))
}
// Feeds one packet to the video decoder and returns the first reorderable
// output frame converted for display, or None when nothing is ready yet.
// NOTE(review): diff hunk — the change replaces the single-decoder path with
// a match over DecoderType so that both single-threaded (Video) and
// multi-threaded (VideoMT) decoders are supported.
pub fn next_frame(&mut self, pkt: &NAPacket) -> Option<FrameRecord> {
- if let Ok(frm) = self.dec.dec.decode(&mut self.dec.dsupp, pkt) {
- self.dec.reord.add_frame(frm);
- while let Some(frm) = self.dec.reord.get_frame() {
- let bt = frm.get_buffer();
- if let NABufferType::None = bt { continue; }
- let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
- return self.convert_buf(bt, ts);
- }
- }
+ match self.dec.dec {
+ DecoderType::Video(ref mut vdec, ref mut reord) => {
// Synchronous path: decode, push into the reorderer, emit the first
// displayable frame (skipping frames with no buffer, e.g. skipped frames).
+ if let Ok(frm) = vdec.decode(&mut self.dec.dsupp, pkt) {
+ reord.add_frame(frm);
+ while let Some(frm) = reord.get_frame() {
+ let bt = frm.get_buffer();
+ if let NABufferType::None = bt { continue; }
// Prefer DTS for ordering; fall back to PTS, then zero.
+ let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+ return self.convert_buf(bt, ts);
+ }
+ }
+ },
+ DecoderType::VideoMT(ref mut vdec, ref mut reord) => {
// Multi-threaded path: register a slot with the reorderer, then queue the
// packet.  Ok(false) means the decoder's input queue is full, so drain
// completed frames until it can take input, then retry once.
+ let queue_id = reord.register_frame();
+ match vdec.queue_pkt(&mut self.dec.dsupp, &pkt, queue_id) {
+ Ok(true) => {},
+ Ok(false) => {
+ while !vdec.can_take_input() || vdec.has_output() {
+ match vdec.get_frame() {
+ (Ok(frm), id) => {
+ reord.add_frame(frm, id);
+ },
+ (Err(err), id) => {
// A failed frame still occupies a reorder slot — release it before dying.
// NOTE(review): panicking on a single bad frame is harsh for a player;
// consider logging and continuing instead.
+ reord.drop_frame(id);
+ panic!("frame {} decoding error {:?}", id, err);
+ },
+ };
+ }
+ match vdec.queue_pkt(&mut self.dec.dsupp, &pkt, queue_id) {
+ Ok(true) => {},
+ Ok(false) => panic!("still can't queue frame!"),
+ Err(err) => panic!("queueing error {:?}", err),
+ };
+ },
+ Err(err) => panic!("queueing error {:?}", err),
+ };
// Emit whatever the reorderer already has in display order.
+ while let Some(frm) = reord.get_frame() {
+ let bt = frm.get_buffer();
+ if let NABufferType::None = bt { continue; }
+ let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+ return self.convert_buf(bt, ts);
+ }
+ },
+ _ => panic!("not a video decoder!"),
+ };
+ None
+ }
// Retrieves additional already-decoded frames without feeding a new packet.
// For the threaded decoder this drains its output queue; with
// `do_not_wait == false` it additionally blocks once on get_frame() so that
// at least one in-flight frame is collected.  Returns the next display-ready
// frame or None.  NOTE(review): new function introduced by this diff hunk.
+ pub fn more_frames(&mut self, do_not_wait: bool) -> Option<FrameRecord> {
+ match self.dec.dec {
+ DecoderType::Video(ref mut _dec, ref mut reord) => {
// Synchronous decoder: only the reorderer can hold pending frames.
+ while let Some(frm) = reord.get_frame() {
+ let bt = frm.get_buffer();
+ if let NABufferType::None = bt { continue; }
+ let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+ return self.convert_buf(bt, ts);
+ }
+ },
+ DecoderType::VideoMT(ref mut vdec, ref mut reord) => {
// First collect everything the worker threads have already finished.
+ let mut got_some = false;
+ while vdec.has_output() {
+ match vdec.get_frame() {
+ (Ok(frm), id) => {
+ reord.add_frame(frm, id);
+ got_some = true;
+ },
+ (Err(err), id) => {
+ reord.drop_frame(id);
+ panic!("frame {} decoding error {:?}", id, err);
+ },
+ };
+ }
// Nothing ready and the caller is willing to wait: block for one frame.
// NoFrame is not an error here — it just means the pipeline is empty.
+ if !got_some && !do_not_wait {
+ match vdec.get_frame() {
+ (Ok(frm), id) => {
+ reord.add_frame(frm, id);
+ },
+ (Err(DecoderError::NoFrame), _) => {},
+ (Err(err), id) => {
+ reord.drop_frame(id);
+ panic!("frame {} decoding error {:?}", id, err);
+ },
+ };
+ }
+ while let Some(frm) = reord.get_frame() {
+ let bt = frm.get_buffer();
+ if let NABufferType::None = bt { continue; }
+ let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+ return self.convert_buf(bt, ts);
+ }
+ },
+ _ => {},
+ };
None
}
// Drains the reorderer at end-of-stream: returns remaining buffered frames
// one at a time via get_last_frames(), converted for display.
// NOTE(review): diff hunk — the two new match arms are intentionally
// identical except for the decoder variant; they cannot be merged because
// the reorderer fields have different types per variant.
pub fn last_frame(&mut self) -> Option<FrameRecord> {
- while let Some(frm) = self.dec.reord.get_last_frames() {
- let bt = frm.get_buffer();
- if let NABufferType::None = bt { continue; }
- let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
- return self.convert_buf(bt, ts);
- }
+ match self.dec.dec {
+ DecoderType::Video(ref mut _dec, ref mut reord) => {
+ while let Some(frm) = reord.get_last_frames() {
+ let bt = frm.get_buffer();
+ if let NABufferType::None = bt { continue; }
+ let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+ return self.convert_buf(bt, ts);
+ }
+ },
+ DecoderType::VideoMT(ref mut _dec, ref mut reord) => {
+ while let Some(frm) = reord.get_last_frames() {
+ let bt = frm.get_buffer();
+ if let NABufferType::None = bt { continue; }
+ let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
+ return self.convert_buf(bt, ts);
+ }
+ },
+ _ => {},
+ };
None
}
// Discards all pending decoder and reorderer state (used on seek/flush).
// NOTE(review): diff hunk — extended from the single-decoder call pair to a
// match over both decoder variants.
pub fn flush(&mut self) {
- self.dec.dec.flush();
- self.dec.reord.flush();
+ match self.dec.dec {
+ DecoderType::Video(ref mut dec, ref mut reord) => {
+ dec.flush();
+ reord.flush();
+ },
+ DecoderType::VideoMT(ref mut dec, ref mut reord) => {
+ dec.flush();
+ reord.flush();
+ },
+ _ => {},
+ };
}
}
// Spawns the dedicated "vdecoder" thread: it receives PktSendEvent commands
// over `vprecv`, decodes packets, and pushes display-ready frames into
// `vfsend`.  NOTE(review): diff hunk; additionally, the visible span is
// truncated by an omitted-context jump after the `Err(_)` arm (see below),
// so the tail of the closure is not fully visible here.
fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, video_dec: DecoderStuff, vprecv: Receiver<PktSendEvent>, vfsend: SyncSender<(NABufferType, u64)>) -> JoinHandle<()> {
std::thread::Builder::new().name("vdecoder".to_string()).spawn(move ||{
- SKIP_VDECODING.store(false, Ordering::Relaxed);
+ VDEC_STATE.set_state(DecodingState::Waiting);
let mut vdec = VideoDecoder::new(width, height, tb_num, tb_den, video_dec);
let mut skip_mode = FrameSkipMode::None;
loop {
match vprecv.recv() {
Ok(PktSendEvent::Packet(pkt)) => {
- if !SKIP_VDECODING.load(Ordering::Relaxed) {
+ if !VDEC_STATE.is_flushing() {
if let Some((buf, time)) = vdec.next_frame(&pkt) {
- vfsend.send((buf, time)).unwrap();
+ vfsend.send((buf, time)).expect("video frame should be sent");
+ }
// Also forward any extra frames the threaded decoder finished meanwhile
// (non-blocking: do_not_wait = true).
+ while let Some((buf, time)) = vdec.more_frames(true) {
+ vfsend.send((buf, time)).expect("video frame should be sent");
}
}
},
// New prefetch command: block until in-flight frames are collected, then
// signal the requester via the Waiting state (see wait_for_frames()).
+ Ok(PktSendEvent::GetFrames) => {
+ while let Some((buf, time)) = vdec.more_frames(false) {
+ vfsend.send((buf, time)).expect("video frame should be sent");
+ }
+ VDEC_STATE.set_state(DecodingState::Waiting);
+ },
Ok(PktSendEvent::Flush) => {
vdec.flush();
- SKIP_VDECODING.store(false, Ordering::Relaxed);
+ VDEC_STATE.set_state(DecodingState::Waiting);
},
Ok(PktSendEvent::End) => {
// Drain remaining reordered frames while pool space is available.
while vdec.yuv_pool.get_free().is_some() && vdec.rgb_pool.get_free().is_some() {
- let ret = vdec.last_frame();
- if ret.is_none() {
+ if let Some(frm) = vdec.last_frame() {
+ vfsend.send(frm).expect("video frame should be sent");
+ } else {
break;
}
- vfsend.send(ret.unwrap()).unwrap();
}
- VIDEO_END.store(true, Ordering::Relaxed);
+ VDEC_STATE.set_state(DecodingState::End);
break;
},
Ok(PktSendEvent::ImmediateEnd) => {
- VIDEO_END.store(true, Ordering::Relaxed);
+ VDEC_STATE.set_state(DecodingState::End);
break;
},
Ok(PktSendEvent::HurryUp) => {
// Advance frame-skip aggressiveness; only the single-threaded decoder
// exposes the frame-skip option, so the MT variant is silently skipped.
skip_mode = skip_mode.advance();
- vdec.dec.dec.set_options(&[NAOption{
+ if let DecoderType::Video(ref mut dec, ref mut _reord) = vdec.dec.dec {
+ dec.set_options(&[NAOption{
name: FRAME_SKIP_OPTION,
value: NAValue::String(skip_mode.to_string()),
}]);
+ }
},
Err(_) => {
break;
// NOTE(review): hunk boundary — the lines below belong to a different,
// unnamed function (chroma-plane copy into a locked surface), not to the
// Err arm above; do not read them as contiguous control flow.
for (dline, sline) in buffer[coff..].chunks_exact_mut(pitch / 2).take(height/2).zip(usrc.chunks(usstride)) {
dline[..csize].copy_from_slice(&sline[..csize]);
}
- }).unwrap();
+ }).expect("surface should be locked");
}
// Fragment of the player initialisation routine (its fn header is outside
// this hunk): creates the packet/frame channels and starts the decoder
// thread.  NOTE(review): there is an omitted-context jump after the
// start_video_decoding(...) call — L285 onward belongs to a different match
// further down the same function.
// Rendezvous channel for commands; bounded frame channel sized to the queue.
let (vpsend, vprecv) = std::sync::mpsc::sync_channel::<PktSendEvent>(0);
let (vfsend, vfrecv) = std::sync::mpsc::sync_channel::<FrameRecord>(FRAME_QUEUE_SIZE - 1);
- VIDEO_END.store(false, Ordering::Relaxed);
+ VDEC_STATE.set_state(DecodingState::Normal);
let vthread = if let Some(video_dec) = video_dec {
start_video_decoding(width, height, tb_num, tb_den, video_dec, vprecv, vfsend)
_ => {},
};
}
+ VDEC_STATE.set_state(DecodingState::End);
}).unwrap()
};
}
// Player-side flush: clears the pending packet queue, tells the decoder
// thread to abandon work via the Flush state, and drains a few queued frames
// so the bounded channel cannot deadlock the decoder.
pub fn flush(&mut self) {
self.vqueue.clear();
- SKIP_VDECODING.store(true, Ordering::Release);
+ VDEC_STATE.set_state(DecodingState::Flush);
for _ in 0..8 {
let _ = self.vfrecv.try_recv();
}
// NOTE(review): hunk boundary — this trailing `true` is the tail of a
// different function (flush() itself returns no value); omitted context
// lines separate it from the loop above.
true
}
// Reports whether the decoder thread has finished (normally or with an
// error).  NOTE(review): diff hunk — End-or-Error now both count as "ended".
pub fn is_video_end(&self) -> bool {
- VIDEO_END.load(Ordering::Relaxed)
+ matches!(VDEC_STATE.get_state(), DecodingState::End | DecodingState::Error)
+ }
// Requests that the decoder thread collect all in-flight frames and waits
// until it acknowledges by moving VDEC_STATE from Prefetch to Waiting.
// Returns Err(()) if the state machine ends up anywhere else (e.g. End).
// NOTE(review): new function introduced by this hunk; try_send_event /
// try_send_queued are defined outside the visible span.
+ pub fn wait_for_frames(&mut self) -> Result<(), ()> {
+ VDEC_STATE.set_state(DecodingState::Prefetch);
+ self.try_send_event(PktSendEvent::GetFrames);
// Busy-wait until the queued events have been handed to the channel.
+ while !self.try_send_queued() {
+ }
+ loop {
+ match VDEC_STATE.get_state() {
+ DecodingState::Waiting => {
+ VDEC_STATE.set_state(DecodingState::Normal);
+ return Ok(());
+ },
+ DecodingState::Prefetch => thread::yield_now(),
+ _ => return Err(()),
+ };
+ }
}
// Trivial accessor for the colour-model flag set while filling the queue.
pub fn is_yuv(&self) -> bool { self.do_yuv }
// NOTE(review): hunk boundary — the lines below are the interior of a
// display-queue-filling function whose header is outside this span; they
// pull converted frames off the channel into the display queue.
while !disp_queue.is_full() {
let is_empty = disp_queue.is_empty();
if let Ok((pic, time)) = self.vfrecv.try_recv() {
- let buf = pic.get_vbuf().unwrap();
+ let buf = pic.get_vbuf().expect("video frame should be of u8 type");
self.do_yuv = buf.get_info().get_format().get_model().is_yuv();
let idx = disp_queue.end;
disp_queue.move_end();
// NOTE(review): another omitted-context jump — the copy below belongs to the
// surface-upload closure, not to the loop above.
(&mut dst[..csize]).copy_from_slice(&src[..csize]);
}
true
- }).unwrap();
+ }).expect("surface should be locked");
} else {
output_yuv(&mut frm.yuv_tex, &buf, disp_queue.width, disp_queue.height);
}
}
// Shutdown path (consumes self): signals the decoder thread to stop via the
// Flush state and drains a few queued frames so the bounded frame channel
// cannot block the decoder on exit.  NOTE(review): the rest of this function
// continues past the visible span (presumably joining the thread).
pub fn finish(self) {
- SKIP_VDECODING.store(true, Ordering::Release);
+ VDEC_STATE.set_state(DecodingState::Flush);
for _ in 0..8 {
let _ = self.vfrecv.try_recv();
}