首页 > 其他分享 >kafka随记

kafka随记

时间:2022-10-19 10:46:30浏览次数:53  
标签:-- 分区 broker kafka topic server 随记

一、概述

1.定义

传统定义:kafka是一个分布式的基于发布/订阅模式的消息队列

最新定义:kafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用

2.应用场景

缓存/消峰:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致情况

解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束

异步通信:允许用户把一个消息放入队列,但不立即处理,在需要的时候再处理

3.消息队列模式

点对点模式:消费者主动拉取数据,消息收到后消除消息

发布/订阅模式:可以有多个topic;消费者消费后不删除数据;每个消费者相互独立,都可以消费到数据

4.基础架构

(1) Producer: 消息生产者,就是向 Kafka broker 发消息的客户端。
(2) Consumer: 消息消费者,向 Kafka broker 取消息的客户端。
(3) Consumer Group(CG): 消费者组,由多个 consumer 组成。 消费者组内每个消
费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不
影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
(4) Broker: 一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个
broker 可以容纳多个 topic。
(5) Topic: 可以理解为一个队列, 生产者和消费者面向的都是一个 topic。
(6) Partition: 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服
务器)上, 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
(7) Replica: 副本。 一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个
Follower。
(8) Leader: 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数
据的对象都是 Leader。
( 9) Follower: 每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和
Leader 数据的同步。 Leader 发生故障时,某个 Follower 会成为新的 Leader。

注:zk上查看brokers信息:ls /kafka/brokers/ids  topic信息:ls /kafka/topics/....

 

二、安装

1.官网下载地址:https://kafka.apache.org/downloads

2.安装脚本:

3.配置说明:server.properties

#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建, kafka 自动帮你创建,可以
配置多个磁盘路径,路径与路径之间可以用", "分隔
log.dirs=/opt/module/kafka/datas
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824尚硅谷大数据技术之 Kafka
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181/ka
fka

 4.服务启停(需zk先启动)

bin/kafka-server-start.sh  -daemon config/server.properties 

bin/kafka-server-stop.sh

启停脚本:

#! /bin/bash
case $1 in
"start"){
for i in kf1 kf2 kf3
do
echo " --------启动 $i Kafka-------"
ssh $i "/data/apps/kafka/bin/kafka-server-start.sh -daemon /data/apps/kafka/config/server.properties"
done
};;
"stop"){
for i in kf1 kf2 kf3
do
echo " --------停止 $i Kafka-------"
ssh $i "/data/apps/kafka/bin/kafka-server-stop.sh "
done
};;
esac

注:先停止kafka再关闭zk,不然kafka没办法获取停止进程的信息,只能手动杀死kafka进程

 

三、使用

1.命令操作

操作主题命令参数:kafka-topics.sh  --help

--bootstrap-server :连接kafka broker主机名和端口号

--topic: 操作的topic

--create/delete/alter/list/describe   : 创建、删除、修改、查看所有主题、查看主题详细信息

--partitions : 设置分区数

--replication-factor :设置分区副本

--config : 更新系统默认配置

 

示例:

创建topic:kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 3 --topic mytest

查看topic详情: kafka-topics.sh  --bootstrap-server localhost:9092 --describe --topic mytest

修改分区数: kafka-topics.sh  --bootstrap-server localhost:9092 --describe --topic mytest

注:分区数只能增加不能减少

减少分区报错:
[root@kf1 bin]# kafka-topics.sh --bootstrap-server localhost:9092 --alter --partitions 2 --topic mytest Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 2. [2022-10-19 09:27:01,826] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 2. (kafka.admin.TopicCommand$)

 

2.生产者

(1)发送消息:kafka-console-producer.sh  --bootstrap-server localhost:9092 --topic mytest

(2)消费消息:kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning

参数:--from-beinning 从头开始消费(默认最新消息消费)

--group  指定消费组名称

(2)发送原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。 main 线程将消息发送给 RecordAccumulator,

Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broke

参考名称 描述
bootstrap.servers broker地址
key.serializer和value.serializer 指定发送消息的key和value的序列化类型。一定要写全类名
buffer.memory RecordAccumulator缓冲区总大小,默认32M
batch.size

缓冲区一批数据最大值, 默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加

linger.ms 如果数据迟迟未达到 batch.size, sender 等待 linger.time之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。 生产环境建议该值大小为 5-100ms 之间。
acks

0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据, Leader 收到数据后应答。
-1(all):生产者发送过来的数据, Leader+和 isr 队列
里面的所有节点收齐数据后应答。 默认值是-1, -1 和all 是等价的。

max.in.flight.requests.per.connection  允许最多没有返回 ack 的次数, 默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries

当消息发送出现错误的时候,系统会重发消息。 retries表示重试次数。 默认是 int 最大值, 2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。

retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence  是否开启幂等性, 默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。 默认是 none,也就是不压缩。支持压缩类型: none、 gzip、 snappy、 lz4 和 zstd。
   

 (3) 生产者分区

分区好处:便于合理使用存储资源 ,每个分区在一个broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多个broker上。合理控制分区任务,可以实现负载均衡的效果。 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区单位进行消费数据

(4)生产者提高吞吐量

batch.size: 批次大小,默认16k

linger.ms: 等待时间,需改为5-100ms

compression.type: 压缩snappy

RecordAccumulator: 缓冲区大小,修改为64m

 

 

 

 

参考官网配置:https://kafka.apache.org/documentation/#producerconfigs

标签:--,分区,broker,kafka,topic,server,随记
From: https://www.cnblogs.com/aroin/p/16804961.html

相关文章

  • Linux安装Kafka(Docker方式)
    安装步骤(已亲测好使):#笔者版本ZOOKEEPER_VERSION=3.4.13//DockerVersion=18.03.1-ee-3#拉zookeeper镜像dockerpullwurstmeister/zookeeper#笔者版本KAFKA_VERSION......
  • kafka 按时间戳消费
    步骤获取当前topic的分区列表利用offsets_for_times()+时间戳查找给定分区的偏移量,如:找到开始时间的偏移量循环每个分区,设置偏移量根据end_offset或结束时间退......
  • 技术分享| 消息队列Kafka群集部署
    一、简介1、介绍Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志......
  • Dinky的使用——kafka2mysql
    需求:通过在kafka的topic里面传入json串,再把数据同步到mysql中,这个也可以作为半结构化数据同步的案例一、添加依赖包将依赖包放到dinky的pulgins目录和flink的lib目录下,并......
  • 自动生成模拟数据发至kafka topic
    自动生成一下json数据脚本json数据样例{"provinceCode":"290","companyName":"test","appId":"10","appName":"apptest","eventTime":"2022-10-1709:52:","errorTy......
  • flink sql kafka数据接入clickhouse
    --参数--并行度设置set'parallelism.default'='2';--resetexecution.savepoint.path;--resetexecution.checkpoint.path;--设置队列set'yarn.application.q......
  • 技术分享| 消息队列Kafka群集部署
    一、简介1、介绍Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日......
  • flink sql kafka数据接入mysql
    --定义source表droptableIFEXISTSsource_applet_kafka;CREATETABLEIFNOTEXISTSsource_applet_kafka(provinceCodeString,companyNameString,appIdStri......
  • kafka集群启动脚本
    #!/bin/bashcase$1in#启动“start”)forhostinhadoop101hadoop102hadoop103doecho“***********starthostkafka***********”......
  • 初识Kafka
    Kafka是一个多分区、多副本且基于ZooKeeper协调的分布式消息系统Kafka之所以受到越来越多的青睐,与它所“扮演”的三大角色是分不开的:消息系统:Kafka和传统的消息中......