2011-05-18 22 views
10

Bir ThreadPoolExecutor'dan aldığım FutureTask'ı iptal etmek istiyorum ancak iş parçacığı havuzundaki Callable'ın çalışmayı durdurduğundan emin olmak istiyorum.FutureTask'ta iptal() için bekleyin

FutureTask # cancel (false) öğesini çağırırsam ve sonra() (tamamlanana kadar engellemek için) bir CancelledException alıyorum. Bu istisna, hemen veya görev yürütmeyi durdurduktan sonra mı atılır?

+0

Kullanım durumu nedir? İptal etmek sana çok fayda sağlıyor gibi görünmüyor - eğer işin kesilmesini istemiyorsan yine de tamamlanmasını beklemek istiyorsan, sana vermeyi iptal etmek nedir? –

+1

Görev, paylaşılan durumda çalışıyor, aynı paylaşılan durumda çalışan yeni bir tane başlatmadan önce çalışmasını durdurduğundan emin olmak istiyorum. –

+0

Ancak, iptalin değeri nedir? Gelecek birçok müşteri arasında paylaşılıyor ve onlarla iletişim kurmak mı istiyorsunuz? –

cevap

1

Bu cevap eğer kontrol ederek yarışı Aleksey en koşulu ve FooJBar kodunu giderir: Bir yapıcı bir Runnable alarak (null adlı dönecektir) ve) (iptal doğrudan blok (katılmak) nasıl gösteren bir varyant yazdı Görev callable içinde iptal edildi. (Gelecekte FutureTask.run durumu denetler ve hem iptal hem de getWithJoin öğesinin başarıyla tamamlanabildiği durumlarda callable'ı çalıştırdığı zaman arasında bir pencere vardır. Ancak, callable hala çalışır.)

Ayrıca, orijinal iptal işlemini geçersiz kılmamaya karar verdim Yeni iptalin InterruptedException bildirmesi gerekiyor. Yeni iptal, işe yaramaz geri dönüş değerinden kurtulur (true, "görev başlamadı", "görev başladı ve zaten hasarının çoğunu yaptı", "görev başladı ve sonunda tamamlanacak"). Gone ayrıca super.cancel'un dönüş değeri olup olmadığını kontrol eder, böylece yeni iptaller farklı iş parçacıklarından birden çok kez çağrılırsa, bunların tümü görevlerin tamamlanmasını bekler.

import java.util.concurrent.Callable; 
import java.util.concurrent.CancellationException; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.Executors; 
import java.util.concurrent.FutureTask; 
import java.util.concurrent.Semaphore; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 

/** 
* Based on: http://stackoverflow.com/questions/6040962/wait-for-cancel-on-futuretask 
* 
* @author Aleksandr Dubinsky 
*/ 
public class FixedFutureTask<T> extends FutureTask<T> { 

    /** 
     * Creates a {@code FutureTask} that will, upon running, execute the given {@code Runnable}, 
     * and arrange that {@code get} will return the given result on successful completion. 
     * 
     * @param runnable the runnable task 
     * @param result the result to return on successful completion. 
     *    If you don't need a particular result, consider using constructions of the form: 
     *    {@code Future<?> f = new FutureTask<Void>(runnable, null)} 
     * @throws NullPointerException if the runnable is null 
     */ 
     public 
    FixedFutureTask (Runnable runnable, T result) { 
      this (Executors.callable (runnable, result)); 
     } 

    /** 
     * Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}. 
     * 
     * @param callable the callable task 
     * @throws NullPointerException if the callable is null 
     */ 
     public 
    FixedFutureTask (Callable<T> callable) { 
      this (new MyCallable (callable)); 
     } 

     /** Some ugly code to work around the compiler's limitations on constructors */ 
     private 
    FixedFutureTask (MyCallable<T> myCallable) { 
      super (myCallable); 
      myCallable.task = this; 
     } 

    private final Semaphore semaphore = new Semaphore(1); 

    private static class MyCallable<T> implements Callable<T> 
    { 
     MyCallable (Callable<T> callable) { 
       this.callable = callable; 
      } 

     final Callable<T> callable; 
     FixedFutureTask<T> task; 

      @Override public T 
     call() throws Exception { 

       task.semaphore.acquire(); 
       try 
       { 
        if (task.isCancelled()) 
         return null; 

        return callable.call(); 
       } 
       finally 
       { 
        task.semaphore.release(); 
       } 
      } 
    } 

    /** 
     * Waits if necessary for the computation to complete or finish cancelling, and then retrieves its result, if available. 
     * 
     * @return the computed result 
     * @throws CancellationException if the computation was cancelled 
     * @throws ExecutionException if the computation threw an exception 
     * @throws InterruptedException if the current thread was interrupted while waiting 
     */ 
     @Override public T 
    get() throws InterruptedException, ExecutionException, CancellationException { 

      try 
      { 
       return super.get(); 
      } 
      catch (CancellationException e) 
      { 
       semaphore.acquire(); 
       semaphore.release(); 
       throw e; 
      } 
     } 

    /** 
     * Waits if necessary for at most the given time for the computation to complete or finish cancelling, and then retrieves its result, if available. 
     * 
     * @param timeout the maximum time to wait 
     * @param unit the time unit of the timeout argument 
     * @return the computed result 
     * @throws CancellationException if the computation was cancelled 
     * @throws ExecutionException if the computation threw an exception 
     * @throws InterruptedException if the current thread was interrupted while waiting 
     * @throws CancellationException 
     * @throws TimeoutException if the wait timed out 
     */ 
     @Override public T 
    get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException { 

      try 
      { 
       return super.get (timeout, unit); 
      } 
      catch (CancellationException e) 
      { 
       semaphore.acquire(); 
       semaphore.release(); 
       throw e; 
      } 
     } 

    /** 
     * Attempts to cancel execution of this task and waits for the task to complete if it has been started. 
     * If the task has not started when {@code cancelWithJoin} is called, this task should never run. 
     * If the task has already started, then the {@code mayInterruptIfRunning} parameter determines 
     * whether the thread executing this task should be interrupted in an attempt to stop the task. 
     * 
     * <p>After this method returns, subsequent calls to {@link #isDone} will 
     * always return {@code true}. Subsequent calls to {@link #isCancelled} 
     * will always return {@code true} if this method returned {@code true}. 
     * 
     * @param mayInterruptIfRunning {@code true} if the thread executing this task should be interrupted; 
     *        otherwise, in-progress tasks are allowed to complete 
     * @throws InterruptedException if the thread is interrupted 
     */ 
     public void 
    cancelAndWait (boolean mayInterruptIfRunning) throws InterruptedException { 

      super.cancel (mayInterruptIfRunning); 

      semaphore.acquire(); 
      semaphore.release(); 
     } 
} 
+0

Elbette, '' (uzun zaman aşımı, TimeUnit birimi) 'artık sözleşmesine uymuyor çünkü' semaphore.acquire() 'hiçbir zaman aşımı yok. Farklı bir iplik senkronizasyonu ilkel kullanılmalıdır. Ancak, şimdilik değiştirmeyeceğim. –

1

İptal edildiği anda atılır.

Başladığını ve bittiğini bilmenin kolay bir yolu yoktur. Durumunu kontrol etmek için runnable için bir sarmalayıcı oluşturabilirsiniz.

final AtomicInteger state = new AtomicInteger(); 
// in the runnable 
state.incrementAndGet(); 
try { 
    // do work 
} finally { 
    state.decrementAdnGet(); 
} 
+0

"Başladığını ve bittiğini bilmek için kolay bir yol yoktur" - böyle bir durumda cancel() yönteminden yanlış alırsınız. Bence cancel() yönteminin sonucu ile get() yönteminin sonucunun birleşimi size tam bilgi vermeli, fakat yanılıyor olabilirim. –

+0

Çalışan bir görevi iptal ederseniz, "iptal et (true)", "true" değerini döndürmeli ve "get()" bir "CancellationException" ("İptal Et)" ifadesini atar. Ancak, "cancel (true)" sadece bir iş parçacığı üzerinde kesintili bayrağı ayarlar. çalışan görev ve hala çalışıyor olabilir. –

+0

Basit bir bayrak, "Başladı mı bitti mi, henüz başlamadı mı?" Sorusuna cevap vermiyor. Görev tamamlanana kadar beklemenize izin vermez (veya başlatılma şansı olmadan iptal edilmiş). –

2

Evet, CancellationException hemen atılır. Callable 'in iş parçacığı bitene kadar bekleyeceği get() yönteminin sürümünü eklemek için FutureTask'ı uzatabilirsiniz.

public class ThreadWaitingFutureTask<T> extends FutureTask<T> { 

    private final Semaphore semaphore; 

    public ThreadWaitingFutureTask(Callable<T> callable) { 
     this(callable, new Semaphore(1)); 
    } 

    public T getWithJoin() throws InterruptedException, ExecutionException { 
     try { 
      return super.get(); 
     } 
     catch (CancellationException e) { 
      semaphore.acquire(); 
      semaphore.release(); 
      throw e; 
     } 
    } 

    private ThreadWaitingFutureTask(final Callable<T> callable, 
       final Semaphore semaphore) { 
     super(new Callable<T>() { 
      public T call() throws Exception { 
       semaphore.acquire(); 
       try { 
        return callable.call(); 
       } 
       finally { 
        semaphore.release(); 
       } 
      } 
     }); 
     this.semaphore = semaphore; 
    } 
} 
+0

'u kullanmayın. Bu cevap, @ FooJBar ile aynı yarış koşuluna sahiptir. FutureTask'ın arasında bir pencere var.run 'state' kontrol eder ve hem 'cancel' hem de getWithJoin''in başarılı bir şekilde tamamlanabildiği callable'ı çalıştırır. Ancak, callable hala çalışır. Arayanın iptal edilip edilmediğini kontrol etmek için kontrol edilebilir, ancak yapılması zor bir işlemdir. –

2

Aleksey'in örneği iyi çalışıyor.

public class FutureTaskCancelWaits<T> extends FutureTask<T> { 

    private final Semaphore semaphore; 

    public FutureTaskCancelWaits(Runnable runnable) { 
     this(Executors.callable(runnable, (T) null)); 
    } 

    public FutureTaskCancelWaits(Callable<T> callable) { 
     this(callable, new Semaphore(1)); 
    } 

    @Override 
    public boolean cancel(boolean mayInterruptIfRunning) { 
     // If the task was successfully cancelled, block here until call() returns 
     if (super.cancel(mayInterruptIfRunning)) { 
      try { 
       semaphore.acquire(); 
       // All is well 
       return true; 
      } catch (InterruptedException e) { 
       // Interrupted while waiting... 
      } finally { 
       semaphore.release(); 
      } 
     } 
     return false; 
    } 

    private FutureTaskCancelWaits(final Callable<T> callable, final Semaphore semaphore) { 
     super(new Callable<T>() { 
      public T call() throws Exception { 
       semaphore.acquire(); 
       try { 
        return callable.call(); 
       } finally { 
        semaphore.release(); 
       } 
      } 
     }); 
     this.semaphore = semaphore; 
    } 
} 
+0

Maalesef, bu cevapta ölümcül bir kusur var. FutureTask.run:262'de (Oracle JDK 8u40) görebileceğiniz gibi, durum kontrol edildikten ve “c.call()” den önce, “iptal” için bir çağrı gerçekleşebilir. Semafor henüz girilmediğinden, 'cancel' hemen sonlanacak. Ancak, c.call() 'bağımsız olarak çalışır. Çözüm, yeni callable'da 'isCancelled()' için bir kontrol eklemek olabilir. Bununla birlikte, derleyici "süper tip kurucunun çağrılmasından önce bunu referans gösteremez" ile şikâyet eder. –

+0

OTOH, 'semaphore.acquire', 'cancel' tarafından iş parçacığı kesintiye uğradıysa ancak 'mayInterruptIfRunning' true değerine ayarlanmışsa' InterruptedException 'komutunu atacaktır. –