feat(service): create own Service trait (#2920)

This removes the dependency on `tower-service`, and simplifies the `Service` trait to be used by hyper's server connections.

Closes #2853 

BREAKING CHANGE: Change any manual `impl tower::Service` to implement `hyper::service::Service` instead. The `poll_ready` method has been removed.
This commit is contained in:
Tom Karwowski
2022-09-09 00:25:20 +02:00
committed by GitHub
parent fae97ced3a
commit fee7d361c2
10 changed files with 54 additions and 170 deletions

View File

@@ -35,7 +35,6 @@ h2 = { version = "0.3.9", optional = true }
itoa = "1"
tracing = { version = "0.1", default-features = false, features = ["std"] }
pin-project-lite = "0.2.4"
tower-service = "0.3"
tokio = { version = "1", features = ["sync"] }
want = "0.3"
@@ -65,7 +64,6 @@ tokio = { version = "1", features = [
] }
tokio-test = "0.4"
tokio-util = { version = "0.7", features = ["codec"] }
tower = { version = "0.4", features = ["make", "util"] }
url = "2.2"
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]

View File

@@ -8,7 +8,6 @@ use tokio::net::TcpListener;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
type Counter = i32;
@@ -42,10 +41,6 @@ impl Service<Request<Recv>> for Svc {
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<Recv>) -> Self::Future {
fn mk_response(s: String) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::builder().body(Full::new(Bytes::from(s))).unwrap())

View File

@@ -18,7 +18,6 @@
//! use http_body_util::Empty;
//! use hyper::client::conn;
//! use tokio::net::TcpStream;
//! use tower::ServiceExt;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -41,9 +40,6 @@
//! let response = request_sender.send_request(request).await?;
//! assert!(response.status() == StatusCode::OK);
//!
//! // To send via the same connection again, it may not work as it may not be ready,
//! // so we have to wait until the request_sender becomes ready.
//! request_sender.ready().await?;
//! let request = Request::builder()
//! .header("Host", "example.com")
//! .method("GET")
@@ -69,7 +65,6 @@ use futures_util::future;
use httparse::ParserConfig;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::Service;
use tracing::{debug, trace};
use super::dispatch;
@@ -266,23 +261,6 @@ where
}
}
impl<B> Service<Request<B>> for SendRequest<B>
where
B: Body + 'static,
{
type Response = Response<Recv>;
type Error = crate::Error;
type Future = ResponseFuture;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}
fn call(&mut self, req: Request<B>) -> Self::Future {
self.send_request(req)
}
}
impl<B> fmt::Debug for SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendRequest").finish()

View File

@@ -233,7 +233,7 @@ where
}
fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
// can dispatch receive, or does it still care about, an incoming message?
// can dispatch receive, or does it still care about other incoming message?
match ready!(self.dispatch.poll_ready(cx)) {
Ok(()) => (),
Err(()) => {
@@ -242,6 +242,7 @@ where
return Poll::Ready(Ok(()));
}
}
// dispatch is ready for a message, try to read one
match ready!(self.conn.poll_read_head(cx)) {
Some(Ok((mut head, body_len, wants))) => {
@@ -511,14 +512,11 @@ cfg_server! {
Ok(())
}
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
if self.in_flight.is_some() {
Poll::Pending
} else {
self.service.poll_ready(cx).map_err(|_e| {
// FIXME: return error value.
trace!("service closed");
})
Poll::Ready(Ok(()))
}
}

View File

@@ -257,38 +257,6 @@ where
loop {
self.poll_ping(cx);
// Check that the service is ready to accept a new request.
//
// - If not, just drive the connection some.
// - If ready, try to accept a new request from the connection.
match service.poll_ready(cx) {
Poll::Ready(Ok(())) => (),
Poll::Pending => {
// use `poll_closed` instead of `poll_accept`,
// in order to avoid accepting a request.
ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
trace!("incoming connection complete");
return Poll::Ready(Ok(()));
}
Poll::Ready(Err(err)) => {
let err = crate::Error::new_user_service(err);
debug!("service closed: {}", err);
let reason = err.h2_reason();
if reason == Reason::NO_ERROR {
// NO_ERROR is only used for graceful shutdowns...
trace!("interpreting NO_ERROR user error as graceful_shutdown");
self.conn.graceful_shutdown();
} else {
trace!("abruptly shutting down with {:?}", reason);
self.conn.abrupt_shutdown(reason);
}
self.closing = Some(err);
break;
}
}
// When the service is ready, accepts an incoming request.
match ready!(self.conn.poll_accept(cx)) {
Some(Ok((req, mut respond))) => {
trace!("incoming request");

View File

@@ -1,7 +1,8 @@
use std::error::Error as StdError;
use crate::body::Body;
use crate::common::{task, Future, Poll};
use crate::common::Future;
use crate::service::service::Service;
use crate::{Request, Response};
/// An asynchronous function from `Request` to `Response`.
@@ -19,16 +20,13 @@ pub trait HttpService<ReqBody>: sealed::Sealed<ReqBody> {
/// The `Future` returned by this `Service`.
type Future: Future<Output = Result<Response<Self::ResBody>, Self::Error>>;
#[doc(hidden)]
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
#[doc(hidden)]
fn call(&mut self, req: Request<ReqBody>) -> Self::Future;
}
impl<T, B1, B2> HttpService<B1> for T
where
T: tower_service::Service<Request<B1>, Response = Response<B2>>,
T: Service<Request<B1>, Response = Response<B2>>,
B2: Body,
T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
@@ -37,18 +35,14 @@ where
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
tower_service::Service::poll_ready(self, cx)
}
fn call(&mut self, req: Request<B1>) -> Self::Future {
tower_service::Service::call(self, req)
Service::call(self, req)
}
}
impl<T, B1, B2> sealed::Sealed<B1> for T
where
T: tower_service::Service<Request<B1>, Response = Response<B2>>,
T: Service<Request<B1>, Response = Response<B2>>,
B2: Body,
{
}

View File

@@ -21,12 +21,16 @@
//! if you need to implement `Service` for a type manually, you can follow the example
//! in `service_struct_impl.rs`.
pub use tower_service::Service;
mod http;
mod service;
mod util;
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))]
pub(super) use self::http::HttpService;
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
))]
pub use self::service::Service;
pub use self::util::service_fn;

32
src/service/service.rs Normal file
View File

@@ -0,0 +1,32 @@
use std::future::Future;
/// An asynchronous function from a `Request` to a `Response`.
///
/// The `Service` trait is a simplified interface making it easy to write
/// network applications in a modular and reusable way, decoupled from the
/// underlying protocol.
///
/// # Functional
///
/// A `Service` is a function of a `Request`. It immediately returns a
/// `Future` representing the eventual completion of processing the
/// request. The actual request processing may happen at any time in the
/// future, on any thread or executor. The processing may depend on calling
/// other services. At some point in the future, the processing will complete,
/// and the `Future` will resolve to a response or error.
///
/// At a high level, the `Service::call` function represents an RPC request. The
/// `Service` value can be a server or a client.
pub trait Service<Request> {
/// Responses given by the service.
type Response;
/// Errors produced by the service.
type Error;
/// The future response value.
type Future: Future<Output = Result<Self::Response, Self::Error>>;
/// Process the request and return the response asynchronously.
fn call(&mut self, req: Request) -> Self::Future;
}

View File

@@ -3,7 +3,8 @@ use std::fmt;
use std::marker::PhantomData;
use crate::body::Body;
use crate::common::{task, Future, Poll};
use crate::common::Future;
use crate::service::service::Service;
use crate::{Request, Response};
/// Create a `Service` from a function.
@@ -43,8 +44,7 @@ pub struct ServiceFn<F, R> {
_req: PhantomData<fn(R)>,
}
impl<F, ReqBody, Ret, ResBody, E> tower_service::Service<crate::Request<ReqBody>>
for ServiceFn<F, ReqBody>
impl<F, ReqBody, Ret, ResBody, E> Service<Request<ReqBody>> for ServiceFn<F, ReqBody>
where
F: FnMut(Request<ReqBody>) -> Ret,
ReqBody: Body,
@@ -56,10 +56,6 @@ where
type Error = E;
type Future = Ret;
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
(self.f)(req)
}

View File

@@ -29,7 +29,7 @@ use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpSt
use hyper::body::Body;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::service::{service_fn, Service};
use hyper::{Method, Recv, Request, Response, StatusCode, Uri, Version};
mod support;
@@ -2310,77 +2310,6 @@ fn http2_body_user_error_sends_reset_reason() {
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
}
struct Http2ReadyErrorSvc;
impl tower_service::Service<Request<Recv>> for Http2ReadyErrorSvc {
type Response = Response<Recv>;
type Error = h2::Error;
type Future = Box<
dyn futures_core::Future<Output = Result<Self::Response, Self::Error>>
+ Send
+ Sync
+ Unpin,
>;
fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Err::<(), _>(h2::Error::from(
h2::Reason::INADEQUATE_SECURITY,
)))
}
fn call(&mut self, _: hyper::Request<Recv>) -> Self::Future {
unreachable!("poll_ready error should have shutdown conn");
}
}
#[tokio::test]
#[ignore] // sometimes ECONNRESET wins the race
async fn http2_service_poll_ready_error_sends_goaway() {
use std::error::Error;
let _ = pretty_env_logger::try_init();
let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.unwrap();
let addr_str = format!("http://{}", listener.local_addr().unwrap());
tokio::task::spawn(async move {
loop {
tokio::select! {
res = listener.accept() => {
let (stream, _) = res.unwrap();
tokio::task::spawn(async move {
let mut http = Http::new();
http.http2_only(true);
let service = Http2ReadyErrorSvc;
http.serve_connection(stream, service).await.unwrap();
});
}
}
}
});
let uri = addr_str.parse().expect("server addr should parse");
let err = dbg!(TestClient::new()
.http2_only()
.get(uri)
.await
.expect_err("client.get should fail"));
// client request should have gotten the specific GOAWAY error...
let h2_err = err
.source()
.expect("source")
.downcast_ref::<h2::Error>()
.expect("downcast");
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
}
#[test]
fn skips_content_length_for_304_responses() {
let server = serve();
@@ -2789,15 +2718,11 @@ enum Msg {
End,
}
impl tower_service::Service<Request<Recv>> for TestService {
impl Service<Request<Recv>> for TestService {
type Response = Response<ReplyBody>;
type Error = BoxError;
type Future = BoxFuture;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, mut req: Request<Recv>) -> Self::Future {
let tx = self.tx.clone();
let replies = self.reply.clone();
@@ -2856,22 +2781,18 @@ const HELLO: &str = "hello";
struct HelloWorld;
impl tower_service::Service<Request<Recv>> for HelloWorld {
impl Service<Request<Recv>> for HelloWorld {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, _req: Request<Recv>) -> Self::Future {
let response = Response::new(Full::new(HELLO.into()));
future::ok(response)
}
}
fn unreachable_service() -> impl tower_service::Service<
fn unreachable_service() -> impl Service<
http::Request<hyper::Recv>,
Response = http::Response<ReplyBody>,
Error = BoxError,