diff options
Diffstat (limited to 'tests/sink.rs')
-rw-r--r-- | tests/sink.rs | 485 |
1 files changed, 162 insertions, 323 deletions
diff --git a/tests/sink.rs b/tests/sink.rs index 597ed34..dc826bd 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -1,264 +1,221 @@ -mod sassert_next { - use futures::stream::{Stream, StreamExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::fmt; - - pub fn sassert_next<S>(s: &mut S, item: S::Item) - where - S: Stream + Unpin, - S::Item: Eq + fmt::Debug, - { - match s.poll_next_unpin(&mut panic_context()) { - Poll::Ready(None) => panic!("stream is at its end"), - Poll::Ready(Some(e)) => assert_eq!(e, item), - Poll::Pending => panic!("stream wasn't ready"), - } +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt}; +use futures::never::Never; +use futures::ready; +use futures::sink::{self, Sink, SinkErrInto, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{self, ArcWake, Context, Poll, Waker}; +use futures_test::task::panic_context; +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::fmt; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +fn sassert_next<S>(s: &mut S, item: S::Item) +where + S: Stream + Unpin, + S::Item: Eq + fmt::Debug, +{ + match s.poll_next_unpin(&mut panic_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(e)) => assert_eq!(e, item), + Poll::Pending => panic!("stream wasn't ready"), } } -mod unwrap { - use futures::task::Poll; - use std::fmt; - - pub fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T { - match x { - Poll::Ready(Ok(x)) => x, - Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), - Poll::Pending => panic!("Poll::Pending"), - } +fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), + Poll::Pending => panic!("Poll::Pending"), } } -mod flag_cx { - use futures::task::{self, ArcWake, Context}; - use std::sync::Arc; - use std::sync::atomic::{AtomicBool, Ordering}; - - // An Unpark struct that records unpark events for inspection - pub struct Flag(AtomicBool); +// An Unpark struct that records unpark events for inspection +struct Flag(AtomicBool); - impl Flag { - pub fn new() -> Arc<Self> { - Arc::new(Self(AtomicBool::new(false))) - } - - pub fn take(&self) -> bool { - self.0.swap(false, Ordering::SeqCst) - } +impl Flag { + fn new() -> Arc<Self> { + Arc::new(Self(AtomicBool::new(false))) + } - pub fn set(&self, v: bool) { - self.0.store(v, Ordering::SeqCst) - } + fn take(&self) -> bool { + self.0.swap(false, Ordering::SeqCst) } - impl ArcWake for Flag { - fn wake_by_ref(arc_self: &Arc<Self>) { - arc_self.set(true) - } + fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) } +} - pub fn flag_cx<F, R>(f: F) -> R - where - F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R, - { - let flag = Flag::new(); - let waker = task::waker_ref(&flag); - let cx = &mut Context::from_waker(&waker); - f(flag.clone(), cx) +impl ArcWake for Flag { + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.set(true) } } -mod start_send_fut { - use futures::future::Future; - use futures::ready; - use futures::sink::Sink; - use futures::task::{Context, Poll}; - use std::pin::Pin; +fn flag_cx<F, R>(f: F) -> R +where + F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R, +{ + let flag = Flag::new(); + let waker = task::waker_ref(&flag); + let cx = &mut Context::from_waker(&waker); + f(flag.clone(), cx) +} - // Sends a value on an i32 channel sink - pub struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>); +// Sends a value on an i32 channel sink +struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>); - impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> { - pub fn new(sink: S, item: Item) -> Self { - Self(Some(sink), Some(item)) - } +impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> { + fn new(sink: S, item: Item) -> Self { + Self(Some(sink), Some(item)) } +} - impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> { - type Output = Result<S, S::Error>; +impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> { + type Output = Result<S, S::Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let Self(inner, item) = self.get_mut(); - { - let mut inner = inner.as_mut().unwrap(); - ready!(Pin::new(&mut inner).poll_ready(cx))?; - Pin::new(&mut inner).start_send(item.take().unwrap())?; - } - Poll::Ready(Ok(inner.take().unwrap())) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let Self(inner, item) = self.get_mut(); + { + let mut inner = inner.as_mut().unwrap(); + ready!(Pin::new(&mut inner).poll_ready(cx))?; + Pin::new(&mut inner).start_send(item.take().unwrap())?; } + Poll::Ready(Ok(inner.take().unwrap())) } } -mod manual_flush { - use futures::sink::Sink; - use futures::task::{Context, Poll, Waker}; - use std::mem; - use std::pin::Pin; - - // Immediately accepts all requests to start pushing, but completion is managed - // by manually flushing - pub struct ManualFlush<T: Unpin> { - data: Vec<T>, - waiting_tasks: Vec<Waker>, - } +// Immediately accepts all requests to start pushing, but completion is managed +// by manually flushing +struct ManualFlush<T: Unpin> { + data: Vec<T>, + waiting_tasks: Vec<Waker>, +} - impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> { - type Error = (); +impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> { + type Error = (); - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } - fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> { - if let Some(item) = item { - self.data.push(item); - } else { - self.force_flush(); - } - Ok(()) + fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> { + if let Some(item) = item { + self.data.push(item); + } else { + self.force_flush(); } + Ok(()) + } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - if self.data.is_empty() { - Poll::Ready(Ok(())) - } else { - self.waiting_tasks.push(cx.waker().clone()); - Poll::Pending - } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.data.is_empty() { + Poll::Ready(Ok(())) + } else { + self.waiting_tasks.push(cx.waker().clone()); + Poll::Pending } + } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.poll_flush(cx) - } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.poll_flush(cx) } +} - impl<T: Unpin> ManualFlush<T> { - pub fn new() -> Self { - Self { - data: Vec::new(), - waiting_tasks: Vec::new(), - } - } +impl<T: Unpin> ManualFlush<T> { + fn new() -> Self { + Self { data: Vec::new(), waiting_tasks: Vec::new() } + } - pub fn force_flush(&mut self) -> Vec<T> { - for task in self.waiting_tasks.drain(..) { - task.wake() - } - mem::replace(&mut self.data, Vec::new()) + fn force_flush(&mut self) -> Vec<T> { + for task in self.waiting_tasks.drain(..) { + task.wake() } + mem::replace(&mut self.data, Vec::new()) } } -mod allowance { - use futures::sink::Sink; - use futures::task::{Context, Poll, Waker}; - use std::cell::{Cell, RefCell}; - use std::pin::Pin; - use std::rc::Rc; +struct ManualAllow<T: Unpin> { + data: Vec<T>, + allow: Rc<Allow>, +} - pub struct ManualAllow<T: Unpin> { - pub data: Vec<T>, - allow: Rc<Allow>, - } +struct Allow { + flag: Cell<bool>, + tasks: RefCell<Vec<Waker>>, +} - pub struct Allow { - flag: Cell<bool>, - tasks: RefCell<Vec<Waker>>, +impl Allow { + fn new() -> Self { + Self { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) } } - impl Allow { - pub fn new() -> Self { - Self { - flag: Cell::new(false), - tasks: RefCell::new(Vec::new()), - } - } - - pub fn check(&self, cx: &mut Context<'_>) -> bool { - if self.flag.get() { - true - } else { - self.tasks.borrow_mut().push(cx.waker().clone()); - false - } - } - - pub fn start(&self) { - self.flag.set(true); - let mut tasks = self.tasks.borrow_mut(); - for task in tasks.drain(..) { - task.wake(); - } + fn check(&self, cx: &mut Context<'_>) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(cx.waker().clone()); + false } } - impl<T: Unpin> Sink<T> for ManualAllow<T> { - type Error = (); - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - if self.allow.check(cx) { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.wake(); } + } +} - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.data.push(item); - Ok(()) - } +impl<T: Unpin> Sink<T> for ManualAllow<T> { + type Error = (); - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.allow.check(cx) { Poll::Ready(Ok(())) + } else { + Poll::Pending } + } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.data.push(item); + Ok(()) } - pub fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) { - let allow = Rc::new(Allow::new()); - let manual_allow = ManualAllow { - data: Vec::new(), - allow: allow.clone(), - }; - (manual_allow, allow) + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } +} + +fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() }; + (manual_allow, allow) } #[test] fn either_sink() { - use futures::sink::{Sink, SinkExt}; - use std::collections::VecDeque; - use std::pin::Pin; - - let mut s = if true { - Vec::<i32>::new().left_sink() - } else { - VecDeque::<i32>::new().right_sink() - }; + let mut s = + if true { Vec::<i32>::new().left_sink() } else { VecDeque::<i32>::new().right_sink() }; Pin::new(&mut s).start_send(0).unwrap(); } #[test] fn vec_sink() { - use futures::executor::block_on; - use futures::sink::{Sink, SinkExt}; - use std::pin::Pin; - let mut v = Vec::new(); Pin::new(&mut v).start_send(0).unwrap(); Pin::new(&mut v).start_send(1).unwrap(); @@ -269,10 +226,6 @@ fn vec_sink() { #[test] fn vecdeque_sink() { - use futures::sink::Sink; - use std::collections::VecDeque; - use std::pin::Pin; - let mut deque = VecDeque::new(); Pin::new(&mut deque).start_send(2).unwrap(); Pin::new(&mut deque).start_send(3).unwrap(); @@ -284,9 +237,6 @@ fn vecdeque_sink() { #[test] fn send() { - use futures::executor::block_on; - use futures::sink::SinkExt; - let mut v = Vec::new(); block_on(v.send(0)).unwrap(); @@ -301,10 +251,6 @@ fn send() { #[test] fn send_all() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let mut v = Vec::new(); block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap(); @@ -321,15 +267,6 @@ fn send_all() { // channel is full #[test] fn mpsc_blocking_start_send() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::{self, FutureExt}; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use sassert_next::sassert_next; - use unwrap::unwrap; - let (mut tx, mut rx) = mpsc::channel::<i32>(0); block_on(future::lazy(|_| { @@ -351,19 +288,9 @@ fn mpsc_blocking_start_send() { // test `flush` by using `with` to make the first insertion into a sink block // until a oneshot is completed +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn with_flush() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::{self, FutureExt, TryFutureExt}; - use futures::never::Never; - use futures::sink::{Sink, SinkExt}; - use std::mem; - use std::pin::Pin; - - use flag_cx::flag_cx; - use unwrap::unwrap; - let (tx, rx) = oneshot::channel(); let mut block = rx.boxed(); let mut sink = Vec::new().with(|elem| { @@ -390,11 +317,6 @@ fn with_flush() { // test simple use of with to change data #[test] fn with_as_map() { - use futures::executor::block_on; - use futures::future; - use futures::never::Never; - use futures::sink::SinkExt; - let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -405,10 +327,6 @@ fn with_as_map() { // test simple use of with_flat_map #[test] fn with_flat_map() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -421,16 +339,6 @@ fn with_flat_map() { // Regression test for the issue #1834. #[test] fn with_propagates_poll_ready() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future; - use futures::sink::{Sink, SinkExt}; - use futures::task::Poll; - use std::pin::Pin; - - use flag_cx::flag_cx; - use sassert_next::sassert_next; - let (tx, mut rx) = mpsc::channel::<i32>(0); let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10)); @@ -457,14 +365,6 @@ fn with_propagates_poll_ready() { // but doesn't claim to be flushed until the underlying sink is #[test] fn with_flush_propagate() { - use futures::future::{self, FutureExt}; - use futures::sink::{Sink, SinkExt}; - use std::pin::Pin; - - use manual_flush::ManualFlush; - use flag_cx::flag_cx; - use unwrap::unwrap; - let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>); flag_cx(|flag, cx| { unwrap(Pin::new(&mut sink).poll_ready(cx)); @@ -486,21 +386,13 @@ fn with_flush_propagate() { // test that `Clone` is implemented on `with` sinks #[test] fn with_implements_clone() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future; - use futures::{SinkExt, StreamExt}; - let (mut tx, rx) = mpsc::channel(5); { - let mut is_positive = tx - .clone() - .with(|item| future::ok::<bool, mpsc::SendError>(item > 0)); + let mut is_positive = tx.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0)); - let mut is_long = tx - .clone() - .with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5)); + let mut is_long = + tx.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5)); block_on(is_positive.clone().send(-1)).unwrap(); block_on(is_long.clone().send("123456")).unwrap(); @@ -512,18 +404,12 @@ fn with_implements_clone() { block_on(tx.close()).unwrap(); - assert_eq!( - block_on(rx.collect::<Vec<_>>()), - vec![false, true, false, true, false] - ); + assert_eq!(block_on(rx.collect::<Vec<_>>()), vec![false, true, false, true, false]); } // test that a buffer is a no-nop around a sink that always accepts sends #[test] fn buffer_noop() { - use futures::executor::block_on; - use futures::sink::SinkExt; - let mut sink = Vec::new().buffer(0); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -539,15 +425,6 @@ fn buffer_noop() { // and writing out when the underlying sink is ready #[test] fn buffer() { - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::sink::SinkExt; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use unwrap::unwrap; - use allowance::manual_allow; - let (sink, allow) = manual_allow::<i32>(); let sink = sink.buffer(2); @@ -567,10 +444,6 @@ fn buffer() { #[test] fn fanout_smoke() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let sink1 = Vec::new(); let sink2 = Vec::new(); let mut sink = sink1.fanout(sink2); @@ -582,16 +455,6 @@ fn fanout_smoke() { #[test] fn fanout_backpressure() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::sink::SinkExt; - use futures::stream::StreamExt; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use unwrap::unwrap; - let (left_send, mut left_recv) = mpsc::channel(0); let (right_send, mut right_recv) = mpsc::channel(0); let sink = left_send.fanout(right_send); @@ -624,12 +487,6 @@ fn fanout_backpressure() { #[test] fn sink_map_err() { - use futures::channel::mpsc; - use futures::sink::{Sink, SinkExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::pin::Pin; - { let cx = &mut panic_context(); let (tx, _rx) = mpsc::channel(1); @@ -639,20 +496,11 @@ fn sink_map_err() { } let tx = mpsc::channel(0).0; - assert_eq!( - Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), - Err(()) - ); + assert_eq!(Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), Err(())); } #[test] fn sink_unfold() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::poll_fn; - use futures::sink::{self, Sink, SinkExt}; - use futures::task::Poll; - block_on(poll_fn(|cx| { let (tx, mut rx) = mpsc::channel(1); let unfold = sink::unfold((), |(), i: i32| { @@ -685,14 +533,8 @@ fn sink_unfold() { #[test] fn err_into() { - use futures::channel::mpsc; - use futures::sink::{Sink, SinkErrInto, SinkExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::pin::Pin; - #[derive(Copy, Clone, Debug, PartialEq, Eq)] - pub struct ErrIntoTest; + struct ErrIntoTest; impl From<mpsc::SendError> for ErrIntoTest { fn from(_: mpsc::SendError) -> Self { @@ -709,8 +551,5 @@ fn err_into() { } let tx = mpsc::channel(0).0; - assert_eq!( - Pin::new(&mut tx.sink_err_into()).start_send(()), - Err(ErrIntoTest) - ); + assert_eq!(Pin::new(&mut tx.sink_err_into()).start_send(()), Err(ErrIntoTest)); } |