2015-06-26 10 views
7

Scala'da Spark Streaming uygulamasının bir kafka konusundan JSON Strings'i alıp bir veri çerçevesine yüklemek istediğim bir uygulama üzerinde çalışıyorum. Spark'in şemaya kendi başına bir RDD'den (String) ulaştığı bir yol var mı?Spark'da Spark DataFrame'i Oluşturun JSON'dan Mesaj Aktarılıyor

sqlContext.read 
//.schema(schema) //optional, makes it a bit faster, if you've processed it before you can get the schema using df.schema 
.json(jsonRDD) //RDD[String] 

ben yapmaya çalışıyorum:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
    val yourDataFrame = hiveContext.createDataFrame(yourRDD) 

cevap

2

kıvılcım 1.4 olarak, RDD bir Dataframe oluşturmak için aşağıdaki yöntemi deneyebilirsiniz aynı anda. Yine de, Kafka'nın dışındaki RDD [String] 'inin nasıl olduğunu merak ediyorum, hala Spark + Kafka'nın sadece "şu anda orada olanı çıkar" yerine tek seferlik bir yayın olduğu izlenimi altındayım. :)

+1

Bu aşağıdaki soru olarak benzer: http: //stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark – sparklearner

3

Evet, aşağıdakileri kullanabilirsiniz:

+3

KafkaUtils.createRDD'yi Kafka'dan akışa alınmayan bir RDD almak için kullanabilirsiniz. –

1

JSON değerlerini çıkarmak, Kafka gelen mesajların akışında okumak için aşağıdaki kodu kullanabilirsiniz ve DataFrame çevirebiliriz:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

messages.foreachRDD { rdd => 
//extracting the values only 
    val df = sqlContext.read.json(rdd.map(x => x._2)) 
    df.show() 
}