İki veri çerçevesi arasında dış birleştirmeyi yaptığım bir kıvılcım var. İlk veri çerçevesinin boyutu 260 GB'dir, dosya formatı 2200 dosyaya bölünmüş metin dosyalarıdır ve ikinci veri çerçevesinin boyutu 2GB'dir.EMR'de kıvılcım işini hızlıca yazmak için nasıl yapılır S3
Bu iki dosyayı veri çerçevesine yüklemek 10 dakika sürer.
Daha sonra S3'e yaklaşık 260 GB olan veri çerçeve çıktısının yazılması yaklaşık 1 saat sürer.
İşte küme bilgilerim. Burada
emr-5.9.0
Master:1m3.2xlarge
Core:c3.4large 5 machines
Bu benim kod ben burada
[
{
"Classification": "spark-defaults"
, "Properties": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
}
},{
"Classification": "emrfs-site",
"Properties": {
"fs.s3.maxConnections": "200"
}
}
]
kuruyorum benim küme yapılandırma olduğunu
CPU:16
RAM:30
DISK:2 × 160 GB SSD
Zeppelin 0.7.2, Spark 2.2.0, Ganglia 3.7.2
her c3.4xlarge makinesinin detay
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import java.io.File
import org.apache.hadoop.fs._
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_valYear = spark.udf.register("get_cus_valYear", (filePath: String) => filePath.split("\\.")(4))
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load(("s3://trfsdisu/SPARK/FundamentalAnalytic/MAIN"))
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_valYear(input_file_name))
val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/INCR")
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*)
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*)
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId"), "outer")
.select($"FundamentalSeriesId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"),
when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear".cast(DataTypes.StringType)).as("PartitionYear"),
when($"FundamentalSeriesId_objectTypeId_1".isNotNull, $"FundamentalSeriesId_objectTypeId_1").otherwise($"FundamentalSeriesId_objectTypeId".cast(DataTypes.StringType)).as("FundamentalSeriesId_objectTypeId"),
when($"analyticItemInstanceKey_1".isNotNull, $"analyticItemInstanceKey_1").otherwise($"analyticItemInstanceKey").as("analyticItemInstanceKey"),
when($"AnalyticValue_1".isNotNull, $"AnalyticValue_1").otherwise($"AnalyticValue").as("AnalyticValue"),
when($"AnalyticConceptCode_1".isNotNull, $"AnalyticConceptCode_1").otherwise($"AnalyticConceptCode").as("AnalyticConceptCode"),
$"AuditID_1").otherwise($"AuditID").as("AuditID"),
when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
.filter(!$"FFAction".contains("D"))
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val dfMainOutputWithoutFinalYear = dfMainOutputFinal.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutputFinal.schema.fieldNames.filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))
val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c42"))).toSeq
val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|")
val dfMainOutputFinalWithoutNull = dfMainOutputWithoutFinalYear.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)
dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","PartitionYear")
.format("csv")
.option("nullValue", "")
.option("header", "true")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FundamentalAnalytic/output")
I Buna ek olarak, önce HDFS'ye veri yazmayı denedim, ancak aynı zamanda HDFS dizinine yazmak için de aynı zaman alıyor (S3'ten 4 dakika daha az).
DAG
ekleme SQL Fiziksel planını eklemeBellek İşimin son saat kullanılmış
012 İşte bazı iş günlükleriI looking into the job execution on the below clusters .Here were some on my observations :
Majority time is being consumed at RDD getting spilling in the disk .
#########
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 91 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (1 time so far)
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (1 time so far)
17/10/17 15:42:33 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (1 time so far)
#########
This is causing spilling of 700MB memory pages constantly on the disk and then reading it back before shuffle phase . The same is see in all the containers ran for the job . The reason why lot of spilling is happening is because the executor are launched in a container with size :
#########################
17/10/17 15:20:18 INFO YarnAllocator: Will request 1 executor container(s), each with 4 core(s) and 5632 MB memory (including 512 MB of overhead)
#########################
Which means each containers are on 5GB and hence they are getting full very quickly .and because of memory pressure they are getting spilled .
You will notice the same in the nodemanager Logs :
2017-10-17 15:58:21,590 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7759 for container-id container_1508253273035_0001_01_000003: 5.0 GB of 5.5 GB physical memory used; 8.6 GB of 27.5 GB virtual memory used
Web UI'den mantıksal planları gösterin. Bana/insanlara kodun ne yaptığına dair fikir vermek için ekran görüntüleri yapın. Teşekkürler. –
Spark'in web kullanıcı arayüzünden gelen sorgular için fiziksel planlar sordum. Bu, sorgularınızın tam olarak ne yaptığını bilmeme yardımcı olmalı. –
@JacekLaskowski Ah alright .. Gangelia'dakileri görebilir miyim? – SUDARSHAN