2016-08-07 19 views
6

Bir Spark DataFrame df verildiğinde, belirli bir sayısal sütun 'values''daki maksimum değeri bulmak ve bu değere ulaşıldığı satırı (satırları) almak istiyorum. Tabii bunu yapabilirsiniz:argmax: satırın maksimum değeriyle nasıl alınacağı

# it doesn't matter if I use scala or python, 
# since I hope I get this done with DataFrame API 
import pyspark.sql.functions as F 
max_value = df.select(F.max('values')).collect()[0][0] 
df.filter(df.values == max_value).show() 

ama df iki geçiş gerektirir çünkü bu verimsizdir.

pandas.Series/DataFrame ve numpy.array (bir geçişte) verimli bir şekilde yapmak argmax/idxmax yöntemleri vardır. Standart python (dahili fonksiyon max bir anahtar parametresini kabul eder, bu yüzden en yüksek değerin indeksini bulmak için kullanılabilir).

Spark'de doğru yaklaşım nedir? Maksimum değerin elde edildiği tüm satırları veya bu satırların yalnızca bazı (boş olmayan!) Alt kümelerini alıp almamaya dikkat etmem.

+0

çapraz dildir ve keyfi veri çalışabilirsiniz daha iyi bir çözüm yoktur. – zero323

+0

@ zero323 RDD kodunu aşağıdaki DataFrame API'sinde Scala'ya dönüştürerek ve Catalyst'ün çalışabilmesi için uygun meta veriler ekleyerek aşağıdaki yanıtı neden kopyalamak imkansız? – max

+0

Bu mümkündür, ancak Scala veya Python_ kullanırsanız _it'in önemli olmadığı varsayımını açıkça keser. Bunu, yalnızca SQL ile birlikte Orderable veri türleri ile de yapabilirsiniz, ancak bu genel bir çözüm olmayan özel bir durumdur. – zero323

cevap

10

şema is Orderable Eğer basit toplamlar kullanabilirsiniz (şema sadece atomics/atomics ait diziler/yinelemeli orderable yapılar içerir) ise:

Python:

df.select(F.max(
    F.struct("values", *(x for x in df.columns if x != "values")) 
)).first() 

Scala:

df.select(max(struct(
    $"values" +: df.columns.collect {case x if x!= "values" => col(x)}: _* 
))).first 

Aksi takdirde azaltabilir Dataset üzerinde (Scala için) ama ek deserialization gerektirir: Genel olarak

type T = ??? 

df.reduce((a, b) => if (a.getAs[T]("values") > b.getAs[T]("values")) a else b) 
+0

Biraz zor, bu 'struct' yöntemini okumalıyım –

+0

' Orderable' şemasının bir açıklamasına/tanımına bağlanıyor musunuz? Google arama sadece bu çok yanıtı buldu :) – max

+0

https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering. scala # L89-L96 – zero323

2

Belki de tamamlanmamış bir yanıttır, ancak DataFrameRDD'u kullanabilir, max yöntemini uygulayabilir ve belirlenen bir anahtarı kullanarak maksimum kaydı alabilirsiniz.

a = sc.parallelize([ 
    ("a", 1, 100), 
    ("b", 2, 120), 
    ("c", 10, 1000), 
    ("d", 14, 1000) 
    ]).toDF(["name", "id", "salary"]) 

a.rdd.max(key=lambda x: x["salary"]) # Row(name=u'c', id=10, salary=1000) 
+1

RDD API'sı (python ek yükünü önlemek için Scala) ile 1 geçişi DataFrame API'sı ile 2 geçişten daha hızlı garantilemiş olabilir miyim? Ya da Catalyst'ün yapabileceği bazı optimizasyonlar var mı? – max

İlgili konular