1 use std::sync::{Arc, Barrier};
2 use std::sync::atomic::*;
5 use nihav_core::codecs::{DecoderError, DecoderResult};
7 use super::{FrameDecoder, PictureInfo, Shareable};
9 #[derive(Clone,Copy,Debug,PartialEq)]
10 pub enum FrameDecodingStatus {
23 worker: Option<thread::JoinHandle<DecoderResult<()>>>,
24 result: DecoderResult<()>,
30 fn get_id(&self) -> u32 { self.pinfo.full_id }
31 fn get_user_id(&self) -> u32 { self.pinfo.user_id }
32 fn is_working(&self) -> bool {
33 self.worker.is_some() &&
34 !self.complete.load(Ordering::Relaxed) &&
35 !self.error.load(Ordering::Relaxed)
37 fn is_output_candidate(&self) -> bool {
38 !self.output.load(Ordering::Relaxed) &&
39 (self.complete.load(Ordering::Relaxed) || self.error.load(Ordering::Relaxed))
43 pub struct ThreadDispatcher {
44 fstate: Vec<FrameState>,
45 pub max_threads: usize,
49 impl ThreadDispatcher {
50 pub fn new() -> Self {
57 pub fn can_decode_more(&self) -> bool {
58 let out_cand = self.fstate.iter().filter(|state| state.is_output_candidate()).count();
59 if out_cand > self.max_threads {
62 if (self.cur_threads < self.max_threads) || (self.max_threads == 0) {
65 let real_workers = self.fstate.iter().fold(0usize,
66 |acc, state| acc + (state.is_working() as usize));
67 real_workers < self.max_threads
70 fn cleanup(&mut self) {
71 for state in self.fstate.iter_mut() {
72 if state.worker.is_some() && !state.is_working() {
74 std::mem::swap(&mut state.worker, &mut ret);
75 if let Some(handle) = ret {
76 state.result = handle.join().unwrap();
78 self.cur_threads -= 1;
82 fn unref_frame(&mut self, id: u32) {
83 let mut toremove = Vec::new();
84 for state in self.fstate.iter() {
85 if state.num_refs == 0 && state.output.load(Ordering::Relaxed) {
86 toremove.push(state.get_id());
89 if let Some(idx) = self.find_by_id(id) {
90 let mut ref_frm = Vec::new();
91 std::mem::swap(&mut ref_frm, &mut self.fstate[idx].ref_frames);
92 for state in self.fstate.iter_mut() {
93 if ref_frm.contains(&state.get_id()) {
94 assert!(state.num_refs >= 2);
98 if self.fstate[idx].num_refs == 0 && self.fstate[idx].output.load(Ordering::Relaxed) {
99 self.remove_frame(id);
102 for &id in toremove.iter() {
103 self.remove_frame(id);
106 fn find_by_id(&self, id: u32) -> Option<usize> {
107 self.fstate.iter().position(|x| x.get_id() == id)
109 fn set_completed(&self, id: u32) {
110 if let Some(idx) = self.find_by_id(id) {
111 self.fstate[idx].complete.store(true, Ordering::Relaxed);
114 fn set_error(&self, id: u32) {
115 if let Some(idx) = self.find_by_id(id) {
116 self.fstate[idx].error.store(true, Ordering::Relaxed);
119 pub fn update_pos(&self, id: u32, mb_pos: usize) {
120 if let Some(idx) = self.find_by_id(id) {
121 self.fstate[idx].mb_pos.store(mb_pos, Ordering::Relaxed);
124 pub fn check_pos(&self, id: u32, mb_pos: usize) -> FrameDecodingStatus {
125 if let Some(idx) = self.find_by_id(id) {
126 let state = &self.fstate[idx];
127 if !state.error.load(Ordering::Relaxed) {
128 if state.complete.load(Ordering::Relaxed) || mb_pos < state.mb_pos.load(Ordering::Relaxed) {
129 FrameDecodingStatus::Ok
131 FrameDecodingStatus::NotReady
134 FrameDecodingStatus::Error
137 FrameDecodingStatus::NotFound
140 fn remove_frame(&mut self, id: u32) {
141 if let Some(idx) = self.find_by_id(id) {
142 self.fstate.remove(idx);
145 /*fn print_state(&self) {
147 for state in self.fstate.iter() {
148 print!(" s{}b{}r{}{}{}{}", state.get_id(),
149 state.mb_pos.load(Ordering::Relaxed), state.num_refs,
150 if state.error.load(Ordering::Relaxed) { "E" } else {""},
151 if state.complete.load(Ordering::Relaxed) {"C"} else {""},
152 if state.output.load(Ordering::Relaxed) {"O"} else {""});
156 pub fn has_output(&self) -> bool {
157 for state in self.fstate.iter() {
158 if state.is_output_candidate() {
166 pub fn queue_decoding(disp: &mut Shareable<ThreadDispatcher>, mut fdec: FrameDecoder, initial_ref_frames: &[u32], ref_frames: &[u32]) {
167 let barrier = Arc::new(Barrier::new(2));
168 let starter = Arc::clone(&barrier);
170 let pinfo = fdec.cur_pic.clone();
171 let pic_id = pinfo.full_id;
172 let shared_disp = Arc::clone(disp);
173 let worker = thread::Builder::new().name("frame ".to_string() + &pic_id.to_string()).spawn(move || {
176 let mut slices = Vec::new();
177 std::mem::swap(&mut slices, &mut fdec.slices);
179 for (hdr, hdr_size, refs, nal) in slices.iter() {
180 if hdr.first_mb_in_slice != cur_mb {
181 if let Ok(rd) = shared_disp.read() {
182 rd.set_error(pic_id);
184 panic!("can't set error");
186 return Err(DecoderError::InvalidData);
188 match fdec.decode_slice(hdr, *hdr_size, refs, nal) {
189 Ok(pos) => cur_mb = pos,
191 if let Ok(rd) = shared_disp.read() {
192 rd.set_error(pic_id);
194 panic!("can't set error");
201 if cur_mb == fdec.num_mbs {
202 if let Ok(rd) = shared_disp.read() {
203 rd.set_completed(pic_id);
205 panic!("can't set status");
209 DecoderResult::Ok(())
211 let new_state = FrameState {
213 mb_pos: AtomicUsize::new(0),
214 error: AtomicBool::new(false),
215 complete: AtomicBool::new(false),
216 output: AtomicBool::new(false),
217 worker: Some(worker),
218 result: DecoderResult::Err(DecoderError::Bug),
220 ref_frames: initial_ref_frames.to_vec(),
222 if let Ok(ref mut ds) = disp.write() {
223 let new_id = new_state.get_id();
224 if ds.find_by_id(new_id).is_some() {
225 ds.remove_frame(new_id);
228 ds.fstate.push(new_state);
229 for state in ds.fstate.iter_mut() {
230 if ref_frames.contains(&state.get_id()) {
233 if initial_ref_frames.contains(&state.get_id()) {
240 panic!("cannot invoke thread dispatcher");
244 pub fn wait_for_one(dispatch: &mut Shareable<ThreadDispatcher>) -> Result<PictureInfo, (DecoderError, u32)> {
245 /*if let Ok(ref ds) = dispatch.read() {
248 let start = std::time::Instant::now();
250 if std::time::Instant::now().duration_since(start) > std::time::Duration::from_millis(20000) { panic!(" too long!"); }
251 if let Ok(ref ds) = dispatch.read() {
253 for state in ds.fstate.iter() {
254 if state.is_working() {
257 if state.is_output_candidate() {
262 return Err((DecoderError::NoFrame, 0));
265 panic!("can't peek into status");
269 if let Ok(ref mut ds) = dispatch.write() {
271 let mut found = None;
272 for state in ds.fstate.iter() {
273 if state.is_output_candidate() {
274 state.output.store(true, Ordering::Relaxed);
275 if let DecoderResult::Err(err) = state.result {
276 let id = state.get_id();
277 let user_id = state.get_user_id();
279 return Err((err, user_id));
281 found = Some(state.pinfo.clone());
286 if let Some(ret) = found {
287 ds.unref_frame(ret.full_id);
293 panic!("can't grab status");
297 pub fn clear_threads(dispatch: &mut Shareable<ThreadDispatcher>) {
298 /*if let Ok(ref ds) = dispatch.read() {
301 let mut to_wait = Vec::new();
302 if let Ok(ref mut ds) = dispatch.write() {
303 while let Some(state) = ds.fstate.pop() {
304 if let Some(handle) = state.worker {
305 to_wait.push(handle);
310 panic!("can't grab status");
312 while let Some(handle) = to_wait.pop() {
313 let _ = handle.join();