意识篇
-
类型转换
优化前:val extractFields: Seq[Row] => Seq[(String, Int)] = { (rows: Seq[Row]) => { var fields = Seq[(String, Int)]() rows.map(row => { fields = fields :+ (row.getString(2), row.getInt(4)) }) fields } }
优化后:
val extractFields: Seq[Row] => Seq[(String, Int)] = { (rows: Seq[Row]) => rows.map(row => (row.getString(2), row.getInt(4))).toSeq }
过程式编程变为函数式编程
关注主要业务逻辑,最终一次性完成类型转换。
原理篇
-
数据行的操作不要在Driver端执行
服务端才是分布式,从服务端拉数据过来不仅有网络消耗,而且硬生生把分布式操作改为本地操作,失去了大数据分布式分而治之的思想,Driver端资源更有限,会处理更慢。
优化前:
def createInstance(factDF: DataFrame, startDate: String, endDate: String): Dat val instanceDF = factDF .filter(col("eventDate") > lit(startDate) && col("eventDate") <= lit(endDate)) .groupBy("dim1", "dim2", "dim3", "event_date") .agg(sum("value") as "sum_value") instanceDF } pairDF.collect.foreach{ case (startDate: String, endDate: String) => val instance = createInstance(factDF, startDate, endDate) val outPath = s"${rootPath}/endDate=${endDate}/startDate=${startDate}" instance.write.parquet(outPath) }
这种写法会把数据拉回到Driver端一句一句操作,会很慢。
优化后:val instances = factDF .join(pairDF, factDF("eventDate") > pairDF("startDate") && factDF("eventDate") .groupBy("dim1", "dim2", "dim3", "eventDate", "startDate", "endDate") .agg(sum("value") as "sum_value") instances.write.partitionBy("endDate", "startDate").parquet(rootPath)
这种只会分区落盘时拉取数据一次。
TipsSpark 延迟计算的 Actions 算子主要有两类:
-
一类是将分布式计算结果直接落盘的操作,
如 DataFrame 的 write、RDD 的 saveAsTextFile 等; -
另一类是将分布式结果收集到 Driver 端的操作,
如 first、take、collect。
-