首页 > 数据库 >【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(4) - clickhouse

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(4) - clickhouse

时间:2023-12-28 13:36:02浏览次数:31  
标签:name 示例 flink sink import id clickhouse




文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、sink到ClickHouse示例
  • 1、介绍
  • 2、maven依赖
  • 3、创建clickhouse表
  • 4、验证clickhouse web页面是否正常
  • 5、实现
  • 1)、user bean
  • 2)、sink实现
  • 6、验证
  • 1)、nc 输入
  • 2)、启动应用程序
  • 3)、观察应用程序控制台输出
  • 4)、查看clickhouse表中的数据



本文介绍了nc作为数据源,经过flink的transformation,然后sink到clickhouse中,最后进行逐步验证的完整示例。

本文除了maven依赖外,没有其他依赖。

本文依赖clickhouse、nc的环境好用。

一、maven依赖

为避免篇幅过长,所有基础依赖均在第一篇文章中列出,具体依赖参考文章
【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console中的依赖

下文中具体需要的依赖将在介绍时添加新增的依赖。

二、sink到ClickHouse示例

1、介绍

  • ClickHouse系列文章
    1、ClickHouse介绍2、clickhouse安装与简单验证(centos)3、ClickHouse表引擎-MergeTree引擎4、clickhouse的Log系列表引擎、外部集成表引擎和其他特殊的表引擎介绍及使用5、ClickHouse查看数据库容量、表的指标、表分区、数据大小等

2、maven依赖

<dependency>
  <groupId>ru.ivi.opensource</groupId>
  <artifactId>flink-clickhouse-sink</artifactId>
  <version>1.3.1</version>
</dependency>

3、创建clickhouse表

-- 1、创建数据库 tutorial
--略
-- 2、创建表
CREATE TABLE t_flink_sink_clickhouse (    
id UInt16 COMMENT '员工id',    
name String COMMENT '员工姓名',     
age UInt8 COMMENT '员工年龄' ) 
ENGINE = MergeTree 
ORDER BY id;

4、验证clickhouse web页面是否正常

http://192.168.10.42:8123/【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(4) - clickhouse_clickhouse

5、实现

1)、user bean

import lombok.Data;

@Data
public class User {
	private int id;
	private String name;
	private int age;

	public User(int id, String name, int age) {
		this.id = id;
		this.name = name;
		this.age = age;
	}

	// Java Bean 必须实现的方法,信息通过字符串进行拼接
	public static String convertToCsv(User user) {
		StringBuilder builder = new StringBuilder();
		builder.append("(");

		// add user.id
		builder.append(user.id);
		builder.append(", ");

		// add user.name
		builder.append("'");
		builder.append(String.valueOf(user.name));
		builder.append("', ");

		// add user.age
		builder.append(user.age);

		builder.append(" )");
		return builder.toString();
	}
}

2)、sink实现

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.clickhouse.ClickHouseSink;
import org.clickhouse.model.ClickHouseClusterSettings;
import org.clickhouse.model.ClickHouseSinkConst;

/**
 * @author alanchan
 *
 */
public class TestFinkSinkClickhouse {
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

		// source
		// nc
		DataStream<String> inputStream = env.socketTextStream("192.168.10.42", 9999);

		// Transform
		SingleOutputStreamOperator<String> dataStream = inputStream.map(new MapFunction<String, String>() {
			@Override
			public String map(String data) throws Exception {
				String[] split = data.split(",");
				User user = new User(Integer.parseInt(split[0]), split[1], Integer.parseInt(split[2]));
				return User.convertToCsv(user);
			}
		});

		// create props for sink
		Map<String, String> globalParameters = new HashMap<>();
		// clickhouse 的服务地址,该链接访问返回ok
		globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://192.168.10.42:8123/");
		// common
		globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
		globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "/usr/local/bigdata/testdata/clickhouse_failpath");
		globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2");
		globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2");
		globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "10");
		globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");

		// set global paramaters
		ParameterTool parameters = ParameterTool.fromMap(globalParameters);
		env.getConfig().setGlobalJobParameters(parameters);

//		env.setParallelism(1);
		Properties props = new Properties();
		// 数据库tutorial和表名称t_flink_sink_clickhouse
		// 需要先创建数据库和表
		// CREATE TABLE t_flink_sink_clickhouse (id UInt16 COMMENT '员工id',name String
		// COMMENT '员工姓名',age UInt8 COMMENT '员工年龄' ) ENGINE = MergeTree ORDER BY id;
		props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "tutorial.t_flink_sink_clickhouse");
		props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
		ClickHouseSink sink = new ClickHouseSink(props);
		dataStream.addSink(sink);
		dataStream.print();

		env.execute();

	}
}

6、验证

1)、nc 输入

[root@server2 etc]# nc -lk 9999
1,alanchan,19
2,alan,20
3,chan,21

2)、启动应用程序

3)、观察应用程序控制台输出

4)、查看clickhouse表中的数据

server2 :) select * from t_flink_sink_clickhouse;

SELECT *
FROM t_flink_sink_clickhouse

Query id: aea358e8-8d9d-4caa-98b1-54903356a7d0

┌─id─┬─name─┬─age─┐
│  2 │ alan │  20 │
└────┴──────┴─────┘
┌─id─┬─name─┬─age─┐
│  3 │ chan │  21 │
└────┴──────┴─────┘
┌─id─┬─name─────┬─age─┐
│  1 │ alanchan │  19 │
└────┴──────────┴─────┘

3 rows in set. Elapsed: 0.003 sec.

以上,本文介绍了nc作为数据源,经过flink的transformation,然后sink到clickhouse中,最后进行逐步验证的完整示例。

标签:name,示例,flink,sink,import,id,clickhouse
From: https://blog.51cto.com/alanchan2win/9013176

相关文章

  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、环境或版本说明三、flinksink到kafka示例1、介绍2、1.13.6版本示例1)、maven依赖2)、实现3)、验证步骤3、1.17.0版本示例1)、maven依赖2)、实现3)、验证步骤本文介绍了flink将数据sink到kafka的示例,并提供了flink的1.13.6和1.17两个版本sink到......
  • Flink实验
     题目:实验八姓名 日期12.8实验环境:(1)Ubuntu18.04(或Ubuntu16.04)。(2)IntelliJIDEA。(3)Flink1.9.1。 实验内容与完成情况:(1)使用IntelliJIDEA工具开发WordCount程序在Linux系统中安装IntelliJIDEA,然后使用IntelliJIDEA工具开发WordCount程序,并打包......
  • 【SpringBoot快速入门】(4)SpringBoot项目案例代码示例
    目录1创建工程3配置文件4静态资源之前我们已经学习的Spring、SpringMVC、Mabatis、Maven,详细讲解了Spring、SpringMVC、Mabatis整合SSM的方案和案例,上一节我们学习了SpringBoot的开发步骤、工程构建方法以及工程的快速启动,从这一节开始,我们开始学习SpringBoot配置文件。接下来......
  • 【SpringBoot快速入门】(3)SpringBoot整合junit和MyBatis 详细代码示例与讲解
    目录1.SpringBoot整合junit1.1环境准备1.2编写测试类2.SpringBoot整合mybatis2.1回顾Spring整合Mybatis2.2SpringBoot整合mybatis2.2.1创建模块2.2.2定义实体类2.2.3定义dao接口2.2.4定义测试类2.2.5编写配置2.2.6测试2.2.7使用Druid数据源之前我们已经学习的Spring、......
  • 项目应用多级缓存示例
    前不久做的一个项目,需要在前端实时展示硬件设备的数据。设备很多,并且每个设备的数据也很多,总之就是数据很多。同时,设备的刷新频率很快,需要每2秒读取一遍数据。问题来了,我们如何读取数据,并且在前端展示?我的想法是利用多级缓存:1)首先是有个数据采集程序,不停地采集设备的数据。采集到......
  • Linux中date命令使用示例
    一、.Linux中的date命令date"+%Y-%m-%d"输出当前日期,格式为“年-月-日”,例如:2023-06-01date"+%Y年%m月%d日%H:%M:%S"输出当前日期喝时间,格式为“年月日时:分:秒”,例如:2023年12月28日04:28:11date"+%b"输出当前月份的英文缩写,例如:Jundate"+%B"输出当前月份的英文全称,例......
  • spring MVC 后端 接收 前端 批量添加的数据(简单示例)
    <%@pagecontentType="text/html;charset=UTF-8"language="java"%><html><head>  <title>Title</title></head><body><scriptsrc="${pageScope.request.ContextPath}/js/jquery-3.3.1.min.js&qu......
  • Composite 组合模式简介与 C# 示例【结构型3】【设计模式来了_8】
    Composite组合模式简介与C#示例【结构型3】【设计模式来了_8】 阅读目录〇、简介1、什么是组合设计模式?2、优缺点和适用场景一、简单的代码示例二、根据示例代码看结构三、相关模式回到顶部〇、简介1、什么是组合设计模式?一句话解释:  针对树形结构......
  • Facade 外观模式简介与 C# 示例【结构型5】【设计模式来了_10】
    Facade外观模式简介与C#示例【结构型5】【设计模式来了_10】 阅读目录〇、简介1、什么是外观模式?2、外观模式的优缺点和适用场景一、外观模式的代码实现二、结构三、相关模式回到顶部〇、简介1、什么是外观模式?一句话解释:  将一系列需要一起进行的......
  • Builder 生成器模式简介与 C# 示例【创建型2】【设计模式来了_2】
    Builder生成器模式简介与C#示例【创建型2】【设计模式来了_2】 阅读目录〇、简介1、什么是生成器模式?2、优缺点和使用场景一、简单的示例代码二、生成器模式结构三、在.Net框架中的实际应用四、相关模式回到顶部〇、简介1、什么是生成器模式?一句话......