Add futures-io

This commit is contained in:
Taylor Cramer
2018-02-19 13:41:28 -08:00
parent 3e5aa4b035
commit c3d3e08239
28 changed files with 3724 additions and 6 deletions

View File

@@ -4,6 +4,7 @@ members = [
"futures-core",
"futures-channel",
"futures-executor",
"futures-io",
"futures-util",
"futures-sink",
]

View File

@@ -1,4 +1,5 @@
Copyright (c) 2016 Alex Crichton
Copyright (c) 2017 The Tokio Authors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated

19
futures-io/Cargo.toml Normal file
View File

@@ -0,0 +1,19 @@
[package]
name = "futures-io"
version = "0.2.0"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT/Apache-2.0"
repository = "https://github.com/alexcrichton/futures-rs"
homepage = "https://github.com/alexcrichton/futures-rs"
documentation = "https://docs.rs/futures-io"
description = """
The `AsyncRead` and `AsyncWrite` traits for the futures-rs library.
"""
[features]
std = ["futures-core/std", "iovec"]
default = ["std"]
[dependencies]
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
iovec = { version = "0.1", optional = true }

305
futures-io/src/lib.rs Normal file
View File

@@ -0,0 +1,305 @@
//! Asynchronous IO
//!
//! This crate contains the `AsyncRead` and `AsyncWrite` traits which allow
//! data to be read and written asynchronously.
#![no_std]
#![deny(missing_docs, missing_debug_implementations)]
#![doc(html_root_url = "https://docs.rs/futures-io/0.2")]
macro_rules! if_std {
($($i:item)*) => ($(
#[cfg(feature = "std")]
$i
)*)
}
if_std! {
extern crate futures_core;
extern crate iovec;
extern crate std;
use futures_core::{Async, Poll, task};
use std::boxed::Box;
use std::io as StdIo;
use std::ptr;
use std::vec::Vec;
// Re-export IoVec for convenience
pub use iovec::IoVec;
// Re-export io::Error so that users don't have to deal
// with conflicts when `use`ing `futures::io` and `std::io`.
pub use StdIo::Error as Error;
/// A type used to conditionally initialize buffers passed to `AsyncRead`
/// methods.
#[derive(Debug)]
pub struct Initializer(bool);
impl Initializer {
/// Returns a new `Initializer` which will zero out buffers.
#[inline]
pub fn zeroing() -> Initializer {
Initializer(true)
}
/// Returns a new `Initializer` which will not zero out buffers.
///
/// # Safety
///
/// This method may only be called by `AsyncRead`ers which guarantee
/// that they will not read from the buffers passed to `AsyncRead`
/// methods, and that the return value of the method accurately reflects
/// the number of bytes that have been written to the head of the buffer.
#[inline]
pub unsafe fn nop() -> Initializer {
Initializer(false)
}
/// Indicates if a buffer should be initialized.
#[inline]
pub fn should_initialize(&self) -> bool {
self.0
}
/// Initializes a buffer if necessary.
#[inline]
pub fn initialize(&self, buf: &mut [u8]) {
if self.should_initialize() {
unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) }
}
}
}
/// Objects which can be read asynchronously.
pub trait AsyncRead {
/// Determines if this `AsyncRead`er can work with buffers of
/// uninitialized memory.
///
/// The default implementation returns an initializer which will zero
/// buffers.
///
/// # Safety
///
/// This method is `unsafe` because and `AsyncRead`er could otherwise
/// return a non-zeroing `Initializer` from another `AsyncRead` type
/// without an `unsafe` block.
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::zeroing()
}
/// Attempt to read from the `AsyncRead` into `buf`.
///
/// On success, returns `Ok(Async::Ready(num_bytes_read))`.
///
/// If reading would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object becomes readable or is closed.
fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context)
-> Poll<usize, Error>;
/// Attempt to read from the `AsyncRead` into `vec` using vectored
/// IO operations. This allows data to be read into multiple buffers
/// using a single operation.
///
/// On success, returns `Ok(Async::Ready(num_bytes_read))`.
///
/// By default, this method delegates to using `poll_read` on the first
/// buffer in `vec`. Objects which support vectored IO should override
/// this method.
///
/// If reading would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object becomes readable or is closed.
fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context)
-> Poll<usize, Error>
{
if let Some(first_iovec) = vec.get_mut(0) {
self.poll_read(&mut *first_iovec, cx)
} else {
// `vec` is empty.
return Ok(Async::Ready(0));
}
}
}
/// Objects which can be written to asynchronously.
pub trait AsyncWrite {
/// Attempt to write bytes from `buf` into the object.
///
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
///
/// If writing would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// the object becomes writable or is closed.
fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context)
-> Poll<usize, Error>;
/// Attempt to write bytes from `vec` into the object using vectored
/// IO operations. This allows data from multiple buffers to be written
/// using a single operation.
///
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
///
/// By default, this method delegates to using `poll_write` on the first
/// buffer in `vec`. Objects which support vectored IO should override
/// this method.
///
/// If writing would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object becomes writable or is closed.
fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context)
-> Poll<usize, Error>
{
if let Some(first_iovec) = vec.get(0) {
self.poll_write(&*first_iovec, cx)
} else {
// `vec` is empty.
return Ok(Async::Ready(0));
}
}
/// Attempt to flush the object, ensuring that all intermediately
/// buffered contents reach their destination.
///
/// On success, returns `Ok(Async::Ready(()))`.
///
/// If flushing is incomplete, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object can make progress towards flushing.
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
/// Attempt to close the object.
///
/// On success, returns `Ok(Async::Ready(()))`.
///
/// If closing is incomplete, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object can make progress towards closing.
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
}
macro_rules! deref_async_read {
() => {
unsafe fn initializer(&self) -> Initializer {
(**self).initializer()
}
fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context)
-> Poll<usize, Error>
{
(**self).poll_read(buf, cx)
}
fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context)
-> Poll<usize, Error>
{
(**self).poll_vectored_read(vec, cx)
}
}
}
impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> {
deref_async_read!();
}
impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T {
deref_async_read!();
}
/// `unsafe` because the `StdIo::Read` type must not access the buffer
/// before reading data into it.
macro_rules! unsafe_delegate_async_read_to_stdio {
() => {
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
fn poll_read(&mut self, buf: &mut [u8], _: &mut task::Context)
-> Poll<usize, Error>
{
Ok(Async::Ready(StdIo::Read::read(self, buf)?))
}
}
}
impl<'a> AsyncRead for &'a [u8] {
unsafe_delegate_async_read_to_stdio!();
}
impl AsyncRead for StdIo::Repeat {
unsafe_delegate_async_read_to_stdio!();
}
impl<T: AsRef<[u8]>> AsyncRead for StdIo::Cursor<T> {
unsafe_delegate_async_read_to_stdio!();
}
macro_rules! deref_async_write {
() => {
fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context)
-> Poll<usize, Error>
{
(**self).poll_write(buf, cx)
}
fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context)
-> Poll<usize, Error>
{
(**self).poll_vectored_write(vec, cx)
}
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
(**self).poll_flush(cx)
}
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
(**self).poll_close(cx)
}
}
}
impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
deref_async_write!();
}
impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
deref_async_write!();
}
macro_rules! delegate_async_write_to_stdio {
() => {
fn poll_write(&mut self, buf: &[u8], _: &mut task::Context)
-> Poll<usize, Error>
{
Ok(Async::Ready(StdIo::Write::write(self, buf)?))
}
fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Error> {
Ok(Async::Ready(StdIo::Write::flush(self)?))
}
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
self.poll_flush(cx)
}
}
}
impl<'a> AsyncWrite for StdIo::Cursor<&'a mut [u8]> {
delegate_async_write_to_stdio!();
}
impl AsyncWrite for StdIo::Cursor<Vec<u8>> {
delegate_async_write_to_stdio!();
}
impl AsyncWrite for StdIo::Cursor<Box<[u8]>> {
delegate_async_write_to_stdio!();
}
impl AsyncWrite for StdIo::Sink {
delegate_async_write_to_stdio!();
}
}

View File

@@ -11,12 +11,15 @@ Common utilities and extension traits for the futures-rs library.
"""
[features]
std = ["futures-core/std", "futures-sink/std"]
std = ["bytes", "log", "futures-core/std", "futures-io/std", "futures-sink/std"]
default = ["std"]
[dependencies]
bytes = { version = "0.4", optional = true }
log = { version = "0.4", optional = true }
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false }
futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}
[dev-dependencies]

View File

@@ -0,0 +1,96 @@
use futures_core::{Async, Poll, task};
use futures_io::{AsyncRead, AsyncWrite};
use std::{fmt, io};
use std::string::String;
use std::vec::Vec;
/// A simple wrapper type which allows types which implement only
/// implement `std::io::Read` or `std::io::Write`
/// to be used in contexts which expect an `AsyncRead` or `AsyncWrite`.
///
/// If these types issue an error with the kind `io::ErrorKind::WouldBlock`,
/// it is expected that they will notify the current task on readiness.
/// Synchronous `std` types should not issue errors of this kind and
/// are safe to use in this context. However, using these types with
/// `AllowStdIo` will cause the event loop to block, so they should be used
/// with care.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AllowStdIo<T>(T);
impl<T> AllowStdIo<T> {
/// Creates a new `AllowStdIo` from an existing IO object.
pub fn new(io: T) -> Self {
AllowStdIo(io)
}
/// Returns a reference to the contained IO object.
pub fn get_ref(&self) -> &T {
&self.0
}
/// Returns a mutable reference to the contained IO object.
pub fn get_mut(&mut self) -> &mut T {
&mut self.0
}
/// Consumes self and returns the contained IO object.
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> io::Write for AllowStdIo<T> where T: io::Write {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.0.write_all(buf)
}
fn write_fmt(&mut self, fmt: fmt::Arguments) -> io::Result<()> {
self.0.write_fmt(fmt)
}
}
impl<T> AsyncWrite for AllowStdIo<T> where T: io::Write {
fn poll_write(&mut self, buf: &[u8], _: &mut task::Context)
-> Poll<usize, io::Error>
{
Ok(Async::Ready(io::Write::write(&mut self.0, buf)?))
}
fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), io::Error> {
Ok(Async::Ready(io::Write::flush(self)?))
}
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
self.poll_flush(cx)
}
}
impl<T> io::Read for AllowStdIo<T> where T: io::Read {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
// TODO: implement the `initializer` fn when it stabilizes.
// See rust-lang/rust #42788
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
self.0.read_to_end(buf)
}
fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
self.0.read_to_string(buf)
}
fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
self.0.read_exact(buf)
}
}
impl<T> AsyncRead for AllowStdIo<T> where T: io::Read {
fn poll_read(&mut self, buf: &mut [u8], _: &mut task::Context)
-> Poll<usize, io::Error>
{
Ok(Async::Ready(io::Read::read(&mut self.0, buf)?))
}
}

View File

@@ -0,0 +1,373 @@
//! Utilities for encoding and decoding frames.
//!
//! Contains adapters to go from streams of bytes, [`AsyncRead`] and
//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
//! Framed streams are also known as [transports].
//!
//! [`AsyncRead`]: #
//! [`AsyncWrite`]: #
//! [`Sink`]: #
//! [`Stream`]: #
//! [transports]: #
pub use io::codecs::{BytesCodec, LinesCodec};
pub use io::framed::{Framed, FramedParts};
pub use io::framed_read::{FramedRead, Decoder};
pub use io::framed_write::{FramedWrite, Encoder};
pub mod length_delimited {
//! Frame a stream of bytes based on a length prefix
//!
//! Many protocols delimit their frames by prefacing frame data with a
//! frame head that specifies the length of the frame. The
//! `length_delimited` module provides utilities for handling the length
//! based framing. This allows the consumer to work with entire frames
//! without having to worry about buffering or other framing logic.
//!
//! # Getting started
//!
//! If implementing a protocol from scratch, using length delimited framing
//! is an easy way to get started. [`Framed::new()`] will adapt a
//! full-duplex byte stream with a length delimited framer using default
//! configuration values.
//!
//! ```
//! extern crate futures;
//! use futures::io::{AsyncRead, AsyncWrite};
//! use futures::io::codec::length_delimited;
//!
//! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T)
//! -> length_delimited::Framed<T>
//! {
//! length_delimited::Framed::new(io)
//! }
//!
//! # fn main() {}
//! ```
//!
//! The returned transport implements `Sink + Stream` for `BytesMut`. It
//! encodes the frame with a big-endian `u32` header denoting the frame
//! payload length:
//!
//! ```text
//! +----------+--------------------------------+
//! | len: u32 | frame payload |
//! +----------+--------------------------------+
//! ```
//!
//! Specifically, given the following:
//!
//! ```
//! # extern crate bytes;
//! # extern crate futures;
//!
//! use bytes::BytesMut;
//! use futures::{Sink, SinkExt, Future};
//! use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
//! use futures::io::codec::length_delimited;
//! use futures::executor::block_on;
//!
//! fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
//! let mut transport = length_delimited::Framed::new(io);
//! let frame = BytesMut::from("hello world");
//!
//! block_on(transport.send(frame)).unwrap();
//! }
//! # fn main() {}
//! ```
//!
//! The encoded frame will look like this:
//!
//! ```text
//! +---- len: u32 ----+---- data ----+
//! | \x00\x00\x00\x0b | hello world |
//! +------------------+--------------+
//! ```
//!
//! # Decoding
//!
//! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`],
//! such that each yielded [`BytesMut`] value contains the contents of an
//! entire frame. There are many configuration parameters enabling
//! [`FrameRead`] to handle a wide range of protocols. Here are some
//! examples that will cover the various options at a high level.
//!
//! ## Example 1
//!
//! The following will parse a `u16` length field at offset 0, including the
//! frame head in the yielded `BytesMut`.
//!
//! ```
//! # extern crate futures;
//! # use futures::io::AsyncRead;
//! # use futures::io::codec::length_delimited;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! length_delimited::Builder::new()
//! .length_field_offset(0) // default value
//! .length_field_length(2)
//! .length_adjustment(0) // default value
//! .num_skip(0) // Do not strip frame header
//! .new_read(io);
//! # }
//! # fn main() {}
//! ```
//!
//! The following frame will be decoded as such:
//!
//! ```text
//! INPUT DECODED
//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+
//! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world |
//! +----------+---------------+ +----------+---------------+
//! ```
//!
//! The value of the length field is 11 (`\x0B`) which represents the length
//! of the payload, `hello world`. By default, [`FramedRead`] assumes that
//! the length field represents the number of bytes that **follows** the
//! length field. Thus, the entire frame has a length of 13: 2 bytes for the
//! frame head + 11 bytes for the payload.
//!
//! ## Example 2
//!
//! The following will parse a `u16` length field at offset 0, omitting the
//! frame head in the yielded `BytesMut`.
//!
//! ```
//! # extern crate futures;
//! # use futures::io::AsyncRead;
//! # use futures::io::codec::length_delimited;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! length_delimited::Builder::new()
//! .length_field_offset(0) // default value
//! .length_field_length(2)
//! .length_adjustment(0) // default value
//! // `num_skip` is not needed, the default is to skip
//! .new_read(io);
//! # }
//! # fn main() {}
//! ```
//!
//! The following frame will be decoded as such:
//!
//! ```text
//! INPUT DECODED
//! +-- len ---+--- Payload ---+ +--- Payload ---+
//! | \x00\x0B | Hello world | --> | Hello world |
//! +----------+---------------+ +---------------+
//! ```
//!
//! This is similar to the first example, the only difference is that the
//! frame head is **not** included in the yielded `BytesMut` value.
//!
//! ## Example 3
//!
//! The following will parse a `u16` length field at offset 0, including the
//! frame head in the yielded `BytesMut`. In this case, the length field
//! **includes** the frame head length.
//!
//! ```
//! # extern crate futures;
//! # use futures::io::AsyncRead;
//! # use futures::io::codec::length_delimited;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! length_delimited::Builder::new()
//! .length_field_offset(0) // default value
//! .length_field_length(2)
//! .length_adjustment(-2) // size of head
//! .num_skip(0)
//! .new_read(io);
//! # }
//! # fn main() {}
//! ```
//!
//! The following frame will be decoded as such:
//!
//! ```text
//! INPUT DECODED
//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+
//! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world |
//! +----------+---------------+ +----------+---------------+
//! ```
//!
//! In most cases, the length field represents the length of the payload
//! only, as shown in the previous examples. However, in some protocols the
//! length field represents the length of the whole frame, including the
//! head. In such cases, we specify a negative `length_adjustment` to adjust
//! the value provided in the frame head to represent the payload length.
//!
//! ## Example 4
//!
//! The following will parse a 3 byte length field at offset 0 in a 5 byte
//! frame head, including the frame head in the yielded `BytesMut`.
//!
//! ```
//! # extern crate futures;
//! # use futures::io::AsyncRead;
//! # use futures::io::codec::length_delimited;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! length_delimited::Builder::new()
//! .length_field_offset(0) // default value
//! .length_field_length(3)
//! .length_adjustment(2) // remaining head
//! .num_skip(0)
//! .new_read(io);
//! # }
//! # fn main() {}
//! ```
//!
//! The following frame will be decoded as such:
//!
//! ```text
//! INPUT
//! +---- len -----+- head -+--- Payload ---+
//! | \x00\x00\x0B | \xCAFE | Hello world |
//! +--------------+--------+---------------+
//!
//! DECODED
//! +---- len -----+- head -+--- Payload ---+
//! | \x00\x00\x0B | \xCAFE | Hello world |
//! +--------------+--------+---------------+
//! ```
//!
//! A more advanced example that shows a case where there is extra frame
//! head data between the length field and the payload. In such cases, it is
//! usually desirable to include the frame head as part of the yielded
//! `BytesMut`. This lets consumers of the length delimited framer to
//! process the frame head as needed.
//!
//! The positive `length_adjustment` value lets `FramedRead` factor in the
//! additional head into the frame length calculation.
//!
//! ## Example 5
//!
//! The following will parse a `u16` length field at offset 1 of a 4 byte
//! frame head. The first byte and the length field will be omitted from the
//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
//! included.
//!
//! ```
//! # extern crate futures;
//! # use futures::io::AsyncRead;
//! # use futures::io::codec::length_delimited;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! length_delimited::Builder::new()
//! .length_field_offset(1) // length of hdr1
//! .length_field_length(2)
//! .length_adjustment(1) // length of hdr2
//! .num_skip(3) // length of hdr1 + LEN
//! .new_read(io);
//! # }
//! # fn main() {}
//! ```
//!
//! The following frame will be decoded as such:
//!
//! ```text
//! INPUT
//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
//! | \xCA | \x00\x0B | \xFE | Hello world |
//! +--------+----------+--------+---------------+
//!
//! DECODED
//! +- hdr2 -+--- Payload ---+
//! | \xFE | Hello world |
//! +--------+---------------+
//! ```
//!
//! The length field is situated in the middle of the frame head. In this
//! case, the first byte in the frame head could be a version or some other
//! identifier that is not needed for processing. On the other hand, the
//! second half of the head is needed.
//!
//! `length_field_offset` indicates how many bytes to skip before starting
//! to read the length field. `length_adjustment` is the number of bytes to
//! skip starting at the end of the length field. In this case, it is the
//! second half of the head.
//!
//! ## Example 6
//!
//! The following will parse a `u16` length field at offset 1 of a 4 byte
//! frame head. The first byte and the length field will be omitted from the
//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
//! included. In this case, the length field **includes** the frame head
//! length.
//!
//! ```
//! # extern crate futures;
//! # use futures::io::AsyncRead;
//! # use futures::io::codec::length_delimited;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! length_delimited::Builder::new()
//! .length_field_offset(1) // length of hdr1
//! .length_field_length(2)
//! .length_adjustment(-3) // length of hdr1 + LEN, negative
//! .num_skip(3)
//! .new_read(io);
//! # }
//! # fn main() {}
//! ```
//!
//! The following frame will be decoded as such:
//!
//! ```text
//! INPUT
//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
//! | \xCA | \x00\x0F | \xFE | Hello world |
//! +--------+----------+--------+---------------+
//!
//! DECODED
//! +- hdr2 -+--- Payload ---+
//! | \xFE | Hello world |
//! +--------+---------------+
//! ```
//!
//! Similar to the example above, the difference is that the length field
//! represents the length of the entire frame instead of just the payload.
//! The length of `hdr1` and `len` must be counted in `length_adjustment`.
//! Note that the length of `hdr2` does **not** need to be explicitly set
//! anywhere because it already is factored into the total frame length that
//! is read from the byte stream.
//!
//! # Encoding
//!
//! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`],
//! such that each submitted [`BytesMut`] is prefaced by a length field.
//! There are fewer configuration options than [`FramedRead`]. Given
//! protocols that have more complex frame heads, an encoder should probably
//! be written by hand using [`Encoder`].
//!
//! Here is a simple example, given a `FramedWrite` with the following
//! configuration:
//!
//! ```
//! # extern crate bytes;
//! # extern crate futures;
//! # use futures::io::AsyncWrite;
//! # use futures::io::codec::length_delimited;
//! # use bytes::BytesMut;
//! # fn write_frame<T: AsyncWrite>(io: T) {
//! # let _: length_delimited::FramedWrite<T, BytesMut> =
//! length_delimited::Builder::new()
//! .length_field_length(2)
//! .new_write(io);
//! # }
//! # fn main() {}
//! ```
//!
//! A payload of `hello world` will be encoded as:
//!
//! ```text
//! +- len: u16 -+---- data ----+
//! | \x00\x0b | hello world |
//! +------------+--------------+
//! ```
//!
//! [`FramedRead`]: struct.FramedRead.html
//! [`FramedWrite`]: struct.FramedWrite.html
//! [`AsyncRead`]: ../../trait.AsyncRead.html
//! [`AsyncWrite`]: ../../trait.AsyncWrite.html
//! [`Encoder`]: ../trait.Encoder.html
//! [`BytesMut`]: https://docs.rs/bytes/~0.4/bytes/struct.BytesMut.html
pub use io::length_delimited::*;
}

View File

@@ -0,0 +1,124 @@
use bytes::{Bytes, BufMut, BytesMut};
use io::codec::{Encoder, Decoder};
use std::{io, str};
use std::string::{String, ToString};
/// A simple `Codec` implementation that just ships bytes around.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct BytesCodec(());
impl BytesCodec {
/// Creates a new `BytesCodec` for shipping around raw bytes.
pub fn new() -> BytesCodec { BytesCodec(()) }
}
impl Decoder for BytesCodec {
type Item = BytesMut;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
if buf.len() > 0 {
let len = buf.len();
Ok(Some(buf.split_to(len)))
} else {
Ok(None)
}
}
}
impl Encoder for BytesCodec {
type Item = Bytes;
type Error = io::Error;
fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> {
buf.reserve(data.len());
buf.put(data);
Ok(())
}
}
/// A simple `Codec` implementation that splits up data into lines.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct LinesCodec {
// Stored index of the next index to examine for a `\n` character.
// This is used to optimize searching.
// For example, if `decode` was called with `abc`, it would hold `3`,
// because that is the next index to examine.
// The next time `decode` is called with `abcde\n`, the method will
// only look at `de\n` before returning.
next_index: usize,
}
impl LinesCodec {
/// Returns a `LinesCodec` for splitting up data into lines.
pub fn new() -> LinesCodec {
LinesCodec { next_index: 0 }
}
}
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
str::from_utf8(buf).map_err(|_|
io::Error::new(
io::ErrorKind::InvalidData,
"Unable to decode input as UTF8"))
}
fn without_carriage_return(s: &[u8]) -> &[u8] {
if let Some(&b'\r') = s.last() {
&s[..s.len() - 1]
} else {
s
}
}
impl Decoder for LinesCodec {
type Item = String;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
if let Some(newline_offset) =
buf[self.next_index..].iter().position(|b| *b == b'\n')
{
let newline_index = newline_offset + self.next_index;
let line = buf.split_to(newline_index + 1);
let line = &line[..line.len()-1];
let line = without_carriage_return(line);
let line = utf8(line)?;
self.next_index = 0;
Ok(Some(line.to_string()))
} else {
self.next_index = buf.len();
Ok(None)
}
}
fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
Ok(match self.decode(buf)? {
Some(frame) => Some(frame),
None => {
// No terminating newline - return remaining data, if any
if buf.is_empty() || buf == &b"\r"[..] {
None
} else {
let line = buf.take();
let line = without_carriage_return(&line);
let line = utf8(line)?;
self.next_index = 0;
Some(line.to_string())
}
}
})
}
}
impl Encoder for LinesCodec {
type Item = String;
type Error = io::Error;
fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
buf.reserve(line.len() + 1);
buf.put(line);
buf.put_u8(b'\n');
Ok(())
}
}

View File

@@ -0,0 +1,97 @@
use std::io;
use std::boxed::Box;
use {Future, Poll, task};
use futures_io::{AsyncRead, AsyncWrite};
/// A future which will copy all data from a reader into a writer.
///
/// Created by the [`copy`] function, this future will resolve to the number of
/// bytes copied or an error if one happens.
///
/// [`copy`]: fn.copy.html
#[derive(Debug)]
pub struct Copy<R, W> {
reader: Option<R>,
read_done: bool,
writer: Option<W>,
pos: usize,
cap: usize,
amt: u64,
buf: Box<[u8]>,
}
/// Creates a future which represents copying all the bytes from one object to
/// another.
///
/// The returned future will copy all the bytes read from `reader` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned and the `reader` and `writer` are
/// consumed. On error the error is returned and the I/O objects are consumed as
/// well.
pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
where R: AsyncRead,
W: AsyncWrite,
{
Copy {
reader: Some(reader),
read_done: false,
writer: Some(writer),
amt: 0,
pos: 0,
cap: 0,
buf: Box::new([0; 2048]),
}
}
impl<R, W> Future for Copy<R, W>
where R: AsyncRead,
W: AsyncWrite,
{
type Item = (u64, R, W);
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<(u64, R, W), io::Error> {
loop {
// If our buffer is empty, then we need to read some data to
// continue.
if self.pos == self.cap && !self.read_done {
let reader = self.reader.as_mut().unwrap();
let n = try_ready!(reader.poll_read(&mut self.buf, cx));
if n == 0 {
self.read_done = true;
} else {
self.pos = 0;
self.cap = n;
}
}
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
let writer = self.writer.as_mut().unwrap();
let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap], cx));
if i == 0 {
return Err(io::Error::new(io::ErrorKind::WriteZero,
"write zero byte into writer"));
} else {
self.pos += i;
self.amt += i as u64;
}
}
// If we've written al the data and we've seen EOF, flush out the
// data and finish the transfer.
// done with the entire transfer.
if self.pos == self.cap && self.read_done {
try_ready!(self.writer.as_mut().unwrap().poll_flush(cx));
let reader = self.reader.take().unwrap();
let writer = self.writer.take().unwrap();
return Ok((self.amt, reader, writer).into())
}
}
}
}

View File

@@ -0,0 +1,44 @@
use std::io;
use {Poll, Future, Async, task};
use futures_io::AsyncWrite;
/// A future used to fully flush an I/O object.
///
/// Resolves to the underlying I/O object once the flush operation is complete.
///
/// Created by the [`flush`] function.
///
/// [`flush`]: fn.flush.html
#[derive(Debug)]
pub struct Flush<A> {
a: Option<A>,
}
/// Creates a future which will entirely flush an I/O object and then yield the
/// object itself.
///
/// This function will consume the object provided if an error happens, and
/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling
/// a retry if `WouldBlock` is seen along the way.
pub fn flush<A>(a: A) -> Flush<A>
where A: AsyncWrite,
{
Flush {
a: Some(a),
}
}
impl<A> Future for Flush<A>
where A: AsyncWrite,
{
type Item = A;
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<A, io::Error> {
try_ready!(self.a.as_mut().unwrap().poll_flush(cx));
Ok(Async::Ready(self.a.take().unwrap()))
}
}

View File

@@ -0,0 +1,257 @@
use std::io::{self, Read, Write};
use std::fmt;
use futures_io::{AsyncRead, AsyncWrite, Initializer, IoVec};
use io::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2, Decoder};
use io::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2, Encoder};
use {Stream, Sink, Poll, task};
use bytes::{BytesMut};
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
///
/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
pub struct Framed<T, U> {
inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
}
pub struct Fuse<T, U>(pub T, pub U);
pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U>
where T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
{
Framed {
inner: framed_read2(framed_write2(Fuse(inner, codec))),
}
}
impl<T, U> Framed<T, U> {
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// This objects takes a stream and a readbuffer and a writebuffer. These field
/// can be obtained from an existing `Framed` with the `into_parts` method.
///
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
pub fn from_parts(parts: FramedParts<T>, codec: U) -> Framed<T, U>
{
Framed {
inner: framed_read2_with_buffer(framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf), parts.readbuf),
}
}
/// Returns a reference to the underlying I/O stream wrapped by
/// `Frame`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
&self.inner.get_ref().get_ref().0
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `Frame`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner.get_mut().get_mut().0
}
/// Consumes the `Frame`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_inner(self) -> T {
self.inner.into_inner().into_inner().0
}
/// Consumes the `Frame`, returning its underlying I/O stream and the buffer
/// with unprocessed data.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T> {
let (inner, readbuf) = self.inner.into_parts();
let (inner, writebuf) = inner.into_parts();
FramedParts { inner: inner.0, readbuf, writebuf }
}
/// Consumes the `Frame`, returning its underlying I/O stream and the buffer
/// with unprocessed data, and also the current codec state.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
///
/// Note that this function will be removed once the codec has been
/// integrated into `FramedParts` in a new version (see
/// [#53](https://github.com/tokio-rs/tokio-io/pull/53)).
pub fn into_parts_and_codec(self) -> (FramedParts<T>, U) {
let (inner, readbuf) = self.inner.into_parts();
let (inner, writebuf) = inner.into_parts();
(FramedParts { inner: inner.0, readbuf, writebuf }, inner.1)
}
}
impl<T, U> Stream for Framed<T, U>
where T: AsyncRead,
U: Decoder,
{
type Item = U::Item;
type Error = U::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.poll(cx)
}
}
impl<T, U> Sink for Framed<T, U>
where T: AsyncWrite,
U: Encoder,
U::Error: From<io::Error>,
{
type SinkItem = U::Item;
type SinkError = U::Error;
fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
self.inner.get_mut().poll_ready(cx)
}
fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
self.inner.get_mut().start_send(item)
}
fn start_close(&mut self) -> Result<(), Self::SinkError> {
self.inner.get_mut().start_close()
}
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
self.inner.get_mut().poll_flush(cx)
}
}
impl<T, U> fmt::Debug for Framed<T, U>
where T: fmt::Debug,
U: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Framed")
.field("io", &self.inner.get_ref().get_ref().0)
.field("codec", &self.inner.get_ref().get_ref().1)
.finish()
}
}
// ===== impl Fuse =====
impl<T: Read, U> Read for Fuse<T, U> {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
self.0.read(dst)
}
}
impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
unsafe fn initializer(&self) -> Initializer {
self.0.initializer()
}
fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
self.0.poll_read(buf, cx)
}
fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
self.0.poll_vectored_read(vec, cx)
}
}
impl<T: Write, U> Write for Fuse<T, U> {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
self.0.write(src)
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}
impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
self.0.poll_write(buf, cx)
}
fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
self.0.poll_vectored_write(vec, cx)
}
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
self.0.poll_flush(cx)
}
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
self.0.poll_close(cx)
}
}
impl<T, U: Decoder> Decoder for Fuse<T, U> {
type Item = U::Item;
type Error = U::Error;
fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
self.1.decode(buffer)
}
fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
self.1.decode_eof(buffer)
}
}
impl<T, U: Encoder> Encoder for Fuse<T, U> {
type Item = U::Item;
type Error = U::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
self.1.encode(item, dst)
}
}
/// `FramedParts` contains an export of the data of a Framed transport.
/// It can be used to construct a new `Framed` with a different codec.
/// It contains all current buffers and the inner transport.
#[derive(Debug)]
pub struct FramedParts<T>
{
/// The inner transport used to read bytes to and write bytes to
pub inner: T,
/// The buffer with read but unprocessed data.
pub readbuf: BytesMut,
/// A buffer with unprocessed data which are not written yet.
pub writebuf: BytesMut
}

View File

@@ -0,0 +1,297 @@
use std::{fmt, io};
use futures_io::AsyncRead;
use io::AsyncReadExt;
use io::framed::Fuse;
use {Async, Poll, Stream, Sink, task};
use bytes::BytesMut;
/// Decoding of frames via buffers.
///
/// This trait is used when constructing an instance of `Framed` or
/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has
/// already been buffered in `src` and decodes the data into a stream of
/// `Self::Item` frames.
///
/// Implementations are able to track state on `self`, which enables
/// implementing stateful streaming parsers. In many cases, though, this type
/// will simply be a unit struct (e.g. `struct HttpDecoder`).
pub trait Decoder {
/// The type of decoded frames.
type Item;
/// The type of unrecoverable frame decoding errors.
///
/// If an individual message is ill-formed but can be ignored without
/// interfering with the processing of future messages, it may be more
/// useful to report the failure as an `Item`.
///
/// `From<io::Error>` is required in the interest of making `Error` suitable
/// for returning directly from a `FramedRead`, and to enable the default
/// implementation of `decode_eof` to yield an `io::Error` when the decoder
/// fails to consume all available data.
///
/// Note that implementors of this trait can simply indicate `type Error =
/// io::Error` to use I/O errors as this type.
type Error: From<io::Error>;
/// Attempts to decode a frame from the provided buffer of bytes.
///
/// This method is called by `FramedRead` whenever bytes are ready to be
/// parsed. The provided buffer of bytes is what's been read so far, and
/// this instance of `Decode` can determine whether an entire frame is in
/// the buffer and is ready to be returned.
///
/// If an entire frame is available, then this instance will remove those
/// bytes from the buffer provided and return them as a decoded
/// frame. Note that removing bytes from the provided buffer doesn't always
/// necessarily copy the bytes, so this should be an efficient operation in
/// most circumstances.
///
/// If the bytes look valid, but a frame isn't fully available yet, then
/// `Ok(None)` is returned. This indicates to the `Framed` instance that
/// it needs to read some more bytes before calling this method again.
///
/// Note that the bytes provided may be empty. If a previous call to
/// `decode` consumed all the bytes in the buffer then `decode` will be
/// called again until it returns `None`, indicating that more bytes need to
/// be read.
///
/// Finally, if the bytes in the buffer are malformed then an error is
/// returned indicating why. This informs `Framed` that the stream is now
/// corrupt and should be terminated.
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
/// A default method available to be called when there are no more bytes
/// available to be read from the underlying I/O.
///
/// This method defaults to calling `decode` and returns an error if
/// `Ok(None)` is returned while there is unconsumed data in `buf`.
/// Typically this doesn't need to be implemented unless the framing
/// protocol differs near the end of the stream.
///
/// Note that the `buf` argument may be empty. If a previous call to
/// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
/// called again until it returns `None`, indicating that there are no more
/// frames to yield. This behavior enables returning finalization frames
/// that may not be based on inbound data.
fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match try!(self.decode(buf)) {
Some(frame) => Ok(Some(frame)),
None => {
if buf.is_empty() {
Ok(None)
} else {
Err(io::Error::new(io::ErrorKind::Other,
"bytes remaining on stream").into())
}
}
}
}
}
/// A `Stream` of messages decoded from an `AsyncRead`.
pub struct FramedRead<T, D> {
inner: FramedRead2<Fuse<T, D>>,
}
pub struct FramedRead2<T> {
inner: T,
eof: bool,
is_readable: bool,
buffer: BytesMut,
}
const INITIAL_CAPACITY: usize = 8 * 1024;
// ===== impl FramedRead =====
impl<T, D> FramedRead<T, D>
where T: AsyncRead,
D: Decoder,
{
/// Creates a new `FramedRead` with the given `decoder`.
pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
FramedRead {
inner: framed_read2(Fuse(inner, decoder)),
}
}
}
impl<T, D> FramedRead<T, D> {
/// Returns a reference to the underlying I/O stream wrapped by
/// `FramedRead`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
&self.inner.inner.0
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `FramedRead`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner.inner.0
}
/// Consumes the `FramedRead`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_inner(self) -> T {
self.inner.inner.0
}
/// Returns a reference to the underlying decoder.
pub fn decoder(&self) -> &D {
&self.inner.inner.1
}
/// Returns a mutable reference to the underlying decoder.
pub fn decoder_mut(&mut self) -> &mut D {
&mut self.inner.inner.1
}
}
impl<T, D> Stream for FramedRead<T, D>
where T: AsyncRead,
D: Decoder,
{
type Item = D::Item;
type Error = D::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.poll(cx)
}
}
impl<T, D> Sink for FramedRead<T, D>
where T: Sink,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
self.inner.inner.0.poll_ready(cx)
}
fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
self.inner.inner.0.start_send(item)
}
fn start_close(&mut self) -> Result<(), Self::SinkError> {
self.inner.inner.0.start_close()
}
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
self.inner.inner.0.poll_flush(cx)
}
}
impl<T, D> fmt::Debug for FramedRead<T, D>
where T: fmt::Debug,
D: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FramedRead")
.field("inner", &self.inner.inner.0)
.field("decoder", &self.inner.inner.1)
.field("eof", &self.inner.eof)
.field("is_readable", &self.inner.is_readable)
.field("buffer", &self.inner.buffer)
.finish()
}
}
// ===== impl FramedRead2 =====
pub fn framed_read2<T>(inner: T) -> FramedRead2<T> {
FramedRead2 {
inner,
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
}
}
pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
if buf.capacity() < INITIAL_CAPACITY {
let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
buf.reserve(bytes_to_reserve);
}
FramedRead2 {
inner,
eof: false,
is_readable: buf.len() > 0,
buffer: buf,
}
}
impl<T> FramedRead2<T> {
pub fn get_ref(&self) -> &T {
&self.inner
}
pub fn into_inner(self) -> T {
self.inner
}
pub fn into_parts(self) -> (T, BytesMut) {
(self.inner, self.buffer)
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
}
impl<T> Stream for FramedRead2<T>
where T: AsyncRead + Decoder,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
loop {
// Repeatedly call `decode` or `decode_eof` as long as it is
// "readable". Readable is defined as not having returned `None`. If
// the upstream has returned EOF, and the decoder is no longer
// readable, it can be assumed that the decoder will never become
// readable again, at which point the stream is terminated.
if self.is_readable {
if self.eof {
let frame = self.inner.decode_eof(&mut self.buffer)?;
return Ok(Async::Ready(frame));
}
trace!("attempting to decode a frame");
if let Some(frame) = self.inner.decode(&mut self.buffer)? {
trace!("frame decoded from buffer");
return Ok(Async::Ready(Some(frame)));
}
self.is_readable = false;
}
assert!(!self.eof);
// Otherwise, try to read more data and try again. Make sure we've
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
self.buffer.reserve(1);
if 0 == try_ready!(self.inner.read_buf(&mut self.buffer, cx)) {
self.eof = true;
}
self.is_readable = true;
}
}
}

View File

@@ -0,0 +1,267 @@
use std::io::{self, Read};
use std::fmt;
use futures_io::{AsyncRead, AsyncWrite, Initializer, IoVec};
use io::codec::Decoder;
use io::framed::Fuse;
use {Async, Poll, Stream, Sink, task};
use bytes::BytesMut;
/// Trait of helper objects to write out messages as bytes, for use with
/// `FramedWrite`.
pub trait Encoder {
/// The type of items consumed by the `Encoder`
type Item;
/// The type of encoding errors.
///
/// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>`
/// in the interest letting it return `Error`s directly.
type Error: From<io::Error>;
/// Encodes a frame into the buffer provided.
///
/// This method will encode `item` into the byte buffer provided by `dst`.
/// The `dst` provided is an internal buffer of the `Framed` instance and
/// will be written out when possible.
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut)
-> Result<(), Self::Error>;
}
/// A `Sink` of frames encoded to an `AsyncWrite`.
pub struct FramedWrite<T, E> {
inner: FramedWrite2<Fuse<T, E>>,
}
pub struct FramedWrite2<T> {
inner: T,
buffer: BytesMut,
do_close: bool,
}
const INITIAL_CAPACITY: usize = 8 * 1024;
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
impl<T, E> FramedWrite<T, E>
where T: AsyncWrite,
E: Encoder,
{
/// Creates a new `FramedWrite` with the given `encoder`.
pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
FramedWrite {
inner: framed_write2(Fuse(inner, encoder)),
}
}
}
impl<T, E> FramedWrite<T, E> {
/// Returns a reference to the underlying I/O stream wrapped by
/// `FramedWrite`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
&self.inner.inner.0
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `FramedWrite`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner.inner.0
}
/// Consumes the `FramedWrite`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_inner(self) -> T {
self.inner.inner.0
}
/// Returns a reference to the underlying decoder.
pub fn encoder(&self) -> &E {
&self.inner.inner.1
}
/// Returns a mutable reference to the underlying decoder.
pub fn encoder_mut(&mut self) -> &mut E {
&mut self.inner.inner.1
}
}
impl<T, E> Sink for FramedWrite<T, E>
where T: AsyncWrite,
E: Encoder,
{
type SinkItem = E::Item;
type SinkError = E::Error;
delegate_sink!(inner);
}
impl<T, D> Stream for FramedWrite<T, D>
where T: Stream,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.inner.0.poll(cx)
}
}
impl<T, U> fmt::Debug for FramedWrite<T, U>
where T: fmt::Debug,
U: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FramedWrite")
.field("inner", &self.inner.get_ref().0)
.field("encoder", &self.inner.get_ref().1)
.field("buffer", &self.inner.buffer)
.finish()
}
}
// ===== impl FramedWrite2 =====
pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> {
FramedWrite2 {
inner,
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
do_close: false,
}
}
pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> {
if buf.capacity() < INITIAL_CAPACITY {
let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
buf.reserve(bytes_to_reserve);
}
FramedWrite2 {
inner,
buffer: buf,
do_close: false,
}
}
impl<T> FramedWrite2<T> {
pub fn get_ref(&self) -> &T {
&self.inner
}
pub fn into_inner(self) -> T {
self.inner
}
pub fn into_parts(self) -> (T, BytesMut) {
(self.inner, self.buffer)
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
}
impl<T> Sink for FramedWrite2<T>
where T: AsyncWrite + Encoder,
{
type SinkItem = T::Item;
type SinkError = T::Error;
fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
// If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's
// *still* over 8KiB, then apply backpressure (reject the send).
if self.buffer.len() < BACKPRESSURE_BOUNDARY {
return Ok(Async::Ready(()));
}
self.poll_flush(cx)?;
if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
Ok(Async::Pending)
} else {
Ok(Async::Ready(()))
}
}
fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
self.inner.encode(item, &mut self.buffer)
}
fn start_close(&mut self) -> Result<(), Self::SinkError> {
self.do_close = true;
Ok(())
}
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
trace!("flushing framed transport");
while !self.buffer.is_empty() {
trace!("writing; remaining={}", self.buffer.len());
let n = try_ready!(self.inner.poll_write(&self.buffer, cx));
if n == 0 {
return Err(io::Error::new(io::ErrorKind::WriteZero, "failed to
write frame to transport").into());
}
// TODO: Add a way to `bytes` to do this w/o returning the drained
// data.
let _ = self.buffer.split_to(n);
}
try_ready!(self.inner.poll_flush(cx));
if self.do_close {
self.inner.poll_close(cx).map_err(Into::into)
} else {
Ok(Async::Ready(()))
}
}
}
impl<T: Decoder> Decoder for FramedWrite2<T> {
type Item = T::Item;
type Error = T::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> {
self.inner.decode(src)
}
fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> {
self.inner.decode_eof(src)
}
}
impl<T: Read> Read for FramedWrite2<T> {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
self.inner.read(dst)
}
}
impl<T: AsyncRead> AsyncRead for FramedWrite2<T> {
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
}
fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
self.inner.poll_read(buf, cx)
}
fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
self.inner.poll_vectored_read(vec, cx)
}
}

23
futures-util/src/io/io.rs Normal file
View File

@@ -0,0 +1,23 @@
//! I/O conveniences when working with primitives in `tokio-core`
//!
//! Contains various combinators to work with I/O objects and type definitions
//! as well.
//!
//! A description of the high-level I/O combinators can be [found online] in
//! addition to a description of the [low level details].
//!
//! [found online]: https://tokio.rs/docs/getting-started/core/
//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/
pub use io::allow_std::AllowStdIo;
pub use io::copy::{copy, Copy};
pub use io::flush::{flush, Flush};
//pub use io::lines::{lines, Lines};
pub use io::read::{read, Read};
pub use io::read_exact::{read_exact, ReadExact};
pub use io::read_to_end::{read_to_end, ReadToEnd};
//pub use io::read_until::{read_until, ReadUntil};
pub use io::shutdown::{shutdown, Shutdown};
pub use io::split::{ReadHalf, WriteHalf};
pub use io::window::Window;
pub use io::write_all::{write_all, WriteAll};

View File

@@ -0,0 +1,902 @@
use bytes::{Buf, BufMut, BytesMut, IntoBuf, BigEndian, LittleEndian};
use bytes::buf::Chain;
use {Async, Stream, Sink, Poll, task};
use io::{AsyncWriteExt, codec};
use futures_io::{AsyncRead, AsyncWrite, Initializer, IoVec};
use std::{cmp, fmt};
use std::error::Error as StdError;
use std::io::{self, Cursor};
/// Configure length delimited `FramedRead`, `FramedWrite`, and `Framed` values.
///
/// `Builder` enables constructing configured length delimited framers. Note
/// that not all configuration settings apply to both encoding and decoding. See
/// the documentation for specific methods for more detail.
#[derive(Debug, Clone, Copy)]
pub struct Builder {
// Maximum frame length
max_frame_len: usize,
// Number of bytes representing the field length
length_field_len: usize,
// Number of bytes in the header before the length field
length_field_offset: usize,
// Adjust the length specified in the header field by this amount
length_adjustment: isize,
// Total number of bytes to skip before reading the payload, if not set,
// `length_field_len + length_field_offset`
num_skip: Option<usize>,
// Length field byte order (little or big endian)
length_field_is_big_endian: bool,
}
/// Adapts a byte stream into a unified `Stream` and `Sink` that works over
/// entire frame values.
///
/// See [module level] documentation for more detail.
///
/// [module level]: index.html
pub struct Framed<T, B: IntoBuf = BytesMut> {
inner: FramedRead<FramedWrite<T, B>>,
}
/// Adapts a byte stream to a `Stream` yielding entire frame values.
///
/// See [module level] documentation for more detail.
///
/// [module level]: index.html
#[derive(Debug)]
pub struct FramedRead<T> {
inner: codec::FramedRead<T, Decoder>,
}
/// An error when the number of bytes read is more than max frame length.
pub struct FrameTooBig {
_priv: (),
}
#[derive(Debug)]
struct Decoder {
// Configuration values
builder: Builder,
// Read state
state: DecodeState,
}
#[derive(Debug, Clone, Copy)]
enum DecodeState {
Head,
Data(usize),
}
/// Adapts a byte stream to a `Sink` accepting entire frame values.
///
/// See [module level] documentation for more detail.
///
/// [module level]: index.html
pub struct FramedWrite<T, B: IntoBuf = BytesMut> {
// I/O type
inner: T,
// Configuration values
builder: Builder,
// Current frame being written
frame: Option<Chain<Cursor<BytesMut>, B::Buf>>,
do_close: bool,
}
// ===== impl Framed =====
impl<T: AsyncRead + AsyncWrite, B: IntoBuf> Framed<T, B> {
/// Creates a new `Framed` with default configuration values.
pub fn new(inner: T) -> Framed<T, B> {
Builder::new().new_framed(inner)
}
}
impl<T, B: IntoBuf> Framed<T, B> {
/// Returns a reference to the underlying I/O stream wrapped by `Framed`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
self.inner.get_ref().get_ref()
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `Framed`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn get_mut(&mut self) -> &mut T {
self.inner.get_mut().get_mut()
}
/// Consumes the `Framed`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn into_inner(self) -> T {
self.inner.into_inner().into_inner()
}
}
impl<T: AsyncRead, B: IntoBuf> Stream for Framed<T, B> {
type Item = BytesMut;
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<BytesMut>, io::Error> {
self.inner.poll(cx)
}
}
impl<T: AsyncWrite, B: IntoBuf> Sink for Framed<T, B> {
type SinkItem = B;
type SinkError = io::Error;
delegate_sink!(inner);
}
impl<T, B: IntoBuf> fmt::Debug for Framed<T, B>
where T: fmt::Debug,
B::Buf: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Framed")
.field("inner", &self.inner)
.finish()
}
}
// ===== impl FramedRead =====
impl<T: AsyncRead> FramedRead<T> {
/// Creates a new `FramedRead` with default configuration values.
pub fn new(inner: T) -> FramedRead<T> {
Builder::new().new_read(inner)
}
}
impl<T> FramedRead<T> {
/// Returns the current max frame setting
///
/// This is the largest size this codec will accept from the wire. Larger
/// frames will be rejected.
pub fn max_frame_length(&self) -> usize {
self.inner.decoder().builder.max_frame_len
}
/// Updates the max frame setting.
///
/// The change takes effect the next time a frame is decoded. In other
/// words, if a frame is currently in process of being decoded with a frame
/// size greater than `val` but less than the max frame length in effect
/// before calling this function, then the frame will be allowed.
pub fn set_max_frame_length(&mut self, val: usize) {
self.inner.decoder_mut().builder.max_frame_length(val);
}
/// Returns a reference to the underlying I/O stream wrapped by `FramedRead`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
self.inner.get_ref()
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `FramedRead`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn get_mut(&mut self) -> &mut T {
self.inner.get_mut()
}
/// Consumes the `FramedRead`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn into_inner(self) -> T {
self.inner.into_inner()
}
}
impl<T: AsyncRead> Stream for FramedRead<T> {
type Item = BytesMut;
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<BytesMut>, io::Error> {
self.inner.poll(cx)
}
}
impl<T: Sink> Sink for FramedRead<T> {
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
delegate_sink!(inner);
}
impl<T: io::Write> io::Write for FramedRead<T> {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
self.inner.get_mut().write(src)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.get_mut().flush()
}
}
impl<T: AsyncWrite> AsyncWrite for FramedRead<T> {
fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
self.inner.get_mut().poll_write(buf, cx)
}
fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
self.inner.get_mut().poll_vectored_write(vec, cx)
}
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
self.inner.get_mut().poll_flush(cx)
}
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
self.inner.get_mut().poll_close(cx)
}
}
// ===== impl Decoder ======
impl Decoder {
fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> {
let head_len = self.builder.num_head_bytes();
let field_len = self.builder.length_field_len;
if src.len() < head_len {
// Not enough data
return Ok(None);
}
let n = {
let mut src = Cursor::new(&mut *src);
// Skip the required bytes
src.advance(self.builder.length_field_offset);
// match endianess
let n = if self.builder.length_field_is_big_endian {
src.get_uint::<BigEndian>(field_len)
} else {
src.get_uint::<LittleEndian>(field_len)
};
if n > self.builder.max_frame_len as u64 {
return Err(io::Error::new(io::ErrorKind::InvalidData, FrameTooBig {
_priv: (),
}));
}
// The check above ensures there is no overflow
let n = n as usize;
// Adjust `n` with bounds checking
let n = if self.builder.length_adjustment < 0 {
n.checked_sub(-self.builder.length_adjustment as usize)
} else {
n.checked_add(self.builder.length_adjustment as usize)
};
// Error handling
match n {
Some(n) => n,
None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "provided length would overflow after adjustment")),
}
};
let num_skip = self.builder.get_num_skip();
if num_skip > 0 {
let _ = src.split_to(num_skip);
}
// Ensure that the buffer has enough space to read the incoming
// payload
src.reserve(n);
return Ok(Some(n));
}
fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
// At this point, the buffer has already had the required capacity
// reserved. All there is to do is read.
if src.len() < n {
return Ok(None);
}
Ok(Some(src.split_to(n)))
}
}
impl codec::Decoder for Decoder {
type Item = BytesMut;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
let n = match self.state {
DecodeState::Head => {
match self.decode_head(src)? {
Some(n) => {
self.state = DecodeState::Data(n);
n
}
None => return Ok(None),
}
}
DecodeState::Data(n) => n,
};
match self.decode_data(n, src)? {
Some(data) => {
// Update the decode state
self.state = DecodeState::Head;
// Make sure the buffer has enough space to read the next head
src.reserve(self.builder.num_head_bytes());
Ok(Some(data))
}
None => Ok(None),
}
}
}
// ===== impl FramedWrite =====
impl<T: AsyncWrite, B: IntoBuf> FramedWrite<T, B> {
/// Creates a new `FramedWrite` with default configuration values.
pub fn new(inner: T) -> FramedWrite<T, B> {
Builder::new().new_write(inner)
}
}
impl<T, B: IntoBuf> FramedWrite<T, B> {
/// Returns the current max frame setting
///
/// This is the largest size this codec will write to the wire. Larger
/// frames will be rejected.
pub fn max_frame_length(&self) -> usize {
self.builder.max_frame_len
}
/// Updates the max frame setting.
///
/// The change takes effect the next time a frame is encoded. In other
/// words, if a frame is currently in process of being encoded with a frame
/// size greater than `val` but less than the max frame length in effect
/// before calling this function, then the frame will be allowed.
pub fn set_max_frame_length(&mut self, val: usize) {
self.builder.max_frame_length(val);
}
/// Returns a reference to the underlying I/O stream wrapped by
/// `FramedWrite`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
&self.inner
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `FramedWrite`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
/// Consumes the `FramedWrite`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T: AsyncWrite, B: IntoBuf> FramedWrite<T, B> {
// If there is a buffered frame, try to write it to `T`
fn do_write(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
if self.frame.is_none() {
return Ok(Async::Ready(()));
}
loop {
let frame = self.frame.as_mut().unwrap();
try_ready!(self.inner.write_buf(frame, cx));
if !frame.has_remaining() {
break;
}
}
self.frame = None;
Ok(Async::Ready(()))
}
fn set_frame(&mut self, buf: B::Buf) -> io::Result<()> {
let mut head = BytesMut::with_capacity(8);
let n = buf.remaining();
if n > self.builder.max_frame_len {
return Err(io::Error::new(io::ErrorKind::InvalidInput, FrameTooBig {
_priv: (),
}));
}
// Adjust `n` with bounds checking
let n = if self.builder.length_adjustment < 0 {
n.checked_add(-self.builder.length_adjustment as usize)
} else {
n.checked_sub(self.builder.length_adjustment as usize)
};
// Error handling
let n = match n {
Some(n) => n,
None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "provided length would overflow after adjustment")),
};
if self.builder.length_field_is_big_endian {
head.put_uint::<BigEndian>(n as u64, self.builder.length_field_len);
} else {
head.put_uint::<LittleEndian>(n as u64, self.builder.length_field_len);
}
debug_assert!(self.frame.is_none());
self.frame = Some(head.into_buf().chain(buf));
Ok(())
}
}
impl<T: AsyncWrite, B: IntoBuf> Sink for FramedWrite<T, B> {
type SinkItem = B;
type SinkError = io::Error;
fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
self.do_write(cx)
}
fn start_send(&mut self, item: B) -> Result<(), Self::SinkError> {
self.set_frame(item.into_buf())
}
fn start_close(&mut self) -> Result<(), Self::SinkError> {
self.do_close = true;
Ok(())
}
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
// Write any buffered frame to T
try_ready!(self.do_write(cx));
// Try flushing the underlying IO
self.inner.poll_close(cx)
}
}
impl<T: Stream, B: IntoBuf> Stream for FramedWrite<T, B> {
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<T::Item>, T::Error> {
self.inner.poll(cx)
}
}
impl<T: io::Read, B: IntoBuf> io::Read for FramedWrite<T, B> {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
self.get_mut().read(dst)
}
}
impl<T: AsyncRead, U: IntoBuf> AsyncRead for FramedWrite<T, U> {
unsafe fn initializer(&self) -> Initializer {
self.get_ref().initializer()
}
fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
self.get_mut().poll_read(buf, cx)
}
fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
self.get_mut().poll_vectored_read(vec, cx)
}
}
impl<T, B: IntoBuf> fmt::Debug for FramedWrite<T, B>
where T: fmt::Debug,
B::Buf: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FramedWrite")
.field("inner", &self.inner)
.field("builder", &self.builder)
.field("frame", &self.frame)
.finish()
}
}
// ===== impl Builder =====
impl Builder {
/// Creates a new length delimited framer builder with default configuration
/// values.
///
/// # Examples
///
/// ```
/// # extern crate futures;
/// # use futures::io::AsyncRead;
/// use futures::io::codec::length_delimited::Builder;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// Builder::new()
/// .length_field_offset(0)
/// .length_field_length(2)
/// .length_adjustment(0)
/// .num_skip(0)
/// .new_read(io);
/// # }
/// # fn main() {}
/// ```
pub fn new() -> Builder {
Builder {
// Default max frame length of 8MB
max_frame_len: 8 * 1_024 * 1_024,
// Default byte length of 4
length_field_len: 4,
// Default to the header field being at the start of the header.
length_field_offset: 0,
length_adjustment: 0,
// Total number of bytes to skip before reading the payload, if not set,
// `length_field_len + length_field_offset`
num_skip: None,
// Default to reading the length field in network (big) endian.
length_field_is_big_endian: true,
}
}
/// Read the length field as a big endian integer
///
/// This is the default setting.
///
/// This configuration option applies to both encoding and decoding.
///
/// # Examples
///
/// ```
/// # extern crate futures;
/// # use futures::io::AsyncRead;
/// use futures::io::codec::length_delimited::Builder;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// Builder::new()
/// .big_endian()
/// .new_read(io);
/// # }
/// # fn main() {}
/// ```
pub fn big_endian(&mut self) -> &mut Self {
self.length_field_is_big_endian = true;
self
}
/// Read the length field as a little endian integer
///
/// The default setting is big endian.
///
/// This configuration option applies to both encoding and decoding.
///
/// # Examples
///
/// ```
/// # extern crate futures;
/// # use futures::io::AsyncRead;
/// use futures::io::codec::length_delimited::Builder;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// Builder::new()
/// .little_endian()
/// .new_read(io);
/// # }
/// # fn main() {}
/// ```
pub fn little_endian(&mut self) -> &mut Self {
self.length_field_is_big_endian = false;
self
}
/// Sets the max frame length
///
/// This configuration option applies to both encoding and decoding. The
/// default value is 8MB.
///
/// When decoding, the length field read from the byte stream is checked
/// against this setting **before** any adjustments are applied. When
/// encoding, the length of the submitted payload is checked against this
/// setting.
///
/// When frames exceed the max length, an `io::Error` with the custom value
/// of the `FrameTooBig` type will be returned.
///
/// # Examples
///
/// ```
/// # extern crate futures;
/// # use futures::io::AsyncRead;
/// use futures::io::codec::length_delimited::Builder;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// Builder::new()
/// .max_frame_length(8 * 1024)
/// .new_read(io);
/// # }
/// # fn main() {}
/// ```
pub fn max_frame_length(&mut self, val: usize) -> &mut Self {
self.max_frame_len = val;
self
}
/// Sets the number of bytes used to represent the length field
///
/// The default value is `4`. The max value is `8`.
///
/// This configuration option applies to both encoding and decoding.
///
/// # Examples
///
/// ```
/// # extern crate futures;
/// # use futures::io::AsyncRead;
/// use futures::io::codec::length_delimited::Builder;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// Builder::new()
/// .length_field_length(4)
/// .new_read(io);
/// # }
/// # fn main() {}
/// ```
pub fn length_field_length(&mut self, val: usize) -> &mut Self {
assert!(val > 0 && val <= 8, "invalid length field length");
self.length_field_len = val;
self
}
/// Sets the number of bytes in the header before the length field
///
/// This configuration option only applies to decoding.
///
/// # Examples
///
/// ```
/// # extern crate futures;
/// # use futures::io::AsyncRead;
/// use futures::io::codec::length_delimited::Builder;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// Builder::new()
/// .length_field_offset(1)
/// .new_read(io);
/// # }
/// # fn main() {}
/// ```
pub fn length_field_offset(&mut self, val: usize) -> &mut Self {
self.length_field_offset = val;
self
}
/// Delta between the payload length specified in the header and the real
/// payload length
///
/// # Examples
///
/// ```
/// # extern crate futures;
/// # use futures::io::AsyncRead;
/// use futures::io::codec::length_delimited::Builder;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// Builder::new()
/// .length_adjustment(-2)
/// .new_read(io);
/// # }
/// # fn main() {}
/// ```
pub fn length_adjustment(&mut self, val: isize) -> &mut Self {
self.length_adjustment = val;
self
}
/// Sets the number of bytes to skip before reading the payload
///
/// Default value is `length_field_len + length_field_offset`
///
/// This configuration option only applies to decoding
///
/// # Examples
///
/// ```
/// # extern crate futures;
/// # use futures::io::AsyncRead;
/// use futures::io::codec::length_delimited::Builder;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// Builder::new()
/// .num_skip(4)
/// .new_read(io);
/// # }
/// # fn main() {}
/// ```
pub fn num_skip(&mut self, val: usize) -> &mut Self {
self.num_skip = Some(val);
self
}
/// Create a configured length delimited `FramedRead`
///
/// # Examples
///
/// ```
/// # extern crate futures;
/// # use futures::io::AsyncRead;
/// use futures::io::codec::length_delimited::Builder;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// Builder::new()
/// .length_field_offset(0)
/// .length_field_length(2)
/// .length_adjustment(0)
/// .num_skip(0)
/// .new_read(io);
/// # }
/// # fn main() {}
/// ```
pub fn new_read<T>(&self, upstream: T) -> FramedRead<T>
where T: AsyncRead,
{
FramedRead {
inner: codec::FramedRead::new(upstream, Decoder {
builder: *self,
state: DecodeState::Head,
}),
}
}
/// Create a configured length delimited `FramedWrite`
///
/// # Examples
///
/// ```
/// # extern crate bytes;
/// # extern crate futures;
/// # use futures::io::AsyncWrite;
/// # use futures::io::codec::length_delimited;
/// # use bytes::BytesMut;
/// # fn write_frame<T: AsyncWrite>(io: T) {
/// # let _: length_delimited::FramedWrite<T, BytesMut> =
/// length_delimited::Builder::new()
/// .length_field_length(2)
/// .new_write(io);
/// # }
/// # fn main() {}
/// ```
pub fn new_write<T, B>(&self, inner: T) -> FramedWrite<T, B>
where T: AsyncWrite,
B: IntoBuf,
{
FramedWrite {
inner: inner,
builder: *self,
frame: None,
do_close: false,
}
}
/// Create a configured length delimited `Framed`
///
/// # Examples
///
/// ```
/// # extern crate bytes;
/// # extern crate futures;
/// # use futures::io::{AsyncRead, AsyncWrite};
/// # use futures::io::codec::length_delimited;
/// # use bytes::BytesMut;
/// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
/// # let _: length_delimited::Framed<T, BytesMut> =
/// length_delimited::Builder::new()
/// .length_field_length(2)
/// .new_framed(io);
/// # }
/// # fn main() {}
/// ```
pub fn new_framed<T, B>(&self, inner: T) -> Framed<T, B>
where T: AsyncRead + AsyncWrite,
B: IntoBuf
{
let inner = self.new_read(self.new_write(inner));
Framed { inner: inner }
}
fn num_head_bytes(&self) -> usize {
let num = self.length_field_offset + self.length_field_len;
cmp::max(num, self.num_skip.unwrap_or(0))
}
fn get_num_skip(&self) -> usize {
self.num_skip.unwrap_or(self.length_field_offset + self.length_field_len)
}
}
// ===== impl FrameTooBig =====
impl fmt::Debug for FrameTooBig {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FrameTooBig")
.finish()
}
}
impl fmt::Display for FrameTooBig {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(self.description())
}
}
impl StdError for FrameTooBig {
fn description(&self) -> &str {
"frame size too big"
}
}

View File

@@ -0,0 +1,61 @@
use std::io::{self, BufRead};
use std::mem;
use std::string::String;
use {Poll, Stream};
use io::AsyncRead;
/// Combinator created by the top-level `lines` method which is a stream over
/// the lines of text on an I/O object.
#[derive(Debug)]
pub struct Lines<A> {
io: A,
line: String,
}
/// Creates a new stream from the I/O object given representing the lines of
/// input that are found on `A`.
///
/// This method takes an asynchronous I/O object, `a`, and returns a `Stream` of
/// lines that the object contains. The returned stream will reach its end once
/// `a` reaches EOF.
pub fn lines<A>(a: A) -> Lines<A>
where A: AsyncRead + BufRead,
{
Lines {
io: a,
line: String::new(),
}
}
impl<A> Lines<A> {
/// Returns the underlying I/O object.
///
/// Note that this may lose data already read into internal buffers. It's
/// recommended to only call this once the stream has reached its end.
pub fn into_inner(self) -> A {
self.io
}
}
impl<A> Stream for Lines<A>
where A: AsyncRead + BufRead,
{
type Item = String;
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<String>, io::Error> {
let n = try_ready!(self.io.read_line(&mut self.line));
if n == 0 && self.line.len() == 0 {
return Ok(None.into())
}
if self.line.ends_with("\n") {
self.line.pop();
if self.line.ends_with("\r") {
self.line.pop();
}
}
Ok(Some(mem::replace(&mut self.line, String::new())).into())
}
}

159
futures-util/src/io/mod.rs Normal file
View File

@@ -0,0 +1,159 @@
//! Core I/O traits and combinators.
//!
//! A description of the high-level I/O combinators can be [found online] in
//! addition to a description of the [low level details].
//!
//! [found online]: https://tokio.rs/docs/getting-started/core/
//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/
use std::io as std_io;
use futures_core::{Async, Poll, task};
pub use futures_io::{AsyncRead, AsyncWrite, IoVec};
use bytes::{Buf, BufMut};
pub mod io;
pub mod codec;
mod allow_std;
mod codecs;
mod copy;
mod flush;
mod framed;
mod framed_read;
mod framed_write;
mod length_delimited;
// Requires "BufRead"
// mod lines;
mod read;
mod read_exact;
mod read_to_end;
// TODO: resolve. Temporary disabled because it requires "BufRead",
// which does not have an async equivalent.
// mod read_until;
mod shutdown;
mod split;
mod window;
mod write_all;
use self::codec::{Decoder, Encoder, Framed};
use self::split::{ReadHalf, WriteHalf};
/// An extension trait which adds utility methods to `AsyncRead` types.
pub trait AsyncReadExt: AsyncRead {
/// Pull some bytes from this source into the specified `Buf`, returning
/// how many bytes were read.
///
/// The `buf` provided will have bytes read into it and the internal cursor
/// will be advanced if any bytes were read. Note that this method typically
/// will not reallocate the buffer provided.
fn read_buf<B: BufMut>(&mut self, buf: &mut B, cx: &mut task::Context)
-> Poll<usize, std_io::Error>
where Self: Sized,
{
if !buf.has_remaining_mut() {
return Ok(Async::Ready(0));
}
unsafe {
let n = {
// The `IoVec` type can't have a 0-length size, so we create a bunch
// of dummy versions on the stack with 1 length which we'll quickly
// overwrite.
let b1: &mut [u8] = &mut [0];
let b2: &mut [u8] = &mut [0];
let b3: &mut [u8] = &mut [0];
let b4: &mut [u8] = &mut [0];
let b5: &mut [u8] = &mut [0];
let b6: &mut [u8] = &mut [0];
let b7: &mut [u8] = &mut [0];
let b8: &mut [u8] = &mut [0];
let b9: &mut [u8] = &mut [0];
let b10: &mut [u8] = &mut [0];
let b11: &mut [u8] = &mut [0];
let b12: &mut [u8] = &mut [0];
let b13: &mut [u8] = &mut [0];
let b14: &mut [u8] = &mut [0];
let b15: &mut [u8] = &mut [0];
let b16: &mut [u8] = &mut [0];
let mut bufs: [&mut IoVec; 16] = [
b1.into(), b2.into(), b3.into(), b4.into(),
b5.into(), b6.into(), b7.into(), b8.into(),
b9.into(), b10.into(), b11.into(), b12.into(),
b13.into(), b14.into(), b15.into(), b16.into(),
];
let n = buf.bytes_vec_mut(&mut bufs);
try_ready!(self.poll_vectored_read(&mut bufs[..n], cx))
};
buf.advance_mut(n);
Ok(Async::Ready(n))
}
}
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
fn framed<T: Encoder + Decoder>(self, codec: T) -> Framed<Self, T>
where Self: AsyncWrite + Sized,
{
framed::framed(self, codec)
}
/// Helper method for splitting this read/write object into two halves.
///
/// The two halves returned implement the `Read` and `Write` traits,
/// respectively.
fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
where Self: AsyncWrite + Sized,
{
split::split(self)
}
}
impl<T: AsyncRead + ?Sized> AsyncReadExt for T {}
/// An extension trait which adds utility methods to `AsyncWrite` types.
pub trait AsyncWriteExt: AsyncWrite {
/// Write a `Buf` into this value, returning how many bytes were written.
///
/// Note that this method will advance the `buf` provided automatically by
/// the number of bytes written.
fn write_buf<B: Buf>(&mut self, buf: &mut B, cx: &mut task::Context)
-> Poll<usize, std_io::Error>
where Self: Sized,
{
if !buf.has_remaining() {
return Ok(Async::Ready(0));
}
let n = {
// The `IoVec` type can't have a zero-length size, so create a dummy
// version from a 1-length slice which we'll overwrite with the
// `bytes_vec` method.
static DUMMY: &[u8] = &[0];
let iovec = <&IoVec>::from(DUMMY);
let mut bufs = [iovec; 64];
let n = buf.bytes_vec(&mut bufs);
try_ready!(self.poll_vectored_write(&bufs[..n], cx))
};
buf.advance(n);
Ok(Async::Ready(n))
}
}
impl<T: AsyncWrite + ?Sized> AsyncWriteExt for T {}

View File

@@ -0,0 +1,57 @@
use std::io;
use std::mem;
use {Future, Poll, task};
use io::AsyncRead;
#[derive(Debug)]
enum State<R, T> {
Pending {
rd: R,
buf: T,
},
Empty,
}
/// Tries to read some bytes directly into the given `buf` in asynchronous
/// manner, returning a future type.
///
/// The returned future will resolve to both the I/O stream and the buffer
/// as well as the number of bytes read once the read operation is completed.
pub fn read<R, T>(rd: R, buf: T) -> Read<R, T>
where R: AsyncRead,
T: AsMut<[u8]>
{
Read { state: State::Pending { rd: rd, buf: buf } }
}
/// A future which can be used to easily read available number of bytes to fill
/// a buffer.
///
/// Created by the [`read`] function.
#[derive(Debug)]
pub struct Read<R, T> {
state: State<R, T>,
}
impl<R, T> Future for Read<R, T>
where R: AsyncRead,
T: AsMut<[u8]>
{
type Item = (R, T, usize);
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<(R, T, usize), io::Error> {
let nread = match self.state {
State::Pending { ref mut rd, ref mut buf } =>
try_ready!(rd.poll_read(&mut buf.as_mut()[..], cx)),
State::Empty => panic!("poll a Read after it's done"),
};
match mem::replace(&mut self.state, State::Empty) {
State::Pending { rd, buf } => Ok((rd, buf, nread).into()),
State::Empty => panic!("invalid internal state"),
}
}
}

View File

@@ -0,0 +1,83 @@
use std::io;
use std::mem;
use {Poll, Future, task};
use io::AsyncRead;
/// A future which can be used to easily read exactly enough bytes to fill
/// a buffer.
///
/// Created by the [`read_exact`] function.
///
/// [`read_exact`]: fn.read_exact.html
#[derive(Debug)]
pub struct ReadExact<A, T> {
state: State<A, T>,
}
#[derive(Debug)]
enum State<A, T> {
Reading {
a: A,
buf: T,
pos: usize,
},
Empty,
}
/// Creates a future which will read exactly enough bytes to fill `buf`,
/// returning an error if EOF is hit sooner.
///
/// The returned future will resolve to both the I/O stream as well as the
/// buffer once the read operation is completed.
///
/// In the case of an error the buffer and the object will be discarded, with
/// the error yielded. In the case of success the object will be destroyed and
/// the buffer will be returned, with all data read from the stream appended to
/// the buffer.
pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T>
where A: AsyncRead,
T: AsMut<[u8]>,
{
ReadExact {
state: State::Reading {
a: a,
buf: buf,
pos: 0,
},
}
}
fn eof() -> io::Error {
io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")
}
impl<A, T> Future for ReadExact<A, T>
where A: AsyncRead,
T: AsMut<[u8]>,
{
type Item = (A, T);
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, T), io::Error> {
match self.state {
State::Reading { ref mut a, ref mut buf, ref mut pos } => {
let buf = buf.as_mut();
while *pos < buf.len() {
let n = try_ready!(a.poll_read(&mut buf[*pos..], cx));
*pos += n;
if n == 0 {
return Err(eof())
}
}
}
State::Empty => panic!("poll a ReadExact after it's done"),
}
match mem::replace(&mut self.state, State::Empty) {
State::Reading { a, buf, .. } => Ok((a, buf).into()),
State::Empty => panic!(),
}
}
}

View File

@@ -0,0 +1,120 @@
use std::io::{self, ErrorKind};
use std::mem;
use std::vec::Vec;
use {Async, Poll, Future, task};
use io::AsyncRead;
/// A future which can be used to easily read the entire contents of a stream
/// into a vector.
///
/// Created by the [`read_to_end`] function.
///
/// [`read_to_end`]: fn.read_to_end.html
#[derive(Debug)]
pub struct ReadToEnd<A> {
state: State<A>,
}
#[derive(Debug)]
enum State<A> {
Reading {
a: A,
buf: Vec<u8>,
},
Empty,
}
/// Creates a future which will read all the bytes associated with the I/O
/// object `A` into the buffer provided.
///
/// In the case of an error the buffer and the object will be discarded, with
/// the error yielded. In the case of success the object will be destroyed and
/// the buffer will be returned, with all data read from the stream appended to
/// the buffer.
pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
where A: AsyncRead,
{
ReadToEnd {
state: State::Reading {
a: a,
buf: buf,
}
}
}
struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize }
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
unsafe { self.buf.set_len(self.len); }
}
}
// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
fn read_to_end_internal<R: AsyncRead + ?Sized>(r: &mut R, buf: &mut Vec<u8>, cx: &mut task::Context)
-> Poll<usize, io::Error>
{
let start_len = buf.len();
let mut g = Guard { len: buf.len(), buf: buf };
let ret;
loop {
if g.len == g.buf.len() {
unsafe {
g.buf.reserve(32);
let capacity = g.buf.capacity();
g.buf.set_len(capacity);
r.initializer().initialize(&mut g.buf[g.len..]);
}
}
match r.poll_read(&mut g.buf[g.len..], cx) {
Ok(Async::Ready(0)) => {
ret = Ok(Async::Ready(g.len - start_len));
break;
}
Ok(Async::Ready(n)) => g.len += n,
Ok(Async::Pending) => return Ok(Async::Pending),
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => {
ret = Err(e);
break;
}
}
}
ret
}
impl<A> Future for ReadToEnd<A>
where A: AsyncRead,
{
type Item = (A, Vec<u8>);
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, Vec<u8>), io::Error> {
match self.state {
State::Reading { ref mut a, ref mut buf } => {
// If we get `Ok`, then we know the stream hit EOF and we're done. If we
// hit "would block" then all the read data so far is in our buffer, and
// otherwise we propagate errors
try_ready!(read_to_end_internal(a, buf, cx));
},
State::Empty => panic!("poll ReadToEnd after it's done"),
}
match mem::replace(&mut self.state, State::Empty) {
State::Reading { a, buf } => Ok((a, buf).into()),
State::Empty => unreachable!(),
}
}
}

View File

@@ -0,0 +1,75 @@
use std::io::{self, BufRead};
use std::mem;
use std::vec::Vec;
use {Poll, Future};
use io::AsyncRead;
/// A future which can be used to easily read the contents of a stream into a
/// vector until the delimiter is reached.
///
/// Created by the [`read_until`] function.
///
/// [`read_until`]: fn.read_until.html
#[derive(Debug)]
pub struct ReadUntil<A> {
state: State<A>,
}
#[derive(Debug)]
enum State<A> {
Reading {
a: A,
byte: u8,
buf: Vec<u8>,
},
Empty,
}
/// Creates a future which will read all the bytes associated with the I/O
/// object `A` into the buffer provided until the delimiter `byte` is reached.
/// This method is the async equivalent to [`BufRead::read_until`].
///
/// In case of an error the buffer and the object will be discarded, with
/// the error yielded. In the case of success the object will be destroyed and
/// the buffer will be returned, with all bytes up to, and including, the delimiter
/// (if found).
///
/// [`BufRead::read_until`]: https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_until
pub fn read_until<A>(a: A, byte: u8, buf: Vec<u8>) -> ReadUntil<A>
where A: AsyncRead + BufRead,
{
ReadUntil {
state: State::Reading {
a: a,
byte: byte,
buf: buf,
}
}
}
impl<A> Future for ReadUntil<A>
where A: AsyncRead + BufRead
{
type Item = (A, Vec<u8>);
type Error = io::Error;
fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
match self.state {
State::Reading { ref mut a, byte, ref mut buf } => {
// If we get `Ok(n)`, then we know the stream hit EOF or the delimiter.
// and just return it, as we are finished.
// If we hit "would block" then all the read data so far
// is in our buffer, and otherwise we propagate errors.
try_ready!(a.read_until(byte, buf));
},
State::Empty => panic!("poll ReadUntil after it's done"),
}
match mem::replace(&mut self.state, State::Empty) {
State::Reading { a, byte: _, buf } => Ok((a, buf).into()),
State::Empty => unreachable!(),
}
}
}

View File

@@ -0,0 +1,44 @@
use std::io;
use {Poll, Future, Async, task};
use AsyncWrite;
/// A future used to fully shutdown an I/O object.
///
/// Resolves to the underlying I/O object once the shutdown operation is
/// complete.
///
/// Created by the [`shutdown`] function.
///
/// [`shutdown`]: fn.shutdown.html
#[derive(Debug)]
pub struct Shutdown<A> {
a: Option<A>,
}
/// Creates a future which will entirely shutdown an I/O object and then yield
/// the object itself.
///
/// This function will consume the object provided if an error happens, and
/// otherwise it will repeatedly call `shutdown` until it sees `Ok(())`,
/// scheduling a retry if `WouldBlock` is seen along the way.
pub fn shutdown<A>(a: A) -> Shutdown<A>
where A: AsyncWrite,
{
Shutdown {
a: Some(a),
}
}
impl<A> Future for Shutdown<A>
where A: AsyncWrite,
{
type Item = A;
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<A, io::Error> {
try_ready!(self.a.as_mut().unwrap().poll_close(cx));
Ok(Async::Ready(self.a.take().unwrap()))
}
}

View File

@@ -0,0 +1,68 @@
use std::io;
use {Async, Poll, task};
use lock::BiLock;
use futures_io::{AsyncRead, AsyncWrite, Error, IoVec};
/// The readable half of an object returned from `AsyncRead::split`.
#[derive(Debug)]
pub struct ReadHalf<T> {
handle: BiLock<T>,
}
/// The writable half of an object returned from `AsyncRead::split`.
#[derive(Debug)]
pub struct WriteHalf<T> {
handle: BiLock<T>,
}
fn lock_and_then<T, U, E, F>(lock: &BiLock<T>, cx: &mut task::Context, f: F) -> Result<Async<U>, E>
where F: FnOnce(&mut T, &mut task::Context) -> Result<Async<U>, E>
{
match lock.poll_lock(cx) {
Async::Ready(ref mut l) => f(l, cx),
Async::Pending => Ok(Async::Pending),
}
}
pub fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) {
let (a, b) = BiLock::new(t);
(ReadHalf { handle: a }, WriteHalf { handle: b })
}
impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
lock_and_then(&self.handle, cx, |l, cx| l.poll_read(buf, cx))
}
fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context)
-> Poll<usize, io::Error>
{
lock_and_then(&self.handle, cx, |l, cx| l.poll_vectored_read(vec, cx))
}
}
impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context)
-> Poll<usize, Error>
{
lock_and_then(&self.handle, cx, |l, cx| l.poll_write(buf, cx))
}
fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context)
-> Poll<usize, Error>
{
lock_and_then(&self.handle, cx, |l, cx| l.poll_vectored_write(vec, cx))
}
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
lock_and_then(&self.handle, cx, |l, cx| l.poll_flush(cx))
}
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
lock_and_then(&self.handle, cx, |l, cx| l.poll_close(cx))
}
}

View File

@@ -0,0 +1,117 @@
use std::ops;
/// A owned window around an underlying buffer.
///
/// Normally slices work great for considering sub-portions of a buffer, but
/// unfortunately a slice is a *borrowed* type in Rust which has an associated
/// lifetime. When working with future and async I/O these lifetimes are not
/// always appropriate, and are sometimes difficult to store in tasks. This
/// type strives to fill this gap by providing an "owned slice" around an
/// underlying buffer of bytes.
///
/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable
/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation
/// that this type carries.
///
/// This type can be particularly useful when working with the `write_all`
/// combinator in this crate. Data can be sliced via `Window`, consumed by
/// `write_all`, and then earned back once the write operation finishes through
/// the `into_inner` method on this type.
#[derive(Debug)]
pub struct Window<T> {
inner: T,
range: ops::Range<usize>,
}
impl<T: AsRef<[u8]>> Window<T> {
/// Creates a new window around the buffer `t` defaulting to the entire
/// slice.
///
/// Further methods can be called on the returned `Window<T>` to alter the
/// window into the data provided.
pub fn new(t: T) -> Window<T> {
Window {
range: 0..t.as_ref().len(),
inner: t,
}
}
/// Gets a shared reference to the underlying buffer inside of this
/// `Window`.
pub fn get_ref(&self) -> &T {
&self.inner
}
/// Gets a mutable reference to the underlying buffer inside of this
/// `Window`.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
/// Consumes this `Window`, returning the underlying buffer.
pub fn into_inner(self) -> T {
self.inner
}
/// Returns the starting index of this window into the underlying buffer
/// `T`.
pub fn start(&self) -> usize {
self.range.start
}
/// Returns the end index of this window into the underlying buffer
/// `T`.
pub fn end(&self) -> usize {
self.range.end
}
/// Changes the starting index of this window to the index specified.
///
/// Returns the windows back to chain multiple calls to this method.
///
/// # Panics
///
/// This method will panic if `start` is out of bounds for the underlying
/// slice or if it comes after the `end` configured in this window.
pub fn set_start(&mut self, start: usize) -> &mut Window<T> {
assert!(start <= self.inner.as_ref().len());
assert!(start <= self.range.end);
self.range.start = start;
self
}
/// Changes the end index of this window to the index specified.
///
/// Returns the windows back to chain multiple calls to this method.
///
/// # Panics
///
/// This method will panic if `end` is out of bounds for the underlying
/// slice or if it comes before the `start` configured in this window.
pub fn set_end(&mut self, end: usize) -> &mut Window<T> {
assert!(end <= self.inner.as_ref().len());
assert!(self.range.start <= end);
self.range.end = end;
self
}
// TODO: how about a generic set() method along the lines of:
//
// buffer.set(..3)
// .set(0..2)
// .set(4..)
//
// etc.
}
impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> {
fn as_ref(&self) -> &[u8] {
&self.inner.as_ref()[self.range.start..self.range.end]
}
}
impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> {
fn as_mut(&mut self) -> &mut [u8] {
&mut self.inner.as_mut()[self.range.start..self.range.end]
}
}

View File

@@ -0,0 +1,86 @@
use std::io;
use std::mem;
use {Poll, Future, task};
use AsyncWrite;
/// A future used to write the entire contents of some data to a stream.
///
/// This is created by the [`write_all`] top-level method.
///
/// [`write_all`]: fn.write_all.html
#[derive(Debug)]
pub struct WriteAll<A, T> {
state: State<A, T>,
}
#[derive(Debug)]
enum State<A, T> {
Writing {
a: A,
buf: T,
pos: usize,
},
Empty,
}
/// Creates a future that will write the entire contents of the buffer `buf` to
/// the stream `a` provided.
///
/// The returned future will not return until all the data has been written, and
/// the future will resolve to the stream as well as the buffer (for reuse if
/// needed).
///
/// Any error which happens during writing will cause both the stream and the
/// buffer to get destroyed.
///
/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should
/// be broadly applicable to accepting data which can be converted to a slice.
/// The `Window` struct is also available in this crate to provide a different
/// window into a slice if necessary.
pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T>
where A: AsyncWrite,
T: AsRef<[u8]>,
{
WriteAll {
state: State::Writing {
a: a,
buf: buf,
pos: 0,
},
}
}
fn zero_write() -> io::Error {
io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
}
impl<A, T> Future for WriteAll<A, T>
where A: AsyncWrite,
T: AsRef<[u8]>,
{
type Item = (A, T);
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, T), io::Error> {
match self.state {
State::Writing { ref mut a, ref buf, ref mut pos } => {
let buf = buf.as_ref();
while *pos < buf.len() {
let n = try_ready!(a.poll_write(&buf[*pos..], cx));
*pos += n;
if n == 0 {
return Err(zero_write())
}
}
}
State::Empty => panic!("poll a WriteAll after it's done"),
}
match mem::replace(&mut self.state, State::Empty) {
State::Writing { a, buf, .. } => Ok((a, buf).into()),
State::Empty => panic!(),
}
}
}

View File

@@ -6,8 +6,14 @@
#[macro_use]
extern crate futures_core;
extern crate futures_io;
extern crate futures_sink;
#[cfg(feature = "std")]
use futures_core::{Async, Future, Poll, Stream, task};
#[cfg(feature = "std")]
use futures_sink::Sink;
macro_rules! if_std {
($($i:item)*) => ($(
#[cfg(feature = "std")]
@@ -15,6 +21,16 @@ macro_rules! if_std {
)*)
}
if_std! {
extern crate bytes;
#[macro_use]
extern crate log;
}
#[cfg(feature = "std")]
#[macro_use]
extern crate std;
macro_rules! delegate_sink {
($field:ident) => {
fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
@@ -35,16 +51,17 @@ macro_rules! delegate_sink {
}
}
#[macro_use]
#[cfg(feature = "std")]
extern crate std;
#[cfg(feature = "std")]
pub mod lock;
pub mod future;
pub use future::FutureExt;
#[cfg(feature = "std")]
pub mod io;
#[cfg(feature = "std")]
pub use io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
pub mod stream;
pub use stream::StreamExt;
@@ -54,4 +71,6 @@ pub use sink::SinkExt;
pub mod prelude {
//! Prelude with common traits from the `futures-util` crate.
pub use {FutureExt, StreamExt, SinkExt};
#[cfg(feature = "std")]
pub use {AsyncReadExt, AsyncWriteExt};
}

View File

@@ -22,9 +22,10 @@ appveyor = { repository = "alexcrichton/futures-rs" }
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false }
futures-executor = { path = "../futures-executor", version = "0.2.0", default-features = false }
futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false }
futures-util = { path = "../futures-util", version = "0.2.0", default-features = false }
[features]
std = ["futures-core/std", "futures-executor/std", "futures-sink/std", "futures-util/std"]
std = ["futures-core/std", "futures-executor/std", "futures-io/std", "futures-sink/std", "futures-util/std"]
default = ["std"]

View File

@@ -166,6 +166,7 @@
extern crate futures_core;
extern crate futures_channel;
extern crate futures_executor;
extern crate futures_io;
extern crate futures_sink;
extern crate futures_util;
@@ -223,6 +224,16 @@ pub mod future {
pub use futures_util::future::*;
}
#[cfg(feature = "std")]
pub mod io {
//! IO
//!
//! This module contains the `AsyncRead` and `AsyncWrite` traits, as well
//! as a number of combinators and extensions for using them.
pub use futures_io::*;
pub use futures_util::io::*;
}
pub mod prelude {
//! A "prelude" for crates using the `futures` crate.
//!
@@ -253,6 +264,14 @@ pub mod prelude {
StreamExt,
SinkExt,
};
#[cfg(feature = "std")]
pub use futures_util::{
AsyncRead,
AsyncWrite,
AsyncReadExt,
AsyncWriteExt,
};
}
pub mod sink {