2013-07-29 17 views
5

Bir corpus'ta NGram'ların frekanslarını sayan bir program yazarım. Bunu nasılConduit: Çoklu Akış Tüketicileri

tokens --- trigrams --- countFreq 

: ben sadece bir dere kaynağına bir akım tüketiciyi bağlayabilirsiniz Şu anda

ngram :: Monad m => Int -> Conduit t m [t] 
trigrams = ngram 3 
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int) 

: Zaten jeton bir dere tüketir ve bir tek düzenin NGrams üreten bir işleve sahip Birden fazla akış tüketicisini aynı akış kaynağına mı bağlarım? Böyle bir şey istiyorum:

  .--- unigrams --- countFreq 
      |--- bigrams --- countFreq 
tokens ----|--- trigrams --- countFreq 
      '--- ...  --- countFreq 

artı paralel

DÜZENLEME her tüketici çalıştırmak olacaktır: Petr sayesinde bu çözümü

spawnMultiple orders = do 
    chan <- atomically newBroadcastTMChan 

    results <- forM orders $ \_ -> newEmptyMVar 
    threads <- forM (zip results orders) $ 
         forkIO . uncurry (sink chan) 

    forkIO . runResourceT $ sourceFile "test.txt" 
         $$ javascriptTokenizer 
         =$ sinkTMChan chan 

    forM results readMVar 

    where 
     sink chan result n = do 
      chan' <- atomically $ dupTMChan chan 
      freqs <- runResourceT $ sourceTMChan chan' 
           $$ ngram n 
           =$ frequencies 
      putMVar result freqs 
ile geldi
+0

“Jetonlar” bir değer verdiğinde, tüm 'gram'larınız bunu alır mı? –

cevap

5

Tüm lavabolarınızın tüm değerleri almasını istediğinizi varsayıyorum.

Ben öneririm:

  1. kullanın newBroadcastTMChan yeni kanalı Control.Concurrent.STM.TMChan (STM-chans) oluşturmak için.
  2. Ana üreticiniz için Data.Conduit.TMChan (stm-conduit) adresinden sinkTBMChan kullanarak bir lavabo oluşturmak için bu kanalı kullanın.
  3. Her istemci, okumak için kendi kopyasını oluşturmak üzere dupTMChan kullanın. Bu kopyayı sourceTBMChan kullanarak okuyacak yeni bir konu başlatın.
  4. Konuları kendi cihazınızdan toplayın.
  5. İstemcilerin, verileri üretildikleri kadar hızlı okuyabildiğinden emin olun, aksi halde yığın taşması alabilirsiniz.

(ı bize nasıl çalıştığını bildirin bunu denemedim.)


Güncelleme: Eğer sonuçları toplamak nasıl bir yolu olduğunu her tüketici parçacığı için MVar oluşturmak için . Her biri bitiminden sonra onun sonucu putMVar olur. Ve ana iş parçacığınız bu s üzerinde takeMVar olur, böylece her iş parçacığının bitmesini bekler. Örneğin, vars, MVar s listesinin bir listesi ise, ana iş parçacığı, tüm sonuçları toplamak için mapM takeMVar vars numarasını verir.

+0

Cevabınız için teşekkürler, threadIO'yu iş parçacığı ile doğurursam sonuçları nasıl toplarım? – SvenK

+0

@SvenK Yanıtı, sonuçları nasıl toplayacağına dair bir fikirle güncelledim. –

+0

TMChan neden bir yayın sürümüne sahip ve TBMChan çalışmıyor, 'newBroadcastTBMChan'ı nerede bulabilirim? – CMCDragonkai