4

forma İşteSpark SQL, filtrelenmiş eşdeğer birleştirmelerde aşağı itmeyi yapıyor mu? Ben Spark SQL (1.6) kullanarak ilgileniyorum

A inner join B where A.group_id = B.group_id and pair_filter_udf(A[cols], B[cols]) 

ait "filtrelenmiş equi-katılır" gerçekleştirmek için group_id kabadır: group_id bir tek değer 10.000 kayıtları, diyelim ki, ilişkili olabilir Hem A hem de B.

Eşit birleştirme, pair_filter_udf olmadan, kendi başına gerçekleştirilirse, group_id'un karmaşıklığı, hesaplama sorunları oluşturacaktır. Örneğin, hem A hem de B'de 10.000 kayıt içeren group_id için, birleştirme işleminde 100 milyon giriş olacaktır. Eğer bu kadar büyük sayıda binlerce grubumuz olsaydı, muazzam bir tablo üretecektik ve hafızamız çok kolay bir şekilde tükenebilirdi. Bu nedenle, pair_filter_udf'u birleştirmeye ve tüm çiftler oluşturuluncaya kadar bekletmek yerine, oluşturulduğu gibi çiftleri filtre etmemiz önemlidir. Sorum şu, Spark SQL'in bunu yapıp yapamayacağı.

Ben onun sorgu planı ne basit filtrelenmiş eşit-katılıp sorulan Spark kurmak:

# run in PySpark Shell 
import pyspark.sql.functions as F 

sq = sqlContext 
n=100 
g=10 
a = sq.range(n) 
a = a.withColumn('grp',F.floor(a['id']/g)*g) 
a = a.withColumnRenamed('id','id_a') 

b = sq.range(n) 
b = b.withColumn('grp',F.floor(b['id']/g)*g) 
b = b.withColumnRenamed('id','id_b') 

c = a.join(b,(a.grp == b.grp) & (F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp']) 
c = c.sort('id_a') 
c = c[['grp','id_a','id_b']] 
c.explain() 

Sonuç:

== Physical Plan == 
Sort [id_a#21L ASC], true, 0 
+- ConvertToUnsafe 
    +- Exchange rangepartitioning(id_a#21L ASC,200), None 
     +- ConvertToSafe 
     +- Project [grp#20L,id_a#21L,id_b#24L] 
      +- Filter (abs((id_a#21L - id_b#24L)) < 2) 
       +- SortMergeJoin [grp#20L], [grp#23L] 
        :- Sort [grp#20L ASC], false, 0 
        : +- TungstenExchange hashpartitioning(grp#20L,200), None 
        :  +- Project [id#19L AS id_a#21L,(FLOOR((cast(id#19L as double)/10.0)) * 10) AS grp#20L] 
        :  +- Scan ExistingRDD[id#19L] 
        +- Sort [grp#23L ASC], false, 0 
        +- TungstenExchange hashpartitioning(grp#23L,200), None 
         +- Project [id#22L AS id_b#24L,(FLOOR((cast(id#22L as double)/10.0)) * 10) AS grp#23L] 
          +- Scan ExistingRDD[id#22L] 

Bu plandan anahtar hatları şunlardır:

+- Filter (abs((id_a#21L - id_b#24L)) < 2) 
    +- SortMergeJoin [grp#20L], [grp#23L] 

Bu satırlar, filtrenin birleştirmeden sonra ayrı bir aşamada gerçekleştirileceği izlenimini verir. istenen davranış. Ama belki de dolaylı olarak birleşme sürecine doğru itiliyor ve sorgu planı sadece bu detay seviyesinden yoksun.

Bu durumda Spark'in ne yaptığını nasıl anlarım?

Güncelleme: Kıvılcım aşağı açılan çıkmıyor eğer benim laptop çökmesine yeterli olacaktır n = 1E6 ve g = 1E5, ile

ben koşuyorum deneyler. Çöküşü olmadığından, sanırım itme yapıyor. Ancak, bu harika optimizasyondan, Spark SQL kaynağının hangi bölümlerinin ve nasıl çalıştığını bilmek ilginç olacaktır.

cevap

3

Oldukça çok, aşağı doğru itme ile ne demek istediğinize bağlıdır. |a.id_a - b.id_b| < 2'un a.grp = b.grp'un yanındaki join mantığının bir parçası olarak yürütüldüğünü sorarsanız, yanıt negatiftir. Eşitliğe dayalı olmayan tahminler doğrudan join koşulunda yer almamaktadır.

enter image description here

sen filter görebileceğiniz gibi SortMergeJoin ayrı bir dönüşüm olarak yürütülür: Bunu örneklemek mümkün

bir yolu Böyle fazla veya daha az bakmak gerekir yerine yürütme planının DAG kullanmaktır . Başka bir yaklaşım, a.grp = b.grp'u bıraktığınızda yürütme planını analiz etmektir.üretir - (gerçekten pratikte Bunu önlemek istemiyor Kartezyen bir kez)

d = a.join(b,(F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp']) 

## == Physical Plan == 
## Project [id_a#2L,grp#1L,id_b#5L] 
## +- Filter (abs((id_a#2L - id_b#5L)) < 2) 
## +- CartesianProduct 
##  :- ConvertToSafe 
##  : +- Project [id#0L AS id_a#2L,(FLOOR((cast(id#0L as double)/10.0)) * 10) AS grp#1L] 
##  :  +- Scan ExistingRDD[id#0L] 
##  +- ConvertToSafe 
##   +- Project [id#3L AS id_b#5L] 
##    +- Scan ExistingRDD[id#3L] 

o kodunuzu anlamına mı: Bunu hiçbir ek optimizasyonlarla bir filter ardından Kartezyen ürüne join genişler göreceksiniz büyük bir ara masa?

Hayır, öyle değil. Hem SortMergeJoin hem de filter, tek bir aşama olarak yürütülür (bkz. DAG). DataFrame işlemlerinin bazı ayrıntıları biraz daha düşük bir seviyede uygulanabilirken, temel olarak sadece Scala Iterators ve as shown in a very illustrative way by Justin Pihony'daki dönüşümlerin bir zinciridir, farklı işlemler Spark'e özgü bir mantık eklemeden birlikte ezilebilir. Her iki filtre de bir şekilde veya tek bir görevde uygulanacaktır.

+0

Yineleyici ezmesi çok mantıklı, ama benim için başka bir soru ortaya çıkıyor. Birkaç ay önce, Spark 1.2'de RDD'lerle [benzer bir deney] (http://stackoverflow.com/questions/34092211/how-does-spark-execute-a-join-filter-is-it-scalable) yaptım. 1 ve tersi sonuç aldı. Aynı mantığın bu örnek için geçerli olması gerektiği gibi görünüyor: Spark RDD katılmadığı sürece katılma + filtresi yalnızca yineleyicileri ezmek olmalıdır. – Paul

+0

Oh, üzgünüm :) 'python_join.dispatch' aslında bir jeneratör ifadesi döndürüyor. – zero323

+1

Sadece açık olmak gerekirse - bu (https://github.com/apache/spark/blob/master/python/pyspark/join.py#L46-L51) sorun mu yaşıyorsunuz? Bağladığınız soru/cevap <= 1.2 davranışı açıklar. – zero323

İlgili konular