2011-03-09 18 views
5

Amacıyla eşzamanlı erişim denetimi sırasını soyutlamak olan bir sınıf oluşturdum.Uzun süren görev iptal edildikten sonra nasıl düzgün temizlenir

Bu sınıf, tek bir iş parçacığı üzerinde, birden çok iş parçacığı tarafından yazılmış ve sonradan izleyen tek bir iş parçacığı tarafından okunacak şekilde tasarlanmıştır.

Bir engelleme döngüsü gerçekleştirecek ve bir öğe başarıyla dequeued durumunda bir olay tetikleyecek sınıf içinde oluşturulan tek bir uzun çalışan görevin var.

Sorum şu: Bu, uzun çalışan görevin iptal edilmesini ve ardından CancellationTokenSource nesnesinin doğru kullanımını temizliyor/sıfırlıyor mu?

İdeal olarak, sıranıza eklenecek kullanılabilirliği korurken, etkin bir nesnenin durdurulup yeniden başlatılmasını istiyorum. Ben esas Peter Bromberg'in makaleye kullandım

: Aşağıda Producer/Consumer Queue and BlockingCollection in C# 4.0

Kodu:

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Test 
{ 
    public delegate void DeliverNextQueuedItemHandler<T>(T item); 

public sealed class SOQueueManagerT<T> 
{ 

    ConcurrentQueue<T> _multiQueue; 
    BlockingCollection<T> _queue; 
    CancellationTokenSource _canceller; 
    Task _listener = null; 

    public event DeliverNextQueuedItemHandler<T> OnNextItem; 

    public bool IsRunning { get; private set; } 
    public int QueueSize 
    { 
     get 
     { 
      if (_queue != null) 
       return _queue.Count; 
      return -1; 
     } 
    } 

    public CancellationTokenSource CancellationTokenSource 
    { 
     get 
     { 
      if (_canceller == null) 
       _canceller = new CancellationTokenSource(); 

      return _canceller; 
     } 
    } 

    public SOQueueManagerT() 
    { 
     _multiQueue = new ConcurrentQueue<T>(); 
     _queue = new BlockingCollection<T>(_multiQueue); 

     IsRunning = false; 
    } 

    public void Start() 
    { 
     if (_listener == null) 
     { 


      IsRunning = true; 

      _listener = Task.Factory.StartNew(() => 
      { 

       while (!CancellationTokenSource.Token.IsCancellationRequested) 
       { 
        T item; 
        if (_queue.TryTake(out item, 100)) 
        { 
         if (OnNextItem != null) 
         { 

          OnNextItem(item); 
         } 
        } 

       } 
      }, 
      CancellationTokenSource.Token, 
      TaskCreationOptions.LongRunning, 
      TaskScheduler.Default); 
     } 
    } 

    public void Stop() 
    { 
     if (_listener != null) 
     { 
      CancellationTokenSource.Cancel(); 
      CleanUp(); 
     } 
    } 

    public void Add(T item) 
    { 
     _queue.Add(item); 
    } 

    private void CleanUp() 
    { 
     _listener.Wait(2000); 
     if (_listener.IsCompleted) 
     { 
      IsRunning = false; 
      _listener = null; 
      _canceller = null; 
     } 
    } 


} 
} 

GÜNCELLEME İşte sonunda birlikte gitti budur. Mükemmel değil ama şu ana kadar işi yapıyor.

public sealed class TaskQueueManager<T> 
{ 
    ConcurrentQueue<T> _multiQueue; 
    BlockingCollection<T> _queue; 
    CancellationTokenSource _canceller; 
    Task _listener = null; 

    public event DeliverNextQueuedItemHandler<T> OnNextItem; 

    public bool IsRunning 
    { 
     get 
     { 
      if (_listener == null) 
       return false; 
      else if (_listener.Status == TaskStatus.Running || 
       _listener.Status == TaskStatus.Created || 
       _listener.Status == TaskStatus.WaitingForActivation || 
       _listener.Status == TaskStatus.WaitingToRun || 
       _listener.IsCanceled) 
       return true; 
      else 
       return false; 
     } 
    } 
    public int QueueSize 
    { 
     get 
     { 
      if (_queue != null) 
       return _queue.Count; 
      return -1; 
     } 
    } 

    public TaskQueueManager() 
    { 
     _multiQueue = new ConcurrentQueue<T>(); 
     _queue = new BlockingCollection<T>(_multiQueue); 
    } 

    public void Start() 
    { 
     if (_listener == null) 
     { 
      _canceller = new CancellationTokenSource(); 

      _listener = Task.Factory.StartNew(() => 
      { 
       while (!_canceller.Token.IsCancellationRequested) 
       { 
        T item; 
        if (_queue.TryTake(out item, 100)) 
        { 
         if (OnNextItem != null) 
         { 
          try 
          { 
           OnNextItem(item); 
          } 
          catch (Exception e) 
          { 
           //log or call an event 
          } 
         } 
        } 
       } 
      }, 
      _canceller.Token, 
      TaskCreationOptions.LongRunning, 
      TaskScheduler.Default); 
     } 
    } 

    public void Stop() 
    { 
     if (_listener != null) 
     { 
      _canceller.Cancel(); 

      if (_listener.IsCanceled && !_listener.IsCompleted) 
       _listener.Wait(); 

      _listener = null; 
      _canceller = null; 
     } 
    } 

    public void Add(T item) 
    { 
     if (item != null) 
     { 
      _queue.Add(item); 
     } 
     else 
     { 
      throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null"); 
     } 
    } 
} 

cevap

1
Dikkatle programlama, keseceği tek şeydir. İşlemi iptal etseniz bile, modaya uygun bir sürede tamamlanmayan bir bekleyen işleminiz olabilir. Kilitlenmeyen bir engelleme işlemi olabilir. Bu durumda programınız aslında sonlandırılmayacaktır.

Örneğin, CleanUp yöntemini birkaç kez veya Ara'yı çağırmadan çağırırsam, ilk önce kilitlenme hissi alıyorum.

Temizleme sırasında 2 saniyelik bir zaman aşımı planlanandan daha fazla keyfi hisseder ve aslında şeylerin düzgün şekilde kapanmasını veya kilitlenmesini/askıda kalmasını (bilinmeyen bir durumda eş zamanlı şeyler bırakmak istemediğinizden) emin olmak için giderdim.

Ayrıca, IsRunning açıkça belirtilmiş, nesnenin durumundan çıkarılmadı.

İlham almak için Son zamanlarda yazdığım benzer bir sınıfa bakmanızı isterim, bu bir arka plan iş parçacığında çalışan bir yapımcı/tüketici kalıbıdır. Bu kaynak kodunu CodePlex'da bulabilirsiniz. Yine de, bu çok özel bir sorunu çözmek için tasarlandı.

Burada, yalnızca çalışan iş parçacığının tanıdığı ve dolayısıyla kapanmaya başladığı belirli bir türün oluşturulmasıyla iptal işlemi çözülür. Bu aynı zamanda beklemedeki çalışmayı asla iptal etmememi sağlar, sadece tüm iş birimleri dikkate alınır.

Bu durumu iyileştirmek için, geçerli çalışma için ayrı bir zamanlayıcıya sahip olabilir ve iptal edilirse tamamlanmamış işi iptal edebilir veya geri alabilirsiniz. Şimdi, işleminin gibi bir davranışı uygulamak bazı deneme yanılmalarına yol açacaktır çünkü her olası köşe durumuna bakmanız ve kendinize sormanız gerekiyor, program çökerse ne olur? İdeal olarak, bu kod yollarının tümü, işinize devam edebileceğiniz, kurtarılabilir veya bilinen bir duruma yol açar. Ama tahmin ettiğim gibi, bu, dikkatli bir programlama ve çok fazla test yapacak.

+0

John: Evet, aynı zamanda Stop() öğesinin birden çok kez çağrılmasının büyük olasılıkla sorunlara neden olacağını fark ettim. Stop() yöntemini değiştirdim, böylece dönmeden önce görevin tamamlanmasını bekleyeceğim. Evet, bu bir engelleme çağrısı yapar, bu aşamada tamam. Bekleme (zaman aşımı) çağrısı üzerine iletilecek bir zaman aşımı sağlamak için Dur yönteminde bir geçersiz kılma sağlayabilirim. Yaptığınız IsRunning noktası geçerli olsa da geçerlidir. – MattC

+0

@MattC Kod örneğine baktınız mı? –

+0

Evet, ilginç ve yaptığınızı görüyorum. Benim ele aldığım özel konu, iç tüketici görevinin başlatılması, durdurulması ve yeniden başlatılmasının doğru bir şekilde ele alınmasıdır. Tüketici durdurulurken eklenecek kuyruğu için mutlu. CleanUp yöntemini tamamen kaldırdım ve IsRunning özelliğini özellikle görev durumuna göre kaldırdım. Bunu açık bırakacağım, sadece birisi TPL'yi kullanmamla ilgili bir cevap ekleyen birini kapatın. Yoksa seninkini alırım. – MattC

İlgili konular