一、背景
对于clickhouse有过使用经验的开发者应该知道,ck的写入,最优应该是批量的写入。但是对于流式场景来说,每批写入的数据量都是不可控制的,如kafka,每批拉取的消息数量是不定的,flink对于每条数据流的输出,写入ck的效率会十分缓慢,所以写了一个demo,去批量入库。生产环境使用还需要优化
二、实现思路
维护一个缓存队列当做一个缓冲区,当队列数据条数到达一定阈值,或者数据滞留时间超过一定时间,此时进行ck的批量提交。
三、Sink代码
import com.su.data.pojo.RouteInfoPoJO;
import com.su.data.util.ClickhouseTasK;
import com.su.data.util.ClickhouseUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.io.Serializable;
/**
* @ClassName:ClickhouseSink
* @Author: sz
* @Date: 2022/7/8 10:44
* @Description:
*/
public class ClickhouseSink extends RichSinkFunction<RouteInfoPoJO> implements Serializable {
String sql;
public ClickhouseSink(String sql) {
this.sql = sql;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ClickhouseTasK.connection = ClickhouseUtil.getConn();
}
@Override
public void close() throws Exception {
super.close();
ClickhouseTasK.connection.close();
}
@Override
public void invoke(RouteInfoPoJO routeInfoPoJO, Context context) throws Exception {
//流式数据写入缓存
ClickhouseTasK.getInstance(sql).totalAdd(routeInfoPoJO);
}
}
数据处理模块
import com.su.data.pojo.RouteInfoPoJO;
import com.su.data.sink.ClickhouseSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @ClassName:ClickhouseTasK
* @Author: sz
* @Date: 2022/7/8 16:24
* @Description:
*/
public class ClickhouseTasK {
/**
* 有界队列容量最大值 单批次提交ck数量
* */
private static final int MAXSIZE = 10000;
/**
* 提交有界队列
* */
private static Queue<RouteInfoPoJO> queue = new LinkedBlockingQueue<>(MAXSIZE);
/**
* 缓存无界队列 无界队列从数据流中获取数据,有界队列从无界队列中拉取数据,数据流不阻塞
* */
private static Queue<RouteInfoPoJO> TOTAL_QUEUE = new ConcurrentLinkedQueue();
/**
* 对提交队列设置锁
* */
private static ReentrantLock queenLock =null;
/**
* 单例实体
* */
private static volatile ClickhouseTasK clickhouseTasK = null;
/**
* 队列满了
* */
private static Condition FULL = null;
/**
* 队列没满
* */
private static Condition UN_FULL = null;
/**
* ck连接
* */
public static Connection connection = null;
/**
* 有界队列最大等待时长 超过时长自动提交 时间 3000毫秒
* */
private static final long MAX_WAIT_TIME = 3000;
/**
* 队列提交线程
* */
private static Thread dataThread = null;
/**
* 从无界队列拉取到有界队列的线程
* */
private static Thread moveThread = null;
/**
* 时间计数线程 时间一到自动提交
* */
private static Thread timeThread = null;
/**
* 记录上次提交时间毫秒值
* */
static AtomicLong atomicLong = null;
/**
* 记录无界队列数据获取总量
* */
static AtomicLong count = null;
/**
* 静态类加载初始化
* */
static {
count = new AtomicLong(0);
//有界队列 10000条提交一次
queenLock = new ReentrantLock();
FULL = queenLock.newCondition();
UN_FULL = queenLock.newCondition();
}
/**
* 单例初始化
* */
public static ClickhouseTasK getInstance(String sql) throws InterruptedException {
if( null == clickhouseTasK){
synchronized (ClickhouseTasK.class){
if(null == clickhouseTasK){
clickhouseTasK = new ClickhouseTasK(sql);
}
}
}
return clickhouseTasK;
}
/**
* 构造函数初始化时间开始值,以及线程
* */
public ClickhouseTasK(String sql) throws InterruptedException {
atomicLong = new AtomicLong(System.currentTimeMillis());
CountDownLatch countDownLatch = new CountDownLatch(2);
//时间记录线程
timeThread = new Thread(()->{
while (true){
queenLock.lock();
try {
//时间一到 自动提交
if(System.currentTimeMillis() - atomicLong.get() >= MAX_WAIT_TIME && queue.size() >0 ){
System.out.println("到时间自动提交:"+queue.size());
//数据提交
commitData(queue,connection,sql);
//记录本次提交的时间
atomicLong.set(System.currentTimeMillis());
// 数据提交后,提交队列空了 唤醒 阻塞于UN_FULL的线程
UN_FULL.signal();
}
} catch (SQLException e) {
System.out.println("中断异常:"+e);
}finally {
queenLock.unlock();
}
}
});
timeThread.setName("timeThread");
//数据拉去线程
moveThread = new Thread(() ->{
countDownLatch.countDown();
while (true){
if(!TOTAL_QUEUE.isEmpty()){
add(TOTAL_QUEUE.poll());
}
}
});
moveThread.setName("moveThread");
dataThread = new Thread(() ->{
System.out.println(Thread.currentThread().getName() + "启动!!!");
countDownLatch.countDown();
while (true){
queenLock.lock();
try {
//等待队列满了 当阻塞于FULL的线程被唤醒,说明队列满了
FULL.await();
//提交逻辑 清空队列
if(!queue.isEmpty()){
commitData(queue,connection,sql);
}
//重置提交时间
atomicLong.set(System.currentTimeMillis());
// 数据提交后,提交队列空了 唤醒 阻塞于UN_FULL的线程
UN_FULL.signal();
} catch (SQLException | InterruptedException exception) {
System.out.println("中断异常:"+exception);
}finally {
queenLock.unlock();
}
}
});
dataThread.setName("dataQueenThread");
moveThread.start();
dataThread.start();
timeThread.start();
//确保各线程启动完成后 构造函数线程初始化完成
countDownLatch.await();
System.out.println("初始化完成了");
}
public void commitData(Queue<RouteInfoPoJO> queue, Connection connection, String sql) throws SQLException {
System.out.println("准备提交,当前数量为:"+queue.size());
//批量提交
try(PreparedStatement preparedStatement = connection.prepareStatement(sql)){
long startTime = System.currentTimeMillis();
while (!queue.isEmpty()){
RouteInfoPoJO routeInfoPoJO = queue.poll();
preparedStatement.setString(1, routeInfoPoJO.getDeal_date());
preparedStatement.setString(2, routeInfoPoJO.getClose_date());
preparedStatement.setString(3, routeInfoPoJO.getCard_no());
preparedStatement.setString(4, routeInfoPoJO.getDeal_value());
preparedStatement.setString(5, routeInfoPoJO.getDeal_type());
preparedStatement.setString(6, routeInfoPoJO.getCompany_name());
preparedStatement.setString(7, routeInfoPoJO.getCar_no());
preparedStatement.setString(8, routeInfoPoJO.getStation());
preparedStatement.setString(9, routeInfoPoJO.getConn_mark());
preparedStatement.setString(10, routeInfoPoJO.getDeal_money());
preparedStatement.setString(11, routeInfoPoJO.getEqu_no());
preparedStatement.addBatch();
}
//ck没有事务,提交了就执行了
int[] ints = preparedStatement.executeBatch();
long endTime = System.currentTimeMillis();
System.out.println("批量插入完毕用时:" + (endTime - startTime) + " -- 插入数据 = " + ints.length);
System.out.println("现有总量:"+count.get());
}
// todo 真实场景下,数据要确保不丢失,需要对异常数据进行处理,如日志记录后,进行数据日志采集 重复入库即可
// todo 想要确保ck数据不重复,建立时选择replacing合并树,然后重复数据自动合并就好了
}
public void add (RouteInfoPoJO routeInfoPoJO){
//满足 量 提交条件
queenLock.lock();
try {
//提交队列满了
if (queue.size() >= MAXSIZE ){
//唤醒阻塞于 FULL的线程
FULL.signal();
//阻塞 UN_FULL上的线程
UN_FULL.await();
}
if(routeInfoPoJO !=null){
//提交队列入队
queue.offer(routeInfoPoJO);
}
} catch (InterruptedException exception) {
exception.printStackTrace();
} finally {
queenLock.unlock();
}
}
/**
* 无界缓存队列入队 如果真实场景,评估最大内存,设置最大消息条数,评估消费速度,避免oom
* */
public void totalAdd(RouteInfoPoJO routeInfoPoJO){
TOTAL_QUEUE.add(routeInfoPoJO);
count.incrementAndGet();
}
}
实体类
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* @ClassName:RouteInfoPoJO
* @Author: sz
* @Date: 2022/7/4 17:11
* @Description:
*/
@AllArgsConstructor
@Data
public class RouteInfoPoJO {
private String deal_date;
private String close_date;
private String card_no;
private String deal_value;
private String deal_type;
private String company_name;
private String car_no;
private String station;
private String conn_mark;
private String deal_money;
private String equ_no;
}
FLink读取kafka写入ck
import com.alibaba.fastjson.JSONObject;
import com.su.data.pojo.RouteInfoPoJO;
import com.su.data.serializer.MyDeserialization;
import com.su.data.sink.ClickhouseSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
* @ClassName:KakfaToCK
* @Author: sz
* @Date: 2022/7/8 10:35
* @Description:
*/
public class KakfaToCK {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"ckcomsumer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ck-node");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxxx:9092");
DataStreamSource<RouteInfoPoJO> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<RouteInfoPoJO>("szt",new MyDeserialization(),properties));
String sql = "INSERT INTO sz.ods_szt_data(deal_date, close_date, card_no, deal_value, deal_type, company_name, car_no, station, conn_mark, deal_money, equ_no) values (?,?,?,?,?,?,?,?,?,?,?)";
//dataStreamSource.print();
dataStreamSource.addSink(new ClickhouseSink(sql));
env.execute("KakfaToCK");
}
}
参考博客原文:https://blog.csdn.net/qq_38796051/article/details/125768079
程序员工具箱:www.robots2.com
标签:routeInfoPoJO,String,队列,Flink,private,static,流式,import,Clickhouse From: https://www.cnblogs.com/robots2/p/17449684.html