Kafka Stream概念及初识高层架构图
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来做流计算的类库,与Storm、Spark Streaming、Flink的作用类似,但要轻量得多。
Kafka Stream的基本概念:
- Kafka Stream是处理分析存储在Kafka数据的客户端程序库(lib)
- 由于Kafka Streams是Kafka的一个lib,所以实现的程序不依赖单独的环境
- Kafka Stream通过state store可以实现高效的状态操作
- 支持原语Processor和高层抽象DSL
虽然Kafka Streams只是一个java库,但是它可以解决如下类似问题:
- 一次一件事件的处理而不是微批处理,延迟在毫秒级别;
- 有状态的处理,包括连接操作(join)和聚合操作
- 提供了必要的流处理原语,包括高级流处理DSL和低级流处理API。高级流处理DSL提供了常用的流处理变换操作,低级处理器API支持客户端自定义处理器并与状态仓库(state store)交互;
- 使用类似于DataFlow的模型来处理乱序数据的事件窗口问题;
- 分布式处理,有容错机制,可以快速容错;
- 有重新处理数据的能力;
Kafka Stream的高层架构图:
- Partition的数据会分发到不同的Task上,Task主要是用来做流式的并行处理
- 每个Task都会有自己的state store去记录状态
- 每个Thread里会有多个Task
Kafka Stream 核心概念
流
流(stream)是Kafka Streams提供的最重要的抽象,它代表一个无限的、不断更新的数据集。一个流就是由一个有序的、可重放的、支持故障转移的不可变的数据记录(data record)序列,其中每个数据记录被定义成一个键值对。
流处理器
一个流处理器(stream processor)是处理拓扑中的一个节点,它代表了拓扑中的处理步骤。
一个流处理器从它所在的拓扑上游接收数据,通过Kafka Streams提供的流处理的基本方法,如map()、filter()、join()以及聚合等方法,对数据进行处理,然后将处理之后的一个或者多个输出结果发送给下游流处理器。
一个拓扑中的流处理器有Source和Sink处理器连个特殊的流处理器;
- Source处理器:源处理器指的是数据的源头,即第一个处理器,该处理器没有任何上游处理器
- Sink处理器:该处理器没有任何下游处理器,是最终产出结果的一个处理器。该处理器将从上游处理器接受到的任何数据发送到指定的主题当中;
流处理拓扑:
一个拓扑图,该拓扑图展示了数据流的走向,以及流处理器的节点位置。流处理器是图的节点,流是图的边。
如下图所示:
Kafka提供了两种定义流处理拓扑的API:
KafkaStreams DSL API.:这种类型的API提供了一些开箱即用的数据转换操作算子例如:map、filter 和join和聚合类算子,开发这无需处理底层实现细节,缺点就是在一定程度上不够灵活,这样你就不必从头开始实现这些流处理器。
Low-levelAPI:这些低级API允许开发人员定义和连接定制处理器和状态存储器进行交换,更加灵活,但是开发难度相对较大。
Kafka Stream使用演示
下图是Kafka Stream完整的高层架构图:
从上图中可以看到,Consumer对一组Partition进行消费,这组Partition可以在一个Topic中或多个Topic中。然后形成数据流,经过各个流处理器后最终通过Producer输出到一组Partition中,同样这组Partition也可以在一个Topic中或多个Topic中。这个过程就是数据流的输入和输出。
下面通过几个例子来使用一下Kafka Stream API
Topic之间的流输入
因此,我们在使用Stream API前需要先创建两个Topic,一个作为输入,一个作为输出。到服务器上使用命令行创建两个Topic:
创建Topic
使用脚本文件和Admin API 创建Topic均可
脚本文件创建topic
进入kafka容器
docker exec -it ${CONTAINER ID} /bin/bash
cd 到脚本文件的文件夹
cd /opt/kafka/bin
使用脚本文件创建Topic
kafka-1是我使用docker-compose 搭建kafka集群的时候的容器名
./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-stream-in
./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-stream-out
使用Admin API 来创建Topic 可见 kafka客户端操作之Admin API
脚本文件查看已经创建好的API
./kafka-topics.sh --list --bootstrap-server kafka-1:9092
流输入代码
由于之前依赖的kafka-clients包中没有Stream API,所以需要另外引入Stream的依赖包。
在项目中添加如下依赖:(版本号和部署的kafka server版本号一致)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.1</version>
</dependency>
Kafka Streams从一个或多个输入topic进行连续的计算并输出到1个或多个外部topic中。
可以通过TopologyBuilder类定义一个计算逻辑处理器DAG拓扑。或者也可以通过提供的高级别KStream DSL来定义转换的KStreamBuilder。(PS:计算逻辑其实就是自己的代码逻辑)
KafkaStreams类管理Kafka Streams实例的生命周期。一个stream实例可以在配置文件中为处理器指定一个或多个Thread。
KafkaStreams实例可以作为单个streams处理客户端(也可能是分布式的),与其他的相同应用ID的实例进行协调(无论是否在同一个进程中,在同一台机器的其他进程中,或远程机器上)。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以保证负载平衡。
在内部,KafkaStreams实例包含一个正常的KafkaProducer和KafkaConsumer实例,用于读取和写入。
下面是一个简单的Topic之间的流输入来体验一下
static void easyStream(){
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
// 构建流构造器
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> source = builder.stream("xt-stream-in");
//直接将xt-stream-in topic中的数据写入到 xt-stream-out topic中
source.to("xt-stream-out");
final Topology topo = builder.build();
final KafkaStreams streams = new KafkaStreams(topo, props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("stream-app"){
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.exit(0);
}
运行以上Stream代码,然后使用脚本文件生产消费消息
脚本生产消息
也可以使用producer API
./kafka-console-producer.sh --topic xt-stream-in --bootstrap-server kafka-1:9092
CTRL-C结束输入
脚本消费消息
也可以使用consumer API
./kafka-console-consumer.sh --topic xt-stream-out --bootstrap-server kafka-1:9092 --from-beginning
foreach方法
在之前的例子中,我们是从某个Topic读取数据进行流处理后再输出到另一个Topic里。但在一些场景下,我们可能不希望将结果数据输出到Topic,而是写入到一些存储服务中,例如ElasticSearch、MongoDB、MySQL等。
在这种场景下,就可以利用到foreach方法,该方法用于迭代流中的元素。我们可以在foreach中将数据存入例如Map、List等容器,然后再批量写入到数据库或其他存储中间件即可。
foreach方法使用示例:
// 定义流计算过程
static void foreachStream(){
//配置属性
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"foreach-app");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 构建流结构拓扑
StreamsBuilder builder = new StreamsBuilder();
KStream<String,String> source = builder.stream("xt-stream-in");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.foreach((key,value)-> System.out.println(key + " : " + value));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
更多其他方法可以查阅官方文档,或者直接CTRL-B进入KStream源码查看
词频统计
创建Topic
使用脚本文件和Admin API 创建Topic均可
脚本文件创建topic
进入kafka容器
docker exec -it ${CONTAINER ID} /bin/bash
cd 到脚本文件的文件夹
cd /opt/kafka/bin
使用脚本文件创建Topic
kafka-1是我使用docker-compose 搭建kafka集群的时候的容器名
./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-wordcount-in
./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-wordcount-out
使用Admin API 来创建Topic 可见 kafka客户端操作之Admin API
词频统计代码
代码示例:
// 定义流计算过程,词频统计
static void wordcountStream(){
//配置属性
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 构建流结构拓扑
StreamsBuilder builder = new StreamsBuilder();
// 不断从INPUT_TOPIC上获取新数据,并且追加到流上的一个抽象对象
KStream<String,String> source = builder.stream("xt-wordcount-in");
final Topology topo = builder.build();
final KafkaStreams streams = new KafkaStreams(topo, props);
final CountDownLatch latch = new CountDownLatch(1);
// KTable是数据集合的抽象对象
final KTable<String, Long> count = source
//数据扁平化 ,按照空格分割数据
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
// 合并 -> 按value值合并,分别进行统计
.groupBy((key, value) -> value)
// 统计出现的总数
.count();
// 将结果输入到OUT_TOPIC中
KStream<String, Long> sink = count.toStream();
sink.to("xt-wordcount-out", Produced.with(Serdes.String(),Serdes.Long()));
Runtime.getRuntime().addShutdownHook(new Thread("stream-app"){
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.exit(0);
}
KTable与KStream的关系与区别
如下图:
- KTable类似于一个时间片段,在一个时间片段内输入的数据就会update进去,以这样的形式来维护这张表,相同Key的每条记录只保存最新的一条记录,类似于数据库的基于主键更新
- KStream则没有update这个概念,而是不断的追加,是一个由键值对构成的抽象记录流,每个键值对是一个独立的单元,即使相同的Key也不会覆盖,类似数据库的插入操作;
脚本生产消息
运行以上代码,然后到服务器中使用kafka-console-producer.sh脚本命令向input-topic生产一些数据,如下:
./kafka-console-producer.sh --bootstrap-server kafka-1:9092 --topic xt-wordcount-in
向xt-wordcount-in Topic 写入如下消息,CTRL-C结束
Hello World xt
Hello World Kafka
Hello Java
Hello xt
脚本消费消息
然后再运行kafka-console-consumer.sh脚本命令从xt-wordcount-out Topic中消费数据,并进行打印。具体如下:
./kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic xt-wordcount-out --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
从测试的结果可以看出Kafka Stream是实时进行流计算的,并且每次只会针对有变化的内容进行输出。
References:
(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料、官方文档和自己的实践,整理的不足和错误之处,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)