2013-02-11 16 views
9

.NET yineleyici'yi paralel Görevler/Beklet ile kullanmak isterim ?. Bunun gibi bir şey: .NET, bunu yerel olarak ele alamamaktadır. @svick tarafından şimdiye kadar en iyi cevap - AsParallel() kullanın..NET 4.5'te paralel görevler nasıl oluşturulur

BONUS: Birden çok yayıncı ve tek bir aboneyi uygulayan basit bir uyumsuz/bekletme kodu var mı? Abone verilecek ve barlar işleyecek. (yalnızca çekirdek kitaplıkları)

cevap

11

Bu PLINQ için bir iş gibi görünüyor 'Kendi rulo' konusunda size ilham olacak:

return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s)); 

Bu kısa sürede tamamlanır tamamlanmaz her sonucu dönen, iş parçacığı sınırlı sayıda kullanılarak paralel olarak temsilci çalıştırır. ExecuteOrDownloadSomething() yöntem IO bağlı ise

(örn aslında bir şey indirebilir) ve async kullanarak sonra, konuları atık istemiyorum - mantıklı olabilir await, ama daha karmaşık olacaktır.

async'dan tam olarak yararlanmak istiyorsanız, IEnumerable döndürmemelisiniz, çünkü bu eşzamanlıdır (yani, herhangi bir öğe yoksa bloklar). Ne gerek asenkron koleksiyon çeşit, ve kullanabileceğiniz ISourceBlock (özellikle TransformBlock) bunun için TPL veri akışı den: kaynağıdır

ISourceBlock<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source) 
{ 
    var block = new TransformBlock<TSrc, TDest>(
     async s => await ExecuteOrDownloadSomethingAsync(s), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

    foreach (var item in source) 
     block.Post(item); 

    block.Complete(); 

    return block; 
} 

ise “yavaş” (yani sonuçlarını işlemeye başlamak istiyorum Foo(), source yinelenen tamamlandı), foreach ve Complete() çağrıyı ayrı bir Task'a taşımak isteyebilirsiniz. Daha da iyi bir çözüm, source'u ISourceBlock<TSrc>'a da yapmak olabilir.

+0

tarafından desteklendiğini düşünürüm, ama bu sorunun nasıl çözüleceğine dair bir örnek verebilir misiniz? Teşekkürler! – Yurik

+0

@Yurik Neden bunu istiyorsun açıklar mısınız? – svick

+0

Çoğunlukla, "async 101" değil, gerçek bir dünya senaryosu olan yeni bir problemin söz dizimini beklememde yardımcı olacağını düşündüğümden. – Yurik

0

MS robotics ekibi tarafından yapılan eşzamansız kitaplıkta, eşzamanlı olmayan kod sağlamak için yineleyicinin kullanılmasına izin veren eşzamanlılık ilkelleri vardı.

Kütüphane (CCR) ücretsizdir (Ücretsiz olmak için kullanmadı). Güzel bir tanıtım makale burada bulunabilir: Concurrent affairs

Belki Net görev kütüphanede yanında bu kütüphaneyi kullanabilir, ya da

+0

açıklayabilir misiniz tam olarak nasıl CCR burada kullanırsınız? – svick

+0

Alıntı yaptığım makale, bunu olabildiğimden daha iyi açıklayabilir.Bir şeye bakarsanız ve şu şeklin kontrolünü yaparsanız: 'Şekil 6 SerialAsyncDemo', OP'nin sorduğu gibi bir kod örneğine sahiptir: Bir .Net yineleyici kullanan Async işlemleri. Bu yineleyici sözdiziminin, zamanının zekice olmasına rağmen, şimdi async/await sözdizimi – Toad

1

Yani gerçekten yapmak istediğinizi ne zaman tamamladıklarına bağlı olarak bir dizi görev sipariş etmektir. Bu korkunç karmaşık değil:

public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks) 
{ 
    var input = tasks.ToList(); 

    var output = input.Select(task => new TaskCompletionSource<T>()); 
    var collection = new BlockingCollection<TaskCompletionSource<T>>(); 
    foreach (var tcs in output) 
     collection.Add(tcs); 

    foreach (var task in input) 
    { 
     task.ContinueWith(t => 
     { 
      var tcs = collection.Take(); 
      switch (task.Status) 
      { 
       case TaskStatus.Canceled: 
        tcs.TrySetCanceled(); 
        break; 
       case TaskStatus.Faulted: 
        tcs.TrySetException(task.Exception.InnerExceptions); 
        break; 
       case TaskStatus.RanToCompletion: 
        tcs.TrySetResult(task.Result); 
        break; 
      } 
     } 
     , CancellationToken.None 
     , TaskContinuationOptions.ExecuteSynchronously 
     , TaskScheduler.Default); 
    } 

    return output.Select(tcs => tcs.Task); 
} 

Yani burada biz o zaman görevlerin her geçmesi ve bir BlockingCollection sonraki tamamlama kaynağını alır ve bu hataların nedeni ayarlayan bir devamı ayarlamak, her giriş görev için bir TaskCompletionSource oluşturun. Tamamlanan ilk görev, geri gönderilen ilk tcs'i yakalar, ikinci görev tamamlandığında geri gönderilen ikinci tcs olur ve böyle devam eder.

Şimdi kod oldukça basit olur:

var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item)) 
    .Order(); 
foreach(var task in tasks) 
{ 
    var result = task.Result;//or you could `await` each result 
    //.... 
} 
+0

Teşekkürler, ama ihtiyacım olan şey, bir yöntemden elde edilen verim olarak işlenmiş nesneler akışı elde etmektir. Ne sunduğunuz temelde bir Parallel.ForEach() rewrite. – Yurik

+0

@Yurik Eğer yapılması gereken tüm eşyaları beklemeniz gerekmiyorsa, 'WhenAll' /' WaitAll' kaldırabilirsiniz, fakat bunun dışında 'Seç'in ihtiyacınız olanı yapmadığını göremiyorum kendi içinde ve içinde. Bir dizi öğeniz var ve bunu, her öğe için bir görev dizisine dönüştürmek istiyorsunuz. 'Seç (item => LongRunningOperation (madde))', bir dizi Görev döndürdüğü için gereksinimlerinizi nasıl karşılamaz? – Servy

+0

Bu durumda, öğelerin sıralaması orijinal ile aynı olacaktır, bu da verimsiz olabilir. Eşyaların verilişinden sıyrılmam. – Yurik