use NAPacketiser::attach_stream() where appropriate
[nihav-player.git] / videoplayer / src / audiodec.rs
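
//! Audio decoding and output path of the player: decoded frames are converted into
//! interleaved 16-bit samples, buffered in a shared queue and played back through SDL2.
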
use std::time::Duration;
use std::thread::JoinHandle;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, SyncSender, TrySendError};
use std::str::FromStr;

use sdl2::AudioSubsystem;
use sdl2::audio::{AudioDevice, AudioCallback, AudioSpecDesired};

use nihav_core::formats::*;
use nihav_core::codecs::*;
use nihav_core::soundcvt::*;

use super::{DecoderStuff, DecoderType, DecoderState, DecodingState, PktSendEvent};

static ADEC_STATE: DecoderState = DecoderState::new();
static AUDIO_VOLUME: AtomicUsize = AtomicUsize::new(100);
static CUR_QUEUE_FILL: AtomicUsize = AtomicUsize::new(0);
static SAMPLE_RATE: AtomicUsize = AtomicUsize::new(0);
static CHANNELS: AtomicUsize = AtomicUsize::new(0);

static CURRENT_TIME: AtomicUsize = AtomicUsize::new(0);
static CURRENT_TIME_SET: AtomicBool = AtomicBool::new(false);

const QUEUE_INITIAL_SIZE: usize = 16384;
const QUEUE_REFILL_LIMIT: usize = 262144;

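/// Queue of interleaved 16-bit samples shared between the decoding thread and the SDL audio
/// callback. `start`/`end` delimit the valid region inside `queue`; `spos` is the stream
/// position (in samples per channel) corresponding to the end of the queued data.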
struct AudioQueue {
    queue: Vec<i16>,
    start: usize,
    end: usize,
    srate: usize,
    chans: usize,
    spos: usize,
}

impl AudioQueue {
    fn new(srate: usize, chans: usize) -> Self {
        SAMPLE_RATE.store(srate, Ordering::Relaxed);
        CHANNELS.store(chans, Ordering::Relaxed);
        CUR_QUEUE_FILL.store(0, Ordering::Relaxed);
        Self {
            queue: Vec::with_capacity(QUEUE_INITIAL_SIZE),
            start: 0,
            end: 0,
            srate, chans,
            spos: 0,
        }
    }
    // Move the remaining samples to the front of the buffer so new data can be appended
    // without growing it.
    fn home(&mut self) {
        if self.start == 0 { return; }
        let fill = self.fill();
        if fill > 0 {
            if fill < self.start {
                // the tail fits entirely before the read position, copy it in one go
                let (dst, src) = self.queue.split_at_mut(self.start);
                dst[..fill].copy_from_slice(&src[..fill]);
            } else {
                // regions overlap, move element by element
                for i in 0..fill {
                    self.queue[i] = self.queue[self.start + i];
                }
            }
        }
        self.start = 0;
        self.end = fill;
    }
    // Publish the estimated playback position (in milliseconds) and the current queue fill level.
    fn set_time(&mut self) {
        let fill = self.fill();
        let samp_time = self.spos.saturating_sub(fill / self.chans);
        CURRENT_TIME.store(samp_time * 1000 / self.srate, Ordering::Relaxed);
        CURRENT_TIME_SET.store(true, Ordering::Relaxed);
        CUR_QUEUE_FILL.store(self.fill(), Ordering::Relaxed);
    }
    fn add(&mut self, src: &[i16], samplepos: usize) {
        if self.end + src.len() > self.queue.len() {
            self.home();
        }
        if self.end + src.len() > self.queue.len() {
            self.queue.resize(self.end + src.len(), 0);
        }
        self.queue[self.end..][..src.len()].copy_from_slice(src);
        self.end += src.len();
        self.spos = samplepos;
        self.set_time();
    }
    fn add_bytes(&mut self, src: &[u8], samplepos: usize) {
        let srclen = src.len() / 2;
        if self.end + srclen > self.queue.len() {
            self.home();
        }
        if self.end + srclen > self.queue.len() {
            self.queue.resize(self.end + srclen, 0);
        }
        // reassemble little-endian 16-bit samples from the packed byte buffer
        for (dst, src) in self.queue[self.end..][..srclen].iter_mut().zip(src.chunks_exact(2)) {
            *dst = (u16::from(src[0]) + u16::from(src[1]) * 256) as i16;
        }
        self.end += srclen;
        self.spos = samplepos;
        self.set_time();
    }
    fn drain(&mut self, size: usize) {
        let fill = self.fill();
        if size >= fill {
            self.flush();
        } else {
            self.start += size;
        }
        self.set_time();
    }
    fn fill(&self) -> usize { self.end - self.start }
    fn flush(&mut self) {
        self.start = 0;
        self.end = 0;
    }
}

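/// State for the SDL audio callback: a shared handle to the queue filled by the decoding thread.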
pub struct AudioOutput {
    queue: Arc<Mutex<AudioQueue>>,
}

impl AudioCallback for AudioOutput {
    type Channel = i16;

    fn callback(&mut self, out: &mut [Self::Channel]) {
        let mut queue = self.queue.lock().expect("audio queue should be accessible");
        let dstlen = out.len();
        let copylen = queue.fill().min(dstlen);
        let volume = AUDIO_VOLUME.load(Ordering::Relaxed) as i32;
        if volume == 100 {
            out[..copylen].copy_from_slice(&queue.queue[queue.start..][..copylen]);
        } else {
            // scale samples by the requested volume, clamping to the i16 range
            for (dst, &src) in out[..copylen].iter_mut().zip(queue.queue[queue.start..].iter()) {
                *dst = (i32::from(src) * volume / 100).max(-32768).min(32767) as i16;
            }
        }
        queue.drain(copylen);
        // pad with silence if the queue could not fill the whole output buffer
        for el in out[copylen..].iter_mut() { *el = 0; }
    }
}

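/// Fallback control thread used when there is no audio stream or the playback device cannot be
/// opened: it simply consumes packet events until playback ends.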
fn dummy_audio_thread(aprecv: Receiver<PktSendEvent>) -> JoinHandle<()> {
    std::thread::Builder::new().name("acontrol-dummy".to_string()).spawn(move || {
        loop {
            match aprecv.recv() {
                Ok(PktSendEvent::End) => break,
                Ok(PktSendEvent::ImmediateEnd) => break,
                Err(_) => {
                    break;
                },
                _ => {},
            };
        }
    }).unwrap()
}

type AudioPlaybackType = Option<AudioDevice<AudioOutput>>;

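/// Opens an SDL playback device matching the decoder output and spawns the audio decoding
/// thread. If the device cannot be opened, a dummy control thread is returned instead.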
fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, sbr_hack: bool, mut audio_dec: DecoderStuff, aprecv: Receiver<PktSendEvent>) -> (AudioPlaybackType, JoinHandle<()>) {
    let ch = ainfo.channels.max(2);
    // with the SBR hack active, play back at twice the reported sample rate
    let sample_rate = if !sbr_hack { ainfo.sample_rate } else { ainfo.sample_rate * 2 };
    let desired_spec = AudioSpecDesired {
        freq: Some(sample_rate as i32),
        channels: Some(ch),
        samples: None
    };
    let dst_info = NAAudioInfo {
        sample_rate,
        channels: ch,
        format: SND_S16_FORMAT,
        block_len: 0,
    };
    let queue = Arc::new(Mutex::new(AudioQueue::new(sample_rate as usize, ch as usize)));
    let qclone = queue.clone();
    let ret = asystem.open_playback(None, &desired_spec, |_spec| {
        AudioOutput {
            queue: qclone,
        }
    });
    if ret.is_err() {
        return (None, dummy_audio_thread(aprecv));
    }
    let adevice = ret.unwrap();
    (Some(adevice), std::thread::Builder::new().name("acontrol".to_string()).spawn(move || {
        let adec = if let DecoderType::Audio(ref mut dec) = audio_dec.dec { dec } else { panic!("not an audio decoder!"); };
        let mut samplepos = 0usize;
        let dst_chmap = if dst_info.channels == 2 {
            NAChannelMap::from_str("L,R").expect("should be able to create stereo channel map")
        } else {
            NAChannelMap::from_str("C").expect("should be able to create single-channel map")
        };
        ADEC_STATE.set_state(DecodingState::Normal);
        loop {
            match aprecv.recv() {
                Ok(PktSendEvent::Packet(pkt)) => {
                    // throttle decoding while the output queue is sufficiently full
                    loop {
                        if CUR_QUEUE_FILL.load(Ordering::Relaxed) < QUEUE_REFILL_LIMIT
                            || ADEC_STATE.is_flushing() {
                            break;
                        }
                        std::thread::sleep(Duration::from_millis(100));
                    }
                    if !ADEC_STATE.is_flushing() {
                        if let Ok(frm) = adec.decode(&mut audio_dec.dsupp, &pkt) {
                            let buf = frm.get_buffer();
                            if let Some(pts) = frm.get_pts() {
                                // convert the frame timestamp into a position in samples
                                samplepos = NATimeInfo::ts_to_time(pts, u64::from(dst_info.sample_rate), frm.ts.tb_num, frm.ts.tb_den) as usize;
                                if sbr_hack {
                                    samplepos >>= 2;
                                }
                            }
                            samplepos += buf.get_audio_length();
                            // convert the decoded frame into interleaved 16-bit audio and queue it
                            if let Ok(out_buf) = convert_audio_frame(&buf, &dst_info, &dst_chmap) {
                                match out_buf {
                                    NABufferType::AudioI16(abuf) => {
                                        let mut qdata = queue.lock().expect("audio queue should be accessible");
                                        qdata.add(abuf.get_data(), samplepos);
                                        drop(qdata);
                                    },
                                    NABufferType::AudioPacked(abuf) => {
                                        let mut qdata = queue.lock().expect("audio queue should be accessible");
                                        qdata.add_bytes(abuf.get_data(), samplepos);
                                        drop(qdata);
                                    },
                                    _ => {},
                                };
                            }
                        }
                    }
                },
                Ok(PktSendEvent::GetFrames) => {},
                Ok(PktSendEvent::Flush) => {
                    adec.flush();
                    let mut qdata = queue.lock().expect("audio queue should be accessible");
                    qdata.flush();
                    ADEC_STATE.set_state(DecodingState::Waiting);
                },
                Ok(PktSendEvent::End) => break,
                Ok(PktSendEvent::ImmediateEnd) => {
                    let mut qdata = queue.lock().expect("audio queue should be accessible");
                    qdata.flush();
                    break;
                },
                Ok(PktSendEvent::HurryUp) => {},
                Err(_) => {
                    break;
                },
            };
        }
        // wait for the already queued audio to be played out (unless a flush was requested)
        loop {
            let qdata = queue.lock().expect("audio queue should be accessible");
            if qdata.fill() == 0 || ADEC_STATE.is_flushing() {
                break;
            }
        }
        ADEC_STATE.set_state(DecodingState::End);
    }).unwrap())
}

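/// Player-facing handle for the audio path: forwards packet events to the decoding thread,
/// controls the SDL playback device and reports volume, queue fill and playback time.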
pub struct AudioControl {
    aqueue: Vec<PktSendEvent>,
    apsend: SyncSender<PktSendEvent>,
    adevice: AudioPlaybackType,
    athread: JoinHandle<()>,
}

impl AudioControl {
    pub fn new(audio_dec: Option<DecoderStuff>, ainfo: Option<NAAudioInfo>, sbr_hack: bool, asystem: &AudioSubsystem) -> Self {
        let (apsend, aprecv) = std::sync::mpsc::sync_channel::<PktSendEvent>(20);
        let (adevice, athread) = if let Some(audio_dec) = audio_dec {
            start_audio_decoding(asystem, ainfo.expect("audio info should be present"), sbr_hack, audio_dec, aprecv)
        } else {
            (None, dummy_audio_thread(aprecv))
        };
        ADEC_STATE.set_state(DecodingState::Normal);

        Self {
            aqueue: Vec::new(),
            apsend,
            adevice,
            athread,
        }
    }
    pub fn has_audio(&self) -> bool { self.adevice.is_some() }
    pub fn pause(&mut self) {
        if let Some(ref device) = self.adevice {
            device.pause();
        }
    }
    pub fn resume(&mut self) {
        if let Some(ref device) = self.adevice {
            device.resume();
        }
    }
    pub fn set_volume(&mut self, volume: usize) {
        AUDIO_VOLUME.store(volume, Ordering::Relaxed);
    }
    pub fn get_volume(&self) -> usize {
        AUDIO_VOLUME.load(Ordering::Relaxed)
    }
    pub fn is_audio_end(&self) -> bool {
        matches!(ADEC_STATE.get_state(), DecodingState::End | DecodingState::Error)
    }
    pub fn get_fill(&self) -> usize { CUR_QUEUE_FILL.load(Ordering::Relaxed) }
    pub fn get_time(&self) -> Option<u64> {
        if CURRENT_TIME_SET.load(Ordering::Relaxed) {
            Some(CURRENT_TIME.load(Ordering::Relaxed) as u64)
        } else {
            None
        }
    }
    // Remaining queued audio, in milliseconds.
    pub fn get_time_left(&self) -> u64 {
        let srate = SAMPLE_RATE.load(Ordering::Relaxed);
        let chans = CHANNELS.load(Ordering::Relaxed);
        if srate != 0 && chans != 0 {
            let fill = self.get_fill();
            (fill * 1000 / srate / chans) as u64
        } else {
            0
        }
    }

    pub fn get_queue_size(&self) -> usize { self.aqueue.len() }
    pub fn try_send_audio(&mut self, evt: PktSendEvent) -> bool {
        if !self.aqueue.is_empty() {
            // keep events in order: queue locally while older events are still pending
            self.aqueue.push(evt);
            false
        } else {
            self.try_send_event(evt)
        }
    }
    fn try_send_event(&mut self, evt: PktSendEvent) -> bool {
        if let Err(TrySendError::Full(evt)) = self.apsend.try_send(evt) {
            self.aqueue.insert(0, evt);
            false
        } else {
            true
        }
    }
    pub fn try_send_queued(&mut self) -> bool {
        while !self.aqueue.is_empty() {
            let pkt = self.aqueue.remove(0);
            if !self.try_send_event(pkt) {
                return false;
            }
        }
        true
    }

    pub fn flush(&mut self) {
        self.pause();
        self.aqueue.clear();
        ADEC_STATE.set_state(DecodingState::Flush);
        CURRENT_TIME_SET.store(false, Ordering::Release);
        let _ = self.apsend.send(PktSendEvent::Flush);
    }
    pub fn finish(self) {
        ADEC_STATE.set_state(DecodingState::Flush);
        let _ = self.apsend.send(PktSendEvent::ImmediateEnd);
        self.athread.join().unwrap();
    }
}