1 use std::time::Duration;
2 use std::thread::JoinHandle;
3 use std::sync::{Arc, Mutex};
4 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5 use std::sync::mpsc::{Receiver, SyncSender, TrySendError};
8 use sdl2::AudioSubsystem;
9 use sdl2::audio::{AudioDevice, AudioCallback, AudioSpecDesired};
11 use nihav_core::formats::*;
12 use nihav_core::codecs::*;
13 use nihav_core::soundcvt::*;
15 use super::{DecoderStuff, DecoderType, DecoderState, DecodingState, PktSendEvent};
17 static ADEC_STATE: DecoderState = DecoderState::new();
18 static AUDIO_VOLUME: AtomicUsize = AtomicUsize::new(100);
19 static CUR_QUEUE_FILL: AtomicUsize = AtomicUsize::new(0);
20 static SAMPLE_RATE: AtomicUsize = AtomicUsize::new(0);
21 static CHANNELS: AtomicUsize = AtomicUsize::new(0);
23 static CURRENT_TIME: AtomicUsize = AtomicUsize::new(0);
24 static CURRENT_TIME_SET: AtomicBool = AtomicBool::new(false);
26 const QUEUE_INITIAL_SIZE: usize = 16384;
27 const QUEUE_REFILL_LIMIT: usize = 262144;
39 fn new(srate: usize, chans: usize) -> Self {
40 SAMPLE_RATE.store(srate, Ordering::Relaxed);
41 CHANNELS.store(chans, Ordering::Relaxed);
42 CUR_QUEUE_FILL.store(0, Ordering::Relaxed);
44 queue: Vec::with_capacity(QUEUE_INITIAL_SIZE),
52 if self.start == 0 { return; }
53 let fill = self.fill();
55 if fill < self.start {
56 let (dst, src) = self.queue.split_at_mut(self.start);
57 dst[..fill].copy_from_slice(&src[..fill]);
60 self.queue[i] = self.queue[self.start + i];
67 fn set_time(&mut self) {
68 let fill = self.fill();
69 let samp_time = self.spos.saturating_sub(fill / self.chans);
70 CURRENT_TIME.store(samp_time * 1000 / self.srate, Ordering::Relaxed);
71 CURRENT_TIME_SET.store(true, Ordering::Relaxed);
72 CUR_QUEUE_FILL.store(self.fill(), Ordering::Relaxed);
74 fn add(&mut self, src: &[i16], samplepos: usize) {
75 if self.end + src.len() > self.queue.len() {
78 if self.end + src.len() > self.queue.len() {
79 self.queue.resize(self.end + src.len(), 0);
81 self.queue[self.end..][..src.len()].copy_from_slice(&src);
82 self.end += src.len();
83 self.spos = samplepos;
86 fn add_bytes(&mut self, src: &[u8], samplepos: usize) {
87 let srclen = src.len() / 2;
88 if self.end + srclen > self.queue.len() {
91 if self.end + srclen > self.queue.len() {
92 self.queue.resize(self.end + srclen, 0);
94 for (dst, src) in self.queue[self.end..][..srclen].iter_mut().zip(src.chunks_exact(2)) {
95 *dst = (u16::from(src[0]) + u16::from(src[1]) * 256) as i16;
98 self.spos = samplepos;
101 fn drain(&mut self, size: usize) {
102 let fill = self.fill();
110 fn fill(&self) -> usize { self.end - self.start }
111 fn flush(&mut self) {
117 pub struct AudioOutput {
118 queue: Arc<Mutex<AudioQueue>>,
121 impl AudioCallback for AudioOutput {
124 fn callback(&mut self, out: &mut [Self::Channel]) {
125 let mut queue = self.queue.lock().unwrap();
126 let dstlen = out.len();
127 let copylen = queue.fill().min(dstlen);
128 let volume = AUDIO_VOLUME.load(Ordering::Relaxed) as i32;
130 out[..copylen].copy_from_slice(&queue.queue[queue.start..][..copylen]);
132 for (dst, &src) in out[..copylen].iter_mut().zip(queue.queue[queue.start..].iter()) {
133 *dst = (i32::from(src) * volume / 100).max(-32768).min(32767) as i16;
136 queue.drain(copylen);
137 for el in out[copylen..].iter_mut() { *el = 0; }
141 fn dummy_audio_thread(aprecv: Receiver<PktSendEvent>) -> JoinHandle<()> {
142 std::thread::Builder::new().name("acontrol-dummy".to_string()).spawn(move ||{
144 match aprecv.recv() {
145 Ok(PktSendEvent::End) => break,
146 Ok(PktSendEvent::ImmediateEnd) => break,
156 type AudioPlaybackType = Option<AudioDevice<AudioOutput>>;
158 fn start_audio_decoding(asystem: &AudioSubsystem, ainfo: NAAudioInfo, mut audio_dec: DecoderStuff, aprecv: Receiver<PktSendEvent>) -> (AudioPlaybackType, JoinHandle<()>) {
159 let ch = ainfo.channels.max(2);
160 let desired_spec = AudioSpecDesired {
161 freq: Some(ainfo.sample_rate as i32),
165 let dst_info = NAAudioInfo {
166 sample_rate: ainfo.sample_rate,
168 format: SND_S16_FORMAT,
171 let queue = Arc::new(Mutex::new(AudioQueue::new(ainfo.sample_rate as usize, ch as usize)));
172 let qclone = queue.clone();
173 let ret = asystem.open_playback(None, &desired_spec, |_spec| {
179 return (None, dummy_audio_thread(aprecv))
181 let adevice = ret.unwrap();
182 (Some(adevice), std::thread::Builder::new().name("acontrol".to_string()).spawn(move ||{
183 let adec = if let DecoderType::Audio(ref mut dec) = audio_dec.dec { dec } else { panic!("not an audio decoder!"); };
184 let mut samplepos = 0usize;
185 let dst_chmap = if dst_info.channels == 2 {
186 NAChannelMap::from_str("L,R").unwrap()
188 NAChannelMap::from_str("C").unwrap()
190 ADEC_STATE.set_state(DecodingState::Normal);
192 match aprecv.recv() {
193 Ok(PktSendEvent::Packet(pkt)) => {
195 if CUR_QUEUE_FILL.load(Ordering::Relaxed)
196 < QUEUE_REFILL_LIMIT || ADEC_STATE.is_flushing() {
199 std::thread::sleep(Duration::from_millis(100));
201 if !ADEC_STATE.is_flushing() {
202 if let Ok(frm) = adec.decode(&mut audio_dec.dsupp, &pkt) {
203 let buf = frm.get_buffer();
204 if let Some(pts) = frm.get_pts() {
205 samplepos = NATimeInfo::ts_to_time(pts, u64::from(dst_info.sample_rate), frm.ts.tb_num, frm.ts.tb_den) as usize;
207 samplepos += buf.get_audio_length();
208 if let Ok(out_buf) = convert_audio_frame(&buf, &dst_info, &dst_chmap) {
210 NABufferType::AudioI16(abuf) => {
211 let mut qdata = queue.lock().unwrap();
212 qdata.add(abuf.get_data(), samplepos);
215 NABufferType::AudioPacked(abuf) => {
216 let mut qdata = queue.lock().unwrap();
217 qdata.add_bytes(abuf.get_data(), samplepos);
226 Ok(PktSendEvent::GetFrames) => {},
227 Ok(PktSendEvent::Flush) => {
229 let mut qdata = queue.lock().unwrap();
231 ADEC_STATE.set_state(DecodingState::Waiting);
233 Ok(PktSendEvent::End) => break,
234 Ok(PktSendEvent::ImmediateEnd) => {
235 let mut qdata = queue.lock().unwrap();
239 Ok(PktSendEvent::HurryUp) => {},
246 let qdata = queue.lock().unwrap();
247 if qdata.fill() == 0 || ADEC_STATE.is_flushing() {
251 ADEC_STATE.set_state(DecodingState::End);
255 pub struct AudioControl {
256 aqueue: Vec<PktSendEvent>,
257 apsend: SyncSender<PktSendEvent>,
258 adevice: AudioPlaybackType,
259 athread: JoinHandle<()>,
263 pub fn new(audio_dec: Option<DecoderStuff>, ainfo: Option<NAAudioInfo>, asystem: &AudioSubsystem) -> Self {
264 let (apsend, aprecv) = std::sync::mpsc::sync_channel::<PktSendEvent>(20);
265 let (adevice, athread) = if let Some(audio_dec) = audio_dec {
266 start_audio_decoding(asystem, ainfo.unwrap(), audio_dec, aprecv)
268 (None, dummy_audio_thread(aprecv))
270 ADEC_STATE.set_state(DecodingState::Normal);
279 pub fn has_audio(&self) -> bool { self.adevice.is_some() }
280 pub fn pause(&mut self) {
281 if let Some(ref device) = self.adevice {
285 pub fn resume(&mut self) {
286 if let Some(ref device) = self.adevice {
290 pub fn set_volume(&mut self, volume: usize) {
291 AUDIO_VOLUME.store(volume, Ordering::Relaxed);
293 pub fn get_volume(&self) -> usize {
294 AUDIO_VOLUME.load(Ordering::Relaxed)
296 pub fn is_audio_end(&self) -> bool {
297 matches!(ADEC_STATE.get_state(), DecodingState::End | DecodingState::Error)
299 pub fn get_fill(&self) -> usize { CUR_QUEUE_FILL.load(Ordering::Relaxed) }
300 pub fn get_time(&self) -> Option<u64> {
301 if CURRENT_TIME_SET.load(Ordering::Relaxed) {
302 Some(CURRENT_TIME.load(Ordering::Relaxed) as u64)
307 pub fn get_time_left(&self) -> u64 {
308 let srate = SAMPLE_RATE.load(Ordering::Relaxed);
309 let chans = CHANNELS.load(Ordering::Relaxed);
310 if srate != 0 && chans != 0{
311 let fill = self.get_fill();
312 (fill * 1000 / srate / chans) as u64
318 pub fn get_queue_size(&self) -> usize { self.aqueue.len() }
319 pub fn try_send_audio(&mut self, evt: PktSendEvent) -> bool {
320 if self.aqueue.len() > 0 {
321 self.aqueue.push(evt);
324 self.try_send_event(evt)
327 fn try_send_event(&mut self, evt: PktSendEvent) -> bool {
328 if let Err(TrySendError::Full(evt)) = self.apsend.try_send(evt) {
329 self.aqueue.insert(0, evt);
335 pub fn try_send_queued(&mut self) -> bool {
336 while !self.aqueue.is_empty() {
337 let pkt = self.aqueue.remove(0);
338 if !self.try_send_event(pkt) {
345 pub fn flush(&mut self) {
348 ADEC_STATE.set_state(DecodingState::Flush);
349 CURRENT_TIME_SET.store(false, Ordering::Release);
350 let _ = self.apsend.send(PktSendEvent::Flush);
352 pub fn finish(self) {
353 ADEC_STATE.set_state(DecodingState::Flush);
354 let _ = self.apsend.send(PktSendEvent::ImmediateEnd);
355 self.athread.join().unwrap();