summaryrefslogtreecommitdiff
path: root/src/stream_ext/timeout_repeating.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream_ext/timeout_repeating.rs')
-rw-r--r--src/stream_ext/timeout_repeating.rs56
1 files changed, 56 insertions, 0 deletions
diff --git a/src/stream_ext/timeout_repeating.rs b/src/stream_ext/timeout_repeating.rs
new file mode 100644
index 0000000..253d2fd
--- /dev/null
+++ b/src/stream_ext/timeout_repeating.rs
@@ -0,0 +1,56 @@
+use crate::stream_ext::Fuse;
+use crate::{Elapsed, Stream};
+use tokio::time::Interval;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method.
+ #[must_use = "streams do nothing unless polled"]
+ #[derive(Debug)]
+ pub struct TimeoutRepeating<S> {
+ #[pin]
+ stream: Fuse<S>,
+ #[pin]
+ interval: Interval,
+ }
+}
+
+impl<S: Stream> TimeoutRepeating<S> {
+ pub(super) fn new(stream: S, interval: Interval) -> Self {
+ TimeoutRepeating {
+ stream: Fuse::new(stream),
+ interval,
+ }
+ }
+}
+
+impl<S: Stream> Stream for TimeoutRepeating<S> {
+ type Item = Result<S::Item, Elapsed>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut me = self.project();
+
+ match me.stream.poll_next(cx) {
+ Poll::Ready(v) => {
+ if v.is_some() {
+ me.interval.reset();
+ }
+ return Poll::Ready(v.map(Ok));
+ }
+ Poll::Pending => {}
+ };
+
+ ready!(me.interval.poll_tick(cx));
+ Poll::Ready(Some(Err(Elapsed::new())))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (lower, _) = self.stream.size_hint();
+
+ // The timeout stream may insert an error an infinite number of times.
+ (lower, None)
+ }
+}