e9da970ad577d3a3c518c8f100134cff23286b2a
[nihav-player.git] / videoplayer / src / audiodec.rs
1 use std::time::Duration;
2 use std::thread::JoinHandle;
3 use std::sync::{Arc, Mutex};
4 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5 use std::sync::mpsc::{Receiver, SyncSender, TrySendError};
6 use std::str::FromStr;
7
8 use sdl2::AudioSubsystem;
9 use sdl2::audio::{AudioDevice, AudioCallback, AudioSpecDesired};
10
11 use nihav_core::formats::*;
12 use nihav_core::codecs::*;
13 use nihav_core::soundcvt::*;
14
15 use super::{DecoderStuff, DecoderType, PktSendEvent};
16
/// Set by the controller while a flush/finish is pending; the decoding thread
/// skips packets and leaves its wait loops while this is `true`.
static SKIP_ADECODING: AtomicBool = AtomicBool::new(false);
/// Playback volume in percent; at exactly 100 samples are copied unscaled.
static AUDIO_VOLUME: AtomicUsize = AtomicUsize::new(100);
/// Raised by the decoding thread right before it exits.
static AUDIO_END: AtomicBool = AtomicBool::new(false);
/// Number of `i16` values currently pending in the [`AudioQueue`].
static CUR_QUEUE_FILL: AtomicUsize = AtomicUsize::new(0);
/// Sample rate given to the most recent `AudioQueue::new` call.
static SAMPLE_RATE: AtomicUsize = AtomicUsize::new(0);
/// Channel count given to the most recent `AudioQueue::new` call.
static CHANNELS: AtomicUsize = AtomicUsize::new(0);

/// Playback position published by `AudioQueue::set_time`
/// (sample position times 1000 divided by the sample rate).
static CURRENT_TIME: AtomicUsize = AtomicUsize::new(0);
/// `true` once `CURRENT_TIME` holds a value published since the last flush request.
static CURRENT_TIME_SET: AtomicBool = AtomicBool::new(false);

/// Initial capacity (in `i16` values) reserved for the queue storage.
const QUEUE_INITIAL_SIZE: usize = 16384;
/// The decoding thread stops feeding the queue while its fill is at or above this.
const QUEUE_REFILL_LIMIT: usize = 262144;

/// FIFO of interleaved 16-bit samples shared between the decoding thread
/// (producer) and the audio callback (consumer).
struct AudioQueue {
    /// Backing storage; only `queue[start..end]` holds pending samples.
    queue: Vec<i16>,
    /// Index of the first pending value.
    start: usize,
    /// Index one past the last pending value.
    end: usize,
    /// Sample rate used to convert sample positions into time.
    srate: usize,
    /// Number of interleaved channels.
    chans: usize,
    /// Sample position reported with the most recently added data.
    spos: usize,
}

impl AudioQueue {
    /// Creates an empty queue and publishes the stream parameters
    /// (and a zero fill level) through the module statics.
    fn new(srate: usize, chans: usize) -> Self {
        SAMPLE_RATE.store(srate, Ordering::Relaxed);
        CHANNELS.store(chans, Ordering::Relaxed);
        CUR_QUEUE_FILL.store(0, Ordering::Relaxed);
        Self {
            queue: Vec::with_capacity(QUEUE_INITIAL_SIZE),
            start: 0,
            end: 0,
            srate, chans,
            spos: 0,
        }
    }

    /// Moves the pending samples to the beginning of the storage.
    fn home(&mut self) {
        if self.start == 0 { return; }
        let fill = self.fill();
        // `copy_within` behaves like memmove, so it handles both the
        // overlapping and the non-overlapping case.
        self.queue.copy_within(self.start..self.end, 0);
        self.start = 0;
        self.end = fill;
    }

    /// Publishes the current playback time and fill level.
    ///
    /// The time is the last reported sample position minus the samples
    /// (per channel) still waiting in the queue.
    fn set_time(&mut self) {
        let fill = self.fill();
        // Without a valid rate/channel count no time can be derived;
        // skip it instead of dividing by zero.
        if self.srate != 0 && self.chans != 0 {
            let samp_time = self.spos.saturating_sub(fill / self.chans);
            // Widen before scaling: `samp_time * 1000` overflows a 32-bit
            // `usize` after roughly a minute and a half of 48 kHz audio.
            let millis = (samp_time as u64).saturating_mul(1000) / (self.srate as u64);
            let millis = millis.min(usize::MAX as u64) as usize;
            CURRENT_TIME.store(millis, Ordering::Relaxed);
            CURRENT_TIME_SET.store(true, Ordering::Relaxed);
        }
        CUR_QUEUE_FILL.store(fill, Ordering::Relaxed);
    }

    /// Ensures `len` more values fit after `end`, compacting the queue
    /// first and growing the storage only if that is not enough.
    fn make_room(&mut self, len: usize) {
        if self.end + len > self.queue.len() {
            self.home();
        }
        let needed = self.end + len;
        if needed > self.queue.len() {
            self.queue.resize(needed, 0);
        }
    }

    /// Appends `src` and records `samplepos` as the new sample position.
    fn add(&mut self, src: &[i16], samplepos: usize) {
        self.make_room(src.len());
        self.queue[self.end..][..src.len()].copy_from_slice(src);
        self.end += src.len();
        self.spos = samplepos;
        self.set_time();
    }

    /// Appends little-endian 16-bit samples stored as raw bytes and records
    /// `samplepos` as the new sample position. A trailing odd byte is ignored.
    fn add_bytes(&mut self, src: &[u8], samplepos: usize) {
        let srclen = src.len() / 2;
        self.make_room(srclen);
        let dst = &mut self.queue[self.end..][..srclen];
        for (sample, pair) in dst.iter_mut().zip(src.chunks_exact(2)) {
            *sample = i16::from_le_bytes([pair[0], pair[1]]);
        }
        self.end += srclen;
        self.spos = samplepos;
        self.set_time();
    }

    /// Discards `size` values from the front (everything if `size` covers
    /// the whole fill) and republishes time and fill level.
    fn drain(&mut self, size: usize) {
        let fill = self.fill();
        if size >= fill {
            self.flush();
        } else {
            self.start += size;
        }
        self.set_time();
    }

    /// Number of pending `i16` values.
    fn fill(&self) -> usize { self.end - self.start }

    /// Drops all pending samples.
    ///
    /// The published fill level is reset as well so that readers of
    /// `CUR_QUEUE_FILL` do not keep seeing the pre-flush value; the published
    /// time is deliberately left untouched.
    fn flush(&mut self) {
        self.start = 0;
        self.end = 0;
        CUR_QUEUE_FILL.store(0, Ordering::Relaxed);
    }
}
117
118 pub struct AudioOutput {
119 queue: Arc<Mutex<AudioQueue>>,
120 }
121
122 impl AudioCallback for AudioOutput {
123 type Channel = i16;
124
125 fn callback(&mut self, out: &mut [Self::Channel]) {
126 let mut queue = self.queue.lock().unwrap();
127 let dstlen = out.len();
128 let copylen = queue.fill().min(dstlen);
129 let volume = AUDIO_VOLUME.load(Ordering::Relaxed) as i32;
130 if volume == 100 {
131 out[..copylen].copy_from_slice(&queue.queue[queue.start..][..copylen]);
132 } else {
133 for (dst, &src) in out[..copylen].iter_mut().zip(queue.queue[queue.start..].iter()) {
134 *dst = (i32::from(src) * volume / 100).max(-32768).min(32767) as i16;
135 }
136 }
137 queue.drain(copylen);
138 for el in out[copylen..].iter_mut() { *el = 0; }
139 }
140 }
141
142 fn dummy_audio_thread(aprecv: Receiver<PktSendEvent>) -> JoinHandle<()> {
143 std::thread::Builder::new().name("acontrol-dummy".to_string()).spawn(move ||{
144 loop {
145 match aprecv.recv() {
146 Ok(PktSendEvent::End) => break,
147 Ok(PktSendEvent::ImmediateEnd) => break,
148 Err(_) => {
149 break;
150 },
151 _ => {},
152 };
153 }
154 }).unwrap()
155 }
156
157 type AudioPlaybackType = Option<AudioDevice<AudioOutput>>;
158
159 fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_dec: DecoderStuff, aprecv: Receiver<PktSendEvent>) -> (AudioPlaybackType, JoinHandle<()>) {
160 let ch = ainfo.channels.max(2);
161 let desired_spec = AudioSpecDesired {
162 freq: Some(ainfo.sample_rate as i32),
163 channels: Some(ch),
164 samples: None
165 };
166 let dst_info = NAAudioInfo {
167 sample_rate: ainfo.sample_rate,
168 channels: ch,
169 format: SND_S16_FORMAT,
170 block_len: 0,
171 };
172 let queue = Arc::new(Mutex::new(AudioQueue::new(ainfo.sample_rate as usize, ch as usize)));
173 let qclone = queue.clone();
174 let ret = asystem.open_playback(None, &desired_spec, |_spec| {
175 AudioOutput {
176 queue: qclone,
177 }
178 });
179 if ret.is_err() {
180 return (None, dummy_audio_thread(aprecv))
181 }
182 let adevice = ret.unwrap();
183 (Some(adevice), std::thread::Builder::new().name("acontrol".to_string()).spawn(move ||{
184 let adec = if let DecoderType::Audio(ref mut dec) = audio_dec.dec { dec } else { panic!("not an audio decoder!"); };
185 let mut samplepos = 0usize;
186 let dst_chmap = if dst_info.channels == 2 {
187 NAChannelMap::from_str("L,R").unwrap()
188 } else {
189 NAChannelMap::from_str("C").unwrap()
190 };
191 SKIP_ADECODING.store(false, Ordering::Relaxed);
192 loop {
193 match aprecv.recv() {
194 Ok(PktSendEvent::Packet(pkt)) => {
195 loop {
196 if CUR_QUEUE_FILL.load(Ordering::Relaxed)
197 < QUEUE_REFILL_LIMIT || SKIP_ADECODING.load(Ordering::Relaxed) {
198 break;
199 }
200 std::thread::sleep(Duration::from_millis(100));
201 }
202 if !SKIP_ADECODING.load(Ordering::Relaxed) {
203 if let Ok(frm) = adec.decode(&mut audio_dec.dsupp, &pkt) {
204 let buf = frm.get_buffer();
205 if let Some(pts) = frm.get_pts() {
206 samplepos = NATimeInfo::ts_to_time(pts, u64::from(dst_info.sample_rate), frm.ts.tb_num, frm.ts.tb_den) as usize;
207 }
208 samplepos += buf.get_audio_length();
209 if let Ok(out_buf) = convert_audio_frame(&buf, &dst_info, &dst_chmap) {
210 match out_buf {
211 NABufferType::AudioI16(abuf) => {
212 let mut qdata = queue.lock().unwrap();
213 qdata.add(abuf.get_data(), samplepos);
214 drop(qdata);
215 },
216 NABufferType::AudioPacked(abuf) => {
217 let mut qdata = queue.lock().unwrap();
218 qdata.add_bytes(abuf.get_data(), samplepos);
219 drop(qdata);
220 },
221 _ => {},
222 };
223 }
224 }
225 }
226 },
227 Ok(PktSendEvent::GetFrames) => {},
228 Ok(PktSendEvent::Flush) => {
229 adec.flush();
230 let mut qdata = queue.lock().unwrap();
231 qdata.flush();
232 SKIP_ADECODING.store(false, Ordering::Relaxed);
233 },
234 Ok(PktSendEvent::End) => break,
235 Ok(PktSendEvent::ImmediateEnd) => {
236 let mut qdata = queue.lock().unwrap();
237 qdata.flush();
238 break;
239 },
240 Ok(PktSendEvent::HurryUp) => {},
241 Err(_) => {
242 break;
243 },
244 };
245 }
246 loop {
247 let qdata = queue.lock().unwrap();
248 if qdata.fill() == 0 || SKIP_ADECODING.load(Ordering::Relaxed) {
249 break;
250 }
251 }
252 AUDIO_END.store(true, Ordering::Relaxed);
253 }).unwrap())
254 }
255
256 pub struct AudioControl {
257 aqueue: Vec<PktSendEvent>,
258 apsend: SyncSender<PktSendEvent>,
259 adevice: AudioPlaybackType,
260 athread: JoinHandle<()>,
261 }
262
263 impl AudioControl {
264 pub fn new(audio_dec: Option<DecoderStuff>, ainfo: Option<NAAudioInfo>, asystem: &AudioSubsystem) -> Self {
265 let (apsend, aprecv) = std::sync::mpsc::sync_channel::<PktSendEvent>(20);
266 let (adevice, athread) = if let Some(audio_dec) = audio_dec {
267 start_audio_decoding(asystem, ainfo.unwrap(), audio_dec, aprecv)
268 } else {
269 (None, dummy_audio_thread(aprecv))
270 };
271 AUDIO_END.store(false, Ordering::Relaxed);
272
273 Self {
274 aqueue: Vec::new(),
275 apsend,
276 adevice,
277 athread,
278 }
279 }
280 pub fn has_audio(&self) -> bool { self.adevice.is_some() }
281 pub fn pause(&mut self) {
282 if let Some(ref device) = self.adevice {
283 device.pause();
284 }
285 }
286 pub fn resume(&mut self) {
287 if let Some(ref device) = self.adevice {
288 device.resume();
289 }
290 }
291 pub fn set_volume(&mut self, volume: usize) {
292 AUDIO_VOLUME.store(volume, Ordering::Relaxed);
293 }
294 pub fn get_volume(&self) -> usize {
295 AUDIO_VOLUME.load(Ordering::Relaxed)
296 }
297 pub fn is_audio_end(&self) -> bool {
298 AUDIO_END.load(Ordering::Relaxed)
299 }
300 pub fn get_fill(&self) -> usize { CUR_QUEUE_FILL.load(Ordering::Relaxed) }
301 pub fn get_time(&self) -> Option<u64> {
302 if CURRENT_TIME_SET.load(Ordering::Relaxed) {
303 Some(CURRENT_TIME.load(Ordering::Relaxed) as u64)
304 } else {
305 None
306 }
307 }
308 pub fn get_time_left(&self) -> u64 {
309 let srate = SAMPLE_RATE.load(Ordering::Relaxed);
310 let chans = CHANNELS.load(Ordering::Relaxed);
311 if srate != 0 && chans != 0{
312 let fill = self.get_fill();
313 (fill * 1000 / srate / chans) as u64
314 } else {
315 0
316 }
317 }
318
319 pub fn get_queue_size(&self) -> usize { self.aqueue.len() }
320 pub fn try_send_audio(&mut self, evt: PktSendEvent) -> bool {
321 if self.aqueue.len() > 0 {
322 self.aqueue.push(evt);
323 false
324 } else {
325 self.try_send_event(evt)
326 }
327 }
328 fn try_send_event(&mut self, evt: PktSendEvent) -> bool {
329 if let Err(TrySendError::Full(evt)) = self.apsend.try_send(evt) {
330 self.aqueue.insert(0, evt);
331 false
332 } else {
333 true
334 }
335 }
336 pub fn try_send_queued(&mut self) -> bool {
337 while !self.aqueue.is_empty() {
338 let pkt = self.aqueue.remove(0);
339 if !self.try_send_event(pkt) {
340 return false;
341 }
342 }
343 true
344 }
345
346 pub fn flush(&mut self) {
347 self.pause();
348 self.aqueue.clear();
349 SKIP_ADECODING.store(true, Ordering::Release);
350 CURRENT_TIME_SET.store(false, Ordering::Release);
351 let _ = self.apsend.send(PktSendEvent::Flush);
352 }
353 pub fn finish(self) {
354 SKIP_ADECODING.store(true, Ordering::Release);
355 let _ = self.apsend.send(PktSendEvent::ImmediateEnd);
356 self.athread.join().unwrap();
357 }
358 }