2015-05-04 26 views
7

Ben anlık içeren JSON nesneleri adlı text türünde bir alanı olan bir cassandra tablo var RDD için JSON şemasında dönüşüm yapmak için bu RDD'nin alanını başka bir RDD'ye dönüştürmem gerekiyor.Kıvılcım JSON metin alanı

Bu doğru mu? Buna nasıl devam etmeliyim?

Düzenleme:

val conf = new SparkConf().setAppName("signal-aggregation") 
val sc = new SparkContext(conf) 
val sqlContext = new SQLContext(sc) 
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots") 
val first = snapshots.first() 
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3))) 
firstJson.printSchema() 

bana JSON şema gösterilmektedir: Şimdi ben tek bir metin alanından bir RDD oluşturmak başardı için. İyi!

Spark'a, bu şemanın, her bir satırdaki o anlık görüntü alanında bir RDD almak için Anlık görüntü tablosunun tüm satırlarında uygulanması gerektiğini anlatmaya nasıl devam edebilirim?

+0

Eğer doğru anlamak içine json ile bir RDD [dize] geçmek istiyorum cassandra tablosunda her alanın içinde birkaç JSON nesnesi var ve her nesneyi bağımsız olarak hesaplamanız gerekiyor. –

+0

Evet haklısın, ama Spark'un metin alanını json olarak anlayabildiğini ve bu jrumların bazı değerlerinde dönüşüm yapabileceğimi bir yerde okudum, bu doğru mu? – galex

cevap

12

Neredeyse oldu, sadece, jsonRDD yöntemle

val conf = new SparkConf().setAppName("signal-aggregation") 
val sc = new SparkContext(conf) 
val sqlContext = new SQLContext(sc) 
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots") 
val jsons = snapshots.map(_._3) // Get Third Row Element Json(RDD[String]) 
val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // Pass in RDD directly 
jsonSchemaRDD.registerTempTable("testjson") 
sqlContext.sql("SELECT * FROM testjson where .... ").collect 

Hızlı bir örnek

val stringRDD = sc.parallelize(Seq(""" 
    { "isActive": false, 
    "balance": "$1,431.73", 
    "picture": "http://placehold.it/32x32", 
    "age": 35, 
    "eyeColor": "blue" 
    }""", 
    """{ 
    "isActive": true, 
    "balance": "$2,515.60", 
    "picture": "http://placehold.it/32x32", 
    "age": 34, 
    "eyeColor": "blue" 
    }""", 
    """{ 
    "isActive": false, 
    "balance": "$3,765.29", 
    "picture": "http://placehold.it/32x32", 
    "age": 26, 
    "eyeColor": "blue" 
    }""") 
) 
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson") 
csc.sql("SELECT age from testjson").collect 
//res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26]) 
+0

Mükemmel, teşekkürler! – galex