回调和等待体——异步算法的组合

怎么才能避免一核有难八核围观


最近一直在看 C++20 协程有关的内容,自认为懂了不少了,于是过来水一水 blog。这次的主题是异步编程。

这里进行的异步调用是一个非常复杂的函数,它的作用就是计算一个整数的平方数,这个问题太困难了所以计算速度非常慢。(当然实际的问题要比这个复杂,一般涉及到异步操作都是网络或者文件系统调用之类的,这里为了简单就用了这么一个函数。)

int slow_sqr(const int value)
{
    std::this_thread::sleep_for(200ms); // 线程很累了,它想睡一觉再算
    return value * value;
}

1. std::future 它不香吗?

先来看看没有协程的时代在 C++ 里一般怎么实现异步。标准库为我们提供了一种方式,那就是 std::futurefuture 是一个异步操作结果,一般与对应的 std::promise 相联系。当计算完成以后,调用 promiseset_value 函数就可以设置这个返回值,而对应的 future::get 函数则可以阻塞当前线程直到对应的 promise 返回了值。

std::future<int> async_sqr_ftr(const int value)
{
    std::promise<int> pms; // 先构造一个承诺对象
    std::future<int> ftr = pms.get_future(); // 然后获取承诺对应的 future
    // 为了不阻塞调用方,开一个新的线程用来计算
    // 注意这里要把承诺移动进新线程而不是直接引用捕获
    // 不然启动线程以后当前函数就会返回,pms 会被销毁
    std::thread thr([p = std::move(pms), v = value]() mutable
    {
        p.set_value(slow_sqr(v)); // 通过承诺对象设置值
    });
    thr.detach(); // 让这个线程在后台运行
    return ftr;
}

void sync_func() // 使用例
{
    std::future<int> ftr = async_sqr_ftr(7); // 开始计算
    const int result = ftr.get(); // 阻塞当前线程直至对应的承诺设置返回值
    std::cout << "result = " << result; // result = 49
}

虽然这里是用了另一个线程来完成任务,但是我们还是在调用方用 get 阻塞线程获取结果,再使用这个结果,成了一个线程工作另一个线程干等着,可真是够“异步”的呢。在真正的异步应用中不应该像这样执行完每一个任务都要线程间同步,而是在已有的任务上接续新的工作,当前面的任务结束后将其返回值作为参数传给下一个任务。这一点我们可以简单地采用 futurethen 成员函数来实现。构造完整个调用图以后就可以仅使用一次 get 来完成所有任务。

// 接续型 (continuation style) 异步构造
future<void> async_func()
{
    future<int> compute = async_sqr_ftr(7); // 开始计算
    // 使用 future::then 进行任务的接续,获取到结果之后输出结果
    // 注意对已有的 future 使用 then 以后原有的 future 将无效化
    return compute.then([](const int result)
    {
        std::cout << "result = " << result;
    })
}

啊,不过这个 future 并不是标准的 std::future,而是并发 TS 中的 std::experimental::future。也就是说,有很大的可能性你现在使用的标准库里没有这种 future 的实现,如果是这种情况的话就只能使用第三方的 boost::future 了。

假设你确实可以用到这个 then,我们来看这个函数的行为究竟是什么样子的。万能的 cppreference 告诉我们,用 future::then 附加接续函数后,“当前与 *this 关联的共享状态就绪时,在未指定的执行线程上调用持续”。也就是说,前一个任务完成以后会采用某种同步机制,之后在有可能是新的线程上面调用接续。绝大多数时候我们不希望这种多余的线程切换,不过这些东西我们用 future::then 就根本没办法控制。

另外,我们传递的接续函数对象所占的空间并不固定,future 中要存储这个可调用对象就必须要进行类似 std::function 的类型擦除,可能就不可避免地要使用动态内存申请。然后有没有想过,如果在调用这个 then 之前 future 就已经计算完了怎么办?如果没计算完呢?这之间就可能又需要两个线程间的协调。

而且,既然 future 是和 promise 相关联的,这双方就一定会有可共同访问的可变状态 (mutable shared state)。我们都知道,多线程 + 共同访问的可变数据 = 需要同步,我们又在为自己可能用不到的功能买单。

不都说 C++ 是 0 额外开销的语言吗?为什么我只是想在异步操作后接一个函数就要付出又是线程间同步、又要额外申请内存、又有可能开新线程这么大的开销?

2. 做个懒人

刚才例子里面的 then 看起来怎么这么像一个回调函数啊……不对,这不明显就是一个回调函数吗?!

让我们回忆一下 future 为什么需要这么多线程间协调。我们不知道在附加接续函数的时候 future 到底有没有结束计算,我们不知道接续函数到底有多大,而这一切的根源都在于 future 急迫地执行 (eagerly execute) 其任务。如果我们能够先构建好整个运算图再开始惰性地执行 (lazily execute) 的话,所有这些额外开销就都是可避免的了。嗯,这是我不知道第多少次在这里谈到惰性求值了,果然还是懒人拯救世界啊。

我们并不是不知道调用完 async_sqr_ftr 之后我们需要做什么,与之相反,我们的任务很明确——当异步算法返回之后调用我们的接续函数。那么只要让那个异步算法接受一个回调函数作为其参数不就解决了这个不必要的线程间同步问题了吗?C++ 有一种绝妙构造正适合这种泛型算法,没错,就是函数模板。下面的代码对于不熟悉 C++ 泛型编程的读者可能稍微有点难理解,所以我对此作了充分的注释,希望能帮助理解这段代码,毕竟我们的旅程才刚开始。

// 首先是模板头部分,这里使用了 C++20 的新特性“概念 (concept)”
// 与传统的 <typename F> 不同点在于这里明确给出了对类型 F 的要求
// std::invocable<int> 是一个类型制约,代表的是“能使用 (int) 作为参数来调用”这样的概念
// 也就是说,这里的 F 限制在可使用 (int) 调用的类型之中
// 对 OOP 比较熟悉的可以将概念类比成 OOP 中的“接口”
template <std::invocable<int> F>
auto async_sqr_ftr(const int value, F&& continuation)
{
    // 这里先获取对 F&& 使用 (int) 调用的返回值类型并记为 Ret
    using Ret = std::invoke_result_t<F&&, int>;
    // 于是我们的 promise 和 future 的参数类型就都是 Ret 了
    std::promise<Ret> pms;
    std::future<Ret> ftr = pms.get_future();
    // 现在除了 promise 对象和函数调用的参数 value 之外
    // 我们还需要捕获接续函数对象
    std::thread thr([p = std::move(pms), v = value, c = std::forward<F>(continuation)]() mutable
    {
        const int res = slow_sqr(v);
        // 这里返回类型是 void 的情况需要分开处理
        // 毕竟没有 void 类型的变量
        if constexpr (std::is_void_v<Ret>)
        {
            c(res);
            p.set_value();
        }
        // 返回类型不是 void 的话就可以直接给承诺对象设置返回值了
        else
        {
            p.set_value(c(res));
        }
    });
    thr.detach();
    return ftr;
}

那么使用方就大概这样:

std::future<void> async_func()
{
    return async_sqr_ftr(7, [](const int result)
    {
        std::cout << "result = " << result;
    });
}

使用方跟之前用 future::then 的样子差不多,但是我们在这里面节省了:

  • 回调函数对象的动态内存申请
  • 可能的线程切换
  • 设置接续函数所需的线程间同步

这就是惰性执行的众多好处。在惰性执行的这条路上我们可以走得更远:将惰性异步执行贯彻到底我们就可以获得非常强大,易于组合异步算法且无额外开销的天才抽象——发送器和接收器 (sender & receiver) 模型,这种模型之于异步算法就如同迭代器之于范围上的泛型算法一样重要。不过这里我就不继续往下说了,因为这并不是这个 blog 的重点,有兴趣的读者可以参考文章最后的相关阅读材料,我也许以后会再对这个话题进行展开,也许不会。

3. 只有回调函数可以吗?

诸如 ASIO 的很多异步 IO 库提供另一种异步函数调用的方案:忘记什么 future 吧,只需要回调函数就够了。用户提供一个回调函数,异步算法在计算完成以后直接把结果传给接续函数。

template <std::invocable<int> F>
void async_sqr_cbk(const int value, F&& continuation)
{
    std::thread thr([v = value, c = std::forward<F>(continuation)]() mutable
    {
        c(slow_sqr(v));
    });
    thr.detach();
}

这种方法的优点是显而易见的,它非常简洁,并没有那么多与异步算法本身无关的同步处理逻辑在里面。不过,因为没有返回值的缘故,用户如果确实想要同步地执行这个函数就需要自己手动构建同步的机制。

void sync_func()
{
    std::promise<void> pms;
    std::future<void> ftr = pms.get_future();
    async_sqr_cbk(7, [p = std::move(pms)](const int result) mutable
    {
        std::cout << "result = " << result;
        p.set_value();
    });
    ftr.get();
}

如果想要串接异步算法当然也是非常简单的。比如说这里我们想利用这个异步算法计算一个整数的 8 次方,同步后输出这个值,那么就需要调用三次 async_sqr,代码大概就是这样的。

void power_8(const int value)
{
    std::promise<void> pms;
    std::future<void> ftr = pms.get_future();
    async_sqr_ftr(value, [p = std::move(pms)](const int squared) mutable
    {
        async_sqr_ftr(squared, [p = std::move(p)](const int fourth) mutable
        {
            async_sqr_ftr(fourth, [p = std::move(p)](const int result) mutable
            {
                std::cout << "result = " << result;
                p.set_value();
            });
        });
    });
    ftr.get();
}

Emmmm… 这洋葱一样的函数调用和俄罗斯套娃一样的 lambda 是不是搞错了什么……

Javascript 人有一个词组来形容这种现象——回调地狱 (callback hell)。为了防止这种事情发生,我们一般需要把多层的 lambda 拆成更小的函数,但这样也让这些回调函数间额外信息的传递(比如此例中的承诺)变得更困难。这还只是线性的依次调用,如果再有更复杂的调用结构(比如循环),那基于回调函数的异步程序的实际结构会变得很复杂,共用状态的维护也会更困难,有时为了对象生存期正确性可能还不得不使用 std::shared_ptr 引入引用计数机制,在堆上分配共用状态。有没有更好的方法呢?

4. 协程拯救世界

C++20 加入了新的协程 (coroutine) 构造,极大地简化了异步算法的使用和实现。使用过 C# 的人可能会比较熟悉 C# 的 async/await 异步编程范式,利用 C++20 的协程我们可以做到相同的效果,还能不丢失 C++ 作为原生语言的高效率。Gor Nishanov 在 CppCon 2018 上曾经展示过如何使用协程来降低不在 cache 中的内存读取可能造成的延迟,这就是 C++ 的协程效率的最佳展现。(其他语言做的到吗?.jpg)

与子例程 (sub-routine) 也就是普通的函数不同的是,协程中可以有多个挂起点 (suspension point),在协程执行到挂起点时协程就会……呃……挂起 (suspend)。被挂起的协程将控制转交给调用方,之后调用方可以再度恢复 (resume) 协程的执行,被恢复的协程会从上次挂起的位置继续执行,直到下一次挂起,或者是整个协程执行的结束。

多说无益,让我们来看一看如果有协程支持的话上面的计算 8 次方代码会变成什么样子:

task<void> power_8_async(const int value)
{
    const int squared = co_await async_sqr_coro(value);
    const int fourth = co_await async_sqr_coro(squared);
    const int result = co_await async_sqr_coro(fourth);
    std::cout << "result = " << result;
}

是不是比回调函数的版本好看很多?本来是层层套娃的代码又变回了正常的顺序结构。这段异步代码与同步版本的代码差别仅是在异步调用的地方多了几个新的 co_await 关键字,返回值从 void 变成了 task<void>,其他的地方还是原来的配方,还是熟悉的味道。下面我就来粗略解释一下 co_await 关键字的意思,以及这个协程版本与上面回调函数版本的关联。

可以直接使用 co_await 关键字来等待的对象叫做等待器 (awaiter),严格定义来讲,等待器类型需要满足如下所述的 awaiter 概念:

template <typename T> struct __is_coro_handle : std::false_type {};
template <typename P> struct __is_coro_handle<std::coroutine_handle<P>> : std::true_type {};

template <typename T>
concept __valid_await_suspend_type = std::same_as<T, void> || std::same_as<T, bool> || __is_coro_handle<T>::value;

template <typename T>
concept awaiter = requires(T&& awt, std::coroutine_handle<> hdl)
{
    { awt.await_ready() ? void() : void() };
    { awt.await_suspend(hdl) } -> __valid_await_suspend_type;
    { awt.await_resume() };
};

好像不太像人话,让我来试着用人话描述一下这个概念的意思。首先等待器类型需要有一个 await_ready 函数,它的返回值可以按语境转换至 bool;然后,还要有一个 await_suspend 函数,返回值类型需要是 voidbool,或是某种协程句柄 std::coroutine_handle<T>;最后它还要有一个 await_resume 成员函数,对这个函数并没有返回类型的限制。

这三个函数的行为控制了在对等待器进行 co_await 时候的行为。

  • 当对一个等待器进行 co_await 时,会先调用 await_ready 成员函数,询问等待这个等待器的时候需不需要将协程挂起:如果 await_ready 的返回值转换至 true,就代表这个等待器已经准备好了,不用等就可以继续执行了,也就是说不挂起协程直接接下去运行协程的后面部分;如果 await_ready 返回值转换至 false 则挂起协程。
  • 每个协程都跟一个协程句柄相关联,这个句柄具有底层的对协程本身进行操作的成员函数。协程句柄类型重载了 operator(),调用协程句柄会恢复协程的执行,也就是说可以把协程句柄看成协程剩下部分的回调函数。协程被挂起以后,await_suspend 函数会被调用,传递给它的参数就是被挂起的协程对应的协程句柄。
    • await_suspend 返回 void 时函数直接返回,不作任何其他事情;
    • 若返回 bool,则根据这个返回值决定是否要即刻恢复协程:若这个值是 false 则直接恢复协程,若 true 则继续保持挂起状态;
    • 若返回的是协程柄 std::coroutine_handle<T>,则直接恢复返回的这个协程柄对应的协程(可以不是当前的协程);
    • 若这个函数执行过程中抛出了异常,则直接恢复协程并且在协程中抛出这个异常。
  • 当协程恢复执行的时候,调用 await_resume 函数,这个函数的返回值就是这次 co_await 的结果。

如果看不懂的话建议多看几遍可以了解大概意思以后看后面的实际例子,说不定有例子理解起来会更简单。这里我们的应用只会用到返回 voidawait_suspend 函数。

那么下面就来以 async_sqr_cbk 作为基础实现前面可使用 co_awaitasync_sqr_coro。根据前面的协程相关知识,要实现这个功能我们要写一个等待器类型,它满足前面说的 awaiter 要求。因为这里我们想要 co_await 的时候获取返回值的 int,所以 await_resume 函数的返回值就应该是 int。下面的代码就是完整的实现了。

// 等待器类型
class sqr_awaitable final
{
private:
    int value_ = 0;
    int result_ = 0;

public:
    // 我们需要把参数存在一个地方
    // 之后在挂起的时候把参数传给回调函数式的异步接口
    // 存在成员变量里是最简单直接的方式
    explicit sqr_awaitable(const int value): value_(value) {}

    // 直接返回 false,因为我们总是希望协程直接挂起的
    bool await_ready() const noexcept { return false; }

    // 挂起协程之后就可以开始异步运算了
    void await_suspend(const std::coroutine_handle<> awaiting)
    {
        // 调用回调式异步函数
        async_sqr_cbk(value_, [this, awaiting](const int result)
        {
            result_ = result; // 运算结束以后直接把结果存在等待器里面
            awaiting(); // 然后恢复协程,使用成员函数 awaiting.resume() 也可以
        });
    }

    // 协程恢复执行的时候把结果返回给协程就可以了
    int await_resume() const noexcept { return result_; }
};

// 这个函数实际上只是创建了一个等待器并且直接返回,主要的功能实现是在等待器类里面的
auto async_sqr_coro(const int value) { return sqr_awaitable(value); }

至于 task<void> 这个协程类型本身怎么实现嘛……这篇文章已经很长了,这种复杂的实现就不放在这里说了,说不定我以后会再写一篇有关如何实现协程类型的文章……

进一步阅读/观看材料