mirror of
https://github.com/hyperium/hyper.git
synced 2026-01-25 02:16:14 +00:00
refactor(lib): drop futures-util except in ffi (#3890)
Make hyper usable for h1/h2 and client/server without this heavyweight dependency. It's about 17k lines of code and takes up to 1.7 seconds to compile on my machine, but hyper is only using a tiny fraction of it. Larger applications probably still pull in futures-util by other means, but it's no longer as unavoidable as in the early days of the ecosystem. To remove futures-util without raising MSRV, I took these steps: * When futures-util just re-exports something from its dependencies, use it directly from the source. * Inline trivial helpers like `poll_unpin` that "only" communicate intent a little better but don't save any significant amount of code. * Refactor the h2 client code to avoid `StreamFuture` for the "Client has been dropped" detection -- just poll the mpsc channel directly. * Implement a couple of small helpers from scratch when they're straightforward and fit on one screen each. The majority of this is polyfills for standard library APIs that would require a higher MSRV. * Use `AtomicWaker` from the `atomic-waker` crate, a separately published copy of the futures-util type of the same name. While the two crates are owned by different organizations (smol-rs vs. rust-lang), it's mostly the same people maintaining both copies. The uses of future-util in hyper's tests/benches/examples and in the `ffi` module seem much harder to remove entirely, so I did not touch those modules at all.
This commit is contained in:
11
Cargo.toml
11
Cargo.toml
@@ -27,14 +27,17 @@ tokio = { version = "1", features = ["sync"] }
|
||||
|
||||
# Optional
|
||||
|
||||
atomic-waker = { version = "1.1.2", optional = true }
|
||||
futures-channel = { version = "0.3", optional = true }
|
||||
futures-util = { version = "0.3", default-features = false, optional = true }
|
||||
futures-core = { version = "0.3.31", optional = true }
|
||||
futures-util = { version = "0.3", default-features = false, features = ["alloc"], optional = true }
|
||||
h2 = { version = "0.4.2", optional = true }
|
||||
http-body-util = { version = "0.1", optional = true }
|
||||
httparse = { version = "1.9", optional = true }
|
||||
httpdate = { version = "1.0", optional = true }
|
||||
itoa = { version = "1", optional = true }
|
||||
pin-project-lite = { version = "0.2.4", optional = true }
|
||||
pin-utils = { version = "0.1", optional = true } # TODO: replace with std::pin::pin! once MSRV >= 1.68
|
||||
smallvec = { version = "1.12", features = ["const_generics", "const_new"], optional = true }
|
||||
tracing = { version = "0.1", default-features = false, features = ["std"], optional = true }
|
||||
want = { version = "0.3", optional = true }
|
||||
@@ -77,15 +80,15 @@ full = [
|
||||
]
|
||||
|
||||
# HTTP versions
|
||||
http1 = ["dep:futures-channel", "dep:futures-util", "dep:httparse", "dep:itoa"]
|
||||
http2 = ["dep:futures-channel", "dep:futures-util", "dep:h2"]
|
||||
http1 = ["dep:atomic-waker", "dep:futures-channel", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"]
|
||||
http2 = ["dep:futures-channel", "dep:futures-core", "dep:h2"]
|
||||
|
||||
# Client/Server
|
||||
client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"]
|
||||
server = ["dep:httpdate", "dep:pin-project-lite", "dep:smallvec"]
|
||||
|
||||
# C-API support (currently unstable (no semver))
|
||||
ffi = ["dep:http-body-util", "futures-util?/alloc"]
|
||||
ffi = ["dep:http-body-util", "futures-util"]
|
||||
capi = []
|
||||
|
||||
# Utilize tracing (currently unstable)
|
||||
|
||||
@@ -11,9 +11,9 @@ use futures_channel::{mpsc, oneshot};
|
||||
any(feature = "http1", feature = "http2"),
|
||||
any(feature = "client", feature = "server")
|
||||
))]
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
|
||||
use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver
|
||||
use futures_core::{stream::FusedStream, Stream}; // for mpsc::Receiver
|
||||
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
|
||||
use http::HeaderMap;
|
||||
use http_body::{Body, Frame, SizeHint};
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::task::{Context, Poll};
|
||||
|
||||
use crate::rt::{Read, Write};
|
||||
use bytes::Bytes;
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
use http::{Request, Response};
|
||||
use httparse::ParserConfig;
|
||||
|
||||
@@ -92,7 +92,7 @@ where
|
||||
/// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
|
||||
pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
|
||||
let mut conn = Some(self);
|
||||
futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
|
||||
crate::common::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
|
||||
ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
|
||||
Poll::Ready(Ok(conn.take().unwrap().into_parts()))
|
||||
})
|
||||
@@ -148,7 +148,7 @@ impl<B> SendRequest<B> {
|
||||
///
|
||||
/// If the associated connection is closed, this returns an Error.
|
||||
pub async fn ready(&mut self) -> crate::Result<()> {
|
||||
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
|
||||
crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await
|
||||
}
|
||||
|
||||
/// Checks if the connection is currently ready to send a request.
|
||||
|
||||
@@ -10,7 +10,7 @@ use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::rt::{Read, Write};
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
use http::{Request, Response};
|
||||
|
||||
use super::super::dispatch::{self, TrySendError};
|
||||
@@ -99,7 +99,7 @@ impl<B> SendRequest<B> {
|
||||
///
|
||||
/// If the associated connection is closed, this returns an Error.
|
||||
pub async fn ready(&mut self) -> crate::Result<()> {
|
||||
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
|
||||
crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await
|
||||
}
|
||||
|
||||
/// Checks if the connection is currently ready to send a request.
|
||||
|
||||
@@ -199,8 +199,7 @@ impl<T, U> Receiver<T, U> {
|
||||
|
||||
#[cfg(feature = "http1")]
|
||||
pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
|
||||
use futures_util::FutureExt;
|
||||
match self.inner.recv().now_or_never() {
|
||||
match crate::common::task::now_or_never(self.inner.recv()) {
|
||||
Some(Some(mut env)) => env.0.take(),
|
||||
_ => None,
|
||||
}
|
||||
|
||||
46
src/common/either.rs
Normal file
46
src/common/either.rs
Normal file
@@ -0,0 +1,46 @@
|
||||
use pin_project_lite::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pin_project! {
|
||||
/// One of two possible futures that have the same output type.
|
||||
#[project = EitherProj]
|
||||
pub(crate) enum Either<F1, F2> {
|
||||
Left {
|
||||
#[pin]
|
||||
fut: F1
|
||||
},
|
||||
Right {
|
||||
#[pin]
|
||||
fut: F2,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
impl<F1, F2> Either<F1, F2> {
|
||||
pub(crate) fn left(fut: F1) -> Self {
|
||||
Either::Left { fut }
|
||||
}
|
||||
|
||||
pub(crate) fn right(fut: F2) -> Self {
|
||||
Either::Right { fut }
|
||||
}
|
||||
}
|
||||
|
||||
impl<F1, F2> Future for Either<F1, F2>
|
||||
where
|
||||
F1: Future,
|
||||
F2: Future<Output = F1::Output>,
|
||||
{
|
||||
type Output = F1::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.project() {
|
||||
EitherProj::Left { fut } => fut.poll(cx),
|
||||
EitherProj::Right { fut } => fut.poll(cx),
|
||||
}
|
||||
}
|
||||
}
|
||||
30
src/common/future.rs
Normal file
30
src/common/future.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
// TODO: replace with `std::future::poll_fn` once MSRV >= 1.64
|
||||
pub(crate) fn poll_fn<T, F>(f: F) -> PollFn<F>
|
||||
where
|
||||
F: FnMut(&mut Context<'_>) -> Poll<T>,
|
||||
{
|
||||
PollFn { f }
|
||||
}
|
||||
|
||||
pub(crate) struct PollFn<F> {
|
||||
f: F,
|
||||
}
|
||||
|
||||
impl<F> Unpin for PollFn<F> {}
|
||||
|
||||
impl<T, F> Future for PollFn<F>
|
||||
where
|
||||
F: FnMut(&mut Context<'_>) -> Poll<T>,
|
||||
{
|
||||
type Output = T;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
(self.as_mut().f)(cx)
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,13 @@
|
||||
pub(crate) mod buf;
|
||||
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
|
||||
pub(crate) mod date;
|
||||
#[cfg(all(feature = "client", feature = "http2"))]
|
||||
pub(crate) mod either;
|
||||
#[cfg(any(
|
||||
all(feature = "client", any(feature = "http1", feature = "http2")),
|
||||
all(feature = "server", feature = "http1"),
|
||||
))]
|
||||
pub(crate) mod future;
|
||||
pub(crate) mod io;
|
||||
#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
|
||||
pub(crate) mod task;
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use std::task::{Context, Poll};
|
||||
#[cfg(feature = "client")]
|
||||
use std::task::{RawWaker, RawWakerVTable, Waker};
|
||||
|
||||
/// A function to help "yield" a future, such that it is re-scheduled immediately.
|
||||
///
|
||||
@@ -7,3 +9,37 @@ pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll<std::convert::Infallible>
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
// TODO: replace with `std::task::Waker::noop()` once MSRV >= 1.85
|
||||
#[cfg(feature = "client")]
|
||||
fn noop_waker() -> Waker {
|
||||
const NOOP_RAW_WAKER: RawWaker = RawWaker::new(std::ptr::null(), &NOOP_VTABLE);
|
||||
const NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(
|
||||
// `clone` returns the same noop waker again
|
||||
|_: *const ()| NOOP_RAW_WAKER,
|
||||
// `wake`, `wake_by_ref`, and `drop` do nothing
|
||||
|_: *const ()| {},
|
||||
|_: *const ()| {},
|
||||
|_: *const ()| {},
|
||||
);
|
||||
|
||||
// SAFETY: all functions in the vtable are safe to call, and Waker's safety does not require
|
||||
// them to actually do anything.
|
||||
unsafe { Waker::from_raw(NOOP_RAW_WAKER) }
|
||||
}
|
||||
|
||||
/// Poll the future once and return `Some` if it is ready, else `None`.
|
||||
///
|
||||
/// If the future wasn't ready, it future likely can't be driven to completion any more: the polling
|
||||
/// uses a no-op waker, so knowledge of what the pending future was waiting for is lost.
|
||||
#[cfg(feature = "client")]
|
||||
pub(crate) fn now_or_never<F: std::future::Future>(fut: F) -> Option<F::Output> {
|
||||
let waker = noop_waker();
|
||||
let mut cx = Context::from_waker(&waker);
|
||||
// TODO: replace with std::pin::pin! and drop pin-utils once MSRV >= 1.68
|
||||
pin_utils::pin_mut!(fut);
|
||||
match fut.poll(&mut cx) {
|
||||
Poll::Ready(res) => Some(res),
|
||||
Poll::Pending => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ impl Time {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "http1")]
|
||||
#[cfg(all(feature = "server", feature = "http1"))]
|
||||
pub(crate) fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
|
||||
match *self {
|
||||
Time::Empty => {
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//! - The consumer is only notified if the value is different.
|
||||
//! - The value `0` is reserved for closed.
|
||||
|
||||
use futures_util::task::AtomicWaker;
|
||||
use atomic_waker::AtomicWaker;
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
|
||||
@@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
|
||||
|
||||
use crate::rt::{Read, Write};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
use http::header::{HeaderValue, CONNECTION, TE};
|
||||
use http::{HeaderMap, Method, Version};
|
||||
use http_body::Frame;
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::io;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
use http::{HeaderMap, HeaderName, HeaderValue};
|
||||
use http_body::Frame;
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::{
|
||||
|
||||
use crate::rt::{Read, Write};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
use http::Request;
|
||||
|
||||
use super::{Http1Transaction, Wants};
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::task::{Context, Poll};
|
||||
|
||||
use crate::rt::{Read, ReadBuf, Write};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
|
||||
use super::{Http1Transaction, ParseContext, ParsedMessage};
|
||||
use crate::common::buf::BufList;
|
||||
|
||||
@@ -11,9 +11,7 @@ use crate::rt::{Read, Write};
|
||||
use bytes::Bytes;
|
||||
use futures_channel::mpsc::{Receiver, Sender};
|
||||
use futures_channel::{mpsc, oneshot};
|
||||
use futures_util::future::{Either, FusedFuture, FutureExt as _};
|
||||
use futures_util::ready;
|
||||
use futures_util::stream::{StreamExt as _, StreamFuture};
|
||||
use futures_core::{ready, FusedFuture, FusedStream, Stream};
|
||||
use h2::client::{Builder, Connection, SendRequest};
|
||||
use h2::SendStream;
|
||||
use http::{Method, StatusCode};
|
||||
@@ -23,6 +21,7 @@ use super::ping::{Ponger, Recorder};
|
||||
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
|
||||
use crate::body::{Body, Incoming as IncomingBody};
|
||||
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
|
||||
use crate::common::either::Either;
|
||||
use crate::common::io::Compat;
|
||||
use crate::common::time::Time;
|
||||
use crate::ext::Protocol;
|
||||
@@ -164,11 +163,9 @@ where
|
||||
// 'Client' has been dropped. This is to get around a bug
|
||||
// in h2 where dropping all SendRequests won't notify a
|
||||
// parked Connection.
|
||||
let (conn_drop_ref, rx) = mpsc::channel(1);
|
||||
let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
|
||||
let (cancel_tx, conn_eof) = oneshot::channel();
|
||||
|
||||
let conn_drop_rx = rx.into_future();
|
||||
|
||||
let ping_config = new_ping_config(config);
|
||||
|
||||
let (conn, ping) = if ping_config.is_enabled() {
|
||||
@@ -176,9 +173,9 @@ where
|
||||
let (recorder, ponger) = ping::channel(pp, ping_config, timer);
|
||||
|
||||
let conn: Conn<_, B> = Conn::new(ponger, conn);
|
||||
(Either::Left(conn), recorder)
|
||||
(Either::left(conn), recorder)
|
||||
} else {
|
||||
(Either::Right(conn), ping::disabled())
|
||||
(Either::right(conn), ping::disabled())
|
||||
};
|
||||
let conn: ConnMapErr<T, B> = ConnMapErr {
|
||||
conn,
|
||||
@@ -305,7 +302,7 @@ pin_project! {
|
||||
T: Unpin,
|
||||
{
|
||||
#[pin]
|
||||
drop_rx: StreamFuture<Receiver<Infallible>>,
|
||||
drop_rx: Receiver<Infallible>,
|
||||
#[pin]
|
||||
cancel_tx: Option<oneshot::Sender<Infallible>>,
|
||||
#[pin]
|
||||
@@ -320,7 +317,7 @@ where
|
||||
{
|
||||
fn new(
|
||||
conn: ConnMapErr<T, B>,
|
||||
drop_rx: StreamFuture<Receiver<Infallible>>,
|
||||
drop_rx: Receiver<Infallible>,
|
||||
cancel_tx: oneshot::Sender<Infallible>,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -341,12 +338,12 @@ where
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.project();
|
||||
|
||||
if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
|
||||
if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
|
||||
// ok or err, the `conn` has finished.
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
|
||||
if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
|
||||
// mpsc has been dropped, hopefully polling
|
||||
// the connection some more should start shutdown
|
||||
// and then close.
|
||||
@@ -468,7 +465,7 @@ where
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
|
||||
let mut this = self.project();
|
||||
|
||||
match this.pipe.poll_unpin(cx) {
|
||||
match Pin::new(&mut this.pipe).poll(cx) {
|
||||
Poll::Ready(result) => {
|
||||
if let Err(_e) = result {
|
||||
debug!("client request body error: {}", _e);
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
use h2::{Reason, RecvStream, SendStream};
|
||||
use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
|
||||
use http::HeaderMap;
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
use h2::server::{Connection, Handshake, SendResponse};
|
||||
use h2::{Reason, RecvStream};
|
||||
use http::{Method, Request};
|
||||
|
||||
@@ -11,7 +11,7 @@ use std::time::Duration;
|
||||
use crate::rt::{Read, Write};
|
||||
use crate::upgrade::Upgraded;
|
||||
use bytes::Bytes;
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
|
||||
use crate::body::{Body, Incoming as IncomingBody};
|
||||
use crate::proto;
|
||||
@@ -179,7 +179,7 @@ where
|
||||
/// This errors if the underlying connection protocol is not HTTP/1.
|
||||
pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>> {
|
||||
let mut zelf = Some(self);
|
||||
futures_util::future::poll_fn(move |cx| {
|
||||
crate::common::future::poll_fn(move |cx| {
|
||||
ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?;
|
||||
Poll::Ready(Ok(zelf.take().unwrap().into_parts()))
|
||||
})
|
||||
|
||||
@@ -9,7 +9,7 @@ use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::rt::{Read, Write};
|
||||
use futures_util::ready;
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::body::{Body, Incoming as IncomingBody};
|
||||
|
||||
Reference in New Issue
Block a user