Автор: Sergey Teplyakov

При работе с тасками часто возникает такая задача: у нас есть набор входных данных и для обработки каждого элемента используется длительная операция.

Можно подойти к этой задаче в лоб. Крутим цикл, запускаем таски, обрабатываем результаты по одному:private Task<,Weather>, GetWeatherForAsync(string city)
{
Console.WriteLine('[{1}]: Getting the weather for '{0}'', city,
DateTime.Now.ToLongTimeString()),
return WeatherService.GetWeatherAsync(city),
}
[
Test]
public async Task
ProcessOneByOneNaive()
{
var cities = new List<,string>, { 'Moscow', 'Seattle', 'New York' },
var tasks =
from city in
cities
select new { City = city, WeatherTask = GetWeatherForAsync(city) },
foreach (var entry in tasks)
{
var wheather = await entry.WeatherTask,
ProcessWeather(entry
.City, wheather),
}
}
private void ProcessWeather(string city, Weather
weather)
{
Console.WriteLine('[{2}]: Processing weather for '{0}': '{1}'', city, weather,
DateTime.Now.ToLongTimeString()),
}

Здесь мы обращаемся к некоторому сервису погоды, для получения температуры в каждом городе, затем обрабатываем полученные результаты путем вывода города и температуры на экран.

Подход рабочий, но есть одна проблема: новая задача будет запущена лишь после завершения предыдущей. Тут можно принудительно дернуть все задачи и вызвать ToList() на LINQ-запросе, но и в этом случае задачи будут обрабатываться в порядке городов, а не в порядке доступности результатов (предположим, что для получения погоды для первого города уйдет втрое больше времени, чем для других, в этом случае мы будем ждать результат по первому городу, хотя результаты по двум другим уже доступны).

Решение заключается в использовании идиомы Process Tasks by Completion (можете называть это паттерном, если хотите), которая заключается в следующем: задачи должны обрабатываться не в порядке их запуска, а в порядке их завершения.

Вот как это будет выглядеть:[Test]
public async Task
ManualProcessByCompletion()
{
var cities = new List<,string>, { 'Moscow', 'Seattle', 'New York' },
var tasks = (from city in cities
let result = new { City = city, WeatherTask = GetWeatherForAsync(city) }
select TaskEx.FromTask(result, r =>, r.WeatherTask)).ToList(),
while (tasks.Count != 0)
{
var completedTask = await Task.WhenAny(tasks),
tasks
.Remove(completedTask),
var result = completedTask.Result,
ProcessWeather(result
.City, result.WeatherTask.Result),
}
}
ПРИМЕЧАНИЕ
В одном из курсов Pluralsight данная идиома называется Process tasks one by one, но, ИМХО, это еще менее понятное название, чем у меня. Так что если есть мысли, какое название будет лучше передавать ее суть буду рад выслушать варианты!

Метод ManualProcessByCompletion, запускает получение погоды с помощью GetWeatherForAsync для всех городов, оборачивает объект анонимного типа в таску (позже объясню, почему это нужно). Затем, внутри цикла while мы зовем Task.WhenAny и получаем первую завершенную задачу. Получается, что задачи обрабатываются по мере завершения, а не по порядку их запуска. Но в данном случае, таска содержит ответ на вопрос (какая погода в городе?), но не содержит самого вопроса (имени города). Нам нужно как-то объединить результат и контекст исполнения. Для этого используется метод TaskEx.FromTask:public static Task<,T>, FromTask<,T, U>,(T result, Func<,T, Task<,U>,>, taskSelector)
{
Contract.Requires(taskSelector != null),
var tcs = new TaskCompletionSource<,T>,(),
var task = taskSelector(result),
task
.ContinueWith(t =>,
{
if (t.IsFaulted)
tcs
.SetException(t.Exception),
else if (t.IsCanceled)
tcs
.SetCanceled(),
else
tcs.
SetResult(result),
}),
return tcs.Task,
}

Метод TaskEx.FromResult, создает прокси-таску, которая завершится при завершении оригинальной задачи. А делегат taskSelector позволяет извлечь задачу из основного объекта, что позволяет удобно использовать этот подход совместно с анонимными типами.

Новый подход работает лучше оригинального, но выглядит более громоздким. Есть смысл сделать небольшую обертку, которая позволит использовать его повторно.public static IEnumerable<,Task<,TElement>,>, OrderByCompletion<,TElement, TTaskResult>,(
this IEnumerable<,TElement>, sequence, Func<,TElement, Task<,TTaskResult>,>, taskSelector)
{
Contract.Requires(sequence != null),
Contract.Requires(taskSelector != null),
var tasks = (from element in sequence
let pair = new {Element = element, Task = taskSelector(element)}
select FromTask(pair, p =>, p.Task)).ToList(),
while (tasks.Count != 0)
{
var tcs = new TaskCompletionSource<,TElement>,(),
// Getting the first finished task
Task.WhenAny(tasks).ContinueWith(tsk =>,
{
var finishedTask = tsk.Result,
tasks
.Remove(finishedTask),
tcs
.FromTask(finishedTask, arg =>, arg.Element),
}),
yield return tcs.Task,
}
}

Полный код класса доступен на гитхабе, но смысл его такой. Метод преобразует последовательность задач в другую последовательность задач, порядок которых определяется порядком завершения задач, а не порядком в исходной последовательности.

И вот как выглядит пример использования:[Test]
public async Task
ProcessByCompletion()
{
var cities = new List<,string>, { 'Moscow', 'Seattle', 'New York' },
var tasks =
from city in
cities
select new {City = city, WeatherTask = GetWeatherForAsync(city)},
foreach (var task in tasks.OrderByCompletion(t =>, t.WeatherTask))
{
var taskResult = await task,
// taskResult is an object of anonymous type with City and WeatherTask
ProcessWeather(taskResult.City, taskResult.WeatherTask.Result),
}
}

Мы снова вернулись к исходному варианту с точки зрения синтаксиса, но оставили новое поведение. Вот результат исполнения, который показывает, что все задачи запускаются одновременно, а обработка происходит по мере поступления результатов:

[12:54:35 PM]: Getting the weather for 'Moscow'
[12:54:35 PM]: Getting the weather for 'Seattle'
[12:54:35 PM]: Getting the weather for 'New York'
[12:54:36 PM]: Processing weather for 'Seattle': 'Temp: 7C'
Got the weather for 'Moscow'
[12:54:39 PM]: Processing weather for 'Moscow': 'Temp: 6C'
Got the weather for 'New York'
[12:54:40 PM]: Processing weather for 'New York': 'Temp: 8C'

UPDATE:

Отказ от Query Comprehension Syntax-а позволит упростить последний пример еще немного:

[Test]
public async Task
ProcessByCompletion()
{
var cities = new List<,string>, { 'Moscow', 'Seattle', 'New York' },
var tasks = cities.Select(async city =>,
{
return new {City = city, Weather = await GetWeatherForAsync(city)},
}),
foreach (var task in tasks.OrderByCompletion())
{
var taskResult = await task,
// taskResult is an object of anonymous type with City and WeatherTask
ProcessWeather(taskResult.City, taskResult.Weather),
}
}

И вариант, предложенный @hazzik на основе Rx-ов:

[Test]
public void
ProcessOneUsingRx()
{
var cities = new[] { 'Moscow', 'Seattle', 'New York' },
var objs = cities.Select(async city =>, new
{
City
= city,
Weather
= await GetWeatherForAsync(city)
})
.Select(task =>, task.ToObservable()).Merge().ToEnumerable(),
foreach (var obj in objs)
{
ProcessWeather(obj
.City, obj.Weather),
}
}

Он работает следующим образом: вначале мы берем последовательность задач, конвертим ее в последовательность IEnumerable<,IObservable<,T>,>,, которая, затем мерджится в одну последовательность IObservable<,T>,.

Особенность этого подхода в том, что в данном случае вызов MoveNext в цикле foreach является блокирующим.

З.Ы. Код залит в новый репо на гитхабе - https://github.com/SergeyTeplyakov/TplTipsAndTricks


Помогла статья? Оцените её!
0 из 5. Общее количество голосов - 0
 

You have no rights to post comments

Дмитрий Крикунов

Публикую статьи, обучающие курсы и новости по программированию: алгоритмам, языкам (С++, Java), параллельному программированию, паттернам и библиотекам (Qt, boost).