2017-02-27 27 views
9

Bazı çalışmaları Java Akışları ile paralel hale getirmeye çalışıyorum. bu basit örneği ele alalım: Ben forEach kullanırsanızNeden Java Stream jeneratörü sırasız?

Stream.generate(new Supplier<Integer>() { 
     @Override 
     public Integer get() { 
      return generateNewInteger(); 
     } 
    }) 
    .parallel() 
    .forEachOrdered(new Consumer<Integer>() { 
     @Override 
     public void accept(Integer integer) { 
      System.out.println(integer); 
     } 
    }); 

sorun forEachOrdered için accept yöntemini çağırın olmamasıdır, sadece çalışır. Sorun şu ki Stream.generate, ORDERED karakteristiğine sahip olmayan InfiniteSupplyingSpliterator'u yaratıyor.

Sorunun nedeni nedir? Verilerin hangi sırada oluşturulduğunu bildiğimiz anlaşılıyor. İkinci soru, akış elemanlarının üretimi ile paralelleştirilmiş akışta forEachOrdered'un nasıl yapılacağıdır?

+3

Belirtim, çünkü şartname öyle diyor. Tamamlama nedeni, uygulama ayrıntıları ile birlikte * sonsuz * olmasıdır. – Holger

+2

Bu lambdalarla çok daha fazla yazılabilir. –

+0

İlişkisiz, ancak sadece .forEachOrdered (System.out :: println) 'yi yapabilirsiniz –

cevap

10

En basit yanıt,diyor, Stream.generate, unordered olduğunu.

Uygulama, öğeleri olabildiğince sıraya göre işlemeye çalışıyor gibi değil, aslında tam tersi. Bir operasyonun sıralı olarak tanımlanmasından sonra, uygulama, mümkün olduğunda, sıralanmamış doğadan bir fayda elde etmeye çalışacaktır. Sırasız bir işlemde bir kaynak siparişine benzeyen bir şeyle karşılaşırsanız, sırasız işlemden fayda elde etmenin bir yolu olmayabilir veya uygulama henüz tüm fırsatları kullanmaz. Bu, gelecekteki bir sürümde veya alternatif bir uygulamada değişebileceğinden, işlem sırasız olarak belirtilmişse, siparişe güvenmemeniz gerekir. Stream.generate tanımsız olarak tanımlamanın amacı, sipariş edilen Stream.iterate ile karşılaştırılırken daha net hale gelebilir. 'a iletilen işlev, önceki öğeyi alır, böylece öğeler arasında bir sonraki-sonradan ilişki, dolayısıyla bir sipariş vardır. Tedarikçi Stream.generate'un önceki bir öğeyi almadığını, başka bir deyişle, yalnızca bir işlev imzasını göz önünde bulundurarak önceki bir öğeyle ilişkisi olmadığını bildirmiştir. Bu kullanım durumları gibi Stream.generate(() -> constant) veya Stream.generate(Type::new) için çalışır, ancak daha az amaçlanan birincil kullanım durumu gibi görünüyor Stream.generate(instance::statefulOp) için çalışır. Operasyon iş parçacığı güvenli ise ve akışının sıralanmamış doğası ile yaşayabilir, hala çalışır.

Örneğinizin hiçbir zaman ilerleme kaydetmemesinin nedeni, forEachOrdered uygulamasının gerçekte, sırasız doğayı dikkate almadığı, ancak karşılaşma sırasına ayrıldıktan sonra parçaları işlemeye çalıştığıdır, yani tüm alt görevler öğelerini arabelleğe almayı dener. Böylece, sol alt görevlerinin tamamlanmasından sonra onları eyleme geçirebilirler. Tabii ki, tamponlama ve sonsuz kaynaklar, özellikle de, temeldeki InfiniteSupplyingSpliterator kendi başına sonsuz alt görevlere ayrılacağından, birlikte iyi çalmaz. Prensip olarak, unsurlarını doğrudan eylemle besleyebilecek en soldaki bir görev vardır, ancak bu görevler kuyrukta bir yerde gibi görünür, aktive edilmeyi bekler, ki bu asla tüm işçi ipleri zaten diğer sonsuz altları işlemekle meşgul olur. -görevler. Sonuç olarak, tüm işlem bir OutOfMemoryError ile kopacak, eğer yeterince uzun süre çalışmasına izin veriyorsanız…

İlgili konular