A inner join B where A.group_id = B.group_id and pair_filter_udf(A[cols], B[cols])
ait "filtrelenmiş equi-katılır" gerçekleştirmek için group_id
kabadır: group_id
bir tek değer 10.000 kayıtları, diyelim ki, ilişkili olabilir Hem A hem de B.
Eşit birleştirme, pair_filter_udf
olmadan, kendi başına gerçekleştirilirse, group_id
'un karmaşıklığı, hesaplama sorunları oluşturacaktır. Örneğin, hem A hem de B'de 10.000 kayıt içeren group_id
için, birleştirme işleminde 100 milyon giriş olacaktır. Eğer bu kadar büyük sayıda binlerce grubumuz olsaydı, muazzam bir tablo üretecektik ve hafızamız çok kolay bir şekilde tükenebilirdi. Bu nedenle, pair_filter_udf
'u birleştirmeye ve tüm çiftler oluşturuluncaya kadar bekletmek yerine, oluşturulduğu gibi çiftleri filtre etmemiz önemlidir. Sorum şu, Spark SQL'in bunu yapıp yapamayacağı.
Ben onun sorgu planı ne basit filtrelenmiş eşit-katılıp sorulan Spark kurmak:
# run in PySpark Shell
import pyspark.sql.functions as F
sq = sqlContext
n=100
g=10
a = sq.range(n)
a = a.withColumn('grp',F.floor(a['id']/g)*g)
a = a.withColumnRenamed('id','id_a')
b = sq.range(n)
b = b.withColumn('grp',F.floor(b['id']/g)*g)
b = b.withColumnRenamed('id','id_b')
c = a.join(b,(a.grp == b.grp) & (F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp'])
c = c.sort('id_a')
c = c[['grp','id_a','id_b']]
c.explain()
Sonuç:
== Physical Plan ==
Sort [id_a#21L ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(id_a#21L ASC,200), None
+- ConvertToSafe
+- Project [grp#20L,id_a#21L,id_b#24L]
+- Filter (abs((id_a#21L - id_b#24L)) < 2)
+- SortMergeJoin [grp#20L], [grp#23L]
:- Sort [grp#20L ASC], false, 0
: +- TungstenExchange hashpartitioning(grp#20L,200), None
: +- Project [id#19L AS id_a#21L,(FLOOR((cast(id#19L as double)/10.0)) * 10) AS grp#20L]
: +- Scan ExistingRDD[id#19L]
+- Sort [grp#23L ASC], false, 0
+- TungstenExchange hashpartitioning(grp#23L,200), None
+- Project [id#22L AS id_b#24L,(FLOOR((cast(id#22L as double)/10.0)) * 10) AS grp#23L]
+- Scan ExistingRDD[id#22L]
Bu plandan anahtar hatları şunlardır:
+- Filter (abs((id_a#21L - id_b#24L)) < 2)
+- SortMergeJoin [grp#20L], [grp#23L]
Bu satırlar, filtrenin birleştirmeden sonra ayrı bir aşamada gerçekleştirileceği izlenimini verir. istenen davranış. Ama belki de dolaylı olarak birleşme sürecine doğru itiliyor ve sorgu planı sadece bu detay seviyesinden yoksun.
Bu durumda Spark'in ne yaptığını nasıl anlarım?
Güncelleme: Kıvılcım aşağı açılan çıkmıyor eğer benim laptop çökmesine yeterli olacaktır n = 1E6 ve g = 1E5, ile
ben koşuyorum deneyler. Çöküşü olmadığından, sanırım itme yapıyor. Ancak, bu harika optimizasyondan, Spark SQL kaynağının hangi bölümlerinin ve nasıl çalıştığını bilmek ilginç olacaktır.
Yineleyici ezmesi çok mantıklı, ama benim için başka bir soru ortaya çıkıyor. Birkaç ay önce, Spark 1.2'de RDD'lerle [benzer bir deney] (http://stackoverflow.com/questions/34092211/how-does-spark-execute-a-join-filter-is-it-scalable) yaptım. 1 ve tersi sonuç aldı. Aynı mantığın bu örnek için geçerli olması gerektiği gibi görünüyor: Spark RDD katılmadığı sürece katılma + filtresi yalnızca yineleyicileri ezmek olmalıdır. – Paul
Oh, üzgünüm :) 'python_join.dispatch' aslında bir jeneratör ifadesi döndürüyor. – zero323
Sadece açık olmak gerekirse - bu (https://github.com/apache/spark/blob/master/python/pyspark/join.py#L46-L51) sorun mu yaşıyorsunuz? Bağladığınız soru/cevap <= 1.2 davranışı açıklar. – zero323