mirror of
https://github.com/tokio-rs/tokio.git
synced 2026-01-25 07:47:52 +00:00
sync: return TryRecvError::Disconnected from Receiver::try_recv after Receiver::close (#7686)
(cherry picked from commit d060401f6c)
This commit is contained in:
@@ -439,6 +439,10 @@ impl<T, S: Semaphore> Rx<T, S> {
|
||||
return Ok(value);
|
||||
}
|
||||
TryPopResult::Closed => return Err(TryRecvError::Disconnected),
|
||||
// If close() was called, an empty queue should report Disconnected.
|
||||
TryPopResult::Empty if rx_fields.rx_closed => {
|
||||
return Err(TryRecvError::Disconnected)
|
||||
}
|
||||
TryPopResult::Empty => return Err(TryRecvError::Empty),
|
||||
TryPopResult::Busy => {} // fall through
|
||||
}
|
||||
|
||||
@@ -35,8 +35,14 @@ pub(crate) enum TryPopResult<T> {
|
||||
/// Successfully popped a value.
|
||||
Ok(T),
|
||||
/// The channel is empty.
|
||||
///
|
||||
/// Note that `list.rs` only tracks the close state set by senders. If the
|
||||
/// channel is closed by `Rx::close()`, then `TryPopResult::Empty` is still
|
||||
/// returned, and the close state needs to be handled by `chan.rs`.
|
||||
Empty,
|
||||
/// The channel is empty and closed.
|
||||
///
|
||||
/// Returned when the send half is closed (all senders dropped).
|
||||
Closed,
|
||||
/// The channel is not empty, but the first value is being written.
|
||||
Busy,
|
||||
|
||||
@@ -7,9 +7,7 @@ use wasm_bindgen_test::wasm_bindgen_test as test;
|
||||
|
||||
use tokio::sync::broadcast;
|
||||
use tokio_test::task;
|
||||
use tokio_test::{
|
||||
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
|
||||
};
|
||||
use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_err, assert_ready_ok};
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
|
||||
@@ -966,6 +966,15 @@ fn try_recv_unbounded() {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_recv_after_receiver_close() {
|
||||
let (_tx, mut rx) = mpsc::channel::<()>(5);
|
||||
|
||||
assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
|
||||
rx.close();
|
||||
assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_recv_close_while_empty_bounded() {
|
||||
let (tx, mut rx) = mpsc::channel::<()>(5);
|
||||
|
||||
Reference in New Issue
Block a user