From b0481c9e4084c2946507754bc194a64306a278f8 Mon Sep 17 00:00:00 2001 From: Kostya Shishkov Date: Fri, 5 May 2023 18:01:34 +0200 Subject: [PATCH] improve audio processing pipeline --- src/acvt.rs | 177 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 32 +++++++--- 2 files changed, 201 insertions(+), 8 deletions(-) create mode 100644 src/acvt.rs diff --git a/src/acvt.rs b/src/acvt.rs new file mode 100644 index 0000000..5343039 --- /dev/null +++ b/src/acvt.rs @@ -0,0 +1,177 @@ +use nihav_core::frame::*; +use nihav_core::soundcvt::*; + +struct AudioQueue { + start: usize, + end: usize, + stride: usize, + channels: usize, + data: Vec, + ileaved: bool, +} + +impl> AudioQueue { + fn new(channels: usize, rec_size: usize, ileaved: bool) -> Self { + Self { + start: 0, + end: 0, + stride: if ileaved { rec_size * channels } else { rec_size }, + channels, + ileaved, + data: vec![0.into(); rec_size * channels], + } + } + fn get_cur_size(&self) -> usize { self.end - self.start } + fn get_cur_avail(&self) -> usize { self.stride - self.end } + fn get_potentially_avail(&self) -> usize { self.stride - self.get_cur_size() } + fn read(&mut self, src: &NAAudioBuffer) { + let mut to_copy = src.get_length(); + if self.ileaved { + to_copy *= self.channels; + } + if self.get_cur_avail() < to_copy { + if self.get_potentially_avail() >= to_copy { + self.renorm(); + } else { + let new_len = (self.stride * 2).max(self.get_cur_size() + src.get_length()); + let mut new_buf = vec![0.into(); new_len * self.channels]; + let new_stride = if !self.ileaved { new_len } else { new_len * self.channels }; + + let old_len = self.get_cur_size(); + let new_len = src.get_length(); + for (dst, (old, new)) in new_buf.chunks_exact_mut(new_stride).zip( + self.data.chunks_exact(self.stride).zip( + src.get_data().chunks(src.get_stride()))) { + dst[..old_len].copy_from_slice(&old[self.start..self.end]); + dst[old_len..][..new_len].copy_from_slice(&new[..new_len]); + } + self.data = new_buf; + self.stride = new_stride; + self.start = 0; + self.end = old_len + new_len; + return; + } + } + for (dst, src) in self.data.chunks_exact_mut(self.stride).zip(src.get_data().chunks_exact(src.get_stride())) { + dst[self.end..][..to_copy].copy_from_slice(&src[..to_copy]); + } + self.end += to_copy; + } + fn write(&mut self, dbuf: &mut NAAudioBuffer) { + let dst_len = dbuf.get_length(); + let dst_stride = dbuf.get_stride(); + let dst = dbuf.get_data_mut().unwrap(); + + for (dst, src) in dst.chunks_mut(dst_stride).zip(self.data.chunks_exact(self.stride)) { + dst[..dst_len].copy_from_slice(&src[self.start..][..dst_len]); + } + self.start += dst_len; + } + fn renorm(&mut self) { + if self.start == 0 { + return; + } + + let move_size = self.end - self.start; + if move_size > 0 { + for chan in self.data.chunks_exact_mut(self.stride) { + for i in 0..move_size { + chan[i] = chan[self.start + i]; + } + } + } + self.end -= self.start; + self.start = 0; + } +} + +enum AudioDataType { + U8(AudioQueue), + I16(AudioQueue), + I32(AudioQueue), + F32(AudioQueue), + Packed(AudioQueue), +} + +impl AudioDataType { + fn get_length(&self) -> usize { + match self { + AudioDataType::U8(ref queue) => queue.get_cur_size(), + AudioDataType::I16(ref queue) => queue.get_cur_size(), + AudioDataType::I32(ref queue) => queue.get_cur_size(), + AudioDataType::F32(ref queue) => queue.get_cur_size(), + AudioDataType::Packed(ref queue) => queue.get_cur_size(), + } + } +} + +pub struct AudioConverter { + queue: AudioDataType, + dst_fmt: NAAudioInfo, + dst_chmap: NAChannelMap, + apts: Option, +} + +impl AudioConverter { + pub fn new(_sinfo: &NAAudioInfo, dinfo: &NAAudioInfo, dst_chmap: NAChannelMap) -> Self { + let ch = usize::from(dinfo.channels); + let size = dinfo.block_len * 2; + let il = !dinfo.format.planar; + let queue = match (dinfo.format.bits, dinfo.format.float, dinfo.format.signed) { + ( 8, false, false) => AudioDataType::U8(AudioQueue::new(ch, size, il)), + (16, false, true) => AudioDataType::I16(AudioQueue::new(ch, size, il)), + (32, false, true) => AudioDataType::I32(AudioQueue::new(ch, size, il)), + (32, true, _) => AudioDataType::F32(AudioQueue::new(ch, size, il)), + _ => AudioDataType::Packed(AudioQueue::new(ch, size, il)), + }; + Self { + queue, + dst_fmt: *dinfo, + dst_chmap, + apts: None, + } + } + pub fn queue_frame(&mut self, buf: NABufferType, tinfo: NATimeInfo) -> bool { + let ret = convert_audio_frame(&buf, &self.dst_fmt, &self.dst_chmap); + if let Ok(dbuf) = ret { + if self.apts.is_none() && tinfo.get_pts().is_some() { + self.apts = tinfo.get_pts(); + } + match (&mut self.queue, dbuf) { + (AudioDataType::U8(ref mut queue), NABufferType::AudioU8(ref buf)) => queue.read(buf), + (AudioDataType::I16(ref mut queue), NABufferType::AudioI16(ref buf)) => queue.read(buf), + (AudioDataType::I32(ref mut queue), NABufferType::AudioI32(ref buf)) => queue.read(buf), + (AudioDataType::F32(ref mut queue), NABufferType::AudioF32(ref buf)) => queue.read(buf), + (AudioDataType::Packed(ref mut queue), NABufferType::AudioPacked(ref buf)) => queue.read(buf), + _ => unimplemented!(), + }; + true + } else { + false + } + } + pub fn get_frame(&mut self, info: NACodecInfoRef) -> Option { + if self.queue.get_length() >= self.dst_fmt.block_len { + if let Ok(mut abuf) = alloc_audio_buffer(self.dst_fmt, self.dst_fmt.block_len, self.dst_chmap.clone()) { + match (&mut self.queue, &mut abuf) { + (AudioDataType::U8(ref mut queue), NABufferType::AudioU8(ref mut buf)) => queue.write(buf), + (AudioDataType::I16(ref mut queue), NABufferType::AudioI16(ref mut buf)) => queue.write(buf), + (AudioDataType::I32(ref mut queue), NABufferType::AudioI32(ref mut buf)) => queue.write(buf), + (AudioDataType::F32(ref mut queue), NABufferType::AudioF32(ref mut buf)) => queue.write(buf), + (AudioDataType::Packed(ref mut queue), NABufferType::AudioPacked(ref mut buf)) => queue.write(buf), + _ => unimplemented!(), + }; + let tinfo = NATimeInfo::new(self.apts, None, Some(self.dst_fmt.block_len as u64), 1, self.dst_fmt.sample_rate); + if let Some(ref mut val) = self.apts { + *val += self.dst_fmt.block_len as u64; + } + Some(NAFrame::new(tinfo, FrameType::I, true, info, abuf)) + } else { + println!(" failed to allocate audio frame"); + None + } + } else { + None + } + } +} diff --git a/src/main.rs b/src/main.rs index a2889a8..ff19ae1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,6 @@ use nihav_core::demuxers::*; use nihav_core::muxers::*; use nihav_core::reorder::*; use nihav_core::scale::*; -use nihav_core::soundcvt::*; use nihav_registry::detect; use nihav_registry::register; use std::env; @@ -22,6 +21,8 @@ use std::time::{Duration, Instant}; mod demux; use crate::demux::*; mod null; +mod acvt; +use crate::acvt::*; fn format_time(ms: u64) -> String { let s = ms / 1000; @@ -102,7 +103,7 @@ struct OutputStreamOptions { enum OutputConvert { Video(NAScale, NABufferType), - Audio(NAAudioInfo, NAChannelMap), + Audio(AudioConverter), None, } @@ -549,8 +550,9 @@ println!("can't generate default channel map for {} channels", dainfo.channels); return false; }, }; + let acvt = AudioConverter::new(sainfo, dainfo, dchmap); //todo channelmap - OutputConvert::Audio(*dainfo, dchmap) + OutputConvert::Audio(acvt) } }, _ => OutputConvert::None, @@ -627,7 +629,8 @@ println!("can't generate default channel map for {} channels", dainfo.channels); }, }; //todo channelmap - OutputConvert::Audio(*dainfo, dchmap) + let acvt = AudioConverter::new(sainfo, dainfo, dchmap); + OutputConvert::Audio(acvt) } }, _ => OutputConvert::None, @@ -766,13 +769,12 @@ fn encode_frame(dst_id: u32, encoder: &mut Box, cvt: &mut OutputC } dbuf.clone() }, - OutputConvert::Audio(ref dinfo, ref dchmap) => { - let ret = convert_audio_frame(&buf, dinfo, dchmap); - if ret.is_err() { + OutputConvert::Audio(ref mut acvt) => { + if !acvt.queue_frame(buf, frm.get_time_information()) { println!("error converting audio for stream {}", dst_id); return false; } - ret.unwrap() + return true; }, } }; @@ -1202,6 +1204,7 @@ println!("stream {} - {} {}", i, s, info.get_name()); break; } let frm = ret.unwrap(); + let tinfo = frm.get_info(); reorderer.add_frame(frm); while let Some(frm) = reorderer.get_frame() { if !encode_frame(dst_id, encoder, cvt, frm, &transcoder.scale_opts) { @@ -1218,6 +1221,19 @@ println!("stream {} - {} {}", i, s, info.get_name()); mux.mux_frame(pkt).unwrap(); } } + if let OutputConvert::Audio(ref mut acvt) = cvt { + while let Some(ofrm) = acvt.get_frame(tinfo.clone()) { + if encoder.encode(&ofrm).is_err() { + break; + } + while let Ok(Some(pkt)) = encoder.get_packet() { + if transcoder.end != NATimePoint::None && !pkt.ts.less_than(transcoder.end) { break 'main_loop; } + let pkt_size = pkt.get_buffer().len(); + adata_size += pkt_size; + mux.mux_frame(pkt).unwrap(); + } + } + } } else { println!("no decoder for stream {}", src_id); break; -- 2.30.2