2011-11-02 11 views
7
araya giriyor

Her 10 saniyede bir sabit iş parçacığı havuzu ExecutorService ve her biri 20 saniye bekleyen ve bunların kesintilerini kaydeden 100 Callable 'luk bir listeye sahibim.Java ExecutorService invokeAll()

Bu listede ayrı bir iş parçacığında invokeAll numaralı telefonu arıyorum ve hemen hemen bu iş parçacığı araya giriyorum. ExecutorService yürütme beklendiği gibi kesildi, ancak Callable s tarafından kaydedilen gerçek kesinti sayısı beklenenden çok daha fazla 10 - yaklaşık 20-40. Neden bu kadar, ExecutorService aynı anda en fazla 10 iş parçacığı yürütürse?

Tam kaynak:

@Test 
public void interrupt3() throws Exception{ 
    int callableNum = 100; 
    int executorThreadNum = 10; 
    final AtomicInteger interruptCounter = new AtomicInteger(0); 
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum); 
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>(); 
    for (int i = 0; i < callableNum; ++i) { 
     executeds.add(new Waiter(interruptCounter)); 
    } 
    Thread watcher = new Thread(new Runnable() { 

     @Override 
     public void run(){ 
      try { 
       executorService.invokeAll(executeds); 
      } catch(InterruptedException ex) { 
       // NOOP 
      } 
     } 
    }); 
    watcher.start(); 
    Thread.sleep(200); 
    watcher.interrupt(); 
    Thread.sleep(200); 
    assertEquals(10, interruptCounter.get()); 
} 

// This class just waits for 20 seconds, recording it's interrupts 
private class Waiter implements Callable <Object> { 
    private AtomicInteger interruptCounter; 

    public Waiter(AtomicInteger interruptCounter){ 
     this.interruptCounter = interruptCounter; 
    } 

    @Override 
    public Object call() throws Exception{ 
     try { 
      Thread.sleep(20000); 
     } catch(InterruptedException ex) { 
      interruptCounter.getAndIncrement(); 
     } 
     return null; 
    } 
} 

Kullanımı WinXP 32-bit Oracle JRE 1.6.0_27 ve JUnit4

ben hipotezi ile katılmıyorum
+0

Hmm ... ana programa sahip bir programa dönüştürdüğümde, her zaman 10'unu alıyorum ... (Windows 7'de Java 7) –

+0

Aynı işlemi tamamladı 37 (1.6.0_27, Windows XP). Test etmek için Java 7'niz yok mu, birisi onaylayabilir mi? –

+0

Çalışmayı deneyeceğim. Belki de bir Java 6 hatasıdır ... –

cevap

4

(Sen dolayı eşzamanlılık bir kez daha o çalıştırmak gerekebilir) sadece 10 kesinti almalısınız.

Assume the CPU has 1 core. 
1. Main thread starts Watcher and sleeps 
2. Watcher starts and adds 100 Waiters then blocks 
3. Waiter 1-10 start and sleep in sequence 
4. Main wakes and interrupts Watcher then sleeps 
5. Watcher cancels Waiter 1-5 then is yielded by the OS (now we have 5 interrupts) 
6. Waiter 11-13 start and sleep 
7. Watcher cancels Waiter 6-20 then is yielded by the OS (now we have 13 interrupts) 
8. Waiter 14-20 are "started" resulting in a no-op 
9. Waiter 21-24 start and sleep 
.... 

Esasen, benim argüman Watcher iplik zamanı dilimini verim ve ExecutorService en çalışan iş parçacığı daha başlamasını sağlamak zorundadır önce tüm 100 "Garson" RunnableFuture örneklerini iptal etmek izin verilecek garantisi yoktur olmasıdır Garson görevleri.

Güncelleme: Gösterilen kod AbstractExecutorService

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException { 
    if (tasks == null) 
     throw new NullPointerException(); 
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
    boolean done = false; 
    try { 
     for (Callable<T> t : tasks) { 
      RunnableFuture<T> f = newTaskFor(t); 
      futures.add(f); 
      execute(f); 
     } 
     for (Future<T> f : futures) { 
      if (!f.isDone()) { 
       try { 
        f.get(); //If interrupted, this is where the InterruptedException will be thrown from 
       } catch (CancellationException ignore) { 
       } catch (ExecutionException ignore) { 
       } 
      } 
     } 
     done = true; 
     return futures; 
    } finally { 
     if (!done) 
      for (Future<T> f : futures) 
       f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads 
    } 
} 

itibaren nihayet kesme şu anda çalışıyor göreve dağıtılmasını ne zaman hangi f.cancel(true) içeriyor bloke ederler. Gördüğünüz gibi, bu sıkı bir döngüdür, ancak döngüyü yürüten iş parçacığının tek bir zaman dilimi içinde Future tüm örneklerinde yineleyebileceğinin bir garantisi yoktur. Eğer bu bloğu ekleme aynı davranışı

ArrayList<Runnable> runnables = new ArrayList<Runnable>(); 
    executorService.getQueue().drainTo(runnables); 

ulaşmak istiyorsanız

+0

Yani diyorsun, 'invokeAll()' ın kesintiye uğraması, çalışanların kesintiye uğratılmasından önce tüm sıraya alınmış görevlerin derhal iptal edilmesini ima etmiyor mu? Benim için, en az şaşkınlık ilkesinin doğrudan kırılmasına benziyor. –

+1

Doğru. Görevleri işleyen işlemler konuları, 'invokeAll()' komutunu yürüten Thread'den farklıdır. Bir iş parçacığı üzerinde arama kesintisi, başka bir iş parçacığının kesintiye uğraması anlamına gelmez, bu yüzden, vesilesiyle 10 işçiden daha fazla ara vermenizden şaşmam.Gönderdiğim ek açıklamalı kodda bahsettiğim gibi, bir kesme işlemi yalnızca, görevi "Future.cancel" yöntemine iletilen boole argümanının bir erdemi olarak işleyen çalışan iş parçacığına gönderilir. –

0
PowerMock.mockStatic (Executors.class); 
EasyMock.expect (Executors.newFixedThreadPool (9)).andReturn (executorService); 

Future<MyObject> callableMock = (Future<MyObject>) 
EasyMock.createMock (Future.class); 
EasyMock.expect (callableMock.get (EasyMock.anyLong(), EasyMock.isA (TimeUnit.class))).andReturn (ccs).anyTimes(); 

List<Future<MyObject>> futures = new ArrayList<Future<MyObject>>(); 
futures.add (callableMock); 
EasyMock.expect (executorService.invokeAll (EasyMock.isA (List.class))).andReturn (futures).anyTimes(); 

executorService.shutdown(); 
EasyMock.expectLastCall().anyTimes(); 

EasyMock.expect (mock.getMethodCall ()).andReturn (result).anyTimes(); 

PowerMock.replayAll(); 
EasyMock.replay (callableMock, executorService, mock); 

Assert.assertEquals (" ", answer.get (0)); 
PowerMock.verifyAll(); 
1

önce ThreadPool'da kesiyoruz.

Tüm bekleyen sıra yeni listeye akacaktır.

Sadece çalışan konuları kesecek.