2014-09-07 14 views
5

C++ 11 bir kilit serbest birden yapımcı, çoklu tüketici kuyruğu uygulamaya çalışıyorum. Bunu bir öğrenme alıştırması olarak yapıyorum, bu yüzden mevcut bir açık kaynak uygulamasını kullanabileceğimin farkındayım, ancak kodumun neden çalışmadığını öğrenmek istiyorum. Veriler bir çalma cihazında saklanır, görünüşe göre bir "sınırlı MPMC kuyruğu" dur.Kilit-Free Çoklu Üretici/Tüketici Kuyruk C++ 11

Ben bölücünün okuduklarıma oldukça yakından modelledik oldum. Fark ettiğim şey, tek bir tüketici ve tek/çok sayıda üretici ile kesinlikle iyi çalışabilmesi, onu kırmak gibi görünen sadece birden fazla tüketici.

İşte kuyruk var:

template <typename T> 
class Queue : public IQueue<T> 
{ 
public: 
    explicit Queue(int capacity); 
    ~Queue(); 

    bool try_push(T value); 
    bool try_pop(T& value); 
private: 
    typedef struct 
    { 
     bool readable; 
     T value; 
    } Item; 

    std::atomic<int> m_head; 
    std::atomic<int> m_tail; 
    int m_capacity; 
    Item* m_items; 
}; 

template <typename T> 
Queue<T>::Queue(int capacity) : 
m_head(0), 
m_tail(0), 
m_capacity(capacity), 
m_items(new Item[capacity]) 
{ 
    for(int i = 0; i < capacity; ++i) 
    { 
     m_items[i].readable = false; 
    } 
} 

template <typename T> 
Queue<T>::~Queue() 
{ 
    delete[] m_items; 
} 

template <typename T> 
bool Queue<T>::try_push(T value) 
{ 
    while(true) 
    { 
     // See that there's room 
     int tail = m_tail.load(std::memory_order_acquire); 
     int new_tail = (tail + 1); 
     int head = m_head.load(std::memory_order_acquire); 

     if((new_tail - head) >= m_capacity) 
     { 
      return false; 
     } 

     if(m_tail.compare_exchange_weak(tail, new_tail, std::memory_order_acq_rel)) 
     { 
      // In try_pop, m_head is incremented before the reading of the value has completed, 
      // so though we've acquired this slot, a consumer thread may be in the middle of reading 
      tail %= m_capacity; 

      std::atomic_thread_fence(std::memory_order_acquire); 
      while(m_items[tail].readable) 
      { 
      } 

      m_items[tail].value = value; 
      std::atomic_thread_fence(std::memory_order_release); 
      m_items[tail].readable = true; 

      return true; 
     } 
    } 
} 

template <typename T> 
bool Queue<T>::try_pop(T& value) 
{ 
    while(true) 
    { 
     int head = m_head.load(std::memory_order_acquire); 
     int tail = m_tail.load(std::memory_order_acquire); 

     if(head == tail) 
     { 
      return false; 
     } 

     int new_head = (head + 1); 

     if(m_head.compare_exchange_weak(head, new_head, std::memory_order_acq_rel)) 
     { 
      head %= m_capacity; 

      std::atomic_thread_fence(std::memory_order_acquire); 
      while(!m_items[head].readable) 
      { 
      } 

      value = m_items[head].value; 
      std::atomic_thread_fence(std::memory_order_release); 
      m_items[head].readable = false; 

      return true; 
     } 
    } 
} 

Ve burada kullanıyorum testi var: doğru yönde herhangi Nudging büyük takdir

void Test(std::string name, Queue<int>& queue) 
{ 
    const int NUM_PRODUCERS = 64; 
    const int NUM_CONSUMERS = 2; 
    const int NUM_ITERATIONS = 512; 
    bool table[NUM_PRODUCERS*NUM_ITERATIONS]; 
    memset(table, 0, NUM_PRODUCERS*NUM_ITERATIONS*sizeof(bool)); 

    std::vector<std::thread> threads(NUM_PRODUCERS+NUM_CONSUMERS); 

    std::chrono::system_clock::time_point start, end; 
    start = std::chrono::system_clock::now(); 

    std::atomic<int> pop_count (NUM_PRODUCERS * NUM_ITERATIONS); 
    std::atomic<int> push_count (0); 

    for(int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id) 
    { 
     threads[thread_id] = std::thread([&queue,thread_id,&push_count]() 
           { 
            int base = thread_id * NUM_ITERATIONS; 

            for(int i = 0; i < NUM_ITERATIONS; ++i) 
            { 
             while(!queue.try_push(base + i)){}; 
             push_count.fetch_add(1); 
            } 
           }); 
    } 

    for(int thread_id = 0; thread_id < (NUM_CONSUMERS); ++thread_id) 
    { 
     threads[thread_id+NUM_PRODUCERS] = std::thread([&]() 
             { 
              int v; 

              while(pop_count.load() > 0) 
              { 
               if(queue.try_pop(v)) 
               { 
                if(table[v]) 
                { 
                 std::cout << v << " already set" << std::endl; 
                } 
                table[v] = true; 
                pop_count.fetch_sub(1); 
               } 
              } 
             }); 

    } 

    for(int i = 0; i < (NUM_PRODUCERS + NUM_CONSUMERS); ++i) 
    { 
     threads[i].join(); 
    } 

    end = std::chrono::system_clock::now(); 
    std::chrono::duration<double> duration = end - start; 

    std::cout << name << " " << duration.count() << std::endl; 

    std::atomic_thread_fence(std::memory_order_acq_rel); 

    bool result = true; 
    for(int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i) 
    { 
     if(!table[i]) 
     { 
      std::cout << "failed at " << i << std::endl; 
      result = false; 
     } 
    } 
    std::cout << name << " " << (result? "success" : "fail") << std::endl; 
} 

. Her şey için bir muteks kullanmaktan ziyade, hafıza çitler için oldukça yeniyim, bu yüzden muhtemelen sadece bir şeyi yanlış anlamaya çalışıyorum.

Alkış J

+4

Sen sınırlandırılmış bir * * MPMC kuyruk yapıyolar açıklamanızda eklemek gerekir. Bu oldukça önemli bir özellik. –

+0

Ben iplik çit acquire/sürümdeki asimetri sevmiyorum – Joe

+0

) = sayesinde daha önce bu terimi hiç duymamıştım. Bunun doğru olduğuna emin misin? – LumpN

cevap

-3
#pragma once 

#include <atomic> 
#include <vector> 

/// <summary>Lock Free MPMC Queue - to solve Producer(1..n)/Consumer(1..n) problem.</summary> 
template <typename T> class QueueMpmcLockFree 
{ 
private: 
    // Valid data range. 
    std::atomic<int> m_atomicIndexBack = -1; // Last element. 
    std::atomic<int> m_atomicIndexBackPushing = -1; // Last element being pushed. 
    std::atomic<int> m_atomicIndexFront = 0; // Current element to be popped. May not be ready yet. 
    // Queue in a cyclic buffer. 
    const int m_sizeLimit; 
    std::vector<T> m_queue; 

public: 
    /// <summary>Ctor. Initialize the queue with specified size.</summary> 
    explicit QueueMpmcLockFree(const int sizeLimit = 10) 
     : m_sizeLimit(sizeLimit) 
     , m_queue(m_sizeLimit) // Fill the queue with default value. 
    { 
    } 

    /// <summary>Returns the number of messages.</summary> 
    int Peek() 
    { 
     return m_atomicIndexBack - m_atomicIndexFront + 1; 
    } 

    /// <summary>Push an item to the back of the queue. Move. 
    /// Attempts to put a message into the queue. Return true if the message is sent successfully. 
    /// If the message could not be sent immediately, typically because 
    /// the queue is full, false is returned. 
    /// </summary> 
    bool TryPush(T && itemToAddByMoving) 
    { 
     if (Peek() == m_sizeLimit) return false; 

     const int indexNext = ++m_atomicIndexBackPushing; // Get next position to add item. 
     m_queue[indexNext % m_sizeLimit] = std::move(itemToAddByMoving); // Moving item takes time. 
     ++m_atomicIndexBack; // Item pushed and ready. 
     return true; 
    } 

    /// <summary>Return true if a message is popped.</summary> 
    bool TryPop(T & poppedMsg) 
    { 
     if (Peek() <= 0) return false; 

     const int indexPop = m_atomicIndexFront++; // Get current position and set next one. 
     poppedMsg = m_queue[indexPop % m_sizeLimit]; // Copy message. 
     return true; 
    } 
}; 
+3

Bu nasıl kilitlenmeyen bir kuyruk hakkında soruya cevap veriyor? –

+1

Gerçekten, bu kilitlenmez. – Joe

+0

Hadi, 10 iş parçacığı için TryPop() ve Peek() adında 10 iş parçacığı döndürdüğünü varsayalım. Kuyrukta bir 1 giriş olsa bile, 10 iş parçacığının tamamı ön 10 kez hareket ederdi. Ayrıca, TryPush olarak adlandırılan 10 yazı parçacığı, yalnızca 1 giriş için yer olduğunda devam eder. Bunlar ciddi hatalar ve mpmc değil. – edwinc

3

Ben Moody Camel 'ın uygulanmasına bir görünüm vermek istiyorum.

Tamamen C++ 11 ile yazılmış C++ için hızlı genel amaçlı kilit içermeyen kuyruk olduğunu. Belgeleme, birkaç performans testi ile birlikte oldukça iyi görünüyor. tüm diğer ilginç şeylerin yanı sıra

basitleştirilmiş BSD lisansı altında, bu tek başlığında yer alan tüm ve kullanılabilir (onlar zaten okumak değersin). Sadece projenize bırakın ve keyfini çıkarın!

+0

İyi bul, teşekkürler :) – Joe