diff --git a/tokio-stream/src/wrappers/interval.rs b/tokio-stream/src/wrappers/interval.rs index faac5e78a..b54e37733 100644 --- a/tokio-stream/src/wrappers/interval.rs +++ b/tokio-stream/src/wrappers/interval.rs @@ -1,4 +1,5 @@ use crate::Stream; +use futures_core::stream::FusedStream; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::time::{Instant, Interval}; @@ -57,6 +58,12 @@ impl Stream for IntervalStream { } } +impl FusedStream for IntervalStream { + fn is_terminated(&self) -> bool { + false + } +} + impl AsRef for IntervalStream { fn as_ref(&self) -> &Interval { &self.inner diff --git a/tokio-stream/tests/stream_fuse.rs b/tokio-stream/tests/stream_fuse.rs index 9b6cf054c..8bb56d3de 100644 --- a/tokio-stream/tests/stream_fuse.rs +++ b/tokio-stream/tests/stream_fuse.rs @@ -48,3 +48,15 @@ async fn basic_usage() { assert_eq!(stream.next().await, None); assert_eq!(stream.size_hint(), (0, Some(0))); } + +#[tokio::test] +#[cfg(feature = "time")] +async fn interval_stream_is_never_terminated() { + use futures_core::stream::FusedStream; + use tokio_stream::wrappers::IntervalStream; + + let interval = tokio::time::interval(std::time::Duration::from_millis(1)); + let stream = IntervalStream::new(interval); + + assert!(!stream.is_terminated()); +}