rmmux: rework video packetisation and add some debug prints
authorKostya Shishkov <kostya.shishkov@gmail.com>
Thu, 11 May 2023 17:26:44 +0000 (19:26 +0200)
committerKostya Shishkov <kostya.shishkov@gmail.com>
Thu, 18 May 2023 16:17:48 +0000 (18:17 +0200)
nihav-realmedia/src/muxers/rmvb/audiostream.rs
nihav-realmedia/src/muxers/rmvb/mod.rs
nihav-realmedia/src/muxers/rmvb/videostream.rs

index 1110b4bba0ade94b92e64637668083cebbee1f81..0e174a2729099524613bfbedf210cb69e78aa012 100644 (file)
@@ -499,6 +499,7 @@ impl RMStreamWriter for AudioStreamWriter {
         bw.seek(SeekFrom::Start(cur_pos))?;
         Ok(())
     }
+    fn set_pkt_size(&mut self, _pkt_size: usize) {}
 }
 
 fn write_audio_metadata(bw: &mut ByteWriter) -> MuxerResult<()> {
index 7d8a7fd65f0f5fb62118b0141e3582cf353bb265..d5af530475a291730652044c1ab0a0e0c47dba67 100644 (file)
@@ -38,6 +38,7 @@ pub trait RMStreamWriter {
     fn get_packet(&mut self) -> Option<(Vec<u8>, u32, bool)>;
     fn flush(&mut self);
     fn finish(&mut self, bw: &mut ByteWriter) -> MuxerResult<()>;
+    fn set_pkt_size(&mut self, pkt_size: usize);
 }
 
 #[derive(Clone,Copy)]
@@ -59,12 +60,13 @@ struct RMStream {
     keyframe:       bool,
     audio:          bool,
     index:          Vec<IndexEntry>,
+    debug:          bool,
 }
 
 impl RMStream {
-    fn new(strno: usize, stream: &NAStream) -> MuxerResult<Self> {
+    fn new(strno: usize, stream: &NAStream, pkt_size: usize) -> MuxerResult<Self> {
         let packetiser = match stream.get_media_type() {
-                StreamType::Video => create_video_stream(stream)?,
+                StreamType::Video => create_video_stream(stream, pkt_size)?,
                 StreamType::Audio => create_audio_stream(stream)?,
                 _ => Box::new(DummyStreamWriter{}),
             };
@@ -80,6 +82,7 @@ impl RMStream {
             keyframe:       false,
             audio:          false,
             index:          Vec::new(),
+            debug:          false,
         })
     }
     fn write_mdpr(&mut self, bw: &mut ByteWriter, strm: &NAStream) -> MuxerResult<()> {
@@ -138,6 +141,9 @@ impl RMStream {
                 self.index.push(IndexEntry{ time: ts, pos: bw.tell(), pkt_no: *pkt_no });
             }
             let is_keyframe = self.keyframe && (!self.audio || first);
+            if self.debug {
+                println!(" writing packet for stream {} size {}{}", self.stream_id, data.len(), if is_keyframe { " kf" } else { "" });
+            }
             bw.write_u16be(0)?; //version;
             bw.write_u16be((data.len() + 12) as u16)?;
             bw.write_u16be(self.stream_id)?;
@@ -177,6 +183,9 @@ struct RMMuxer<'a> {
     data_pos:       u64,
     num_chunks:     u32,
     cur_packet:     u32,
+
+    debug:          bool,
+    vpkt_size:      usize,
 }
 
 impl<'a> RMMuxer<'a> {
@@ -187,6 +196,9 @@ impl<'a> RMMuxer<'a> {
             data_pos:   0,
             num_chunks: 0,
             cur_packet: 0,
+
+            debug:      false,
+            vpkt_size:  1400,
         }
     }
     fn write_index(&mut self) -> MuxerResult<()> {
@@ -284,7 +296,7 @@ impl<'a> MuxCore<'a> for RMMuxer<'a> {
 
         self.streams.clear();
         for (strno, strm) in strmgr.iter().enumerate() {
-            let mut swriter = RMStream::new(strno, &strm)?;
+            let mut swriter = RMStream::new(strno, &strm, self.vpkt_size)?;
             swriter.write_mdpr(self.bw, &strm)?;
             self.streams.push(swriter);
             self.num_chunks += 1;
@@ -336,10 +348,54 @@ impl<'a> MuxCore<'a> for RMMuxer<'a> {
     }
 }
 
+const DEBUG_OPTION: &str = "debug";
+const VPKT_SIZE_OPTION: &str = "vpkt_size";
+
+const MUXER_OPTIONS: &[NAOptionDefinition] = &[
+    NAOptionDefinition {
+        name: DEBUG_OPTION, description: "print some muxer statistics",
+        opt_type: NAOptionDefinitionType::Bool },
+    NAOptionDefinition {
+        name: VPKT_SIZE_OPTION, description: "video packet maximum size",
+        opt_type: NAOptionDefinitionType::Int(Some(1024), Some(14800)) },
+];
+
 impl<'a> NAOptionHandler for RMMuxer<'a> {
-    fn get_supported_options(&self) -> &[NAOptionDefinition] { &[] }
-    fn set_options(&mut self, _options: &[NAOption]) { }
-    fn query_option_value(&self, _name: &str) -> Option<NAValue> { None }
+    fn get_supported_options(&self) -> &[NAOptionDefinition] { MUXER_OPTIONS }
+    fn set_options(&mut self, options: &[NAOption]) {
+        for option in options.iter() {
+            for opt_def in MUXER_OPTIONS.iter() {
+                if opt_def.check(option).is_ok() {
+                    match option.name {
+                        DEBUG_OPTION => {
+                            if let NAValue::Bool(val) = option.value {
+                                self.debug = val;
+                                for stream in self.streams.iter_mut() {
+                                    stream.debug = val;
+                                }
+                            }
+                        },
+                        VPKT_SIZE_OPTION => {
+                            if let NAValue::Int(intval) = option.value {
+                                self.vpkt_size = intval as usize;
+                                for stream in self.streams.iter_mut() {
+                                    stream.packetiser.set_pkt_size(self.vpkt_size);
+                                }
+                            }
+                        },
+                        _ => {},
+                    };
+                }
+            }
+        }
+    }
+    fn query_option_value(&self, name: &str) -> Option<NAValue> {
+        match name {
+            DEBUG_OPTION => Some(NAValue::Bool(self.debug)),
+            VPKT_SIZE_OPTION => Some(NAValue::Int(self.vpkt_size as i64)),
+            _ => None,
+        }
+    }
 }
 
 pub struct RealMediaMuxerCreator {}
@@ -449,7 +505,7 @@ mod test {
             };
         test_remuxing(&dec_config, &enc_config);*/
         test_remuxing_md5(&dec_config, "realmedia", &mux_reg,
-                          [0x26422839, 0xa2d7bdd1, 0xd6ea2a78, 0x1b58033a]);
+                          [0x80e7d0c2, 0x5bf1b72b, 0x653beb40, 0x81ab14e9]);
     }
 
     #[test]
@@ -570,7 +626,7 @@ mod test {
             };
         test_remuxing(&dec_config, &enc_config);*/
         test_remuxing_md5(&dec_config, "realmedia", &mux_reg,
-                          [0xcfa1a27b, 0x78314fa7, 0xeb90c31c, 0x7eafeaa8]);
+                          [0x7cfb02af, 0xbbf64748, 0x086b5005, 0xc55dbc9d]);
     }
     #[test]
     fn test_rm_muxer_ralf() {
@@ -594,6 +650,6 @@ mod test {
             };
         test_remuxing(&dec_config, &enc_config);*/
         test_remuxing_md5(&dec_config, "realmedia", &mux_reg,
-                          [0xa0c336d1, 0x76221455, 0x75252067, 0x6189d4af]);
+                          [0xaf8345be, 0xf3912b40, 0xf720acdc, 0xc825b29e]);
     }
 }
index 1001c519afccdef3592ee33262447080f1deb541..b6ed1a794ef071b049a68084840360489447bae5 100644 (file)
@@ -1,3 +1,4 @@
+use std::collections::VecDeque;
 use nihav_core::frame::*;
 use nihav_core::muxers::*;
 use super::RMStreamWriter;
@@ -27,16 +28,48 @@ impl RMStreamWriter for DummyStreamWriter {
     fn finish(&mut self, _bw: &mut ByteWriter) -> MuxerResult<()> {
         Ok(())
     }
+    fn set_pkt_size(&mut self, _pkt_size: usize) {}
+}
+
+#[derive(Clone,Copy)]
+enum VideoDataType {
+    Frame,
+    Slice{pkt_no: u8, npkt: u8, full_size: u32, offset: u32},
+}
+
+impl VideoDataType {
+    fn is_frame(self) -> bool { matches!(self, VideoDataType::Frame) }
+}
+
+fn val_to_size(val: u32) -> usize { if val < (1 << 14) { 2 } else { 4 } }
+
+#[derive(Clone)]
+struct VideoData {
+    vtype:      VideoDataType,
+    pts:        u32,
+    seq_no:     u8,
+    data:       Vec<u8>,
+}
+
+impl VideoData {
+    fn get_pkt_len(&self) -> usize {
+        let plen = self.data.len();
+        let ts_size = val_to_size(self.pts);
+        match self.vtype {
+            VideoDataType::Frame => plen + val_to_size(plen as u32) + ts_size + 2,
+            VideoDataType::Slice{pkt_no: _, npkt: _, full_size, offset} => plen + val_to_size(full_size) + val_to_size(offset) + 3,
+        }
+    }
 }
 
 struct VideoStreamWriter {
     fcc:        [u8; 4],
-    buf:        Vec<u8>,
-    nslices:    usize,
-    cur_slice:  usize,
     seq_no:     u8,
     time:       u32,
     mi_time:    u32,
+    pkt_size:   usize,
+    queue:      VecDeque<VideoData>,
+    flush:      bool,
 }
 
 impl RMStreamWriter for VideoStreamWriter {
@@ -78,84 +111,167 @@ impl RMStreamWriter for VideoStreamWriter {
         Ok(())
     }
     fn queue_packet(&mut self, pkt: NAPacket, ms: u32) -> bool {
-        if self.nslices == 0 {
-            let src = pkt.get_buffer();
-            let nslices = usize::from(src[0]) + 1;
-            if src.len() > nslices * 8 + 1 {
-                self.nslices = nslices;
-                self.cur_slice = 0;
-                self.buf.resize(src.len(), 0);
-                self.buf.copy_from_slice(&src);
-                self.time = ms;
-                if ms > 0 {
-                    self.mi_time = ms.max(self.mi_time + 1);
-                }
+        let tot_size = self.queue.iter().fold(0usize, |acc, q| acc + q.get_pkt_len());
+        if tot_size > self.pkt_size {
+            return false;
+        }
+
+        self.time = ms;
+        if ms > 0 {
+            self.mi_time = ms.max(self.mi_time + 1);
+        }
+
+        let src = pkt.get_buffer();
+        let nslices = usize::from(src[0]) + 1;
+        let hdr_size = nslices * 8 + 1;
+
+        if nslices == 1 {
+            self.queue.push_back(VideoData {
+                    vtype:      VideoDataType::Frame,
+                    pts:        self.mi_time,
+                    seq_no:     self.seq_no,
+                    data:       src[9..].to_vec(),
+                });
+        } else if src.len() > hdr_size {
+            let mut slice_sizes = [0; 256];
+            let mut slice_offs  = [0; 256];
+
+            for (el, src) in slice_offs.iter_mut().zip(src[1..].chunks_exact(8)) {
+                *el = read_u32be(&src[4..]).unwrap() as usize;
+            }
+            for (dst, offs) in slice_sizes[..nslices - 1].iter_mut().zip(slice_offs.windows(2)) {
+                *dst = offs[1] - offs[0];
+            }
+            slice_sizes[nslices - 1] = src.len() - hdr_size - slice_offs[nslices - 1];
+
+            let src = &src[hdr_size..];
+            let full_size = src.len() as u32;
+            let npkt = nslices as u8;
+            for (pkt_no, (&offset, &size)) in slice_offs.iter().zip(slice_sizes.iter()).take(nslices).enumerate() {
+                let vtype = VideoDataType::Slice{pkt_no: (pkt_no + 1) as u8, npkt, full_size, offset: offset as u32};
+                self.queue.push_back(VideoData {
+                        vtype,
+                        pts:        self.mi_time,
+                        seq_no:     self.seq_no,
+                        data:       src[offset..][..size].to_vec(),
+                    });
             }
-            true
-        } else {
-            false
         }
+
+        self.seq_no = self.seq_no.wrapping_add(1);
+
+        true
     }
     fn get_packet(&mut self) -> Option<(Vec<u8>, u32, bool)> {
-        if self.cur_slice < self.nslices {
-            let first = self.cur_slice == 0;
-            let hdr_size = self.nslices * 8 + 1;
-            let cur_off = (read_u32be(&self.buf[self.cur_slice * 8 + 5..]).unwrap_or(0) as usize) + hdr_size;
-            let next_off = if self.cur_slice + 1 < self.nslices {
-                    (read_u32be(&self.buf[self.cur_slice * 8 + 13..]).unwrap_or(0) as usize) + hdr_size
-                } else {
-                    self.buf.len()
-                };
-            let next_off = next_off.max(cur_off);
-            let src = &self.buf[cur_off..next_off];
-            let ret = if self.nslices == 1 {
-                    let mut dst = vec![0; src.len() + 2];
-                    dst[0] = 0x40;
-                    dst[1] = self.seq_no;
-                    dst[2..].copy_from_slice(src);
-                    dst
-                } else {
-                    let mut dst = Vec::with_capacity(src.len() + 11);
-                    let mut gw = GrowableMemoryWriter::new_write(&mut dst);
-                    let mut bw = ByteWriter::new(&mut gw);
-
-                    let hdr = ((self.nslices as u16) << 7) | ((self.cur_slice + 1) as u16);
-                    bw.write_u16be(hdr).unwrap();
-
-                    let full_size = self.buf.len() - hdr_size;
-                    if full_size < (1 << 14) {
-                        bw.write_u16be(0xC000 | (full_size as u16)).unwrap();
-                    } else {
-                        bw.write_u32be(0x80000000 | (full_size as u32)).unwrap();
+        if self.queue.is_empty() {
+            return None;
+        }
+        let tot_size = self.queue.iter().fold(0usize, |acc, q| acc + q.get_pkt_len());
+        if tot_size < self.pkt_size && !self.flush {
+            return None;
+        }
+        let mut pkt_buf = Vec::new();
+
+        let first = self.queue.pop_front().unwrap();
+        let is_first = match first.vtype {
+                VideoDataType::Frame => true,
+                VideoDataType::Slice{pkt_no, npkt: _, full_size: _, offset: _} => pkt_no == 1,
+            };
+        if self.queue.is_empty() || (first.get_pkt_len() + self.queue[0].get_pkt_len() + 4 > self.pkt_size) {
+            match first.vtype {
+                VideoDataType::Frame => {
+                    pkt_buf.push(0x40); // 0x1 = whole frame
+                    pkt_buf.push(first.seq_no);
+                    pkt_buf.extend_from_slice(&first.data);
+                },
+                VideoDataType::Slice{pkt_no, npkt, full_size: _, offset: _} => {
+                    let id = if pkt_no == npkt { 2 } else { 0 };
+                    write_slice(&mut pkt_buf, id, &first);
+                },
+            };
+        } else {
+            let second = &self.queue[0];
+            match (first.vtype.is_frame(), second.vtype.is_frame()) {
+                (true, true) => {
+                    write_multiple_frame(&mut pkt_buf, &first);
+                    while !self.queue.is_empty() && self.queue[0].vtype.is_frame() && (pkt_buf.len() + self.queue[0].get_pkt_len() < self.pkt_size) {
+                        let frm = self.queue.pop_front().unwrap();
+                        write_multiple_frame(&mut pkt_buf, &frm);
+                    }
+                },
+                (true, false) => {
+                    pkt_buf.push(0x40); // 0x1 = whole frame
+                    pkt_buf.push(first.seq_no);
+                    pkt_buf.extend_from_slice(&first.data);
+                },
+                (false, true) => {
+                    write_slice(&mut pkt_buf, 2, &first);
+                    while !self.queue.is_empty() && self.queue[0].vtype.is_frame() && (pkt_buf.len() + self.queue[0].get_pkt_len() < self.pkt_size) {
+                        let frm = self.queue.pop_front().unwrap();
+                        write_multiple_frame(&mut pkt_buf, &frm);
                     }
-                    let coff = cur_off - hdr_size;
-                    if coff < (1 << 14) {
-                        bw.write_u16be(0x4000 | (coff as u16)).unwrap();
+                },
+                (false, false) => {
+                    if let VideoDataType::Slice{pkt_no, npkt, full_size: _, offset: _} = first.vtype {
+                       let id = if pkt_no == npkt { 2 } else { 0 };
+                       write_slice(&mut pkt_buf, id, &first);
                     } else {
-                        bw.write_u32be(coff as u32).unwrap();
+                        unreachable!()
                     }
-                    bw.write_byte(self.seq_no).unwrap();
-                    bw.write_buf(src).unwrap();
-                    dst
-                };
-            self.cur_slice += 1;
-            if self.cur_slice == self.nslices {
-                self.nslices = 0;
-                self.cur_slice = 0;
-                self.seq_no = self.seq_no.wrapping_add(1);
-            }
-            Some((ret, self.mi_time, first))
-        } else {
-            None
+                },
+            };
         }
+        Some((pkt_buf, first.pts, is_first))
+    }
+    fn flush(&mut self) {
+        self.flush = true;
     }
-    fn flush(&mut self) { }
     fn finish(&mut self, _bw: &mut ByteWriter) -> MuxerResult<()> {
         Ok(())
     }
+    fn set_pkt_size(&mut self, pkt_size: usize) {
+        self.pkt_size = pkt_size;
+    }
+}
+
+fn write_16_or_32(dst: &mut Vec<u8>, val: u32) {
+    if val < (1 << 14) {
+        dst.push((1 << 6) | ((val >> 8) as u8));
+        dst.push(val as u8);
+    } else {
+        dst.push((val >> 24) as u8);
+        dst.push((val >> 16) as u8);
+        dst.push((val >>  8) as u8);
+        dst.push( val        as u8);
+    }
+}
+
+fn write_multiple_frame(dst: &mut Vec<u8>, frm: &VideoData) {
+    dst.push(0xC0); // 0x3 = multiple frame
+    write_16_or_32(dst, frm.data.len() as u32);
+    write_16_or_32(dst, frm.pts as u32);
+    dst.push(frm.seq_no);
+    dst.extend_from_slice(&frm.data);
+}
+
+fn write_slice(dst: &mut Vec<u8>, id: u8, src: &VideoData) {
+    if let VideoDataType::Slice{pkt_no, npkt, full_size, offset} = src.vtype {
+        dst.push((id << 6) | (npkt >> 1));
+        dst.push((npkt << 7) | pkt_no);
+        write_16_or_32(dst, full_size);
+        if id == 0 {
+            write_16_or_32(dst, offset);
+        } else {
+            write_16_or_32(dst, src.data.len() as u32);
+        }
+        dst.push(src.seq_no);
+        dst.extend_from_slice(&src.data);
+    } else {
+        unreachable!()
+    }
 }
 
-pub fn create_video_stream(stream: &NAStream) -> MuxerResult<Box<dyn RMStreamWriter>> {
+pub fn create_video_stream(stream: &NAStream, pkt_size: usize) -> MuxerResult<Box<dyn RMStreamWriter>> {
     let info = stream.get_info();
     let cname = info.get_name();
 
@@ -163,12 +279,12 @@ pub fn create_video_stream(stream: &NAStream) -> MuxerResult<Box<dyn RMStreamWri
         if name == cname {
             return Ok(Box::new(VideoStreamWriter {
                     fcc:        *fcc,
-                    buf:        Vec::new(),
-                    nslices:    0,
-                    cur_slice:  0,
                     seq_no:     0,
                     time:       0,
                     mi_time:    0,
+                    pkt_size,
+                    queue:      VecDeque::new(),
+                    flush:      false,
                 }));
         }
     }