Liam W
封面

C#.NET拾遗补漏 | 18-理解 C# 中的异步流

作者
王亮·发表于 2 年前

在阅读本文前,建议先阅读本系列的上一篇文章『理解 yield 关键字』。我们知道,使用 C# 的 yield 关键字可以实现一个迭代器(Iterator),使用 async/await 关键字可以实现一个异步方法。异步流(Asynchronous Stream)就是这两种功能的结合体,它实现了以异步的方式生成和消费一组数据系列的迭代器。

异步流的支持主要建立在 C# 8 引入的两个接口上:

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator (...);
}
public interface IAsyncEnumerator<out T>: IAsyncDisposable
{
    T Current { get; }
    ValueTask<bool> MoveNextAsync();
}

所以理解了上一篇我们讲的 yield 关键字,就很容易理解异步流,它只是在原来的基础上支持通过 yield return 返回异步得到的一系列结果值而已。从序列中获取每个元素的行为(MoveNextAsync)是一个异步操作,元素是以零散的方式到达,这就形成了所谓的“异步流”(比如视频流中的数据)。

这里 IAsyncEnumerator 接口的 ValueTask<T>Task<T> 类型轻量化的封装,它是结构类型(值类型)。使用方式与 Task<T> 相似,但它在同步完成任务或返回立即可用的结果时(这在列举序列时会经常发生),可以避免不必要的内存开销,比 Task<T> 更高效。

在上一篇文章中的 Fibonacci 方法中,Thread.Sleep(1000) 用来模拟一个耗时操作,它是“同步”的:

IEnumerable<int> Fibonacci(int count)
{
    int prev = 1;
    int curr = 1;
    for (int i = 0; i < count; i++)
    {
        yield return prev;
        Thread.Sleep(1000);
        int temp = prev + curr;
        prev = curr;
        curr = temp;
    }
}

为了提高程序执行效率,我们很有可能需要把 Thread.Sleep(1000) 改成“异步”的。如果使它生成异步的数据流,该怎么做呢?这就需要同时用到迭代器和异步方法了,也就是说方法中要同时使用 yield returnasync/await 关键字。要支持这一特性,就要使用 IAsyncEnumerable<T> 作为方法的返回类型。于是,前文的 Fibonacci 方法可以这样改造:

async IAsyncEnumerable<int> FibonacciAsync(int count)
{
    int prev = 1;
    int curr = 1;
    Random r = new();
    for (int i = 0; i < count; i++)
    {
        yield return prev;
        await Task.Delay(1000);
        int temp = prev + curr;
        prev = curr;
        curr = temp;
    }
}

不同的是,这个方法允许调用者以异步的方式消费它生成的数字系列,换句话说就是使用 await foreach 来遍历消费这个方法的返回结果(IAsyncEnumerable<int>),如下所示:

await foreach (var n in FibonacciAsync(10))
    Console.Write("{0} ", n);

但,如果要在 LINQ 查询语句中消费异步流,需要先引入 System.Linq.Async NuGet 包,除此之外,使用方式没有差别:

IAsyncEnumerable<int> query =
    from i in FibonacciAsync(10)
    where i % 2 == 0
    select i * 10;

await foreach (var number in query)
    Console.WriteLine(number);

另外,在 ASP.NET Core 的 Action 方法中也支持返回异步流,如下面示例:

[HttpGet]
public async IAsyncEnumerable<string> Get()
{
    using var dbContext = new BookContext();
    await foreach (var title in dbContext.Books
        .Select(b => b.Title)
        .AsAsyncEnumerable())
    yield return title;
}

综上,可以看到,异步流解决了零散数据系列的异步生成和消费问题。要知道,在 C# 还没有异步流时,一组数据系列(IEnumerable<T>)只能以整体异步的方式(Task<IEnumerable<T>>)返回给调用者。

参考:

《C# 9.0 in a nutshell》
Generate and consume async streams