Автор: Sergey Teplyakov
Давайте продолжим рассматривать приемы, которые будут полезными при работе с TPL.
В прошлый раз мы рассмотрели идиому, которая позволяет обрабатывать результаты в порядке окончания работы задач, а не в порядке их запуска. Но там был пропущен один интересный момент. Вот, например, у нас есть все тот же сервис погоды и желание получить результаты по всем городам как можно быстрее. Означает ли это, что можно взять все города мира и послать одновременно тысячи запросов? Сервис погоды может посчитать, что клиент сошел с ума и попробует затроттлить (throttle) запросы, превышающие определенный лимит (кстати, этот самый троттлинг это один большой pain in the ass для всех облачных сервисов, причем как для авторов сервисов, так и для клиентов этих самых сервисов).
Так вот, нам нужно как-то ограничить число таких запросов или каких-то других асинхронных операций.
Вообще, с ограничением числа задач есть одна небольшая беда. В случае CPU Intensive операций (числодробилки, операции, которые нагружают CPU/GPU) есть простая эвристика число работающих задач должно быть ограничено числом вычислительных устройств. Но, в случае с IO Intensive операциями таких ограничений нет. Более того, нет и встроенных инструментов для контроля за числом таких операций.
ПРИМЕЧАНИЕПул потоков, теоретически, может помочь в этом плане, когда в системе исполняется смесь из IO Bound и CPU Bound операций. У него есть куча эвристик, которые пытаются подобрать оптимальное число одновременно работающих для обеспечения максимальной пропускной способности (throughput) по переработке задач. Но эти эвристики мало чем помогут, когда у нас есть большое число длительных IO Bound операций и только мы знаем, когда наш backend начнет отпадать от перегрузки. Если интересно, то есть просто замечательная статья по устройству пула потоков в .NET: Throttling Concurrency in the CLR 4.0 ThreadPool.
Итак, нам нужен метод, например, ForEachAsync, который берет последовательность элементов и фабричный метод для запуска задач. Ну и он же сможет ограничивать число одновременных операций.public static IEnumerable<,Task<,TTask>,>, ForEachAsync<,TItem, TTask>,(this IEnumerable<,TItem>, source, Func<,TItem, Task<,TTask>,>, selector,int degreeOfParallelism){Contract.Requires(source != null),Contract.Requires(selector != null),// Материализируем последовательность var tasks = source.ToList(),int completedTask = -1,// Массив TaskCompletionSource будет хранить результаты всех операций var taskCompletions = new TaskCompletionSource<,TTask>,[tasks.Count],for (int n = 0, n <, taskCompletions.Length, n++)taskCompletions[n] = new TaskCompletionSource<,TTask>,(),// Partitioner сделает за нас всю черную работу по ограничению // числа одновременных обработчиков foreach (var partition in Partitioner.Create(tasks).GetPartitions(degreeOfParallelism)) {var p = partition,// Теряем контекст синхронизации и запускаем обработку // каждой партиции асинхронно Task.Run(async () =>, {while (p.MoveNext()) {var task = selector(p.Current),// Хитрым образом подавляем исключения await task.ContinueWith(_ =>, { }),int finishedTaskIndex = Interlocked.Increment(ref completedTask),taskCompletions[finishedTaskIndex].FromTask(task), } }), }return taskCompletions.Select(tcs =>, tcs.Task),}
Выглядит немного жутковато, но, не все так плохо!
Существует Over 9000 способов ограничить число одновременных операций. И первым в этом списке идет семафор. С ним все хорошо, и его можно использовать для этой задачи, но можно взять что-то более высокоуровневое, например, класс Partitioner из TPL.
Partitioner.Create(source) возвращает объект, который умеет делить входную последовательность для параллельной обработки. Алгоритм деления может быть разный и зависит он от типа последовательности/коллекции, и не является интересным в нашем случае. Главное, partitioner позволяет получить несколько итератров, которые работать могут параллельно, каждый со своим куском входной последовательности.
Параллельная обработка это хорошо, но нам нужно где-то хранить результаты. Для этого, в начале метода идет материализация последовательности в список и создается массив объектов TaskCompletionSource для каждой будущей задачи. Затем, мы запускаем параллельную обработку каждой партиции, и добавляем результаты в массив TaskCompletionSource-ов по мере их поступления.private Task<,Weather>, GetWeatherForAsync(string city){Console.WriteLine('[{1}]: Getting the weather for '{0}'', city,DateTime.Now.ToLongTimeString()),return WeatherService.GetWeatherAsync(city),}[Test]public async Task ForEachAsync(){var cities = new List<,string>, { 'Moscow', 'Seattle', 'New York', 'Kiev' },var tasks = cities.ForEachAsync(async city =>, {return new { City = city, Weather = await GetWeatherForAsync(city) }, }, 2),foreach (var task in tasks) {var taskResult = await task,ProcessWeather(taskResult.City, taskResult.Weather), }}private void ProcessWeather(string city, Weather weather){Console.WriteLine('[{2}]: Processing weather for '{0}': '{1}'', city, weather,DateTime.Now.ToLongTimeString()),}
И вот результаты исполнения:
[1:22:09 PM]: Getting the weather for 'Moscow'[1:22:09 PM]: Getting the weather for 'Seattle'-- Включилось ограничение числа задач! Ждем окончания первой задачи![1:22:10 PM]: Processing weather for 'Moscow': 'Temp: 6C'-- Сразу после окончания одной задачи запускаем следующую[1:22:10 PM]: Getting the weather for 'New York'-- Самая новая задача завершилась первой[1:22:15 PM]: Processing weather for 'New York': 'Temp: 8C'-- Запускаем следующую[1:22:15 PM]: Getting the weather for 'Kiev'-- Только теперь завершилась вторая задача[1:22:16 PM]: Processing weather for 'Seattle': 'Temp: 7C'-- И теперь - последняя[1:22:20 PM]: Processing weather for 'Kiev': 'Temp: 4C'
И в графическом виде:
Получается, что эта штука не просто ограничивает число одновременных операций, но и позволяет обрабатывать результаты в порядке их завершения, а не в порядке их запуска! Красота!
Что возвращать? Task или IEnumerable<,Task>,?
Камрад Тауб описывал реализацию ForEachAsync, аж в двух частях (Implementing a simple ForEachAsync и Implementing a simple ForEachAsync , part 2). Но его реализация несколько иная. Главное отличие ее в том, что метод ForEachAsync возвращает Task, а не IEnumerable<,Task>,. И это может быть весьма важным отличием:public static Task ForEachAsync<,T>,(this IEnumerable<,T>, source, int dop, Func<,T, Task>, body){return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop)select Task.Run(async delegate {using (partition)while (partition.MoveNext())await body(partition.Current), })),}
Код короче (что хорошо!), но ведет себя по-другому (и тут, ХЗ, насколько это хорошо). Во-первых, этот подход работает только для команд, но не работает для запросов (команда это мутатор, запрос это геттер, подробности в правильной книге). Во-вторых, наличие лишь одного результата сильно усложняет процесс обработки ошибок даже в случае ограничение таких операций, как сохранение данных.
Например, в данном случае обработка полностью остановится, когда исключение произойдет в каждой из существующих партиций! Так, когда произойдет первая ошибка и await body завершится с ошибкой, то цикл while оборвется, и будет вызван Dispose на объекте partition. Партиционирование реализовано с использованием идиомы work stealing, а это значит, что текущие элементы (логически) будут добавлены в очередь на обработку другой партицией. Если упадет обработка еще одной партици, то количество обработчиков еще уменьшится. И так до тех пор, пока все партиции не свалятся с ошибками. В лучшем случае вы получите лишь часть обработанных данных, а в худшем случае проблемы с эффективностью, из-за того, что у вас будет гораздо меньше активных обработчиков задач, чем вы думали.
Решить задачу можно путем накопления ошибок и генерации AggregateException:public static async Task ForEachAsyncWithExceptions<,T>,(this IEnumerable<,T>, source, int dop, Func<,T, Task>, body){ConcurrentQueue<,Exception>, exceptions = null,await Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop)select Task.Run(async delegate {using (partition) {while (partition.MoveNext()) {try {await body(partition.Current), }catch (Exception e) {LazyInitializer .EnsureInitialized(ref exceptions).Enqueue(e)), } } } })),if (exceptions != null) {throw new AggregateException(exceptions), }}
Этот подход полностью рабочий, хотя и остается вопрос с ассоциацией задачи и возникшей ошибки.
Дополнительные ссылки
- Идиома Process Tasks by Completion
- Implementing a simple ForEachAsync by Stephen Toub
- Implementing a simple ForEachAsync , part 2 by Stephen Toub
- Реализация ForEachAsync на GitHub