
A plain foreach is useful and efficient for most operations. Sometimes, though, special situations arise: either getting the data to iterate over has high latency, or the processing inside the loop depends on an operation that is slow or long-running. Fetching paged data from a database is a typical example. The goal is to fetch the data a chunk at a time, since fetching one record at a time introduces its own overhead. As data becomes available we start processing it, while more data is fetched in the background and fed into the processor. The processing itself also runs in parallel, so the next item can start before the current one finishes.

ForEachAsync

My favorite way to do this is with an extension method Stephen Toub wrote many years ago. It accepts a data source, breaks it into partitions, lets you specify the degree of parallelism, and takes a lambda to execute for each item:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Extensions
{
    public static class Extensions
    {
        public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
        {
            return Task.WhenAll(
                from partition in Partitioner.Create(source).GetPartitions(dop)
                select Task.Run(async delegate
                {
                    using (partition)
                        while (partition.MoveNext())
                            await body(partition.Current);
                }));
        }
    }
}

The history of it and previous versions are available here: https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync/

The cool part comes when we use a generator (an iterator method built on yield return) as the IEnumerable source:

public static IEnumerable<Entry> GetDocumentsFromDatabase(IDocumentSession session)
{
    var skip = 0;
    do
    {
        var entries = session.Query<Entry>().Where(x => !x.Deleted).OrderByDescending(x => x.DateModified).Skip(skip).Take(1024).ToList();

        foreach (var entry in entries)
            yield return entry;

        skip += 1024;
        if (entries.Count < 1024)
            break;
    } while (true);
}

When we combine the two, the generator fetches one page at a time from the database, while up to 20 partitions pull items from it and process them concurrently. The iterator pauses at each yield until a consumer asks for the next item, and the async lambda for that item runs on another thread:

await GetDocumentsFromDatabase(session).ForEachAsync(dop: 20, body: async entry =>
{
    _logger.Info($"Processing entry '{entry.Id}'");
});
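
To see the degree-of-parallelism limit without a database, here is a minimal sketch. The extension method is repeated so the block compiles on its own; GetFakePages, RunAsync and the delays are hypothetical stand-ins for the real query and processing, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Same extension method as above, repeated so this sketch is self-contained.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    // Hypothetical stand-in for the paged database query:
    // yields the integers 0..total-1, "fetching" pageSize of them at a time.
    public static IEnumerable<int> GetFakePages(int total, int pageSize)
    {
        for (var skip = 0; skip < total; skip += pageSize)
        {
            Thread.Sleep(20); // simulated query latency
            var page = Enumerable.Range(skip, Math.Min(pageSize, total - skip)).ToList();
            foreach (var item in page)
                yield return item;
        }
    }

    // Processes every item and reports how many ran and the peak number running at once.
    public static async Task<(int Processed, int Peak)> RunAsync(int total, int pageSize, int dop)
    {
        var gate = new object();
        int processed = 0, running = 0, peak = 0;

        await GetFakePages(total, pageSize).ForEachAsync(dop, async item =>
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            await Task.Delay(10); // simulated processing time
            lock (gate) { running--; processed++; }
        });

        return (processed, peak);
    }

    public static async Task Main()
    {
        var (processed, peak) = await RunAsync(total: 200, pageSize: 50, dop: 4);
        Console.WriteLine($"Processed {processed} items, peak concurrency {peak}");
    }
}
```

Each partition awaits its body before moving to the next item, so the peak concurrency never exceeds dop. The exact peak observed on a given run depends on thread pool scheduling.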

Modernizing Async Foreach

Thanks to houseofcat, we can improve on the above a bit by using some newer language features, such as a local function:

public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    async Task AwaitPartition(IEnumerator<T> partition)
    {
        using (partition)
        {
            while (partition.MoveNext())
            { await body(partition.Current); }
        }
    }

    return Task.WhenAll(
        Partitioner
            .Create(source)
            .GetPartitions(dop)
            .AsParallel()
            .Select(p => AwaitPartition(p)));
}

Source: https://houseofcat.io/tutorials/csharp/async/parallelforeachasync

C# 8.0 and Async Streams

Having used this extensively whenever I need to work on really large data sets, I can vouch for its performance, but I’ve been looking for ways to push it even further. Once async streams (await foreach) were announced for C# 8.0, my interest was piqued. And it turns out that we can do better. Yielding on each item causes a lot of context switches, so we’d want to yield one page at a time, but then we have to deal with nested foreach statements, and that’s just not as cool as the one-liner above.
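
For illustration, the page-at-a-time variant mentioned above could look roughly like the sketch below. It uses an in-memory list instead of a database session, and GetPages and SumAll are hypothetical names invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PagedGeneratorSketch
{
    // Hypothetical stand-in for the database query: yields whole pages instead of single items.
    public static IEnumerable<List<int>> GetPages(IReadOnlyList<int> data, int pageSize)
    {
        var skip = 0;
        while (true)
        {
            var page = data.Skip(skip).Take(pageSize).ToList();
            if (page.Count > 0)
                yield return page; // one yield per page rather than one per item

            if (page.Count < pageSize)
                yield break; // a short page means we reached the end

            skip += pageSize;
        }
    }

    // The price of yielding pages: the consumer needs a nested foreach.
    public static int SumAll(IReadOnlyList<int> data, int pageSize)
    {
        var sum = 0;
        foreach (var page in GetPages(data, pageSize))
            foreach (var item in page)
                sum += item;
        return sum;
    }

    public static void Main()
    {
        var data = Enumerable.Range(1, 10).ToList();
        Console.WriteLine($"Pages: {GetPages(data, 3).Count()}, sum: {SumAll(data, 3)}");
    }
}
```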

The first new thing C# 8 gives us is IAsyncEnumerable, so our query can now look like this:

static async IAsyncEnumerable<Order> GetDocumentsFromDatabase2(IAsyncDocumentSession session)
{
    var skip = 0;
    do
    {
        var entries = await session.Query<Order>().OrderByDescending(x => x.Id).Skip(skip).Take(100).ToListAsync();
        foreach (var entry in entries)
            yield return entry;
        skip += 100;
        if (entries.Count < 100)
            break;
    } while (true);
}

With the new await foreach, we would expect the following to at least come close to optimizing the query part:

using (var session = documentStore.OpenAsyncSession())
{
    await foreach (var entry in GetDocumentsFromDatabase2(session))
    {
        Console.WriteLine($"Processing entry '{entry.Id}'");
    }
}

The above is actually pretty slow, because each item is still processed one at a time, but it’ll come in handy soon.

Optimizing Parallel Foreach Further

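Our next iteration comes from Stack Overflow. Instead of partitioning, it uses an ActionBlock from TPL Dataflow (the System.Threading.Tasks.Dataflow namespace), optionally with a custom TaskScheduler, and so far this is quite a bit faster than all the previous solutions: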

public static Task AsyncParallelForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();
    return block.Completion;
}

Source: https://stackoverflow.com/questions/14673728/run-async-method-8-times-in-parallel

Using it now looks like this:

using (var session = documentStore.OpenSession())
{
    session.Advanced.MaxNumberOfRequestsPerSession = int.MaxValue;

    SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

    await GetDocumentsFromDatabase(session).AsyncParallelForEach(
        async entry =>
        {
            Console.WriteLine($"Processing entry '{entry.Id}'");
        },
        20,
        TaskScheduler.FromCurrentSynchronizationContext());
}

Optimizing Parallel async Foreach with C# 8.0 async streams

Fortunately, we can take advantage of the C# 8.0 async streams feature, and optimize this even more:

public static async Task AsyncParallelForEach<T>(this IAsyncEnumerable<T> source, Func<T, Task> body, int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    await foreach (var item in source)
        block.Post(item);

    block.Complete();
    await block.Completion;
}
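
As a rough, self-contained sketch of how the async-stream version can be exercised without a database: the extension method below is a trimmed copy of the one above (no scheduler parameter), and GetFakePagesAsync, RunAsync and the delays are hypothetical stand-ins. It assumes the System.Threading.Tasks.Dataflow assembly is available to the project.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncStreamDemo
{
    // Trimmed copy of the extension method above, so this sketch compiles on its own.
    public static async Task AsyncParallelForEach<T>(this IAsyncEnumerable<T> source, Func<T, Task> body, int maxDegreeOfParallelism)
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        var block = new ActionBlock<T>(body, options);

        await foreach (var item in source)
            block.Post(item); // the block's input queue is unbounded by default

        block.Complete();
        await block.Completion;
    }

    // Hypothetical stand-in for the paged async query:
    // yields the integers 0..total-1, "fetching" pageSize of them at a time.
    public static async IAsyncEnumerable<int> GetFakePagesAsync(int total, int pageSize)
    {
        for (var skip = 0; skip < total; skip += pageSize)
        {
            await Task.Delay(20); // simulated query latency
            var end = Math.Min(skip + pageSize, total);
            for (var i = skip; i < end; i++)
                yield return i;
        }
    }

    // Processes every item with up to dop bodies in flight and returns how many completed.
    public static async Task<int> RunAsync(int total, int pageSize, int dop)
    {
        var processed = 0;
        await GetFakePagesAsync(total, pageSize).AsyncParallelForEach(async item =>
        {
            await Task.Delay(10); // simulated processing time
            Interlocked.Increment(ref processed);
        }, dop);
        return processed;
    }

    public static async Task Main()
    {
        Console.WriteLine($"Processed {await RunAsync(total: 95, pageSize: 20, dop: 8)} items");
    }
}
```

Here the producer awaits the next page without blocking a thread, while the block keeps working through items that were already posted.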

To compare them, I simulated a really slow database connection and slow processing by calling Thread.Sleep(100) after each page query and inside each foreach iteration. That gives the following performance numbers:

Time it took to process sequentially: 87601 ms.
Time it took to process using await foreach (C# 8.0): 96101 ms. 
Time it took to process using ForEachAsync: 5210 ms.
Time it took to process using ParallelForEachAsync: 5135 ms.
Time it took to process using AsyncParallelForEach with IEnumerable: 4345 ms.
Time it took to process using AsyncParallelForEach with IAsyncEnumerable: 4301 ms.

Then I decided to turn it into a real benchmark project and test on some different size datasets.

Using 913 Order records

BenchmarkDotNet=v0.12.0, OS=Windows 10.0.18363
AMD Ryzen Threadripper 2950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=3.1.100
  [Host]     : .NET Core 3.1.0 (CoreCLR 4.700.19.56402, CoreFX 4.700.19.56404), X64 RyuJIT
  DefaultJob : .NET Core 3.1.0 (CoreCLR 4.700.19.56402, CoreFX 4.700.19.56404), X64 RyuJIT


|                         Method |     Mean |    Error |   StdDev | Ratio | RatioSD |
|------------------------------- |---------:|---------:|---------:|------:|--------:|
|                         Linear | 22.599 s | 0.4394 s | 0.5866 s |  1.00 |    0.00 |
|                   ForEachAsync |  2.499 s | 0.0491 s | 0.0947 s |  0.11 |    0.00 |
|           ParallelForEachAsync |  2.485 s | 0.0604 s | 0.0565 s |  0.11 |    0.00 |
|           AsyncParallelForEach |  2.157 s | 0.0423 s | 0.0578 s |  0.10 |    0.00 |
| AsyncEnumerableParallelForEach |  2.130 s | 0.0425 s | 0.0522 s |  0.09 |    0.00 |

Using 9130 Order records

BenchmarkDotNet=v0.12.0, OS=Windows 10.0.18363
AMD Ryzen Threadripper 2950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=3.1.100
  [Host]     : .NET Core 3.1.0 (CoreCLR 4.700.19.56402, CoreFX 4.700.19.56404), X64 RyuJIT
  Job-WEHRKA : .NET Core 3.1.0 (CoreCLR 4.700.19.56402, CoreFX 4.700.19.56404), X64 RyuJIT

InvocationCount=1  UnrollFactor=1

|                         Method |     Mean |   Error |  StdDev | Ratio | RatioSD |
|------------------------------- |---------:|--------:|--------:|------:|--------:|
|                         Linear | 244.07 s | 5.703 s | 8.879 s |  1.00 |    0.00 |
|                   ForEachAsync |  30.84 s | 1.899 s | 5.324 s |  0.11 |    0.01 |
|                   AsyncForEach | 272.10 s | 5.435 s | 8.620 s |  1.12 |    0.05 |
|           ParallelForEachAsync |  26.53 s | 0.339 s | 0.317 s |  0.11 |    0.00 |
|           AsyncParallelForEach |  25.79 s | 0.490 s | 0.564 s |  0.10 |    0.00 |
| AsyncEnumerableParallelForEach |  24.67 s | 0.436 s | 0.408 s |  0.10 |    0.00 |

The BenchmarkDotNet project, results, and source code are available on my GitHub: https://github.com/ops-ai/experiments
