Add Send bounds everywhere, rewrite oneshot

Client is now Send (but not Sync yet).

The new implementation of the oneshot channel has cloneable senders
and is Send for non-Sync values.
Non-consuming read-only access to the contained value via peek
is now behind an explicit T: Sync requirement.

Renames Receiver::recv_nonblocking to Receiver::recv_now to be more accurate.

Moves the modules in channel to their own files.
This commit is contained in:
TheDaemoness
2024-04-09 15:42:13 -07:00
parent bbb6de2dcc
commit b9a938b3ab
15 changed files with 454 additions and 312 deletions

View File

@@ -47,7 +47,7 @@ fn main() -> std::io::Result<()> {
client.run()?;
// If we're here, the handler finished and gave us a value back.
// Let's fetch it and see what it is!
let reg = reg_result.0.recv_nonblocking().unwrap()?;
let reg = reg_result.0.recv_now().unwrap()?;
// Connection registration is done!
// But how does the network we connected to choose to name itself?
// ISUPPORT is vital for understanding the capabilities of the target network,

View File

@@ -114,7 +114,7 @@ impl<C, A> Client<C, A> {
/// Returns the handler id.
pub fn add_with_sender<T, M: MakeHandler<T>>(
&mut self,
sender: Box<dyn channel::Sender<Value = M::Value>>,
sender: Box<dyn channel::Sender<Value = M::Value> + Send>,
make_handler: M,
value: T,
) -> Result<usize, M::Error> {

View File

@@ -22,7 +22,7 @@ pub fn msg_abort() -> ClientMsg<'static> {
}
/// The logic of a SASL mechanism.
pub trait SaslLogic {
pub trait SaslLogic: Send {
/// Handles data sent by the server.
fn reply<'a>(
&'a mut self,

View File

@@ -82,7 +82,8 @@ impl<'a, T: Sasl> crate::client::MakeHandler<&'a T> for &'a Authenticate {
fn make_channel<Spec: crate::client::channel::ChannelSpec>(
spec: &Spec,
) -> (Box<dyn crate::client::channel::Sender<Value = Self::Value>>, Self::Receiver<Spec>) {
) -> (Box<dyn crate::client::channel::Sender<Value = Self::Value> + Send>, Self::Receiver<Spec>)
{
spec.new_oneshot()
}
}

View File

@@ -7,7 +7,7 @@ use channel::*;
/// Generic message handlers, typically intended to handle one expected batch of messages
/// and parse them into a more-useful form.
pub trait Handler: 'static {
pub trait Handler: 'static + Send {
/// The type of values produced by this handler.
type Value: 'static;
/// Processes one message.
@@ -75,7 +75,7 @@ pub trait MakeHandler<T> {
/// This typically just calls one method of the provided [`ChannelSpec`].
fn make_channel<Spec: ChannelSpec>(
spec: &Spec,
) -> (Box<dyn Sender<Value = Self::Value>>, Self::Receiver<Spec>);
) -> (Box<dyn Sender<Value = Self::Value> + Send>, Self::Receiver<Spec>);
}
/// Handlers that can be used directly without any further options.
@@ -91,7 +91,7 @@ pub trait SelfMadeHandler: Handler {
/// This typically just calls one method of the provided [`ChannelSpec`].
fn make_channel<Spec: ChannelSpec>(
spec: &Spec,
) -> (Box<dyn Sender<Value = Self::Value>>, Self::Receiver<Spec>);
) -> (Box<dyn Sender<Value = Self::Value> + Send>, Self::Receiver<Spec>);
}
impl<T: SelfMadeHandler> MakeHandler<T> for () {
@@ -110,12 +110,12 @@ impl<T: SelfMadeHandler> MakeHandler<T> for () {
fn make_channel<Spec: ChannelSpec>(
spec: &Spec,
) -> (Box<dyn Sender<Value = Self::Value>>, Self::Receiver<Spec>) {
) -> (Box<dyn Sender<Value = Self::Value> + Send>, Self::Receiver<Spec>) {
T::make_channel(spec)
}
}
type BoxHandler = Box<dyn FnMut(&ServerMsg<'_>, QueueEditGuard<'_>) -> HandlerStatus>;
type BoxHandler = Box<dyn FnMut(&ServerMsg<'_>, QueueEditGuard<'_>) -> HandlerStatus + Send>;
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum HandlerStatus {
@@ -125,7 +125,7 @@ enum HandlerStatus {
fn box_handler<T: 'static>(
mut handler: impl Handler<Value = T>,
mut sender: Box<dyn Sender<Value = T>>,
mut sender: Box<dyn Sender<Value = T> + Send>,
) -> BoxHandler {
Box::new(move |msg, queue| {
let mut yielded = false;
@@ -160,7 +160,7 @@ impl Handlers {
pub fn add<T: 'static>(
&mut self,
handler: impl Handler<Value = T>,
sender: Box<dyn Sender<Value = T>>,
sender: Box<dyn Sender<Value = T> + Send>,
) -> usize {
self.wants_owning |= handler.wants_owning();
let id = self.finished.pop().unwrap_or(self.handlers.len());

View File

@@ -1,8 +1,14 @@
//! Abstractions for returning data from handlers.
//! Abstractions for returning data from handlers,
//! as well as implementations of channels and synchronization that's missing from std.
//!
//! No relation to IRC channels.
/// Send halves of non-blocking channels
pub mod oneshot;
pub mod parker;
#[cfg(test)]
mod tests;
/// Send halves of non-blocking channels.
pub trait Sender {
/// The type of values consumed by this channel.
type Value;
@@ -72,7 +78,7 @@ impl<T> Sender for ClosedSender<T> {
}
}
impl<T: 'static> Sender for std::sync::mpsc::Sender<T> {
impl<T> Sender for std::sync::mpsc::Sender<T> {
type Value = T;
fn send(&mut self, value: T) -> SendCont {
@@ -99,7 +105,7 @@ impl<T> Sender for Option<tokio::sync::oneshot::Sender<T>> {
}
#[cfg(feature = "tokio")]
impl<T: 'static> Sender for tokio::sync::mpsc::UnboundedSender<T> {
impl<T> Sender for tokio::sync::mpsc::UnboundedSender<T> {
type Value = T;
fn send(&mut self, value: T) -> SendCont {
@@ -112,7 +118,7 @@ impl<T: 'static> Sender for tokio::sync::mpsc::UnboundedSender<T> {
}
#[cfg(feature = "tokio")]
impl<T: 'static> Sender for tokio::sync::mpsc::WeakUnboundedSender<T> {
impl<T> Sender for tokio::sync::mpsc::WeakUnboundedSender<T> {
type Value = T;
fn send(&mut self, value: T) -> SendCont {
@@ -135,10 +141,12 @@ pub trait ChannelSpec {
type Queue<T>;
/// Creates a new oneshot channel, the sender half of which is boxed.
fn new_oneshot<T: 'static>(&self) -> (Box<dyn Sender<Value = T>>, Self::Oneshot<T>);
fn new_oneshot<T: 'static + Send>(
&self,
) -> (Box<dyn Sender<Value = T> + Send>, Self::Oneshot<T>);
/// Creates a new queue channel, the sender half of which is boxed.
fn new_queue<T: 'static>(&self) -> (Box<dyn Sender<Value = T>>, Self::Queue<T>);
fn new_queue<T: 'static + Send>(&self) -> (Box<dyn Sender<Value = T> + Send>, Self::Queue<T>);
}
/// [`ChannelSpec`] for thread-safe synchronous channels.
@@ -152,13 +160,15 @@ impl ChannelSpec for SyncChannels {
type Queue<T> = std::sync::mpsc::Receiver<T>;
fn new_oneshot<T: 'static>(&self) -> (Box<dyn Sender<Value = T>>, Self::Oneshot<T>) {
fn new_oneshot<T: 'static + Send>(
&self,
) -> (Box<dyn Sender<Value = T> + Send>, Self::Oneshot<T>) {
let (send, recv) = self::oneshot::channel();
let (unparker, parker) = self::parker::new(Some(send));
(Box::new(unparker), (recv, parker))
}
fn new_queue<T: 'static>(&self) -> (Box<dyn Sender<Value = T>>, Self::Queue<T>) {
fn new_queue<T: 'static + Send>(&self) -> (Box<dyn Sender<Value = T> + Send>, Self::Queue<T>) {
let (send, recv) = std::sync::mpsc::channel();
(Box::new(send), recv)
}
@@ -170,287 +180,15 @@ impl ChannelSpec for TokioChannels {
type Queue<T> = tokio::sync::mpsc::UnboundedReceiver<T>;
fn new_oneshot<T: 'static>(&self) -> (Box<dyn Sender<Value = T>>, Self::Oneshot<T>) {
fn new_oneshot<T: 'static + Send>(
&self,
) -> (Box<dyn Sender<Value = T> + Send>, Self::Oneshot<T>) {
let (send, recv) = tokio::sync::oneshot::channel();
(Box::new(Some(send)), recv)
}
fn new_queue<T: 'static>(&self) -> (Box<dyn Sender<Value = T>>, Self::Queue<T>) {
fn new_queue<T: 'static + Send>(&self) -> (Box<dyn Sender<Value = T> + Send>, Self::Queue<T>) {
let (send, recv) = tokio::sync::mpsc::unbounded_channel();
(Box::new(send), recv)
}
}
pub mod parker {
//! Utilities for temporarily parking threads, awaiting some activity on another thread.
//!
//! These can be used to turn non-blocking channels into blocking ones in synchronous code.
//! They are designed for mpsc usecases, allowing multiple threads to unpark one thread.
use std::mem::ManuallyDrop;
use std::sync::{
atomic::{AtomicPtr, Ordering},
Arc, Weak,
};
use std::thread::Thread;
/// Global location whose address we can use to indicate that a [`Parker`] should skip parking.
static mut SKIP_PARKING: std::mem::MaybeUninit<Thread> = std::mem::MaybeUninit::uninit();
/// A wrapped [`Sender`][super::Sender] that can unpark a thread blocked by a [`Parker`].
#[derive(Clone, Debug, Default)]
pub struct Unparker<S>(S, ManuallyDrop<Arc<AtomicPtr<Thread>>>);
/// A synchronization primitive for parking the thread indefinitely pending activity
/// on a thread with an [`Unparker`].
#[derive(Debug)]
pub struct Parker(Weak<AtomicPtr<Thread>>);
/// Creates a new [`Unparker`] from the provided sender,
/// also returning a [`Parker`] for that unparker.
pub fn new<S>(sender: S) -> (Unparker<S>, Parker) {
let arc = Arc::new(AtomicPtr::new(std::ptr::null_mut()));
let weak = Arc::downgrade(&arc);
(Unparker(sender, ManuallyDrop::new(arc)), Parker(weak))
}
impl<S> Drop for Unparker<S> {
fn drop(&mut self) {
let Some(arc) = Arc::into_inner(unsafe { ManuallyDrop::take(&mut self.1) }) else {
return;
};
let ptr = arc.into_inner();
if ptr != unsafe { SKIP_PARKING.as_mut_ptr() } {
if let Some(th) = unsafe { ptr.as_ref() } {
th.clone().unpark();
}
}
}
}
impl<S> Unparker<S> {
/// Unparks a thread that is blocked by a [`Parker`].
/// If no thread is parked for this unparker, skips the next parking operation.
///
/// This generally doesn't need to be called manually unless `S` is not a sender.
pub fn unpark(&self) {
let ptr = self.1.swap(unsafe { SKIP_PARKING.as_mut_ptr() }, Ordering::AcqRel);
if ptr != unsafe { SKIP_PARKING.as_mut_ptr() } {
if let Some(th) = unsafe { ptr.as_ref() } {
th.clone().unpark();
}
}
}
}
impl<S: super::Sender> super::Sender for Unparker<S> {
type Value = <S as super::Sender>::Value;
fn send(&mut self, value: Self::Value) -> super::SendCont {
let result = self.0.send(value);
if result != super::SendCont::Closed {
self.unpark();
}
result
}
fn may_send(&self) -> bool {
self.0.may_send()
}
}
impl Parker {
/// Block this thread until either all [`Unparker`]s are dropped or
/// until one of them unparks this thread.
pub fn park(&self) {
let Some(strong) = self.0.upgrade() else {
return;
};
let mut th = std::thread::current();
if strong
.compare_exchange(
std::ptr::null_mut(),
&mut th as *mut Thread,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_ok()
{
if Arc::into_inner(strong).is_none() {
std::thread::park();
} else {
// No unparkers left!
return;
}
}
let Some(strong) = self.0.upgrade() else {
return;
};
strong.store(std::ptr::null_mut(), Ordering::Release);
}
}
}
pub mod oneshot {
//! An implementation of a non-blocking oneshot channel.
//!
//! This is essentially just a [`OnceLock`][std::sync::OnceLock] in an [`Arc`].
//! It offers no means of blocking.
//! This makes it safe to use in `async` contexts, but means that users of these types
//! need to work out synchronization to prevent premature reads.
//!
//! Consider using a [`Parker`][super::parker::Parker] if synchronization is needed.
use super::parker::Parker;
use std::sync::{Arc, OnceLock, Weak};
/// The sender portion of a oneshot channel.
#[derive(Debug)]
pub struct Sender<T>(Weak<OnceLock<T>>);
/// The reciever portion of a oneshot channel.
#[derive(Debug)]
pub struct Receiver<T>(Arc<OnceLock<T>>);
/// Creates a new oneshot channel for sending single values.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let strong = Arc::new(OnceLock::new());
let weak = Arc::downgrade(&strong);
(Sender(weak), Receiver(strong))
}
impl<T> Sender<T> {
/// Returns `true` if sends on this channel are guaranteed to fail.
pub fn is_closed(&self) -> bool {
self.0.strong_count() == 0
}
/// Attempts to send a value over this channel.
pub fn send(self, value: T) -> Result<(), T> {
let Some(strong) = self.0.upgrade() else {
return Err(value);
};
strong.set(value)?;
if let Some(existing) = Arc::into_inner(strong).and_then(OnceLock::into_inner) {
Err(existing)
} else {
Ok(())
}
}
}
impl<T> super::Sender for Option<Sender<T>> {
type Value = T;
fn send(&mut self, value: Self::Value) -> super::SendCont {
let Some(sender) = self.take() else {
return super::SendCont::Closed;
};
if sender.send(value).is_ok() {
super::SendCont::SentClosed
} else {
super::SendCont::Closed
}
}
fn may_send(&self) -> bool {
self.as_ref().is_some_and(|snd| !snd.is_closed())
}
}
impl<T> Receiver<T> {
/// Returns a reference to the value that is ready to be received, if any.
pub fn peek(&self) -> Option<&T> {
self.0.get()
}
/// Receives a value over this channel, consuming the receiver.
///
/// This method never blocks.
pub fn recv_nonblocking(self) -> Option<T> {
Arc::into_inner(self.0).and_then(OnceLock::into_inner)
}
/// Receives a value over this channel, blocking until one is present.
pub fn recv(self, parker: &Parker) -> Option<T> {
if self.peek().is_none() {
parker.park();
}
self.recv_nonblocking()
}
}
}
#[cfg(test)]
mod tests {
/// Park, expecting the thread to be blocked for 100 to 2000 ms.
/// Panic if the time falls outside of this range.
fn timed_park() {
let then = std::time::Instant::now();
std::thread::park_timeout(std::time::Duration::from_secs(2));
let now = std::time::Instant::now();
let diff = now - then;
if diff < std::time::Duration::from_millis(100) {
panic!("probable non-block; parked for {}ms", diff.as_millis());
}
if diff >= std::time::Duration::from_secs(2) {
panic!("probable deadlock; parked for {}ms", diff.as_millis());
}
}
#[test]
fn parker_slow_unpark() {
let (unparker, parker) = super::parker::new(());
let current = std::thread::current();
std::thread::spawn(move || {
parker.park();
current.unpark();
});
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
unparker.unpark();
});
timed_park();
}
#[test]
fn parker_slow_park() {
let (unparker, parker) = super::parker::new(());
let current = std::thread::current();
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
parker.park();
current.unpark();
});
std::thread::spawn(move || {
unparker.unpark();
});
timed_park();
}
#[test]
fn unparker_drop_slow_unpark() {
let (unparker1, parker) = super::parker::new(());
let unparker2 = unparker1.clone();
let current = std::thread::current();
std::thread::spawn(move || {
parker.park();
current.unpark();
});
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
std::mem::drop(unparker1);
std::mem::drop(unparker2);
});
timed_park();
}
#[test]
fn unparker_drop_slow_park() {
let (unparker1, parker) = super::parker::new(());
let unparker2 = unparker1.clone();
let current = std::thread::current();
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
parker.park();
current.unpark();
});
std::thread::spawn(move || {
std::mem::drop(unparker1);
std::mem::drop(unparker2);
});
timed_park();
}
}

View File

@@ -0,0 +1,197 @@
#![allow(unknown_lints)]
//! An implementation of a non-blocking oneshot channel.
//!
//! This is essentially just a [`OnceLock`][std::sync::OnceLock] under
//! thread-safe shared ownership. It may briefly block to avoid data races,
//! but should be unable to deadlock.
//!
//! Consider using a [`Parker`][super::parker::Parker] if synchronization is needed.
// Unknown lints lint is disabled because dropping_references was added after
// 1.70 (our MSRV). We drop references to be absolutely sure they won't live
// at the same time as the memory they point to getting dropped.
use super::parker::Parker;
use std::{
cell::UnsafeCell,
mem::MaybeUninit,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Once,
},
};
// Summary: This is an mpsc oneshot channel. Only one value can be sent over the channel,
// but it may come from one of several senders.
// The receiver owns an in-transit value and is responsible for cleaning it up.
// The senders have no access to the sent value.
struct Inner<T> {
sender_count: AtomicUsize,
recver: AtomicBool,
lock: Once,
// Manually implement OnceLock so that we can deinit the value but keep the lock.
value: UnsafeCell<MaybeUninit<T>>,
}
/// The receiver portion of a oneshot channel.
#[derive(Debug)]
pub struct Receiver<T>(std::ptr::NonNull<Inner<T>>);
/// The sender portion of a oneshot channel.
#[derive(Debug)]
pub struct Sender<T>(std::ptr::NonNull<Inner<T>>);
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Send for Sender<T> {}
// You can use a `Receiver` to get a `&T`, but only with one method that requires `T: Sync`.
unsafe impl<T> Sync for Receiver<T> {}
// You cannot use a `Sender` to get a `&T`.
unsafe impl<T> Sync for Sender<T> {}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
unsafe {
self.0.as_ref().sender_count.fetch_add(1, Ordering::Relaxed);
}
Self(self.0)
}
}
// TODO: All the Acquire orderings going forward are chosen defensively.
// Evaluate if they can be relaxed.
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let inner_ref = unsafe { self.0.as_ref() };
inner_ref.recver.store(false, Ordering::Release);
// Destroy the value, if present.
// Do NOT use Once::is_completed
let mut has_value = true;
inner_ref.lock.call_once(|| has_value = false);
if has_value {
unsafe { inner_ref.value.get().as_mut().unwrap_unchecked().assume_init_drop() };
}
if inner_ref.sender_count.load(Ordering::Acquire) == 0 {
#[allow(dropping_references)]
std::mem::drop(inner_ref);
std::mem::drop(unsafe { Box::from_raw(self.0.as_ptr()) });
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let inner_ref = unsafe { self.0.as_ref() };
if inner_ref.sender_count.fetch_sub(1, Ordering::Release) == 1 {
#[allow(clippy::collapsible_if)]
if !inner_ref.recver.load(Ordering::Acquire) {
#[allow(dropping_references)]
std::mem::drop(inner_ref);
std::mem::drop(unsafe { Box::from_raw(self.0.as_ptr()) });
}
}
}
}
/// Creates a new oneshot channel for sending single values.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Box::new(Inner {
sender_count: AtomicUsize::new(1),
recver: AtomicBool::new(true),
lock: Once::new(),
value: UnsafeCell::new(MaybeUninit::uninit()),
});
let inner = std::ptr::NonNull::new(Box::into_raw(inner)).unwrap();
(Sender(inner), Receiver(inner))
}
impl<T> Receiver<T> {
/// Returns `false` if a value has been sent over this channel.
pub fn is_empty(&self) -> bool {
let inner_ref = unsafe { self.0.as_ref() };
!inner_ref.lock.is_completed()
}
/// Returns a reference to the value that is ready to be received, if any.
pub fn peek(&self) -> Option<&T>
where
T: Sync,
{
let inner_ref = unsafe { self.0.as_ref() };
inner_ref
.lock
.is_completed()
.then(|| unsafe { inner_ref.value.get().as_ref().unwrap_unchecked().assume_init_ref() })
}
/// Receives a value over this channel, blocking only to allow a send-in-progress
/// to complete.
pub fn recv_now(self) -> Option<T> {
let ptr = self.0;
let inner_ref = unsafe { ptr.as_ref() };
// Absolutely do not want the destructor to run here.
// We have the references we need to the data anyway.
std::mem::forget(self);
inner_ref.recver.store(false, Ordering::Release);
let mut has_value = true;
inner_ref.lock.call_once(|| has_value = false);
let retval = has_value.then(|| unsafe {
inner_ref.value.get().as_ref().unwrap_unchecked().assume_init_read()
});
if inner_ref.sender_count.load(Ordering::Acquire) == 0 {
#[allow(dropping_references)]
std::mem::drop(inner_ref);
std::mem::drop(unsafe { Box::from_raw(ptr.as_ptr()) });
}
retval
}
/// Receives a value over this channel. Parks using the provided `parker` if
/// no value is immediately available to be received.
pub fn recv(self, parker: &Parker) -> Option<T> {
if self.is_empty() {
parker.park();
}
self.recv_now()
}
}
impl<T> Sender<T> {
/// Returns `true` if sends on this channel are guaranteed to fail.
pub fn is_closed(&self) -> bool {
let inner_ref = unsafe { self.0.as_ref() };
!inner_ref.recver.load(Ordering::Relaxed) || inner_ref.lock.is_completed()
}
/// Attempts to send a value over this channel.
pub fn send(self, value: T) -> Result<(), T> {
let mut value = Some(value);
let inner_ref = unsafe { self.0.as_ref() };
if inner_ref.recver.load(Ordering::Relaxed) {
inner_ref.lock.call_once(|| unsafe {
let value = value.take().unwrap_unchecked();
inner_ref.value.get().as_mut().unwrap_unchecked().write(value);
});
}
match value {
Some(value) => Err(value),
None => Ok(()),
}
}
}
impl<T> super::Sender for Option<Sender<T>> {
type Value = T;
fn send(&mut self, value: Self::Value) -> super::SendCont {
let Some(sender) = self.take() else {
return super::SendCont::Closed;
};
if sender.send(value).is_ok() {
super::SendCont::SentClosed
} else {
super::SendCont::Closed
}
}
fn may_send(&self) -> bool {
self.as_ref().is_some_and(|snd| !snd.is_closed())
}
}

View File

@@ -0,0 +1,106 @@
//! Utilities for temporarily parking threads, awaiting some activity on another thread.
//!
//! These can be used to turn non-blocking channels into blocking ones in synchronous code.
//! They are designed for mpsc usecases, allowing multiple threads to unpark one thread.
use std::mem::ManuallyDrop;
use std::sync::{
atomic::{AtomicPtr, Ordering},
Arc, Weak,
};
use std::thread::Thread;
/// Global location whose address we can use to indicate that a [`Parker`] should skip parking.
static mut SKIP_PARKING: std::mem::MaybeUninit<Thread> = std::mem::MaybeUninit::uninit();
/// A wrapped [`Sender`][super::Sender] that can unpark a thread blocked by a [`Parker`].
#[derive(Clone, Debug, Default)]
pub struct Unparker<S>(S, ManuallyDrop<Arc<AtomicPtr<Thread>>>);
/// A synchronization primitive for parking the thread indefinitely pending activity
/// on a thread with an [`Unparker`].
#[derive(Debug)]
pub struct Parker(Weak<AtomicPtr<Thread>>);
/// Creates a new [`Unparker`] from the provided sender,
/// also returning a [`Parker`] for that unparker.
pub fn new<S>(sender: S) -> (Unparker<S>, Parker) {
let arc = Arc::new(AtomicPtr::new(std::ptr::null_mut()));
let weak = Arc::downgrade(&arc);
(Unparker(sender, ManuallyDrop::new(arc)), Parker(weak))
}
impl<S> Drop for Unparker<S> {
fn drop(&mut self) {
let Some(arc) = Arc::into_inner(unsafe { ManuallyDrop::take(&mut self.1) }) else {
return;
};
let ptr = arc.into_inner();
if ptr != unsafe { SKIP_PARKING.as_mut_ptr() } {
if let Some(th) = unsafe { ptr.as_ref() } {
th.clone().unpark();
}
}
}
}
impl<S> Unparker<S> {
/// Unparks a thread that is blocked by a [`Parker`].
/// If no thread is parked for this unparker, skips the next parking operation.
///
/// This generally doesn't need to be called manually unless `S` is not a sender.
pub fn unpark(&self) {
let ptr = self.1.swap(unsafe { SKIP_PARKING.as_mut_ptr() }, Ordering::AcqRel);
if ptr != unsafe { SKIP_PARKING.as_mut_ptr() } {
if let Some(th) = unsafe { ptr.as_ref() } {
th.clone().unpark();
}
}
}
}
impl<S: super::Sender> super::Sender for Unparker<S> {
type Value = <S as super::Sender>::Value;
fn send(&mut self, value: Self::Value) -> super::SendCont {
let result = self.0.send(value);
if result != super::SendCont::Closed {
self.unpark();
}
result
}
fn may_send(&self) -> bool {
self.0.may_send()
}
}
impl Parker {
/// Block this thread until either all [`Unparker`]s are dropped or
/// until one of them unparks this thread.
pub fn park(&self) {
let Some(strong) = self.0.upgrade() else {
return;
};
let mut th = std::thread::current();
if strong
.compare_exchange(
std::ptr::null_mut(),
&mut th as *mut Thread,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_ok()
{
if Arc::into_inner(strong).is_none() {
std::thread::park();
} else {
// No unparkers left!
return;
}
}
let Some(strong) = self.0.upgrade() else {
return;
};
strong.store(std::ptr::null_mut(), Ordering::Release);
}
}

View File

@@ -0,0 +1,100 @@
/// Park, expecting the thread to be blocked for 100 to 2000 ms.
/// Panic if the time falls outside of this range.
fn timed_park() {
let then = std::time::Instant::now();
std::thread::park_timeout(std::time::Duration::from_secs(2));
let now = std::time::Instant::now();
let diff = now - then;
if diff < std::time::Duration::from_millis(100) {
panic!("probable non-block; parked for {}ms", diff.as_millis());
}
if diff >= std::time::Duration::from_secs(2) {
panic!("probable deadlock; parked for {}ms", diff.as_millis());
}
}
#[test]
fn parker_slow_unpark() {
let (unparker, parker) = super::parker::new(());
let current = std::thread::current();
std::thread::spawn(move || {
parker.park();
current.unpark();
});
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
unparker.unpark();
});
timed_park();
}
#[test]
fn parker_slow_park() {
let (unparker, parker) = super::parker::new(());
let current = std::thread::current();
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
parker.park();
current.unpark();
});
std::thread::spawn(move || {
unparker.unpark();
});
timed_park();
}
#[test]
fn unparker_drop_slow_unpark() {
let (unparker1, parker) = super::parker::new(());
let unparker2 = unparker1.clone();
let current = std::thread::current();
std::thread::spawn(move || {
parker.park();
current.unpark();
});
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
std::mem::drop(unparker1);
std::mem::drop(unparker2);
});
timed_park();
}
#[test]
fn unparker_drop_slow_park() {
let (unparker1, parker) = super::parker::new(());
let unparker2 = unparker1.clone();
let current = std::thread::current();
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
parker.park();
current.unpark();
});
std::thread::spawn(move || {
std::mem::drop(unparker1);
std::mem::drop(unparker2);
});
timed_park();
}
#[test]
fn oneshot_slow_send() {
let string = "foobar".to_owned();
let (send, recv) = super::oneshot::channel();
let (mut send, parker) = super::parker::new(Some(send));
std::thread::spawn(move || {
use super::Sender;
std::thread::sleep(std::time::Duration::from_millis(200));
send.send(string);
});
let string = recv.recv(&parker).expect("spurious failure in blocking recv");
assert_eq!(string, "foobar");
}
#[test]
fn oneshot_slow_recv() {
let string = "foobar".to_owned();
let (send, recv) = super::oneshot::channel();
let (mut send, parker) = super::parker::new(Some(send));
std::thread::spawn(move || {
use super::Sender;
send.send(string);
});
std::thread::sleep(std::time::Duration::from_millis(200));
let string = recv.recv(&parker).expect("spurious failure in blocking recv");
assert_eq!(string, "foobar");
}

View File

@@ -40,18 +40,18 @@ impl SelfMadeHandler for YieldAll {
fn make_channel<Spec: super::channel::ChannelSpec>(
spec: &Spec,
) -> (Box<dyn super::channel::Sender<Value = Self::Value>>, Self::Receiver<Spec>) {
) -> (Box<dyn super::channel::Sender<Value = Self::Value> + Send>, Self::Receiver<Spec>) {
spec.new_queue()
}
}
type Parser<T> = dyn FnMut(ServerMsg<'static>) -> Option<T>;
type Parser<T> = dyn FnMut(ServerMsg<'static>) -> Option<T> + Send;
/// [`Handler`] that yields every message that successfully parses into `T`.
#[derive(Default)]
pub struct YieldParsed<T>(FlatMap<(ServerMsgKindRaw<'static>, Box<Parser<T>>)>);
impl<T: 'static> YieldParsed<T> {
impl<T: 'static + Send> YieldParsed<T> {
/// Creates a new instance that parses no messages.
pub const fn new() -> Self {
YieldParsed(FlatMap::new())
@@ -70,7 +70,7 @@ impl<T: 'static> YieldParsed<T> {
pub fn just_map<U, N, F>(kind: N, mut f: F) -> Self
where
N: NameValued<ServerMsgKind, Value<'static> = U>,
F: FnMut(U) -> Option<T> + 'static,
F: FnMut(U) -> Option<T> + 'static + Send,
{
YieldParsed(FlatMap::singleton((
kind.as_raw().clone(),
@@ -89,7 +89,7 @@ impl<T: 'static> YieldParsed<T> {
pub fn with_map<U, N, F>(mut self, kind: N, mut f: F) -> Self
where
N: NameValued<ServerMsgKind, Value<'static> = U>,
F: FnMut(U) -> Option<T> + 'static,
F: FnMut(U) -> Option<T> + 'static + Send,
{
self.0.edit().insert((
kind.as_raw().clone(),
@@ -99,7 +99,7 @@ impl<T: 'static> YieldParsed<T> {
}
}
impl<T: 'static> Handler for YieldParsed<T> {
impl<T: 'static + Send> Handler for YieldParsed<T> {
type Value = T;
fn handle(
@@ -123,14 +123,14 @@ impl<T: 'static> Handler for YieldParsed<T> {
}
}
impl<T: 'static> SelfMadeHandler for YieldParsed<T> {
impl<T: 'static + Send> SelfMadeHandler for YieldParsed<T> {
type Receiver<Spec: super::channel::ChannelSpec> = Spec::Queue<T>;
fn queue_msgs(&self, _: super::QueueEditGuard<'_>) {}
fn make_channel<Spec: super::channel::ChannelSpec>(
spec: &Spec,
) -> (Box<dyn super::channel::Sender<Value = Self::Value>>, Self::Receiver<Spec>) {
) -> (Box<dyn super::channel::Sender<Value = Self::Value> + Send>, Self::Receiver<Spec>) {
spec.new_queue()
}
}

View File

@@ -65,7 +65,7 @@ impl SelfMadeHandler for Ping {
fn make_channel<Spec: ChannelSpec>(
spec: &Spec,
) -> (Box<dyn Sender<Value = Self::Value>>, Self::Receiver<Spec>) {
) -> (Box<dyn Sender<Value = Self::Value> + Send>, Self::Receiver<Spec>) {
spec.new_oneshot()
}
}
@@ -114,7 +114,7 @@ impl SelfMadeHandler for AutoPong {
fn make_channel<Spec: ChannelSpec>(
_: &Spec,
) -> (Box<dyn Sender<Value = Self::Value>>, Self::Receiver<Spec>) {
) -> (Box<dyn Sender<Value = Self::Value> + Send>, Self::Receiver<Spec>) {
(Box::<crate::client::channel::ClosedSender<_>>::default(), ())
}
}

View File

@@ -40,7 +40,7 @@ impl Error for EndOfNicks {}
/// Nick generators.
///
/// Nick generators always yield at least one nick, are peekable, and have explicit continuations.
pub trait NickGen: 'static {
pub trait NickGen: 'static + Send {
/// Generates a new nickname and an optional continuation.
///
/// The returned nick should not depend on the value of `prev_was_invalid`,
@@ -105,7 +105,7 @@ pub fn from_iter_or<It: FusedIterator<Item = Nick<'static>>, I: IntoIterator<Int
FromIter { nick, iter }
}
impl<I: Iterator<Item = Nick<'static>> + 'static> NickGen for FromIter<I> {
impl<I: Iterator<Item = Nick<'static>> + 'static + Send> NickGen for FromIter<I> {
fn next_nick(mut self: Box<Self>) -> (Nick<'static>, Option<Box<dyn NickGen>>) {
if let Some(mut next) = self.iter.next() {
std::mem::swap(&mut next, &mut self.nick);

View File

@@ -17,7 +17,7 @@ pub struct Queue {
sub: Duration,
timepoint: Instant,
// TODO: Bespoke trait for this. We want Clone back.
labeler: Option<Box<dyn FnMut() -> NoNul<'static>>>,
labeler: Option<Box<dyn FnMut() -> NoNul<'static> + Send>>,
}
impl std::fmt::Debug for Queue {
@@ -128,7 +128,7 @@ impl Queue {
/// Adds `label` tags to outgoing messages for `labeled-response`.
///
/// Returns `None` is no labeler is configured for the underlying queue.
pub fn use_labeler(&mut self, f: impl FnMut() -> NoNul<'static> + 'static) -> &mut Self {
pub fn use_labeler(&mut self, f: impl FnMut() -> NoNul<'static> + 'static + Send) -> &mut Self {
self.labeler = Some(Box::new(f));
self
}

View File

@@ -143,7 +143,7 @@ impl<'a, O, A: Sasl> MakeHandler<&'a O> for &'a Register<O, A> {
fn make_channel<Spec: super::channel::ChannelSpec>(
spec: &Spec,
) -> (Box<dyn super::channel::Sender<Value = Self::Value>>, Self::Receiver<Spec>) {
) -> (Box<dyn super::channel::Sender<Value = Self::Value> + Send>, Self::Receiver<Spec>) {
spec.new_oneshot()
}
}

View File

@@ -17,7 +17,7 @@ fn static_register(msg: &[u8]) -> Result<Registration, HandlerError> {
client.queue_mut().set_rate_limit(Duration::ZERO, 1);
let (_, reg) = client.add(&SyncChannels, &reg, &options).unwrap();
client.run().unwrap();
reg.0.recv_nonblocking().unwrap()
reg.0.recv_now().unwrap()
}
#[test]