Foreach itself is useful and efficient for most operations, but sometimes special situations arise: fetching the data to iterate over has high latency, or the processing inside the foreach depends on an operation with very high latency or long processing time. A typical example is iterating over paged data from a database. The goal is to start pulling data from the database a chunk at a time, since fetching one record at a time introduces its own overhead. As data becomes available, we start processing it while, in the background, we fetch more data and feed it into the processor. The processing itself runs in parallel as well, picking up the next item as soon as a worker is free.
My favorite way to do this is with an extension method Stephen Toub wrote many years ago. It accepts a data source, breaks it into partitions with a configurable degree of parallelism, and runs a lambda for each item:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Extensions
{
    public static class Extensions
    {
        public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
        {
            return Task.WhenAll(
                from partition in Partitioner.Create(source).GetPartitions(dop)
                select Task.Run(async delegate
                {
                    using (partition)
                        while (partition.MoveNext())
                            await body(partition.Current);
                }));
        }
    }
}
The history of it and previous versions are available here: https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync/
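Before wiring it up to a database, here is a minimal sketch of the call shape (the list of ids and the Task.Delay are hypothetical stand-ins for real data and real async work):

// Hypothetical usage: process 100 ids with at most 4 concurrent workers.
var ids = Enumerable.Range(1, 100);
await ids.ForEachAsync(dop: 4, body: async id =>
{
    await Task.Delay(10); // stand-in for a real async operation
    Console.WriteLine($"Processed {id}");
});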
The cool part is when we combine this with the generator (iterator) pattern as the IEnumerable source.
public static IEnumerable<Entry> GetDocumentsFromDatabase(IDocumentSession session)
{
    var skip = 0;
    do
    {
        var entries = session.Query<Entry>()
            .Where(x => !x.Deleted)
            .OrderByDescending(x => x.DateModified)
            .Skip(skip)
            .Take(1024)
            .ToList();

        foreach (var entry in entries)
            yield return entry;

        skip += 1024;

        if (entries.Count < 1024)
            break;
    } while (true);
}
When we combine the two, we're fetching pages from the database and processing the results across 20 parallel partitions, pausing on each result and handing that item off to the async lambda running on another thread:
await GetDocumentsFromDatabase(session).ForEachAsync(dop: 20, body: async entry =>
{
    _logger.Info($"Processing entry '{entry.Id}'");
});
Thanks to houseofcat, we can improve on the above a bit by using some newer language features:
public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    async Task AwaitPartition(IEnumerator<T> partition)
    {
        using (partition)
        {
            while (partition.MoveNext())
            {
                await body(partition.Current);
            }
        }
    }

    return Task.WhenAll(
        Partitioner
            .Create(source)
            .GetPartitions(dop)
            .AsParallel()
            .Select(p => AwaitPartition(p)));
}
https://houseofcat.io/tutorials/csharp/async/parallelforeachasync
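This version is a drop-in replacement for the earlier extension; assuming the same session and logger as before, the call looks the same:

await GetDocumentsFromDatabase(session).ParallelForEachAsync(dop: 20, body: async entry =>
{
    _logger.Info($"Processing entry '{entry.Id}'");
});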
From using this extensively whenever I need to work on really large data sets, I can vouch for its performance, but I've been looking for ways to push it even further. Once C# 8.0 announced async foreach, my interest was piqued. And it turns out we can do better. Yielding on each item causes a lot of context switches, so we want to yield one page at a time, but then we have to deal with nested foreach statements (see the sketch below), and it's just not as cool as the one-liner above.
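To make that trade-off concrete, here is a rough sketch (not from the original code; the names are hypothetical) of what yielding a page at a time looks like, and the nested foreach it forces on the caller:

// Hypothetical page-at-a-time generator: yields whole pages instead of single entries.
public static IEnumerable<List<Entry>> GetPagesFromDatabase(IDocumentSession session)
{
    var skip = 0;
    while (true)
    {
        var page = session.Query<Entry>()
            .Where(x => !x.Deleted)
            .OrderByDescending(x => x.DateModified)
            .Skip(skip)
            .Take(1024)
            .ToList();

        if (page.Count == 0)
            yield break;

        yield return page;
        skip += 1024;

        if (page.Count < 1024)
            yield break;
    }
}

// The caller now needs a nested loop: the outer lambda runs per page, the inner foreach per entry.
await GetPagesFromDatabase(session).ForEachAsync(dop: 20, body: async page =>
{
    foreach (var entry in page)
        _logger.Info($"Processing entry '{entry.Id}'");
});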
The first new thing we get in C# 8 is IAsyncEnumerable<T>, so our query can now look like this:
static async IAsyncEnumerable<Order> GetDocumentsFromDatabase2(IAsyncDocumentSession session)
{
    var skip = 0;
    do
    {
        var entries = await session.Query<Order>()
            .OrderByDescending(x => x.Id)
            .Skip(skip)
            .Take(100)
            .ToListAsync();

        foreach (var entry in entries)
            yield return entry;

        skip += 100;

        if (entries.Count < 100)
            break;
    } while (true);
}
And using the new await foreach, we would expect the following to at least optimize the query part:
using (var session = documentStore.OpenAsyncSession())
{
    await foreach (var entry in GetDocumentsFromDatabase2(session))
    {
        Console.WriteLine($"Processing entry '{entry.Id}'");
    }
}
The above is actually pretty bad in execution time, but it'll come in handy soon.
Our next iteration comes from Stack Overflow. We instead use an ActionBlock from TPL Dataflow, with an optional TaskScheduler, and so far this is quite a bit faster than all the previous solutions:
public static Task AsyncParallelForEach<T>(this IEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();
    return block.Completion;
}
Source: https://stackoverflow.com/questions/14673728/run-async-method-8-times-in-parallel
using (var session = documentStore.OpenSession())
{
    // RavenDB caps the number of requests per session by default; paging through a large
    // result set needs that limit lifted.
    session.Advanced.MaxNumberOfRequestsPerSession = int.MaxValue;

    // Console apps have no SynchronizationContext, so install one for
    // TaskScheduler.FromCurrentSynchronizationContext() to capture.
    SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

    await GetDocumentsFromDatabase(session).AsyncParallelForEach(async entry =>
    {
        Console.WriteLine($"Processing entry '{entry.Id}'");
    }, 20, TaskScheduler.FromCurrentSynchronizationContext());
}
Fortunately, we can take advantage of the C# 8.0 async streams feature and optimize this even more:
public static async Task AsyncParallelForEach<T>(this IAsyncEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    await foreach (var item in source)
        block.Post(item);

    block.Complete();
    await block.Completion;
}
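Putting the async query and the dataflow-based extension together, the call would look roughly like this (a sketch, assuming the same document store as before):

using (var session = documentStore.OpenAsyncSession())
{
    await GetDocumentsFromDatabase2(session).AsyncParallelForEach(async entry =>
    {
        Console.WriteLine($"Processing entry '{entry.Id}'");
    }, maxDegreeOfParallelism: 20);
}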
Simulating a really slow database connection and slow processing, with a Thread.Sleep(100) both after querying each page and during each foreach iteration, we get the following performance numbers:
Time it took to process sequentially: 87601 ms.
Time it took to process using await foreach (C# 8.0): 96101 ms.
Time it took to process using ForEachAsync: 5210 ms.
Time it took to process using ParallelForEachAsync: 5135 ms.
Time it took to process using AsyncParallelForEach with IEnumerable: 4345 ms.
Time it took to process using AsyncParallelForEach with IAsyncEnumerable: 4301 ms.
Then I decided to turn it into a real benchmark project and test it on datasets of different sizes.
BenchmarkDotNet=v0.12.0, OS=Windows 10.0.18363
AMD Ryzen Threadripper 2950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=3.1.100
  [Host]     : .NET Core 3.1.0 (CoreCLR 4.700.19.56402, CoreFX 4.700.19.56404), X64 RyuJIT
  DefaultJob : .NET Core 3.1.0 (CoreCLR 4.700.19.56402, CoreFX 4.700.19.56404), X64 RyuJIT

| Method                         | Mean     | Error    | StdDev   | Ratio | RatioSD |
|------------------------------- |---------:|---------:|---------:|------:|--------:|
| Linear                         | 22.599 s | 0.4394 s | 0.5866 s |  1.00 |    0.00 |
| ForEachAsync                   |  2.499 s | 0.0491 s | 0.0947 s |  0.11 |    0.00 |
| ParallelForEachAsync           |  2.485 s | 0.0604 s | 0.0565 s |  0.11 |    0.00 |
| AsyncParallelForEach           |  2.157 s | 0.0423 s | 0.0578 s |  0.10 |    0.00 |
| AsyncEnumerableParallelForEach |  2.130 s | 0.0425 s | 0.0522 s |  0.09 |    0.00 |
BenchmarkDotNet=v0.12.0, OS=Windows 10.0.18363
AMD Ryzen Threadripper 2950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=3.1.100
  [Host]      : .NET Core 3.1.0 (CoreCLR 4.700.19.56402, CoreFX 4.700.19.56404), X64 RyuJIT
  Job-WEHRKA  : .NET Core 3.1.0 (CoreCLR 4.700.19.56402, CoreFX 4.700.19.56404), X64 RyuJIT

InvocationCount=1  UnrollFactor=1

| Method                         | Mean     | Error   | StdDev  | Ratio | RatioSD |
|------------------------------- |---------:|--------:|--------:|------:|--------:|
| Linear                         | 244.07 s | 5.703 s | 8.879 s |  1.00 |    0.00 |
| ForEachAsync                   |  30.84 s | 1.899 s | 5.324 s |  0.11 |    0.01 |
| AsyncForEach                   | 272.10 s | 5.435 s | 8.620 s |  1.12 |    0.05 |
| ParallelForEachAsync           |  26.53 s | 0.339 s | 0.317 s |  0.11 |    0.00 |
| AsyncParallelForEach           |  25.79 s | 0.490 s | 0.564 s |  0.10 |    0.00 |
| AsyncEnumerableParallelForEach |  24.67 s | 0.436 s | 0.408 s |  0.10 |    0.00 |
The BenchmarkDotNet project, results, and source code are available on my GitHub: https://github.com/ops-ai/experiments