diff --git a/Cargo.toml b/Cargo.toml index 71e6636b..213b14a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -125,6 +125,11 @@ name = "gateway" path = "examples/gateway.rs" required-features = ["full"] +[[example]] +name = "graceful_shutdown" +path = "examples/graceful_shutdown.rs" +required-features = ["full"] + [[example]] name = "hello" path = "examples/hello.rs" diff --git a/examples/README.md b/examples/README.md index 2511c4be..5010b39c 100644 --- a/examples/README.md +++ b/examples/README.md @@ -42,6 +42,8 @@ futures-util = { version = "0.3", default-features = false, features = ["alloc"] * [`gateway`](gateway.rs) - A server gateway (reverse proxy) that proxies to the `hello` service above. +* [`graceful_shutdown`](graceful_shutdown.rs) - A server that has a timeout for incoming connections and does graceful connection shutdown. + * [`http_proxy`](http_proxy.rs) - A simple HTTP(S) proxy that handle and upgrade `CONNECT` requests and then proxy data between client and remote server. * [`multi_server`](multi_server.rs) - A server that listens to two different ports, a different `Service` per port. diff --git a/examples/graceful_shutdown.rs b/examples/graceful_shutdown.rs new file mode 100644 index 00000000..47fbb8da --- /dev/null +++ b/examples/graceful_shutdown.rs @@ -0,0 +1,95 @@ +#![deny(warnings)] + +use std::convert::Infallible; +use std::net::SocketAddr; +use std::time::Duration; + +use bytes::Bytes; +use http_body_util::Full; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use tokio::net::TcpListener; +use tokio::pin; + +#[path = "../benches/support/mod.rs"] +mod support; +use support::TokioIo; + +// An async function that consumes a request, does nothing with it and returns a +// response. +async fn hello(_: Request) -> Result>, Infallible> { + // Sleep for 6 seconds to simulate long processing. + // This is longer than the initial 5 second connection timeout, + // but within the 2 second graceful shutdown timeout. + println!("in hello before sleep"); + tokio::time::sleep(Duration::from_secs(6)).await; + println!("in hello after sleep"); + Ok(Response::new(Full::new(Bytes::from("Hello World!")))) +} + +#[tokio::main] +pub async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + + // This address is localhost + let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); + + // Use a 5 second timeout for incoming connections to the server. + // If a request is in progress when the 5 second timeout elapses, + // use a 2 second timeout for processing the final request and graceful shutdown. + let connection_timeouts = vec![Duration::from_secs(5), Duration::from_secs(2)]; + + // Bind to the port and listen for incoming TCP connections + let listener = TcpListener::bind(addr).await?; + println!("Listening on http://{}", addr); + loop { + // When an incoming TCP connection is received grab a TCP stream for + // client<->server communication. + let (tcp, remote_address) = listener.accept().await?; + + // Use an adapter to access something implementing `tokio::io` traits as if they implement + // `hyper::rt` IO traits. + let io = TokioIo::new(tcp); + + // Print the remote address connecting to our server. + println!("accepted connection from {:?}", remote_address); + + // Clone the connection_timeouts so they can be passed to the new task. + let connection_timeouts_clone = connection_timeouts.clone(); + + // Spin up a new task in Tokio so we can continue to listen for new TCP connection on the + // current task without waiting for the processing of the HTTP1 connection we just received + // to finish + tokio::task::spawn(async move { + // Pin the connection object so we can use tokio::select! below. + let conn = http1::Builder::new().serve_connection(io, service_fn(hello)); + pin!(conn); + + // Iterate the timeouts. Use tokio::select! to wait on the + // result of polling the connection itself, + // and also on tokio::time::sleep for the current timeout duration. + for (iter, sleep_duration) in connection_timeouts_clone.iter().enumerate() { + println!("iter = {} sleep_duration = {:?}", iter, sleep_duration); + tokio::select! { + res = conn.as_mut() => { + // Polling the connection returned a result. + // In this case print either the successful or error result for the connection + // and break out of the loop. + match res { + Ok(()) => println!("after polling conn, no error"), + Err(e) => println!("error serving connection: {:?}", e), + }; + break; + } + _ = tokio::time::sleep(*sleep_duration) => { + // tokio::time::sleep returned a result. + // Call graceful_shutdown on the connection and continue the loop. + println!("iter = {} got timeout_interval, calling conn.graceful_shutdown", iter); + conn.as_mut().graceful_shutdown(); + } + } + } + }); + } +}