首页 > 数据库 >【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka

【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka

时间:2023-12-12 17:01:52浏览次数:45  
标签:示例 flink kafka source apache org

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


(文章目录)


本文主要介绍Flink 的kafka作为数据源的使用,并给出了flink不同版本对kafka作为数据源的不同实现示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文依赖kafka的环境是好用的。

本专题分为以下几篇文章: 【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection 【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

一、maven依赖

本文依赖见【flink番外篇】3、flink的source介绍及示例(1)- File、Socket、Collection,不再赘述。

如果有新增的maven依赖,则会在示例时加以说明,避免篇幅的过大。

二、环境或版本说明

1、该示例需要有kafka的运行环境,kafka的部署与使用参考文章: 1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试

2、Flink关于kafka的使用在不同的版本中有不同的实现,最直观的的变化是由FlinkKafkaConsumer换成了KafkaSource,同理sink也有相应的由FlinkKafkaProducer换成了KafkaSink。

3、由于使用kafka涉及的内容较多,请参考文章: 40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版

4、本文会提供关于kafka 作为source的2个版本,即1.13.6和1.17的版本。

5、以下属性在构建 KafkaSource 时是必须指定的:

  • Bootstrap server,通过 setBootstrapServers(String) 方法配置
  • 消费者组 ID,通过 setGroupId(String) 配置
  • 要订阅的 Topic / Partition
  • 用于解析 Kafka 消息的反序列化器(Deserializer)

三、自定义数据源kafka(Flink 1.13.6 版本)

Kafka Source 提供了构建类来创建 FlinkKafkaConsumer 的实例。

以下代码片段展示了如何构建 FlinkKafkaConsumer 来消费 “alan_kafkasource” 最早位点的数据, 使用消费组 “flink_kafka”,并且将 Kafka 消息体反序列化为字符串

1、maven依赖

	<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.13.6</flink.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-scala_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- flink连接器 -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- 日志 -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.7</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
			<scope>runtime</scope>
		</dependency>

	</dependencies>

2、实现

package org.datastreamapi.source.custom.kafka;

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author alanchan
 *
 */
public class TestCustomKafkaSourceDemo {
	public static void main(String[] args) throws Exception {
		// 1、env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 2、 source
		// 准备kafka连接参数
		Properties props = new Properties();
		// 集群地址
		props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
		// 消费者组id
		props.setProperty("group.id", "flink_kafka");
		// latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费
		// earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费

		props.setProperty("auto.offset.reset", "latest");
		// 会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
		props.setProperty("flink.partition-discovery.interval-millis", "5000");
		// 自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中)
		props.setProperty("enable.auto.commit", "true");
		// 自动提交的时间间隔
		props.setProperty("auto.commit.interval.ms", "2000");
		// 使用连接参数创建FlinkKafkaConsumer/kafkaSource
		FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("alan_kafkasource", new SimpleStringSchema(), props);
		// 使用kafkaSource
		DataStream<String> kafkaDS = env.addSource(kafkaSource);

		// 3、 transformation

		// 4、 sink
		kafkaDS.print();

		// 5、execute
		env.execute();
	}
}

3、验证

1、创建kafka主题alan_kafkasource,kafka命令发送数据

[alanchan@server2 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic alan_kafkasource --partitions 1 --replication-factor 1

[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_kafkasource
>alan,18
>alanchan,19
>alanchan,20

2、启动应用程序,并观察控制台输出 在这里插入图片描述

三、自定义数据源kafka(Flink 1.17.0 版本)

Kafka Source 提供了构建类来创建 KafkaSource 的实例。

以下代码片段展示了如何构建 KafkaSource 来消费 “alan_kafkasource” 最早位点的数据, 使用消费组 “flink_kafka”,并且将 Kafka 消息体反序列化为字符串

1、maven依赖

<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<!-- flink连接器 -->
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
	</dependencies>

2、实现

为了避免误解,1.13.6版本与1.17.0版本实现不同的地方KafkaSource和FlinkKafkaConsumer的不同,相关属性值是一样的,只是本例中没有将1.13.5的中的所有属性都列出来。

package org.datastreamapi.source.custom.kafka;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author alanchan
 *
 */
public class TestCustomKafkaSourceDemo {
	public static void main(String[] args) throws Exception {
		// 1、env
				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
				
				// 2、 source
				KafkaSource<String> source = KafkaSource.<String>builder()
																						.setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
																						.setTopics("alan_kafkasource")
																						.setGroupId("flink_kafka")
																						.setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema())
																						.build();

				DataStreamSource<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
				
				// 3、 transformation

				// 4、 sink
				kafkaDS.print();
				// 5、execute
				env.execute();
	}
}

3、验证

1、创建kafka主题alan_kafkasource,kafka命令发送数据

[alanchan@server2 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic alan_kafkasource --partitions 1 --replication-factor 1

[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_kafkasource
>alan,18
>alanchan,19
>alanchan,20

2、启动应用程序,并观察控制台输出 在这里插入图片描述

以上,本文主要介绍Flink 的kafka作为数据源的使用,并给出了flink不同版本对kafka作为数据源的不同实现示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章: 【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection 【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

标签:示例,flink,kafka,source,apache,org
From: https://blog.51cto.com/alanchan2win/8789430

相关文章

  • 写了一个flinkcdc的简单demo,大概说一下实现过程和建议点
    架构图大致如下:版本信息大致如下,具体版本信息根据自己的需求进行调整即可:oracle:19cflinkcdc:2.4.0kafka:3.1.2flink:1.15.4mysql:8.0.27springboot:2.5.6实现需求:1.使用flinkcdc采集oracle中的数据(历史数据+增量数据:含增删改)同步至kafka的某个topic中2.使用flink消费kafka中......
  • 如何写简历-学习如何在 2023 年撰写简历所需的所有技巧、工具、模板和示例
    学习如何在2023年撰写简历所需的所有技巧、工具、模板和示例拥有一份经过精心打磨和精心撰写的简历就像在求职过程中拥有超能力一样。 当大多数人走上招聘阶梯时,你却披上了深红色的斗篷,以超音速的速度向上翱翔。本关于如何撰写简历的指南概述了创建此类令人惊叹的简历的最重......
  • 使用Flink完成流数据统计 | 京东云技术团队
    一、统计流程所有流计算统计的流程都是:1、接入数据源2、进行多次数据转换操作(过滤、拆分、聚合计算等)3、计算结果的存储其中数据源可以是多个、数据转换的节点处理完数据可以发送到一个和多个下一个节点继续处理数据Flink程序构建的基本单元是stream和transformation(DataSet实质......
  • go-zero开发入门-API网关鉴权开发示例
    本文是go-zero开发入门-API网关开发示例一文的延伸,继续之前请先阅读此文。在项目根目录下创建子目录middleware,在此目录下创建文件auth.go,内容如下://鉴权中间件packagemiddlewareimport("context""errors""net/http")varErrInvalidToken=errors.Ne......
  • flink事件时间的水印延迟不会导致延迟数据在上一个窗口内
    设窗口为5,延迟为3。假如数据为:012567348则两个窗口为:window=TimeWindow{start=0,end=5}01234window=TimeWindow{start=5,end=10}5678即:567的数据不会包含在TimeWindow{start=0,end=5}里。验证程序:publicclassFlinkWindowExample{pu......
  • 幺半群同态一个示例的双向分析
    全体自然数(含0)在加法下构成一个幺半群,记作(N,+),而全体正整数在乘法下也构成一个幺半群,记作(Z+,·).假设映射f:N→ Z+满足 ①    ∀x,y∈N,  f(x+y)=f(x)·f(y). 令y=0,代入①有f(x)=f(x)·f(0),由此可知f(0)=1,即f把 (N,+)中的单位......
  • K8S-部署Kafka
    nfs&rpc离线包下载链接:https://pan.baidu.com/s/1NtsBd_5W4NVfL3A2BvwqUA提取码:0000#master&slave#上传rpm文件到此目录mkdir-p/opt/software/nfs_rpc#安装NFSrpm-Uvh*.rpm--nodeps--force#mastermkdir-p/data/{kafka,zookeeper}chmod755-R/data/*cat>>/etc......
  • kafka消费端速度慢解决方案
    一、增加patition数量,有多少个patition就会启多少个消费者线程去消费,跟消费者服务节点的数量无关,只能patition有关。二、patition一样的情况下,改为批量消费,一次拉取多条数据,max.poll.records,利用多线程去处理数据,避免单线程处理批量数据时间过长,导致超时。 提交方式问题:enab......
  • flinkcdc连接oracle的报错汇总
    报错一:原因分析:字面原因,找不到 org.apache.flink.table.api.ValidationException类。解决办法:根据类名可知,应该 org.apache.flink.table.api包下面的,然后去阿里云maven仓库搜索,添加如下依赖即可 报错二:原因分析:ORA-16331:容器"ORCLPDB1"未打开。解决办法:使用命令打......
  • thinkphp---电子签章功能开发示例
    最近在做一个项目,需要开发电子签章的功能,也就是电子合同,下面:具体思路:利用PDF合成,将所需要的文字,图片,合成到PDF里面。首先下载Fpdi库:https://gitee.com/meiyouzhanghao/fpdi放到extend里面: 具体代码示例:<?phpnamespaceapp\index\controller;useapp\BaseControll......