Using Channel Like IAsyncEnumerable

How System.Threading.Channels can be used like IAsyncEnumerable in older .NET versions

Ironically, as I am creating a Visual Studio extension that migrates .NET 4.x code to .NET Core 3.1, I find myself mainly coding with .NET 4.7.2 and C# 7.3. While it is technically possible to use a subset of C# 8 features outside of .NET Core 3.x, the VS SDK is a difficult beast and introducing it to new things often makes it unhappy. I’ve lost an afternoon tracking down a reference to System.Collections.Immutable that was upsetting it.

One of the big things from C# 8 that I’m missing is IAsyncEnumerable<T> and await foreach. These would be incredibly useful for getting a stream of values from a background thread and updating the UI. But I can’t use them, so instead I turned to System.Threading.Channels, which scratched a similar itch before C# 8 came along.

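If you’re not familiar, System.Threading.Channels is a new-ish .NET library for passing messages between threads, available as a NuGet package that also works on .NET Framework. It’s a much nicer abstraction than things like BlockingCollection, mainly because readers and writers can await instead of blocking a thread.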

Steve Gordon wrote a great Introduction to Channels article that you should read if you want to know more about them.
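As a quick taste of the API, here is a minimal sketch of one producer and one consumer. The names ChannelBasicsDemo, ProduceNumbers and SumAsync are made up for illustration, and it assumes the System.Threading.Channels NuGet package is referenced.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

internal static class ChannelBasicsDemo
{
    // Hypothetical producer: writes 1..count on a thread-pool thread, then completes the channel.
    public static ChannelReader<int> ProduceNumbers(int count)
    {
        var channel = Channel.CreateUnbounded<int>();
        Task.Run(() =>
        {
            for (var i = 1; i <= count; i++)
            {
                // TryWrite always succeeds on an unbounded channel that has not been completed.
                channel.Writer.TryWrite(i);
            }

            // Completing the writer is what lets the reader's loop finish.
            channel.Writer.Complete();
        });
        return channel.Reader;
    }

    // Drains the reader and returns the sum of everything received.
    public static async Task<int> SumAsync(ChannelReader<int> reader)
    {
        var sum = 0;
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var value))
            {
                sum += value;
            }
        }

        return sum;
    }

    public static async Task Main()
    {
        var total = await SumAsync(ProduceNumbers(5));
        Console.WriteLine(total); // 1 + 2 + 3 + 4 + 5 = 15
    }
}
```

The caller only ever sees the ChannelReader<int>, which is what makes it feel like an async sequence: the consumer awaits until something is available and stops when the producer completes the writer.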

The fun thing I’ve found is that you can treat a ChannelReader<T> almost exactly like an IAsyncEnumerable<T>, and write useful extension methods over it. For example, while writing unit tests for a method that returns a ChannelReader<Thing>, I wanted to collate all the values it returned into a List<Thing>. The code was so obviously generic that I refactored it into this extension method:

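using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

internal static class ChannelReaderExtensions
{
    // Collects every item from the reader into a list; returns once the writer completes the channel.
    public static async Task<List<T>> ToListAsync<T>(this ChannelReader<T> channelReader, CancellationToken cancellationToken = default)
    {
        var list = new List<T>();
        // WaitToReadAsync returns false when the channel is completed and empty.
        while (await channelReader.WaitToReadAsync(cancellationToken))
        {
            // Drain everything that is currently buffered before waiting again.
            while (channelReader.TryRead(out var item))
            {
                list.Add(item);
            }
        }

        return list;
    }
}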
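One thing to keep in mind is that ToListAsync only returns once the writer completes the channel, so in a test it can be worth passing a token with a timeout. Here is a rough sketch of both cases. GetThings and TimesOutAsync are hypothetical stand-ins for the method under test and a test helper, and a copy of the extension method is included only so the sketch compiles on its own (drop it if ChannelReaderExtensions is already in your project).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

internal static class ToListAsyncDemo
{
    // Same shape as the extension method above, repeated so this sketch compiles on its own.
    public static async Task<List<T>> ToListAsync<T>(this ChannelReader<T> channelReader, CancellationToken cancellationToken = default)
    {
        var list = new List<T>();
        while (await channelReader.WaitToReadAsync(cancellationToken))
        {
            while (channelReader.TryRead(out var item)) list.Add(item);
        }
        return list;
    }

    // Hypothetical method under test: returns a reader, optionally without ever completing it.
    public static ChannelReader<string> GetThings(bool complete)
    {
        var channel = Channel.CreateUnbounded<string>();
        channel.Writer.TryWrite("alpha");
        channel.Writer.TryWrite("beta");
        if (complete) channel.Writer.Complete();
        return channel.Reader;
    }

    // Returns true if collecting the reader was cancelled by the timeout.
    public static async Task<bool> TimesOutAsync(ChannelReader<string> reader, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource(timeout))
        {
            try
            {
                await reader.ToListAsync(cts.Token);
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }

    public static async Task Main()
    {
        var things = await GetThings(complete: true).ToListAsync();
        Console.WriteLine(string.Join(", ", things)); // alpha, beta

        var timedOut = await TimesOutAsync(GetThings(complete: false), TimeSpan.FromMilliseconds(200));
        Console.WriteLine(timedOut); // True
    }
}
```

In the second call the two buffered items are read, but the channel is never completed, so the next WaitToReadAsync stays pending until the token cancels it.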

You could write pretty much any of the IEnumerable<T> extension methods over ChannelReader<T>, although every operator in a chain would create a new Channel and queue another task on the thread pool, so, you know… caveat developer. But anyway, here’s a Where extension method as an example.

// Goes inside a static class such as ChannelReaderExtensions; also needs "using System;" for Func<T, bool>.
public static ChannelReader<T> Where<T>(this ChannelReader<T> channelReader,
    Func<T, bool> predicate, CancellationToken token = default)
{
    var output = Channel.CreateUnbounded<T>(new UnboundedChannelOptions {SingleWriter = true});
    var writer = output.Writer;
    Task.Run(async () =>
    {
        try
        {
            while (await channelReader.WaitToReadAsync(token))
            {
                while (channelReader.TryRead(out var item))
                {
                    if (predicate(item))
                    {
                        // TryWrite is the fast path; WriteAsync is only a fallback.
                        if (!writer.TryWrite(item))
                        {
                            await writer.WriteAsync(item, token);
                        }
                    }
                }
            }

            writer.TryComplete();
        }
        catch (Exception ex)
        {
            // Pass cancellation or a throwing predicate on to readers so they are not left waiting forever.
            writer.TryComplete(ex);
        }
    });
    return output.Reader;
}
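Other operators follow the same pattern. As one more illustration, here is a rough sketch of a Select written the same way. The ChannelReaderSelectSketch class and its RunAsync helper are made up for this example, and this is just one way to do it.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

internal static class ChannelReaderSelectSketch
{
    // Projects each item through selector into a new channel; mirrors the shape of Where above.
    public static ChannelReader<TResult> Select<TSource, TResult>(this ChannelReader<TSource> channelReader,
        Func<TSource, TResult> selector, CancellationToken token = default)
    {
        var output = Channel.CreateUnbounded<TResult>(new UnboundedChannelOptions { SingleWriter = true });
        var writer = output.Writer;
        Task.Run(async () =>
        {
            try
            {
                while (await channelReader.WaitToReadAsync(token))
                {
                    while (channelReader.TryRead(out var item))
                    {
                        await writer.WriteAsync(selector(item), token);
                    }
                }

                writer.TryComplete();
            }
            catch (Exception ex)
            {
                // Surface cancellation or a throwing selector to whoever reads the output.
                writer.TryComplete(ex);
            }
        });
        return output.Reader;
    }

    // Demo helper: feeds 1, 2, 3 through Select and joins the results.
    public static async Task<string> RunAsync()
    {
        var source = Channel.CreateUnbounded<int>();
        for (var i = 1; i <= 3; i++) source.Writer.TryWrite(i);
        source.Writer.Complete();

        var squares = source.Reader.Select(n => n * n);
        var result = "";
        while (await squares.WaitToReadAsync())
        {
            while (squares.TryRead(out var square)) result += square + " ";
        }
        return result.Trim();
    }

    public static async Task Main() => Console.WriteLine(await RunAsync()); // 1 4 9
}
```

Because a single background task reads and writes sequentially, items come out in the order they went in. Chaining several of these operators still means one extra channel and one extra task per step.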
