2012-04-03 16 views
5

tamamlandığında, bir dizi ileti dizisinden sonuçları işlemek için bir CompletionService kullanmak istiyorum. Kullanıma hazır olduklarında gelecekteki nesneleri almak için bir döngüde hizmetim var, ancak tüm iş parçacıklarının ne zaman tamamlandığını (ve dolayısıyla döngüden çıkıp çıkamayacağını) belirlemenin en iyi yolunu bilmiyorum:Bir Tamamlama Hizmeti'nin sonuçları teslim etmeyi bitirdiğinde nasıl bilinir?

import java.util.concurrent.Callable; 
import java.util.concurrent.CompletionService; 
import java.util.concurrent.ExecutorCompletionService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.ThreadPoolExecutor; 

public class Bar { 

    final static int MAX_THREADS = 4; 
    final static int TOTAL_THREADS = 20; 

    public static void main(String[] args) throws Exception{ 

     final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS); 
     final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool); 

     for (int i=0; i<TOTAL_THREADS; i++){ 
      service.submit(new MyCallable(i)); 
     } 

     int finished = 0; 
     Future<Integer> future = null; 
     do{ 
      future = service.take(); 
      int result = future.get(); 
      System.out.println(" took: " + result); 
      finished++;    

     }while(finished < TOTAL_THREADS); 

     System.out.println("Shutting down"); 
     threadPool.shutdown(); 
    } 


    public static class MyCallable implements Callable<Integer>{ 

     final int id; 

     public MyCallable(int id){ 
      this.id = id; 
      System.out.println("Submitting: " + id); 
     } 

     @Override 
     public Integer call() throws Exception { 
      Thread.sleep(1000); 
      System.out.println("finished: " + id); 
      return id; 
     } 
    } 
} 

ThreadPoolExecutor öğesinin durumunu denetlemeyi denedim, ancak getCompletedTaskCount ve getTaskCount yöntemlerinin yalnızca yaklaşık değerler olduğunu biliyorum ve bunlara güvenmemeliyim. Tamamlandıktan sonra tüm Vadeli İşlemleri tamamladığımı garanti etmenin daha iyi bir yolu var mı?


Düzenleme: Nobeh sağlanan bağlantı ve this link Hem önermek o zaman take çağırarak, gönderilen görevlerin sayısını sayma() birçok kez, gitmek yoludur. Tamamen CompletionService veya Executor'a iade edilmek için ne bırakıldığını sormanın bir yolu olmadığına şaşırdım.

cevap

3

Bu sorulara cevap vermek size cevabı verir mi?

  • Eşzamansız görevleriniz, CompletionService'a gönderilen diğer görevleri yapıyor mu?
  • service, uygulamanızda oluşturulan görevleri yerine getirmesi gereken nesne mi?

reference documentation dayanarak, CompletionService tüketici/Üretici yaklaşım üzerine hareket eder ve bir iç Executor yararlanır. Yani, görevleri tek bir yerde üretip başka bir yerde tüketirseniz, daha fazla sonuç çıkması durumunda CompletionService.take() belirtilecektir.

this question'un da size yardımcı olduğuna inanıyorum.

+0

Teşekkürler, nobeh. "Thread (int tasksHandled = 0; tasksHandled

+1

API'daki örnek, arka arkaya take() n kere yürütme ile aynı yaklaşımı kullanır. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorCompletionService.html –

+0

CompletionService'den sonuçlar farklı bir iş parçacığında Exector'a gönderilen görevlerden alınmışsa, bu iş parçacığı güvenli midir? – raffian

3

Aradığınızı yapmak için ExecutorCompletionService'in nasıl genişletileceği konusunda iyi bir öneri için http://www.javaspecialists.eu/archive/Issue214.html adresine bakın. Size kolaylık sağlamak için ilgili kodu yapıştırdım. Yazar ayrıca servis uygulamasının Iterable'yi iyi bir fikir olacağını düşünüyorum.

FWIW, Bu gerçekten standart uygulamanın bir parçası olması gerektiğine katılıyorum, ama ne yazık ki değil.

import java.util.concurrent.*; 
import java.util.concurrent.atomic.*; 

public class CountingCompletionService<V> extends ExecutorCompletionService<V> { 
    private final AtomicLong submittedTasks = new AtomicLong(); 
    private final AtomicLong completedTasks = new AtomicLong(); 

    public CountingCompletionService(Executor executor) { 
    super(executor); 
    } 

    public CountingCompletionService(
     Executor executor, BlockingQueue<Future<V>> queue) { 
    super(executor, queue); 
    } 

    public Future<V> submit(Callable<V> task) { 
    Future<V> future = super.submit(task); 
    submittedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> submit(Runnable task, V result) { 
    Future<V> future = super.submit(task, result); 
    submittedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> take() throws InterruptedException { 
    Future<V> future = super.take(); 
    completedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> poll() { 
    Future<V> future = super.poll(); 
    if (future != null) completedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> poll(long timeout, TimeUnit unit) 
     throws InterruptedException { 
    Future<V> future = super.poll(timeout, unit); 
    if (future != null) completedTasks.incrementAndGet(); 
    return future; 
    } 

    public long getNumberOfCompletedTasks() { 
    return completedTasks.get(); 
    } 

    public long getNumberOfSubmittedTasks() { 
    return submittedTasks.get(); 
    } 

    public boolean hasUncompletedTasks() { 
    return completedTasks.get() < submittedTasks.get(); 
    } 
} 
İlgili konular