文章目录
- Flink 系列文章
- 一、maven依赖
- 二、环境或版本说明
- 三、flink sink到kafka示例
- 1、介绍
- 2、1.13.6版本示例
- 1)、maven依赖
- 2)、实现
- 3)、验证步骤
- 3、1.17.0版本示例
- 1)、maven依赖
- 2)、实现
- 3)、验证步骤
本文介绍了flink将数据sink到kafka的示例,并提供了flink的1.13.6和1.17两个版本sink到kafka的例子。
本文除了maven依赖外,没有其他依赖。
本文需要有kafka的运行环境。
一、maven依赖
为避免篇幅过长,所有基础依赖均在第一篇文章中列出,具体依赖参考文章
【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console中的依赖
下文中具体需要的依赖将在介绍时添加新增的依赖。
二、环境或版本说明
1、该示例需要有kafka的运行环境,kafka的部署与使用参考文章:
1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试
2、Flink关于kafka的使用在不同的版本中有不同的实现,最直观的的变化是由FlinkKafkaConsumer换成了KafkaSource,同理sink也有相应的由FlinkKafkaProducer换成了KafkaSink。
3、由于使用kafka涉及的内容较多,请参考文章:
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版
4、本文会提供关于kafka 作为sink的2个版本,即1.13.6和1.17的版本。
5、以下属性在构建 KafkaSink 时是必须指定的:
- Bootstrap servers, setBootstrapServers(String)
- 消息序列化器(Serializer), setRecordSerializer(KafkaRecordSerializationSchema)
- 如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证,则需要使用 setTransactionalIdPrefix(String)
三、flink sink到kafka示例
1、介绍
Flink 提供了 Apache Kafka 连接器使用精确一次(Exactly-once)的语义在 Kafka topic 中读取和写入数据。
FlinkKafkaProducer 已被弃用并将在 Flink 1.17 中移除,请改用 KafkaSink。
KafkaSink 可将数据流写入一个或多个 Kafka topic。
Kafka sink 提供了构建类来创建 KafkaSink 的实例。
以下两个示例展示了如何将字符串数据按照至少一次(at lease once)的语义保证写入 Kafka topic。
2、1.13.6版本示例
1)、maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
2)、实现
import java.util.Properties;
import java.util.Random;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
/**
* @author alanchan
*
*/
public class TestKafkaSinkDemo {
public static void test1() throws Exception {
// 1、env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2、source-主题:alan_source
// 准备kafka连接参数
Properties propSource = new Properties();
propSource.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");// 集群地址
propSource.setProperty("group.id", "flink_kafka");
propSource.setProperty("auto.offset.reset", "latest");
propSource.setProperty("flink.partition-discovery.interval-millis", "5000");
propSource.setProperty("enable.auto.commit", "true");
// 自动提交的时间间隔
propSource.setProperty("auto.commit.interval.ms", "2000");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("alan_source", new SimpleStringSchema(), propSource);
// 使用kafkaSource
DataStream<String> kafkaDS = env.addSource(kafkaSource);
// 3、transformation-统计单词个数
SingleOutputStreamOperator<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
private Random ran = new Random();
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] arr = value.split(",");
for (String word : arr) {
out.collect(Tuple2.of(word, 1));
}
}
}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
System.out.println("输出:" + value.f0 + "->" + value.f1);
return value.f0 + "->" + value.f1;
}
});
// 4、sink-主题alan_sink
Properties propSink = new Properties();
propSink.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
propSink.setProperty("transaction.timeout.ms", "5000");
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("alan_sink", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), propSink,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
result.addSink(kafkaSink);
// 5、execute
env.execute();
}
public static void main(String[] args) throws Exception {
test1();
}
}
3)、验证步骤
- 1、创建kafka 主题 alan_source 和 alan_sink
- 2、驱动程序,观察运行控制台
- 3、通过命令往alan_source 写入数据,同时消费 alan_sink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_source
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>
## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_sink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
hello->2
alan->3
alach->2
hello->3
123->1
- 4、应用程序控制台输出
3、1.17.0版本示例
1)、maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.17.1</version>
</dependency>
2)、实现
import java.util.Properties;
import java.util.Random;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
/**
* @author alanchan
*
*/
public class TestKafkaSinkDemo {
public static void test2() throws Exception {
// 1、env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2、 source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
.setTopics("alan_nsource")
.setGroupId("flink_kafka")
.setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3、 transformation
DataStream<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] arr = value.split(",");
for (String word : arr) {
out.collect(Tuple2.of(word, 1));
}
}
}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
System.out.println("输出:" + value.f0 + "->" + value.f1);
return value.f0 + "->" + value.f1;
}
});
// 4、 sink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("alan_nsink")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
result.sinkTo(kafkaSink);
// 5、execute
env.execute();
}
public static void main(String[] args) throws Exception {
// test1();
test2();
}
}
3)、验证步骤
- 1、创建kafka 主题 alan_nsource 和 alan_nsink
- 2、驱动程序,观察运行控制台
- 3、通过命令往alan_nsource 写入数据,同时消费 alan_nsink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>
## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
alach->2
alan->3
hello->2
hello->3
123->1
- 4、应用程序控制台输出
以上,本文介绍了flink将数据sink到kafka的示例,并提供了flink的1.13.6和1.17两个版本sink到kafka的例子。