2014-06-23 19 views
6

Paralel işleme ile bazı testler yürütüyordum ve bir tamsayı matrisi verilen her programın komşulara göre değerini yeniden hesaplayan bir program yaptım. CyclicBarrier üzerinde bir numara yapmamak için fikirler

I değerleri overriden olmaz, böylece matris bir kopyasını gerekli ve kısmi sorunlar çözülmüştür kez sonuçları birleştirmek için bir CyclicBarrier kullanılır:

CyclicBarrier cyclic_barrier = new CyclicBarrier(n_tasks + 1, new Runnable() { 
    public void run() { 
     ParallelProcess.mergeResult(); 
    } 
}); 
ParallelProcess p = new ParallelProcess(cyclic_barrier, n_rows, r_cols); // init 

Her görev matrisin bir kısmının atanır : Eşit parçalarla satırlara ayırıyorum. Ancak bölümlerin kesin olmadığı, bu yüzden iş parçacığı havuzuna gönderilmeyecek son sıraya karşılık gelen küçük bir parça olabileceği ortaya çıkabilir.

Örnek: 16 satırları ve n_tasks = 4 sorunum yoksa, 4 tanesinin tümü havuza gönderilir. Ama bunun yerine 18 olsaydı, ilk 16 olanlar gönderilecek, ancak son iki değil.

Bu durumda bir gönderim zorluyorum. Eh, aslında ben göndermiyorum çünkü bu ExecutorService e = Executors.newFixedThreadPool(n_tasks) gibi oluşturduğum bir sabit iş parçacığı havuzu kullanıyorum. Havuzdaki tüm yuvalar işgal edildiğinden ve iş parçacıkları engel tarafından engellendiğinden (mybarrier.await()run yönteminde çağrılır) havuza gönderemedim, bu yüzden Thread.start() kullanıyorum.

Şimdi konuya gidelim. CyclicBarrier için dikkate almam gerektiğinden, bu parçanın kalma olasılığı, tarafların sayısı 1 artırılmalıdır.

Ancak bu durum gerçekleşmediyse, bariyeri tetiklemek için kısa bir parti olurdum. .

benim çözüm ?:

if (lower_limit != n_rows) { // the remaining chunk to be processed 
    Thread t = new Thread(new ParallelProcess(lower_limit, n_rows)); 
    t.start(); 
    t.join(); 
} 
else { 
    cyclic_barrier.await(); 
} 

zorla bariyer yükseltmek cyclic_barrier.await() hile kullanırken Hile gibi hissediyorum ne var.

Bu sorunu çözebilmem için başka bir yol var mı, bu yüzden ne yaptığımı yapmak zorunda değildim? Bu, CyclicBarriers ile ilgili sorunuzu yanıtlamıyor olsa da, bir Phaser'u kullanmanızı tavsiye edebilir miyim?

+0

: bariyer gelmesi Geçen işçi olarak ('tarafından döndürülen değeri ile kontrol barrier.await() '), ilerlemek için kalan öğelerin olup olmadığını kontrol edebilirsiniz. Evet ise, işçi, soldan fazla öğeyi işlemek için yeni görev gönderebilir.Belki de, 't.join() 'ile gösterdiğiniz gibi,' barrier.await() 'ın ötesinde' mergeResult() 'içinde bazı ek senkronizasyon kullanmaya ihtiyaç duyacaksınız. –

+0

@VictorSorokin “await()” işlevinin dönüş değerini kontrol etmenin, sol kısım olup olmadığını kontrol etmeme yardımcı olacağını anlamıyorum. – dabadaba

+0

Önceden bilinen "satırlar" ve "n_threads" değerleri değil (görevleri göndermeye başlamadan önce)? –

cevap

1

Bu, parti sayısını dahil etme yeteneğine sahiptir ve ayrıca bir aşama tetiklendiğinde mergeResult'u çalıştırmanıza izin verir.

Bu nedenle, bir uyumsuz hesaplama gerçekleştirmeden önce, yalnızca register. Sonra bu hesaplama içinde iş parçacığı phaser'a varır. Tüm iş parçacıkları geldiğinde, faz ilerler ve onAdvance geçersiz bir yöntemini çağırabilir.

gönderme:

ParallelProcess process = new ParallelProcess(lower_limit, n_rows)); 
phaser.register(); 
executor.submit(process); 

işlemci

public void run(){ 
    //do stuff 
    phaser.arrive(); 
} 

fazer Bunun yaklaşık

Phaser phaser = new Phaser(){ 
    protected boolean onAdvance(int phase, int registeredParties) { 
     ParallelProcess.mergeResult(); 
     return true; 
    } 
} 
İlgili konular