Автор: Sergey Teplyakov

Давайте продолжим рассматривать приемы, которые будут полезными при работе с TPL.

В прошлый раз мы рассмотрели идиому, которая позволяет обрабатывать результаты в порядке окончания работы задач, а не в порядке их запуска. Но там был пропущен один интересный момент. Вот, например, у нас есть все тот же сервис погоды и желание получить результаты по всем городам как можно быстрее. Означает ли это, что можно взять все города мира и послать одновременно тысячи запросов? Сервис погоды может посчитать, что клиент сошел с ума и попробует затроттлить (throttle) запросы, превышающие определенный лимит (кстати, этот самый троттлинг это один большой pain in the ass для всех облачных сервисов, причем как для авторов сервисов, так и для клиентов этих самых сервисов).

Так вот, нам нужно как-то ограничить число таких запросов или каких-то других асинхронных операций.

Вообще, с ограничением числа задач есть одна небольшая беда. В случае CPU Intensive операций (числодробилки, операции, которые нагружают CPU/GPU) есть простая эвристика число работающих задач должно быть ограничено числом вычислительных устройств. Но, в случае с IO Intensive операциями таких ограничений нет. Более того, нет и встроенных инструментов для контроля за числом таких операций.

ПРИМЕЧАНИЕ
Пул потоков, теоретически, может помочь в этом плане, когда в системе исполняется смесь из IO Bound и CPU Bound операций. У него есть куча эвристик, которые пытаются подобрать оптимальное число одновременно работающих для обеспечения максимальной пропускной способности (throughput) по переработке задач. Но эти эвристики мало чем помогут, когда у нас есть большое число длительных IO Bound операций и только мы знаем, когда наш backend начнет отпадать от перегрузки. Если интересно, то есть просто замечательная статья по устройству пула потоков в .NET: Throttling Concurrency in the CLR 4.0 ThreadPool.

Итак, нам нужен метод, например, ForEachAsync, который берет последовательность элементов и фабричный метод для запуска задач. Ну и он же сможет ограничивать число одновременных операций.public static IEnumerable<,Task<,TTask>,>, ForEachAsync<,TItem, TTask>,(
this IEnumerable<,TItem>, source, Func<,TItem, Task<,TTask>,>, selector,
int degreeOfParallelism)
{
Contract.Requires(source != null),
Contract.Requires(selector != null),
// Материализируем последовательность
var tasks = source.ToList
(),
int completedTask = -1,
// Массив TaskCompletionSource будет хранить результаты всех операций
var taskCompletions = new TaskCompletionSource<,TTask>,[tasks.Count
],
for (int n = 0, n <, taskCompletions.Length, n++)
taskCompletions[n] = new TaskCompletionSource<,TTask>,(),
// Partitioner сделает за нас всю черную работу по ограничению
// числа одновременных обработчиков
foreach (var partition in
Partitioner.Create(tasks).GetPartitions(degreeOfParallelism
))
{
var p = partition,
// Теряем контекст синхронизации и запускаем обработку
// каждой партиции асинхронно
Task.Run(async () =>,
{
while (p.MoveNext())
{
var task = selector(p.Current),
// Хитрым образом подавляем исключения
await task.ContinueWith(_ =>,
{ }),
int finishedTaskIndex = Interlocked.Increment(ref completedTask),
taskCompletions[finishedTaskIndex].FromTask(task),
}
}),
}
return taskCompletions.Select(tcs =>, tcs.Task),
}

Выглядит немного жутковато, но, не все так плохо!

Существует Over 9000 способов ограничить число одновременных операций. И первым в этом списке идет семафор. С ним все хорошо, и его можно использовать для этой задачи, но можно взять что-то более высокоуровневое, например, класс Partitioner из TPL.

Partitioner.Create(source) возвращает объект, который умеет делить входную последовательность для параллельной обработки. Алгоритм деления может быть разный и зависит он от типа последовательности/коллекции, и не является интересным в нашем случае. Главное, partitioner позволяет получить несколько итератров, которые работать могут параллельно, каждый со своим куском входной последовательности.

Параллельная обработка это хорошо, но нам нужно где-то хранить результаты. Для этого, в начале метода идет материализация последовательности в список и создается массив объектов TaskCompletionSource для каждой будущей задачи. Затем, мы запускаем параллельную обработку каждой партиции, и добавляем результаты в массив TaskCompletionSource-ов по мере их поступления.private Task<,Weather>, GetWeatherForAsync(string city)
{
Console.WriteLine('[{1}]: Getting the weather for '{0}'', city,
DateTime.Now.ToLongTimeString()),
return WeatherService.GetWeatherAsync(city),
}
[
Test]
public async Task ForEachAsync
()
{
var cities = new List<,string>, { 'Moscow', 'Seattle', 'New York', 'Kiev' },
var tasks = cities.ForEachAsync(async city =>,
{
return new { City = city, Weather = await GetWeatherForAsync(city) },
},
2),
foreach (var task in tasks)
{
var taskResult = await task,
ProcessWeather(taskResult.City, taskResult.Weather),
}
}
private void ProcessWeather(string city, Weather weather
)
{
Console.WriteLine('[{2}]: Processing weather for '{0}': '{1}'', city, weather,
DateTime.Now.ToLongTimeString()),
}

И вот результаты исполнения:

[1:22:09 PM]: Getting the weather for 'Moscow'
[1:22:09 PM]: Getting the weather for 'Seattle'
-- Включилось ограничение числа задач! Ждем окончания первой задачи!
[1:22:10 PM]: Processing weather for 'Moscow': 'Temp: 6C'
-- Сразу после окончания одной задачи запускаем следующую
[1:22:10 PM]: Getting the weather for 'New York'
-- Самая новая задача завершилась первой
[1:22:15 PM]: Processing weather for 'New York': 'Temp: 8C'
-- Запускаем следующую
[1:22:15 PM]: Getting the weather for 'Kiev'
-- Только теперь завершилась вторая задача
[1:22:16 PM]: Processing weather for 'Seattle': 'Temp: 7C'
-- И теперь - последняя
[1:22:20 PM]: Processing weather for 'Kiev': 'Temp: 4C'

И в графическом виде:

clip_image002

Получается, что эта штука не просто ограничивает число одновременных операций, но и позволяет обрабатывать результаты в порядке их завершения, а не в порядке их запуска! Красота!

Что возвращать? Task или IEnumerable<,Task>,?

Камрад Тауб описывал реализацию ForEachAsync, аж в двух частях (Implementing a simple ForEachAsync и Implementing a simple ForEachAsync , part 2). Но его реализация несколько иная. Главное отличие ее в том, что метод ForEachAsync возвращает Task, а не IEnumerable<,Task>,. И это может быть весьма важным отличием:public static Task ForEachAsync<,T>,(this IEnumerable<,T>, source, int dop, Func<,T, Task>, body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
while (partition.MoveNext())
await body(partition.Current),
})),
}

Код короче (что хорошо!), но ведет себя по-другому (и тут, ХЗ, насколько это хорошо). Во-первых, этот подход работает только для команд, но не работает для запросов (команда это мутатор, запрос это геттер, подробности в правильной книге). Во-вторых, наличие лишь одного результата сильно усложняет процесс обработки ошибок даже в случае ограничение таких операций, как сохранение данных.

Например, в данном случае обработка полностью остановится, когда исключение произойдет в каждой из существующих партиций! Так, когда произойдет первая ошибка и await body завершится с ошибкой, то цикл while оборвется, и будет вызван Dispose на объекте partition. Партиционирование реализовано с использованием идиомы work stealing, а это значит, что текущие элементы (логически) будут добавлены в очередь на обработку другой партицией. Если упадет обработка еще одной партици, то количество обработчиков еще уменьшится. И так до тех пор, пока все партиции не свалятся с ошибками. В лучшем случае вы получите лишь часть обработанных данных, а в худшем случае проблемы с эффективностью, из-за того, что у вас будет гораздо меньше активных обработчиков задач, чем вы думали.

Решить задачу можно путем накопления ошибок и генерации AggregateException:public static async Task ForEachAsyncWithExceptions<,T>,(
this IEnumerable<,T>, source, int dop, Func<,T, Task>, body)
{
ConcurrentQueue<,Exception>, exceptions = null,
await Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
{
while (partition.MoveNext())
{
try
{
await body(partition.Current),
}
catch (Exception e)
{
LazyInitializer
.EnsureInitialized(ref exceptions).Enqueue(e)),
}
}
}
})),
if (exceptions != null)
{
throw new AggregateException(exceptions),
}
}

Этот подход полностью рабочий, хотя и остается вопрос с ассоциацией задачи и возникшей ошибки.

Дополнительные ссылки

  • Идиома Process Tasks by Completion
  • Implementing a simple ForEachAsync by Stephen Toub
  • Implementing a simple ForEachAsync , part 2 by Stephen Toub
  • Реализация ForEachAsync на GitHub

Помогла статья? Оцените её!
0 из 5. Общее количество голосов - 0
 

You have no rights to post comments

Дмитрий Крикунов

Публикую статьи, обучающие курсы и новости по программированию: алгоритмам, языкам (С++, Java), параллельному программированию, паттернам и библиотекам (Qt, boost).