首页 > 其他分享 >Kafka

Kafka

时间:2022-12-13 11:03:47浏览次数:64  
标签:消费者 创建 kafka 消息 提交 Kafka

1.1 kafaka 简介

Apache Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统,

使用 Scala 与 Java 语言编写,能够将消息从一个端点传递到另一个端点,较之传统的消息中

间件(例如 ActiveMQ、RabbitMQ),Kafka 具有高吞吐量、内置分区、支持消息副本和高容

错的特性,非常适合大规模消息处理应用程序。

1.2 Kafa 系统架构

Kafka_测试类

 

Kafka_配置文件_02

1.3 应用场景

Kafka 的应用场景很多,这里就举几个最常见的场景。

1.3.1 用户的活动追踪

用户在网站的不同活动消息发布到不同的主题中心,然后可以对这些消息进行实时监测

实时处理。当然,也可加载到 Hadoop 或离线处理数据仓库,对用户进行画像。像淘宝、京

东这些大型的电商平台,用户的所有活动都是要进行追踪的。

1.3.2 日志聚合

Kafka_配置文件_03

 

 

1.3.3 限流削峰

Kafka_kafka_04

 

 

1.4 kafka 高吞吐率实现

Kafka 与其它 MQ 相比,其最大的特点就是高吞吐率。为了增加存储能力,Kafka 将所有

的消息都写入到了低速大容的硬盘。按理说,这将导致性能损失,但实际上,kafka 仍可保

持超高的吞吐率,性能并未受到影响。其主要采用了如下的方式实现了高吞吐率。

 顺序读写:Kafka 将消息写入到了分区 partition 中,而分区中消息是顺序读写的。顺序

读写要远快于随机读写。

 零拷贝:生产者、消费者对于 kafka 中消息的操作是采用零拷贝实现的。

 批量发送:Kafka 允许使用批量消息发送模式。

 消息压缩:Kafka 支持对消息集合进行压缩

第2章 Kafka 工作原理与工作过程

2.1 Kafka 基本原理

2.2 Kafka 工作原理与过程

2.3 Kafka 集群搭建

在生产环境中为了防止单点问题,Kafka 都是以集群方式出现的。下面要搭建一个 Kafka

集群,包含三个 Kafka 主机,即三个 Broker

2.3.1 Kafka 的下载

Kafka_kafka_05

 

 

2.3.2 安装并配置第一台主机

(1) 上传并解压

将下载好的 Kafka 压缩包上传至 CentOS 虚拟机,并解压。

Kafka_配置文件_06

 

 

(2) 创建软链接

Kafka_测试类_07

 

 

(3) 修改配置文件

在 kafka 安装目录下有一个 config/server.properties 文件,修改该文件。

Kafka_kafka_08

 

 

Kafka_配置文件_09

 

 

 

 

 

 

 

Kafka_配置文件_10

 

 

 

 

2.3.3 再克隆两台 Kafka

以 kafkaOS1 为母机再克隆两台 Kafka 主机。在克隆完毕后,需要修改 server.properties

中的 broker.id、listeners 与 advertised.listeners。

 

 

 

Kafka_测试类_11

 

 

 

Kafka_测试类_12

 

 

 

2.3.4 kafka 的启动与停止

 

 

(1) 启动 zookeeper

 

Kafka_测试类_13

 

 

 

(2) 启动 kafka

 

在命令后添加-daemon 参数,可以使 kafka 以守护进程方式启动,即不占用窗口。

 

 

Kafka_kafka_14

 

 

 

(3) 停止 kafka

 

Kafka_配置文件_15

 

 

 

 

2.3.5 kafka 操作

 

(1) 创建 topic

Kafka_测试类_16

 

 

(2) 查看 topic

Kafka_测试类_17

 

 

(3) 发送消息

该命令会创建一个生产者,然后由其生产消息。

Kafka_kafka_18

 

 

 

(4) 消费消息

Kafka_测试类_19

 

 

(5) 继续生产消费

Kafka_测试类_20

 

 

Kafka_配置文件_21

 

 

(6) 删除 topic

Kafka_测试类_22

 

 

2.4 日志查看

我们这里说的日志不是Kafka的启动日志,启动日志在Kafka安装目录下的logs/server.log

中。消息在磁盘上都是以日志的形式保存的。我们这里说的日志是存放在/tmp/kafka_logs

目录中的消息日志,即 partition 与 segment。

2.4.1 查看分区与备份

(1) 1 个分区 1 个备份

我们前面创建的 test 主题是 1 个分区 1 个备份

Kafka_测试类_23

 

 

(2) 3 个分区 1 个备份

再次创建一个主题,命名为 one,创建三个分区,但仍为一个备份。 依次查看三台

broker,可以看到每台 broker 中都有一个 one 主题的分区

Kafka_测试类_24

 

 

 

 

(3) 3 个分区 3 个备份

再次创建一个主题,命名为 two,创建三个分区,三个备份。依次查看三台 broker,可

以看到每台 broker 中都有三份 two 主题的分区。

 

Kafka_测试类_25

 

 

2.4.2 查看段 segment

(1) segment 文件

segment 是一个逻辑概念,其由两类物理文件组成,分别为“.index”文件和“.log”文

件。“.log”文件中存放的是消息,而“.index”文件中存放的是“.log”文件中消息的索引。

00000000000000001456.log

Kafka_kafka_26

 

 

(2) 查看 segment

对于 segment 中的 log 文件,不能直接通过 cat 命令查找其内容,而是需要通过 kafka

自带的一个工具查看

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files

/tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log

一个用户的一个主题会被提交到一个__consumer_offsets 分区中。使用主题字符串的

hash 值与 50 取模,结果即为分区索引。

 

第3章 Kafka API

首先在命令行创建一个名称为 cities 的主题,并创建该主题的订阅者。

 

3.1 使用 kafka 原生 API

3.1.1 创建工程

创建一个 Maven 的 Java 工程,命名为 kafkaDemo。创建时无需导入依赖。为了简单,

后面的发布者与消费者均创建在该工程中。

 

Kafka_配置文件_27

 

 

3.1.2 导入依赖

<!-- kafka 依赖 -->

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.12</artifactId>

<version>1.1.1</version>

</dependency>

3.1.3 创建发布者 OneProducer

(1) 创建发布者类 OneProducer

Kafka_kafka_28

 

 

Kafka_测试类_29

 

 

(2) 创建测试类 OneProducerTest

Kafka_测试类_30

 

 

3.1.4 创建发布者 TwoProducer

前面的方式在消息发送成功后,代码中没有任何提示,这里可以使用回调方式,即发送

成功后,会触发回调方法的执行

(1) 创建发布者类 TwoProducer

复制 OneProducer 类,仅修改 sendMsg()方法。

Kafka_配置文件_31

 

 

(2) 创建测试类 TwoProducerTest

Kafka_kafka_32

 

 

3.1.5 批量发送消息

(1) 创建发布者类 SomeProducerBatch

复制前面的发布者类,在其基础上进行修改。

Kafka_测试类_33

 

 

Kafka_测试类_34

 

 

(2) 创建测试类 ProducerBatchTest

Kafka_测试类_35

 

 

3.1.6 消费者组

(1) 创建消费者类 SomeConsumer

Kafka_测试类_36

 

 

Kafka_测试类_37

 

 

创建测试类 ConsumerTest

Kafka_测试类_38

 

 

3.1.7 消费者同步手动提交

(1) 自动提交的问题

前面的消费者都是以自动提交 offset 的方式对 broker 中的消息进行消费的,但自动提交

可能会出现消息重复消费的情况。所以在生产环境下,很多时候需要对 offset 进行手动提交,

以解决重复消费的问题

(2) 手动提交分类

手动提交又可以划分为同步提交、异步提交,同异步联合提交。这些提交方式仅仅是

doWork()方法不相同,其构造器是相同的。所以下面首先在前面消费者类的基础上进行构造

器的修改,然后再分别实现三种不同的提交方式。

(3) 创建消费者类 SyncManualConsumer

 

A、原理

同步提交方式是,消费者向 broker 提交 offset 后等待 broker 成功响应。若没有收到响

应,则会重新提交,直到获取到响应。而在这个等待过程中,消费者是阻塞的。其严重影响

了消费者的吞吐量。

B、 修改构造器

直接复制前面的 SomeConsumer,在其基础上进行修改。

Kafka_kafka_39

 

 

 

C、 修改 doWork()方法

Kafka_测试类_40

 

 

(4) 创建测试类 SyncManulTest

Kafka_配置文件_41

 

 

3.1.8 消费者异步手动提交

(1) 原理

手动同步提交方式需要等待 broker 的成功响应,效率太低,影响消费者的吞吐量。异

主讲:Reythor 雷

22 分布式消息系统 Kafka

步提交方式是,消费者向 broker 提交 offset 后不用等待成功响应,所以其增加了消费者的吞

吐量

(2) 创建消费者类 AsyncManualConsumer

复制前面的 SyncManualConsumer 类,在其基础上进行修改

Kafka_配置文件_42

 

 

 

(3) 创建测试类 AsyncManulTest

Kafka_测试类_43

 

 

3.1.9 消费者同异步手动提交

(1) 原理

同异步提交,即同步提交与异步提交组合使用。一般情况下,若偶尔出现提交失败,其

也不会影响消费者的消费。因为后续提交最终会将这次提交失败的 offset 给提交了。

但异步提交会产生重复消费,为了防止重复消费,可以将同步提交与异常提交联合使用

 

(2) 创建消费者类 SyncAsyncManualConsumer

复制前面的 AsyncManualConsumer 类,在其基础上进行修改。


Kafka_配置文件_44

 

 

 

3.2 Spring Boot Kafka

为了简单,以下代码是将消息发布者与订阅者定义到了一个工程中的。

3.2.1 创建工程

创建一个 Spring Boot 工程,导入如下依赖。

 

 

Kafka_kafka_45

 

 

3.2.2 定义发布者

Spring 是通过 KafkaTemplate 来完成对 Kafka 的操作的。

(1) 修改配置文件

 

 

Kafka_kafka_46

 

 

(2) 定义发布者处理器

 

Spring Kafka 通过 KafkaTemplate 完成消息的发布。

Kafka_测试类_47

 

 

3.2.3 定义消费者

Spring 是通过监听方式实现消费者的。

 

 

(1) 修改配置文件

在配置文件中添加如下内容。注意,Spring 中要求必须为消费者指定组。

 

Kafka_测试类_48

 

 

(2) 定义消费者

 

Spring Kafka 是通过 KafkaListener 监听方式来完成消息订阅与接收的。当监听到有指定

主题的消息时,就会触发@KafkaListener 注解所标注的方法的执行。

 

 

Kafka_配置文件_49

 

 

 

 

 

 

 

 

 

 

 

 















 



标签:消费者,创建,kafka,消息,提交,Kafka
From: https://blog.51cto.com/u_15716707/5933148

相关文章

  • kafka重启失败
     一个线上环境,因为修改了zookeeper配置,需要重启下kafka服务,然而重启后kafka却报错了,下面就让我们来看看复原场景吧! 结果启动失败,报如下错误:[2021-05-2507:26:14,35......
  • Zookeeper+Kafka集群
    一、Zookeeper概述1、Zookeeper概述Zookeeper:是一个分布式的、开源的程序协调服务,是hadoop项目下的一个子项目。他提供的主要功能包括:配置管理、名字服务、分布式锁、......
  • zookeeper和消息队列kafka
    一、Zookeeper是什么?1、Zookeeper服务集群的条件2、Zookeeper工作机制3、Zookeeper数据结构4、Zookeper特点5、Zookeeper选举机制6、Zookeeper应用场景二、Zookeepe......
  • Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Time
    遇到问题:今天在启动kafka后创建topic时遇到如下错误:[root@localhostconfig]#kafka-topics.sh--create--zookeeper192.168.68.110:2181,192.168.68.111:2181,192.16......
  • kafka 分区迁移统计时间脚本
    #!/bin/bsahstarttime=`date+'%Y-%m-%d%H:%M:%S'`;echo"Start:$starttime";/app/ctgkafka/ctg_kafka_instance/13/icc-5gcmp-kafka-yl5/1/bin/kafka-reassign-partition......
  • 继续南山聊代码!Apache Kafka × Apache Flink Meetup · 深圳站
    8月31日,ApacheFlinkMeetup深圳站来啦,继续南山聊代码!本次Meetup由ApacheFlink(以下简称Flink)与ApacheKafka联合举办,邀请到来自Confluent、中国农业银行、虎......
  • 回顾 | Kafka x Flink Meetup 与世界人工智能大会大数据 AI 专场精彩回顾(附PPT下载)
    8月最后一天,由ApacheKafka与ApacheFlink联合举办的Meetup深圳站圆满落幕,现场站无虚席,来自Confluent、中国农业银行、虎牙直播、数见科技以及阿里巴巴的五位技术......
  • kafka 简明教程
    概述kafka是一个具有分布式特点的发布订阅的消息队列。可以系统解耦,流量削峰,异步通信,分布式存储。具有相同消息队列功能的其他产品:MQ,REDIS等。名词解释(重点概念)pr......
  • javaclient操作kafka&springboot整合kafka&kafka分区
    1.javaclient测试kafka1.配置kafka允许远程推送修改config/Kraft/server.properties文件,,将地址变为服务器公网IP地址。advertised.listeners=PLAINTEXT://localhos......
  • zookeeper+kafka
    一、Zookeeper概述1、Zookeeper定义Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。2、Zookeeper工作机制Zookeeper从设计模式角度来理解:是......