From a8f6103e4aa860263e6943852e41fe0ba31cd1b3 Mon Sep 17 00:00:00 2001 From: David Senk Date: Sun, 10 Aug 2025 15:03:24 -0400 Subject: [PATCH] removed out of scope, into dsml --- src/broadcast_channel.rs | 397 --------------------------------------- src/lib.rs | 5 - src/return_channel.rs | 82 -------- 3 files changed, 484 deletions(-) delete mode 100644 src/broadcast_channel.rs delete mode 100644 src/return_channel.rs diff --git a/src/broadcast_channel.rs b/src/broadcast_channel.rs deleted file mode 100644 index 05918bd..0000000 --- a/src/broadcast_channel.rs +++ /dev/null @@ -1,397 +0,0 @@ -use std::collections::VecDeque; -use std::sync::{Arc, Condvar, Mutex, MutexGuard, TryLockError, TryLockResult}; - -struct Inner { - queue: Arc>>>, - condvar: Condvar, -} - -#[derive(Debug, PartialEq)] -enum TrySendResult { - OK, - Closed, - WouldBlock, -} - -#[derive(Debug, PartialEq)] -enum TryRecvResult { - OK(Arc), - Closed, - WouldBlock, -} - -impl Inner { - fn new() -> Inner { - let queue = Arc::new(Mutex::new(VecDeque::new())); - - Inner { - queue, - condvar: Condvar::new(), - } - } - - fn try_send_one(&self, item: Arc) -> TrySendResult { - match self.queue.try_lock() { - Ok(mut lock) => { - lock.push_back(item.clone()); - self.condvar.notify_all(); - TrySendResult::OK - } - Err(err) => { - match err { - TryLockError::Poisoned(err) => { - // dropping poisoned receiver, receiving end panic / closed - TrySendResult::Closed - } - TryLockError::WouldBlock => { - // feels like a silly arc clone, but compiler can't figure out - // that this is actually ok to not clone - TrySendResult::WouldBlock - } - } - } - } - } - - /// If `false` is returned, receiving side has been closed - /// Blocks until able to send, or receiving side closes - fn send_one(&self, item: Arc) -> bool { - let lock = self.queue.lock(); - - match lock { - Ok(mut lock) => { - lock.push_back(item); - self.condvar.notify_all(); - true - } - Err(err) => false, - } - } - - /// If `false` is returned, receiving side has been closed - /// Blocks until able to send, or receiving side closes - fn send_many(&self, items: Vec>) -> bool { - let lock = self.queue.lock(); - - match lock { - Ok(mut lock) => { - for item in items { - lock.push_back(item) - } - self.condvar.notify_all(); - true - } - Err(err) => false, - } - } - - fn try_send_many(&self, items: Vec>) -> TrySendResult { - let lock = self.queue.try_lock(); - - match lock { - Ok(mut lock) => { - for item in items { - lock.push_back(item); - } - - self.condvar.notify_all(); - TrySendResult::OK - } - Err(err) => match err { - TryLockError::Poisoned(err) => TrySendResult::Closed, - TryLockError::WouldBlock => TrySendResult::WouldBlock, - }, - } - } - - pub fn try_recv_one(&self) -> TryRecvResult { - let lock = self.queue.try_lock(); - - match lock { - Ok(mut lock) => { - let val = lock.pop_front(); - - match val { - None => TryRecvResult::WouldBlock, - Some(val) => TryRecvResult::OK(val), - } - } - Err(err) => match err { - TryLockError::Poisoned(err) => TryRecvResult::Closed, - TryLockError::WouldBlock => TryRecvResult::WouldBlock, - }, - } - } - - /// If `None` is returned, sending side has been closed - /// Blocks until receipt - pub fn recv_one(&self) -> Option> { - let lock = self.queue.lock(); - - match lock { - Ok(mut lock) => { - while lock.is_empty() { - //lock = self.condvar.wait(lock).unwrap(); - lock = match self.condvar.wait(lock) { - Ok(lock) => lock, - Err(err) => { - return None; - } - } - } - assert!(!lock.is_empty()); - lock.pop_front() - } - Err(err) => None, - } - } -} - -pub struct BroadcastReceiever { - inner: Arc>, -} - -impl BroadcastReceiever { - fn new(inner: Arc>) -> Self { - BroadcastReceiever { inner } - } - - /// Blocks until there is an item to receive, - /// if `None` is returned, sending side has closed - pub fn recv_one(&self) -> Option> { - self.inner.recv_one() - } - - pub fn try_recv_one(&self) -> TryRecvResult { - self.inner.try_recv_one() - } -} - -pub struct BroadcastChannel { - receivers: Vec>>, -} - -impl BroadcastChannel { - #[allow(clippy::new_ret_no_self)] - pub fn new() -> Self { - BroadcastChannel { - receivers: Vec::new(), - } - } - - pub fn new_receiver(&mut self) -> BroadcastReceiever { - let inner = Arc::new(Inner::new()); - let receiver = BroadcastReceiever::new(inner.clone()); - - self.receivers.push(inner); - - receiver - } - - /// Return value is number of broadcast receivers `item` was sent to - pub fn send(&mut self, item: Arc) -> usize { - let mut sent_count = 0; - - let mut receivers_pending = VecDeque::with_capacity(self.receivers.len()); - - while let Some(next) = self.receivers.pop() { - receivers_pending.push_back(next); - } - - while let Some(next) = receivers_pending.pop_front() { - match next.try_send_one(item.clone()) { - TrySendResult::OK => { - // receiver good, still receiving messages, return to receivers vec - self.receivers.push(next); - sent_count += 1; - } - TrySendResult::Closed => { - // if broadcast receiver is closed, drop it - continue; - } - TrySendResult::WouldBlock => { - // receiver not ready for next message, return to pending queue - receivers_pending.push_back(next); - } - } - } - - sent_count - } - - /// Return value is number of receivers `item` was sent to - pub fn send_many(&mut self, item: Vec>) -> usize { - let mut sent_count = 0; - - let mut receivers_pending = VecDeque::with_capacity(self.receivers.len()); - - while let Some(next) = self.receivers.pop() { - receivers_pending.push_back(next); - } - - while let Some(next) = receivers_pending.pop_front() { - match next.try_send_many(item.clone()) { - TrySendResult::OK => { - // receiver good, still receiving messages, return to receivers vec - self.receivers.push(next); - sent_count += 1; - } - TrySendResult::Closed => { - // if broadcast receiver is closed, drop it - continue; - } - TrySendResult::WouldBlock => { - // receiver not ready for next message, return to pending queue - receivers_pending.push_back(next); - } - } - } - - sent_count - } -} - -#[cfg(test)] -mod test { - use super::*; - use std::time::Duration; - - #[test] - fn test_single_thread() { - let mut tx = BroadcastChannel::::new(); - - let rx1 = tx.new_receiver(); - let rx2 = tx.new_receiver(); - - let rxc = tx.send(Arc::new(42)); - - assert_eq!(rxc, 2); - - assert_eq!(*rx1.recv_one().unwrap(), 42); - assert_eq!(*rx2.recv_one().unwrap(), 42); - - assert_eq!(rx1.try_recv_one(), TryRecvResult::WouldBlock); - assert_eq!(rx2.try_recv_one(), TryRecvResult::WouldBlock); - - let rxc = tx.send(Arc::new(69)); - - assert_eq!(rxc, 2); - - assert_eq!(*rx1.recv_one().unwrap(), 69); - assert_eq!(*rx2.recv_one().unwrap(), 69); - } - - #[test] - fn test_multi_thread() { - let mut tx = BroadcastChannel::::new(); - - let mut rx = Vec::new(); - - for _ in 0..12 { - rx.push(tx.new_receiver()); - } - - std::thread::spawn(move || { - let txc = tx.send(Arc::new(42)); - assert_eq!(txc, 12); - let txc = tx.send(Arc::new(69)); - assert_eq!(txc, 12); - std::thread::sleep(Duration::from_millis(500)); - let txc = tx.send(Arc::new(80085)); - assert_eq!(txc, 12); - }); - - let mut rvs = Vec::new(); - - for rv in rx { - rvs.push(std::thread::spawn(move || { - assert_eq!(*rv.recv_one().unwrap(), 42); - assert_eq!(*rv.recv_one().unwrap(), 69); - assert_eq!(rv.try_recv_one(), TryRecvResult::WouldBlock); - assert_eq!(*rv.recv_one().unwrap(), 80085); - })); - } - - for thread in rvs { - assert!(thread.join().is_ok()); - } - } - - #[test] - fn test_million_messages_threaded() { - const THREAD_COUNT: usize = 16; - const MESSAGE_COUNT: usize = 1_000_000; - let mut tx = BroadcastChannel::::new(); - - let mut rx = Vec::new(); - - for _ in 0..THREAD_COUNT { - rx.push(tx.new_receiver()); - } - - let mut rvs = Vec::new(); - - for rv in rx { - rvs.push(std::thread::spawn(move || { - for i in 0..MESSAGE_COUNT { - assert_eq!(*rv.recv_one().unwrap(), i) - } - })) - } - - assert!( - std::thread::spawn(move || { - for i in 0..MESSAGE_COUNT { - assert_eq!(tx.send(Arc::new(i)), THREAD_COUNT); - } - }) - .join() - .is_ok() - ); - - for thread in rvs { - assert!(thread.join().is_ok()) - } - } - - #[test] - fn test_million_messages_threaded_many() { - const THREAD_COUNT: usize = 16; - const MESSAGE_COUNT: usize = 1_000_000; - let mut tx = BroadcastChannel::::new(); - - let mut messages = Vec::with_capacity(MESSAGE_COUNT); - - for i in 0..MESSAGE_COUNT { - messages.push(Arc::new(i)) - } - - let mut rx = Vec::new(); - - for _ in 0..THREAD_COUNT { - rx.push(tx.new_receiver()); - } - - let mut rvs = Vec::new(); - - for rv in rx { - rvs.push(std::thread::spawn(move || { - for i in 0..MESSAGE_COUNT { - assert_eq!(*rv.recv_one().unwrap(), i) - } - })) - } - - assert!( - std::thread::spawn(move || { - assert_eq!(tx.send_many(messages), THREAD_COUNT); - }) - .join() - .is_ok() - ); - - for thread in rvs { - assert!(thread.join().is_ok()) - } - } -} diff --git a/src/lib.rs b/src/lib.rs index 15070ee..94e38e4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,8 +2,3 @@ mod args; pub use args::Args; mod hashes; - -mod return_channel; -pub use return_channel::ReturnChannel; - -mod broadcast_channel; diff --git a/src/return_channel.rs b/src/return_channel.rs deleted file mode 100644 index 7f2fad6..0000000 --- a/src/return_channel.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::sync::{Arc, Condvar, Mutex}; - -/// Operates similarly to the `oneshot` channel in tokio -pub struct ReturnChannel { - value: Mutex>, - condvar: Condvar, -} - -impl ReturnChannel { - #[allow(clippy::new_ret_no_self)] - pub fn new() -> (ReturnChannelSender, ReturnChannelReceiver) { - let inner = ReturnChannel { - value: Mutex::new(None), - condvar: Condvar::new(), - }; - - let arc = Arc::new(inner); - - let ret_tx = ReturnChannelSender(arc.clone()); - let ret_rx = ReturnChannelReceiver(arc); - - (ret_tx, ret_rx) - } -} - -pub struct ReturnChannelReceiver(Arc>); -pub struct ReturnChannelSender(Arc>); - -impl ReturnChannelReceiver { - pub fn receive(self) -> T { - let mut val = self.0.value.lock().unwrap(); - - while val.is_none() { - val = self.0.condvar.wait(val).unwrap() - } - - let replace = val.take(); - - replace.unwrap() - } -} - -impl ReturnChannelSender { - pub fn send(self, val: T) { - let mut lock = self.0.value.lock().unwrap(); - - *lock = Some(val); - - self.0.condvar.notify_all(); - } -} - -#[cfg(test)] -mod test { - pub use super::*; - use std::time::Duration; - - #[test] - fn test_channel_single_thread() { - let (tx, rx) = ReturnChannel::new(); - - tx.send(42); - - let rx = rx.receive(); - - assert_eq!(rx, 42); - } - - #[test] - fn test_channel_multi_thread() { - let (tx, rx) = ReturnChannel::new(); - - std::thread::spawn(|| { - std::thread::sleep(Duration::from_millis(500)); - tx.send(42); - }); - - let rx = rx.receive(); - - assert_eq!(rx, 42); - } -}