Bu, örnek olarak açıklanması muhtemelen en kolay yoldur. Örneğin, bir web sitesine kullanıcı girişlerinin bir DataFrame olduğunu varsayalım:Kıvılcım koşullu Spark SQL pencere işlevi
scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
onlar sitede aktif kullanıcı olunca buna gösteren bir sütun eklemek istiyorum. Ancak bir uyarı var: Bir kullanıcının aktif olarak kabul edildiği bir zaman aralığı var ve bu süre sonunda tekrar giriş yapacaklarsa became_active
tarih sıfırlar. Bu sürenin 5 gün olduğunu varsayalım. Sonra yukarıdaki tablodan elde edilen istenen tablo bu gibi bir şey olacaktır: Aktif süresi sona sonra ikinci giriş geldiği için
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
Yani, özellikle, SirChillingtonIV en became_active
tarih sıfırlandı ama Booooooo99900098 en became_active
tarih sıfırlanmaz edildi giriş yaptığı ikinci kez, çünkü aktif süre içinde düştü.
İlk düşüncem lag
ile pencere işlevlerini kullanmak ve became_active
sütununu doldurmak için lag
ged değerlerini kullanmak; Örneğin, bir şey kabaca gibi başlayan: tmp
null
ise
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window)
Sonra kural, became_active
tarih olacağını doldurmak için (yani, ilk giriş ise) veya daha sonra login_date - tmp >= 5
became_active = login_date
eğer; aksi halde, tmp
numaralı en son değere gidin ve aynı kuralı uygulayın. Bu, uygulamak için bir yol hayal etmekte zorlandığım özyinelemeli bir yaklaşımı öneriyor.
Sorularım: Bu uygulanabilir bir yaklaşım mıdır, eğer öyleyse, nasıl geri dönüp nasıl durduğumu bulana kadar tmp
'un önceki değerlerine nasıl bakabilirim? Bilgim olsun, bir Spark SQL Column
'un değerleriyle yinelemem. Bu sonucu elde etmenin başka bir yolu var mı?