首页 > 编程语言 >Kafka Java客户端Stream API

Kafka Java客户端Stream API

时间:2022-11-09 15:06:30浏览次数:73  
标签:Java stream Stream -- kafka Topic API 处理器 Kafka


Kafka Stream概念及初识高层架构图

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来做流计算的类库,与Storm、Spark Streaming、Flink的作用类似,但要轻量得多。

Kafka Stream的基本概念:

  • Kafka Stream是处理分析存储在Kafka数据的客户端程序库(lib)
  • 由于Kafka Streams是Kafka的一个lib,所以实现的程序不依赖单独的环境
  • Kafka Stream通过state store可以实现高效的状态操作
  • 支持原语Processor和高层抽象DSL

虽然Kafka Streams只是一个java库,但是它可以解决如下类似问题:

  1. 一次一件事件的处理而不是微批处理,延迟在毫秒级别;
  2. 有状态的处理,包括连接操作(join)和聚合操作
  3. 提供了必要的流处理原语,包括高级流处理DSL和低级流处理API。高级流处理DSL提供了常用的流处理变换操作,低级处理器API支持客户端自定义处理器并与状态仓库(state store)交互;
  4. 使用类似于DataFlow的模型来处理乱序数据的事件窗口问题;
  5. 分布式处理,有容错机制,可以快速容错;
  6. 有重新处理数据的能力;

Kafka Stream的高层架构图:

Kafka Java客户端Stream API_流处理

  • Partition的数据会分发到不同的Task上,Task主要是用来做流式的并行处理
  • 每个Task都会有自己的state store去记录状态
  • 每个Thread里会有多个Task

Kafka Stream 核心概念

流(stream)是Kafka Streams提供的最重要的抽象,它代表一个无限的、不断更新的数据集。一个流就是由一个有序的、可重放的、支持故障转移的不可变的数据记录(data record)序列,其中每个数据记录被定义成一个键值对。

流处理器

一个流处理器(stream processor)是处理拓扑中的一个节点,它代表了拓扑中的处理步骤。

一个流处理器从它所在的拓扑上游接收数据,通过Kafka Streams提供的流处理的基本方法,如map()、filter()、join()以及聚合等方法,对数据进行处理,然后将处理之后的一个或者多个输出结果发送给下游流处理器。

一个拓扑中的流处理器有Source和Sink处理器连个特殊的流处理器;

  • Source处理器:源处理器指的是数据的源头,即第一个处理器,该处理器没有任何上游处理器
  • Sink处理器:该处理器没有任何下游处理器,是最终产出结果的一个处理器。该处理器将从上游处理器接受到的任何数据发送到指定的主题当中;

流处理拓扑:

一个拓扑图,该拓扑图展示了数据流的走向,以及流处理器的节点位置。流处理器是图的节点,流是图的边。

如下图所示:

Kafka Java客户端Stream API_kafka_02


Kafka提供了两种定义流处理拓扑的API:

KafkaStreams DSL API.:这种类型的API提供了一些开箱即用的数据转换操作算子例如:map、filter 和join和聚合类算子,开发这无需处理底层实现细节,缺点就是在一定程度上不够灵活,这样你就不必从头开始实现这些流处理器。

Low-levelAPI:这些低级API允许开发人员定义和连接定制处理器和状态存储器进行交换,更加灵活,但是开发难度相对较大。

Kafka Stream使用演示

下图是Kafka Stream完整的高层架构图:

Kafka Java客户端Stream API_kafka_03


从上图中可以看到,Consumer对一组Partition进行消费,这组Partition可以在一个Topic中或多个Topic中。然后形成数据流,经过各个流处理器后最终通过Producer输出到一组Partition中,同样这组Partition也可以在一个Topic中或多个Topic中。这个过程就是数据流的输入和输出。

下面通过几个例子来使用一下Kafka Stream API

Topic之间的流输入

因此,我们在使用Stream API前需要先创建两个Topic,一个作为输入,一个作为输出。到服务器上使用命令行创建两个Topic:

创建Topic

使用脚本文件和Admin API 创建Topic均可

脚本文件创建topic

进入kafka容器

docker exec -it ${CONTAINER ID} /bin/bash

cd 到脚本文件的文件夹

cd /opt/kafka/bin

使用脚本文件创建Topic
kafka-1是我使用docker-compose 搭建kafka集群的时候的容器名

./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-stream-in
./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-stream-out

使用Admin API 来创建Topic 可见 ​​kafka客户端操作之Admin API​​

脚本文件查看已经创建好的API

./kafka-topics.sh --list --bootstrap-server kafka-1:9092

Kafka Java客户端Stream API_kafka_04

流输入代码

由于之前依赖的kafka-clients包中没有Stream API,所以需要另外引入Stream的依赖包。

在项目中添加如下依赖:(版本号和部署的kafka server版本号一致)

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.1</version>
</dependency>

Kafka Streams从一个或多个输入topic进行连续的计算并输出到1个或多个外部topic中。

可以通过TopologyBuilder类定义一个计算逻辑处理器DAG拓扑。或者也可以通过提供的高级别KStream DSL来定义转换的KStreamBuilder。(PS:计算逻辑其实就是自己的代码逻辑)

KafkaStreams类管理Kafka Streams实例的生命周期。一个stream实例可以在配置文件中为处理器指定一个或多个Thread。

KafkaStreams实例可以作为单个streams处理客户端(也可能是分布式的),与其他的相同应用ID的实例进行协调(无论是否在同一个进程中,在同一台机器的其他进程中,或远程机器上)。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以保证负载平衡。

在内部,KafkaStreams实例包含一个正常的KafkaProducer和KafkaConsumer实例,用于读取和写入。

下面是一个简单的Topic之间的流输入来体验一下

static void easyStream(){

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

// 构建流构造器
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> source = builder.stream("xt-stream-in");

//直接将xt-stream-in topic中的数据写入到 xt-stream-out topic中
source.to("xt-stream-out");

final Topology topo = builder.build();
final KafkaStreams streams = new KafkaStreams(topo, props);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("stream-app"){
@Override
public void run() {
streams.close();
latch.countDown();
}
});

try {
streams.start();
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.exit(0);

}

运行以上Stream代码,然后使用脚本文件生产消费消息

脚本生产消息

也可以使用producer API

./kafka-console-producer.sh --topic xt-stream-in --bootstrap-server kafka-1:9092

CTRL-C结束输入

Kafka Java客户端Stream API_流处理_05

脚本消费消息

也可以使用consumer API

./kafka-console-consumer.sh --topic xt-stream-out --bootstrap-server kafka-1:9092 --from-beginning

Kafka Java客户端Stream API_kafka_06

foreach方法

在之前的例子中,我们是从某个Topic读取数据进行流处理后再输出到另一个Topic里。但在一些场景下,我们可能不希望将结果数据输出到Topic,而是写入到一些存储服务中,例如ElasticSearch、MongoDB、MySQL等。

在这种场景下,就可以利用到foreach方法,该方法用于迭代流中的元素。我们可以在foreach中将数据存入例如Map、List等容器,然后再批量写入到数据库或其他存储中间件即可。

foreach方法使用示例:

// 定义流计算过程
static void foreachStream(){

//配置属性
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"foreach-app");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// 构建流结构拓扑
StreamsBuilder builder = new StreamsBuilder();

KStream<String,String> source = builder.stream("xt-stream-in");

source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.foreach((key,value)-> System.out.println(key + " : " + value));

final KafkaStreams streams = new KafkaStreams(builder.build(), props);

streams.start();
}

更多其他方法可以查阅​​官方文档​​,或者直接CTRL-B进入KStream源码查看

词频统计

创建Topic

使用脚本文件和Admin API 创建Topic均可

脚本文件创建topic

进入kafka容器

docker exec -it ${CONTAINER ID} /bin/bash

cd 到脚本文件的文件夹

cd /opt/kafka/bin

使用脚本文件创建Topic
kafka-1是我使用docker-compose 搭建kafka集群的时候的容器名

./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-wordcount-in

./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt-wordcount-out

使用Admin API 来创建Topic 可见 ​​kafka客户端操作之Admin API​​

词频统计代码

代码示例:

// 定义流计算过程,词频统计
static void wordcountStream(){

//配置属性
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// 构建流结构拓扑
StreamsBuilder builder = new StreamsBuilder();
// 不断从INPUT_TOPIC上获取新数据,并且追加到流上的一个抽象对象
KStream<String,String> source = builder.stream("xt-wordcount-in");


final Topology topo = builder.build();
final KafkaStreams streams = new KafkaStreams(topo, props);

final CountDownLatch latch = new CountDownLatch(1);

// KTable是数据集合的抽象对象
final KTable<String, Long> count = source
//数据扁平化 ,按照空格分割数据
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
// 合并 -> 按value值合并,分别进行统计
.groupBy((key, value) -> value)
// 统计出现的总数
.count();

// 将结果输入到OUT_TOPIC中
KStream<String, Long> sink = count.toStream();
sink.to("xt-wordcount-out", Produced.with(Serdes.String(),Serdes.Long()));


Runtime.getRuntime().addShutdownHook(new Thread("stream-app"){
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.exit(0);

}

KTable与KStream的关系与区别

如下图:

Kafka Java客户端Stream API_kafka_07

  • KTable类似于一个时间片段,在一个时间片段内输入的数据就会update进去,以这样的形式来维护这张表,相同Key的每条记录只保存最新的一条记录,类似于数据库的基于主键更新
  • KStream则没有update这个概念,而是不断的追加,是一个由键值对构成的抽象记录流,每个键值对是一个独立的单元,即使相同的Key也不会覆盖,类似数据库的插入操作;

脚本生产消息

运行以上代码,然后到服务器中使用kafka-console-producer.sh脚本命令向input-topic生产一些数据,如下:

./kafka-console-producer.sh --bootstrap-server kafka-1:9092 --topic xt-wordcount-in

向xt-wordcount-in Topic 写入如下消息,CTRL-C结束

Hello World xt
Hello World Kafka
Hello Java
Hello xt

脚本消费消息

然后再运行kafka-console-consumer.sh脚本命令从xt-wordcount-out Topic中消费数据,并进行打印。具体如下:

./kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic xt-wordcount-out --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning

Kafka Java客户端Stream API_流处理_08


从测试的结果可以看出Kafka Stream是实时进行流计算的,并且每次只会针对有变化的内容进行输出。

References:

(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料、官方文档和自己的实践,整理的不足和错误之处,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)


标签:Java,stream,Stream,--,kafka,Topic,API,处理器,Kafka
From: https://blog.51cto.com/u_14020077/5836600

相关文章

  • kafka 客户端之producer API发送消息(自定义负载均衡实现)与负载均衡调用源码分析
    背景:​​kafka客户端之producerAPI发送消息以及简单源码分析​​已经介绍了producer的异步发送和异步回调发送消息的基本使用,但是都是使用内置的负载均衡策略。kafka的负......
  • ElasticSearch Java API之索引操作
    背景:​​ElasticSearchJava客户端连接ElasticSearch​​以这篇博客为基础​​ElasticSearch:简单介绍以及使用Docker部署ElasticSearch和Kibana​​这篇博客简单部署了E......
  • kafka 客户端之producer API发送消息以及简单源码分析
    背景:我使用docker-compose搭建的kafka服务​kafka的简单介绍以及docker-compose部署单主机Kafka集群​​KafkaAPI简单介绍kafka除了用于管理和管理任务的命令行工具,Kafka......
  • ElasticSearch Java API之文档操作
    文档Document简单介绍被索引的一条数据,索引的基本信息单元,以JSON格式来表示。比如:你可以拥有某一个客户的文档,某一个产品的一个文档,当然,也可以拥有某个订单的一个文档。文档......
  • ElasticSearch Java 客户端连接ElasticSearch
    背景:​​ElasticSearch:简单介绍以及使用Docker部署ElasticSearch和Kibana​​这篇博客简单部署了ElasticSearchElasticSearch客户端特征所有ElasticsearchAPI的强类......
  • 【BUG记录】com.alibaba.nacos.api.exception.NacosException: Request nacos server
    BUG背景使用docker搭建nacos服务后,准备用java客户端连接nacosserver的时候出现了如下问题,连接不上。(可能和大家的导致的问题不一样)BUG日志如下的报错可以看见gprc的字眼,咦,......
  • kafka Java客户端之 consumer API 消费消息
    背景:我使用docker-compose搭建的kafka服务​kafka的简单介绍以及docker-compose部署单主机Kafka集群​​使用consumerAPI消费指定Topic里面的消息首先我们需要使用AdminA......
  • java 迭代器使用
    原文链接:https://blog.csdn.net/ACE_kk/article/details/126182500一、前言在迭代器(Iterator)没有出现之前,如果要遍历数组和集合,需要使用方法。数组遍历,代码如下:String[......
  • 使用Javascript查找图像上的坐标
    下面的代码在页面标题中。GetCoordinates函数使用window.event方法查找单击鼠标时的坐标。它还需要考虑任何滚动和图像在文档中的位置,以便坐标始终相对于图像的左上角。......
  • AXI协议(五)-AXI-STREAM及接入思路解析
     在本文中,你将可能学会:AXI-STREAM协议的梗概(下简称axis)尝试编写出普通摄像头接入AXIS的思路本来想讲完怎么接入的,由于篇幅的原因,代码只能留在下一节中讲了,那我们下......