2016-04-13 19 views
2

Bir foreach döngüsünde çeşitli SELECT deyimleriyle bir DF'nin içeriği üzerinde yineleme yapmam gerekiyor; çıktı metin dosyalarına yazılıyor. Foreach döngüsündeki herhangi bir SELECT ifadesi bir NullPointerException döndürür. Bunun neden olduğunu göremiyorum. "For" döngüsü içindeki SELECT deyimi bu hatayı döndürmez.Spark scala: Bir foreach döngüsünde SELECT komutu döndürür java.lang.NullPointerException

Bu, deneme durumudur.

// step 1 of 6: create the table and load two rows 
vc.sql(s"""CREATE TEMPORARY TABLE TEST1 (
c1  varchar(4) 
,username varchar(5) 
,numeric integer) USING com.databricks.spark.csv OPTIONS (path "/tmp/test.txt")""") 

// step 2 of 6: confirm that the data is queryable 
vc.sql("SELECT * FROM TEST1").show() 
+----+--------+-------+ 
| c1|username|numeric| 
+----+--------+-------+ 
|col1| USER1|  0| 
|col1| USER2|  1| 
+----+--------+-------+ 

// Step 3 of 6: create a dataframe for the table 
var df=vc.sql("""SELECT * FROM TEST1""") 


// step 4 of 6: create a second dataframe that we will use as a loop iterator 
var df_usernames=vc.sql(s"""SELECT DISTINCT username FROM TEST1 """) 

// step 5 of 6: first foreach loop works ok: 
df_usernames.foreach(t => 
    { 
     println("(The First foreach works ok: loop iterator t is " + t(0).toString()) 
    } 
) 
(The First foreach works ok: loop iterator t is USER1 
(The First foreach works ok: loop iterator t is USER2 

// step 6 of 6: second foreach with any embedded SQL returns an error 
df_usernames.foreach(t => 
    { 
     println("(The second foreach dies: loop iterator t is " +  t(0).toString()) 
     vc.sql("""SELECT c1 FROM TEST1""").show() 
    } 
)  
The second foreach dies: loop iterator t is USER1 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 158  in stage 94.0 failed 1 times, most recent failure: Lost task 158.0 in stage 94.0 (TID 3525, localhost): java.lang.NullPointerException 
    at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195) 

cevap

1

Bu yapılamaz. Sen

>>> df_usernames.collect.foreach(
... lambda x: sqlContext.sql("""SELECT c1 FROM TEST1""").show()) 
+1

ilk Bu OP kullanıyordum aynı foreach düşünce değildir ve önem düzeyi ve verilerin boyutunu bilmeden toplamak için iyi bir uygulama değil toplamak uğramadan foreach içindeki SQL sorgusu başlatılamaz. Örnek olarak 2M kullanıcılarına izin vermeniz durumunda ölçeklendirilmez. – eliasah

+0

Toplama kullanmadan bunu başarmanın bir yolu var mı? RDD'deki her "satır" için, mevcut verilerle (SparkSession.sql'den yükleyebileceğim) karşılaştırmam gerekiyor. – KangarooWest

İlgili konular