2016-03-23 11 views
0

Bazı hesaplamalar yapmak için Apache Spark kullanıyorum. Böyle bir sorguyu çalıştırıyorum. (Ülkeye + school_id tarafından temsil edilir) her okul için ŞimdiKıvılcımda foreachGroup metodu gibi bir fonksiyon var mı?

USA, school1, math, 99 
USA, school1, sport, 98 
USA, school2, math, 90 
ENG, school1, science, 100 

, onların skoruna göre ilk 3 konuyu almak gerekir:

SELECT country, school, subjects, avg(score) FROM table GROUP BY country,school,subject 

Yani sonuç gibidir.

Bunu yapmanın iki yolunu düşünüyorum.

1. If there is some method called foreachGROUP, Then I will run code like 

result.foreachGROUP(get_top_3) 


2. I know there is a method called repartion. Then I guess I can do something like : 

result.repartion(country,school) # repartion by country and school 
foreachPartion(get_top_3) 

Apache kıvılcımına aşina değilim. Yani hangi yolun mümkün veya daha iyi olduğundan emin değilsiniz. Lütfen biraz tavsiye verin. Bundan daha iyi bir yolun varsa. Eğer test verilerini oluşturduktan sonra da adivce

cevap

0

edin:

val df = sc.parallelize(Array(
    Rec("USA","school1", "math", 98.0), 
    Rec("USA","school1", "lit", 96.0), 
    Rec("USA","school1", "trig", 92.0), 
    Rec("USA","school1", "eng", 94.0) 
)).toDF 

Bir groupBy(), bir collect_list() ve üst 3 ardından explode gayrimenkulünü yapın:

val top3bySchool = df.groupBy($"country", $"school") 
    .agg(collect_list($"subject") as "subjectList", collect_list($"score") as "scoreList") 
    .explode($"subjectList", $"scoreList"){r => { 
    val subjectList = r.getSeq[String](0).zip(r.getSeq[Double](1)).sortWith((a,b) => { 
     a._2 > b._2 
    }); 
    subjectList.slice(0, if (subjectList.length < 3) subjectList.length else 3); 
    }}.select($"country",$"school",$"_1" as "subject", $"_2" as "score") 
+0

Teşekkür David. Scala veya JAVA kullanıyor musunuz? Çünkü bu ikisine aşina değilim. Python kullanıyorum. Python ile bazı örnekler verebilir misiniz? –

İlgili konular