aboutsummaryrefslogtreecommitdiff
path: root/tests/time_delay_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/time_delay_queue.rs')
-rw-r--r--tests/time_delay_queue.rs63
1 files changed, 63 insertions, 0 deletions
diff --git a/tests/time_delay_queue.rs b/tests/time_delay_queue.rs
index 9ceae34..9b7b6cc 100644
--- a/tests/time_delay_queue.rs
+++ b/tests/time_delay_queue.rs
@@ -2,6 +2,7 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
+use futures::StreamExt;
use tokio::time::{self, sleep, sleep_until, Duration, Instant};
use tokio_test::{assert_pending, assert_ready, task};
use tokio_util::time::DelayQueue;
@@ -257,6 +258,10 @@ async fn reset_twice() {
#[tokio::test]
async fn repeatedly_reset_entry_inserted_as_expired() {
time::pause();
+
+ // Instants before the start of the test seem to break in wasm.
+ time::sleep(ms(1000)).await;
+
let mut queue = task::spawn(DelayQueue::new());
let now = Instant::now();
@@ -556,6 +561,10 @@ async fn reset_later_after_slot_starts() {
#[tokio::test]
async fn reset_inserted_expired() {
time::pause();
+
+ // Instants before the start of the test seem to break in wasm.
+ time::sleep(ms(1000)).await;
+
let mut queue = task::spawn(DelayQueue::new());
let now = Instant::now();
@@ -778,6 +787,22 @@ async fn compact_change_deadline() {
assert!(entry.is_none());
}
+#[tokio::test(start_paused = true)]
+async fn item_expiry_greater_than_wheel() {
+ // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted.
+ let mut queue = DelayQueue::new();
+ for _ in 0..2 {
+ tokio::time::advance(Duration::from_millis(1 << 35)).await;
+ queue.insert(0, Duration::from_millis(0));
+ queue.next().await;
+ }
+ // This should not panic
+ let no_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+ queue.insert(1, Duration::from_millis(1));
+ }));
+ assert!(no_panic.is_ok());
+}
+
#[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
#[tokio::test(start_paused = true)]
async fn remove_after_compact() {
@@ -815,6 +840,44 @@ async fn remove_after_compact_poll() {
assert!(panic.is_err());
}
+#[tokio::test(start_paused = true)]
+async fn peek() {
+ let mut queue = task::spawn(DelayQueue::new());
+
+ let now = Instant::now();
+
+ let key = queue.insert_at("foo", now + ms(5));
+ let key2 = queue.insert_at("bar", now);
+ let key3 = queue.insert_at("baz", now + ms(10));
+
+ assert_eq!(queue.peek(), Some(key2));
+
+ sleep(ms(6)).await;
+
+ assert_eq!(queue.peek(), Some(key2));
+
+ let entry = assert_ready_some!(poll!(queue));
+ assert_eq!(entry.get_ref(), &"bar");
+
+ assert_eq!(queue.peek(), Some(key));
+
+ let entry = assert_ready_some!(poll!(queue));
+ assert_eq!(entry.get_ref(), &"foo");
+
+ assert_eq!(queue.peek(), Some(key3));
+
+ assert_pending!(poll!(queue));
+
+ sleep(ms(5)).await;
+
+ assert_eq!(queue.peek(), Some(key3));
+
+ let entry = assert_ready_some!(poll!(queue));
+ assert_eq!(entry.get_ref(), &"baz");
+
+ assert!(queue.peek().is_none());
+}
+
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}