在spark中处理数据,将处理好的数据保存到mysql中,如果直接处理RDD数据,将其循环使得每一条数据都能插入到数据库中,如果数据量不大的情况下,可以使用。但是针对大数据,处理的数据是海量的,所以每次循环一条数据都要创建新的数据库连接,就会非常耗时,如果把数据库的连接放在外面,这样又造成了算子内外变量的问题,所以我们用foreachPartition来优化,这是每个分区建立一次数据库连接,然后再在每个分区内迭代器循环,这样极大减少了连接次数,提高了性能
代码如下:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import java.sql.{Connection, DriverManager, PreparedStatement}
object Demo20ToMysql {
def main(args: Array[String]): Unit = {
//1、创建spark的执行环境
val conf = new SparkConf()
//设置运行模式
conf.setMaster("local")
conf.setAppName("wc")
val sc = new SparkContext(conf)
//2、读取数据
//RDD:弹性的分布式数据集(相当于List)
val linesRDD: RDD[String] = sc.textFile("data/lines.txt")
//一行转换多行
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(","))
val kvRD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
//统计单词的数量
val countRDD: RDD[(String, Int)] = kvRD.reduceByKey((x, y) => x + y)
val start: Long = System.currentTimeMillis()
/* //1 创建数据库连接
//数据库连接不能在网络中传输
val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456")
val end: Long = System.currentTimeMillis()
println(end - start)
//保存到数据库中,使用JDBC
countRDD.foreach {
case (word, count) =>
//2 编写sql插入数据
val stat: PreparedStatement = con.prepareStatement("insert into word_count values(?,?)")
stat.setString(1, word)
stat.setInt(2, count)
stat.execute()
}
con.close()*/
//foreachPartition: 训练分区
countRDD.foreachPartition(iter => {
//1 创建数据库连接
//每一个分区创建一个数据库连接
val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456")
val end: Long = System.currentTimeMillis()
println(end - start)
//在分区内循环
iter.foreach {
case (word, count) =>
//2 编写sql插入数据
val stat: PreparedStatement = con.prepareStatement("insert into word_count values(?,?)")
stat.setString(1, word)
stat.setInt(2, count)
stat.execute()
}
con.close()
})
}
}
标签:Saprk,count,stat,word,val,RDD,forachPartition,数据库,插入
From: https://blog.csdn.net/ABU009/article/details/143129814