Simple is Beautiful!


随着 tokio 0.1 的发布,rust 语言的异步 io 库逐渐成熟, 打算用几篇博客来描述一下 tokio 框架相关的基础 crate(当然,也是为了学习一下):

本篇主要讲解 futures

在许多编程语言都有 futures(类似于 promisedelaydeferred) 的概念, 并在此基础上引入轻量级进程或协程的概念。

为什么需要 futures?

在异步并发编程中同时需要维护成千上万个链接,这些链接大多处于阻塞状态, 当其中某些链接就绪后(由 mio 提供的接口询问),服务程序需要分别对其进行不同的处理, 处理这些链接的方法大多用两种方式:

虽然 callbacks 方式便于理解,但是当回调的嵌套层次比较深时,程序代码维护和阅读很不方便, 为此引入了 futures 方式来表达程序的执行流程,进而使代码像同步代码一样。

什么是 futures?

本质上 future 就是一个值,但是这个值有可能还没有就绪,而就绪是需要某些事件的发生, 其适用于任何事件,我们可以用 future 来表示以下事件:

不仅如此,我们可以对 future 进行任意的组合,比如:

通过上面的特性我们可以很容易用 futures 来表达下面的执行流:

id_rpc(&my_server).and_then(|id| {
}).map(|row| {
}).and_then(|encoded| {
    write_string(my_socket, encoded)


futures-rs future 种类

从上面看 future 只是代表一个单值,只代表一个事件, 但是在网络编程中,读取一个 TCP 链接我们通常会获取一个字节流,与一系列读事件相关, 因此我们还需要另一种 future 抽象用来关联多个事件:

因此 futures-rs 分别提供了三种 trait:

trait 定义

trait Future 的定义与标准库中的 Iterator 很相似:

trait Future {
    // The type of value that the future yields on successful completion.
    type Item;

    // The type of value that the future yields on failure.
    type Error;

    // The only required method, which attempts to complete the future.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;


定义一个 Future 只需定义 ItemError 类型和 poll 方法就可以了, 其他与 Future 有关的方法都是基于这三个的,已经预先定义了,不需要用户自定义了。

StreamSink 也与 Future 类似:

trait Stream {
    // The type of item yielded each time the stream's event occurs
    type Item;

    // The error type; errors terminate the stream.
    type Error;

    // Try to produce a value.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;


trait Sink {
    // The type of value that the sink accepts.
    type SinkItem;

    // The type of value produced by the sink when an error occurs.
    type SinkError;

    // The analog to `poll`, used for sending and then flushing items.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>;

    fn poll_complete(&mut self) -> Poll<(), Self::SinkError>;

2018-04-11 15:08:33