2010-12-29 13 views
14

ConcurrentQueue ile diş nasıl çalışılır. DataTable döndüren bir işlem var. Her bir DataTable, sırayla, önceki DataTable ile birleştirilir. Bir sorun var, son BulkCopy'ye (OutOfMemory) kadar tutmak için çok fazla kayıt var. Ben kuyrukta ile çalışmanın en iyi yolu ne olacağını anlamaya çalışıyorum

Yani, ben hemen her gelen DataTable işlemek gerektiğini belirledi. ConcurrentQueue<T> düşünmek ... ama WriteQueuedData() yöntem bir tablo sıradan çıkarma ve veritabanına yazmak için nasıl tanımış görmüyorum. Örneğin

:

public class TableTransporter 
{ 
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>(); 

    public TableTransporter() 
    { 
     tableQueue.OnItemQueued += new EventHandler(WriteQueuedData); // no events available 
    } 

    public void ExtractData() 
    { 
     DataTable table; 

     // perform data extraction 
     tableQueue.Enqueue(table); 
    } 

    private void WriteQueuedData(object sender, EventArgs e) 
    { 
     BulkCopy(e.Table); 
    } 
} 

ilk sorum, bir kenara ben ExtractData() ararsanız Aslında abone için bir olay yok olmasından uyumsuz buna ihtiyacım hepsi bu nedir? İkincisi, ConcurrentQueue<T> işlevlerinin yolunda eksik olan ve sıraya alınmış nesnelerle eşzamansız olarak çalışmak için bir tür tetikleyici gerektiren bir şey var mı?

Güncelleme Ben sadece bir OnItemQueued olay işleyicisi vardır ConcurrentQueue<T> gelen bir sınıf elde var. Sonra:

new public void Enqueue (DataTable Table) 
{ 
    base.Enqueue(Table); 
    OnTableQueued(new TableQueuedEventArgs(Table)); 
} 

public void OnTableQueued(TableQueuedEventArgs table) 
{ 
    EventHandler<TableQueuedEventArgs> handler = TableQueued; 

    if (handler != null) 
    { 
     handler(this, table); 
    } 
} 

Bu uygulama ile ilgili herhangi bir endişeniz var mı? Sorunun Benim anlayış

cevap

18

, bir kaç şey eksik.

eşzamanlı kuyruk birden konuları okuma ve açıkça veri yapısını kilitlemek gerek kalmadan kuyruğa yazma kabul etmek üzere tasarlanmış bir veri yapısıdır. (Tüm bu caz sahnelerin ardında halledilir, ya da koleksiyon bir kilit gerektirmeyecek şekilde uygulanır.)

Bunu göz önünde bulundurarak, denediğiniz modele benziyor. Kullanmak "Üretin/Tüketici" dir. Öncelikle, iş üreten bazı görevleriniz var (ve sıraya öğeler ekleyerek). İkincisi ise ikinci bir göreviniz var.

Gerçekten iki parçayı da istiyorsunuz: bir tane öğe ve ikinci bir çıkarma öğesi. Eşzamanlı bir koleksiyon kullandığınız için, öğelerin eklenmesi ve öğelerin kaldırılmasında birden fazla iş parçacığı ekleyerek birden çok iş parçacığınız olabilir. Ama tabii ki eşzamanlı sıradaki daha fazla çekişme, darboğaz haline gelen daha hızlıdır.

+0

2 ileti dizisinin olduğunu düşündüm. Ana iş parçacığı temel olarak olayın tetiklenmesini beklerdi. İkinci iş parçacığı, ExtractData() 'için eşzamansız çağrı olarak başlar. Eşzamanlı olmayan geri çağırma işleminde, sadece çıkarma işlemine devam edeceğim. – IAbstract

+0

Aslında, sanırım geriye bende var; ana iş parçacığı, veri yığınlarını sıraya koymalı; Daha sonra, sıralı öğe olayı tetiği aracılığıyla asenkron yazım yöntemine başlayın. – IAbstract

3

Bu benim ile geldi ne için komple bir çözümdür:

public class TableTransporter 
{ 
    private static int _indexer; 

    private CustomQueue tableQueue = new CustomQueue(); 
    private Func<DataTable, String> RunPostProcess; 
    private string filename; 

    public TableTransporter() 
    { 
     RunPostProcess = new Func<DataTable, String>(SerializeTable); 
     tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued); 
    } 

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e) 
    { 
     // do something with table 
     // I can't figure out is how to pass custom object in 3rd parameter 
     RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename); 
    } 

    public void ExtractData() 
    { 
     // perform data extraction 
     tableQueue.Enqueue(MakeTable()); 
     Console.WriteLine("Table count [{0}]", tableQueue.Count); 
    } 

    private DataTable MakeTable() 
    { return new DataTable(String.Format("Table{0}", _indexer++)); } 

    private string SerializeTable(DataTable Table) 
    { 
     string file = Table.TableName + ".xml"; 

     DataSet dataSet = new DataSet(Table.TableName); 

     dataSet.Tables.Add(Table); 

     Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file); 
     string xmlstream = String.Empty; 

     using (MemoryStream memstream = new MemoryStream()) 
     { 
      XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet)); 
      XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8); 

      xmlSerializer.Serialize(xmlWriter, dataSet); 
      xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray()); 

      using (var fileStream = new FileStream(file, FileMode.Create)) 
       fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2); 
     } 
     filename = file; 

     return file; 
    } 

    private void PostComplete(IAsyncResult iasResult) 
    { 
     string file = (string)iasResult.AsyncState; 
     Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file); 

     RunPostProcess.EndInvoke(iasResult); 
    } 

    public static String UTF8ByteArrayToString(Byte[] ArrBytes) 
    { return new UTF8Encoding().GetString(ArrBytes); } 

    public static Byte[] StringToUTF8ByteArray(String XmlString) 
    { return new UTF8Encoding().GetBytes(XmlString); } 
} 

public sealed class CustomQueue : ConcurrentQueue<DataTable> 
{ 
    public event EventHandler<TableQueuedEventArgs> TableQueued; 

    public CustomQueue() 
    { } 
    public CustomQueue(IEnumerable<DataTable> TableCollection) 
     : base(TableCollection) 
    { } 

    new public void Enqueue (DataTable Table) 
    { 
     base.Enqueue(Table); 
     OnTableQueued(new TableQueuedEventArgs(Table)); 
    } 

    public void OnTableQueued(TableQueuedEventArgs table) 
    { 
     EventHandler<TableQueuedEventArgs> handler = TableQueued; 

     if (handler != null) 
     { 
      handler(this, table); 
     } 
    } 
} 

public class TableQueuedEventArgs : EventArgs 
{ 
    #region Fields 
    #endregion 

    #region Init 
    public TableQueuedEventArgs(DataTable Table) 
    {this.Table = Table;} 
    #endregion 

    #region Functions 
    #endregion 

    #region Properties 
    public DataTable Table 
    {get;set;} 
    #endregion 
} 

kavram ispatı olarak, oldukça iyi iş gibi görünüyor. En çok 4 işçi iş parçacığı gördüm.

+0

TODO: daha yeni eşzamansız yöntemle güncelleyin. – IAbstract

+0

Bunun üzerinden bakıldığında, iyi bir uygulama, ancak, hızlı bir test yaptıktan sonra, bir öğe ne zaman dequeued olur? –

+0

@RichardPriddy: 5 yıldan uzun bir süre önce olduğu için (* ve uzun zamandır 3. şirketime geçtim *), bunun sadece tam bir örnek olmadığını varsayabilirim. Sonunda * konseptin * ispatı olduğuna dikkat ediniz. ;) Gereksinimlere bağlı olarak, 'seçkin' olayı ortaya çıkarabileceğinizi ve başka bir şeyin dequeueing işlemine izin verebileceğini söylediniz. Aksi takdirde, post process işlevinin 'AsyncCallback' kısmında bir yere dequeue yapmak mantıklı olabilir. Bu geç tarihte daha spesifik olanı kesin olarak belirlemek gerçekten zor olacaktır. – IAbstract

8

ConcurrentQueue'nun yalnızca birkaç durumda yararlı olduğunu düşünüyorum. Ana avantajı, kilitlenmemesidir. Bununla birlikte, genellikle üretici iplik (ler), tüketici ipliğini bir şekilde işlemek için elverişli veri olduğunu bildirmek zorundadır. İş parçacığı arasındaki bu sinyalizasyon kilitlere ihtiyaç duyar ve ConcurrentQueue kullanmanın yararını ortadan kaldırır. İş parçacığı senkronize etmenin en hızlı yolu, yalnızca bir kilit içinde çalışan Monitor.Pulse() yöntemini kullanmaktır. Diğer tüm senkronizasyon araçları daha yavaştır. Tabii ki, tüketiciler, kuyrukta çalışacak, ancak işlemci kaynaklarının büyük bir israfı olan kuyrukta bir şey olup olmadığını sürekli kontrol edebilirler. Tüketici kontrol arasında beklerse biraz daha iyi olur.

Sıraya yazarken bir iş parçacığı oluşturmak çok kötü bir fikirdir. Mabe 1 mikro saniyesini kaydetmek için ConcurrentQueue kullanılarak, 1000 kat daha uzun sürebilir, eventhandler çalıştırılarak tamamen boşa harcanacaktır.

Tüm işlem bir olay işleyicisinde veya bir zaman uyumsuz çağrısında gerçekleştirilirse, sorun hala bir sıraya ihtiyaç duyuyor mu? Verileri doğrudan işleyiciye iletin ve bir sıra kullanmayın.

ConcurrentQueue uygulamasının eşzamanlılık sağlamak için oldukça karmaşık olduğunu lütfen unutmayın. Çoğu durumda, normal bir Kuyruk <> öğesini daha iyi kullanın ve kuyruğa her erişimi kilitleyin. Kuyruk girişinin yalnızca mikrosaniye gerektirdiğinden, 2 iş parçacığının aynı mikrosaniye içinde kuyruğa erişmesi çok olası değildir ve kilitleme nedeniyle neredeyse hiç gecikme olmayacaktır. Kilitleme ile normal bir Queue <> kullanma ConcurrentQueue'den daha hızlı kod yürütme ile sonuçlanır.

+0

Aşağı oylamayı almanın utancı. Bence bu geçerli, pragmatik bir fikir. – user3085342

İlgili konular