summaryrefslogtreecommitdiff
path: root/src/storage/packet_buffer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/storage/packet_buffer.rs')
-rw-r--r--src/storage/packet_buffer.rs402
1 files changed, 402 insertions, 0 deletions
diff --git a/src/storage/packet_buffer.rs b/src/storage/packet_buffer.rs
new file mode 100644
index 0000000..28119fa
--- /dev/null
+++ b/src/storage/packet_buffer.rs
@@ -0,0 +1,402 @@
+use managed::ManagedSlice;
+
+use crate::storage::{Full, RingBuffer};
+
+use super::Empty;
+
+/// Size and header of a packet.
+#[derive(Debug, Clone, Copy)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub struct PacketMetadata<H> {
+ size: usize,
+ header: Option<H>,
+}
+
+impl<H> PacketMetadata<H> {
+ /// Empty packet description.
+ pub const EMPTY: PacketMetadata<H> = PacketMetadata {
+ size: 0,
+ header: None,
+ };
+
+ fn padding(size: usize) -> PacketMetadata<H> {
+ PacketMetadata {
+ size: size,
+ header: None,
+ }
+ }
+
+ fn packet(size: usize, header: H) -> PacketMetadata<H> {
+ PacketMetadata {
+ size: size,
+ header: Some(header),
+ }
+ }
+
+ fn is_padding(&self) -> bool {
+ self.header.is_none()
+ }
+}
+
+/// An UDP packet ring buffer.
+#[derive(Debug)]
+pub struct PacketBuffer<'a, H: 'a> {
+ metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
+ payload_ring: RingBuffer<'a, u8>,
+}
+
+impl<'a, H> PacketBuffer<'a, H> {
+ /// Create a new packet buffer with the provided metadata and payload storage.
+ ///
+ /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
+ /// storage limits the maximum _total size_ of packets.
+ pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H>
+ where
+ MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
+ PS: Into<ManagedSlice<'a, u8>>,
+ {
+ PacketBuffer {
+ metadata_ring: RingBuffer::new(metadata_storage),
+ payload_ring: RingBuffer::new(payload_storage),
+ }
+ }
+
+ /// Query whether the buffer is empty.
+ pub fn is_empty(&self) -> bool {
+ self.metadata_ring.is_empty()
+ }
+
+ /// Query whether the buffer is full.
+ pub fn is_full(&self) -> bool {
+ self.metadata_ring.is_full()
+ }
+
+ // There is currently no enqueue_with() because of the complexity of managing padding
+ // in case of failure.
+
+ /// Enqueue a single packet with the given header into the buffer, and
+ /// return a reference to its payload, or return `Err(Full)`
+ /// if the buffer is full.
+ pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full> {
+ if self.payload_ring.capacity() < size || self.metadata_ring.is_full() {
+ return Err(Full);
+ }
+
+ // Ring is currently empty. Clear it (resetting `read_at`) to maximize
+ // for contiguous space.
+ if self.payload_ring.is_empty() {
+ self.payload_ring.clear();
+ }
+
+ let window = self.payload_ring.window();
+ let contig_window = self.payload_ring.contiguous_window();
+
+ if window < size {
+ return Err(Full);
+ } else if contig_window < size {
+ if window - contig_window < size {
+ // The buffer length is larger than the current contiguous window
+ // and is larger than the contiguous window will be after adding
+ // the padding necessary to circle around to the beginning of the
+ // ring buffer.
+ return Err(Full);
+ } else {
+ // Add padding to the end of the ring buffer so that the
+ // contiguous window is at the beginning of the ring buffer.
+ *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
+ // note(discard): function does not write to the result
+ // enqueued padding buffer location
+ let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
+ }
+ }
+
+ *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
+
+ let payload_buf = self.payload_ring.enqueue_many(size);
+ debug_assert!(payload_buf.len() == size);
+ Ok(payload_buf)
+ }
+
+ /// Call `f` with a packet from the buffer large enough to fit `max_size` bytes. The packet
+ /// is shrunk to the size returned from `f` and enqueued into the buffer.
+ pub fn enqueue_with_infallible<'b, F>(
+ &'b mut self,
+ max_size: usize,
+ header: H,
+ f: F,
+ ) -> Result<usize, Full>
+ where
+ F: FnOnce(&'b mut [u8]) -> usize,
+ {
+ if self.payload_ring.capacity() < max_size || self.metadata_ring.is_full() {
+ return Err(Full);
+ }
+
+ let window = self.payload_ring.window();
+ let contig_window = self.payload_ring.contiguous_window();
+
+ if window < max_size {
+ return Err(Full);
+ } else if contig_window < max_size {
+ if window - contig_window < max_size {
+ // The buffer length is larger than the current contiguous window
+ // and is larger than the contiguous window will be after adding
+ // the padding necessary to circle around to the beginning of the
+ // ring buffer.
+ return Err(Full);
+ } else {
+ // Add padding to the end of the ring buffer so that the
+ // contiguous window is at the beginning of the ring buffer.
+ *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
+ // note(discard): function does not write to the result
+ // enqueued padding buffer location
+ let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
+ }
+ }
+
+ let (size, _) = self
+ .payload_ring
+ .enqueue_many_with(|data| (f(&mut data[..max_size]), ()));
+
+ *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
+
+ Ok(size)
+ }
+
+ fn dequeue_padding(&mut self) {
+ let _ = self.metadata_ring.dequeue_one_with(|metadata| {
+ if metadata.is_padding() {
+ // note(discard): function does not use value of dequeued padding bytes
+ let _buf_dequeued = self.payload_ring.dequeue_many(metadata.size);
+ Ok(()) // dequeue metadata
+ } else {
+ Err(()) // don't dequeue metadata
+ }
+ });
+ }
+
+ /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
+ /// returns successfully, or return `Err(EmptyError)` if the buffer is empty.
+ pub fn dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty>
+ where
+ F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>,
+ {
+ self.dequeue_padding();
+
+ self.metadata_ring.dequeue_one_with(|metadata| {
+ self.payload_ring
+ .dequeue_many_with(|payload_buf| {
+ debug_assert!(payload_buf.len() >= metadata.size);
+
+ match f(
+ metadata.header.as_mut().unwrap(),
+ &mut payload_buf[..metadata.size],
+ ) {
+ Ok(val) => (metadata.size, Ok(val)),
+ Err(err) => (0, Err(err)),
+ }
+ })
+ .1
+ })
+ }
+
+ /// Dequeue a single packet from the buffer, and return a reference to its payload
+ /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
+ pub fn dequeue(&mut self) -> Result<(H, &mut [u8]), Empty> {
+ self.dequeue_padding();
+
+ let meta = self.metadata_ring.dequeue_one()?;
+
+ let payload_buf = self.payload_ring.dequeue_many(meta.size);
+ debug_assert!(payload_buf.len() == meta.size);
+ Ok((meta.header.take().unwrap(), payload_buf))
+ }
+
+ /// Peek at a single packet from the buffer without removing it, and return a reference to
+ /// its payload as well as its header, or return `Err(Error:Exhausted)` if the buffer is empty.
+ ///
+ /// This function otherwise behaves identically to [dequeue](#method.dequeue).
+ pub fn peek(&mut self) -> Result<(&H, &[u8]), Empty> {
+ self.dequeue_padding();
+
+ if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
+ Ok((
+ metadata.header.as_ref().unwrap(),
+ self.payload_ring.get_allocated(0, metadata.size),
+ ))
+ } else {
+ Err(Empty)
+ }
+ }
+
+ /// Return the maximum number packets that can be stored.
+ pub fn packet_capacity(&self) -> usize {
+ self.metadata_ring.capacity()
+ }
+
+ /// Return the maximum number of bytes in the payload ring buffer.
+ pub fn payload_capacity(&self) -> usize {
+ self.payload_ring.capacity()
+ }
+
+ /// Reset the packet buffer and clear any staged.
+ #[allow(unused)]
+ pub(crate) fn reset(&mut self) {
+ self.payload_ring.clear();
+ self.metadata_ring.clear();
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ fn buffer() -> PacketBuffer<'static, ()> {
+ PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16])
+ }
+
+ #[test]
+ fn test_simple() {
+ let mut buffer = buffer();
+ buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
+ assert_eq!(buffer.enqueue(16, ()), Err(Full));
+ assert_eq!(buffer.metadata_ring.len(), 1);
+ assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
+ assert_eq!(buffer.dequeue(), Err(Empty));
+ }
+
+ #[test]
+ fn test_peek() {
+ let mut buffer = buffer();
+ assert_eq!(buffer.peek(), Err(Empty));
+ buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
+ assert_eq!(buffer.metadata_ring.len(), 1);
+ assert_eq!(buffer.peek().unwrap().1, &b"abcdef"[..]);
+ assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
+ assert_eq!(buffer.peek(), Err(Empty));
+ }
+
+ #[test]
+ fn test_padding() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(6, ()).is_ok());
+ assert!(buffer.enqueue(8, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
+ assert_eq!(buffer.metadata_ring.len(), 3);
+ assert!(buffer.dequeue().is_ok());
+
+ assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]);
+ assert_eq!(buffer.metadata_ring.len(), 0);
+ }
+
+ #[test]
+ fn test_padding_with_large_payload() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(12, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ buffer
+ .enqueue(12, ())
+ .unwrap()
+ .copy_from_slice(b"abcdefghijkl");
+ }
+
+ #[test]
+ fn test_dequeue_with() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(6, ()).is_ok());
+ assert!(buffer.enqueue(8, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
+ assert_eq!(buffer.metadata_ring.len(), 3);
+ assert!(buffer.dequeue().is_ok());
+
+ assert!(matches!(
+ buffer.dequeue_with(|_, _| Result::<(), u32>::Err(123)),
+ Ok(Err(_))
+ ));
+ assert_eq!(buffer.metadata_ring.len(), 1);
+
+ assert!(buffer
+ .dequeue_with(|&mut (), payload| {
+ assert_eq!(payload, &b"abcd"[..]);
+ Result::<(), ()>::Ok(())
+ })
+ .is_ok());
+ assert_eq!(buffer.metadata_ring.len(), 0);
+ }
+
+ #[test]
+ fn test_metadata_full_empty() {
+ let mut buffer = buffer();
+ assert!(buffer.is_empty());
+ assert!(!buffer.is_full());
+ assert!(buffer.enqueue(1, ()).is_ok());
+ assert!(!buffer.is_empty());
+ assert!(buffer.enqueue(1, ()).is_ok());
+ assert!(buffer.enqueue(1, ()).is_ok());
+ assert!(!buffer.is_full());
+ assert!(!buffer.is_empty());
+ assert!(buffer.enqueue(1, ()).is_ok());
+ assert!(buffer.is_full());
+ assert!(!buffer.is_empty());
+ assert_eq!(buffer.metadata_ring.len(), 4);
+ assert_eq!(buffer.enqueue(1, ()), Err(Full));
+ }
+
+ #[test]
+ fn test_window_too_small() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(4, ()).is_ok());
+ assert!(buffer.enqueue(8, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ assert_eq!(buffer.enqueue(16, ()), Err(Full));
+ assert_eq!(buffer.metadata_ring.len(), 1);
+ }
+
+ #[test]
+ fn test_contiguous_window_too_small() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(4, ()).is_ok());
+ assert!(buffer.enqueue(8, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ assert_eq!(buffer.enqueue(8, ()), Err(Full));
+ assert_eq!(buffer.metadata_ring.len(), 1);
+ }
+
+ #[test]
+ fn test_contiguous_window_wrap() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(15, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ assert!(buffer.enqueue(16, ()).is_ok());
+ }
+
+ #[test]
+ fn test_capacity_too_small() {
+ let mut buffer = buffer();
+ assert_eq!(buffer.enqueue(32, ()), Err(Full));
+ }
+
+ #[test]
+ fn test_contig_window_prioritized() {
+ let mut buffer = buffer();
+ assert!(buffer.enqueue(4, ()).is_ok());
+ assert!(buffer.dequeue().is_ok());
+ assert!(buffer.enqueue(5, ()).is_ok());
+ }
+
+ #[test]
+ fn clear() {
+ let mut buffer = buffer();
+
+ // Ensure enqueuing data in the buffer fills it somewhat.
+ assert!(buffer.is_empty());
+ assert!(buffer.enqueue(6, ()).is_ok());
+
+ // Ensure that resetting the buffer causes it to be empty.
+ assert!(!buffer.is_empty());
+ buffer.reset();
+ assert!(buffer.is_empty());
+ }
+}