Amacıyla eşzamanlı erişim denetimi sırasını soyutlamak olan bir sınıf oluşturdum.Uzun süren görev iptal edildikten sonra nasıl düzgün temizlenir
Bu sınıf, tek bir iş parçacığı üzerinde, birden çok iş parçacığı tarafından yazılmış ve sonradan izleyen tek bir iş parçacığı tarafından okunacak şekilde tasarlanmıştır.
Bir engelleme döngüsü gerçekleştirecek ve bir öğe başarıyla dequeued durumunda bir olay tetikleyecek sınıf içinde oluşturulan tek bir uzun çalışan görevin var.
Sorum şu: Bu, uzun çalışan görevin iptal edilmesini ve ardından CancellationTokenSource
nesnesinin doğru kullanımını temizliyor/sıfırlıyor mu?
İdeal olarak, sıranıza eklenecek kullanılabilirliği korurken, etkin bir nesnenin durdurulup yeniden başlatılmasını istiyorum. Ben esas Peter Bromberg'in makaleye kullandım
: Aşağıda Producer/Consumer Queue and BlockingCollection in C# 4.0
Kodu:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
public delegate void DeliverNextQueuedItemHandler<T>(T item);
public sealed class SOQueueManagerT<T>
{
ConcurrentQueue<T> _multiQueue;
BlockingCollection<T> _queue;
CancellationTokenSource _canceller;
Task _listener = null;
public event DeliverNextQueuedItemHandler<T> OnNextItem;
public bool IsRunning { get; private set; }
public int QueueSize
{
get
{
if (_queue != null)
return _queue.Count;
return -1;
}
}
public CancellationTokenSource CancellationTokenSource
{
get
{
if (_canceller == null)
_canceller = new CancellationTokenSource();
return _canceller;
}
}
public SOQueueManagerT()
{
_multiQueue = new ConcurrentQueue<T>();
_queue = new BlockingCollection<T>(_multiQueue);
IsRunning = false;
}
public void Start()
{
if (_listener == null)
{
IsRunning = true;
_listener = Task.Factory.StartNew(() =>
{
while (!CancellationTokenSource.Token.IsCancellationRequested)
{
T item;
if (_queue.TryTake(out item, 100))
{
if (OnNextItem != null)
{
OnNextItem(item);
}
}
}
},
CancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
public void Stop()
{
if (_listener != null)
{
CancellationTokenSource.Cancel();
CleanUp();
}
}
public void Add(T item)
{
_queue.Add(item);
}
private void CleanUp()
{
_listener.Wait(2000);
if (_listener.IsCompleted)
{
IsRunning = false;
_listener = null;
_canceller = null;
}
}
}
}
GÜNCELLEME İşte sonunda birlikte gitti budur. Mükemmel değil ama şu ana kadar işi yapıyor.
public sealed class TaskQueueManager<T>
{
ConcurrentQueue<T> _multiQueue;
BlockingCollection<T> _queue;
CancellationTokenSource _canceller;
Task _listener = null;
public event DeliverNextQueuedItemHandler<T> OnNextItem;
public bool IsRunning
{
get
{
if (_listener == null)
return false;
else if (_listener.Status == TaskStatus.Running ||
_listener.Status == TaskStatus.Created ||
_listener.Status == TaskStatus.WaitingForActivation ||
_listener.Status == TaskStatus.WaitingToRun ||
_listener.IsCanceled)
return true;
else
return false;
}
}
public int QueueSize
{
get
{
if (_queue != null)
return _queue.Count;
return -1;
}
}
public TaskQueueManager()
{
_multiQueue = new ConcurrentQueue<T>();
_queue = new BlockingCollection<T>(_multiQueue);
}
public void Start()
{
if (_listener == null)
{
_canceller = new CancellationTokenSource();
_listener = Task.Factory.StartNew(() =>
{
while (!_canceller.Token.IsCancellationRequested)
{
T item;
if (_queue.TryTake(out item, 100))
{
if (OnNextItem != null)
{
try
{
OnNextItem(item);
}
catch (Exception e)
{
//log or call an event
}
}
}
}
},
_canceller.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
public void Stop()
{
if (_listener != null)
{
_canceller.Cancel();
if (_listener.IsCanceled && !_listener.IsCompleted)
_listener.Wait();
_listener = null;
_canceller = null;
}
}
public void Add(T item)
{
if (item != null)
{
_queue.Add(item);
}
else
{
throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
}
}
}
John: Evet, aynı zamanda Stop() öğesinin birden çok kez çağrılmasının büyük olasılıkla sorunlara neden olacağını fark ettim. Stop() yöntemini değiştirdim, böylece dönmeden önce görevin tamamlanmasını bekleyeceğim. Evet, bu bir engelleme çağrısı yapar, bu aşamada tamam. Bekleme (zaman aşımı) çağrısı üzerine iletilecek bir zaman aşımı sağlamak için Dur yönteminde bir geçersiz kılma sağlayabilirim. Yaptığınız IsRunning noktası geçerli olsa da geçerlidir. – MattC
@MattC Kod örneğine baktınız mı? –
Evet, ilginç ve yaptığınızı görüyorum. Benim ele aldığım özel konu, iç tüketici görevinin başlatılması, durdurulması ve yeniden başlatılmasının doğru bir şekilde ele alınmasıdır. Tüketici durdurulurken eklenecek kuyruğu için mutlu. CleanUp yöntemini tamamen kaldırdım ve IsRunning özelliğini özellikle görev durumuna göre kaldırdım. Bunu açık bırakacağım, sadece birisi TPL'yi kullanmamla ilgili bir cevap ekleyen birini kapatın. Yoksa seninkini alırım. – MattC