diff options
Diffstat (limited to 'src/storage/packet_buffer.rs')
-rw-r--r-- | src/storage/packet_buffer.rs | 402 |
1 files changed, 402 insertions, 0 deletions
diff --git a/src/storage/packet_buffer.rs b/src/storage/packet_buffer.rs new file mode 100644 index 0000000..28119fa --- /dev/null +++ b/src/storage/packet_buffer.rs @@ -0,0 +1,402 @@ +use managed::ManagedSlice; + +use crate::storage::{Full, RingBuffer}; + +use super::Empty; + +/// Size and header of a packet. +#[derive(Debug, Clone, Copy)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct PacketMetadata<H> { + size: usize, + header: Option<H>, +} + +impl<H> PacketMetadata<H> { + /// Empty packet description. + pub const EMPTY: PacketMetadata<H> = PacketMetadata { + size: 0, + header: None, + }; + + fn padding(size: usize) -> PacketMetadata<H> { + PacketMetadata { + size: size, + header: None, + } + } + + fn packet(size: usize, header: H) -> PacketMetadata<H> { + PacketMetadata { + size: size, + header: Some(header), + } + } + + fn is_padding(&self) -> bool { + self.header.is_none() + } +} + +/// An UDP packet ring buffer. +#[derive(Debug)] +pub struct PacketBuffer<'a, H: 'a> { + metadata_ring: RingBuffer<'a, PacketMetadata<H>>, + payload_ring: RingBuffer<'a, u8>, +} + +impl<'a, H> PacketBuffer<'a, H> { + /// Create a new packet buffer with the provided metadata and payload storage. + /// + /// Metadata storage limits the maximum _number_ of packets in the buffer and payload + /// storage limits the maximum _total size_ of packets. + pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H> + where + MS: Into<ManagedSlice<'a, PacketMetadata<H>>>, + PS: Into<ManagedSlice<'a, u8>>, + { + PacketBuffer { + metadata_ring: RingBuffer::new(metadata_storage), + payload_ring: RingBuffer::new(payload_storage), + } + } + + /// Query whether the buffer is empty. + pub fn is_empty(&self) -> bool { + self.metadata_ring.is_empty() + } + + /// Query whether the buffer is full. + pub fn is_full(&self) -> bool { + self.metadata_ring.is_full() + } + + // There is currently no enqueue_with() because of the complexity of managing padding + // in case of failure. + + /// Enqueue a single packet with the given header into the buffer, and + /// return a reference to its payload, or return `Err(Full)` + /// if the buffer is full. + pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full> { + if self.payload_ring.capacity() < size || self.metadata_ring.is_full() { + return Err(Full); + } + + // Ring is currently empty. Clear it (resetting `read_at`) to maximize + // for contiguous space. + if self.payload_ring.is_empty() { + self.payload_ring.clear(); + } + + let window = self.payload_ring.window(); + let contig_window = self.payload_ring.contiguous_window(); + + if window < size { + return Err(Full); + } else if contig_window < size { + if window - contig_window < size { + // The buffer length is larger than the current contiguous window + // and is larger than the contiguous window will be after adding + // the padding necessary to circle around to the beginning of the + // ring buffer. + return Err(Full); + } else { + // Add padding to the end of the ring buffer so that the + // contiguous window is at the beginning of the ring buffer. + *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window); + // note(discard): function does not write to the result + // enqueued padding buffer location + let _buf_enqueued = self.payload_ring.enqueue_many(contig_window); + } + } + + *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header); + + let payload_buf = self.payload_ring.enqueue_many(size); + debug_assert!(payload_buf.len() == size); + Ok(payload_buf) + } + + /// Call `f` with a packet from the buffer large enough to fit `max_size` bytes. The packet + /// is shrunk to the size returned from `f` and enqueued into the buffer. + pub fn enqueue_with_infallible<'b, F>( + &'b mut self, + max_size: usize, + header: H, + f: F, + ) -> Result<usize, Full> + where + F: FnOnce(&'b mut [u8]) -> usize, + { + if self.payload_ring.capacity() < max_size || self.metadata_ring.is_full() { + return Err(Full); + } + + let window = self.payload_ring.window(); + let contig_window = self.payload_ring.contiguous_window(); + + if window < max_size { + return Err(Full); + } else if contig_window < max_size { + if window - contig_window < max_size { + // The buffer length is larger than the current contiguous window + // and is larger than the contiguous window will be after adding + // the padding necessary to circle around to the beginning of the + // ring buffer. + return Err(Full); + } else { + // Add padding to the end of the ring buffer so that the + // contiguous window is at the beginning of the ring buffer. + *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window); + // note(discard): function does not write to the result + // enqueued padding buffer location + let _buf_enqueued = self.payload_ring.enqueue_many(contig_window); + } + } + + let (size, _) = self + .payload_ring + .enqueue_many_with(|data| (f(&mut data[..max_size]), ())); + + *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header); + + Ok(size) + } + + fn dequeue_padding(&mut self) { + let _ = self.metadata_ring.dequeue_one_with(|metadata| { + if metadata.is_padding() { + // note(discard): function does not use value of dequeued padding bytes + let _buf_dequeued = self.payload_ring.dequeue_many(metadata.size); + Ok(()) // dequeue metadata + } else { + Err(()) // don't dequeue metadata + } + }); + } + + /// Call `f` with a single packet from the buffer, and dequeue the packet if `f` + /// returns successfully, or return `Err(EmptyError)` if the buffer is empty. + pub fn dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty> + where + F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>, + { + self.dequeue_padding(); + + self.metadata_ring.dequeue_one_with(|metadata| { + self.payload_ring + .dequeue_many_with(|payload_buf| { + debug_assert!(payload_buf.len() >= metadata.size); + + match f( + metadata.header.as_mut().unwrap(), + &mut payload_buf[..metadata.size], + ) { + Ok(val) => (metadata.size, Ok(val)), + Err(err) => (0, Err(err)), + } + }) + .1 + }) + } + + /// Dequeue a single packet from the buffer, and return a reference to its payload + /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty. + pub fn dequeue(&mut self) -> Result<(H, &mut [u8]), Empty> { + self.dequeue_padding(); + + let meta = self.metadata_ring.dequeue_one()?; + + let payload_buf = self.payload_ring.dequeue_many(meta.size); + debug_assert!(payload_buf.len() == meta.size); + Ok((meta.header.take().unwrap(), payload_buf)) + } + + /// Peek at a single packet from the buffer without removing it, and return a reference to + /// its payload as well as its header, or return `Err(Error:Exhausted)` if the buffer is empty. + /// + /// This function otherwise behaves identically to [dequeue](#method.dequeue). + pub fn peek(&mut self) -> Result<(&H, &[u8]), Empty> { + self.dequeue_padding(); + + if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() { + Ok(( + metadata.header.as_ref().unwrap(), + self.payload_ring.get_allocated(0, metadata.size), + )) + } else { + Err(Empty) + } + } + + /// Return the maximum number packets that can be stored. + pub fn packet_capacity(&self) -> usize { + self.metadata_ring.capacity() + } + + /// Return the maximum number of bytes in the payload ring buffer. + pub fn payload_capacity(&self) -> usize { + self.payload_ring.capacity() + } + + /// Reset the packet buffer and clear any staged. + #[allow(unused)] + pub(crate) fn reset(&mut self) { + self.payload_ring.clear(); + self.metadata_ring.clear(); + } +} + +#[cfg(test)] +mod test { + use super::*; + + fn buffer() -> PacketBuffer<'static, ()> { + PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16]) + } + + #[test] + fn test_simple() { + let mut buffer = buffer(); + buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef"); + assert_eq!(buffer.enqueue(16, ()), Err(Full)); + assert_eq!(buffer.metadata_ring.len(), 1); + assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]); + assert_eq!(buffer.dequeue(), Err(Empty)); + } + + #[test] + fn test_peek() { + let mut buffer = buffer(); + assert_eq!(buffer.peek(), Err(Empty)); + buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef"); + assert_eq!(buffer.metadata_ring.len(), 1); + assert_eq!(buffer.peek().unwrap().1, &b"abcdef"[..]); + assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]); + assert_eq!(buffer.peek(), Err(Empty)); + } + + #[test] + fn test_padding() { + let mut buffer = buffer(); + assert!(buffer.enqueue(6, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd"); + assert_eq!(buffer.metadata_ring.len(), 3); + assert!(buffer.dequeue().is_ok()); + + assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]); + assert_eq!(buffer.metadata_ring.len(), 0); + } + + #[test] + fn test_padding_with_large_payload() { + let mut buffer = buffer(); + assert!(buffer.enqueue(12, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + buffer + .enqueue(12, ()) + .unwrap() + .copy_from_slice(b"abcdefghijkl"); + } + + #[test] + fn test_dequeue_with() { + let mut buffer = buffer(); + assert!(buffer.enqueue(6, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd"); + assert_eq!(buffer.metadata_ring.len(), 3); + assert!(buffer.dequeue().is_ok()); + + assert!(matches!( + buffer.dequeue_with(|_, _| Result::<(), u32>::Err(123)), + Ok(Err(_)) + )); + assert_eq!(buffer.metadata_ring.len(), 1); + + assert!(buffer + .dequeue_with(|&mut (), payload| { + assert_eq!(payload, &b"abcd"[..]); + Result::<(), ()>::Ok(()) + }) + .is_ok()); + assert_eq!(buffer.metadata_ring.len(), 0); + } + + #[test] + fn test_metadata_full_empty() { + let mut buffer = buffer(); + assert!(buffer.is_empty()); + assert!(!buffer.is_full()); + assert!(buffer.enqueue(1, ()).is_ok()); + assert!(!buffer.is_empty()); + assert!(buffer.enqueue(1, ()).is_ok()); + assert!(buffer.enqueue(1, ()).is_ok()); + assert!(!buffer.is_full()); + assert!(!buffer.is_empty()); + assert!(buffer.enqueue(1, ()).is_ok()); + assert!(buffer.is_full()); + assert!(!buffer.is_empty()); + assert_eq!(buffer.metadata_ring.len(), 4); + assert_eq!(buffer.enqueue(1, ()), Err(Full)); + } + + #[test] + fn test_window_too_small() { + let mut buffer = buffer(); + assert!(buffer.enqueue(4, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + assert_eq!(buffer.enqueue(16, ()), Err(Full)); + assert_eq!(buffer.metadata_ring.len(), 1); + } + + #[test] + fn test_contiguous_window_too_small() { + let mut buffer = buffer(); + assert!(buffer.enqueue(4, ()).is_ok()); + assert!(buffer.enqueue(8, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + assert_eq!(buffer.enqueue(8, ()), Err(Full)); + assert_eq!(buffer.metadata_ring.len(), 1); + } + + #[test] + fn test_contiguous_window_wrap() { + let mut buffer = buffer(); + assert!(buffer.enqueue(15, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + assert!(buffer.enqueue(16, ()).is_ok()); + } + + #[test] + fn test_capacity_too_small() { + let mut buffer = buffer(); + assert_eq!(buffer.enqueue(32, ()), Err(Full)); + } + + #[test] + fn test_contig_window_prioritized() { + let mut buffer = buffer(); + assert!(buffer.enqueue(4, ()).is_ok()); + assert!(buffer.dequeue().is_ok()); + assert!(buffer.enqueue(5, ()).is_ok()); + } + + #[test] + fn clear() { + let mut buffer = buffer(); + + // Ensure enqueuing data in the buffer fills it somewhat. + assert!(buffer.is_empty()); + assert!(buffer.enqueue(6, ()).is_ok()); + + // Ensure that resetting the buffer causes it to be empty. + assert!(!buffer.is_empty()); + buffer.reset(); + assert!(buffer.is_empty()); + } +} |