Автор: Sergey Teplyakov
При работе с тасками часто возникает такая задача: у нас есть набор входных данных и для обработки каждого элемента используется длительная операция.
Можно подойти к этой задаче в лоб. Крутим цикл, запускаем таски, обрабатываем результаты по одному:private Task<,Weather>, GetWeatherForAsync(string city){Console.WriteLine('[{1}]: Getting the weather for '{0}'', city,DateTime.Now.ToLongTimeString()),return WeatherService.GetWeatherAsync(city),}[Test]public async Task ProcessOneByOneNaive(){var cities = new List<,string>, { 'Moscow', 'Seattle', 'New York' },var tasks = from city in citiesselect new { City = city, WeatherTask = GetWeatherForAsync(city) },foreach (var entry in tasks) {var wheather = await entry.WeatherTask, ProcessWeather(entry.City, wheather), }}private void ProcessWeather(string city, Weather weather){Console.WriteLine('[{2}]: Processing weather for '{0}': '{1}'', city, weather,DateTime.Now.ToLongTimeString()),}
Здесь мы обращаемся к некоторому сервису погоды, для получения температуры в каждом городе, затем обрабатываем полученные результаты путем вывода города и температуры на экран.
Подход рабочий, но есть одна проблема: новая задача будет запущена лишь после завершения предыдущей. Тут можно принудительно дернуть все задачи и вызвать ToList() на LINQ-запросе, но и в этом случае задачи будут обрабатываться в порядке городов, а не в порядке доступности результатов (предположим, что для получения погоды для первого города уйдет втрое больше времени, чем для других, в этом случае мы будем ждать результат по первому городу, хотя результаты по двум другим уже доступны).
Решение заключается в использовании идиомы Process Tasks by Completion (можете называть это паттерном, если хотите), которая заключается в следующем: задачи должны обрабатываться не в порядке их запуска, а в порядке их завершения.
Вот как это будет выглядеть:[Test]public async Task ManualProcessByCompletion(){var cities = new List<,string>, { 'Moscow', 'Seattle', 'New York' },var tasks = (from city in citieslet result = new { City = city, WeatherTask = GetWeatherForAsync(city) }select TaskEx.FromTask(result, r =>, r.WeatherTask)).ToList(),while (tasks.Count != 0) {var completedTask = await Task.WhenAny(tasks), tasks.Remove(completedTask),var result = completedTask.Result, ProcessWeather(result.City, result.WeatherTask.Result), }}ПРИМЕЧАНИЕВ одном из курсов Pluralsight данная идиома называется Process tasks one by one, но, ИМХО, это еще менее понятное название, чем у меня. Так что если есть мысли, какое название будет лучше передавать ее суть буду рад выслушать варианты!
Метод ManualProcessByCompletion, запускает получение погоды с помощью GetWeatherForAsync для всех городов, оборачивает объект анонимного типа в таску (позже объясню, почему это нужно). Затем, внутри цикла while мы зовем Task.WhenAny и получаем первую завершенную задачу. Получается, что задачи обрабатываются по мере завершения, а не по порядку их запуска. Но в данном случае, таска содержит ответ на вопрос (какая погода в городе?), но не содержит самого вопроса (имени города). Нам нужно как-то объединить результат и контекст исполнения. Для этого используется метод TaskEx.FromTask:public static Task<,T>, FromTask<,T, U>,(T result, Func<,T, Task<,U>,>, taskSelector){Contract.Requires(taskSelector != null),var tcs = new TaskCompletionSource<,T>,(),var task = taskSelector(result), task.ContinueWith(t =>, {if (t.IsFaulted) tcs.SetException(t.Exception),else if (t.IsCanceled) tcs.SetCanceled(),else tcs.SetResult(result), }),return tcs.Task,}
Метод TaskEx.FromResult, создает прокси-таску, которая завершится при завершении оригинальной задачи. А делегат taskSelector позволяет извлечь задачу из основного объекта, что позволяет удобно использовать этот подход совместно с анонимными типами.
Новый подход работает лучше оригинального, но выглядит более громоздким. Есть смысл сделать небольшую обертку, которая позволит использовать его повторно.public static IEnumerable<,Task<,TElement>,>, OrderByCompletion<,TElement, TTaskResult>,(this IEnumerable<,TElement>, sequence, Func<,TElement, Task<,TTaskResult>,>, taskSelector){Contract.Requires(sequence != null),Contract.Requires(taskSelector != null),var tasks = (from element in sequencelet pair = new {Element = element, Task = taskSelector(element)}select FromTask(pair, p =>, p.Task)).ToList(),while (tasks.Count != 0) {var tcs = new TaskCompletionSource<,TElement>,(),// Getting the first finished task Task.WhenAny(tasks).ContinueWith(tsk =>, {var finishedTask = tsk.Result, tasks.Remove(finishedTask), tcs.FromTask(finishedTask, arg =>, arg.Element), }),yield return tcs.Task, }}
Полный код класса доступен на гитхабе, но смысл его такой. Метод преобразует последовательность задач в другую последовательность задач, порядок которых определяется порядком завершения задач, а не порядком в исходной последовательности.
И вот как выглядит пример использования:[Test]public async Task ProcessByCompletion(){var cities = new List<,string>, { 'Moscow', 'Seattle', 'New York' },var tasks = from city in citiesselect new {City = city, WeatherTask = GetWeatherForAsync(city)},foreach (var task in tasks.OrderByCompletion(t =>, t.WeatherTask)) {var taskResult = await task,// taskResult is an object of anonymous type with City and WeatherTask ProcessWeather(taskResult.City, taskResult.WeatherTask.Result), }}
Мы снова вернулись к исходному варианту с точки зрения синтаксиса, но оставили новое поведение. Вот результат исполнения, который показывает, что все задачи запускаются одновременно, а обработка происходит по мере поступления результатов:
[12:54:35 PM]: Getting the weather for 'Moscow'[12:54:35 PM]: Getting the weather for 'Seattle'[12:54:35 PM]: Getting the weather for 'New York'[12:54:36 PM]: Processing weather for 'Seattle': 'Temp: 7C'Got the weather for 'Moscow'[12:54:39 PM]: Processing weather for 'Moscow': 'Temp: 6C'Got the weather for 'New York'[12:54:40 PM]: Processing weather for 'New York': 'Temp: 8C'
UPDATE:
Отказ от Query Comprehension Syntax-а позволит упростить последний пример еще немного:
[Test]public async Task ProcessByCompletion(){var cities = new List<,string>, { 'Moscow', 'Seattle', 'New York' },var tasks = cities.Select(async city =>, {return new {City = city, Weather = await GetWeatherForAsync(city)}, }),foreach (var task in tasks.OrderByCompletion()) {var taskResult = await task,// taskResult is an object of anonymous type with City and WeatherTask ProcessWeather(taskResult.City, taskResult.Weather), }}И вариант, предложенный @hazzik на основе Rx-ов:
[Test]public void ProcessOneUsingRx(){var cities = new[] { 'Moscow', 'Seattle', 'New York' },var objs = cities.Select(async city =>, new { City = city, Weather = await GetWeatherForAsync(city) }).Select(task =>, task.ToObservable()).Merge().ToEnumerable(),foreach (var obj in objs) { ProcessWeather(obj.City, obj.Weather), }}Он работает следующим образом: вначале мы берем последовательность задач, конвертим ее в последовательность IEnumerable<,IObservable<,T>,>,, которая, затем мерджится в одну последовательность IObservable<,T>,.
Особенность этого подхода в том, что в данном случае вызов MoveNext в цикле foreach является блокирующим.
З.Ы. Код залит в новый репо на гитхабе - https://github.com/SergeyTeplyakov/TplTipsAndTricks