chore: replace num_cpus with available_parallelism (#2946)

Signed-off-by: tison <wander4096@gmail.com>
This commit is contained in:
tison
2025-05-23 22:06:24 +08:00
committed by GitHub
parent 97ea37e704
commit 07359ef832
7 changed files with 17 additions and 22 deletions

View File

@@ -13,13 +13,12 @@ Executors for asynchronous tasks based on the futures-rs library.
[features] [features]
default = ["std"] default = ["std"]
std = ["futures-core/std", "futures-task/std", "futures-util/std"] std = ["futures-core/std", "futures-task/std", "futures-util/std"]
thread-pool = ["std", "num_cpus"] thread-pool = ["std"]
[dependencies] [dependencies]
futures-core = { path = "../futures-core", version = "=1.0.0-alpha.0", default-features = false } futures-core = { path = "../futures-core", version = "=1.0.0-alpha.0", default-features = false }
futures-task = { path = "../futures-task", version = "=0.4.0-alpha.0", default-features = false } futures-task = { path = "../futures-task", version = "=0.4.0-alpha.0", default-features = false }
futures-util = { path = "../futures-util", version = "=0.4.0-alpha.0", default-features = false } futures-util = { path = "../futures-util", version = "=0.4.0-alpha.0", default-features = false }
num_cpus = { version = "1.8.0", optional = true }
[dev-dependencies] [dev-dependencies]
futures = { path = "../futures", features = ["thread-pool"] } futures = { path = "../futures", features = ["thread-pool"] }

View File

@@ -6,7 +6,6 @@ use futures_task::{waker_ref, ArcWake};
use futures_task::{FutureObj, Spawn, SpawnError}; use futures_task::{FutureObj, Spawn, SpawnError};
use futures_util::future::FutureExt; use futures_util::future::FutureExt;
use std::boxed::Box; use std::boxed::Box;
use std::cmp;
use std::fmt; use std::fmt;
use std::format; use std::format;
use std::io; use std::io;
@@ -190,13 +189,8 @@ impl ThreadPoolBuilder {
/// ///
/// See the other methods on this type for details on the defaults. /// See the other methods on this type for details on the defaults.
pub fn new() -> Self { pub fn new() -> Self {
Self { let pool_size = thread::available_parallelism().map_or(1, |p| p.get());
pool_size: cmp::max(1, num_cpus::get()), Self { pool_size, stack_size: 0, name_prefix: None, after_start: None, before_stop: None }
stack_size: 0,
name_prefix: None,
after_start: None,
before_stop: None,
}
} }
/// Set size of a future ThreadPool /// Set size of a future ThreadPool
@@ -283,7 +277,7 @@ impl ThreadPoolBuilder {
let before_stop = self.before_stop.clone(); let before_stop = self.before_stop.clone();
let mut thread_builder = thread::Builder::new(); let mut thread_builder = thread::Builder::new();
if let Some(ref name_prefix) = self.name_prefix { if let Some(ref name_prefix) = self.name_prefix {
thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter)); thread_builder = thread_builder.name(format!("{name_prefix}{counter}"));
} }
if self.stack_size > 0 { if self.stack_size > 0 {
thread_builder = thread_builder.stack_size(self.stack_size); thread_builder = thread_builder.stack_size(self.stack_size);

View File

@@ -16,8 +16,8 @@ std = ["alloc", "futures-core/std", "futures-task/std", "slab/std"]
alloc = ["futures-core/alloc", "futures-task/alloc", "slab"] alloc = ["futures-core/alloc", "futures-task/alloc", "slab"]
async-await = [] async-await = []
async-await-macro = ["async-await", "futures-macro"] async-await-macro = ["async-await", "futures-macro"]
compat = ["std", "futures_01"] compat = ["std", "futures_01", "libc"]
io-compat = ["io", "compat", "tokio-io"] io-compat = ["io", "compat", "tokio-io", "libc"]
sink = ["futures-sink"] sink = ["futures-sink"]
io = ["std", "futures-io", "memchr"] io = ["std", "futures-io", "memchr"]
channel = ["std", "futures-channel"] channel = ["std", "futures-channel"]
@@ -44,6 +44,9 @@ tokio-io = { version = "0.1.9", optional = true }
pin-project-lite = "0.2.6" pin-project-lite = "0.2.6"
spin = { version = "0.10.0", optional = true } spin = { version = "0.10.0", optional = true }
# INDIRECT DEPENDENCYS BUT ONLY FOR SPECIFIC MINIMAL VERSIONS
libc = { version = "0.2.26", optional = true }
[dev-dependencies] [dev-dependencies]
futures = { path = "../futures", features = ["async-await", "thread-pool"] } futures = { path = "../futures", features = ["async-await", "thread-pool"] }
futures-test = { path = "../futures-test" } futures-test = { path = "../futures-test" }

View File

@@ -553,8 +553,8 @@ mod tests {
fn test_mutex_guard_debug_not_recurse() { fn test_mutex_guard_debug_not_recurse() {
let mutex = Mutex::new(42); let mutex = Mutex::new(42);
let guard = mutex.try_lock().unwrap(); let guard = mutex.try_lock().unwrap();
let _ = format!("{:?}", guard); let _ = format!("{guard:?}");
let guard = MutexGuard::map(guard, |n| n); let guard = MutexGuard::map(guard, |n| n);
let _ = format!("{:?}", guard); let _ = format!("{guard:?}");
} }
} }

View File

@@ -69,7 +69,7 @@ impl<Fut> ReadyToRunQueue<Fut> {
return Dequeue::Data(tail); return Dequeue::Data(tail);
} }
if self.head.load(Acquire) as *const _ != tail { if !core::ptr::eq(self.head.load(Acquire), tail) {
return Dequeue::Inconsistent; return Dequeue::Inconsistent;
} }

View File

@@ -1,6 +1,5 @@
use crate::stream::{FuturesUnordered, StreamExt}; use crate::stream::{FuturesUnordered, StreamExt};
use core::fmt; use core::fmt;
use core::mem;
use core::num::NonZeroUsize; use core::num::NonZeroUsize;
use core::pin::Pin; use core::pin::Pin;
use futures_core::future::{FusedFuture, Future}; use futures_core::future::{FusedFuture, Future};
@@ -113,7 +112,7 @@ where
// Empty the stream and futures so that we know // Empty the stream and futures so that we know
// the future has completed. // the future has completed.
this.stream.set(None); this.stream.set(None);
drop(mem::replace(this.futures, FuturesUnordered::new())); drop(core::mem::take(this.futures));
return Poll::Ready(Err(e)); return Poll::Ready(Err(e));
} }
} }

View File

@@ -68,11 +68,11 @@ fn oneshot_drop_rx() {
#[test] #[test]
fn oneshot_debug() { fn oneshot_debug() {
let (tx, rx) = oneshot::channel::<i32>(); let (tx, rx) = oneshot::channel::<i32>();
assert_eq!(format!("{:?}", tx), "Sender { complete: false }"); assert_eq!(format!("{tx:?}"), "Sender { complete: false }");
assert_eq!(format!("{:?}", rx), "Receiver { complete: false }"); assert_eq!(format!("{rx:?}"), "Receiver { complete: false }");
drop(rx); drop(rx);
assert_eq!(format!("{:?}", tx), "Sender { complete: true }"); assert_eq!(format!("{tx:?}"), "Sender { complete: true }");
let (tx, rx) = oneshot::channel::<i32>(); let (tx, rx) = oneshot::channel::<i32>();
drop(tx); drop(tx);
assert_eq!(format!("{:?}", rx), "Receiver { complete: true }"); assert_eq!(format!("{rx:?}"), "Receiver { complete: true }");
} }