
Buffering Flink Streaming Data for Batch Writes into ClickHouse

Posted: 2023-06-01 17:33:47

一、Background

Developers who have worked with ClickHouse know that writes perform best when they are batched. In a streaming scenario, however, the amount of data arriving per batch is not under your control: with Kafka, for example, the number of messages pulled each time varies, and if Flink writes every single stream record to ClickHouse individually, throughput is very poor. So I wrote a demo that buffers records and inserts them in batches. It still needs further optimization before production use.

二、Approach

Maintain a cache queue as a buffer. When the number of records in the queue reaches a threshold, or records have been sitting in the buffer for longer than a time limit, commit them to ClickHouse in one batch; the flush decision boils down to the condition sketched below.
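As a minimal sketch (the names here are illustrative; the demo code below uses MAXSIZE and MAX_WAIT_TIME for the same two values), the flush decision is just:

// Commit when the buffer reaches the size threshold, or when records have been
// waiting longer than the maximum wait time since the last commit.
static boolean shouldFlush(int bufferedCount, long lastCommitMillis, int maxSize, long maxWaitMillis) {
    return bufferedCount >= maxSize
            || System.currentTimeMillis() - lastCommitMillis >= maxWaitMillis;
}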

三、Sink code

import com.su.data.pojo.RouteInfoPoJO;
import com.su.data.util.ClickhouseTasK;
import com.su.data.util.ClickhouseUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.io.Serializable;
/**
 * @ClassName:ClickhouseSink
 * @Author: sz
 * @Date: 2022/7/8 10:44
 * @Description:
 */

public class ClickhouseSink extends RichSinkFunction<RouteInfoPoJO> implements Serializable {
    String sql;
    public ClickhouseSink(String sql) {
        this.sql = sql;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ClickhouseTasK.connection  = ClickhouseUtil.getConn();
    }
    @Override
    public void close() throws Exception {
        super.close();
        ClickhouseTasK.connection.close();
    }
    @Override
    public void invoke(RouteInfoPoJO routeInfoPoJO, Context context) throws Exception {
        // hand the streaming record to the buffer
        ClickhouseTasK.getInstance(sql).totalAdd(routeInfoPoJO);
    }
}
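The ClickhouseUtil.getConn() helper called in open() above is not shown in the original post. A minimal sketch, assuming the ru.yandex.clickhouse clickhouse-jdbc driver and placeholder connection settings (host, database and credentials here are illustrative, not from the original):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ClickhouseUtil {
    // Placeholder host, database and credentials; adjust to your environment.
    private static final String URL = "jdbc:clickhouse://xxxx:8123/sz";

    static {
        try {
            // Explicitly load the driver shipped with the clickhouse-jdbc dependency.
            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        } catch (ClassNotFoundException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static Connection getConn() throws SQLException {
        return DriverManager.getConnection(URL, "default", "");
    }
}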

Data processing module

import com.su.data.pojo.RouteInfoPoJO;
import com.su.data.sink.ClickhouseSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ClassName:ClickhouseTasK
 * @Author: sz
 * @Date: 2022/7/8 16:24
 * @Description:
 */

public class ClickhouseTasK {
    /**
     * Maximum capacity of the bounded queue, i.e. the number of rows submitted to ClickHouse per batch
     * */
    private static final int MAXSIZE = 10000;

    /**
     * Bounded commit queue
     * */
    private static Queue<RouteInfoPoJO> queue = new LinkedBlockingQueue<>(MAXSIZE);

    /**
     * Unbounded cache queue: it receives records from the data stream while the bounded queue
     * pulls from it, so the data stream itself is never blocked
     * */
    private static Queue<RouteInfoPoJO> TOTAL_QUEUE = new ConcurrentLinkedQueue<>();

    /**
     * Lock guarding the commit queue
     * */
    private static ReentrantLock queueLock = null;

    /**
     * Singleton instance
     * */
    private static volatile ClickhouseTasK clickhouseTasK = null;

    /**
     * Condition: the queue is full
     * */
    private static Condition FULL = null;

    /**
     * Condition: the queue is not full
     * */
    private static Condition UN_FULL = null;
    /**
     * ClickHouse connection
     * */
    public static Connection connection = null;


    /**
     * Maximum time records may wait in the bounded queue before an automatic commit: 3000 milliseconds
     * */
    private static final long MAX_WAIT_TIME = 3000;

    /**
     * Thread that commits the bounded queue
     * */
    private static Thread dataThread = null;

    /**
     * Thread that moves records from the unbounded queue into the bounded queue
     * */
    private static Thread moveThread = null;

    /**
     * Timer thread: triggers an automatic commit once the wait time elapses
     * */
    private static Thread timeThread = null;

    /**
     * Timestamp (in milliseconds) of the last commit
     * */
    static AtomicLong atomicLong = null;

    /**
     * Total number of records received by the unbounded queue
     * */
    static AtomicLong count = null;

    /**
     * Static initializer
     * */
    static {
        count = new AtomicLong(0);
        // bounded queue: one commit per 10000 records
        queueLock = new ReentrantLock();
        FULL = queueLock.newCondition();
        UN_FULL = queueLock.newCondition();
    }

    /**
     * Singleton initialization
     * */
    public static ClickhouseTasK getInstance(String sql) throws InterruptedException {
        if( null == clickhouseTasK){
            synchronized (ClickhouseTasK.class){
                if(null == clickhouseTasK){
                    clickhouseTasK = new ClickhouseTasK(sql);
                }
            }
        }
        return  clickhouseTasK;
    }

    /**
     * The constructor initializes the commit timestamp and the worker threads
     * */
    public ClickhouseTasK(String sql) throws InterruptedException {
        atomicLong = new AtomicLong(System.currentTimeMillis());
        CountDownLatch countDownLatch = new CountDownLatch(2);
        // timer thread
        timeThread = new Thread(() -> {
            while (true) {
                queueLock.lock();
                try {
                    // once the wait time has elapsed, commit automatically
                    if (System.currentTimeMillis() - atomicLong.get() >= MAX_WAIT_TIME && queue.size() > 0) {
                        System.out.println("time-based auto commit, size: " + queue.size());
                        // commit the data
                        commitData(queue, connection, sql);
                        // record the time of this commit
                        atomicLong.set(System.currentTimeMillis());
                        // the commit queue is empty again, wake up threads blocked on UN_FULL
                        UN_FULL.signal();
                    }
                } catch (SQLException e) {
                    System.out.println("SQL exception during timed commit: " + e);
                } finally {
                    queueLock.unlock();
                }
            }
        });
        timeThread.setName("timeThread");
        // thread that moves records from the unbounded queue into the bounded queue
        moveThread = new Thread(() -> {
            countDownLatch.countDown();
            while (true) {
                if (!TOTAL_QUEUE.isEmpty()) {
                    add(TOTAL_QUEUE.poll());
                }
            }
        });
        moveThread.setName("moveThread");
        dataThread = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " started");
            countDownLatch.countDown();
            while (true) {
                queueLock.lock();
                try {
                    // wait until the queue is full: a thread blocked on FULL is only woken up when the queue is full
                    FULL.await();
                    // commit and drain the queue
                    if (!queue.isEmpty()) {
                        commitData(queue, connection, sql);
                    }

                    // reset the commit timestamp
                    atomicLong.set(System.currentTimeMillis());
                    // the commit queue is empty again, wake up threads blocked on UN_FULL
                    UN_FULL.signal();
                } catch (SQLException | InterruptedException exception) {
                    System.out.println("exception during commit: " + exception);
                } finally {
                    queueLock.unlock();
                }
            }
        });
        dataThread.setName("dataQueueThread");
        moveThread.start();
        dataThread.start();
        timeThread.start();
        // make sure the worker threads have started before the constructor returns
        countDownLatch.await();
        System.out.println("initialization finished");

    }
    public void commitData(Queue<RouteInfoPoJO> queue, Connection connection, String sql) throws SQLException {
        System.out.println("about to commit, current size: " + queue.size());
        // batch insert
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            long startTime = System.currentTimeMillis();
            while (!queue.isEmpty()) {
                RouteInfoPoJO routeInfoPoJO = queue.poll();
                preparedStatement.setString(1, routeInfoPoJO.getDeal_date());
                preparedStatement.setString(2, routeInfoPoJO.getClose_date());
                preparedStatement.setString(3, routeInfoPoJO.getCard_no());
                preparedStatement.setString(4, routeInfoPoJO.getDeal_value());
                preparedStatement.setString(5, routeInfoPoJO.getDeal_type());
                preparedStatement.setString(6, routeInfoPoJO.getCompany_name());
                preparedStatement.setString(7, routeInfoPoJO.getCar_no());
                preparedStatement.setString(8, routeInfoPoJO.getStation());
                preparedStatement.setString(9, routeInfoPoJO.getConn_mark());
                preparedStatement.setString(10, routeInfoPoJO.getDeal_money());
                preparedStatement.setString(11, routeInfoPoJO.getEqu_no());
                preparedStatement.addBatch();
            }
            // ClickHouse has no transactions: once the batch is executed, the data is written
            int[] ints = preparedStatement.executeBatch();
            long endTime = System.currentTimeMillis();
            System.out.println("batch insert finished in " + (endTime - startTime) + " ms -- rows inserted = " + ints.length);
            System.out.println("total records received so far: " + count.get());
        }
        // todo In a real scenario the data must not be lost: failed batches need handling, e.g. log them and re-ingest from the log later (see the sketch after this class)
        // todo To keep ClickHouse free of duplicates, create the table as a ReplacingMergeTree so that duplicate rows are merged away automatically
    }
    public void add(RouteInfoPoJO routeInfoPoJO) {
        // size-based commit condition
        queueLock.lock();
        try {
            // the commit queue is full
            if (queue.size() >= MAXSIZE) {
                // wake up the thread blocked on FULL
                FULL.signal();
                // block this thread on UN_FULL
                UN_FULL.await();
            }
            if (routeInfoPoJO != null) {
                // enqueue into the commit queue
                queue.offer(routeInfoPoJO);
            }
        } catch (InterruptedException exception) {
            exception.printStackTrace();
        } finally {
            queueLock.unlock();
        }
    }
    /**
     * Enqueue into the unbounded cache queue. In a real scenario, estimate the available memory,
     * cap the number of buffered records and size the consumers accordingly to avoid an OOM
     * */
    public void totalAdd(RouteInfoPoJO routeInfoPoJO){
        TOTAL_QUEUE.add(routeInfoPoJO);
        count.incrementAndGet();
    }
}
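The first todo in commitData above notes that in production a failed batch must not be lost. A minimal sketch of that idea (the class name, the drain-into-a-list step and the logging-only fallback are assumptions, not part of the original demo): hold the whole batch in a local list, so that if executeBatch fails the records are still in hand and can be logged or pushed to a dead-letter store for later replay; with a ReplacingMergeTree table, replayed duplicates are merged away.

import com.su.data.pojo.RouteInfoPoJO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class SafeBatchWriter {
    private static final Logger LOG = LoggerFactory.getLogger(SafeBatchWriter.class);

    /**
     * Inserts one already-drained batch; returns false (and logs) if the insert fails,
     * so the caller still owns the batch and can re-ingest it later.
     */
    public static boolean tryInsert(List<RouteInfoPoJO> batch, Connection connection, String sql) {
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            for (RouteInfoPoJO r : batch) {
                ps.setString(1, r.getDeal_date());
                ps.setString(2, r.getClose_date());
                ps.setString(3, r.getCard_no());
                ps.setString(4, r.getDeal_value());
                ps.setString(5, r.getDeal_type());
                ps.setString(6, r.getCompany_name());
                ps.setString(7, r.getCar_no());
                ps.setString(8, r.getStation());
                ps.setString(9, r.getConn_mark());
                ps.setString(10, r.getDeal_money());
                ps.setString(11, r.getEqu_no());
                ps.addBatch();
            }
            ps.executeBatch();
            return true;
        } catch (SQLException e) {
            // Keep the failed batch recoverable: here it is only logged; a real job would
            // persist it (file, dead-letter topic, ...) and replay it later.
            LOG.error("ClickHouse batch insert of {} rows failed, batch must be re-ingested", batch.size(), e);
            return false;
        }
    }
}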

Entity class

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * @ClassName:RouteInfoPoJO
 * @Author: sz
 * @Date: 2022/7/4 17:11
 * @Description:
 */

@AllArgsConstructor
@Data
public class RouteInfoPoJO {
    private String deal_date;
    private String close_date;
    private String card_no;
    private String deal_value;
    private String deal_type;
    private String company_name;
    private String car_no;
    private String station;
    private String conn_mark;
    private String deal_money;
    private String equ_no;
}

Flink reads from Kafka and writes to ClickHouse

import com.su.data.pojo.RouteInfoPoJO;
import com.su.data.serializer.MyDeserialization;
import com.su.data.sink.ClickhouseSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * @ClassName:KakfaToCK
 * @Author: sz
 * @Date: 2022/7/8 10:35
 * @Description:
 */

public class KakfaToCK {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"ckcomsumer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ck-node");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxxx:9092");


        DataStreamSource<RouteInfoPoJO> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<RouteInfoPoJO>("szt",new MyDeserialization(),properties));

        String sql = "INSERT INTO sz.ods_szt_data(deal_date, close_date, card_no, deal_value, deal_type, company_name, car_no, station, conn_mark, deal_money, equ_no) values (?,?,?,?,?,?,?,?,?,?,?)";
        //dataStreamSource.print();
        dataStreamSource.addSink(new ClickhouseSink(sql));
        env.execute("KakfaToCK");

    }
}
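The MyDeserialization schema used above is not included in the original post. A minimal sketch, assuming each Kafka record value is UTF-8 JSON whose keys match the RouteInfoPoJO field names, and assuming RouteInfoPoJO also has a no-args constructor (e.g. Lombok's @NoArgsConstructor) so that fastjson can instantiate it:

import com.alibaba.fastjson.JSONObject;
import com.su.data.pojo.RouteInfoPoJO;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class MyDeserialization implements DeserializationSchema<RouteInfoPoJO> {
    @Override
    public RouteInfoPoJO deserialize(byte[] message) throws IOException {
        // Assumes the message value is UTF-8 JSON with keys matching the PoJO fields.
        return JSONObject.parseObject(new String(message, StandardCharsets.UTF_8), RouteInfoPoJO.class);
    }

    @Override
    public boolean isEndOfStream(RouteInfoPoJO nextElement) {
        // The Kafka stream is unbounded; never signal end of stream.
        return false;
    }

    @Override
    public TypeInformation<RouteInfoPoJO> getProducedType() {
        return TypeInformation.of(RouteInfoPoJO.class);
    }
}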

 

Original reference post: https://blog.csdn.net/qq_38796051/article/details/125768079


From: https://www.cnblogs.com/robots2/p/17449684.html
