
客快物流 Big Data Project (100): Using ClickHouse — Spark Code for Operating ClickHouse


Using ClickHouse

I. Operating ClickHouse with Java

1. Build a Maven project

2. Import dependencies

<!-- Clickhouse -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.2</version>
</dependency>

 

3. Create the package structure

Create the Java package directory:

Package name        Description
cn.it.clickhouse    Directory containing the project code

4. Code example

package cn.it.demo;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Query ClickHouse via JDBC.
 */
public class ClickHouseJDBC {
    public static void main(String[] args) {
        String sqlDB = "show databases";                        // list databases
        String sqlTab = "show tables";                          // list tables
        String sqlCount = "select count(*) count from ontime";  // count rows in the ontime table
        exeSql(sqlDB);
        exeSql(sqlTab);
        exeSql(sqlCount);
    }

    public static void exeSql(String sql){
        String address = "jdbc:clickhouse://node2:8123/default";
        Connection connection = null;
        Statement statement = null;
        ResultSet results = null;
        try {
            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
            connection = DriverManager.getConnection(address);
            statement = connection.createStatement();
            long begin = System.currentTimeMillis();
            results = statement.executeQuery(sql);
            long end = System.currentTimeMillis();
            System.out.println("执行("+sql+")耗时:"+(end-begin)+"ms");
            ResultSetMetaData rsmd = results.getMetaData();
            List<Map<String, String>> list = new ArrayList<>();
            while (results.next()) {
                Map<String, String> map = new HashMap<>();
                for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                    map.put(rsmd.getColumnName(i), results.getString(rsmd.getColumnName(i)));
                }
                list.add(map);
            }
            for (Map<String, String> map : list) {
                System.err.println(map);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally { // release resources
            try {
                if(results!=null){
                    results.close();
                }
                if(statement!=null){
                    statement.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

 

II. Operating ClickHouse with Spark

1. Import dependencies

<repositories>
    <repository>
        <id>mvnrepository</id>
        <url>https://mvnrepository.com/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>elastic.co</id>
        <url>https://artifacts.elastic.co/maven</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11</scala.version>
    <!-- Spark -->
    <spark.version>2.4.0-cdh6.2.1</spark.version>
    <!-- Parquet -->
    <parquet.version>1.9.0-cdh6.2.1</parquet.version>
    <!-- ClickHouse -->
    <clickhouse.version>0.2.2</clickhouse.version>
    <jtuple.version>1.2</jtuple.version>
</properties>

<dependencies>
    <!-- Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-common</artifactId>
        <version>${parquet.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-graphx_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>net.jpountz.lz4</groupId>
        <artifactId>lz4</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.javatuples</groupId>
        <artifactId>javatuples</artifactId>
        <version>${jtuple.version}</version>
    </dependency>
    <!-- Clickhouse -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>${clickhouse.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

 

2. Create the package structure

Create the Scala package directory:

Package name        Description
cn.it.clickhouse    Directory containing the project code

3. Develop the example

Implementation steps:

  • Create the ClickHouseJDBCDemo singleton object
  • Initialize the Spark runtime environment
  • Load the external data source (资料\order.json) and build a DataFrame object

Code implementation

package cn.it.demo

import cn.it.demo.utils.ClickHouseUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Operate a ClickHouse table via JDBC.
 */
object ClickHouseJDBCDemo {
  def main(args: Array[String]): Unit = {
    // build the Spark configuration object
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")

    // create the SparkSession
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    // read the JSON file into a DataFrame
    val df: DataFrame = spark.read.json("E:\\input\\order.json")
    df.show()

    spark.stop()
  }
}
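
The contents of order.json ship with the course material and are not reproduced here. If the file is not at hand, a small stand-in DataFrame with an assumed schema can replace the spark.read.json(...) call for local testing; the field names below are hypothetical, and the later sections only require that an id column exists:

// Hypothetical stand-in for 资料\order.json; field names and values are assumptions.
import spark.implicits._

val df: DataFrame = spark.read.json(Seq(
  """{"id": 1, "orderNo": "D001", "amount": 99.9}""",
  """{"id": 2, "orderNo": "D002", "amount": 19.5}"""
).toDS())
df.printSchema()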

 

3.1. Create the table

Implementation steps:

  • Create the ClickHouseUtils utility class
  • Add a method that creates a ClickHouse connection and returns the connection object
  • Add a method that builds the CREATE TABLE SQL string
  • Add a method that executes update statements
  • Call the table creation from the ClickHouseJDBCDemo singleton object

Implementation:

  • Create the ClickHouseUtils utility class
package cn.it.demo.utils

/**
 * Utility class for ClickHouse.
 */
class ClickHouseUtils extends Serializable {
}
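
The methods added in the following subsections reference JDBC, Spark SQL, and clickhouse-jdbc types; a set of imports along these lines (assuming the clickhouse-jdbc 0.2.x package layout) goes at the top of ClickHouseUtils.scala:

// Imports assumed for the methods below (clickhouse-jdbc 0.2.x package layout).
import java.sql.PreparedStatement
import java.text.SimpleDateFormat

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import ru.yandex.clickhouse.settings.ClickHouseProperties
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource, ClickHouseStatement}

import scala.collection.mutable.ArrayBuffer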

 

  • Add a method that creates a ClickHouse connection and returns the connection object
/**
 * Create a ClickHouse connection and return it.
 * @return the ClickHouse connection object
 */
def createConnection: ClickHouseConnection = {
  // ClickHouse server host
  val host = "node2"
  // ClickHouse HTTP port
  val port = "8123"
  val properties = new ClickHouseProperties()
  val dataSource = new ClickHouseDataSource(s"jdbc:clickhouse://${host}:${port}", properties)
  dataSource.getConnection()
}
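
createConnection hands back a live connection on every call, so the caller is responsible for closing it. A minimal usage sketch, assuming a ClickHouseUtils instance like the clickHouseUtils created in the demo object below:

// Hypothetical usage: run a probe query and close the connection afterwards.
val connection = clickHouseUtils.createConnection
try {
  val rs = connection.createStatement().executeQuery("SELECT 1")
  while (rs.next()) println(rs.getInt(1))
} finally {
  connection.close()
}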

 

  • Add a method that builds the CREATE TABLE SQL string
/**
 * Build the CREATE TABLE SQL string for a ClickHouse table.
 * @param table           table name
 * @param schema          DataFrame schema
 * @param primaryKeyField column used in the ORDER BY clause
 * @return the CREATE TABLE statement
 */
def createTable(table: String, schema: StructType, primaryKeyField:String = "id"): String = {
  // build the "<column name> <ClickHouse type>" list for the table
  val tableFieldsStr: String = schema.map { f =>
    val fName = f.name
    val fType = f.dataType match {
      case org.apache.spark.sql.types.DataTypes.StringType => "String"
      case org.apache.spark.sql.types.DataTypes.IntegerType => "Int32"
      case org.apache.spark.sql.types.DataTypes.FloatType => "Float32"
      case org.apache.spark.sql.types.DataTypes.LongType => "Int64"
      case org.apache.spark.sql.types.DataTypes.BooleanType => "UInt8"
      case org.apache.spark.sql.types.DataTypes.DoubleType => "Float64"
      case org.apache.spark.sql.types.DataTypes.DateType => "DateTime"
      case org.apache.spark.sql.types.DataTypes.TimestampType => "DateTime"
      case x => throw new Exception(s"Unsupported type: ${x.toString}")
    }
    s"$fName $fType"
  }.mkString(", ")

  // return the CREATE TABLE SQL string (sign/version are required by VersionedCollapsingMergeTree)
  s"CREATE TABLE IF NOT EXISTS $table(${tableFieldsStr},sign Int8,version UInt64) ENGINE=VersionedCollapsingMergeTree(sign, version) ORDER BY ${primaryKeyField}"
}
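
For the assumed order.json schema from the demo (Spark sorts inferred JSON columns alphabetically), a call like the one below would yield a DDL string roughly like the comment; the exact column types depend on what Spark infers:

// Hypothetical example for the assumed schema (amount, id, orderNo).
val ddl = clickHouseUtils.createTable("order", df.schema)
// ddl ≈ CREATE TABLE IF NOT EXISTS order(amount Float64, id Int64, orderNo String,sign Int8,version UInt64)
//       ENGINE=VersionedCollapsingMergeTree(sign, version) ORDER BY id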

 

  • Add a method that executes update statements
/**
 * Execute a DDL/DML statement against ClickHouse.
 * @param sql the statement to run
 * @return the row count reported by the driver
 */
def executeUpdate(sql: String): Int = {
  // obtain a ClickHouse connection
  val connection: ClickHouseConnection = createConnection
  println(sql)
  try {
    val statement: ClickHouseStatement = connection.createStatement()
    statement.executeUpdate(sql)
  } finally {
    connection.close()
  }
}

 

  • Call the table creation from the ClickHouseJDBCDemo singleton object
// create the ClickHouse utility instance
val clickHouseUtils: ClickHouseUtils = new ClickHouseUtils

// create the table
val strCreateTable: String = clickHouseUtils.createTable("order", df.schema)
clickHouseUtils.executeUpdate(strCreateTable)

 

3.2. Insert data

Implementation steps:

  • Open the ClickHouseUtils utility class
  • Add a method that builds the INSERT SQL string
  • Add a method that supplies a default value for a field based on its type
  • Add a method that inserts the data into ClickHouse
  • Call the insert from the ClickHouseJDBCDemo singleton object

Implementation:

  • Add a method that builds the INSERT SQL string
/**
 * Build the INSERT SQL string for a table.
 * @param tableName table name
 * @param schema    DataFrame schema
 * @return the parameterized INSERT statement
 */
private def createInsertStatmentSql(tableName: String)(schema: org.apache.spark.sql.types.StructType) = {
  val columns = schema.map(f => f.name).toList
  val vals = 1 to (columns.length) map (i => "?")
  s"INSERT INTO $tableName (${columns.mkString(",")}) VALUES (${vals.mkString(",")})"
}
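
Because the method is private it is only called from inside ClickHouseUtils; for an assumed three-column schema it produces a parameterized statement along the lines of:

// Hypothetical check with an explicitly built schema (amount, id, orderNo are assumed names).
val demoSchema = StructType(Seq(
  StructField("amount", DoubleType), StructField("id", LongType), StructField("orderNo", StringType)))
println(createInsertStatmentSql("order")(demoSchema))
// prints: INSERT INTO order (amount,id,orderNo) VALUES (?,?,?)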

 

  • Add a method that supplies a default value for a field based on its type
/**
 * Provide a default value for a null field, based on its Spark type.
 * @param sparkType the Spark SQL data type
 * @param v         the original (null) value
 * @return the default value to write
 */
private def defaultNullValue(sparkType: org.apache.spark.sql.types.DataType, v: Any) = sparkType match {
  case DoubleType => 0
  case LongType => 0
  case FloatType => 0
  case IntegerType => 0
  case StringType => null
  case BooleanType => false
  case _ => null
}
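
The StringType branch falls back to null; since the columns generated in 3.1 are not declared Nullable, ClickHouse may reject a null there, so a stricter variant that writes an empty string instead is a reasonable alternative (a sketch, not part of the original project code):

// Hypothetical stricter variant: never hand a null to a non-Nullable String column.
private def defaultNonNullValue(sparkType: org.apache.spark.sql.types.DataType): Any = sparkType match {
  case DoubleType | FloatType | IntegerType | LongType => 0
  case BooleanType => false
  case StringType => ""   // empty string instead of null
  case _ => ""
}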

 

  • Add a method that inserts the data into ClickHouse
/**
 * Insert the DataFrame's rows into ClickHouse.
 * @param tableName target table name
 * @param df        the DataFrame to write
 */
def insertToCkWithStatement(tableName: String, df: DataFrame): Unit = {
  // build the INSERT SQL string
  val insertSql: String = createInsertStatmentSql(tableName)(df.schema)
  df.foreachPartition(rows => {
    var connection: ClickHouseConnection = null
    var pstmt: PreparedStatement = null
    var count = 0
    try {
      connection = createConnection
      pstmt = connection.prepareStatement(insertSql)
      rows.foreach(line => {
        count += 1
        line.schema.fields.foreach{ f =>
          val fieldName = f.name
          val fieldIdx = line.schema.fieldIndex(fieldName)
          val fieldVal = line.get(fieldIdx)
          if(fieldVal != null)
            pstmt.setObject(fieldIdx+1, fieldVal)
          else{
            val defVal = defaultNullValue(f.dataType, fieldVal)
            pstmt.setObject(fieldIdx+1, defVal)
          }
        }
        // add to the batch and flush every 100,000 rows
        pstmt.addBatch()
        if (count >= 100000) {
          pstmt.executeBatch()
          count = 0
        }
      })
      pstmt.executeBatch()
    } catch {
      case ex: Exception =>
        println(ex)
    } finally {
      if (pstmt != null)
        pstmt.close()
      if (connection != null)
        connection.close()
    }
  })
}

 

  • Call the insert from the ClickHouseJDBCDemo singleton object
// insert the data
clickHouseUtils.insertToCkWithStatement("order", df)
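
Note that the table from 3.1 also carries the sign and version columns that VersionedCollapsingMergeTree uses for collapsing, while the generated INSERT only lists the DataFrame's own columns, so both fall back to their column defaults (0). If the collapsing behaviour matters, one option is to materialize them on the DataFrame before inserting — a sketch, using the insert timestamp as an assumed version:

// Hypothetical: write explicit sign/version values for the collapsing engine.
import org.apache.spark.sql.functions.lit

val dfWithVersion = df
  .withColumn("sign", lit(1))                            // 1 = state row, -1 = cancelling row
  .withColumn("version", lit(System.currentTimeMillis))  // any monotonically increasing value
clickHouseUtils.insertToCkWithStatement("order", dfWithVersion)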

 

3.3. Update data

Implementation steps:

  • Open the ClickHouseUtils utility class
  • Add a method that fetches the value of a given field by name
  • Add a method that builds the UPDATE SQL string
  • Add a method that applies the updates to ClickHouse
  • Call the update from the ClickHouseJDBCDemo singleton object

Implementation:

  • Add a method that fetches the value of a given field by name
/**
 * Get the value of the given field from a Row, rendered as a SQL literal.
 * @param fieldName name of the field to read
 * @param schema    the Row's schema
 * @param data      the Row
 * @return the field value as a string usable in SQL
 */
private def getFieldValue(fieldName: String, schema: StructType, data: Row): Any = {
  var flag = true
  var fieldValue: String = null
  val fields = schema.fields
  for (i <- 0 until fields.length if flag) {
    val field = fields(i)
    if (fieldName == field.name) {
      fieldValue = field.dataType match {
        case DataTypes.DoubleType => if (data.isNullAt(i)) "NULL" else s"${data.getDouble(i)}"
        case DataTypes.FloatType => if (data.isNullAt(i)) "NULL" else s"${data.getFloat(i)}"
        case DataTypes.IntegerType => if (data.isNullAt(i)) "NULL" else s"${data.getInt(i)}"
        case DataTypes.LongType => if (data.isNullAt(i)) "NULL" else s"${data.getLong(i)}"
        case DataTypes.StringType => if (data.isNullAt(i)) "NULL" else s"${data.getString(i).toString.trim}"
        case DataTypes.DateType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd").format(data.getDate(i))}'"
        case DataTypes.TimestampType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(data.getTimestamp(i))}'"
        case DataTypes.NullType => "NULL"
        case _ => if (data.isNullAt(i)) "NULL" else data.get(i).toString // fallback for types not listed above
      }
      flag = false
    }
  }
  fieldValue
}

 

  • Add a method that builds the UPDATE SQL string
/**
 * Build the UPDATE (mutation) SQL string for a row.
 * @param tableName       table name
 * @param row             the Row carrying the new values
 * @param primaryKeyField primary key column used in the WHERE clause
 * @param schema          the Row's schema
 * @return the ALTER TABLE ... UPDATE statement
 */
private def createUpdateStatmentSql(tableName: String, row:Row,  primaryKeyField:String = "id")(schema: org.apache.spark.sql.types.StructType) = {
  val primaryKeyValue = getFieldValue(primaryKeyField, schema, row)
  val noPrimaryKeyFields = schema.fields.filter(field => field.name != primaryKeyField)
  var sets = ArrayBuffer[String]()
  for (i <- 0 until noPrimaryKeyFields.length) {
    val noPrimaryKeyField = noPrimaryKeyFields(i)
    val set = noPrimaryKeyField.name + s"='${getFieldValue(noPrimaryKeyField.name, schema, row).toString}'"
    sets += set
  }
  s"ALTER TABLE $tableName UPDATE ${sets.mkString(", ")} WHERE ${primaryKeyField}=$primaryKeyValue"
}
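
For a row of the assumed schema, the method builds a ClickHouse mutation statement roughly like the following (again only callable from inside the utility class, since it is private):

// Hypothetical example: for a row with amount = 99.9, id = 1, orderNo = "D001",
// createUpdateStatmentSql("order", row)(row.schema) returns
//   ALTER TABLE order UPDATE amount='99.9', orderNo='D001' WHERE id=1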

 

  • Add a method that applies the updates to ClickHouse
/**
 * Apply the row updates to ClickHouse.
 * @param tableName       target table name
 * @param df              the DataFrame carrying the updated rows
 * @param primaryKeyField primary key column used in the WHERE clause
 */
def updateToCkWithStatement(tableName: String, df: DataFrame, primaryKeyField: String = "id") = {
  df.foreachPartition(rows => {
    var connection: ClickHouseConnection = null
    var pstmt: ClickHouseStatement = null
    try {
      connection = createConnection
      pstmt = connection.createStatement()
      rows.foreach(line => {
        // build the UPDATE SQL string
        val updateSql: String = createUpdateStatmentSql(tableName, line)(line.schema)
        pstmt.executeUpdate(updateSql)
      })
    } catch {
      case ex: Exception =>
        println(ex)
    } finally {
      if (connection != null)
        connection.close()
    }
  })
}

 

  • Call the update from the ClickHouseJDBCDemo singleton object
// update the data
clickHouseUtils.updateToCkWithStatement("order", df)

 

3.4. Delete data

Implementation steps:

  • Open the ClickHouseUtils utility class
  • Add a method that builds the DELETE SQL string
  • Add a method that deletes the data from ClickHouse
  • Call the delete from the ClickHouseJDBCDemo singleton object

Implementation:

  • Add a method that builds the DELETE SQL string
/**
 * Build the DELETE (mutation) SQL string for a row.
 * @param tableName       table name
 * @param row             the Row identifying the record to delete
 * @param primaryKeyField primary key column used in the WHERE clause
 * @param schema          the Row's schema
 * @return the ALTER TABLE ... DELETE statement
 */
private def createDeleteStatmentSql(tableName: String, row:Row,  primaryKeyField:String = "id")(schema: org.apache.spark.sql.types.StructType) = {
  val primaryKeyValue = getFieldValue(primaryKeyField, schema, row)
  s"ALTER TABLE $tableName DELETE WHERE ${primaryKeyField} = $primaryKeyValue"
}

 

  • Add a method that deletes the data from ClickHouse
/**
 * Delete the DataFrame's rows from ClickHouse.
 * @param tableName       target table name
 * @param df              the DataFrame identifying the rows to delete
 * @param primaryKeyField primary key column used in the WHERE clause
 */
def deleteToCkWithStatement(tableName: String, df: DataFrame, primaryKeyField: String = "id") = {
  df.foreachPartition(rows => {
    var connection: ClickHouseConnection = null
    var pstmt: ClickHouseStatement = null
    try {
      connection = createConnection
      pstmt = connection.createStatement()
      rows.foreach(line => {
        // build the DELETE SQL string
        val deleteSql: String = createDeleteStatmentSql(tableName, line)(line.schema)
        println(deleteSql)
        pstmt.executeUpdate(deleteSql)
      })
    } catch {
      case ex: Exception =>
        println(ex)
    } finally {
      if (connection != null)
        connection.close()
    }
  })
}

 

  • Call the delete from the ClickHouseJDBCDemo singleton object
// delete the data
clickHouseUtils.deleteToCkWithStatement("order", df)
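
Both the UPDATE in 3.3 and this DELETE are ClickHouse mutations: executeUpdate returns once the mutation is queued, not once the rows are rewritten. If the demo needs to confirm the outcome, the mutation queue can be polled through system.mutations — a sketch reusing createConnection (system.mutations and its is_done column are standard ClickHouse, but treat the exact query as an assumption):

// Hypothetical check: count mutations on the target table that have not finished yet.
val connection = clickHouseUtils.createConnection
try {
  val rs = connection.createStatement().executeQuery(
    "SELECT count() FROM system.mutations WHERE table = 'order' AND is_done = 0")
  if (rs.next()) println(s"pending mutations: ${rs.getLong(1)}")
} finally {
  connection.close()
}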

From: https://www.cnblogs.com/shan13936/p/18104339

    1定义一个数据集是分布式的数据集合。Spark1.6增加新接口Dataset,提供RDD的优点:强类型、能够使用强大lambda函数SparkSQL优化执行引擎的优点可从JVM对象构造Dataset,然后函数式转换(map、flatMap、filter等)操作。DatasetAPI在Scala和Java中可用。Python不支持DatasetAPI,......