2015-02-21 48 views
10

Rx Java'dan Java 8 lambdas'a geçmek için çalışıyorum. Bulamadığım bir örnek, istekleri tamponlamanın bir yoludur. Örneğin, Rx Java'da şunları söyleyebilirim. Biz hiç önce olursa 1000 milisaniye, 20 bir liste halinde elementler, ya da zaman aşımı tamponJava 8 yazılım kutusu

Observable.create(getIterator()).buffer(20, 1000, TimeUnit. MILLISECONDS).doOnNext(list -> doWrite(list)); 

.

RX'deki gözlenebilirler, Akımlar bir java çekmesi kullandıklarında gözlemlenebilir bir "itme" stilidir. Bu, kendi harita operasyonlarımı akışlarda uygulamak mı, yoksa doOnNext'un bir önceki elemanı sorgulaması gerektiğinden, bu durumun ortaya çıkamaması sorun yaratıyor mu?

+2

http://stackoverflow.com/questions/ benzer görünümde olacaktır basit-tepki 27944784/java-8-akış-araçları-için-giriş veri/27.950.637 # 27.950.637 – Misha

cevap

2

Bunu yapmanın bir yolu, bir BlockingQueue ve Guava kullanmak olacaktır. Queues.drain'u kullanarak, stream() numaralı telefonu arayabileceğiniz ve dönüştürmelerinizi gerçekleştirebileceğiniz bir Collection oluşturabilirsiniz. İşte bir bağlantı: Guava Queues.drain

Ve burada bir örnek aşağıda verilmiştir:

public void transform(BlockingQueue<Something> input) 
{ 
    List<Something> buffer = new ArrayList<>(20); 
    Queues.drain(input, buffer, 20, 1000, TimeUnit.MILLISECONDS); 
    doWrite(buffer); 
} 
1

simple-react benzer operatörleri, ama bu kesin birine sahiptir. Gerçi oldukça uzayabilir, bu yüzden kendi yazınızı yazmak mümkün olmalıdır. Bunu bir IDE bu yazılı veya test etmedim uyarı için zaman aşımı operatörüyle boyutuna göre kabaca bir tampon ile bu

import com.aol.simple.react.async.Queue; 
    import com.aol.simple.react.stream.traits.LazyFutureStream; 
    import com.aol.simple.react.async.Queue.ClosedQueueException; 
    import com.aol.simple.react.util.SimpleTimer; 
    import java.util.concurrent.TimeUnit; 

    static LazyFutureStream batchBySizeAndTime(LazyFutureStream stream,int size,long time, TimeUnit unit) { 
    Queue queue = stream.toQueue(); 
    Function<Supplier<U>, Supplier<Collection<U>>> fn = s -> { 
     return() -> { 
      SimpleTimer timer = new SimpleTimer(); 
      List<U> list = new ArrayList<>(); 
      try { 
       do { 
        if(list.size()==size()) 
         return list; 
        list.add(s.get()); 
       } while (timer.getElapsedNanoseconds()<unit.toNanos(time)); 
      } catch (ClosedQueueException e) { 

       throw new ClosedQueueException(list); 
      } 
      return list; 
     }; 
    }; 
    return stream.fromStream(queue.streamBatch(stream.getSubscription(), fn)); 
}