2016-02-23 13 views
6

Varsayılan olarak, Java akışları varsayılan parametrelerle oluşturulan bir common thread pool tarafından işlenir. another question'da yanıtlandığı gibi, bir özel havuz belirterek veya java.util.concurrent.ForkJoinPool.common.parallelism sistem parametresini ayarlayarak bu varsayılanları ayarlayabilirsiniz. Ancak, bu iki yöntemden herhangi biriyle akış işlemeye ayrılan iş parçacıklarının sayısını artıramamış bulunmaktayım. Örnek olarak, ilk argümanında belirtilen bir dosyada bulunan IP adreslerinin bir listesini işleyen ve çözümlenen adresleri çıkaran aşağıdaki programı göz önünde bulundurun. Bunu 13000 benzersiz IP adresine sahip bir dosyada çalıştırıyorum, Oracle Java Görev Kontrolü'u 16 iş parçacığı kadar azını kullanarak görüyorum. Bunlardan sadece beşi ForkJoinPool işçi. Yine de, bu özel görev pek çok iş parçacığından daha iyi yararlanacaktır çünkü iş parçacıkları çoğu zaman DNS yanıtlarını beklerken harcarlar. Yani sorum şu ki, kullanılan iplik sayısını nasıl arttırabilirim?Bir Java akışını varsayılan iş parçacığı sayısıyla nasıl işlerim?

Programı üç ortamda denedim; Bunlar OS tarafından bildirilen iş parçacığı sayısıdır. 17 konu

  • Java SE Runtime Environment OS X, bir 2-çekirdekli makinesinde 1.8.0_66-b17 oluşturmak:

    • Java SE Runtime Environment Windows 7 çalıştıran bir 8 çekirdekli makinesinde 1.8.0_73-B02 oluşturmak Darwin 15.2.0: FreeBSD 11.0 çalıştıran bir 24 çekirdekli makinede 23 ipler
    • OpenJDK versiyon 1.8.0_72: 44 konu
    
    import java.io.IOException; 
    import java.net.InetAddress; 
    import java.net.UnknownHostException; 
    import java.nio.file.Files; 
    import java.nio.file.Files; 
    import java.nio.file.Path; 
    import java.nio.file.Paths; 
    import java.util.concurrent.ForkJoinPool; 
    
    /** Resolve IP addresses in file args[0] using 100 threads */ 
    public class Resolve100 { 
        /** Resolve the passed IP address into a name */ 
        static String addressName(String ipAddress) { 
         try { 
          return InetAddress.getByName(ipAddress).getHostName(); 
         } catch (UnknownHostException e) { 
          return ipAddress; 
         } 
        } 
    
        public static void main(String[] args) { 
         Path path = Paths.get(args[0]); 
         ForkJoinPool fjp = new ForkJoinPool(100); 
         try { 
          fjp.submit(() -> { 
           try { 
            Files.lines(path) 
            .parallel() 
            .map(line -> addressName(line)) 
            .forEach(System.out::println); 
           } catch (IOException e) { 
            System.err.println("Failed: " + e); 
           } 
          }).get(); 
         } catch (Exception e) { 
          System.err.println("Failed: " + e); 
         } 
        } 
    } 
    
  • +2

    Kaynaklarla birlikte tryles with sources deyiminde 'Files.lines()' ifadesini eklemelisiniz! – fge

    +2

    Paragrafı() kullanmaya başlamadan önce satırları bir Listeye eklemenizi öneririm. Önceden kaç giriş olduğunu bildiğinde çok daha iyi bir iş çıkarır. –

    cevap

    6

    senin yaklaşımla iki sorun vardır. İlk akışı API tarafından oluşturulan bireysel görevlerin maksimal sayısı değişmeyecek özel FJP kullanarak bu in the following way tanımlanır üzere bunun:

    static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; 
    

    Yani özel havuzu kullanıyor olsanız bile, paralel görevler sayısı sınırlı olacak commonPoolParallelism * 4. (aslında zor bir sınır değil, ama bir hedef, ancak birçok durumda görev sayısı bu sayıya eşittir).

    Yukarıdaki sorun, java.util.concurrent.ForkJoinPool.common.parallelism sistem özelliği kullanılarak düzeltilebilir, ancak burada başka bir soruna da rastlayabilirsiniz: Files.lines gerçekten çok parazitli. Detaylar için bakınız this question. Özellikle, 13000 giriş hattında, maksimum 100 hızda işlemciniz olsa bile, mümkün olan maksimum hız 3.17x'dir (her satır işleminin kabaca aynı zamana denk geldiği varsayılarak). My StreamEx kütüphanesi, bunun için bir çalışma sağlar (StreamEx.ofLines(path).parallel() numaralı telefonu kullanarak akış oluştur).

    Files.readAllLines(path).parallelStream()... 
    

    Bu sistem özelliğiyle birlikte çalışır: Başka bir olası çözüm ardından bu paralel bir akış oluşturmak, List içine sırayla dosya hatları okumaktır. Ancak, Stream API genel olarak G/Ç'yi içerdiğinde paralel işleme için uygun değildir. Daha esnek bir çözüm her satırı için CompletableFuture kullanmaktır:

    ForkJoinPool fjp = new ForkJoinPool(100); 
    List<CompletableFuture<String>> list = Files.lines(path) 
        .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp)) 
        .collect(Collectors.toList()); 
    list.stream().map(CompletableFuture::join) 
        .forEach(System.out::println); 
    

    sistem özelliği çimdik gerekmez ve ayrı görevler için ayrı havuzlarını kullanabilirsiniz Bu şekilde.

    +0

    ve iş parçacığı sayısını değiştirmek için bu tekniğin tamamen uygulamaya bağımlı, belirtilmemiş davranış ve hiçbir şey olmadığını, geliştiricilerin – Holger

    +0

    @ Holger güvenmelidir, bu nedenle .submit yöntemi, demek istediğimi varsayarsak değil mi? –

    +0

    Teşekkürler! CompletableFuture yaklaşımı gerçekten 100 iş parçacığı üretir ve büyüklük hızlandırma sırası sunar. İşte sayılar. Orijinal: 48m40.036s; CompletableFuture: 0m37.465s. (Orijinal sürümün ayrıca ılık bir DNS önbelleğinde de çalıştığını unutmayın.) –

    İlgili konular