2015-06-30 24 views
5

CSV dosyasını okumak ve veri çerçevesini oluşturmak için çalışıyorum Spark.Ayrıştırma tarih saat Zeppelin, CSV bilgi ve

darbe gibi CSV biçimi. Veri/zaman dizisi gösterimi için ISO8602 tarih/saat formatını kullandım.

2015-6-29T12:0:0,b82debd63cffb1490f8c9c647ca97845,G1J8RX22EGKP,2015-6-29T12:0:5,2015-6-29T12:0:6,0QA97RAM1GIV,2015-6-29T12:0:10,2015-6-29T12:0:11,2015-6-29T12:0:12,2015-6-29T12:5:42,1 
2015-6-29T12:20:0,0d60c871bd9180275f1e4104d4b7ded0,5HNB7QZSUI2C,2015-6-29T12:20:5,2015-6-29T12:20:6,KSL2LB0R6367,2015-6-29T12:20:10,2015-6-29T12:20:11,2015-6-29T12:20:12,2015-6-29T12:25:13,1 
...... 

bu verileri yüklemek için, ben aşağıda gibi hata yapan

import org.apache.spark.sql.types.DateType 
import org.apache.spark.sql.functions._ 
import org.joda.time.DateTime 
import org.joda.time.format.DateTimeFormat 
import sys.process._ 

val logCSV = sc.textFile ("log_table.csv") 

case class record(
    callingTime:DateTime, 
    userID:String, 
    CID:String, 
    serverConnectionTime:DateTime, 
    serverResponseTime:DateTime, 
    connectedAgentID:String, 
    beginCallingTime:DateTime, 
    endCallingTime:DateTime, 
    Succeed:Int) 


val formatter = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ss") 

val logTable = logCSV.map(s => s.split(",")).map(
    s => record(
      formatter.parseDateTime(s(0)), 
      s(1), 
      s(2), 
      formatter.parseDateTime(s(3)), 
      formatter.parseDateTime(s(4)), 
      s(5), 
      formatter.parseDateTime(s(6)), 
      formatter.parseDateTime(s(7)),    
      s(8).toInt 
     ) 
).toDF() 

aşağıda gibi Zeppelin, scala kodu yazdım. Ana konu DateTime serileştirilemez.

logCSV: org.apache.spark.rdd.RDD[String] = log_table.csv MapPartitionsRDD[38] at textFile at <console>:169 
defined class record 
formatter: org.joda.time.format.DateTimeFormatter = [email protected] 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:286) 

Scala'da tarih/saat bilgilerinin nasıl ele alınacağını merak ediyorum. Bana yardım eder misin?

+0

Verileri https://github.com/databricks/spark-csv –

+0

Ah yüklemek için kıvılcım-csv deneyebilirsiniz, iyi bir çözüm olabilir. Deneyeceğim. –

cevap

2

Eğer DateTimeFormatter ait parseMillis yöntemini kullanırsanız bir DateTime, serialiable olmasa da, sen ücretsiz Serializable olan Long, köprülü olan uzun alırsınız. DateTime'ı Long'dan geri almak için DateTime(longInstance.longValue()) yapıcısını kullanın.

1

Sen biçimlendirici serializeable değil meydan çalıştırmak. Bunun yerine, biçimlendiriciyi haritanın içinde oluşturabilir (veya mapPartitions'ı kullanabilir ve bunu MapPartitions'ın içine yerleştirebilirsiniz, böylece yalnızca bölüm başına bir biçimlendirici oluşturmanız gerekir).

+0

Harita içindeki biçimlendiriciyi oluşturmak, bu sorunu gideremedi. : Ayrıca (Spark SQL DateTimeUtils bakınız) SQL'ın dahili tarih saat biçimini Spark için Joda DateTime dönüştürmek gerekebilir gibi < –

+0

bu baktığımızda, öyle görünüyor. – Holden

1

herkesten cevaplar için teşekkür ederiz !! Timestamp'u kullanmaya karar verdim çünkü serileştirmek mümkün ve Dataframe bunu destekliyor. Aşağıdaki gibi kodu revize ettim.

import java.sql.Timestamp 

case class Record(
    callingTime:Timestamp, 
    userID:String, 
    CID:String, 
    succeed:Int) 


val dataFrame = logCSV.map(_.split(",")).map(
    r => Record(
      Timestamp.valueOf(r(0).replace("T", " ")), 
      r(1), 
      r(2), 
      r(10).toInt 
     ) 
    ).toDF() 
dataFrame.registerTempTable("dataFrame") 

benim verilerinde tarih/saat biçimi ISO8601 olduğunu. Bu yüzden Timestamp için 'T' değerini değiştirmem gerekiyor. Daha sonra Timestamp.valueof kullanılabilir.

İlgili konular