2017-02-24 21 views
6

Bu, örnek olarak açıklanması muhtemelen en kolay yoldur. Örneğin, bir web sitesine kullanıcı girişlerinin bir DataFrame olduğunu varsayalım:Kıvılcım koşullu Spark SQL pencere işlevi

scala> df.show(5) 
+----------------+----------+ 
|  user_name|login_date| 
+----------------+----------+ 
|SirChillingtonIV|2012-01-04| 
|Booooooo99900098|2012-01-04| 
|Booooooo99900098|2012-01-06| 
| OprahWinfreyJr|2012-01-10| 
|SirChillingtonIV|2012-01-11| 
+----------------+----------+ 
only showing top 5 rows 

onlar sitede aktif kullanıcı olunca buna gösteren bir sütun eklemek istiyorum. Ancak bir uyarı var: Bir kullanıcının aktif olarak kabul edildiği bir zaman aralığı var ve bu süre sonunda tekrar giriş yapacaklarsa became_active tarih sıfırlar. Bu sürenin 5 gün olduğunu varsayalım. Sonra yukarıdaki tablodan elde edilen istenen tablo bu gibi bir şey olacaktır: Aktif süresi sona sonra ikinci giriş geldiği için

+----------------+----------+-------------+ 
|  user_name|login_date|became_active| 
+----------------+----------+-------------+ 
|SirChillingtonIV|2012-01-04| 2012-01-04| 
|Booooooo99900098|2012-01-04| 2012-01-04| 
|Booooooo99900098|2012-01-06| 2012-01-04| 
| OprahWinfreyJr|2012-01-10| 2012-01-10| 
|SirChillingtonIV|2012-01-11| 2012-01-11| 
+----------------+----------+-------------+ 

Yani, özellikle, SirChillingtonIV en became_active tarih sıfırlandı ama Booooooo99900098 en became_active tarih sıfırlanmaz edildi giriş yaptığı ikinci kez, çünkü aktif süre içinde düştü.

İlk düşüncem lag ile pencere işlevlerini kullanmak ve became_active sütununu doldurmak için lag ged değerlerini kullanmak; Örneğin, bir şey kabaca gibi başlayan: tmpnull ise

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 

val window = Window.partitionBy("user_name").orderBy("login_date") 
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window) 

Sonra kural, became_active tarih olacağını doldurmak için (yani, ilk giriş ise) veya daha sonra login_date - tmp >= 5became_active = login_date eğer; aksi halde, tmp numaralı en son değere gidin ve aynı kuralı uygulayın. Bu, uygulamak için bir yol hayal etmekte zorlandığım özyinelemeli bir yaklaşımı öneriyor.

Sorularım: Bu uygulanabilir bir yaklaşım mıdır, eğer öyleyse, nasıl geri dönüp nasıl durduğumu bulana kadar tmp'un önceki değerlerine nasıl bakabilirim? Bilgim olsun, bir Spark SQL Column'un değerleriyle yinelemem. Bu sonucu elde etmenin başka bir yolu var mı?

cevap

10

İşte bir numaradır. yeni oturumların başladığı

val userWindow = Window.partitionBy("user_name").orderBy("login_date") 
val userSessionWindow = Window.partitionBy("user_name", "session") 

noktaları bulun:

val result = sessionized 
    .withColumn("became_active", min($"login_date").over(userSessionWindow)) 
    .drop("session") 
:

val newSession = (coalesce(
    datediff($"login_date", lag($"login_date", 1).over(userWindow)), 
    lit(0) 
) > 5).cast("bigint") 

val sessionized = df.withColumn("session", sum(newSession).over(userWindow)) 

oturum başına en erken tarihi bulun

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum} 

pencere tanımlayın: fonksiyonların bir demet aktarma

Veri kümesi tanımlı edilmiştir:

val df = Seq(
    ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"), 
    ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), 
    ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"), 
    ("SirChillingtonIV", "2012-08-11") 
).toDF("user_name", "login_date") 

sonucudur:

+----------------+----------+-------------+ 
|  user_name|login_date|became_active| 
+----------------+----------+-------------+ 
| OprahWinfreyJr|2012-01-10| 2012-01-10| 
|SirChillingtonIV|2012-01-04| 2012-01-04| <- The first session for user 
|SirChillingtonIV|2012-01-11| 2012-01-11| <- The second session for user 
|SirChillingtonIV|2012-01-14| 2012-01-11| 
|SirChillingtonIV|2012-08-11| 2012-08-11| <- The third session for user 
|Booooooo99900098|2012-01-04| 2012-01-04| 
|Booooooo99900098|2012-01-06| 2012-01-04| 
+----------------+----------+-------------+