首页 > 数据库 >【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(5) - kafka

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(5) - kafka

时间:2023-12-28 13:35:51浏览次数:34  
标签:示例 flink alan kafka org apache import



文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、环境或版本说明
  • 三、flink sink到kafka示例
  • 1、介绍
  • 2、1.13.6版本示例
  • 1)、maven依赖
  • 2)、实现
  • 3)、验证步骤
  • 3、1.17.0版本示例
  • 1)、maven依赖
  • 2)、实现
  • 3)、验证步骤



本文介绍了flink将数据sink到kafka的示例,并提供了flink的1.13.6和1.17两个版本sink到kafka的例子。

本文除了maven依赖外,没有其他依赖。

本文需要有kafka的运行环境。

一、maven依赖

为避免篇幅过长,所有基础依赖均在第一篇文章中列出,具体依赖参考文章
【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console中的依赖

下文中具体需要的依赖将在介绍时添加新增的依赖。

二、环境或版本说明

1、该示例需要有kafka的运行环境,kafka的部署与使用参考文章:
1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试

2、Flink关于kafka的使用在不同的版本中有不同的实现,最直观的的变化是由FlinkKafkaConsumer换成了KafkaSource,同理sink也有相应的由FlinkKafkaProducer换成了KafkaSink。

3、由于使用kafka涉及的内容较多,请参考文章:
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版

4、本文会提供关于kafka 作为sink的2个版本,即1.13.6和1.17的版本。

5、以下属性在构建 KafkaSink 时是必须指定的:

  • Bootstrap servers, setBootstrapServers(String)
  • 消息序列化器(Serializer), setRecordSerializer(KafkaRecordSerializationSchema)
  • 如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证,则需要使用 setTransactionalIdPrefix(String)

三、flink sink到kafka示例

1、介绍

Flink 提供了 Apache Kafka 连接器使用精确一次(Exactly-once)的语义在 Kafka topic 中读取和写入数据。

FlinkKafkaProducer 已被弃用并将在 Flink 1.17 中移除,请改用 KafkaSink。

KafkaSink 可将数据流写入一个或多个 Kafka topic。

Kafka sink 提供了构建类来创建 KafkaSink 的实例。

以下两个示例展示了如何将字符串数据按照至少一次(at lease once)的语义保证写入 Kafka topic。

2、1.13.6版本示例

1)、maven依赖

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.12</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>

2)、实现

import java.util.Properties;
import java.util.Random;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TestKafkaSinkDemo {

	public static void test1() throws Exception {
		// 1、env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 2、source-主题:alan_source
		// 准备kafka连接参数
		Properties propSource = new Properties();
		propSource.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");// 集群地址
		propSource.setProperty("group.id", "flink_kafka");
		propSource.setProperty("auto.offset.reset", "latest");
		propSource.setProperty("flink.partition-discovery.interval-millis", "5000");
		propSource.setProperty("enable.auto.commit", "true");
		// 自动提交的时间间隔
		propSource.setProperty("auto.commit.interval.ms", "2000");

		FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("alan_source", new SimpleStringSchema(), propSource);

		// 使用kafkaSource
		DataStream<String> kafkaDS = env.addSource(kafkaSource);

		// 3、transformation-统计单词个数
		SingleOutputStreamOperator<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			private Random ran = new Random();

			@Override
			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
				String[] arr = value.split(",");
				for (String word : arr) {
					out.collect(Tuple2.of(word, 1));
				}
			}
		}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
			@Override
			public String map(Tuple2<String, Integer> value) throws Exception {
				System.out.println("输出:" + value.f0 + "->" + value.f1);
				return value.f0 + "->" + value.f1;
			}
		});

		// 4、sink-主题alan_sink
		Properties propSink = new Properties();
		propSink.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
		propSink.setProperty("transaction.timeout.ms", "5000");

		FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("alan_sink", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), propSink,
				FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

		result.addSink(kafkaSink);

		// 5、execute
		env.execute();
	}

	public static void main(String[] args) throws Exception {
		test1();
	}

}

3)、验证步骤

  • 1、创建kafka 主题 alan_source 和 alan_sink
  • 2、驱动程序,观察运行控制台
  • 3、通过命令往alan_source 写入数据,同时消费 alan_sink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_source
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>

## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_sink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
hello->2
alan->3
alach->2
hello->3
123->1
  • 4、应用程序控制台输出

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(5) - kafka_flink hive

3、1.17.0版本示例

1)、maven依赖

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka</artifactId>
   <version>1.17.1</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-base</artifactId>
   <version>1.17.1</version>
</dependency>

2)、实现

import java.util.Properties;
import java.util.Random;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TestKafkaSinkDemo {

	public static void test2() throws Exception {
		// 1、env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		// 2、 source
		KafkaSource<String> source = KafkaSource.<String>builder()
																				.setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
																				.setTopics("alan_nsource")
																				.setGroupId("flink_kafka")
																				.setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema())
																				.build();

		DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
		
		
		// 3、 transformation
		DataStream<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
				String[] arr = value.split(",");
				for (String word : arr) {
					out.collect(Tuple2.of(word, 1));
				}
			}
		}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
			@Override
			public String map(Tuple2<String, Integer> value) throws Exception {
				System.out.println("输出:" + value.f0 + "->" + value.f1);
				return value.f0 + "->" + value.f1;
			}
		});
		
		// 4、 sink
		KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
		        .setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
		        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
		            .setTopic("alan_nsink")
		            .setValueSerializationSchema(new SimpleStringSchema())
		            .build()
		        )
		        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
		        .build();
		
		result.sinkTo(kafkaSink);
		
		// 5、execute
		env.execute();
		
	}

	public static void main(String[] args) throws Exception {
//		test1();
		test2();
	}

}

3)、验证步骤

  • 1、创建kafka 主题 alan_nsource 和 alan_nsink
  • 2、驱动程序,观察运行控制台
  • 3、通过命令往alan_nsource 写入数据,同时消费 alan_nsink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>


## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
alach->2
alan->3
hello->2
hello->3
123->1
  • 4、应用程序控制台输出

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(5) - kafka_flink sql_02


以上,本文介绍了flink将数据sink到kafka的示例,并提供了flink的1.13.6和1.17两个版本sink到kafka的例子。


标签:示例,flink,alan,kafka,org,apache,import
From: https://blog.51cto.com/alanchan2win/9013184

相关文章

  • 阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
    ......
  • Flink实验
     题目:实验八姓名 日期12.8实验环境:(1)Ubuntu18.04(或Ubuntu16.04)。(2)IntelliJIDEA。(3)Flink1.9.1。 实验内容与完成情况:(1)使用IntelliJIDEA工具开发WordCount程序在Linux系统中安装IntelliJIDEA,然后使用IntelliJIDEA工具开发WordCount程序,并打包......
  • 【SpringBoot快速入门】(4)SpringBoot项目案例代码示例
    目录1创建工程3配置文件4静态资源之前我们已经学习的Spring、SpringMVC、Mabatis、Maven,详细讲解了Spring、SpringMVC、Mabatis整合SSM的方案和案例,上一节我们学习了SpringBoot的开发步骤、工程构建方法以及工程的快速启动,从这一节开始,我们开始学习SpringBoot配置文件。接下来......
  • 【SpringBoot快速入门】(3)SpringBoot整合junit和MyBatis 详细代码示例与讲解
    目录1.SpringBoot整合junit1.1环境准备1.2编写测试类2.SpringBoot整合mybatis2.1回顾Spring整合Mybatis2.2SpringBoot整合mybatis2.2.1创建模块2.2.2定义实体类2.2.3定义dao接口2.2.4定义测试类2.2.5编写配置2.2.6测试2.2.7使用Druid数据源之前我们已经学习的Spring、......
  • 【Kafka-Eagle】EFAK告警配置与实践
    Kafka-Eagle是一个开源的Kafka集群监控与告警系统,可以帮助用户实现对Kafka集群的实时监控、性能指标收集以及异常告警等功能。下面是关于Kafka-Eagle的告警配置和实践的一般步骤:安装和配置Kafka-Eagle:下载最新版本的Kafka-Eagle安装包,并解压到一个合适的目录中。进入Kafka-Eagle的......
  • 项目应用多级缓存示例
    前不久做的一个项目,需要在前端实时展示硬件设备的数据。设备很多,并且每个设备的数据也很多,总之就是数据很多。同时,设备的刷新频率很快,需要每2秒读取一遍数据。问题来了,我们如何读取数据,并且在前端展示?我的想法是利用多级缓存:1)首先是有个数据采集程序,不停地采集设备的数据。采集到......
  • kafka常用命令
    Windows1.启动zookeeper,kafka高版本已经集成zookeeperbin\windows\zookeeper-server-start.batconfig\zookeeper.properties​2.启动kafka服务器bin\windows\kafka-server-start.batconfig\server.properties​3.创建topicbin\windows\kafka-topics.bat--create--......
  • Linux中date命令使用示例
    一、.Linux中的date命令date"+%Y-%m-%d"输出当前日期,格式为“年-月-日”,例如:2023-06-01date"+%Y年%m月%d日%H:%M:%S"输出当前日期喝时间,格式为“年月日时:分:秒”,例如:2023年12月28日04:28:11date"+%b"输出当前月份的英文缩写,例如:Jundate"+%B"输出当前月份的英文全称,例......
  • Kafka 再均衡详解:实现消费者组的负载均衡和故障转移(十二)
    在Kafka中,再均衡(Rebalancing)是指在消费者组内部发生变化时,Kafka会重新分配分区给消费者,以实现负载均衡和故障转移的目的。再均衡是Kafka实现高可用性和可扩展性的重要机制之一。再均衡的场景消费者加入、消费者退出和分区变化等情况。消费者加入:当新的消费者加入消费者组......
  • spring MVC 后端 接收 前端 批量添加的数据(简单示例)
    <%@pagecontentType="text/html;charset=UTF-8"language="java"%><html><head>  <title>Title</title></head><body><scriptsrc="${pageScope.request.ContextPath}/js/jquery-3.3.1.min.js&qu......