mirror of
https://github.com/hyperium/hyper.git
synced 2026-01-25 02:16:14 +00:00
feat(body): update Body trait to use Frames (#3020)
The `Body` trait was adjusted to be forwards compatible with adding new frame types. That resulted in changing from `poll_data` and `poll_trailers` to a single `poll_frame` function. More can be learned from the proposal in https://github.com/hyperium/hyper/issues/2840. Closes #3010 BREAKING CHANGE: The polling functions of the `Body` trait have been redesigned. The free functions `hyper::body::to_bytes` and `aggregate` have been removed. Similar functionality is on `http_body_util::BodyExt::collect`.
This commit is contained in:
@@ -3,8 +3,8 @@
|
||||
use std::env;
|
||||
|
||||
use bytes::Bytes;
|
||||
use http_body_util::Empty;
|
||||
use hyper::{body::Body as _, Request};
|
||||
use http_body_util::{BodyExt, Empty};
|
||||
use hyper::Request;
|
||||
use tokio::io::{self, AsyncWriteExt as _};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
@@ -62,9 +62,11 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {
|
||||
|
||||
// Stream the body, writing each chunk to stdout as we get it
|
||||
// (instead of buffering and printing at the end).
|
||||
while let Some(next) = res.data().await {
|
||||
let chunk = next?;
|
||||
io::stdout().write_all(&chunk).await?;
|
||||
while let Some(next) = res.frame().await {
|
||||
let frame = next?;
|
||||
if let Some(chunk) = frame.data_ref() {
|
||||
io::stdout().write_all(&chunk).await?;
|
||||
}
|
||||
}
|
||||
|
||||
println!("\n\nDone!");
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
#![warn(rust_2018_idioms)]
|
||||
|
||||
use bytes::Bytes;
|
||||
use http_body_util::Empty;
|
||||
use http_body_util::{BodyExt, Empty};
|
||||
use hyper::{body::Buf, Request};
|
||||
use serde::Deserialize;
|
||||
use tokio::net::TcpStream;
|
||||
@@ -48,7 +48,7 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
|
||||
let res = sender.send_request(req).await?;
|
||||
|
||||
// asynchronously aggregate the chunks of the body
|
||||
let body = hyper::body::aggregate(res).await?;
|
||||
let body = res.collect().await?.aggregate();
|
||||
|
||||
// try to parse as json with serde_json
|
||||
let users = serde_json::from_reader(body.reader())?;
|
||||
|
||||
@@ -50,7 +50,7 @@ async fn echo(req: Request<Recv>) -> Result<Response<BoxBody<Bytes, hyper::Error
|
||||
return Ok(resp);
|
||||
}
|
||||
|
||||
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
|
||||
let whole_body = req.collect().await?.to_bytes();
|
||||
|
||||
let reversed_body = whole_body.iter().rev().cloned().collect::<Vec<u8>>();
|
||||
Ok(Response::new(full(reversed_body)))
|
||||
|
||||
@@ -25,7 +25,7 @@ async fn param_example(
|
||||
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(full(INDEX))),
|
||||
(&Method::POST, "/post") => {
|
||||
// Concatenate the body...
|
||||
let b = hyper::body::to_bytes(req).await?;
|
||||
let b = req.collect().await?.to_bytes();
|
||||
// Parse the request body. form_urlencoded::parse
|
||||
// always succeeds, but in general parsing may
|
||||
// fail (for example, an invalid post of json), so
|
||||
|
||||
@@ -6,8 +6,7 @@ use std::net::SocketAddr;
|
||||
use std::rc::Rc;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use hyper::body::{Body as HttpBody, Bytes};
|
||||
use hyper::header::{HeaderMap, HeaderValue};
|
||||
use hyper::body::{Body as HttpBody, Bytes, Frame};
|
||||
use hyper::service::service_fn;
|
||||
use hyper::{Error, Response};
|
||||
use std::marker::PhantomData;
|
||||
@@ -33,18 +32,11 @@ impl HttpBody for Body {
|
||||
type Data = Bytes;
|
||||
type Error = Error;
|
||||
|
||||
fn poll_data(
|
||||
fn poll_frame(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
|
||||
Poll::Ready(self.get_mut().data.take().map(Ok))
|
||||
}
|
||||
|
||||
fn poll_trailers(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
|
||||
Poll::Ready(Ok(None))
|
||||
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
||||
Poll::Ready(self.get_mut().data.take().map(|d| Ok(Frame::data(d))))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ async fn client_request_response() -> Result<Response<BoxBody>> {
|
||||
|
||||
async fn api_post_response(req: Request<Recv>) -> Result<Response<BoxBody>> {
|
||||
// Aggregate the body...
|
||||
let whole_body = hyper::body::aggregate(req).await?;
|
||||
let whole_body = req.collect().await?.aggregate();
|
||||
// Decode as JSON...
|
||||
let mut data: serde_json::Value = serde_json::from_reader(whole_body.reader())?;
|
||||
// Change the JSON...
|
||||
|
||||
Reference in New Issue
Block a user