2011-06-10 18 views
6

Şu anda bir proje üzerinde çalışıyorum, paralel olarak öğeleri işlemek için bir meydan okuma var. Şimdiye kadar büyük bir anlaşma değil;) Şimdi sorun. Her ID için bir StoredProcedure olarak adlandırdığımız periyodik olarak (her 2 saniyede bir) kimlikler listesi vardır. 2 sn'nin, çalışma sırasında eklendiği ve çıkarıldığı için her bir öğe için ayrı ayrı kontrol edilmesi gerekir. Buna ek olarak, DB aynı anda 300 iş parçacığı ile su basmaması gerektiği için maksimum paralellik derecesini yapılandırmak istiyoruz. İşlenmekte olan bir öğe, bir önceki yürütme işlemini tamamlayana kadar işleme için yeniden planlanmamalıdır. Nedeni, DB'de gecikmeler olması durumunda, bir çok öğeyi sıraya almayı engellemek istememizdir. Şu anda, işlenecek ürünlerin periyodik olarak kontrol edilmesini gerektiren, ana iş parçacığı olan, kendi geliştirdiğimiz bir bileşen kullanıyoruz. Listeye sahip olduktan sonra, bunları özel IOCP tabanlı iş parçacığı havuzuna bırakıyor ve ardından işlenecek öğeleri beklemek için waithandles işlevini kullanıyor. Sonra bir sonraki iterasyon başlar. IOCP, çaldığı çalma nedeniyle sağladı.TPL mimari soru

Bu özel uygulamayı bir TPL/.NET 4 sürümü ile değiştirmek istiyorum ve bunu nasıl çözeceğinizi (ideal olarak basit ve güzel okunabilir/sürdürülebilir) öğrenmek istiyorum. Bu makaleyi biliyorum: http://msdn.microsoft.com/en-us/library/ee789351.aspx, ancak yalnızca kullanılan iş parçacığı miktarını sınırlıyor. Yaprakları çalmak, periyodik olarak öğeleri çalıştırmak ....

İdeal olarak, bir liste listesi için periyodik olarak yapılması gereken bazı görevler için kullanılabilecek genel bir bileşen haline gelecektir.

herhangi bir giriş karşılama, Martin

+0

Reaktif Programlama –

cevap

9

Senin bu doğrudan VUK Tasks ile aşağı ve kirli almak gerekir sanmıyorum Tia. Yeni başlayanlar için ConcurrentQueue (varsayılan) etrafında BlockingCollection (varsayılan) BlockingCollection üzerinde işlem görmesi gereken kimlikleri saklamak için ayarlanmış hiçbir BoundedCapacity ayarlayın.

// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service) 
BlockingCollection<string> idsToProcess = new BlockingCollection<string>(); 

Oradan sadece BlockingCollection::GetConsumingEnumerable döndü numaralandırma üzerinde Parallel::ForEach kullanmak. ForEach numaralı telefondan, ParallelOptions::MaxDegreeOfParallelism'unuzu kuracaksınız. ForEach'un gövdesinin içinde saklı yordamınızı gerçekleştireceksiniz.

Şimdi, saklı yordam yürütme işlemi tamamlandıktan sonra, yürütmeyi en az iki saniye olarak yeniden programlamak istemediğinizi söylüyorsunuz. Sorun değil, bir geri bildirimde bulunan System.Threading.Timer numaralı programı, geri bildirimde bulunan geri bildirimde kimliği BlockingCollection'a geri ekleyeceksiniz. Kapatma işlemi sırasında

Parallel.ForEach(
    idsToProcess.GetConsumingEnumerable(), 
    new ParallelOptions 
    { 
     MaxDegreeOfParallelism = 4 // read this from config 
    }, 
    (id) => 
    { 
     // ... execute sproc ... 

     // Need to declare/assign this before the delegate so that we can dispose of it inside 
     Timer timer = null; 

     timer = new Timer(
      _ => 
      { 
       // Add the id back to the collection so it will be processed again 
       idsToProcess.Add(id); 

       // Cleanup the timer 
       timer.Dispose(); 
      }, 
      null, // no state, id wee need is "captured" in the anonymous delegate 
      2000, // probably should read this from config 
      Timeout.Infinite); 
    } 

Son olarak, enumerable durdurma engelleme ve tam ve Parallel ile işleniyor böylece BlockingCollection::CompleteAdding çağırır :: ForEach çıkılacak. Bu bir Windows servisi olsaydı, örneğin bunu OnStop'da yapardınız. Güncelleme

// When ready to shutdown you just signal you're done adding 
idsToProcess.CompleteAdding(); 

Sen herhangi bir noktada kimlikleri büyük miktarda işlem olabilir yorumunuzda geçerli bir endişe kaldırdı ve kimliği başına bir zamanlayıcı çok fazla havai olacağını korku .Buna kesinlikle katılıyorum. Yani eşzamanlı olarak ID'lerin büyük bir listesiyle uğraşmanız durumunda, tek bir kısa aralıklı zamanlayıcı tarafından izlenen "uyku" kimliklerini tutmak için başka bir kuyruğu kullanmak için bir kronometre başına zamanlayıcı kullanmaktan değiştirirdim. İlk, üzerine uykuda kimliklerini yerleştirmek için bir ConcurrentQueue gerekir:

ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>(); 

Şimdi, açıklama amacıyla burada iki parçasını Tuple kullanıyorum, ama daha kesinlikle yazılı oluşturmak isteyebilirsiniz Daha iyi okunabilirlik için (ya da en azından using bildirimi ile en az takma) yapı. Tuple, kimliği ve kuyruğa koyulduğunu gösteren bir DateTime'a sahiptir.

Sonra
Timer wakeSleepingIdsTimer = new Timer(
    _ => 
    { 
     DateTime utcNow = DateTime.UtcNow; 

     // Pull all items from the sleeping queue that have been there for at least 2 seconds 
     foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2)) 
     { 
      // Add this id back to the processing queue 
      idsToProcess.Enqueue(id); 
     } 
    }, 
    null, // no state 
    Timeout.Infinite, // no due time 
    100 // wake up every 100ms, probably should read this from config 
); 

basitçe yerine aşağıdakilerden her biri için bir zamanlayıcı ayarlama yapmak Parallel::ForEach değiştirecek:

Şimdi de kurulum için bu kuyruğa izleyecek zamanlayıcı isteyeceksiniz

(id) => 
{ 
     // ... execute sproc ... 

     sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow)); 
} 
+0

güzel fikir koştu yana olmuştur iki saniye olmadığını kontrol, ama bu bir kaynak sorunu biraz oluşturacağını düşünmüyor musunuz? Yani, örneğin Listedeki 500 eleman, biraz zaman aşımına uğrayan zamanlayıcıların büyük miktarından endişe duyuyorum ... –

+0

Bunun hakkında düşündüm, ama net sınırlar vermediniz, bu yüzden bu sizin bir araya gelip gelmediğine dair bir yanıt bekliyordum. ihtiyacı vardır. Bunu, başka bir kuyrukla ve kuyruğa giren girdiler için kuyruğu izleyen ve onları ana iş kuyruğuna geri döndüren tek bir zamanlayıcı ile kolayca düzeltebilirsiniz. Cevabımın ayrıntılarını ekleyeceğim. –

1

Bu, sorunuzu zaten belirttiğiniz yaklaşımla oldukça benzer, ancak TPL görevleriyle bunu yapıyor. Bir görev, bittiğinde zamanlaması gereken şeylerin bir listesini tekrar ekler.

düz bir liste üzerinde kilitleme kullanımı bu örnekte oldukça çirkin

, muhtemelen Aslında gerçekleştiren SchedulingLoop, başlatıldığında

// Fill the idsToSchedule 
for (int id = 0; id < 5; id++) 
{ 
    idsToSchedule.Add(Tuple.Create(DateTime.MinValue, id)); 
} 

// LongRunning will tell TPL to create a new thread to run this on 
Task.Factory.StartNew(SchedulingLoop, TaskCreationOptions.LongRunning); 

planlamak için şeylerin listesini tutmak için daha iyi bir koleksiyon isteyeyim şey

// Tuple of the last time an id was processed and the id of the thing to schedule 
static List<Tuple<DateTime, int>> idsToSchedule = new List<Tuple<DateTime, int>>(); 
static int currentlyProcessing = 0; 
const int ProcessingLimit = 3; 

// An event loop that performs the scheduling 
public static void SchedulingLoop() 
{ 
    while (true) 
    { 
     lock (idsToSchedule) 
     { 
      DateTime currentTime = DateTime.Now; 
      for (int index = idsToSchedule.Count - 1; index >= 0; index--) 
      { 
       var scheduleItem = idsToSchedule[index]; 
       var timeSincePreviousRun = (currentTime - scheduleItem.Item1).TotalSeconds; 

       // start it executing in a background task 
       if (timeSincePreviousRun > 2 && currentlyProcessing < ProcessingLimit) 
       { 
        Interlocked.Increment(ref currentlyProcessing); 

        Console.WriteLine("Scheduling {0} after {1} seconds", scheduleItem.Item2, timeSincePreviousRun); 

        // Schedule this task to be processed 
        Task.Factory.StartNew(() => 
         { 
          Console.WriteLine("Executing {0}", scheduleItem.Item2); 

          // simulate the time taken to call this procedure 
          Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(0, 5000) + 500); 

          lock (idsToSchedule) 
          { 
           idsToSchedule.Add(Tuple.Create(DateTime.Now, scheduleItem.Item2)); 
          } 

          Console.WriteLine("Done Executing {0}", scheduleItem.Item2); 
          Interlocked.Decrement(ref currentlyProcessing); 
         }); 

        // remove this from the list of things to schedule 
        idsToSchedule.RemoveAt(index); 
       } 
      } 
     } 

     Thread.Sleep(100); 
    } 
}