脚本宝典收集整理的这篇文章主要介绍了spark性能优化(一),脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
spark:2.4.0
服务器:5台(8核32G)
%%init_spark
launcher.master = "yarn"
launcher.conf.spark.app.name = "BDP-xw"
launcher.conf.spark.driver.cores = 1
launcher.conf.spark.driver.memory = '1g'
launcher.conf.spark.executor.instances = 3
launcher.conf.spark.executor.memory = '1g'
launcher.conf.spark.executor.cores = 2
launcher.conf.spark.default.parallelism = 5
launcher.conf.spark.dynamicallocation.enabled = False
// Read back the session's resource settings for display.
// FIX: "SQL", "VAR" and "rePR" were case-garbled by scraping; Scala packages,
// keywords and method names are case-sensitive (sql / var / .repr).
import org.apache.spark.sql.SparkSession
// NOTE(review): `.repr` is not a standard Option method — presumably provided
// by the notebook runtime; confirm it resolves in this environment.
var NumExecutors = spark.conf.getOption("spark.num_executors").repr
var ExecutorMemory = spark.conf.getOption("spark.executor.memory").repr
var AppName = spark.conf.getOption("spark.app.name").repr
var max_buffer = spark.conf.getOption("spark.kryoserializer.buffer.max").repr
// Notebook-wide imports.
// FIX: "LOG4j" and "uDF" were case-garbled; the correct identifiers are
// org.apache.log4j and functions.udf.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.{SparkConf, SparkContext}
// Loads one text file per date, counts rows while each RDD is cached, and
// contrasts RDD caching with DataFrame caching.
// FIX: the original `main(args: Tuple2[...] = Tuple2(hdfs_file, etl_date:...))`
// signature was not valid Scala and did not match the two-argument call site
// `LoadingData_from_files.main(hdfs_file, etl_date)`; the object name is also
// aligned with that call site (identifiers are case-sensitive).
object LoadingData_from_files {
  def main(hdfs_file: String, etl_date: Array[String]): Unit = {
    // Cache and count one RDD per requested partition date.
    for (a <- etl_date) {
      val hdfs_file_ = s"$hdfs_file" + a
      val rdd_20210113 = spark.sparkContext.textFile(hdfs_file_).cache()
      val num1 = rdd_20210113.count
      println(s"加载数据啦:$a RDD的数据量是$num1")
    }
    val rdd_20210113_test = spark.sparkContext.textFile(hdfs_file + "20210328").cache()
    var num1 = rdd_20210113_test.count()
    // FIX: the message said 20210113 but the file loaded above is 20210328.
    println(s"加载数据啦:20210328 RDD的数据量是$num1")
    rdd_20210113_test.unpersist() // release the cached RDD
    // NOTE(review): .toDF needs `import spark.implicits._` in scope — confirm
    // the notebook session provides it.
    val df_20210420 = spark.sparkContext.textFile(hdfs_file + "20210113").toDF.cache()
    num1 = df_20210420.count() // with limited memory, newer cache entries can evict older ones
    println(s"加载数据啦:20210420 DataFrame的数据量是$num1")
  }
}
// Driver script for the explicit-resource-configuration run
// ("multiple_duplicated": the date list deliberately repeats dates so the
// same data is cached more than once).
val hdfs_file = "hdfs://path/etl_date="
val etl_date = Array("20210113","20210112","20210112","20210112","20210112","20210112", "20210113")
// NOTE(review): called with two arguments — confirm the object's main
// accepts (String, Array[String]).
LoadingData_from_files.main(hdfs_file, etl_date)
RDD 默认的缓存级别是 Memory（MEMORY_ONLY），而 DataFrame 的缓存级别是 Memory and Disk（MEMORY_AND_DISK）。
import org.apache.spark.sql.SparkSession
// Re-read the session's resource settings for display.
// NOTE(review): `.repr` is not a standard Option method — presumably provided
// by the notebook runtime; confirm it resolves in this environment.
var NumExecutors = spark.conf.getOption("spark.num_executors").repr
var ExecutorMemory = spark.conf.getOption("spark.executor.memory").repr
var AppName = spark.conf.getOption("spark.app.name").repr
// Same loading demo as above but meant for the run without explicit resource
// configuration; additionally lists the ids of every RDD currently cached.
// FIX: the original `main(args: Tuple2[...] = ...)` signature was not valid
// Scala and did not match the call site `main(hdfs_file, etl_date)`.
object LoadingData_from_files {
  def main(hdfs_file: String, etl_date: Array[String]): Unit = {
    // Cache and count one RDD per requested partition date.
    for (a <- etl_date) {
      val hdfs_file_ = s"$hdfs_file" + a
      val rdd_20210113 = spark.sparkContext.textFile(hdfs_file_).cache()
      val num1 = rdd_20210113.count
      println(s"加载数据啦:$a RDD的数据量是$num1")
    }
    val rdd_20210113_test = spark.sparkContext.textFile(hdfs_file + "20210328").cache()
    var num1 = rdd_20210113_test.count()
    println(s"加载数据啦:20210328 RDD的数据量是$num1")
    rdd_20210113_test.unpersist() // release the cached RDD
    // NOTE(review): .toDF needs `import spark.implicits._` in scope — confirm
    // the notebook session provides it.
    val df_20210420 = spark.sparkContext.textFile(hdfs_file + "20210113").toDF.cache()
    num1 = df_20210420.count() // with limited memory, newer cache entries can evict older ones
    // FIX: restored the newline escape that scraping reduced to a bare "n".
    println(s"加载数据啦:20210420 DataFrame的数据量是$num1 \n当前环境下cache的个数及id为:")
    // Enumerate all currently persisted RDDs by their id.
    spark.sparkContext.getPersistentRDDs.foreach(i => println("cache的id:" + i._1))
  }
}
// Driver script for the run without explicit resource configuration
// ("multiple_duplicated": duplicated dates again force repeated caching).
val hdfs_file = "hdfs://path/etl_date="
val etl_date = Array("20210113","20210112","20210112","20210112","20210112","20210112", "20210113" )
LoadingData_from_files.main(hdfs_file, etl_date)
得到结果如下:
结果分析
// List the ids of all currently cached RDDs, then release them all.
spark.sparkContext.getPersistentRDDs.foreach(i=>println(i._1))
spark.sparkContext.getPersistentRDDs.foreach(i=>{i._2.unpersist()})
// Three equivalent ways to read an RDD's partition count.
//RDD
rdd.getNumPartitions
rdd.partitions.length
rdd.partitions.size
// For a DataFrame, go through its underlying RDD first.
df.rdd.getNumPartitions
df.rdd.partitions.length
df.rdd.partitions.size
Memory
// Demo: RDD partition counts under repartition vs coalesce.
val hdfs_file = "hdfs://path1/etl_date="
val rdd_20210113_test = spark.sparkContext.textFile(hdfs_file + "20210113").cache()
// Source file is about 1.5 GB.
rdd_20210113_test.getNumPartitions
// res2: Int = 13
val rdd_20210113_test_par1 = rdd_20210113_test.repartition(5)
rdd_20210113_test_par1.partitions.size
// res9: Int = 5
val rdd_20210113_test_par2 = rdd_20210113_test_par1.coalesce(13)
rdd_20210113_test_par2.partitions.length
// res14: Int = 5 — coalesce cannot increase the partition count (it does not shuffle by default)
val rdd_20210113_test_par3 = rdd_20210113_test_par1.coalesce(3)
rdd_20210113_test_par3.partitions.length
// res16: Int = 3 — decreasing the partition count took effect
DataFrame 默认 cache 的级别是 Memory and Disk（MEMORY_AND_DISK）：
// Demo: DataFrame partition counts under repartition vs coalesce.
val hdfs_file = "hdfs://path1/etl_date="
val df_20210420 = spark.sparkContext.textFile(hdfs_file + "20210113").toDF().cache()
df_20210420.rdd.getNumPartitions
// res18: Int = 13
val df_20210420_par1 = df_20210420.repartition(20)
df_20210420_par1.rdd.getNumPartitions
// res19: Int = 20 — repartition shuffles, so increasing the count works
val df_20210420_par2 = df_20210420_par1.coalesce(5)
df_20210420_par2.rdd.getNumPartitions
// res20: Int = 5
import org.apache.spark.storage.StorageLevel._
// Compare explicit persist levels; each uses a different date so the three
// cached RDDs can be told apart in the web UI.
// MEMORY_AND_DISK
val hdfs_file = "hdfs://path1/etl_date="
var etl_date = "20210113"
var hdfs_file_ = s"$hdfs_file" + etl_date
val rdd_20210113_DISK_MEMORY = spark.sparkContext.textFile(hdfs_file_).persist(MEMORY_AND_DISK)
// FIX: the label said DISK_ONLY but the level used here is MEMORY_AND_DISK.
println("MEMORY_AND_DISK数据量为" + rdd_20210113_DISK_MEMORY.count())
// MEMORY_ONLY
etl_date = "20210112"
hdfs_file_ = s"$hdfs_file" + etl_date
val rdd_20210113_MEMORY_ONLY = spark.sparkContext.textFile(hdfs_file_).persist(MEMORY_ONLY)
println("MEMORY_ONLY数据量为" + rdd_20210113_MEMORY_ONLY.count())
// DISK_ONLY
etl_date = "20210328"
hdfs_file_ = s"$hdfs_file" + etl_date
val rdd_20210113_DISK_ONLY = spark.sparkContext.textFile(hdfs_file_).persist(DISK_ONLY)
println("DISK_ONLY数据量为" + rdd_20210113_DISK_ONLY.count())
// Recorded output (first line was printed before the label fix above):
// DISK_ONLY数据量为4298617
// MEMORY_ONLY数据量为86340
// DISK_ONLY数据量为20000
优化方面说明
tips
// Computes the driver's usable unified memory pool (storage + execution),
// mirroring Spark's UnifiedMemoryManager formula:
//   maxMemory = (systemMemory - 300MB reserved) * spark.memory.fraction
// Prints the configured driver heap (GB) and the pool size (MB), and returns
// the pool size in bytes.
def getSparkMemory(): Float = {
  // Configured driver heap, truncated to whole GB by integer division.
  val driver_memory_set1 = sc.getConf.getSizeAsBytes("spark.driver.memory")/1024/1024/1024
  // FIX: "maXMemory" and "SystemMemory" were case-garbled — the JVM method is
  // maxMemory, and the val name must match its use below.
  val systemMemory = Runtime.getRuntime.maxMemory.toFloat
  // fixed amount of memory for non-storage, non-execution purposes
  val reservedMemory = 300 * 1024 * 1024
  // minimum system memory Spark requires (1.5x the reserved amount)
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  val usableMemory = systemMemory - reservedMemory
  val memoryFraction = sc.getConf.getDouble("spark.memory.fraction", 0.6)
  val maxMemory = (usableMemory * memoryFraction).toLong
  import org.apache.spark.network.util.JavaUtils
  val allocateMemory = JavaUtils.byteStringAsMb(maxMemory + "b")
  // FIX: applying %1.1f to a Long throws IllegalFormatConversionException at
  // runtime; widen both values to Float before formatting.
  println(f"driver_memory: ${driver_memory_set1.toFloat}%1.1f, allocateMemory: ${allocateMemory.toFloat}%1.1f,")
  maxMemory.toFloat
}
// Keep the result so it can be fed to formatBytes below.
val maxMemory = getSparkMemory()
// driver_memory: 2.0, allocateMemory: 912.0,
// Formats a byte count the way the Spark web UI does, so the printed value
// can be compared directly with the UI.
// FIX: the web UI uses binary units (base 1024); with k = 1000 the example
// value (~1.003e9 bytes) would format as "1.0" instead of the recorded 956.6.
def formatBytes(bytes: Double) = {
  val k = 1024
  // Largest power of k that fits into `bytes`.
  val i = math.floor(math.log(bytes) / math.log(k))
  val maxMemoryWebUI = bytes / math.pow(k, i)
  f"$maxMemoryWebUI%1.1f"
}
// Display the unified pool computed above in web-UI units.
println(formatBytes(maxMemory))
// 956.6
// Estimates the total memory requested from YARN (driver + all executors),
// adding each container's overhead of max(384 MB, 7% of its memory).
// Inputs are in MB; the result is in GB.
def allocateMemory(executorMemory:Float=1, executors:Float=1, driverMemory:Float=1):Double={
  val driverOverheadMb = math.max(384.0, driverMemory * 0.07)
  val executorOverheadMb = math.max(384.0, executorMemory * 0.07)
  val totalMb = (driverOverheadMb + driverMemory) + executors * (executorOverheadMb + executorMemory)
  totalMb / 1024
}
// Example: 16 executors x 1 GB plus a 1 GB driver ≈ 23.4 GB total from YARN.
allocateMemory(1 * 1024, 16, 1 * 1024)
// res3: Double = 23.375
http://ip:8088/proxy/application_jobid/executors/
http://ip:8088/cluster/apps
以上是脚本宝典为你收集整理的spark性能优化(一)全部内容,希望文章能够帮你解决spark性能优化(一)所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。