首页 > 数据库 >spark在针对MySQL数据库主键对其余字段进行更新

spark在针对MySQL数据库主键对其余字段进行更新

时间:2022-11-02 10:04:44浏览次数:45  
标签:String val sql ID statement MySQL spark 主键

package com.ustcinfo.SDK

import java.sql.{Connection, PreparedStatement}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.sql._

object SDK_number {

val spark: SparkSession = SparkSession
.builder()
.appName("SDK数据入库")
.master("local[2]")
.config("spark.debug.maxToStringFields", 1000)
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

import spark.implicits._ //导入隐式转换
import org.apache.spark.sql.functions._ //导入function函数

def main(args: Array[String]): Unit = {

val str = args(0)
val gbkPath = s"/bdtj/test/app$str.txt"

val res_mysql: DataFrame = spark.sparkContext.hadoopFile(gbkPath, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
  .map(p => new String(p._2.getBytes, 0, p._2.getLength, "UTF-8"))
  //.filter(_.split("\\|\\+\\|", -1).length == 4)
  .map(lines => {
  val line: Array[String] = lines.split("\\|\\+\\|", -1)
  (line(0).drop(1), line(1), line(2), line(3))
}
).toDF("ID", "FIVE_ID", "THREE_ID", "TESTTIME")
res_mysql.show(100, false)

//数据以DF格式进行foreachPartition对每一条数据进行遍历
res_mysql.select("ID", "FIVE_ID", "THREE_ID").foreachPartition(f => {
val connection: Connection = createConnection()//调用createConnection方法,实现对数据库连接
connection.setAutoCommit(false)
//设置connection.setautocommit(false);只有程序调用connection.commit()的时候才会将先前执行的语句一起提交到数据库,这样就实现了数据库的事务。
//true:sql命令的提交(commit)由驱动程序负责
//false:sql命令的提交由应用程序负责,程序必须调用commit或者rollback方法
val sql: String = "insert into sdk_grid_num(ID,FIVE_ID,THREE_ID)" +
"value(?,?,?) on duplicate key update FIVE_ID=?,THREE_ID=?"

  println("更新数据库中的数据")

  val statement: PreparedStatement = connection.prepareStatement(sql)
  f.foreach(row => {
      //这里注意更新插入字段顺序要一直
    statement.setString(1, row.getAs[String]("ID"))
    statement.setString(2, row.getAs[String]("FIVE_ID"))
    statement.setString(3, row.getAs[String]("THREE_ID"))
    statement.setString(5, row.getAs[String]("FIVE_ID"))
    statement.setString(6, row.getAs[String]("THREE_ID"))
    statement.addBatch()
  })
  statement.executeBatch()
  connection.commit()
  statement.close()
  connection.close()
})
res_mysql.unpersist()

}

def createConnection(): Connection = {
Class.forName("com.mysql.cj.jdbc.Driver").newInstaabcsnce()
java.sql.DriverManager.getConnection("jdbc:mysql://132.123.13.113:3306/ads?useUnicode=true&characterEncoding=utf8", "abc", "123456")
}

}

标签:String,val,sql,ID,statement,MySQL,spark,主键
From: https://www.cnblogs.com/xiguabigdata/p/16850035.html

相关文章

  • mysql删库报错
    3.开发人员测试环境删库报错#解决:在数据库的物理目录中(mysql的data目录),进入要删除的数据库目录,查看是否有文件存在,若存在,使用rm-rf命令清除;再次执行删除数据库命令即......
  • spark读取文件方式
    一、调用hadoopfile方法读取TXT文件,针对复杂的分割方式,例如|+|,;等valgbkPath=s"/bdtj/line/DD_OUT_NOW_LV_$month.txt"//文件路径//将gbkPath以参数的形式传入进行读取......
  • linux下使用mysql
    linux下使用mysql1.登录mysqlMySQL-uroot-p123456#-u后面跟的是用户名-p后面跟的是密码2.查看所有数据库showdatabases;3.......
  • MySQL函数
    字符串函数 例子 数值函数 例子日期函数  流程函数 注意:空值不为NULL 案例1 案例2 总结.......
  • MySQL加固
    MySQL加固1.口令加固1.1old_passwords环境变量设置old_passwords决定了使用PASSWORD()函数和IDENTIFIEDBY、CREATEUSER、GRANT等语句时的hash算法:0-authen......
  • Centos离线安装JDK+Tomcat+MySQL8.0+Nginx
    一、安装JDK注:以下命令环境在Xshell中进行。1、查询出系统自带的OpenJDK及版本rpm-qa|grepjdk2、如果显示已安装openjdk则对其进行卸载。#卸载rpm-e--nodeps......
  • MySQL 知识点小结
    ------------------操作mysql的命令--------------------cmd命令行中查看mysql版本:mysql-Vmysql--version登陆mysql:mysql-uroot-pluismysql-uroot-p在mysql......
  • mysql双主架构解决自增冲突
    Mysql双主自增长冲突处理 多主互备和主从复制有一些区别,因为多主中都可以对服务器有写权限,所以设计到自增长重复问题 出现的问题(多主自增长ID重复) 1:首先我们通过A......
  • MySQL查询数据(多表查询)
    准备工作,新建名为students的数据,三张表分别是student,courses,stu_cou,并创建外键约束,级联删除更新,插入数据。/*创建数据库*/createdatabaseifnotEXISTSstudentscha......
  • MySQL基础语句
    DDL(DataDefinitionLanguage) DML(DataManipulateLanguage) DQL(DataQueryLanguage)聚合函数聚合函数作用于字段,而且NULL值不参与计算分组查询 具体......