Spark性能优化

Spark性能优化

性能优化

对于某些工作负载,可以通过在内存中缓存数据或打开一些实验性选项来提高性能。

在内存中缓存数据

Spark SQL可以通过调用spark.catalog.cacheTable("tableName")dataFrame.cache()来使用内存中的列状格式缓存表。然后Spark SQL将只扫描所需的列,并自动调优压缩,以最小化内存使用和GC压力。可以调用spark.catalog.uncacheTable("tableName")从内存中删除表。

可以使用SparkSession上的setConf方法或使用SQL运行SET key=value命令来配置内存缓存。

属性名 默认值 描述
spark.sql.inMemoryColumnarStorage.compressed true 当设置为true时,Spark SQL将根据数据的统计信息自动为每一列选择压缩编解码器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列缓存的批大小。更大的批处理大小可以提高内存利用率和压缩,但是在缓存数据时存在oom风险。

其他配置选项

属性名 默认值 描述
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 读取文件时装入单个分区的最大字节数
spark.sql.files.openCostInBytes 4194304 (4 MB) 打开一个文件的估计成本,以字节数衡量,可以在同一时间扫描。这是在将多个文件放入一个分区时使用的。最好是高估它,那么带有小文件的分区将比带有大文件的分区(首先是调度的)更快
spark.sql.broadcastTimeout 300 以秒为单位的广播连接中的广播等待时间超时
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置执行联接时将广播到所有工作节点的表的最大字节大小。通过将此值设置为-1,可以禁用广播。注意,目前统计数据只支持Hive Metastore表,其中命令ANALYZE TABLE COMPUTE statistics noscan已经运行
spark.sql.shuffle.partitions 200 配置在为连接或聚合洗牌数据时要使用的分区数

要想了解更多关于Spark的使用,请查看本人下面博客:

  1. Spark的使用
  2. Python处理各种Spark数据源
  3. Spark性能优化