2015-08-21 30 views
5

numaralı i-1 numaralı i-1 ile col2 (col1 sıralanır) karşılaştırmak istiyorum.Birden çok satır nasıl karşılaştırılır?

i-inci sıranın item_i ve item_[i-1]_row farklıysa bir iki satır tarama, ben, yukarıdaki örnekte 1.

+--------------+ 
| col1 col2 | 
+--------------+ 
| row_1 item_1 | 
| row_2 item_1 | 
| row_3 item_2 | 
| row_4 item_1 | 
| row_5 item_2 | 
| row_6 item_1 | 
+--------------+ 

tarafından item_[i-1] sayısını artırmak istiyorum Zaman aşağı doğru, row_2 ve row_3'un farklı olduğunu görüyoruz, bu yüzden bir tane item_1'e ekliyoruz. Sonra, row_3'un row_4'dan farklı olduğunu ve ardından item_2'a bir tane eklediğini görüyoruz.

+-------------+ 
| col2 col3 | 
+-------------+ 
| item_1 2 | 
| item_2 2 | 
+-------------+ 

cevap

8

Bunu yapmak için bir pencere fonksiyonu ve bir agrega bileşimi kullanabilirsiniz: biz ile bitirmek kadar devam edin. Pencere fonksiyonu col2'un bir sonraki değerini almak için kullanılır (sipariş için col1 kullanarak). Agrega, bir farkla karşılaştığımız zamanları sayar. Bu aşağıdaki kodda uygulanır:

val data = Seq(
    ("row_1", "item_1"), 
    ("row_2", "item_1"), 
    ("row_3", "item_2"), 
    ("row_4", "item_1"), 
    ("row_5", "item_2"), 
    ("row_6", "item_1")).toDF("col1", "col2") 

import org.apache.spark.sql.expressions.Window 
val q = data. 
    withColumn("col2_next", 
    coalesce(lead($"col2", 1) over Window.orderBy($"col1"), $"col2")). 
    groupBy($"col2"). 
    agg(sum($"col2" =!= $"col2_next" cast "int") as "col3") 

scala> q.show 
17/08/22 10:15:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 
+------+----+ 
| col2|col3| 
+------+----+ 
|item_1| 2| 
|item_2| 2| 
+------+----+ 
İlgili konular