2014-07-22 41 views
6

Aşağıda, 30 saniyenin bir pencere boyutu ve 10 saniyelik slayt boyutu üzerinde kelime sayımı elde etmek için basit bir kod bulunmaktadır.Spark Akış Penceresi Çalışması

import org.apache.spark.SparkConf 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.api.java.function._ 
import org.apache.spark.streaming.api._ 
import org.apache.spark.storage.StorageLevel 

val ssc = new StreamingContext(sc, Seconds(5)) 

// read from text file 
val lines0 = ssc.textFileStream("test") 
val words0 = lines0.flatMap(_.split(" ")) 

// read from socket 
val lines1 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) 
val words1 = lines1.flatMap(_.split(" ")) 

val words = words0.union(words1) 
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

wordCounts.print() 
ssc.checkpoint(".") 
ssc.start() 
ssc.awaitTermination() 

Ancak, bu hattan hata alıyorum:

val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

. Özellikle, _ + _'dan. Hata, sorunun ne olduğunu söyleyebilir mi? Teşekkürler!

cevap

10

Bu, düzeltilmesi son derece kolaydır, sadece türler hakkında açık olun. Bu durumda türünü tahmin edemediği
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow((a:Int,b:Int)=>a+b, Seconds(30), Seconds(10))

nedeni scala this answer

+0

teşekkür ederiz açıklanmıştır! Değişimden sonra, program beklenen sonuçları veriyordu ama bu arada başka bir hata verdi: java.util.NoSuchElementException: anahtar bulunamadı: 140605186000000 ms \t scala.collection.MapLike $ class.default (MapLike.scala) : 228) scala.collection.AbstractMap.default de \t (Map.scala: 58) scala.collection.mutable.HashMap.apply (HashMap.scala de \t: org.apache.spark.streaming 64) \t. dstream.ReceiverInputDStream.getReceivedBlockInfo (ReceiverInputDStream.scala: 77) Acaba bu nasıl oldu? – user2895478

+0

@ user2895478 Bu [Jira bileti] 'nin (https://issues.apache.org/jira/browse/SPARK-2009) sorununun 1.0.1 ve 1.1.0'da çözüldüğüne inanıyorum – aaronman