mirror of
https://github.com/rust-lang/futures-rs.git
synced 2026-01-25 03:26:14 +00:00
Format modules defined inside macros
`cargo fmt` cannot recognize modules defined inside macros.
This commit is contained in:
4
.github/workflows/ci.yml
vendored
4
.github/workflows/ci.yml
vendored
@@ -241,13 +241,13 @@ jobs:
|
||||
rustup component add clippy
|
||||
- run: cargo clippy --workspace --all-features --all-targets
|
||||
|
||||
rustfmt:
|
||||
fmt:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Install Rust
|
||||
run: rustup update stable && rustup default stable
|
||||
- run: cargo fmt --all -- --check
|
||||
- run: tools/fmt.sh
|
||||
|
||||
docs:
|
||||
name: cargo doc
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
use_small_heuristics = "Max"
|
||||
edition = "2018"
|
||||
|
||||
@@ -6,8 +6,8 @@
|
||||
|
||||
use core::cell::UnsafeCell;
|
||||
use core::ops::{Deref, DerefMut};
|
||||
use core::sync::atomic::Ordering::SeqCst;
|
||||
use core::sync::atomic::AtomicBool;
|
||||
use core::sync::atomic::Ordering::SeqCst;
|
||||
|
||||
/// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
|
||||
///
|
||||
@@ -37,10 +37,7 @@ unsafe impl<T: Send> Sync for Lock<T> {}
|
||||
impl<T> Lock<T> {
|
||||
/// Creates a new lock around the given value.
|
||||
pub(crate) fn new(t: T) -> Self {
|
||||
Self {
|
||||
locked: AtomicBool::new(false),
|
||||
data: UnsafeCell::new(t),
|
||||
}
|
||||
Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) }
|
||||
}
|
||||
|
||||
/// Attempts to acquire this lock, returning whether the lock was acquired or
|
||||
|
||||
@@ -79,13 +79,13 @@
|
||||
// by the queue structure.
|
||||
|
||||
use futures_core::stream::{FusedStream, Stream};
|
||||
use futures_core::task::{Context, Poll, Waker};
|
||||
use futures_core::task::__internal::AtomicWaker;
|
||||
use futures_core::task::{Context, Poll, Waker};
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering::SeqCst;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
|
||||
use crate::mpsc::queue::Queue;
|
||||
@@ -209,9 +209,7 @@ impl SendError {
|
||||
|
||||
impl<T> fmt::Debug for TrySendError<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("TrySendError")
|
||||
.field("kind", &self.err.kind)
|
||||
.finish()
|
||||
f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -251,8 +249,7 @@ impl<T> TrySendError<T> {
|
||||
|
||||
impl fmt::Debug for TryRecvError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_tuple("TryRecvError")
|
||||
.finish()
|
||||
f.debug_tuple("TryRecvError").finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -335,10 +332,7 @@ struct SenderTask {
|
||||
|
||||
impl SenderTask {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
task: None,
|
||||
is_parked: false,
|
||||
}
|
||||
Self { task: None, is_parked: false }
|
||||
}
|
||||
|
||||
fn notify(&mut self) {
|
||||
@@ -381,9 +375,7 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
|
||||
maybe_parked: false,
|
||||
};
|
||||
|
||||
let rx = Receiver {
|
||||
inner: Some(inner),
|
||||
};
|
||||
let rx = Receiver { inner: Some(inner) };
|
||||
|
||||
(Sender(Some(tx)), rx)
|
||||
}
|
||||
@@ -399,7 +391,6 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
|
||||
/// the channel. Using an `unbounded` channel has the ability of causing the
|
||||
/// process to run out of memory. In this case, the process will be aborted.
|
||||
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
|
||||
|
||||
let inner = Arc::new(UnboundedInner {
|
||||
state: AtomicUsize::new(INIT_STATE),
|
||||
message_queue: Queue::new(),
|
||||
@@ -407,13 +398,9 @@ pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
|
||||
recv_task: AtomicWaker::new(),
|
||||
});
|
||||
|
||||
let tx = UnboundedSenderInner {
|
||||
inner: inner.clone(),
|
||||
};
|
||||
let tx = UnboundedSenderInner { inner: inner.clone() };
|
||||
|
||||
let rx = UnboundedReceiver {
|
||||
inner: Some(inner),
|
||||
};
|
||||
let rx = UnboundedReceiver { inner: Some(inner) };
|
||||
|
||||
(UnboundedSender(Some(tx)), rx)
|
||||
}
|
||||
@@ -430,13 +417,10 @@ impl<T> UnboundedSenderInner<T> {
|
||||
if state.is_open {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Ready(Err(SendError {
|
||||
kind: SendErrorKind::Disconnected,
|
||||
}))
|
||||
Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Push message to the queue and signal to the receiver
|
||||
fn queue_push_and_signal(&self, msg: T) {
|
||||
// Push the message onto the message queue
|
||||
@@ -462,16 +446,17 @@ impl<T> UnboundedSenderInner<T> {
|
||||
// This probably is never hit? Odds are the process will run out of
|
||||
// memory first. It may be worth to return something else in this
|
||||
// case?
|
||||
assert!(state.num_messages < MAX_CAPACITY, "buffer space \
|
||||
exhausted; sending this messages would overflow the state");
|
||||
assert!(
|
||||
state.num_messages < MAX_CAPACITY,
|
||||
"buffer space \
|
||||
exhausted; sending this messages would overflow the state"
|
||||
);
|
||||
|
||||
state.num_messages += 1;
|
||||
|
||||
let next = encode_state(&state);
|
||||
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
|
||||
Ok(_) => {
|
||||
return Some(state.num_messages)
|
||||
}
|
||||
Ok(_) => return Some(state.num_messages),
|
||||
Err(actual) => curr = actual,
|
||||
}
|
||||
}
|
||||
@@ -516,12 +501,7 @@ impl<T> BoundedSenderInner<T> {
|
||||
fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
|
||||
// If the sender is currently blocked, reject the message
|
||||
if !self.poll_unparked(None).is_ready() {
|
||||
return Err(TrySendError {
|
||||
err: SendError {
|
||||
kind: SendErrorKind::Full,
|
||||
},
|
||||
val: msg,
|
||||
});
|
||||
return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
|
||||
}
|
||||
|
||||
// The channel has capacity to accept the message, so send it
|
||||
@@ -531,9 +511,7 @@ impl<T> BoundedSenderInner<T> {
|
||||
// Do the send without failing.
|
||||
// Can be called only by bounded sender.
|
||||
#[allow(clippy::debug_assert_with_mut_call)]
|
||||
fn do_send_b(&mut self, msg: T)
|
||||
-> Result<(), TrySendError<T>>
|
||||
{
|
||||
fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
|
||||
// Anyone callig do_send *should* make sure there is room first,
|
||||
// but assert here for tests as a sanity check.
|
||||
debug_assert!(self.poll_unparked(None).is_ready());
|
||||
@@ -551,12 +529,12 @@ impl<T> BoundedSenderInner<T> {
|
||||
// the configured buffer size
|
||||
num_messages > self.inner.buffer
|
||||
}
|
||||
None => return Err(TrySendError {
|
||||
err: SendError {
|
||||
kind: SendErrorKind::Disconnected,
|
||||
},
|
||||
None => {
|
||||
return Err(TrySendError {
|
||||
err: SendError { kind: SendErrorKind::Disconnected },
|
||||
val: msg,
|
||||
}),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
// If the channel has reached capacity, then the sender task needs to
|
||||
@@ -600,16 +578,17 @@ impl<T> BoundedSenderInner<T> {
|
||||
// This probably is never hit? Odds are the process will run out of
|
||||
// memory first. It may be worth to return something else in this
|
||||
// case?
|
||||
assert!(state.num_messages < MAX_CAPACITY, "buffer space \
|
||||
exhausted; sending this messages would overflow the state");
|
||||
assert!(
|
||||
state.num_messages < MAX_CAPACITY,
|
||||
"buffer space \
|
||||
exhausted; sending this messages would overflow the state"
|
||||
);
|
||||
|
||||
state.num_messages += 1;
|
||||
|
||||
let next = encode_state(&state);
|
||||
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
|
||||
Ok(_) => {
|
||||
return Some(state.num_messages)
|
||||
}
|
||||
Ok(_) => return Some(state.num_messages),
|
||||
Err(actual) => curr = actual,
|
||||
}
|
||||
}
|
||||
@@ -644,15 +623,10 @@ impl<T> BoundedSenderInner<T> {
|
||||
/// capacity, in which case the current task is queued to be notified once
|
||||
/// capacity is available;
|
||||
/// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), SendError>> {
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
|
||||
let state = decode_state(self.inner.state.load(SeqCst));
|
||||
if !state.is_open {
|
||||
return Poll::Ready(Err(SendError {
|
||||
kind: SendErrorKind::Disconnected,
|
||||
}));
|
||||
return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
|
||||
}
|
||||
|
||||
self.poll_unparked(Some(cx)).map(Ok)
|
||||
@@ -699,7 +673,7 @@ impl<T> BoundedSenderInner<T> {
|
||||
|
||||
if !task.is_parked {
|
||||
self.maybe_parked = false;
|
||||
return Poll::Ready(())
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
// At this point, an unpark request is pending, so there will be an
|
||||
@@ -724,12 +698,7 @@ impl<T> Sender<T> {
|
||||
if let Some(inner) = &mut self.0 {
|
||||
inner.try_send(msg)
|
||||
} else {
|
||||
Err(TrySendError {
|
||||
err: SendError {
|
||||
kind: SendErrorKind::Disconnected,
|
||||
},
|
||||
val: msg,
|
||||
})
|
||||
Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -739,8 +708,7 @@ impl<T> Sender<T> {
|
||||
/// [`poll_ready`](Sender::poll_ready) has reported that the channel is
|
||||
/// ready to receive a message.
|
||||
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
|
||||
self.try_send(msg)
|
||||
.map_err(|e| e.err)
|
||||
self.try_send(msg).map_err(|e| e.err)
|
||||
}
|
||||
|
||||
/// Polls the channel to determine if there is guaranteed capacity to send
|
||||
@@ -755,13 +723,8 @@ impl<T> Sender<T> {
|
||||
/// capacity, in which case the current task is queued to be notified once
|
||||
/// capacity is available;
|
||||
/// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
|
||||
pub fn poll_ready(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), SendError>> {
|
||||
let inner = self.0.as_mut().ok_or(SendError {
|
||||
kind: SendErrorKind::Disconnected,
|
||||
})?;
|
||||
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
|
||||
let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
|
||||
inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
@@ -799,7 +762,10 @@ impl<T> Sender<T> {
|
||||
}
|
||||
|
||||
/// Hashes the receiver into the provided hasher
|
||||
pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
|
||||
pub fn hash_receiver<H>(&self, hasher: &mut H)
|
||||
where
|
||||
H: std::hash::Hasher,
|
||||
{
|
||||
use std::hash::Hash;
|
||||
|
||||
let ptr = self.0.as_ref().map(|inner| inner.ptr());
|
||||
@@ -809,13 +775,8 @@ impl<T> Sender<T> {
|
||||
|
||||
impl<T> UnboundedSender<T> {
|
||||
/// Check if the channel is ready to receive a message.
|
||||
pub fn poll_ready(
|
||||
&self,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<(), SendError>> {
|
||||
let inner = self.0.as_ref().ok_or(SendError {
|
||||
kind: SendErrorKind::Disconnected,
|
||||
})?;
|
||||
pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
|
||||
let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
|
||||
inner.poll_ready_nb()
|
||||
}
|
||||
|
||||
@@ -845,12 +806,7 @@ impl<T> UnboundedSender<T> {
|
||||
}
|
||||
}
|
||||
|
||||
Err(TrySendError {
|
||||
err: SendError {
|
||||
kind: SendErrorKind::Disconnected,
|
||||
},
|
||||
val: msg,
|
||||
})
|
||||
Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
|
||||
}
|
||||
|
||||
/// Send a message on the channel.
|
||||
@@ -858,8 +814,7 @@ impl<T> UnboundedSender<T> {
|
||||
/// This method should only be called after `poll_ready` has been used to
|
||||
/// verify that the channel is ready to receive a message.
|
||||
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
|
||||
self.do_send_nb(msg)
|
||||
.map_err(|e| e.err)
|
||||
self.do_send_nb(msg).map_err(|e| e.err)
|
||||
}
|
||||
|
||||
/// Sends a message along this channel.
|
||||
@@ -888,7 +843,10 @@ impl<T> UnboundedSender<T> {
|
||||
}
|
||||
|
||||
/// Hashes the receiver into the provided hasher
|
||||
pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
|
||||
pub fn hash_receiver<H>(&self, hasher: &mut H)
|
||||
where
|
||||
H: std::hash::Hasher,
|
||||
{
|
||||
use std::hash::Hash;
|
||||
|
||||
let ptr = self.0.as_ref().map(|inner| inner.ptr());
|
||||
@@ -928,9 +886,7 @@ impl<T> Clone for UnboundedSenderInner<T> {
|
||||
Ok(_) => {
|
||||
// The ABA problem doesn't matter here. We only care that the
|
||||
// number of senders never exceeds the maximum.
|
||||
return Self {
|
||||
inner: self.inner.clone(),
|
||||
};
|
||||
return Self { inner: self.inner.clone() };
|
||||
}
|
||||
Err(actual) => curr = actual,
|
||||
}
|
||||
@@ -1027,9 +983,7 @@ impl<T> Receiver<T> {
|
||||
/// * `Err(e)` when there are no messages available, but channel is not yet closed
|
||||
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
|
||||
match self.next_message() {
|
||||
Poll::Ready(msg) => {
|
||||
Ok(msg)
|
||||
},
|
||||
Poll::Ready(msg) => Ok(msg),
|
||||
Poll::Pending => Err(TryRecvError { _priv: () }),
|
||||
}
|
||||
}
|
||||
@@ -1103,10 +1057,7 @@ impl<T> FusedStream for Receiver<T> {
|
||||
impl<T> Stream for Receiver<T> {
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<T>> {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
||||
// Try to read a message off of the message queue.
|
||||
match self.next_message() {
|
||||
Poll::Ready(msg) => {
|
||||
@@ -1114,7 +1065,7 @@ impl<T> Stream for Receiver<T> {
|
||||
self.inner = None;
|
||||
}
|
||||
Poll::Ready(msg)
|
||||
},
|
||||
}
|
||||
Poll::Pending => {
|
||||
// There are no messages to read, in this case, park.
|
||||
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
|
||||
@@ -1180,9 +1131,7 @@ impl<T> UnboundedReceiver<T> {
|
||||
/// * `Err(e)` when there are no messages available, but channel is not yet closed
|
||||
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
|
||||
match self.next_message() {
|
||||
Poll::Ready(msg) => {
|
||||
Ok(msg)
|
||||
},
|
||||
Poll::Ready(msg) => Ok(msg),
|
||||
Poll::Pending => Err(TryRecvError { _priv: () }),
|
||||
}
|
||||
}
|
||||
@@ -1240,10 +1189,7 @@ impl<T> FusedStream for UnboundedReceiver<T> {
|
||||
impl<T> Stream for UnboundedReceiver<T> {
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<T>> {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
||||
// Try to read a message off of the message queue.
|
||||
match self.next_message() {
|
||||
Poll::Ready(msg) => {
|
||||
@@ -1251,7 +1197,7 @@ impl<T> Stream for UnboundedReceiver<T> {
|
||||
self.inner = None;
|
||||
}
|
||||
Poll::Ready(msg)
|
||||
},
|
||||
}
|
||||
Poll::Pending => {
|
||||
// There are no messages to read, in this case, park.
|
||||
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
|
||||
@@ -1349,10 +1295,7 @@ impl State {
|
||||
*/
|
||||
|
||||
fn decode_state(num: usize) -> State {
|
||||
State {
|
||||
is_open: num & OPEN_MASK == OPEN_MASK,
|
||||
num_messages: num & MAX_CAPACITY,
|
||||
}
|
||||
State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
|
||||
}
|
||||
|
||||
fn encode_state(state: &State) -> usize {
|
||||
|
||||
@@ -43,10 +43,10 @@
|
||||
|
||||
pub(super) use self::PopResult::*;
|
||||
|
||||
use std::thread;
|
||||
use std::cell::UnsafeCell;
|
||||
use std::ptr;
|
||||
use std::sync::atomic::{AtomicPtr, Ordering};
|
||||
use std::thread;
|
||||
|
||||
/// A result of the `pop` function.
|
||||
pub(super) enum PopResult<T> {
|
||||
@@ -76,15 +76,12 @@ pub(super) struct Queue<T> {
|
||||
tail: UnsafeCell<*mut Node<T>>,
|
||||
}
|
||||
|
||||
unsafe impl<T: Send> Send for Queue<T> { }
|
||||
unsafe impl<T: Send> Sync for Queue<T> { }
|
||||
unsafe impl<T: Send> Send for Queue<T> {}
|
||||
unsafe impl<T: Send> Sync for Queue<T> {}
|
||||
|
||||
impl<T> Node<T> {
|
||||
unsafe fn new(v: Option<T>) -> *mut Self {
|
||||
Box::into_raw(Box::new(Self {
|
||||
next: AtomicPtr::new(ptr::null_mut()),
|
||||
value: v,
|
||||
}))
|
||||
Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v }))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,10 +90,7 @@ impl<T> Queue<T> {
|
||||
/// one consumer.
|
||||
pub(super) fn new() -> Self {
|
||||
let stub = unsafe { Node::new(None) };
|
||||
Self {
|
||||
head: AtomicPtr::new(stub),
|
||||
tail: UnsafeCell::new(stub),
|
||||
}
|
||||
Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
|
||||
}
|
||||
|
||||
/// Pushes a new value onto this queue.
|
||||
@@ -133,7 +127,11 @@ impl<T> Queue<T> {
|
||||
return Data(ret);
|
||||
}
|
||||
|
||||
if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
|
||||
if self.head.load(Ordering::Acquire) == tail {
|
||||
Empty
|
||||
} else {
|
||||
Inconsistent
|
||||
}
|
||||
}
|
||||
|
||||
/// Pop an element similarly to `pop` function, but spin-wait on inconsistent
|
||||
|
||||
@@ -6,24 +6,15 @@ use std::pin::Pin;
|
||||
impl<T> Sink<T> for Sender<T> {
|
||||
type Error = SendError;
|
||||
|
||||
fn poll_ready(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
(*self).poll_ready(cx)
|
||||
}
|
||||
|
||||
fn start_send(
|
||||
mut self: Pin<&mut Self>,
|
||||
msg: T,
|
||||
) -> Result<(), Self::Error> {
|
||||
fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
|
||||
(*self).start_send(msg)
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match (*self).poll_ready(cx) {
|
||||
Poll::Ready(Err(ref e)) if e.is_disconnected() => {
|
||||
// If the receiver disconnected, we consider the sink to be flushed.
|
||||
@@ -33,10 +24,7 @@ impl<T> Sink<T> for Sender<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
mut self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.disconnect();
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
@@ -45,31 +33,19 @@ impl<T> Sink<T> for Sender<T> {
|
||||
impl<T> Sink<T> for UnboundedSender<T> {
|
||||
type Error = SendError;
|
||||
|
||||
fn poll_ready(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Self::poll_ready(&*self, cx)
|
||||
}
|
||||
|
||||
fn start_send(
|
||||
mut self: Pin<&mut Self>,
|
||||
msg: T,
|
||||
) -> Result<(), Self::Error> {
|
||||
fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
|
||||
Self::start_send(&mut *self, msg)
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
mut self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.disconnect();
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
@@ -78,29 +54,19 @@ impl<T> Sink<T> for UnboundedSender<T> {
|
||||
impl<T> Sink<T> for &UnboundedSender<T> {
|
||||
type Error = SendError;
|
||||
|
||||
fn poll_ready(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
UnboundedSender::poll_ready(*self, cx)
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
|
||||
self.unbounded_send(msg)
|
||||
.map_err(TrySendError::into_send_error)
|
||||
self.unbounded_send(msg).map_err(TrySendError::into_send_error)
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.close_channel();
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ use core::fmt;
|
||||
use core::pin::Pin;
|
||||
use core::sync::atomic::AtomicBool;
|
||||
use core::sync::atomic::Ordering::SeqCst;
|
||||
use futures_core::future::{Future, FusedFuture};
|
||||
use futures_core::future::{FusedFuture, Future};
|
||||
use futures_core::task::{Context, Poll, Waker};
|
||||
|
||||
use crate::lock::Lock;
|
||||
@@ -103,12 +103,8 @@ struct Inner<T> {
|
||||
/// ```
|
||||
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
|
||||
let inner = Arc::new(Inner::new());
|
||||
let receiver = Receiver {
|
||||
inner: inner.clone(),
|
||||
};
|
||||
let sender = Sender {
|
||||
inner,
|
||||
};
|
||||
let receiver = Receiver { inner: inner.clone() };
|
||||
let sender = Sender { inner };
|
||||
(sender, receiver)
|
||||
}
|
||||
|
||||
@@ -124,7 +120,7 @@ impl<T> Inner<T> {
|
||||
|
||||
fn send(&self, t: T) -> Result<(), T> {
|
||||
if self.complete.load(SeqCst) {
|
||||
return Err(t)
|
||||
return Err(t);
|
||||
}
|
||||
|
||||
// Note that this lock acquisition may fail if the receiver
|
||||
@@ -161,7 +157,7 @@ impl<T> Inner<T> {
|
||||
// destructor, but our destructor hasn't run yet so if it's set then the
|
||||
// oneshot is gone.
|
||||
if self.complete.load(SeqCst) {
|
||||
return Poll::Ready(())
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
// If our other half is not gone then we need to park our current task
|
||||
@@ -270,7 +266,10 @@ impl<T> Inner<T> {
|
||||
} else {
|
||||
let task = cx.waker().clone();
|
||||
match self.rx_task.try_lock() {
|
||||
Some(mut slot) => { *slot = Some(task); false },
|
||||
Some(mut slot) => {
|
||||
*slot = Some(task);
|
||||
false
|
||||
}
|
||||
None => true,
|
||||
}
|
||||
};
|
||||
@@ -456,10 +455,7 @@ impl<T> Receiver<T> {
|
||||
impl<T> Future for Receiver<T> {
|
||||
type Output = Result<T, Canceled>;
|
||||
|
||||
fn poll(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<T, Canceled>> {
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
|
||||
self.inner.recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use super::arc_wake::ArcWake;
|
||||
use core::mem;
|
||||
use core::task::{Waker, RawWaker, RawWakerVTable};
|
||||
use alloc::sync::Arc;
|
||||
use core::mem;
|
||||
use core::task::{RawWaker, RawWakerVTable, Waker};
|
||||
|
||||
pub(super) fn waker_vtable<W: ArcWake>() -> &'static RawWakerVTable {
|
||||
&RawWakerVTable::new(
|
||||
@@ -22,9 +22,7 @@ where
|
||||
{
|
||||
let ptr = Arc::into_raw(wake) as *const ();
|
||||
|
||||
unsafe {
|
||||
Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>()))
|
||||
}
|
||||
unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>())) }
|
||||
}
|
||||
|
||||
// FIXME: panics on Arc::clone / refcount changes could wreak havoc on the
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use super::arc_wake::{ArcWake};
|
||||
use super::arc_wake::ArcWake;
|
||||
use super::waker::waker_vtable;
|
||||
use alloc::sync::Arc;
|
||||
use core::mem::ManuallyDrop;
|
||||
use core::marker::PhantomData;
|
||||
use core::mem::ManuallyDrop;
|
||||
use core::ops::Deref;
|
||||
use core::task::{Waker, RawWaker};
|
||||
use core::task::{RawWaker, Waker};
|
||||
|
||||
/// A [`Waker`] that is only valid for a given lifetime.
|
||||
///
|
||||
@@ -22,10 +22,7 @@ impl<'a> WakerRef<'a> {
|
||||
// copy the underlying (raw) waker without calling a clone,
|
||||
// as we won't call Waker::drop either.
|
||||
let waker = ManuallyDrop::new(unsafe { core::ptr::read(waker) });
|
||||
Self {
|
||||
waker,
|
||||
_marker: PhantomData,
|
||||
}
|
||||
Self { waker, _marker: PhantomData }
|
||||
}
|
||||
|
||||
/// Create a new [`WakerRef`] from a [`Waker`] that must not be dropped.
|
||||
@@ -35,10 +32,7 @@ impl<'a> WakerRef<'a> {
|
||||
/// by the caller), and the [`Waker`] doesn't need to or must not be
|
||||
/// destroyed.
|
||||
pub fn new_unowned(waker: ManuallyDrop<Waker>) -> Self {
|
||||
Self {
|
||||
waker,
|
||||
_marker: PhantomData,
|
||||
}
|
||||
Self { waker, _marker: PhantomData }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,14 +51,13 @@ impl Deref for WakerRef<'_> {
|
||||
#[inline]
|
||||
pub fn waker_ref<W>(wake: &Arc<W>) -> WakerRef<'_>
|
||||
where
|
||||
W: ArcWake
|
||||
W: ArcWake,
|
||||
{
|
||||
// simply copy the pointer instead of using Arc::into_raw,
|
||||
// as we don't actually keep a refcount by using ManuallyDrop.<
|
||||
let ptr = (&**wake as *const W) as *const ();
|
||||
|
||||
let waker = ManuallyDrop::new(unsafe {
|
||||
Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>()))
|
||||
});
|
||||
let waker =
|
||||
ManuallyDrop::new(unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>())) });
|
||||
WakerRef::new_unowned(waker)
|
||||
}
|
||||
|
||||
@@ -2,17 +2,17 @@
|
||||
|
||||
#[cfg(feature = "bilock")]
|
||||
mod bench {
|
||||
use futures::task::{Context, Waker};
|
||||
use futures::executor::LocalPool;
|
||||
use futures_util::lock::BiLock;
|
||||
use futures_util::lock::BiLockAcquire;
|
||||
use futures_util::lock::BiLockAcquired;
|
||||
use futures_util::task::ArcWake;
|
||||
use futures::executor::LocalPool;
|
||||
use futures::task::{Context, Waker};
|
||||
use futures_util::lock::BiLock;
|
||||
use futures_util::lock::BiLockAcquire;
|
||||
use futures_util::lock::BiLockAcquired;
|
||||
use futures_util::task::ArcWake;
|
||||
|
||||
use std::sync::Arc;
|
||||
use test::Bencher;
|
||||
use std::sync::Arc;
|
||||
use test::Bencher;
|
||||
|
||||
fn notify_noop() -> Waker {
|
||||
fn notify_noop() -> Waker {
|
||||
struct Noop;
|
||||
|
||||
impl ArcWake for Noop {
|
||||
@@ -20,19 +20,16 @@ fn notify_noop() -> Waker {
|
||||
}
|
||||
|
||||
ArcWake::into_waker(Arc::new(Noop))
|
||||
}
|
||||
|
||||
|
||||
/// Pseudo-stream which simply calls `lock.poll()` on `poll`
|
||||
struct LockStream {
|
||||
lock: BiLockAcquire<u32>,
|
||||
}
|
||||
|
||||
impl LockStream {
|
||||
fn new(lock: BiLock<u32>) -> Self {
|
||||
Self {
|
||||
lock: lock.lock()
|
||||
}
|
||||
|
||||
/// Pseudo-stream which simply calls `lock.poll()` on `poll`
|
||||
struct LockStream {
|
||||
lock: BiLockAcquire<u32>,
|
||||
}
|
||||
|
||||
impl LockStream {
|
||||
fn new(lock: BiLock<u32>) -> Self {
|
||||
Self { lock: lock.lock() }
|
||||
}
|
||||
|
||||
/// Release a lock after it was acquired in `poll`,
|
||||
@@ -40,20 +37,19 @@ impl LockStream {
|
||||
fn release_lock(&mut self, guard: BiLockAcquired<u32>) {
|
||||
self.lock = guard.unlock().lock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for LockStream {
|
||||
impl Stream for LockStream {
|
||||
type Item = BiLockAcquired<u32>;
|
||||
type Error = ();
|
||||
|
||||
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
self.lock.poll(cx).map(|a| a.map(Some))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[bench]
|
||||
fn contended(b: &mut Bencher) {
|
||||
#[bench]
|
||||
fn contended(b: &mut Bencher) {
|
||||
let pool = LocalPool::new();
|
||||
let mut exec = pool.executor();
|
||||
let waker = notify_noop();
|
||||
@@ -89,10 +85,10 @@ fn contended(b: &mut Bencher) {
|
||||
}
|
||||
(x, y)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn lock_unlock(b: &mut Bencher) {
|
||||
#[bench]
|
||||
fn lock_unlock(b: &mut Bencher) {
|
||||
let pool = LocalPool::new();
|
||||
let mut exec = pool.executor();
|
||||
let waker = notify_noop();
|
||||
@@ -122,5 +118,5 @@ fn lock_unlock(b: &mut Bencher) {
|
||||
}
|
||||
(x, y)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use super::assert_future;
|
||||
use crate::task::AtomicWaker;
|
||||
use futures_core::future::Future;
|
||||
use futures_core::task::{Context, Poll};
|
||||
use alloc::sync::Arc;
|
||||
use core::fmt;
|
||||
use core::pin::Pin;
|
||||
use core::sync::atomic::{AtomicBool, Ordering};
|
||||
use alloc::sync::Arc;
|
||||
use futures_core::future::Future;
|
||||
use futures_core::task::{Context, Poll};
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
pin_project! {
|
||||
@@ -19,7 +19,10 @@ pin_project! {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Fut> Abortable<Fut> where Fut: Future {
|
||||
impl<Fut> Abortable<Fut>
|
||||
where
|
||||
Fut: Future,
|
||||
{
|
||||
/// Creates a new `Abortable` future using an existing `AbortRegistration`.
|
||||
/// `AbortRegistration`s can be acquired through `AbortHandle::new`.
|
||||
///
|
||||
@@ -40,10 +43,7 @@ impl<Fut> Abortable<Fut> where Fut: Future {
|
||||
/// # });
|
||||
/// ```
|
||||
pub fn new(future: Fut, reg: AbortRegistration) -> Self {
|
||||
assert_future::<Result<Fut::Output, Aborted>, _>(Self {
|
||||
future,
|
||||
inner: reg.inner,
|
||||
})
|
||||
assert_future::<Result<Fut::Output, Aborted>, _>(Self { future, inner: reg.inner })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,19 +80,10 @@ impl AbortHandle {
|
||||
/// # });
|
||||
/// ```
|
||||
pub fn new_pair() -> (Self, AbortRegistration) {
|
||||
let inner = Arc::new(AbortInner {
|
||||
waker: AtomicWaker::new(),
|
||||
cancel: AtomicBool::new(false),
|
||||
});
|
||||
let inner =
|
||||
Arc::new(AbortInner { waker: AtomicWaker::new(), cancel: AtomicBool::new(false) });
|
||||
|
||||
(
|
||||
Self {
|
||||
inner: inner.clone(),
|
||||
},
|
||||
AbortRegistration {
|
||||
inner,
|
||||
},
|
||||
)
|
||||
(Self { inner: inner.clone() }, AbortRegistration { inner })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,13 +103,11 @@ struct AbortInner {
|
||||
/// This function is only available when the `std` or `alloc` feature of this
|
||||
/// library is activated, and it is activated by default.
|
||||
pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle)
|
||||
where Fut: Future
|
||||
where
|
||||
Fut: Future,
|
||||
{
|
||||
let (handle, reg) = AbortHandle::new_pair();
|
||||
(
|
||||
Abortable::new(future, reg),
|
||||
handle,
|
||||
)
|
||||
(Abortable::new(future, reg), handle)
|
||||
}
|
||||
|
||||
/// Indicator that the `Abortable` future was aborted.
|
||||
@@ -134,18 +123,21 @@ impl fmt::Display for Aborted {
|
||||
#[cfg(feature = "std")]
|
||||
impl std::error::Error for Aborted {}
|
||||
|
||||
impl<Fut> Future for Abortable<Fut> where Fut: Future {
|
||||
impl<Fut> Future for Abortable<Fut>
|
||||
where
|
||||
Fut: Future,
|
||||
{
|
||||
type Output = Result<Fut::Output, Aborted>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
// Check if the future has been aborted
|
||||
if self.inner.cancel.load(Ordering::Relaxed) {
|
||||
return Poll::Ready(Err(Aborted))
|
||||
return Poll::Ready(Err(Aborted));
|
||||
}
|
||||
|
||||
// attempt to complete the future
|
||||
if let Poll::Ready(x) = self.as_mut().project().future.poll(cx) {
|
||||
return Poll::Ready(Ok(x))
|
||||
return Poll::Ready(Ok(x));
|
||||
}
|
||||
|
||||
// Register to receive a wakeup if the future is aborted in the... future
|
||||
@@ -156,7 +148,7 @@ impl<Fut> Future for Abortable<Fut> where Fut: Future {
|
||||
// Checking with `Relaxed` is sufficient because `register` introduces an
|
||||
// `AcqRel` barrier.
|
||||
if self.inner.cancel.load(Ordering::Relaxed) {
|
||||
return Poll::Ready(Err(Aborted))
|
||||
return Poll::Ready(Err(Aborted));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
//! Futures-powered synchronization primitives.
|
||||
|
||||
#[cfg(feature = "bilock")]
|
||||
use futures_core::future::Future;
|
||||
use futures_core::task::{Context, Poll, Waker};
|
||||
use alloc::boxed::Box;
|
||||
use alloc::sync::Arc;
|
||||
use core::cell::UnsafeCell;
|
||||
use core::fmt;
|
||||
use core::ops::{Deref, DerefMut};
|
||||
use core::pin::Pin;
|
||||
use core::sync::atomic::AtomicUsize;
|
||||
use core::sync::atomic::Ordering::SeqCst;
|
||||
use alloc::boxed::Box;
|
||||
use alloc::sync::Arc;
|
||||
#[cfg(feature = "bilock")]
|
||||
use futures_core::future::Future;
|
||||
use futures_core::task::{Context, Poll, Waker};
|
||||
|
||||
/// A type of futures-powered synchronization primitive which is a mutex between
|
||||
/// two possible owners.
|
||||
@@ -61,10 +61,7 @@ impl<T> BiLock<T> {
|
||||
/// Similarly, reuniting the lock and extracting the inner value is only
|
||||
/// possible when `T` is `Unpin`.
|
||||
pub fn new(t: T) -> (Self, Self) {
|
||||
let arc = Arc::new(Inner {
|
||||
state: AtomicUsize::new(0),
|
||||
value: Some(UnsafeCell::new(t)),
|
||||
});
|
||||
let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) });
|
||||
|
||||
(Self { arc: arc.clone() }, Self { arc })
|
||||
}
|
||||
@@ -103,11 +100,11 @@ impl<T> BiLock<T> {
|
||||
let mut prev = Box::from_raw(n as *mut Waker);
|
||||
*prev = cx.waker().clone();
|
||||
waker = Some(prev);
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
// type ascription for safety's sake!
|
||||
let me: Box<Waker> = waker.take().unwrap_or_else(||Box::new(cx.waker().clone()));
|
||||
let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone()));
|
||||
let me = Box::into_raw(me) as usize;
|
||||
|
||||
match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) {
|
||||
@@ -145,9 +142,7 @@ impl<T> BiLock<T> {
|
||||
#[cfg(feature = "bilock")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
|
||||
pub fn lock(&self) -> BiLockAcquire<'_, T> {
|
||||
BiLockAcquire {
|
||||
bilock: self,
|
||||
}
|
||||
BiLockAcquire { bilock: self }
|
||||
}
|
||||
|
||||
/// Attempts to put the two "halves" of a `BiLock<T>` back together and
|
||||
@@ -181,7 +176,7 @@ impl<T> BiLock<T> {
|
||||
// up as its now their turn.
|
||||
n => unsafe {
|
||||
Box::from_raw(n as *mut Waker).wake();
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -205,9 +200,7 @@ pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>);
|
||||
|
||||
impl<T> fmt::Debug for ReuniteError<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_tuple("ReuniteError")
|
||||
.field(&"...")
|
||||
.finish()
|
||||
f.debug_tuple("ReuniteError").field(&"...").finish()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
use futures_core::future::{FusedFuture, Future};
|
||||
use futures_core::task::{Context, Poll, Waker};
|
||||
use slab::Slab;
|
||||
use std::{fmt, mem};
|
||||
use std::cell::UnsafeCell;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::{fmt, mem};
|
||||
|
||||
/// A futures-aware mutex.
|
||||
///
|
||||
@@ -53,7 +53,7 @@ enum Waiter {
|
||||
impl Waiter {
|
||||
fn register(&mut self, waker: &Waker) {
|
||||
match self {
|
||||
Self::Waiting(w) if waker.will_wake(w) => {},
|
||||
Self::Waiting(w) if waker.will_wake(w) => {}
|
||||
_ => *self = Self::Waiting(waker.clone()),
|
||||
}
|
||||
}
|
||||
@@ -61,7 +61,7 @@ impl Waiter {
|
||||
fn wake(&mut self) {
|
||||
match mem::replace(self, Self::Woken) {
|
||||
Self::Waiting(waker) => waker.wake(),
|
||||
Self::Woken => {},
|
||||
Self::Woken => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -113,10 +113,7 @@ impl<T: ?Sized> Mutex<T> {
|
||||
/// This method returns a future that will resolve once the lock has been
|
||||
/// successfully acquired.
|
||||
pub fn lock(&self) -> MutexLockFuture<'_, T> {
|
||||
MutexLockFuture {
|
||||
mutex: Some(self),
|
||||
wait_key: WAIT_KEY_NONE,
|
||||
}
|
||||
MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the underlying data.
|
||||
@@ -145,7 +142,7 @@ impl<T: ?Sized> Mutex<T> {
|
||||
if wait_key != WAIT_KEY_NONE {
|
||||
let mut waiters = self.waiters.lock().unwrap();
|
||||
match waiters.remove(wait_key) {
|
||||
Waiter::Waiting(_) => {},
|
||||
Waiter::Waiting(_) => {}
|
||||
Waiter::Woken => {
|
||||
// We were awoken, but then dropped before we could
|
||||
// wake up to acquire the lock. Wake up another
|
||||
@@ -191,13 +188,10 @@ impl<T: ?Sized> fmt::Debug for MutexLockFuture<'_, T> {
|
||||
f.debug_struct("MutexLockFuture")
|
||||
.field("was_acquired", &self.mutex.is_none())
|
||||
.field("mutex", &self.mutex)
|
||||
.field("wait_key", &(
|
||||
if self.wait_key == WAIT_KEY_NONE {
|
||||
None
|
||||
} else {
|
||||
Some(self.wait_key)
|
||||
}
|
||||
))
|
||||
.field(
|
||||
"wait_key",
|
||||
&(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
|
||||
)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -295,10 +289,7 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {
|
||||
|
||||
impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("MutexGuard")
|
||||
.field("value", &&**self)
|
||||
.field("mutex", &self.mutex)
|
||||
.finish()
|
||||
f.debug_struct("MutexGuard").field("value", &&**self).field("mutex", &self.mutex).finish()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -52,10 +52,7 @@ where
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let index = self.index;
|
||||
self.project().data.poll(cx).map(|output| OrderWrapper {
|
||||
data: output,
|
||||
index,
|
||||
})
|
||||
self.project().data.poll(cx).map(|output| OrderWrapper { data: output, index })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,10 +136,7 @@ impl<Fut: Future> FuturesOrdered<Fut> {
|
||||
/// must ensure that `FuturesOrdered::poll` is called in order to receive
|
||||
/// task notifications.
|
||||
pub fn push(&mut self, future: Fut) {
|
||||
let wrapped = OrderWrapper {
|
||||
data: future,
|
||||
index: self.next_incoming_index,
|
||||
};
|
||||
let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };
|
||||
self.next_incoming_index += 1;
|
||||
self.in_progress_queue.push(wrapped);
|
||||
}
|
||||
|
||||
@@ -3,11 +3,8 @@
|
||||
//! This module is only available when the `std` or `alloc` feature of this
|
||||
//! library is activated, and it is activated by default.
|
||||
|
||||
use futures_core::future::Future;
|
||||
use futures_core::stream::{FusedStream, Stream};
|
||||
use futures_core::task::{Context, Poll};
|
||||
use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError};
|
||||
use crate::task::AtomicWaker;
|
||||
use alloc::sync::{Arc, Weak};
|
||||
use core::cell::UnsafeCell;
|
||||
use core::fmt::{self, Debug};
|
||||
use core::iter::FromIterator;
|
||||
@@ -16,8 +13,11 @@ use core::mem;
|
||||
use core::pin::Pin;
|
||||
use core::ptr;
|
||||
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
|
||||
use core::sync::atomic::{AtomicPtr, AtomicBool};
|
||||
use alloc::sync::{Arc, Weak};
|
||||
use core::sync::atomic::{AtomicBool, AtomicPtr};
|
||||
use futures_core::future::Future;
|
||||
use futures_core::stream::{FusedStream, Stream};
|
||||
use futures_core::task::{Context, Poll};
|
||||
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
|
||||
|
||||
mod abort;
|
||||
|
||||
@@ -28,8 +28,7 @@ mod task;
|
||||
use self::task::Task;
|
||||
|
||||
mod ready_to_run_queue;
|
||||
use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};
|
||||
|
||||
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};
|
||||
|
||||
/// A set of futures which may complete in any order.
|
||||
///
|
||||
@@ -63,18 +62,14 @@ unsafe impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}
|
||||
impl<Fut> Unpin for FuturesUnordered<Fut> {}
|
||||
|
||||
impl Spawn for FuturesUnordered<FutureObj<'_, ()>> {
|
||||
fn spawn_obj(&self, future_obj: FutureObj<'static, ()>)
|
||||
-> Result<(), SpawnError>
|
||||
{
|
||||
fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) -> Result<(), SpawnError> {
|
||||
self.push(future_obj);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> {
|
||||
fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>)
|
||||
-> Result<(), SpawnError>
|
||||
{
|
||||
fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
|
||||
self.push(future_obj);
|
||||
Ok(())
|
||||
}
|
||||
@@ -191,7 +186,10 @@ impl<Fut> FuturesUnordered<Fut> {
|
||||
}
|
||||
|
||||
/// Returns an iterator that allows inspecting each future in the set.
|
||||
pub fn iter(&self) -> Iter<'_, Fut> where Fut: Unpin {
|
||||
pub fn iter(&self) -> Iter<'_, Fut>
|
||||
where
|
||||
Fut: Unpin,
|
||||
{
|
||||
Iter(Pin::new(self).iter_pin_ref())
|
||||
}
|
||||
|
||||
@@ -199,16 +197,14 @@ impl<Fut> FuturesUnordered<Fut> {
|
||||
fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
|
||||
let (task, len) = self.atomic_load_head_and_len_all();
|
||||
|
||||
IterPinRef {
|
||||
task,
|
||||
len,
|
||||
pending_next_all: self.pending_next_all(),
|
||||
_marker: PhantomData,
|
||||
}
|
||||
IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData }
|
||||
}
|
||||
|
||||
/// Returns an iterator that allows modifying each future in the set.
|
||||
pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin {
|
||||
pub fn iter_mut(&mut self) -> IterMut<'_, Fut>
|
||||
where
|
||||
Fut: Unpin,
|
||||
{
|
||||
IterMut(Pin::new(self).iter_pin_mut())
|
||||
}
|
||||
|
||||
@@ -217,19 +213,9 @@ impl<Fut> FuturesUnordered<Fut> {
|
||||
// `head_all` can be accessed directly and we don't need to spin on
|
||||
// `Task::next_all` since we have exclusive access to the set.
|
||||
let task = *self.head_all.get_mut();
|
||||
let len = if task.is_null() {
|
||||
0
|
||||
} else {
|
||||
unsafe {
|
||||
*(*task).len_all.get()
|
||||
}
|
||||
};
|
||||
let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } };
|
||||
|
||||
IterPinMut {
|
||||
task,
|
||||
len,
|
||||
_marker: PhantomData
|
||||
}
|
||||
IterPinMut { task, len, _marker: PhantomData }
|
||||
}
|
||||
|
||||
/// Returns the current head node and number of futures in the list of all
|
||||
@@ -395,9 +381,7 @@ impl<Fut> FuturesUnordered<Fut> {
|
||||
impl<Fut: Future> Stream for FuturesUnordered<Fut> {
|
||||
type Item = Fut::Output;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
|
||||
-> Poll<Option<Self::Item>>
|
||||
{
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
// Variable to determine how many times it is allowed to poll underlying
|
||||
// futures without yielding.
|
||||
//
|
||||
@@ -469,14 +453,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
|
||||
|
||||
// Double check that the call to `release_task` really
|
||||
// happened. Calling it required the task to be unlinked.
|
||||
debug_assert_eq!(
|
||||
task.next_all.load(Relaxed),
|
||||
self.pending_next_all()
|
||||
);
|
||||
debug_assert_eq!(task.next_all.load(Relaxed), self.pending_next_all());
|
||||
unsafe {
|
||||
debug_assert!((*task.prev_all.get()).is_null());
|
||||
}
|
||||
continue
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -516,10 +497,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
|
||||
}
|
||||
}
|
||||
|
||||
let mut bomb = Bomb {
|
||||
task: Some(task),
|
||||
queue: &mut *self,
|
||||
};
|
||||
let mut bomb = Bomb { task: Some(task), queue: &mut *self };
|
||||
|
||||
// Poll the underlying future with the appropriate waker
|
||||
// implementation. This is where a large bit of the unsafety
|
||||
@@ -555,11 +533,9 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending;
|
||||
}
|
||||
continue
|
||||
}
|
||||
Poll::Ready(output) => {
|
||||
return Poll::Ready(Some(output))
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(output) => return Poll::Ready(Some(output)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -611,7 +587,10 @@ impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> {
|
||||
I: IntoIterator<Item = Fut>,
|
||||
{
|
||||
let acc = Self::new();
|
||||
iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc })
|
||||
iter.into_iter().fold(acc, |acc, item| {
|
||||
acc.push(item);
|
||||
acc
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use crate::task::AtomicWaker;
|
||||
use alloc::sync::Arc;
|
||||
use core::cell::UnsafeCell;
|
||||
use core::ptr;
|
||||
use core::sync::atomic::AtomicPtr;
|
||||
use core::sync::atomic::Ordering::{Relaxed, Acquire, Release, AcqRel};
|
||||
use alloc::sync::Arc;
|
||||
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
|
||||
|
||||
use super::abort::abort;
|
||||
use super::task::Task;
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use core::cell::UnsafeCell;
|
||||
use core::sync::atomic::{AtomicPtr, AtomicBool};
|
||||
use core::sync::atomic::Ordering::{self, SeqCst};
|
||||
use alloc::sync::{Arc, Weak};
|
||||
use core::cell::UnsafeCell;
|
||||
use core::sync::atomic::Ordering::{self, SeqCst};
|
||||
use core::sync::atomic::{AtomicBool, AtomicPtr};
|
||||
|
||||
use crate::task::{ArcWake, WakerRef, waker_ref};
|
||||
use super::ReadyToRunQueue;
|
||||
use super::abort::abort;
|
||||
use super::ReadyToRunQueue;
|
||||
use crate::task::{waker_ref, ArcWake, WakerRef};
|
||||
|
||||
pub(super) struct Task<Fut> {
|
||||
// The future
|
||||
|
||||
@@ -5,11 +5,11 @@ use core::iter::FromIterator;
|
||||
use core::pin::Pin;
|
||||
|
||||
use futures_core::ready;
|
||||
use futures_core::stream::{Stream, FusedStream};
|
||||
use futures_core::stream::{FusedStream, Stream};
|
||||
use futures_core::task::{Context, Poll};
|
||||
|
||||
use super::assert_stream;
|
||||
use crate::stream::{StreamExt, StreamFuture, FuturesUnordered};
|
||||
use crate::stream::{FuturesUnordered, StreamExt, StreamFuture};
|
||||
|
||||
/// An unbounded set of streams
|
||||
///
|
||||
@@ -75,10 +75,7 @@ impl<St: Stream + Unpin> Default for SelectAll<St> {
|
||||
impl<St: Stream + Unpin> Stream for SelectAll<St> {
|
||||
type Item = St::Item;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
match ready!(self.inner.poll_next_unpin(cx)) {
|
||||
Some((Some(item), remaining)) => {
|
||||
@@ -116,8 +113,9 @@ impl<St: Stream + Unpin> FusedStream for SelectAll<St> {
|
||||
/// This function is only available when the `std` or `alloc` feature of this
|
||||
/// library is activated, and it is activated by default.
|
||||
pub fn select_all<I>(streams: I) -> SelectAll<I::Item>
|
||||
where I: IntoIterator,
|
||||
I::Item: Stream + Unpin
|
||||
where
|
||||
I: IntoIterator,
|
||||
I::Item: Stream + Unpin,
|
||||
{
|
||||
let set = SelectAll::new();
|
||||
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
use crate::stream::{Fuse, FuturesUnordered, StreamExt};
|
||||
use core::fmt;
|
||||
use core::pin::Pin;
|
||||
use futures_core::future::Future;
|
||||
use futures_core::stream::{Stream, FusedStream};
|
||||
use futures_core::stream::{FusedStream, Stream};
|
||||
use futures_core::task::{Context, Poll};
|
||||
#[cfg(feature = "sink")]
|
||||
use futures_sink::Sink;
|
||||
use pin_project_lite::pin_project;
|
||||
use core::fmt;
|
||||
use core::pin::Pin;
|
||||
|
||||
pin_project! {
|
||||
/// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
|
||||
@@ -64,10 +64,7 @@ where
|
||||
{
|
||||
type Item = <St::Item as Future>::Output;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
|
||||
// First up, try to spawn off as many futures as possible by filling up
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use crate::stream::{Fuse, FuturesOrdered, StreamExt};
|
||||
use core::fmt;
|
||||
use core::pin::Pin;
|
||||
use futures_core::future::Future;
|
||||
use futures_core::ready;
|
||||
use futures_core::stream::Stream;
|
||||
@@ -6,8 +8,6 @@ use futures_core::task::{Context, Poll};
|
||||
#[cfg(feature = "sink")]
|
||||
use futures_sink::Sink;
|
||||
use pin_project_lite::pin_project;
|
||||
use core::fmt;
|
||||
use core::pin::Pin;
|
||||
|
||||
pin_project! {
|
||||
/// Stream for the [`buffered`](super::StreamExt::buffered) method.
|
||||
@@ -45,11 +45,7 @@ where
|
||||
{
|
||||
pub(super) fn new(stream: St, n: usize) -> Self {
|
||||
assert!(n > 0);
|
||||
Self {
|
||||
stream: super::Fuse::new(stream),
|
||||
in_progress_queue: FuturesOrdered::new(),
|
||||
max: n,
|
||||
}
|
||||
Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), max: n }
|
||||
}
|
||||
|
||||
delegate_access_inner!(stream, St, (.));
|
||||
@@ -62,10 +58,7 @@ where
|
||||
{
|
||||
type Item = <St::Item as Future>::Output;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
|
||||
// First up, try to spawn off as many futures as possible by filling up
|
||||
@@ -80,7 +73,7 @@ where
|
||||
// Attempt to pull the next value from the in_progress_queue
|
||||
let res = this.in_progress_queue.poll_next_unpin(cx);
|
||||
if let Some(val) = ready!(res) {
|
||||
return Poll::Ready(Some(val))
|
||||
return Poll::Ready(Some(val));
|
||||
}
|
||||
|
||||
// If more values are still coming from the stream, we're not done yet
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::stream::{FuturesUnordered, StreamExt};
|
||||
use core::fmt;
|
||||
use core::pin::Pin;
|
||||
use core::num::NonZeroUsize;
|
||||
use core::pin::Pin;
|
||||
use futures_core::future::{FusedFuture, Future};
|
||||
use futures_core::stream::Stream;
|
||||
use futures_core::task::{Context, Poll};
|
||||
@@ -35,7 +35,8 @@ where
|
||||
}
|
||||
|
||||
impl<St, Fut, F> ForEachConcurrent<St, Fut, F>
|
||||
where St: Stream,
|
||||
where
|
||||
St: Stream,
|
||||
F: FnMut(St::Item) -> Fut,
|
||||
Fut: Future<Output = ()>,
|
||||
{
|
||||
@@ -51,7 +52,8 @@ where St: Stream,
|
||||
}
|
||||
|
||||
impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F>
|
||||
where St: Stream,
|
||||
where
|
||||
St: Stream,
|
||||
F: FnMut(St::Item) -> Fut,
|
||||
Fut: Future<Output = ()>,
|
||||
{
|
||||
@@ -61,7 +63,8 @@ impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F>
|
||||
}
|
||||
|
||||
impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
|
||||
where St: Stream,
|
||||
where
|
||||
St: Stream,
|
||||
F: FnMut(St::Item) -> Fut,
|
||||
Fut: Future<Output = ()>,
|
||||
{
|
||||
@@ -80,7 +83,7 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
|
||||
Poll::Ready(Some(elem)) => {
|
||||
made_progress_this_iter = true;
|
||||
Some(elem)
|
||||
},
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
stream_completed = true;
|
||||
None
|
||||
@@ -102,9 +105,9 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
|
||||
Poll::Ready(Some(())) => made_progress_this_iter = true,
|
||||
Poll::Ready(None) => {
|
||||
if this.stream.is_none() {
|
||||
return Poll::Ready(())
|
||||
return Poll::Ready(());
|
||||
}
|
||||
}
|
||||
},
|
||||
Poll::Pending => {}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use core::fmt;
|
||||
use core::pin::Pin;
|
||||
use futures_core::ready;
|
||||
use futures_core::stream::Stream;
|
||||
use futures_core::task::{Context, Poll};
|
||||
use futures_sink::Sink;
|
||||
use core::fmt;
|
||||
use core::pin::Pin;
|
||||
|
||||
use crate::lock::BiLock;
|
||||
|
||||
@@ -20,7 +20,8 @@ impl<S: Unpin> SplitStream<S> {
|
||||
/// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
|
||||
/// a matching pair originating from the same call to `StreamExt::split`.
|
||||
pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>>
|
||||
where S: Sink<Item>,
|
||||
where
|
||||
S: Sink<Item>,
|
||||
{
|
||||
other.reunite(self)
|
||||
}
|
||||
@@ -36,10 +37,7 @@ impl<S: Stream> Stream for SplitStream<S> {
|
||||
|
||||
#[allow(bad_style)]
|
||||
fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
|
||||
SplitSink {
|
||||
lock,
|
||||
slot: None,
|
||||
}
|
||||
SplitSink { lock, slot: None }
|
||||
}
|
||||
|
||||
/// A `Sink` part of the split pair
|
||||
@@ -58,14 +56,16 @@ impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
|
||||
/// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
|
||||
/// a matching pair originating from the same call to `StreamExt::split`.
|
||||
pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> {
|
||||
self.lock.reunite(other.0).map_err(|err| {
|
||||
ReuniteError(SplitSink(err.0), SplitStream(err.1))
|
||||
})
|
||||
self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1)))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Sink<Item>, Item> SplitSink<S, Item> {
|
||||
fn poll_flush_slot(mut inner: Pin<&mut S>, slot: &mut Option<Item>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
|
||||
fn poll_flush_slot(
|
||||
mut inner: Pin<&mut S>,
|
||||
slot: &mut Option<Item>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), S::Error>> {
|
||||
if slot.is_some() {
|
||||
ready!(inner.as_mut().poll_ready(cx))?;
|
||||
Poll::Ready(inner.start_send(slot.take().unwrap()))
|
||||
@@ -74,7 +74,10 @@ impl<S: Sink<Item>, Item> SplitSink<S, Item> {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_lock_and_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
|
||||
fn poll_lock_and_flush_slot(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), S::Error>> {
|
||||
let this = &mut *self;
|
||||
let mut inner = ready!(this.lock.poll_lock(cx));
|
||||
Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx)
|
||||
@@ -127,9 +130,7 @@ pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>);
|
||||
|
||||
impl<T, Item> fmt::Debug for ReuniteError<T, Item> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_tuple("ReuniteError")
|
||||
.field(&"...")
|
||||
.finish()
|
||||
f.debug_tuple("ReuniteError").field(&"...").finish()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use crate::stream::{FuturesUnordered, StreamExt};
|
||||
use core::fmt;
|
||||
use core::mem;
|
||||
use core::pin::Pin;
|
||||
use core::num::NonZeroUsize;
|
||||
use core::pin::Pin;
|
||||
use futures_core::future::{FusedFuture, Future};
|
||||
use futures_core::stream::Stream;
|
||||
use futures_core::task::{Context, Poll};
|
||||
@@ -37,7 +37,8 @@ where
|
||||
}
|
||||
|
||||
impl<St, Fut, F, E> FusedFuture for TryForEachConcurrent<St, Fut, F>
|
||||
where St: Stream,
|
||||
where
|
||||
St: Stream,
|
||||
F: FnMut(St::Item) -> Fut,
|
||||
Fut: Future<Output = Result<(), E>>,
|
||||
{
|
||||
@@ -47,7 +48,8 @@ impl<St, Fut, F, E> FusedFuture for TryForEachConcurrent<St, Fut, F>
|
||||
}
|
||||
|
||||
impl<St, Fut, F, E> TryForEachConcurrent<St, Fut, F>
|
||||
where St: Stream,
|
||||
where
|
||||
St: Stream,
|
||||
F: FnMut(St::Item) -> Fut,
|
||||
Fut: Future<Output = Result<(), E>>,
|
||||
{
|
||||
@@ -63,7 +65,8 @@ where St: Stream,
|
||||
}
|
||||
|
||||
impl<St, Fut, F, E> Future for TryForEachConcurrent<St, Fut, F>
|
||||
where St: Stream,
|
||||
where
|
||||
St: Stream,
|
||||
F: FnMut(St::Item) -> Fut,
|
||||
Fut: Future<Output = Result<(), E>>,
|
||||
{
|
||||
@@ -85,7 +88,7 @@ impl<St, Fut, F, E> Future for TryForEachConcurrent<St, Fut, F>
|
||||
Poll::Ready(Some(elem)) => {
|
||||
made_progress_this_iter = true;
|
||||
Some(elem)
|
||||
},
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
this.stream.set(None);
|
||||
None
|
||||
@@ -102,9 +105,9 @@ impl<St, Fut, F, E> Future for TryForEachConcurrent<St, Fut, F>
|
||||
Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true,
|
||||
Poll::Ready(None) => {
|
||||
if this.stream.is_none() {
|
||||
return Poll::Ready(Ok(()))
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
}
|
||||
},
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
// Empty the stream and futures so that we know
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
use crate::stream::{Fuse, FuturesUnordered, StreamExt, IntoStream};
|
||||
use crate::future::{IntoFuture, TryFutureExt};
|
||||
use crate::stream::{Fuse, FuturesUnordered, IntoStream, StreamExt};
|
||||
use core::pin::Pin;
|
||||
use futures_core::future::TryFuture;
|
||||
use futures_core::stream::{Stream, TryStream};
|
||||
use futures_core::task::{Context, Poll};
|
||||
#[cfg(feature = "sink")]
|
||||
use futures_sink::Sink;
|
||||
use pin_project_lite::pin_project;
|
||||
use core::pin::Pin;
|
||||
|
||||
pin_project! {
|
||||
/// Stream for the
|
||||
@@ -24,7 +24,8 @@ pin_project! {
|
||||
}
|
||||
|
||||
impl<St> TryBufferUnordered<St>
|
||||
where St: TryStream,
|
||||
where
|
||||
St: TryStream,
|
||||
St::Ok: TryFuture,
|
||||
{
|
||||
pub(super) fn new(stream: St, n: usize) -> Self {
|
||||
@@ -39,15 +40,13 @@ impl<St> TryBufferUnordered<St>
|
||||
}
|
||||
|
||||
impl<St> Stream for TryBufferUnordered<St>
|
||||
where St: TryStream,
|
||||
where
|
||||
St: TryStream,
|
||||
St::Ok: TryFuture<Error = St::Error>,
|
||||
{
|
||||
type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
|
||||
// First up, try to spawn off as many futures as possible by filling up
|
||||
@@ -77,7 +76,8 @@ impl<St> Stream for TryBufferUnordered<St>
|
||||
// Forwarding impl of Sink from the underlying stream
|
||||
#[cfg(feature = "sink")]
|
||||
impl<S, Item, E> Sink<Item> for TryBufferUnordered<S>
|
||||
where S: TryStream + Sink<Item, Error = E>,
|
||||
where
|
||||
S: TryStream + Sink<Item, Error = E>,
|
||||
S::Ok: TryFuture<Error = E>,
|
||||
{
|
||||
type Error = E;
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
use crate::stream::{Fuse, FuturesOrdered, StreamExt, IntoStream};
|
||||
use crate::future::{IntoFuture, TryFutureExt};
|
||||
use crate::stream::{Fuse, FuturesOrdered, IntoStream, StreamExt};
|
||||
use core::pin::Pin;
|
||||
use futures_core::future::TryFuture;
|
||||
use futures_core::stream::{Stream, TryStream};
|
||||
use futures_core::task::{Context, Poll};
|
||||
#[cfg(feature = "sink")]
|
||||
use futures_sink::Sink;
|
||||
use pin_project_lite::pin_project;
|
||||
use core::pin::Pin;
|
||||
|
||||
pin_project! {
|
||||
/// Stream for the [`try_buffered`](super::TryStreamExt::try_buffered) method.
|
||||
@@ -47,10 +47,7 @@ where
|
||||
{
|
||||
type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
|
||||
// First up, try to spawn off as many futures as possible by filling up
|
||||
|
||||
@@ -1,27 +1,27 @@
|
||||
use futures::future;
|
||||
use futures::executor::block_on;
|
||||
use futures::channel::oneshot::{self, Canceled};
|
||||
use futures::executor::block_on;
|
||||
use futures::future;
|
||||
use std::sync::mpsc::{channel, TryRecvError};
|
||||
|
||||
mod support;
|
||||
use support::*;
|
||||
// mod support;
|
||||
// use support::*;
|
||||
|
||||
fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> {
|
||||
match r {
|
||||
Ok(Either::Left((t, _))) |
|
||||
Ok(Either::Right((t, _))) => Ok(t),
|
||||
Err(Either::Left((e, _))) |
|
||||
Err(Either::Right((e, _))) => Err(e),
|
||||
Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => Ok(t),
|
||||
Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn result_smoke() {
|
||||
fn is_future_v<A, B, C>(_: C)
|
||||
where A: Send + 'static,
|
||||
where
|
||||
A: Send + 'static,
|
||||
B: Send + 'static,
|
||||
C: Future<Item=A, Error=B>
|
||||
{}
|
||||
C: Future<Item = A, Error = B>,
|
||||
{
|
||||
}
|
||||
|
||||
is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1));
|
||||
is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1));
|
||||
@@ -64,7 +64,9 @@ fn result_smoke() {
|
||||
|
||||
#[test]
|
||||
fn test_empty() {
|
||||
fn empty() -> Empty<i32, u32> { future::empty() }
|
||||
fn empty() -> Empty<i32, u32> {
|
||||
future::empty()
|
||||
}
|
||||
|
||||
assert_empty(|| empty());
|
||||
assert_empty(|| empty().select(empty()));
|
||||
@@ -105,16 +107,22 @@ fn flatten() {
|
||||
|
||||
#[test]
|
||||
fn smoke_oneshot() {
|
||||
assert_done(|| {
|
||||
assert_done(
|
||||
|| {
|
||||
let (c, p) = oneshot::channel();
|
||||
c.send(1).unwrap();
|
||||
p
|
||||
}, Ok(1));
|
||||
assert_done(|| {
|
||||
},
|
||||
Ok(1),
|
||||
);
|
||||
assert_done(
|
||||
|| {
|
||||
let (c, p) = oneshot::channel::<i32>();
|
||||
drop(c);
|
||||
p
|
||||
}, Err(Canceled));
|
||||
},
|
||||
Err(Canceled),
|
||||
);
|
||||
let mut completes = Vec::new();
|
||||
assert_empty(|| {
|
||||
let (a, b) = oneshot::channel::<i32>();
|
||||
@@ -129,9 +137,7 @@ fn smoke_oneshot() {
|
||||
let (c, p) = oneshot::channel::<i32>();
|
||||
drop(c);
|
||||
let (tx, rx) = channel();
|
||||
p.then(move |_| {
|
||||
tx.send(())
|
||||
}).forget();
|
||||
p.then(move |_| tx.send(())).forget();
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
|
||||
@@ -139,8 +145,14 @@ fn smoke_oneshot() {
|
||||
fn select_cancels() {
|
||||
let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
|
||||
let ((btx, brx), (dtx, drx)) = (channel(), channel());
|
||||
let b = b.map(move |b| { btx.send(b).unwrap(); b });
|
||||
let d = d.map(move |d| { dtx.send(d).unwrap(); d });
|
||||
let b = b.map(move |b| {
|
||||
btx.send(b).unwrap();
|
||||
b
|
||||
});
|
||||
let d = d.map(move |d| {
|
||||
dtx.send(d).unwrap();
|
||||
d
|
||||
});
|
||||
|
||||
let mut f = b.select(d).then(unselect);
|
||||
// assert!(f.poll(&mut Task::new()).is_pending());
|
||||
@@ -156,8 +168,14 @@ fn select_cancels() {
|
||||
|
||||
let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
|
||||
let ((btx, _brx), (dtx, drx)) = (channel(), channel());
|
||||
let b = b.map(move |b| { btx.send(b).unwrap(); b });
|
||||
let d = d.map(move |d| { dtx.send(d).unwrap(); d });
|
||||
let b = b.map(move |b| {
|
||||
btx.send(b).unwrap();
|
||||
b
|
||||
});
|
||||
let d = d.map(move |d| {
|
||||
dtx.send(d).unwrap();
|
||||
d
|
||||
});
|
||||
|
||||
let mut f = b.select(d).then(unselect);
|
||||
assert!(f.poll(lw).ok().unwrap().is_pending());
|
||||
@@ -173,8 +191,14 @@ fn select_cancels() {
|
||||
fn join_cancels() {
|
||||
let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
|
||||
let ((btx, _brx), (dtx, drx)) = (channel(), channel());
|
||||
let b = b.map(move |b| { btx.send(b).unwrap(); b });
|
||||
let d = d.map(move |d| { dtx.send(d).unwrap(); d });
|
||||
let b = b.map(move |b| {
|
||||
btx.send(b).unwrap();
|
||||
b
|
||||
});
|
||||
let d = d.map(move |d| {
|
||||
dtx.send(d).unwrap();
|
||||
d
|
||||
});
|
||||
|
||||
let mut f = b.join(d);
|
||||
drop(a);
|
||||
@@ -185,8 +209,14 @@ fn join_cancels() {
|
||||
|
||||
let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
|
||||
let ((btx, _brx), (dtx, drx)) = (channel(), channel());
|
||||
let b = b.map(move |b| { btx.send(b).unwrap(); b });
|
||||
let d = d.map(move |d| { dtx.send(d).unwrap(); d });
|
||||
let b = b.map(move |b| {
|
||||
btx.send(b).unwrap();
|
||||
b
|
||||
});
|
||||
let d = d.map(move |d| {
|
||||
dtx.send(d).unwrap();
|
||||
d
|
||||
});
|
||||
|
||||
let (tx, rx) = channel();
|
||||
let f = b.join(d);
|
||||
@@ -194,7 +224,8 @@ fn join_cancels() {
|
||||
tx.send(()).unwrap();
|
||||
let res: Result<(), ()> = Ok(());
|
||||
res
|
||||
}).forget();
|
||||
})
|
||||
.forget();
|
||||
assert!(rx.try_recv().is_err());
|
||||
drop(a);
|
||||
rx.recv().unwrap();
|
||||
@@ -243,7 +274,6 @@ fn join_incomplete() {
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn select2() {
|
||||
assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2));
|
||||
@@ -251,14 +281,15 @@ fn select2() {
|
||||
assert_done(|| f_err(2).select(empty()).then(unselect), Err(2));
|
||||
assert_done(|| empty().select(f_err(2)).then(unselect), Err(2));
|
||||
|
||||
assert_done(|| {
|
||||
f_ok(1).select(f_ok(2))
|
||||
.map_err(|_| 0)
|
||||
.and_then(|either_tup| {
|
||||
assert_done(
|
||||
|| {
|
||||
f_ok(1).select(f_ok(2)).map_err(|_| 0).and_then(|either_tup| {
|
||||
let (a, b) = either_tup.into_inner();
|
||||
b.map(move |b| a + b)
|
||||
})
|
||||
}, Ok(3));
|
||||
},
|
||||
Ok(3),
|
||||
);
|
||||
|
||||
// Finish one half of a select and then fail the second, ensuring that we
|
||||
// get the notification of the second one.
|
||||
@@ -297,8 +328,14 @@ fn select2() {
|
||||
{
|
||||
let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
|
||||
let ((btx, brx), (dtx, drx)) = (channel(), channel());
|
||||
let b = b.map(move |v| { btx.send(v).unwrap(); v });
|
||||
let d = d.map(move |v| { dtx.send(v).unwrap(); v });
|
||||
let b = b.map(move |v| {
|
||||
btx.send(v).unwrap();
|
||||
v
|
||||
});
|
||||
let d = d.map(move |v| {
|
||||
dtx.send(v).unwrap();
|
||||
v
|
||||
});
|
||||
let f = b.select(d);
|
||||
drop(f);
|
||||
assert!(drx.recv().is_err());
|
||||
@@ -309,8 +346,14 @@ fn select2() {
|
||||
{
|
||||
let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
|
||||
let ((btx, brx), (dtx, drx)) = (channel(), channel());
|
||||
let b = b.map(move |v| { btx.send(v).unwrap(); v });
|
||||
let d = d.map(move |v| { dtx.send(v).unwrap(); v });
|
||||
let b = b.map(move |v| {
|
||||
btx.send(v).unwrap();
|
||||
v
|
||||
});
|
||||
let d = d.map(move |v| {
|
||||
dtx.send(v).unwrap();
|
||||
v
|
||||
});
|
||||
let mut f = b.select(d);
|
||||
let _res = noop_waker_lw(|lw| f.poll(lw));
|
||||
drop(f);
|
||||
@@ -322,8 +365,14 @@ fn select2() {
|
||||
{
|
||||
let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
|
||||
let ((btx, brx), (dtx, drx)) = (channel(), channel());
|
||||
let b = b.map(move |v| { btx.send(v).unwrap(); v });
|
||||
let d = d.map(move |v| { dtx.send(v).unwrap(); v });
|
||||
let b = b.map(move |v| {
|
||||
btx.send(v).unwrap();
|
||||
v
|
||||
});
|
||||
let d = d.map(move |v| {
|
||||
dtx.send(v).unwrap();
|
||||
v
|
||||
});
|
||||
let (tx, rx) = channel();
|
||||
b.select(d).map(move |_| tx.send(()).unwrap()).forget();
|
||||
drop(a);
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use futures::task;
|
||||
use futures::stream;
|
||||
use futures::future;
|
||||
use futures::stream;
|
||||
use futures::task;
|
||||
use futures_util::lock::BiLock;
|
||||
use std::thread;
|
||||
|
||||
mod support;
|
||||
use support::*;
|
||||
// mod support;
|
||||
// use support::*;
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
@@ -51,10 +51,7 @@ fn concurrent() {
|
||||
const N: usize = 10000;
|
||||
let (a, b) = BiLock::new(0);
|
||||
|
||||
let a = Increment {
|
||||
a: Some(a),
|
||||
remaining: N,
|
||||
};
|
||||
let a = Increment { a: Some(a), remaining: N };
|
||||
let b = stream::iter_ok(0..N).fold(b, |b, _n| {
|
||||
b.lock().map(|mut b| {
|
||||
*b += 1;
|
||||
@@ -89,7 +86,7 @@ fn concurrent() {
|
||||
fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
|
||||
loop {
|
||||
if self.remaining == 0 {
|
||||
return Ok(self.a.take().unwrap().into())
|
||||
return Ok(self.a.take().unwrap().into());
|
||||
}
|
||||
|
||||
let a = self.a.as_ref().unwrap();
|
||||
|
||||
@@ -1,26 +1,26 @@
|
||||
use futures::channel::mpsc;
|
||||
use futures::channel::oneshot;
|
||||
use futures::executor::{block_on, block_on_stream};
|
||||
use futures::future::{err, ok};
|
||||
use futures::stream::{empty, iter_ok, poll_fn, Peekable};
|
||||
use futures::channel::oneshot;
|
||||
use futures::channel::mpsc;
|
||||
|
||||
mod support;
|
||||
use support::*;
|
||||
// mod support;
|
||||
// use support::*;
|
||||
|
||||
pub struct Iter<I> {
|
||||
iter: I,
|
||||
}
|
||||
|
||||
pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
|
||||
where J: IntoIterator<Item=Result<T, E>>,
|
||||
where
|
||||
J: IntoIterator<Item = Result<T, E>>,
|
||||
{
|
||||
Iter {
|
||||
iter: i.into_iter(),
|
||||
}
|
||||
Iter { iter: i.into_iter() }
|
||||
}
|
||||
|
||||
impl<I, T, E> Stream for Iter<I>
|
||||
where I: Iterator<Item=Result<T, E>>,
|
||||
where
|
||||
I: Iterator<Item = Result<T, E>>,
|
||||
{
|
||||
type Item = T;
|
||||
type Error = E;
|
||||
@@ -34,21 +34,15 @@ impl<I, T, E> Stream for Iter<I>
|
||||
}
|
||||
}
|
||||
|
||||
fn list() -> Box<Stream<Item=i32, Error=u32> + Send> {
|
||||
fn list() -> Box<Stream<Item = i32, Error = u32> + Send> {
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
tx.send(Ok(1))
|
||||
.and_then(|tx| tx.send(Ok(2)))
|
||||
.and_then(|tx| tx.send(Ok(3)))
|
||||
.forget();
|
||||
tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Ok(3))).forget();
|
||||
Box::new(rx.then(|r| r.unwrap()))
|
||||
}
|
||||
|
||||
fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> {
|
||||
fn err_list() -> Box<Stream<Item = i32, Error = u32> + Send> {
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
tx.send(Ok(1))
|
||||
.and_then(|tx| tx.send(Ok(2)))
|
||||
.and_then(|tx| tx.send(Err(3)))
|
||||
.forget();
|
||||
tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Err(3))).forget();
|
||||
Box::new(rx.then(|r| r.unwrap()))
|
||||
}
|
||||
|
||||
@@ -89,40 +83,31 @@ fn filter() {
|
||||
|
||||
#[test]
|
||||
fn filter_map() {
|
||||
assert_done(|| list().filter_map(|x| {
|
||||
ok(if x % 2 == 0 {
|
||||
Some(x + 10)
|
||||
} else {
|
||||
None
|
||||
})
|
||||
}).collect(), Ok(vec![12]));
|
||||
assert_done(
|
||||
|| list().filter_map(|x| ok(if x % 2 == 0 { Some(x + 10) } else { None })).collect(),
|
||||
Ok(vec![12]),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn and_then() {
|
||||
assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4]));
|
||||
assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(),
|
||||
Err(1));
|
||||
assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(), Err(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn then() {
|
||||
assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4]));
|
||||
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn or_else() {
|
||||
assert_done(|| err_list().or_else(|a| {
|
||||
ok::<i32, u32>(a as i32)
|
||||
}).collect(), Ok(vec![1, 2, 3]));
|
||||
assert_done(|| err_list().or_else(|a| ok::<i32, u32>(a as i32)).collect(), Ok(vec![1, 2, 3]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn flatten() {
|
||||
assert_done(|| list().map(|_| list()).flatten().collect(),
|
||||
Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]));
|
||||
|
||||
assert_done(|| list().map(|_| list()).flatten().collect(), Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -132,9 +117,7 @@ fn skip() {
|
||||
|
||||
#[test]
|
||||
fn skip_passes_errors_through() {
|
||||
let mut s = block_on_stream(
|
||||
iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1)
|
||||
);
|
||||
let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1));
|
||||
assert_eq!(s.next(), Some(Err(1)));
|
||||
assert_eq!(s.next(), Some(Err(2)));
|
||||
assert_eq!(s.next(), Some(Ok(4)));
|
||||
@@ -144,8 +127,7 @@ fn skip_passes_errors_through() {
|
||||
|
||||
#[test]
|
||||
fn skip_while() {
|
||||
assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(),
|
||||
Ok(vec![2, 3]));
|
||||
assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), Ok(vec![2, 3]));
|
||||
}
|
||||
#[test]
|
||||
fn take() {
|
||||
@@ -154,8 +136,7 @@ fn take() {
|
||||
|
||||
#[test]
|
||||
fn take_while() {
|
||||
assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(),
|
||||
Ok(vec![1, 2]));
|
||||
assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), Ok(vec![1, 2]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -193,7 +174,7 @@ fn buffered() {
|
||||
let (a, b) = oneshot::channel::<u32>();
|
||||
let (c, d) = oneshot::channel::<u32>();
|
||||
|
||||
tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>)
|
||||
tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
|
||||
.and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
|
||||
.forget();
|
||||
|
||||
@@ -211,7 +192,7 @@ fn buffered() {
|
||||
let (a, b) = oneshot::channel::<u32>();
|
||||
let (c, d) = oneshot::channel::<u32>();
|
||||
|
||||
tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>)
|
||||
tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
|
||||
.and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
|
||||
.forget();
|
||||
|
||||
@@ -267,21 +248,17 @@ fn unordered() {
|
||||
|
||||
#[test]
|
||||
fn zip() {
|
||||
assert_done(|| list().zip(list()).collect(),
|
||||
Ok(vec![(1, 1), (2, 2), (3, 3)]));
|
||||
assert_done(|| list().zip(list().take(2)).collect(),
|
||||
Ok(vec![(1, 1), (2, 2)]));
|
||||
assert_done(|| list().take(2).zip(list()).collect(),
|
||||
Ok(vec![(1, 1), (2, 2)]));
|
||||
assert_done(|| list().zip(list()).collect(), Ok(vec![(1, 1), (2, 2), (3, 3)]));
|
||||
assert_done(|| list().zip(list().take(2)).collect(), Ok(vec![(1, 1), (2, 2)]));
|
||||
assert_done(|| list().take(2).zip(list()).collect(), Ok(vec![(1, 1), (2, 2)]));
|
||||
assert_done(|| err_list().zip(list()).collect::<Vec<_>>(), Err(3));
|
||||
assert_done(|| list().zip(list().map(|x| x + 1)).collect(),
|
||||
Ok(vec![(1, 2), (2, 3), (3, 4)]));
|
||||
assert_done(|| list().zip(list().map(|x| x + 1)).collect(), Ok(vec![(1, 2), (2, 3), (3, 4)]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peek() {
|
||||
struct Peek {
|
||||
inner: Peekable<Box<Stream<Item = i32, Error =u32> + Send>>
|
||||
inner: Peekable<Box<Stream<Item = i32, Error = u32> + Send>>,
|
||||
}
|
||||
|
||||
impl Future for Peek {
|
||||
@@ -299,15 +276,12 @@ fn peek() {
|
||||
}
|
||||
}
|
||||
|
||||
block_on(Peek {
|
||||
inner: list().peekable(),
|
||||
}).unwrap()
|
||||
block_on(Peek { inner: list().peekable() }).unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait() {
|
||||
assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(),
|
||||
Ok(vec![1, 2, 3]));
|
||||
assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(), Ok(vec![1, 2, 3]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -337,8 +311,10 @@ fn forward() {
|
||||
let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1;
|
||||
assert_eq!(v, vec![0, 1, 2, 3]);
|
||||
|
||||
assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s),
|
||||
Ok(vec![0, 1, 2, 3, 4, 5]));
|
||||
assert_done(
|
||||
move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s),
|
||||
Ok(vec![0, 1, 2, 3, 4, 5]),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
26
tools/fmt.sh
Executable file
26
tools/fmt.sh
Executable file
@@ -0,0 +1,26 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Format all rust code.
|
||||
#
|
||||
# Usage:
|
||||
# ./tools/fmt.sh
|
||||
#
|
||||
# This script is needed because `cargo fmt` cannot recognize modules defined inside macros.
|
||||
# Refs: https://github.com/rust-lang/rustfmt/issues/4078
|
||||
|
||||
set -euo pipefail
|
||||
IFS=$'\n\t'
|
||||
|
||||
cd "$(cd "$(dirname "${0}")" && pwd)"/..
|
||||
|
||||
# shellcheck disable=SC2046
|
||||
if [[ -z "${CI:-}" ]]; then
|
||||
(
|
||||
# `cargo fmt` cannot recognize modules defined inside macros so run rustfmt directly.
|
||||
rustfmt $(git ls-files "*.rs")
|
||||
)
|
||||
else
|
||||
(
|
||||
rustfmt --check $(git ls-files "*.rs")
|
||||
)
|
||||
fi
|
||||
Reference in New Issue
Block a user