Initial testing utilities crate

This commit is contained in:
Wim Looman
2018-08-03 19:18:06 +02:00
parent 09617ac0b1
commit 8a5f1a294a
36 changed files with 835 additions and 282 deletions

View File

@@ -52,6 +52,7 @@ matrix:
- cargo build --manifest-path futures-io/Cargo.toml --all-features
- cargo build --manifest-path futures-sink/Cargo.toml --all-features
- cargo build --manifest-path futures-util/Cargo.toml --all-features
- cargo build --manifest-path futures-test/Cargo.toml --all-features
- name: cargo build --all-features (with minimal versions)
rust: nightly
@@ -81,6 +82,7 @@ matrix:
- RUSTDOCFLAGS=-Dwarnings cargo doc --all
--exclude futures-preview
--exclude futures-executor-preview
--exclude futures-test-preview
- cargo doc
script:

View File

@@ -7,4 +7,5 @@ members = [
"futures-io",
"futures-sink",
"futures-util",
"futures-test",
]

26
futures-test/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
cargo-features = ["edition"]
[package]
name = "futures-test-preview"
edition = "2018"
version = "0.3.0-alpha.3"
authors = []
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang-nursery/futures-rs"
homepage = "https://rust-lang-nursery.github.io/futures-rs"
documentation = "https://rust-lang-nursery.github.io/futures-doc/0.3.0-alpha.3/futures_test"
description = """
Common utilities for testing components built off futures-rs.
"""
[lib]
name = "futures_test"
[dependencies]
futures-core-preview = { version = "0.3.0-alpha.2", path = "../futures-core", default-features = false }
futures-util-preview = { version = "0.3.0-alpha.2", path = "../futures-util", default-features = false }
futures-executor-preview = { version = "0.3.0-alpha.2", path = "../futures-executor", default-features = false }
pin-utils = { version = "0.1.0-alpha.1", default-features = false }
[dev-dependencies]
futures-preview = { version = "0.3.0-alpha.2", path = "../futures", default-features = false, features = ["std"] }

1
futures-test/LICENSE-APACHE Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
futures-test/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

125
futures-test/src/assert.rs Normal file
View File

@@ -0,0 +1,125 @@
use futures_core::stream::Stream;
use std::marker::Unpin;
#[doc(hidden)]
pub fn assert_is_unpin_stream<S: Stream + Unpin>(_: &mut S) {}
/// Assert that the next poll to the provided stream will return
/// [`Poll::Pending`][futures_core::task::Poll::Pending].
///
/// # Examples
///
/// ```
/// #![feature(async_await, futures_api, pin)]
/// use futures::stream;
/// use futures_test::future::FutureTestExt;
/// use futures_test::{
/// assert_stream_pending, assert_stream_next, assert_stream_done,
/// };
/// use pin_utils::pin_mut;
///
/// let mut stream = stream::once((async { 5 }).delay());
/// pin_mut!(stream);
///
/// assert_stream_pending!(stream);
/// assert_stream_next!(stream, 5);
/// assert_stream_done!(stream);
/// ```
#[macro_export]
macro_rules! assert_stream_pending {
($stream:expr) => {{
let mut stream = &mut $stream;
$crate::assert::assert_is_unpin_stream(stream);
let stream = $crate::std_reexport::mem::PinMut::new(stream);
let cx = &mut $crate::task::no_spawn_context();
let poll = $crate::futures_core_reexport::stream::Stream::poll_next(
stream, cx,
);
if poll.is_ready() {
panic!("assertion failed: stream is not pending");
}
}};
}
/// Assert that the next poll to the provided stream will return
/// [`Poll::Ready`][futures_core::task::Poll::Ready] with the provided item.
///
/// # Examples
///
/// ```
/// #![feature(async_await, futures_api, pin)]
/// use futures::stream;
/// use futures_test::future::FutureTestExt;
/// use futures_test::{
/// assert_stream_pending, assert_stream_next, assert_stream_done,
/// };
/// use pin_utils::pin_mut;
///
/// let mut stream = stream::once((async { 5 }).delay());
/// pin_mut!(stream);
///
/// assert_stream_pending!(stream);
/// assert_stream_next!(stream, 5);
/// assert_stream_done!(stream);
/// ```
#[macro_export]
macro_rules! assert_stream_next {
($stream:expr, $item:expr) => {{
let mut stream = &mut $stream;
$crate::assert::assert_is_unpin_stream(stream);
let stream = $crate::std_reexport::mem::PinMut::new(stream);
let cx = &mut $crate::task::no_spawn_context();
match $crate::futures_core_reexport::stream::Stream::poll_next(stream, cx) {
$crate::futures_core_reexport::task::Poll::Ready(Some(x)) => {
assert_eq!(x, $item);
}
$crate::futures_core_reexport::task::Poll::Ready(None) => {
panic!("assertion failed: expected stream to provide item but stream is at its end");
}
$crate::futures_core_reexport::task::Poll::Pending => {
panic!("assertion failed: expected stream to provide item but stream wasn't ready");
}
}
}}
}
/// Assert that the next poll to the provided stream will return an empty
/// [`Poll::Ready`][futures_core::task::Poll::Ready] signalling the
/// completion of the stream.
///
/// # Examples
///
/// ```
/// #![feature(async_await, futures_api, pin)]
/// use futures::stream;
/// use futures_test::future::FutureTestExt;
/// use futures_test::{
/// assert_stream_pending, assert_stream_next, assert_stream_done,
/// };
/// use pin_utils::pin_mut;
///
/// let mut stream = stream::once((async { 5 }).delay());
/// pin_mut!(stream);
///
/// assert_stream_pending!(stream);
/// assert_stream_next!(stream, 5);
/// assert_stream_done!(stream);
/// ```
#[macro_export]
macro_rules! assert_stream_done {
($stream:expr) => {{
let mut stream = &mut $stream;
$crate::assert::assert_is_unpin_stream(stream);
let stream = $crate::std_reexport::mem::PinMut::new(stream);
let cx = &mut $crate::task::no_spawn_context();
match $crate::futures_core_reexport::stream::Stream::poll_next(stream, cx) {
$crate::futures_core_reexport::task::Poll::Ready(Some(_)) => {
panic!("assertion failed: expected stream to be done but had more elements");
}
$crate::futures_core_reexport::task::Poll::Ready(None) => {}
$crate::futures_core_reexport::task::Poll::Pending => {
panic!("assertion failed: expected stream to be done but was pending");
}
}
}}
}

View File

@@ -0,0 +1,44 @@
use futures_core::future::Future;
use futures_core::task::{self, Poll};
use std::mem::PinMut;
/// Combinator that guarantees one [`Poll::Pending`] before polling its inner
/// future.
///
/// This is created by the [`FutureTestExt::delay`][super::FutureTestExt::delay]
/// method.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Delayed<Fut: Future> {
future: Fut,
polled_before: bool,
}
impl<Fut: Future> Delayed<Fut> {
unsafe_pinned!(future: Fut);
unsafe_unpinned!(polled_before: bool);
pub(super) fn new(future: Fut) -> Self {
Self {
future,
polled_before: false,
}
}
}
impl<Fut: Future> Future for Delayed<Fut> {
type Output = Fut::Output;
fn poll(
mut self: PinMut<Self>,
cx: &mut task::Context,
) -> Poll<Self::Output> {
if *self.polled_before() {
self.future().poll(cx)
} else {
*self.polled_before() = true;
cx.waker().wake();
Poll::Pending
}
}
}

View File

@@ -0,0 +1,65 @@
//! Additional combinators for testing futures.
mod delay;
use self::delay::Delayed;
use futures_core::future::Future;
use futures_executor;
use std::thread;
/// Additional combinators for testing futures.
pub trait FutureTestExt: Future {
/// Introduces one [`Poll::Pending`][futures_core::task::Poll::Pending]
/// before polling the given future
///
/// # Examples
///
/// ```
/// #![feature(async_await, futures_api, pin)]
/// use futures::task::Poll;
/// use futures::future::FutureExt;
/// use futures_test::task;
/// use futures_test::future::FutureTestExt;
/// use pin_utils::pin_mut;
///
/// let future = (async { 5 }).delay();
/// pin_mut!(future);
///
/// let cx = &mut task::no_spawn_context();
///
/// assert_eq!(future.poll_unpin(cx), Poll::Pending);
/// assert_eq!(future.poll_unpin(cx), Poll::Ready(5));
/// ```
fn delay(self) -> Delayed<Self>
where
Self: Sized,
{
delay::Delayed::new(self)
}
/// Runs this future on a dedicated executor running in a background thread.
///
/// # Examples
///
/// ```
/// #![feature(async_await, futures_api, pin)]
/// use futures::channel::oneshot;
/// use futures::executor::block_on;
/// use futures_test::future::FutureTestExt;
///
/// let (tx, rx) = oneshot::channel::<i32>();
///
/// (async { tx.send(5).unwrap() }).run_in_background();
///
/// assert_eq!(block_on(rx), Ok(5));
/// ```
fn run_in_background(self)
where
Self: Sized + Send + 'static,
Self::Output: Send,
{
thread::spawn(|| futures_executor::block_on(self));
}
}
impl<Fut> FutureTestExt for Fut where Fut: Future {}

31
futures-test/src/lib.rs Normal file
View File

@@ -0,0 +1,31 @@
//! Utilities to make testing [`Future`s][futures_core::Future] easier
#![feature(
arbitrary_self_types,
async_await,
await_macro,
futures_api,
pin,
)]
#![warn(missing_docs, missing_debug_implementations)]
#![deny(bare_trait_objects)]
#![doc(
html_root_url = "https://rust-lang-nursery.github.io/futures-doc/0.3.0-alpha.3/futures_test"
)]
#[doc(hidden)]
pub use std as std_reexport;
#[doc(hidden)]
pub extern crate futures_core as futures_core_reexport;
#[macro_use]
extern crate pin_utils;
#[macro_use]
#[doc(hidden)]
pub mod assert;
pub mod task;
pub mod future;

View File

@@ -0,0 +1,65 @@
use crate::task::{spawn, wake};
use futures_core::task::Context;
/// Create a new [`task::Context`][futures_core::task::Context] where both
/// the [`waker`][futures_core::task::Context::waker] and
/// [`spawner`][futures_core::task::Context::spawner] will both panic if used.
///
/// # Examples
///
/// ```should_panic
/// #![feature(futures_api)]
/// use futures_test::task;
///
/// let cx = task::panic_context();
/// cx.waker().wake(); // Will panic
/// ```
pub fn panic_context() -> Context<'static> {
Context::new(wake::Panic::local_waker_ref(), spawn::Panic::spawn_mut())
}
/// Create a new [`task::Context`][futures_core::task::Context] where the
/// [`waker`][futures_core::task::Context::waker] will ignore any calls to
/// `wake` while the [`spawner`][futures_core::task::Context::spawner] will
/// panic if used.
///
/// # Examples
///
/// ```
/// #![feature(async_await, futures_api, pin)]
/// use futures::future::Future;
/// use futures::task::Poll;
/// use futures_test::task::no_spawn_context;
/// use pin_utils::pin_mut;
///
/// let mut future = async { 5 };
/// pin_mut!(future);
///
/// assert_eq!(future.poll(&mut no_spawn_context()), Poll::Ready(5));
/// ```
pub fn no_spawn_context() -> Context<'static> {
Context::new(wake::Noop::local_waker_ref(), spawn::Panic::spawn_mut())
}
/// Create a new [`task::Context`][futures_core::task::Context] where the
/// [`waker`][futures_core::task::Context::waker] and
/// [`spawner`][futures_core::task::Context::spawner] will both ignore any
/// uses.
///
/// # Examples
///
/// ```
/// #![feature(async_await, futures_api, pin)]
/// use futures::future::Future;
/// use futures::task::Poll;
/// use futures_test::task::noop_context;
/// use pin_utils::pin_mut;
///
/// let mut future = async { 5 };
/// pin_mut!(future);
///
/// assert_eq!(future.poll(&mut noop_context()), Poll::Ready(5));
/// ```
pub fn noop_context() -> Context<'static> {
Context::new(wake::Noop::local_waker_ref(), spawn::Noop::spawn_mut())
}

View File

@@ -0,0 +1,20 @@
//! Task related utilities.
//!
//! In the majority of use cases you can use the functions exported below to
//! create a [`Context`][futures_core::task::Context] appropriate to use in your
//! tests.
//!
//! For more complex test cases you can take a `Context` from one of these
//! functions and then use the
//! [`Context::with_waker`][futures_core::task::Context::with_waker] and
//! [`Context::with_spawner`][futures_core::task::Context::with_spawner]
//! methods to change the implementations used. See the examples on
//! the provided implementations in [`wake`] and
//! [`spawn`] for more details.
mod context;
pub mod spawn;
pub mod wake;
pub use self::context::{no_spawn_context, noop_context, panic_context};

View File

@@ -0,0 +1,11 @@
//! Implementations of [`Spawn`][futures_core::task::Spawn] with various
//! behaviour for test purposes.
mod noop;
pub use self::noop::Noop;
mod panic;
pub use self::panic::Panic;
mod record;
pub use self::record::Record;

View File

@@ -0,0 +1,55 @@
use futures_core::future::FutureObj;
use futures_core::task::{Spawn, SpawnObjError};
use std::cell::UnsafeCell;
/// An implementation of [`Spawn`][futures_core::task::Spawn] that
/// discards spawned futures when used.
///
/// # Examples
///
/// ```
/// #![feature(async_await, futures_api)]
/// use futures::task::SpawnExt;
/// use futures_test::task::{panic_context, spawn};
///
/// let mut cx = panic_context();
/// let mut spawn = spawn::Noop::new();
/// let cx = &mut cx.with_spawner(&mut spawn);
///
/// cx.spawner().spawn(async { });
/// ```
#[derive(Debug)]
pub struct Noop {
_reserved: (),
}
impl Noop {
/// Create a new instance
pub fn new() -> Self {
Self { _reserved: () }
}
/// Get a thread local reference to a singleton instance of [`Noop`] as a
/// [`Spawn`].
pub fn spawn_mut() -> &'static mut dyn Spawn {
thread_local! {
static INSTANCE: UnsafeCell<Noop> = UnsafeCell::new(Noop { _reserved: () });
}
INSTANCE.with(|i| unsafe { &mut *i.get() })
}
}
impl Spawn for Noop {
fn spawn_obj(
&mut self,
_future: FutureObj<'static, ()>,
) -> Result<(), SpawnObjError> {
Ok(())
}
}
impl Default for Noop {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,55 @@
use futures_core::future::FutureObj;
use futures_core::task::{Spawn, SpawnObjError};
use std::cell::UnsafeCell;
/// An implementation of [`Spawn`][futures_core::task::Spawn] that panics
/// when used.
///
/// # Examples
///
/// ```should_panic
/// #![feature(async_await, futures_api)]
/// use futures::task::SpawnExt;
/// use futures_test::task::{noop_context, spawn};
///
/// let mut cx = noop_context();
/// let mut spawn = spawn::Panic::new();
/// let cx = &mut cx.with_spawner(&mut spawn);
///
/// cx.spawner().spawn(async { }); // Will panic
/// ```
#[derive(Debug)]
pub struct Panic {
_reserved: (),
}
impl Panic {
/// Create a new instance
pub fn new() -> Self {
Self { _reserved: () }
}
/// Get a thread local reference to a singleton instance of [`Panic`] as a
/// [`Spawn`].
pub fn spawn_mut() -> &'static mut dyn Spawn {
thread_local! {
static INSTANCE: UnsafeCell<Panic> = UnsafeCell::new(Panic { _reserved: () });
}
INSTANCE.with(|i| unsafe { &mut *i.get() })
}
}
impl Spawn for Panic {
fn spawn_obj(
&mut self,
_future: FutureObj<'static, ()>,
) -> Result<(), SpawnObjError> {
panic!("should not spawn")
}
}
impl Default for Panic {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,57 @@
use futures_core::future::FutureObj;
use futures_core::task::{Spawn, SpawnObjError};
/// An implementation of [`Spawn`][futures_core::task::Spawn] that records
/// any [`Future`][futures_core::future::Future]s spawned on it.
///
/// # Examples
///
/// ```
/// #![feature(async_await, futures_api)]
/// use futures::task::SpawnExt;
/// use futures_test::task::{panic_context, spawn};
///
/// let mut recorder = spawn::Record::new();
///
/// {
/// let mut cx = panic_context();
/// let cx = &mut cx.with_spawner(&mut recorder);
/// cx.spawner().spawn(async { });
/// }
///
/// assert_eq!(recorder.spawned().len(), 1);
/// ```
#[derive(Debug)]
pub struct Record {
spawned: Vec<FutureObj<'static, ()>>,
}
impl Record {
/// Create a new instance
pub fn new() -> Self {
Self {
spawned: Vec::new(),
}
}
/// Inspect any futures that were spawned onto this [`Spawn`].
pub fn spawned(&self) -> &[FutureObj<'static, ()>] {
&self.spawned
}
}
impl Spawn for Record {
fn spawn_obj(
&mut self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnObjError> {
self.spawned.push(future);
Ok(())
}
}
impl Default for Record {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,58 @@
use futures_core::task::{local_waker, LocalWaker, Wake};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// An implementation of [`Wake`][futures_core::task::Wake] that tracks how many
/// times it has been woken.
///
/// # Examples
///
/// ```
/// #![feature(futures_api)]
/// use futures_test::task::{panic_context, wake};
///
/// let (wake_counter, local_waker) = wake::Counter::new();
/// let mut cx = panic_context();
/// let cx = &mut cx.with_waker(&local_waker);
///
/// assert_eq!(wake_counter.count(), 0);
///
/// cx.waker().wake();
/// cx.waker().wake();
///
/// assert_eq!(wake_counter.count(), 2);
/// ```
#[derive(Debug)]
pub struct Counter {
count: AtomicUsize,
}
impl Counter {
/// Create a new instance with an associated [`LocalWaker`]
pub fn new() -> (Arc<Self>, LocalWaker) {
let arc = Arc::new(Self {
count: AtomicUsize::new(0),
});
let local_waker = unsafe { local_waker(arc.clone()) };
(arc, local_waker)
}
/// Get the number of times this [`Counter`] has been woken
pub fn count(&self) -> usize {
self.count.load(Ordering::SeqCst)
}
}
impl Default for Counter {
fn default() -> Self {
Self {
count: AtomicUsize::new(0),
}
}
}
impl Wake for Counter {
fn wake(arc_self: &Arc<Self>) {
arc_self.count.fetch_add(1, Ordering::SeqCst);
}
}

View File

@@ -0,0 +1,11 @@
//! Implementations of [`Wake`][futures_core::task::Wake] with various behaviour
//! for test purposes.
mod counter;
pub use self::counter::Counter;
mod noop;
pub use self::noop::Noop;
mod panic;
pub use self::panic::Panic;

View File

@@ -0,0 +1,76 @@
use futures_core::task::{LocalWaker, UnsafeWake, Wake, Waker};
use std::cell::UnsafeCell;
use std::ptr::NonNull;
use std::sync::Arc;
/// An implementation of [`Wake`][futures_core::task::Wake] that does nothing
/// when woken.
///
/// # Examples
///
/// ```
/// #![feature(futures_api)]
/// use futures_test::task::{panic_context, wake};
///
/// let mut cx = panic_context();
/// let cx = &mut cx.with_waker(wake::Noop::local_waker_ref());
///
/// cx.waker().wake();
/// ```
#[derive(Debug)]
pub struct Noop {
_reserved: (),
}
impl Noop {
/// Create a new instance
pub fn new() -> Self {
Self { _reserved: () }
}
fn unsafe_wake() -> NonNull<dyn UnsafeWake> {
static mut INSTANCE: Noop = Noop { _reserved: () };
unsafe { NonNull::new_unchecked(&mut INSTANCE as *mut dyn UnsafeWake) }
}
/// Create a new [`Waker`] referencing a singleton instance of [`Noop`].
pub fn waker() -> Waker {
unsafe { Waker::new(Self::unsafe_wake()) }
}
/// Create a new [`LocalWaker`] referencing a singleton instance of
/// [`Noop`].
pub fn local_waker() -> LocalWaker {
unsafe { LocalWaker::new(Self::unsafe_wake()) }
}
/// Get a thread local reference to a [`LocalWaker`] referencing a singleton
/// instance of [`Noop`].
pub fn local_waker_ref() -> &'static LocalWaker {
thread_local! {
static LOCAL_WAKER_INSTANCE: UnsafeCell<LocalWaker> =
UnsafeCell::new(Noop::local_waker());
}
LOCAL_WAKER_INSTANCE.with(|l| unsafe { &mut *l.get() })
}
}
impl Default for Noop {
fn default() -> Self {
Self::new()
}
}
impl Wake for Noop {
fn wake(_arc_self: &Arc<Self>) {}
}
unsafe impl UnsafeWake for Noop {
unsafe fn clone_raw(&self) -> Waker {
Noop::waker()
}
unsafe fn drop_raw(&self) {}
unsafe fn wake(&self) {}
}

View File

@@ -0,0 +1,80 @@
use futures_core::task::{LocalWaker, UnsafeWake, Wake, Waker};
use std::cell::UnsafeCell;
use std::ptr::NonNull;
use std::sync::Arc;
/// An implementation of [`Wake`][futures_core::task::Wake] that panics when
/// woken.
///
/// # Examples
///
/// ```should_panic
/// #![feature(futures_api)]
/// use futures_test::task::{noop_context, wake};
///
/// let mut cx = noop_context();
/// let cx = &mut cx.with_waker(wake::Panic::local_waker_ref());
///
/// cx.waker().wake(); // Will panic
/// ```
#[derive(Debug)]
pub struct Panic {
_reserved: (),
}
impl Panic {
/// Create a new instance
pub fn new() -> Self {
Self { _reserved: () }
}
fn unsafe_wake() -> NonNull<dyn UnsafeWake> {
static mut INSTANCE: Panic = Panic { _reserved: () };
unsafe { NonNull::new_unchecked(&mut INSTANCE as *mut dyn UnsafeWake) }
}
/// Create a new [`Waker`] referencing a singleton instance of [`Panic`].
pub fn waker() -> Waker {
unsafe { Waker::new(Self::unsafe_wake()) }
}
/// Create a new [`LocalWaker`] referencing a singleton instance of
/// [`Panic`].
pub fn local_waker() -> LocalWaker {
unsafe { LocalWaker::new(Self::unsafe_wake()) }
}
/// Get a thread local reference to a [`LocalWaker`] referencing a singleton
/// instance of [`Panic`].
pub fn local_waker_ref() -> &'static LocalWaker {
thread_local! {
static LOCAL_WAKER_INSTANCE: UnsafeCell<LocalWaker> =
UnsafeCell::new(Panic::local_waker());
}
LOCAL_WAKER_INSTANCE.with(|l| unsafe { &mut *l.get() })
}
}
impl Default for Panic {
fn default() -> Self {
Self::new()
}
}
impl Wake for Panic {
fn wake(_arc_self: &Arc<Self>) {
panic!("should not be woken")
}
}
unsafe impl UnsafeWake for Panic {
unsafe fn clone_raw(&self) -> Waker {
Panic::waker()
}
unsafe fn drop_raw(&self) {}
unsafe fn wake(&self) {
panic!("should not be woken")
}
}

View File

@@ -34,6 +34,7 @@ futures-util-preview = { path = "../futures-util", version = "0.3.0-alpha.3", de
[dev-dependencies]
pin-utils = "0.1.0-alpha.1"
futures-test-preview = { path = "../futures-test", version = "0.3.0-alpha.3", default-features = false }
[features]
nightly = ["futures-util-preview/nightly"]

View File

@@ -1,13 +1,10 @@
#![feature(pin, arbitrary_self_types, futures_api)]
use futures::FutureExt;
use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::{abortable, Aborted};
use futures::future::{abortable, Aborted, FutureExt};
use futures::task::Poll;
mod support;
use self::support::with_counter_waker_context;
use futures_test::task::{panic_context, wake};
#[test]
fn abortable_works() {
@@ -23,14 +20,15 @@ fn abortable_awakens() {
let (_tx, a_rx) = oneshot::channel::<()>();
let (mut abortable_rx, abort_handle) = abortable(a_rx);
with_counter_waker_context(|cx, counter| {
assert_eq!(0, counter.get());
assert_eq!(Poll::Pending, abortable_rx.poll_unpin(cx));
assert_eq!(0, counter.get());
abort_handle.abort();
assert_eq!(1, counter.get());
assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(cx));
})
let (wake_counter, local_waker) = wake::Counter::new();
let mut cx = panic_context();
let cx = &mut cx.with_waker(&local_waker);
assert_eq!(0, wake_counter.count());
assert_eq!(Poll::Pending, abortable_rx.poll_unpin(cx));
assert_eq!(0, wake_counter.count());
abort_handle.abort();
assert_eq!(1, wake_counter.count());
assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(cx));
}
#[test]

View File

@@ -1,11 +1,9 @@
#![feature(pin, arbitrary_self_types, futures_api)]
use futures::future::{self, FutureExt, TryFutureExt};
use futures_test::future::FutureTestExt;
use std::sync::mpsc;
mod support;
use self::support::RunInBackgroundExt;
#[test]
fn basic_future_combinators() {
let (tx1, rx) = mpsc::channel();

View File

@@ -3,13 +3,11 @@
use futures::channel::oneshot;
use futures::future::{self, Future, FutureExt, TryFutureExt};
use futures::task::{self, Poll};
use futures_test::future::FutureTestExt;
use pin_utils::unsafe_pinned;
use std::mem::PinMut;
use std::sync::mpsc;
mod support;
use self::support::RunInBackgroundExt;
#[test]
fn map_ok() {
// The closure given to `map_ok` should have been dropped by the time `map`

View File

@@ -1,14 +1,12 @@
#![feature(pin, arbitrary_self_types, futures_api)]
use futures::future::{self, FutureExt};
mod support;
use futures_test::task::panic_context;
#[test]
fn fuse() {
let mut future = future::ready::<i32>(2).fuse();
support::with_panic_waker_context(|cx| {
assert!(future.poll_unpin(cx).is_ready());
assert!(future.poll_unpin(cx).is_pending());
})
let cx = &mut panic_context();
assert!(future.poll_unpin(cx).is_ready());
assert!(future.poll_unpin(cx).is_pending());
}

View File

@@ -4,8 +4,7 @@ use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, FutureExt, FutureObj};
use futures::stream::{StreamExt, futures_ordered, FuturesOrdered};
mod support;
use futures_test::task::no_spawn_context;
#[test]
fn works_1() {
@@ -16,9 +15,7 @@ fn works_1() {
let mut stream = futures_ordered(vec![a_rx, b_rx, c_rx]);
b_tx.send(99).unwrap();
support::with_noop_waker_context(|cx| {
assert!(stream.poll_next_unpin(cx).is_pending());
});
assert!(stream.poll_next_unpin(&mut no_spawn_context()).is_pending());
a_tx.send(33).unwrap();
c_tx.send(33).unwrap();
@@ -41,14 +38,13 @@ fn works_2() {
FutureObj::new(Box::new(b_rx.join(c_rx).map(|(a, b)| Ok(a? + b?)))),
]);
support::with_noop_waker_context(|cx| {
a_tx.send(33).unwrap();
b_tx.send(33).unwrap();
assert!(stream.poll_next_unpin(cx).is_ready());
assert!(stream.poll_next_unpin(cx).is_pending());
c_tx.send(33).unwrap();
assert!(stream.poll_next_unpin(cx).is_ready());
})
let cx = &mut no_spawn_context();
a_tx.send(33).unwrap();
b_tx.send(33).unwrap();
assert!(stream.poll_next_unpin(cx).is_ready());
assert!(stream.poll_next_unpin(cx).is_pending());
c_tx.send(33).unwrap();
assert!(stream.poll_next_unpin(cx).is_ready());
}
#[test]
@@ -74,7 +70,7 @@ fn queue_never_unblocked() {
Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box<Any+Send>))) as _,
]);
support::with_noop_waker_context(f)(|cx| {
with_no_spawn_context(|cx| {
for _ in 0..10 {
assert!(stream.poll_next(cx).unwrap().is_pending());
}

View File

@@ -5,10 +5,9 @@ use futures::executor::{block_on, block_on_stream};
use futures::future::{self, FutureExt, FutureObj};
use futures::stream::{StreamExt, futures_unordered, FuturesUnordered};
use futures::task::Poll;
use futures_test::task::no_spawn_context;
use std::boxed::Box;
mod support;
#[test]
fn works_1() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
@@ -40,12 +39,12 @@ fn works_2() {
a_tx.send(9).unwrap();
b_tx.send(10).unwrap();
support::with_noop_waker_context(|cx| {
assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(9))));
c_tx.send(20).unwrap();
assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(30))));
assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(None));
})
let cx = &mut no_spawn_context();
assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(9))));
c_tx.send(20).unwrap();
assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(30))));
assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(None));
}
#[test]

View File

@@ -2,12 +2,10 @@
use futures::channel::oneshot;
use futures::future::{FutureExt, TryFutureExt};
use futures_test::future::FutureTestExt;
use std::sync::mpsc;
use std::thread;
mod support;
use self::support::RunInBackgroundExt;
#[test]
fn oneshot_send1() {
let (tx1, rx1) = oneshot::channel::<i32>();

View File

@@ -1,38 +0,0 @@
use futures::stream::Stream;
use futures::task::Poll;
use std::fmt;
use std::mem::PinMut;
use super::{with_noop_waker_context, with_panic_waker_context};
pub fn assert_stream_pending<S: Stream>(stream: PinMut<S>) {
with_noop_waker_context(|cx| {
match stream.poll_next(cx) {
Poll::Ready(_) => panic!("stream is not pending"),
Poll::Pending => {},
}
})
}
pub fn assert_stream_next<S: Stream>(stream: PinMut<S>, item: S::Item)
where S::Item: Eq + fmt::Debug
{
with_panic_waker_context(|cx| {
match stream.poll_next(cx) {
Poll::Ready(Some(x)) => assert_eq!(x, item),
Poll::Ready(None) => panic!("stream is at its end"),
Poll::Pending => panic!("stream wasn't ready"),
}
})
}
pub fn assert_stream_done<S: Stream>(stream: PinMut<S>)
{
with_panic_waker_context(|cx| {
match stream.poll_next(cx) {
Poll::Ready(Some(_)) => panic!("stream had more elements"),
Poll::Ready(None) => {},
Poll::Pending => panic!("stream wasn't ready"),
}
})
}

View File

@@ -1,33 +0,0 @@
use super::panic_executor::PanicExecutor;
use futures::task::{self, Wake};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct CounterWaker(AtomicUsize);
impl CounterWaker {
pub fn get(&self) -> usize {
self.0.load(Ordering::SeqCst)
}
pub fn set(&self, x: usize) {
self.0.store(x, Ordering::SeqCst)
}
}
pub fn with_counter_waker_context<F, R>(f: F) -> R
where F: FnOnce(&mut task::Context, &Arc<CounterWaker>) -> R
{
impl Wake for CounterWaker {
fn wake(arc_self: &Arc<Self>) {
arc_self.0.fetch_add(1, Ordering::SeqCst);
}
}
let counter_arc = Arc::new(CounterWaker(AtomicUsize::new(0)));
let counter_waker = unsafe { task::local_waker_ref(&counter_arc) };
let exec = &mut PanicExecutor;
let cx = &mut task::Context::new(&counter_waker, exec);
f(cx, &counter_arc)
}

View File

@@ -1,35 +0,0 @@
use futures::future::Future;
use futures::task::{self, Poll};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::mem::PinMut;
pub struct Delayed<F> {
future: F,
polled_before: bool
}
impl<F> Delayed<F> {
unsafe_pinned!(future: F);
unsafe_unpinned!(polled_before: bool);
}
impl<F: Future> Future for Delayed<F> {
type Output = F::Output;
fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<F::Output> {
if *self.polled_before() {
self.future().poll(cx)
} else {
*self.polled_before() = true;
cx.waker().wake();
Poll::Pending
}
}
}
/// Introduces one `Poll::Pending` before polling the given future
pub fn delayed<F>(future: F) -> Delayed<F>
where F: Future,
{
Delayed { future, polled_before: false }
}

View File

@@ -1,44 +0,0 @@
#![allow(dead_code)]
pub mod assert;
mod delayed;
pub use self::delayed::{delayed, Delayed};
mod run_in_background;
pub use self::run_in_background::RunInBackgroundExt;
mod counter_waker_context;
pub use self::counter_waker_context::with_counter_waker_context;
mod noop_waker_context;
pub use self::noop_waker_context::with_noop_waker_context;
mod panic_executor;
mod panic_waker_context;
pub use self::panic_waker_context::with_panic_waker_context;
// pub fn f_ok(a: i32) -> FutureResult<i32, u32> { Ok(a).into_future() }
// pub fn f_err(a: u32) -> FutureResult<i32, u32> { Err(a).into_future() }
// pub fn r_ok(a: i32) -> Result<i32, u32> { Ok(a) }
// pub fn r_err(a: u32) -> Result<i32, u32> { Err(a) }
// pub fn assert_done<T, F>(f: F, result: Result<T::Item, T::Error>)
// where T: Future,
// T::Item: Eq + fmt::Debug,
// T::Error: Eq + fmt::Debug,
// F: FnOnce() -> T,
// {
// assert_eq!(block_on(f()), result);
// }
// pub fn assert_empty<T: Future, F: FnMut() -> T>(mut f: F)
// where T::Error: Debug
// {
// panic_waker_cx(|cx| {
// assert!(f().poll(cx).unwrap().is_pending())
// })
// }

View File

@@ -1,19 +0,0 @@
use super::panic_executor::PanicExecutor;
use futures::task::{self, Wake};
use std::sync::Arc;
pub fn with_noop_waker_context<F, R>(f: F) -> R
where F: FnOnce(&mut task::Context) -> R
{
struct NoopWake;
impl Wake for NoopWake {
fn wake(_: &Arc<Self>) {}
}
let noop_waker = unsafe { task::local_waker(Arc::new(NoopWake)) };
let exec = &mut PanicExecutor;
let cx = &mut task::Context::new(&noop_waker, exec);
f(cx)
}

View File

@@ -1,10 +0,0 @@
use futures::future::FutureObj;
use futures::task::{Spawn, SpawnObjError};
pub struct PanicExecutor;
impl Spawn for PanicExecutor {
fn spawn_obj(&mut self, _: FutureObj<'static, ()>) -> Result<(), SpawnObjError> {
panic!("should not spawn")
}
}

View File

@@ -1,21 +0,0 @@
use super::panic_executor::PanicExecutor;
use futures::task::{self, Wake};
use std::sync::Arc;
pub fn with_panic_waker_context<F, R>(f: F) -> R
where F: FnOnce(&mut task::Context) -> R
{
struct PanicWake;
impl Wake for PanicWake {
fn wake(_: &Arc<Self>) {
panic!("should not be woken");
}
}
let panic_waker = unsafe { task::local_waker(Arc::new(PanicWake)) };
let exec = &mut PanicExecutor;
let cx = &mut task::Context::new(&panic_waker, exec);
f(cx)
}

View File

@@ -1,16 +0,0 @@
use futures::executor::block_on;
use futures::future::Future;
use std::thread;
pub trait RunInBackgroundExt {
fn run_in_background(self);
}
impl<F> RunInBackgroundExt for F
where F: Future + Sized + Send + 'static,
F::Output: Send,
{
fn run_in_background(self) {
thread::spawn(|| block_on(self));
}
}

View File

@@ -2,37 +2,36 @@
use futures::future;
use futures::stream;
use pin_utils::pin_mut;
mod support;
use self::support::assert::*;
use futures_test::{
assert_stream_pending, assert_stream_next, assert_stream_done,
};
use futures_test::future::FutureTestExt;
#[test]
fn unfold1() {
let stream = stream::unfold(0, |state| {
let mut stream = stream::unfold(0, |state| {
if state <= 2 {
support::delayed(future::ready(Some((state * 2, state + 1))))
future::ready(Some((state * 2, state + 1))).delay()
} else {
support::delayed(future::ready(None))
future::ready(None).delay()
}
});
pin_mut!(stream);
// Creates the future with the closure
// Not ready (delayed future)
assert_stream_pending(stream.reborrow());
assert_stream_pending!(stream);
// Future is ready, yields the item
assert_stream_next(stream.reborrow(), 0);
assert_stream_next!(stream, 0);
// Repeat
assert_stream_pending(stream.reborrow());
assert_stream_next(stream.reborrow(), 2);
assert_stream_pending!(stream);
assert_stream_next!(stream, 2);
assert_stream_pending(stream.reborrow());
assert_stream_next(stream.reborrow(), 4);
assert_stream_pending!(stream);
assert_stream_next!(stream, 4);
// No more items
assert_stream_pending(stream.reborrow());
assert_stream_done(stream.reborrow());
assert_stream_pending!(stream);
assert_stream_done!(stream);
}