try to improve state handling in decoding threads
[nihav-player.git] / videoplayer / src / videodec.rs
CommitLineData
69b93cb5 1use std::thread::JoinHandle;
69b93cb5
KS
2use std::sync::mpsc::{Receiver, SyncSender, TrySendError};
3use std::thread;
4
5use sdl2::render::Texture;
6
7use nihav_core::frame::{NABufferType, NAVideoBuffer};
8use nihav_core::formats::*;
9use nihav_core::codecs::*;
10use nihav_core::scale::*;
11
use super::{DecoderStuff, DecoderType, DecoderState, DecodingState, DispQueue, FrameRecord, PktSendEvent, FRAME_QUEUE_LEN};

// Shared state of the video decoding thread, used to coordinate with the
// main/control thread (flush requests, prefetch handshake, end-of-stream).
static VDEC_STATE: DecoderState = DecoderState::new();

// Capacity of the decoded-frame buffer pools and (minus one) of the
// decoder-to-display frame channel.
pub const FRAME_QUEUE_SIZE: usize = 25;
17
// Packed 24-bit RGB pixel format description matching the SDL RGB24 texture
// layout: three interleaved 8-bit components per pixel (R at byte offset 0,
// G at 1, B at 2), no alpha, no palette.
pub const SDL_RGB_FMT: NAPixelFormaton = NAPixelFormaton { model: ColorModel::RGB(RGBSubmodel::RGB), components: 3,
    comp_info: [
        Some(NAPixelChromaton { h_ss: 0, v_ss: 0, packed: true, depth: 8, shift: 0, comp_offs: 0, next_elem: 3 }),
        Some(NAPixelChromaton { h_ss: 0, v_ss: 0, packed: true, depth: 8, shift: 0, comp_offs: 1, next_elem: 3 }),
        Some(NAPixelChromaton { h_ss: 0, v_ss: 0, packed: true, depth: 8, shift: 0, comp_offs: 2, next_elem: 3 }),
        None, None
    ], elem_size: 3, be: false, alpha: false, palette: false };
25
// Wraps a video decoder (single- or multi-threaded) together with the
// scaler that converts every decoded picture to the display format.
pub struct VideoDecoder {
    yuv_pool:   NAVideoBufferPool<u8>,  // output buffers for YUV sources (YUV420)
    rgb_pool:   NAVideoBufferPool<u8>,  // output buffers for non-YUV sources (RGB24)
    tb_num:     u32,                    // stream timebase numerator
    tb_den:     u32,                    // stream timebase denominator
    dec:        DecoderStuff,           // the actual decoder plus reorderer
    ifmt:       NAVideoInfo,            // last seen input format (scaler is rebuilt on change)
    scaler:     NAScale,                // converts decoded frames into output pool buffers
    ofmt_rgb:   ScaleInfo,              // scaler target description, RGB path
    ofmt_yuv:   ScaleInfo,              // scaler target description, YUV path
    oinfo_yuv:  NAVideoInfo,            // output buffer description, YUV path
    oinfo_rgb:  NAVideoInfo,            // output buffer description, RGB path
}
39
40impl VideoDecoder {
41 pub fn new(width: usize, height: usize, tb_num: u32, tb_den: u32, dec: DecoderStuff) -> Self {
42 let ofmt_rgb = ScaleInfo { width, height, fmt: SDL_RGB_FMT };
43 let ofmt_yuv = ScaleInfo { width, height, fmt: YUV420_FORMAT };
44 let oinfo_rgb = NAVideoInfo { width, height, flipped: false, format: SDL_RGB_FMT, bits: 24 };
45 let oinfo_yuv = NAVideoInfo { width, height, flipped: false, format: YUV420_FORMAT, bits: 12 };
46 Self {
47 yuv_pool: NAVideoBufferPool::new(FRAME_QUEUE_SIZE),
48 rgb_pool: NAVideoBufferPool::new(FRAME_QUEUE_SIZE),
49 tb_num, tb_den,
50 dec, ofmt_yuv, ofmt_rgb, oinfo_yuv, oinfo_rgb,
51 scaler: NAScale::new(ofmt_rgb, ofmt_rgb).unwrap(),
52 ifmt: NAVideoInfo { width: 0, height: 0, flipped: false, format: SDL_RGB_FMT, bits: 24 },
53 }
54 }
55 fn convert_buf(&mut self, bt: NABufferType, ts: u64) -> Option<FrameRecord> {
56 let vinfo = bt.get_video_info().unwrap();
57 if self.ifmt.get_width() != vinfo.get_width() ||
58 self.ifmt.get_height() != vinfo.get_height() ||
59 self.ifmt.get_format() != vinfo.get_format() {
60 self.ifmt = vinfo;
61 let sc_ifmt = ScaleInfo { width: self.ifmt.get_width(), height: self.ifmt.get_height(), fmt: self.ifmt.get_format() };
62 let do_yuv = if let ColorModel::YUV(_) = self.ifmt.get_format().get_model() { true } else { false };
63 let ofmt = if do_yuv { self.ofmt_yuv } else { self.ofmt_rgb };
64 self.scaler = NAScale::new(sc_ifmt, ofmt).unwrap();
65 }
66 let mut opic = if let ColorModel::YUV(_) = self.ifmt.get_format().get_model() {
67 self.yuv_pool.prealloc_video(self.oinfo_yuv, 2).unwrap();
68 while self.yuv_pool.get_free().is_none() {
4e72c04a 69 if VDEC_STATE.is_flushing() {
69b93cb5
KS
70 return None;
71 }
72 std::thread::yield_now();
73 }
74 NABufferType::Video(self.yuv_pool.get_free().unwrap())
75 } else {
76 self.rgb_pool.prealloc_video(self.oinfo_rgb, 0).unwrap();
77 while self.rgb_pool.get_free().is_none() {
4e72c04a 78 if VDEC_STATE.is_flushing() {
69b93cb5
KS
79 return None;
80 }
81 std::thread::yield_now();
82 }
83 NABufferType::VideoPacked(self.rgb_pool.get_free().unwrap())
84 };
85 let ret = self.scaler.convert(&bt, &mut opic);
86 if ret.is_err() { println!(" scaler error {:?}", ret.err()); return None; }
87 ret.unwrap();
88 let time = NATimeInfo::ts_to_time(ts, 1000, self.tb_num, self.tb_den);
89 Some((opic, time))
90 }
91 pub fn next_frame(&mut self, pkt: &NAPacket) -> Option<FrameRecord> {
37f130a7
KS
92 match self.dec.dec {
93 DecoderType::Video(ref mut vdec, ref mut reord) => {
94 if let Ok(frm) = vdec.decode(&mut self.dec.dsupp, pkt) {
95 reord.add_frame(frm);
96 while let Some(frm) = reord.get_frame() {
97 let bt = frm.get_buffer();
98 if let NABufferType::None = bt { continue; }
99 let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
100 return self.convert_buf(bt, ts);
101 }
102 }
103 },
104 DecoderType::VideoMT(ref mut vdec, ref mut reord) => {
105 let queue_id = reord.register_frame();
106 match vdec.queue_pkt(&mut self.dec.dsupp, &pkt, queue_id) {
107 Ok(true) => {},
108 Ok(false) => {
109 while !vdec.can_take_input() || vdec.has_output() {
110 match vdec.get_frame() {
111 (Ok(frm), id) => {
112 reord.add_frame(frm, id);
113 },
114 (Err(err), id) => {
115 reord.drop_frame(id);
116 panic!("frame {} decoding error {:?}", id, err);
117 },
118 };
119 }
120 match vdec.queue_pkt(&mut self.dec.dsupp, &pkt, queue_id) {
121 Ok(true) => {},
122 Ok(false) => panic!("still can't queue frame!"),
123 Err(err) => panic!("queueing error {:?}", err),
124 };
125 },
126 Err(err) => panic!("queueing error {:?}", err),
127 };
128 while let Some(frm) = reord.get_frame() {
129 let bt = frm.get_buffer();
130 if let NABufferType::None = bt { continue; }
131 let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
132 return self.convert_buf(bt, ts);
133 }
134 },
135 _ => panic!("not a video decoder!"),
136 };
137 None
138 }
139 pub fn more_frames(&mut self, do_not_wait: bool) -> Option<FrameRecord> {
140 match self.dec.dec {
141 DecoderType::Video(ref mut _dec, ref mut reord) => {
142 while let Some(frm) = reord.get_frame() {
143 let bt = frm.get_buffer();
144 if let NABufferType::None = bt { continue; }
145 let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
146 return self.convert_buf(bt, ts);
147 }
148 },
149 DecoderType::VideoMT(ref mut vdec, ref mut reord) => {
150 let mut got_some = false;
151 while vdec.has_output() {
152 match vdec.get_frame() {
153 (Ok(frm), id) => {
154 reord.add_frame(frm, id);
155 got_some = true;
156 },
157 (Err(err), id) => {
158 reord.drop_frame(id);
159 panic!("frame {} decoding error {:?}", id, err);
160 },
161 };
162 }
163 if !got_some && !do_not_wait {
164 match vdec.get_frame() {
165 (Ok(frm), id) => {
166 reord.add_frame(frm, id);
167 },
168 (Err(DecoderError::NoFrame), _) => {},
169 (Err(err), id) => {
170 reord.drop_frame(id);
171 panic!("frame {} decoding error {:?}", id, err);
172 },
173 };
174 }
175 while let Some(frm) = reord.get_frame() {
176 let bt = frm.get_buffer();
177 if let NABufferType::None = bt { continue; }
178 let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
179 return self.convert_buf(bt, ts);
180 }
181 },
182 _ => {},
183 };
69b93cb5
KS
184 None
185 }
186 pub fn last_frame(&mut self) -> Option<FrameRecord> {
37f130a7
KS
187 match self.dec.dec {
188 DecoderType::Video(ref mut _dec, ref mut reord) => {
189 while let Some(frm) = reord.get_last_frames() {
190 let bt = frm.get_buffer();
191 if let NABufferType::None = bt { continue; }
192 let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
193 return self.convert_buf(bt, ts);
194 }
195 },
196 DecoderType::VideoMT(ref mut _dec, ref mut reord) => {
197 while let Some(frm) = reord.get_last_frames() {
198 let bt = frm.get_buffer();
199 if let NABufferType::None = bt { continue; }
200 let ts = frm.get_dts().unwrap_or_else(|| frm.get_pts().unwrap_or(0));
201 return self.convert_buf(bt, ts);
202 }
203 },
204 _ => {},
205 };
69b93cb5
KS
206 None
207 }
208 pub fn flush(&mut self) {
37f130a7
KS
209 match self.dec.dec {
210 DecoderType::Video(ref mut dec, ref mut reord) => {
211 dec.flush();
212 reord.flush();
213 },
214 DecoderType::VideoMT(ref mut dec, ref mut reord) => {
215 dec.flush();
216 reord.flush();
217 },
218 _ => {},
219 };
69b93cb5
KS
220 }
221}
222
/// Spawns the video decoding thread.
///
/// The thread consumes `PktSendEvent`s from `vprecv`, decodes/converts
/// frames and pushes them into `vfsend` (a bounded channel, so `send`
/// blocks when the display side is not consuming), and publishes its
/// lifecycle through the shared `VDEC_STATE`.
fn start_video_decoding(width: usize, height: usize, tb_num: u32, tb_den: u32, video_dec: DecoderStuff, vprecv: Receiver<PktSendEvent>, vfsend: SyncSender<(NABufferType, u64)>) -> JoinHandle<()> {
    std::thread::Builder::new().name("vdecoder".to_string()).spawn(move ||{
        VDEC_STATE.set_state(DecodingState::Waiting);
        let mut vdec = VideoDecoder::new(width, height, tb_num, tb_den, video_dec);
        let mut skip_mode = FrameSkipMode::None;
        loop {
            match vprecv.recv() {
                Ok(PktSendEvent::Packet(pkt)) => {
                    // Packets arriving during a flush are silently dropped.
                    if !VDEC_STATE.is_flushing() {
                        if let Some((buf, time)) = vdec.next_frame(&pkt) {
                            vfsend.send((buf, time)).unwrap();
                        }
                        // Hand over whatever else is ready without blocking.
                        while let Some((buf, time)) = vdec.more_frames(true) {
                            vfsend.send((buf, time)).unwrap();
                        }
                    }
                },
                Ok(PktSendEvent::GetFrames) => {
                    // Prefetch request: deliver everything (allowing blocking
                    // requests), then signal completion via Waiting.
                    while let Some((buf, time)) = vdec.more_frames(false) {
                        vfsend.send((buf, time)).unwrap();
                    }
                    VDEC_STATE.set_state(DecodingState::Waiting);
                },
                Ok(PktSendEvent::Flush) => {
                    vdec.flush();
                    VDEC_STATE.set_state(DecodingState::Waiting);
                },
                Ok(PktSendEvent::End) => {
                    // End of stream: drain the reorderer for as long as free
                    // output buffers remain in both pools.
                    while vdec.yuv_pool.get_free().is_some() && vdec.rgb_pool.get_free().is_some() {
                        let ret = vdec.last_frame();
                        if ret.is_none() {
                            break;
                        }
                        vfsend.send(ret.unwrap()).unwrap();
                    }
                    VDEC_STATE.set_state(DecodingState::End);
                    break;
                },
                Ok(PktSendEvent::ImmediateEnd) => {
                    // Abort without draining pending frames.
                    VDEC_STATE.set_state(DecodingState::End);
                    break;
                },
                Ok(PktSendEvent::HurryUp) => {
                    // Player is falling behind: step to the next frame-skip
                    // mode (only the single-threaded decoder honours it).
                    skip_mode = skip_mode.advance();
                    if let DecoderType::Video(ref mut dec, ref mut _reord) = vdec.dec.dec {
                        dec.set_options(&[NAOption{
                            name: FRAME_SKIP_OPTION,
                            value: NAValue::String(skip_mode.to_string()),
                        }]);
                    }
                },
                Err(_) => {
                    // Control side hung up - terminate the thread.
                    break;
                },
            };
        }
    }).unwrap()
}
281
// Helper trait for stepping through frame-skip modes on HurryUp requests.
trait Advance {
    fn advance(&self) -> Self;
}
285
286impl Advance for FrameSkipMode {
287 fn advance(&self) -> Self {
288 match *self {
289 FrameSkipMode::None => FrameSkipMode::KeyframesOnly,
290 FrameSkipMode::KeyframesOnly => FrameSkipMode::IntraOnly,
291 FrameSkipMode::IntraOnly => FrameSkipMode::None,
292 }
293 }
294}
295
// Copies a planar YUV420 picture into a locked SDL texture.
//
// NOTE(review): the local names are swapped relative to the plane indices
// (plane 1 is read as `vsrc`, plane 2 as `usrc`), and the plane read from
// offset 1 is written directly after the luma plane. This presumably matches
// the chroma ordering of the texture's pixel format (YV12-style, V before U)
// — confirm against where `yuv_tex` is created.
fn output_yuv(yuv_texture: &mut Texture, buf: &NAVideoBuffer<u8>, width: usize, height: usize) {
    let src = buf.get_data();
    let ysstride = buf.get_stride(0);
    let ysrc = &src[buf.get_offset(0)..];
    let usstride = buf.get_stride(2);
    let usrc = &src[buf.get_offset(2)..];
    let vsstride = buf.get_stride(1);
    let vsrc = &src[buf.get_offset(1)..];
    yuv_texture.with_lock(None, |buffer: &mut [u8], pitch: usize| {
            // Copy the smaller of source width and destination pitch per row.
            let csize = pitch.min(width);
            for (dline, sline) in buffer.chunks_exact_mut(pitch).take(height).zip(ysrc.chunks_exact(ysstride)) {
                dline[..csize].copy_from_slice(&sline[..csize]);
            }
            // Chroma planes are subsampled 2x in both directions and packed
            // directly after the luma plane at half pitch.
            let coff = pitch * height;
            let csize = (pitch / 2).min(width / 2);
            for (dline, sline) in buffer[coff..].chunks_exact_mut(pitch / 2).take(height/2).zip(vsrc.chunks(vsstride)) {
                dline[..csize].copy_from_slice(&sline[..csize]);
            }
            let coff = pitch * height + (pitch / 2) * (height / 2);
            for (dline, sline) in buffer[coff..].chunks_exact_mut(pitch / 2).take(height/2).zip(usrc.chunks(usstride)) {
                dline[..csize].copy_from_slice(&sline[..csize]);
            }
        }).unwrap();
}
320
321
// Main-thread handle to the video decoding thread: queues outgoing packet
// events, receives converted frames, and owns the thread's join handle.
pub struct VideoControl {
    vqueue:     Vec<PktSendEvent>,          // events not yet accepted by the rendezvous channel
    vpsend:     SyncSender<PktSendEvent>,   // packet/control events to the decoder thread
    vfrecv:     Receiver<FrameRecord>,      // converted frames from the decoder thread
    do_yuv:     bool,                       // whether the last received frame was YUV
    vthread:    JoinHandle<()>,             // decoder (or dummy) thread handle
}
329
330impl VideoControl {
331 pub fn new(video_dec: Option<DecoderStuff>, width: usize, height: usize, tb_num: u32, tb_den: u32) -> Self {
332 let (vpsend, vprecv) = std::sync::mpsc::sync_channel::<PktSendEvent>(0);
333 let (vfsend, vfrecv) = std::sync::mpsc::sync_channel::<FrameRecord>(FRAME_QUEUE_SIZE - 1);
334
4e72c04a 335 VDEC_STATE.set_state(DecodingState::Normal);
69b93cb5
KS
336
337 let vthread = if let Some(video_dec) = video_dec {
338 start_video_decoding(width, height, tb_num, tb_den, video_dec, vprecv, vfsend)
339 } else {
340 thread::Builder::new().name("vdecoder-dummy".to_string()).spawn(move ||{
341 loop {
342 match vprecv.recv() {
343 Ok(PktSendEvent::End) => break,
344 Ok(PktSendEvent::ImmediateEnd) => break,
345 Err(_) => {
346 break;
347 },
348 _ => {},
349 };
350 }
4e72c04a 351 VDEC_STATE.set_state(DecodingState::End);
69b93cb5
KS
352 }).unwrap()
353 };
354
355
356 Self {
357 vqueue: Vec::with_capacity(FRAME_QUEUE_LEN),
358 vpsend, vfrecv,
359 do_yuv: false,
360 vthread,
361 }
362 }
363 pub fn flush(&mut self) {
b5053bfc 364 self.vqueue.clear();
4e72c04a 365 VDEC_STATE.set_state(DecodingState::Flush);
69b93cb5
KS
366 for _ in 0..8 {
367 let _ = self.vfrecv.try_recv();
368 }
369 let _ = self.vpsend.send(PktSendEvent::Flush);
370 while self.vfrecv.try_recv().is_ok() { }
371 }
372 pub fn get_queue_size(&self) -> usize { self.vqueue.len() }
373 pub fn is_filled(&self, size: usize) -> bool {
374 self.vqueue.len() >= size
375 }
376 pub fn try_send_video(&mut self, evt: PktSendEvent) -> bool {
377 if self.vqueue.len() > 0 {
378 self.vqueue.push(evt);
379 false
380 } else {
381 self.try_send_event(evt)
382 }
383 }
384 fn try_send_event(&mut self, evt: PktSendEvent) -> bool {
385 if let Err(TrySendError::Full(evt)) = self.vpsend.try_send(evt) {
386 self.vqueue.insert(0, evt);
387 false
388 } else {
389 true
390 }
391 }
392 pub fn try_send_queued(&mut self) -> bool {
393 while !self.vqueue.is_empty() {
394 let pkt = self.vqueue.remove(0);
395 if !self.try_send_event(pkt) {
396 return false;
397 }
398 }
399 true
400 }
401 pub fn is_video_end(&self) -> bool {
4e72c04a 402 matches!(VDEC_STATE.get_state(), DecodingState::End | DecodingState::Error)
69b93cb5 403 }
4e72c04a
KS
404 pub fn wait_for_frames(&mut self) -> Result<(), ()> {
405 VDEC_STATE.set_state(DecodingState::Prefetch);
37f130a7
KS
406 self.try_send_event(PktSendEvent::GetFrames);
407 while !self.try_send_queued() {
408 }
4e72c04a
KS
409 loop {
410 match VDEC_STATE.get_state() {
411 DecodingState::Waiting => {
412 VDEC_STATE.set_state(DecodingState::Normal);
413 return Ok(());
414 },
415 DecodingState::Prefetch => thread::yield_now(),
416 _ => return Err(()),
417 };
37f130a7
KS
418 }
419 }
69b93cb5
KS
420
421 pub fn is_yuv(&self) -> bool { self.do_yuv }
422
423 pub fn fill(&mut self, disp_queue: &mut DispQueue) {
424 while !disp_queue.is_full() {
425 let is_empty = disp_queue.is_empty();
426 if let Ok((pic, time)) = self.vfrecv.try_recv() {
427 let buf = pic.get_vbuf().unwrap();
428 self.do_yuv = buf.get_info().get_format().get_model().is_yuv();
429 let idx = disp_queue.end;
430 disp_queue.move_end();
431 let frm = &mut disp_queue.pool[idx];
432 if !self.do_yuv {
433 let sstride = buf.get_stride(0);
434 let src = buf.get_data();
435 frm.rgb_tex.with_lock(None, |buffer: &mut [u8], pitch: usize| {
436 let csize = sstride.min(pitch);
437 for (dst, src) in buffer.chunks_mut(pitch).zip(src.chunks(sstride)) {
438 (&mut dst[..csize]).copy_from_slice(&src[..csize]);
439 }
440 true
441 }).unwrap();
442 } else {
443 output_yuv(&mut frm.yuv_tex, &buf, disp_queue.width, disp_queue.height);
444 }
445 frm.valid = true;
446 frm.is_yuv = self.do_yuv;
447 frm.ts = time;
448 if is_empty {
449 disp_queue.first_ts = time;
450 }
451 disp_queue.last_ts = time;
452 } else {
453 break;
454 }
455 }
456 }
457
458 pub fn finish(self) {
4e72c04a 459 VDEC_STATE.set_state(DecodingState::Flush);
69b93cb5
KS
460 for _ in 0..8 {
461 let _ = self.vfrecv.try_recv();
462 }
463 let _ = self.vpsend.send(PktSendEvent::ImmediateEnd);
464 self.vthread.join().unwrap();
465 }
466}