2015-12-02 22 views
5

Flink'te programımızdaki farklı görevler için farklı derecelerde paralellik kurabilir miyim? Örneğin, Flink aşağıdaki örnek kodu nasıl yorumluyor? İki özel uygulayıcı olan MyPartitioner1, MyPartitioner2, giriş verilerini iki 4 ve 2 bölüm olarak bölümlere ayırır.Apache Flink içinde paralellik derecesi

partitionedData1 = inputData1 
    .partitionCustom(new MyPartitioner1(), 1); 
env.setParallelism(4); 
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1 
    .mapPartition(new calculateFun()); 

partitionedData2 = inputData2 
    .partitionCustom(new MyPartitioner2(), 2); 
env.setParallelism(2); 
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2 
    .mapPartition(new calculateFun()); 

Bu kod için aşağıdaki hatayı alıyorum:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:221) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:80) 
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) 
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:92) 
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) 
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
    at java.lang.Thread.run(Unknown Source) 

cevap

5

ExecutionEnvironment.setParallelism() tüm programın, programın yani tüm operatörler için paralelliği ayarlar.

Operatörde setParallelism() yöntemini arayarak her bir işleç için paralellik belirtebilirsiniz.

ArrayIndexOutOfBoundsException, özel bölümleyiciniz muhtemelen beklenmedik bir paralellik derecesi nedeniyle geçersiz bir bölüm numarası döndürdüğü için atılır. Özel bölümleyici, alıcının gerçek paralelliğini, partition(K key, int numPartitions) yönteminde bir parametre olarak alır.

İlgili konular