需求:将有多个分区的 RDD 中的数据保存到数据库
ps:针对分区进行数据库操作的话,建议使用 foreachPartition 算子,每个分区创建一个 Connection,避免一个 Connection 被多个分区使用而造成的序列化的麻烦。
def saveToMySql(it:Iterator[(String, String)]) = {
var conn:Connection = null
var ps:PreparedStatement = null
try {
conn = DriverManager.getConnection("jdbc:mysql://qujianlei/spark", "root", "Welcome_1")
ps = conn.prepareStatement("insert into result values(?, ?)")
it.foreach(x=>{
ps.setString(1, x._1)
ps.setString(2, x._2)
ps.executeUpdate()
})
} catch {
case e1:Exception => println("Some Exception Happened! " + e1.getMessage)
} finally {
if (ps != null) ps.close
if (conn != null) conn.close
}
}
调用分区的:rdd.foreachPartition(saveToMySql)