首页 > 其他分享 >kafka

kafka

时间:2024-10-20 19:24:43浏览次数:1  
标签:oldboyedu 10.0 zk kafka 2181 CONNECTED root

kafka

1. zookeeper集群搭建

1.1 作用

什么是zookeeper

zookeeper致力于维护开源服务器,实现高度可靠的分布式协调

zookeeper是一个用于维护配置信息,命名,提供分布式服务和提供组服务的集中式服务

说白了:zookeeper的作用就是为分布式集群各节点提供数据共享的功能

1.2 应用场景

  • kafka
  • dubbo
  • hdfs

一切需要用到分布式服务的中间件都需要使用zookeeper

zookeeper集群推荐三台为一个集群

1.3 zookeeper单节点部署

(1) 上传软件包到服务器并解压

tar xf apache-zookeeper-3.8.4-bin.tar.gz -C /oldboy/softwares/
[root@zookeeper1 /softwares]#cd /oldboy/softwares/
[root@zookeeper1 /oldboy/softwares]#ll
total 0
drwxr-xr-x 6 root root 133 Oct 14 23:08 apache-zookeeper-3.8.4-bin

(2) 创建环境变量:将bin下的文件配置即可

1. 创建软链接
[root@zookeeper1 /oldboy/softwares]#ln -s apache-zookeeper-3.8.4-bin/ zk
[root@zookeeper1 /oldboy/softwares]#ll
total 0
drwxr-xr-x 6 root root 133 Oct 14 23:08 apache-zookeeper-3.8.4-bin
lrwxrwxrwx 1 root root  27 Oct 14 23:12 zk -> apache-zookeeper-3.8.4-bin/

2. 配置环境变量
[root@zookeeper1 /oldboy/softwares]#vim /etc/profile.d/zk.sh
[root@zookeeper1 /oldboy/softwares]#cat /etc/profile.d/zk.sh 
#!/bin/bash
export ZK_HOME=/oldboy/softwares/zk
export PATH=$PATH:$ZK_HOME/bin

source /etc/profile.d/zk.sh

(3) 生成配置文件

1. 复制出来一个zoo.cfg的配置文件
[root@zookeeper1 /oldboy/softwares]#cp zk/conf/{zoo_sample,zoo}.cfg
[root@zookeeper1 /oldboy/softwares]#ll
total 0
drwxr-xr-x 6 root root 133 Oct 14 23:08 apache-zookeeper-3.8.4-bin
lrwxrwxrwx 1 root root  27 Oct 14 23:12 zk -> apache-zookeeper-3.8.4-bin/
[root@zookeeper1 /oldboy/softwares]#ll zk/conf/
total 20
-rw-r--r-- 1 oldboy oldboy  535 Feb 13  2024 configuration.xsl
-rw-r--r-- 1 oldboy oldboy 4559 Feb 13  2024 logback.xml
-rw-r--r-- 1 root   root   1183 Oct 14 23:21 zoo.cfg
-rw-r--r-- 1 oldboy oldboy 1183 Feb 13  2024 zoo_sample.cfg

(4) 启动服务

[root@zookeeper1 /oldboy/softwares]#zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /oldboy/softwares/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

# 注意  需要安装jdk

(5) 查看服务状态

[root@zookeeper1 /oldboy/softwares]#zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /oldboy/softwares/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone

standalone : 单点

(6) 验证

[root@zookeeper1 /oldboy/softwares]# ss -ntl |grep 2181
LISTEN     0      50        [::]:2181                  [::]:*  

(7) 连接测试

[root@zookeeper1 /oldboy/softwares]#zkCli.sh 
Connecting to localhost:2181

......
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]

(8) 停止服务

[root@zookeeper1 /oldboy/softwares]#zkServer.sh stop
ZooKeeper JMX enabled by default
Using config: /oldboy/softwares/zk/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

1.4 zookeeper集群部署

(1) 编写配置文件

[root@zookeeper1 /oldboy/softwares]#vim zk/conf/zoo.cfg 
[root@zookeeper1 /oldboy/softwares]#cat zk/conf/zoo.cfg 
# 定义最小单元的时间范围tick。
tickTime=2000
# 启动时最长等待tick数量。
initLimit=5
# 数据同步时最长等待的tick时间进行响应ACK
syncLimit=2
# 指定数据目录
dataDir=/oldboyedu/data/zk
# 监听端口
clientPort=2181
# 开启四字命令允许所有的节点访问。
4lw.commands.whitelist=*
# server.ID=A:B:C[:D]
# ID:
#    zk的唯一编号。
# A:
#    zk的主机地址。
# B:
#    leader的选举端口,是谁leader角色,就会监听该端口。
# C: 
#    数据通信端口。
# D:
#    可选配置,指定角色。
server.201=10.0.0.201:5888:6888
server.202=10.0.0.202:5888:6888
server.203=10.0.0.203:5888:6888

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

(2) 同步配置文件到其他节点

(3) 创建数据目录

mkdir -p  /oldboyedu/data/zk

(4) 创建myid文件

for ((host_id=201;host_id<=203;host_id++)) do ssh 10.0.0.${host_id} "echo ${host_id} > /oldboyedu/data/zk/myid";done

# 验证
[root@zookeeper1 ~]#cat /oldboyedu/data/zk/myid 
201
[root@zookeeper2 ~]#cat /oldboyedu/data/zk/myid
202
[root@zookeeper3 ~]#cat /oldboyedu/data/zk/myid
203

(5) 启动集群

[root@zookeeper1 ~]#zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /oldboy/softwares/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

三个节点同时发送启动指定

(6) 查看状态及主

[root@zookeeper1 ~]#zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /oldboy/softwares/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@zookeeper2 ~]#zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /oldboy/softwares/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@zookeeper3 ~]#zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /oldboy/softwares/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

(7) 使用集群的方式工作

  • 在各节点zookeeper服务运行时执行下面命令并无影响
zkCli.sh -server 10.0.0.201:2181,10.0.0.202:2181,10.0.0.203:2181

温馨提示:
1.可以尝试挂掉leader节点,验证是否能够正常使用。
2.当集群半数以上节点存活,发现集群可以正常提供服务,如果集群容错率是N,需要 2N+1台机器;
3.zookeeper的leader选举流程先比较zxid(zookeeper的事务ID),如果比较不出来 再比较myid(zookeeper节点数字编号的唯一标识),谁大谁就是leader

  • 半数以上机制,才能正常提供服务

1.5 zookeeper命令行的基本使用

(1) 进入客户端

zkCli.sh

(2) 创建zookeeper node

[zk: localhost:2181(CONNECTED) 0] create 
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
# 创建时不指定数据
[zk: localhost:2181(CONNECTED) 0] create /oldboy
Created /oldboy
# 创建时指定数据
[zk: localhost:2181(CONNECTED) 1] create /linux90 jiaoshi05
Created /linux90

(3) 查看zookeeper node列表

[zk: localhost:2181(CONNECTED) 2] ls /
[linux90, oldboy, zookeeper]

# # 查看zookeeper node 存储的数据
[zk: localhost:2181(CONNECTED) 3] get /linux90 
jiaoshi05
[zk: localhost:2181(CONNECTED) 4] get /oldboy 
null

(4) 修改数据

[zk: localhost:2181(CONNECTED) 4] get /oldboy 
null
[zk: localhost:2181(CONNECTED) 5] set /oldboy elk
[zk: localhost:2181(CONNECTED) 6] get /oldboy 
elk

(5) 删除zookeeper node

[zk: localhost:2181(CONNECTED) 7] delete /oldboy 
[zk: localhost:2181(CONNECTED) 8] ls /
[linux90, zookeeper]

(6) 递归(嵌套)创建zookeeper node

  • 创建多级zookeeper node 只能一级一级创建
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 16] create /oldboyedu
Created /oldboyedu
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 17] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 17] create /oldboyedu/linux90 jiaoshi05
Created /oldboyedu/linux90
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 18] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 18] create /oldboyedu/linux89 jiaoshi03
Created /oldboyedu/linux89
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 19] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 19] ls /oldboyedu 
[linux89, linux90]
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 20] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 20] get /oldboyedu/linux90 
jiaoshi05
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 21] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 21] get /oldboyedu/linux89 
jiaoshi03
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 22] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 22] get /oldboyedu 
null
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 23] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 23] set /oldboyedu  laonanhai
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 24] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 24] get /oldboyedu 
laonanhai
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 25] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 25] deleteall /oldboyedu 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 26] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 26] ls /
[linux90, zookeeper]
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 27] 

1.6 扩展

(1) 查看zookeeper node的节点状态

- zookeeper的扩展知识上篇(了解即可)
	
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 42] stat /linux90 
cZxid = 0x60000000f
ctime = Tue Feb 27 07:11:33 UTC 2024
mZxid = 0x60000000f
mtime = Tue Feb 27 07:11:33 UTC 2024
pZxid = 0x60000000f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 43] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 43] ls -s /linux90 
[]
cZxid = 0x60000000f
ctime = Tue Feb 27 07:11:33 UTC 2024
mZxid = 0x60000000f
mtime = Tue Feb 27 07:11:33 UTC 2024
pZxid = 0x60000000f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 44] 




使用stat命令可以查看znode的元数据信息。
		cZxid:
			数据节点创建时的事物ID。        
		ctime:
			数据节点创建时的时间。        
		mZxid:
			数据节点最后一次更新时的事物ID。        
		mtime:
			数据节点最后一次更新时的时间。        
		pZxid:
			数据节点的子节点最后一次被修改时的事务ID。        
		cversion:
			子节点的更改次数。        
		dataVersion:
			数据节点的更改次数,即维护的是一个数据版本号。        
		aclVersion:
			节点的ACL的更改次数。        
		ephemeralOwner:
			如果节点是临时节点,则表示创建该节点的会话SessionID,如果节点是持久节点,则该属性值为0。        
		dataLength:
			数据内容的长度。        
		numChildren:
			数据节点当前的子节点的数量。

(2) zookeeper的node类型

[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 53] create -s /linux90/c2
Created /linux90/c20000000001
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 54] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 54] create -e /linux90/c3
Created /linux90/c3
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 55] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 55] ls /linux90
[c1, c20000000001, c3]
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 56] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 56] stat /linux90/c1
cZxid = 0x60000001e
ctime = Tue Feb 27 07:25:18 UTC 2024
mZxid = 0x60000001e
mtime = Tue Feb 27 07:25:18 UTC 2024
pZxid = 0x60000001e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 57] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 57] stat /linux90/c20000000001 
cZxid = 0x60000001f
ctime = Tue Feb 27 07:25:29 UTC 2024
mZxid = 0x60000001f
mtime = Tue Feb 27 07:25:29 UTC 2024
pZxid = 0x60000001f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 58] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 58] stat /linux90/c3 
cZxid = 0x600000020
ctime = Tue Feb 27 07:25:34 UTC 2024
mZxid = 0x600000020
mtime = Tue Feb 27 07:25:34 UTC 2024
pZxid = 0x600000020
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x5f0004831c010002
dataLength = 0
numChildren = 0
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 59] 


温馨提示:
	-s :
		创建一个带序号 的zookeeper node。
	-e:
		创建的是临时的zookeeper node,其ephemeralOwner状态默认为会话的session ID,比如:"0x5f0004831c010002"

(3) watch机制

		3.1 终端1 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 3] ls -w /linux90
[c1, c20000000001]
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 4] 
WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/linux90


		3.2 终端2
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 4] ls /linux90
[c1, c20000000001]
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 5] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 5] create /linux90/c3
Created /linux90/c3
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 6] 
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 6] ls /linux90
[c1, c20000000001, c3]
[zk: 10.0.0.94:2181,10.0.0.95:2181,10.0.0.96:2181(CONNECTED) 7] 

温馨提示:
	zookeeper node的watch机制是一次性的。

(4) zookeeper的acl权限控制概述

zookeeper类似文件系统,client可以创建节点,更新节点,删除节点,那么如何做到节点的权限的控制呢?

zookeeper的Access Control List(访问控制列表,简称ACL)可以做到这一点。

zookeeper的ACL权限控制使用"scheme:id:permission"来标识,主要涵盖如下三个方面:
	权限模式(scheme):
		所谓的权限模式指的是采用何种方式授权。
		zookeeper支持以下几种权限模式:
            world:
                只有一个用户,即"anyone",代表登录zookeeper的所有人,这也是默认的权限模式。
            ip:
                对客户端使用IP地址认证。
            auth:
                使用已添加认证的用户认证。
            digest:
                使用"用户名:密码"方式进行认证。

	授权的对象(id):
		所谓的授权对象指的是给谁授予权限。授权对象ID是指权限赋予的实体,例如: IP地址或用户。
		
	权限(permission):
		指的是授权的权限。zookeeper支持以下几种权限:
            create(简写"c"):
                表示可以创建子节点。
            delete(简称"d"):
                可以删除子节点。
            read(简称"r"):
                可以读取节点数据及显示子节点列表。
            write(简称"w"):
                可以修改节点数据。
            admin(简称"a"):
                可以设置节点访问控制列表权限。
             
             
zookeeper的ACL的特点如下:
	(1)zookeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限;
	(2)每个znode支持设置多种权限控制方案和多个权限;
 	(3)子节点不会继承父节点的权限,也就是说,客户端无权访问某个znode,并不代表无法访问它的子节点;


授权管理的相关命令:
    getAcl:
        读取ACL权限,使用语法:"getAcl [-s] path"。

    setAcl:
        设置ACL权限,使用语法:"setAcl [-s] [-v version] [-R] path acl"。

    addauth 
        添加认证用户,使用语法:"addauth scheme auth"。

(5) 基于world权限模式(scheme)认证案例

[zk: localhost:2181(CONNECTED) 20] create /oldboyedu-linux 
Created /oldboyedu-linux
[zk: localhost:2181(CONNECTED) 21] 
[zk: localhost:2181(CONNECTED) 21] create /oldboyedu-linux/c1
Created /oldboyedu-linux/c1
[zk: localhost:2181(CONNECTED) 22] 
[zk: localhost:2181(CONNECTED) 22] getAcl /oldboyedu-linux 
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 23] 
[zk: localhost:2181(CONNECTED) 23] setAcl  /oldboyedu-linux  world:anyone:rwa
[zk: localhost:2181(CONNECTED) 24] 
[zk: localhost:2181(CONNECTED) 24] getAcl /oldboyedu-linux 
'world,'anyone
: rwa
[zk: localhost:2181(CONNECTED) 25] 
[zk: localhost:2181(CONNECTED) 25] ls /oldboyedu-linux 
[c1]
[zk: localhost:2181(CONNECTED) 26] 
[zk: localhost:2181(CONNECTED) 26] delete /oldboyedu-linux/c1 
delete      deleteall   
[zk: localhost:2181(CONNECTED) 26] delete /oldboyedu-linux/c1 
Insufficient permission : /oldboyedu-linux/c1
[zk: localhost:2181(CONNECTED) 27] 
[zk: localhost:2181(CONNECTED) 27] create /oldboyedu-linux/c2
Insufficient permission : /oldboyedu-linux/c2
[zk: localhost:2181(CONNECTED) 28] 
[zk: localhost:2181(CONNECTED) 28] get /oldboyedu-linux
null
[zk: localhost:2181(CONNECTED) 29] 
[zk: localhost:2181(CONNECTED) 29] set /oldboyedu-linux oldboy
[zk: localhost:2181(CONNECTED) 30] 
[zk: localhost:2181(CONNECTED) 30] get /oldboyedu-linux
oldboy
[zk: localhost:2181(CONNECTED) 31] 
[zk: localhost:2181(CONNECTED) 31] setAcl  /oldboyedu-linux  world:anyone:rwacd
[zk: localhost:2181(CONNECTED) 32] 
[zk: localhost:2181(CONNECTED) 32] getAcl /oldboyedu-linux
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 33] 
[zk: localhost:2181(CONNECTED) 33] create /oldboyedu-linux/c2
Created /oldboyedu-linux/c2
[zk: localhost:2181(CONNECTED) 34] 
[zk: localhost:2181(CONNECTED) 34] delete /oldboyedu-linux/c1 
[zk: localhost:2181(CONNECTED) 35] 
[zk: localhost:2181(CONNECTED) 35] ls /oldboyedu-linux 
[c2]
[zk: localhost:2181(CONNECTED) 36] 

(6) 基于ip权限模式(scheme)认证案例

3.1 终端1操作
[zk: localhost:2181(CONNECTED) 38] getAcl /oldboyedu-linux 
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 39] 
[zk: localhost:2181(CONNECTED) 39] setAcl /oldboyedu-linux ip:10.0.0.96:cra
[zk: localhost:2181(CONNECTED) 40] 
[zk: localhost:2181(CONNECTED) 40] getAcl /oldboyedu-linux 
Insufficient permission : /oldboyedu-linux
[zk: localhost:2181(CONNECTED) 41] 
[zk: localhost:2181(CONNECTED) 41] whoami 
Auth scheme: User
ip: 127.0.0.1
[zk: localhost:2181(CONNECTED) 42] 


		3.2 终端2操作 
[root@elk96 ~]# zkCli.sh -server 10.0.0.94
...
[zk: 10.0.0.94(CONNECTED) 1] ls /oldboyedu-linux 
[c2]
[zk: 10.0.0.94(CONNECTED) 2] 
[zk: 10.0.0.94(CONNECTED) 2] getAcl /oldboyedu-linux 
'ip,'10.0.0.96
: cra
[zk: 10.0.0.94(CONNECTED) 3] 
[zk: 10.0.0.94(CONNECTED) 3] whoami 
Auth scheme: User
ip: 10.0.0.96
[zk: 10.0.0.94(CONNECTED) 4] 
[zk: 10.0.0.94(CONNECTED) 4] get /oldboyedu-linux 
oldboy
[zk: 10.0.0.94(CONNECTED) 5] 
[zk: 10.0.0.94(CONNECTED) 5] set /oldboyedu-linux oldboyedu
Insufficient permission : /oldboyedu-linux
[zk: 10.0.0.94(CONNECTED) 6] 
...
[zk: 10.0.0.94(CONNECTED) 8] setAcl  /oldboyedu-linux ip:10.0.0.96:crawd
[zk: 10.0.0.94(CONNECTED) 9] 
[zk: 10.0.0.94(CONNECTED) 9] getAcl /oldboyedu-linux 
'ip,'10.0.0.96
: cdrwa
[zk: 10.0.0.94(CONNECTED) 10] 
[zk: 10.0.0.94(CONNECTED) 10] get /oldboyedu-linux 
oldboy
[zk: 10.0.0.94(CONNECTED) 11] 
[zk: 10.0.0.94(CONNECTED) 11] set /oldboyedu-linux oldboyedu
[zk: 10.0.0.94(CONNECTED) 12] 
[zk: 10.0.0.94(CONNECTED) 12] get /oldboyedu-linux 
oldboyedu
[zk: 10.0.0.94(CONNECTED) 13] 
[zk: 10.0.0.94(CONNECTED) 13] 


温馨提示:
	也支持网段的配置。
[zk: localhost:2181(CONNECTED) 12] setAcl /linux90  ip:10.0.0.0/24:cra
[zk: localhost:2181(CONNECTED) 13] 
[zk: localhost:2181(CONNECTED) 13] getAcl /linux90 
'ip,'10.0.0.0/24
: cra
[zk: localhost:2181(CONNECTED) 14] 


4.基于auth权限模式(scheme)认证案例
		4.1 终端1
[zk: 10.0.0.94(CONNECTED) 13] addauth digest admin:oldboyedu
[zk: 10.0.0.94(CONNECTED) 14] 
[zk: 10.0.0.94(CONNECTED) 14] setAcl /oldboyedu-linux auth:admin:cra
[zk: 10.0.0.94(CONNECTED) 15] 
[zk: 10.0.0.94(CONNECTED) 15] getAcl /oldboyedu-linux 
'digest,'admin:Tmbbt77KcTd1bAgjQaI+GqI0hjM=
: cra
[zk: 10.0.0.94(CONNECTED) 16] 
[zk: 10.0.0.94(CONNECTED) 16] get /oldboyedu-linux 
oldboyedu
[zk: 10.0.0.94(CONNECTED) 17] 
[zk: 10.0.0.94(CONNECTED) 17] whoami 
Auth scheme: User
ip: 10.0.0.96
digest: admin
[zk: 10.0.0.94(CONNECTED) 18] 


		4.2 终端2,输入正确的密码
[zk: localhost:2181(CONNECTED) 0] addauth digest admin:oldboyedu
[zk: localhost:2181(CONNECTED) 1] 
[zk: localhost:2181(CONNECTED) 1] whoami 
Auth scheme: User
ip: 127.0.0.1
digest: admin
[zk: localhost:2181(CONNECTED) 2] ls /oldboyedu-linux 
[c2]
[zk: localhost:2181(CONNECTED) 3] 


		4.3 终端3,输入错误的密码
[zk: localhost:2181(CONNECTED) 44] addauth digest admin:123
[zk: localhost:2181(CONNECTED) 45] 
[zk: localhost:2181(CONNECTED) 45] whoami 
Auth scheme: User
ip: 127.0.0.1
digest: admin
[zk: localhost:2181(CONNECTED) 46] getAcl /oldboyedu-linux 
Insufficient permission : /oldboyedu-linux
[zk: localhost:2181(CONNECTED) 47] 




	5.基于digest权限模式(scheme)认证案例
		5.1 使用sha1加密算法及base64编码来进行一个密码的计算
[root@elk94 ~]# echo -n admin:oldboyedu | openssl dgst -binary -sha1 | base64
Tmbbt77KcTd1bAgjQaI+GqI0hjM=
[root@elk94 ~]# 


		5.2 终端1,创建znode时同时指定ACL
[zk: localhost:2181(CONNECTED) 3] create /oldboyedu-linux90 laonanhai digest:admin:Tmbbt77KcTd1bAgjQaI+GqI0hjM=:rwa
Created /oldboyedu-linux90
[zk: localhost:2181(CONNECTED) 4]   
[zk: localhost:2181(CONNECTED) 4] getAcl /oldboyedu-linux90
Insufficient permission : /oldboyedu-linux90
[zk: localhost:2181(CONNECTED) 5] 
[zk: localhost:2181(CONNECTED) 5] whoami 
Auth scheme: User
ip: 127.0.0.1
[zk: localhost:2181(CONNECTED) 6] 

​    
​   		5.3 终端2验证
[zk: localhost:2181(CONNECTED) 1] whoami 
Auth scheme: User
ip: 127.0.0.1
[zk: localhost:2181(CONNECTED) 2] 
[zk: localhost:2181(CONNECTED) 2] addauth digest admin:oldboyedu
[zk: localhost:2181(CONNECTED) 3] 
[zk: localhost:2181(CONNECTED) 3] whoami 
Auth scheme: User
ip: 127.0.0.1
digest: admin
[zk: localhost:2181(CONNECTED) 4] 
[zk: localhost:2181(CONNECTED) 4] getAcl /oldboyedu-linux90 
'digest,'admin:Tmbbt77KcTd1bAgjQaI+GqI0hjM=
: rwa
[zk: localhost:2181(CONNECTED) 5] 
[zk: localhost:2181(CONNECTED) 5] get /oldboyedu-linux90 
laonanhai
[zk: localhost:2181(CONNECTED) 6] 
[zk: localhost:2181(CONNECTED) 6] set /oldboyedu-linux90 yitiantian
[zk: localhost:2181(CONNECTED) 7] 
[zk: localhost:2181(CONNECTED) 7] get /oldboyedu-linux90 
yitiantian
[zk: localhost:2181(CONNECTED) 8] 
[zk: localhost:2181(CONNECTED) 8] create /oldboyedu-linux90/c1
Insufficient permission : /oldboyedu-linux90/c1
[zk: localhost:2181(CONNECTED) 9] 

(7) 解决ACL权限不足的两种方案


	6.解决ACL权限不足的两种方案
		6.1 问题复现
[zk: 10.0.0.94(CONNECTED) 64] create /oldboyedu
Created /oldboyedu
[zk: 10.0.0.94(CONNECTED) 65] 
[zk: 10.0.0.94(CONNECTED) 65] create /oldboyedu/linux90
Created /oldboyedu/linux90
[zk: 10.0.0.94(CONNECTED) 66] 
[zk: 10.0.0.94(CONNECTED) 66] setAcl /oldboyedu world:anyone:rw
[zk: 10.0.0.94(CONNECTED) 67] 
[zk: 10.0.0.94(CONNECTED) 67] ls /oldboyedu 
[linux90]
[zk: 10.0.0.94(CONNECTED) 68] create /oldboyedu/linux91
Insufficient permission : /oldboyedu/linux91
[zk: 10.0.0.94(CONNECTED) 69] 
[zk: 10.0.0.94(CONNECTED) 69] delete /oldboyedu l
listquota   ls          
[zk: 10.0.0.94(CONNECTED) 69] delete /oldboyedu/linux90 
Insufficient permission : /oldboyedu/linux90
[zk: 10.0.0.94(CONNECTED) 70] 
[zk: 10.0.0.94(CONNECTED) 70] setAcl /oldboyedu world:anyone:rwdac
Insufficient permission : /oldboyedu
[zk: 10.0.0.94(CONNECTED) 71] 
[zk: 10.0.0.94(CONNECTED) 71] deleteall /oldboyedu 
Failed to delete some node(s) in the subtree!
[zk: 10.0.0.94(CONNECTED) 72] 



温馨提示:
	出现上述情况,是新手可能会遇到的问题,可以尝试下面两种方法解决该问题。
	
	
	
		6.2 zookeeper跳过权限检查实战
			6.2.1 所有节点修改zoo.cfg配置文件,跳过权限检查
vim /oldboyedu/softwares/zk/conf/zoo.cfg
...
skipACL=yes


			6.2.2 将配置同步到其他节点
[root@elk94 ~]# data_rsync.sh /oldboyedu/softwares/zk/conf/zoo.cfg

			6.2.3 重启集群
[root@elk94 ~]# zkServer.sh restart

[root@elk95 ~]# zkServer.sh restart

[root@elk96 ~]# zkServer.sh restart
			
			
			
			6.2.4 修改权限验证,发现没有权限依旧是可以创建数据的(原因是跳过了ACL检查)
[zk: localhost:2181(CONNECTED) 4] getAcl /oldboyedu 
'world,'anyone
: rw
[zk: localhost:2181(CONNECTED) 5] 
[zk: localhost:2181(CONNECTED) 5] ls /oldboyedu 
[linux90]
[zk: localhost:2181(CONNECTED) 6] 
[zk: localhost:2181(CONNECTED) 6] create /oldboyedu/linux91 
Created /oldboyedu/linux91
[zk: localhost:2181(CONNECTED) 7] 
[zk: localhost:2181(CONNECTED) 7] setAcl /oldboyedu world:anyone:rwcda
[zk: localhost:2181(CONNECTED) 8] 
[zk: localhost:2181(CONNECTED) 8] getAcl /oldboyedu 
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 9] 
[zk: localhost:2181(CONNECTED) 9] 


			6.2.5 所有节点修改zoo.cfg配置文件,跳过权限检查
vim /oldboyedu/softwares/zk/conf/zoo.cfg
...
skipACL=no 


			6.2.6 将配置同步到其他节点
[root@elk94 ~]# data_rsync.sh /oldboyedu/softwares/zk/conf/zoo.cfg

			6.2.7 重启集群
[root@elk94 ~]# zkServer.sh restart

[root@elk95 ~]# zkServer.sh restart

[root@elk96 ~]# zkServer.sh restart
			
			
			6.2.8 再次验证权限
[zk: localhost:2181(CONNECTED) 3] getAcl /oldboyedu 
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 4] 
[zk: localhost:2181(CONNECTED) 4] setAcl /oldboyedu world:anyone:arc
[zk: localhost:2181(CONNECTED) 5] 
[zk: localhost:2181(CONNECTED) 5] getAcl /oldboyedu 
'world,'anyone
: cra
[zk: localhost:2181(CONNECTED) 6] 
[zk: localhost:2181(CONNECTED) 6] create /oldboyedu/linux9
linux90   linux91   
[zk: localhost:2181(CONNECTED) 6] create /oldboyedu/linux92
Created /oldboyedu/linux92
[zk: localhost:2181(CONNECTED) 7] 
[zk: localhost:2181(CONNECTED) 7] delete /oldboyedu/linux92
Insufficient permission : /oldboyedu/linux92
[zk: localhost:2181(CONNECTED) 8] 



温馨提示:
	实际工作中,应该滚动重启服务,不应该批量重启。

(8) 配置超级用户-推荐部署集群的时候使用

6.3.1 使用sha1加密算法及base64编码来进行一个密码的计算
[root@elk94 ~]# echo -n admin:oldboyedu | openssl dgst -binary -sha1 | base64
Tmbbt77KcTd1bAgjQaI+GqI0hjM=
[root@elk94 ~]# 


			6.3.2 修改zkEnv.sh脚本,配置超级用户
[root@elk94 ~]# vim /oldboyedu/softwares/zk/bin/zkEnv.sh
...
export SERVER_JVMFLAGS="-Dzookeeper.DigestAuthenticationProvider.superDigest=admin:Tmbbt77KcTd1bAgjQaI+GqI0hjM= -Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"

			6.3.3 同步脚本叫其他zk节点
[root@elk94 ~]# data_rsync.sh /oldboyedu/softwares/zk/bin/zkEnv.sh 

			6.3.4 重启集群
[root@elk94 ~]# zkServer.sh restart

[root@elk95 ~]# zkServer.sh restart
 
[root@elk96 ~]# zkServer.sh restart


			6.3.5验证超级管理员用户权限
[zk: localhost:2181(CONNECTED) 3] getAcl /oldboyedu 
'world,'anyone
: cra
[zk: localhost:2181(CONNECTED) 4] 
[zk: localhost:2181(CONNECTED) 4] setAcl /oldboyedu world:anyone:rw
[zk: localhost:2181(CONNECTED) 5] 
[zk: localhost:2181(CONNECTED) 5] getAcl /oldboyedu 
'world,'anyone
: rw
[zk: localhost:2181(CONNECTED) 6] 
[zk: localhost:2181(CONNECTED) 6] create /oldboyedu/linux93
Insufficient permission : /oldboyedu/linux93
[zk: localhost:2181(CONNECTED) 7] 
[zk: localhost:2181(CONNECTED) 7] 
[zk: localhost:2181(CONNECTED) 7] setAcl /oldboyedu world:anyone:rwc
Insufficient permission : /oldboyedu
[zk: localhost:2181(CONNECTED) 8] 
[zk: localhost:2181(CONNECTED) 8] whoami 
Auth scheme: User
ip: 127.0.0.1
[zk: localhost:2181(CONNECTED) 9] 
[zk: localhost:2181(CONNECTED) 9] addauth digest admin:oldboyedu
[zk: localhost:2181(CONNECTED) 10] 
[zk: localhost:2181(CONNECTED) 10] whoami 
Auth scheme: User
super: 
ip: 127.0.0.1
digest: admin
[zk: localhost:2181(CONNECTED) 11] 
[zk: localhost:2181(CONNECTED) 11] setAcl /oldboyedu world:anyone:rwc
[zk: localhost:2181(CONNECTED) 12] 
[zk: localhost:2181(CONNECTED) 12] getAcl /oldboyedu 
'world,'anyone
: crw
[zk: localhost:2181(CONNECTED) 13] 
[zk: localhost:2181(CONNECTED) 13] create /oldboyedu/linux93
Created /oldboyedu/linux93
[zk: localhost:2181(CONNECTED) 14] 
[zk: localhost:2181(CONNECTED) 14] ls /oldboyedu 
[linux90, linux91, linux92, linux93]
[zk: localhost:2181(CONNECTED) 15] 



温馨提示:
	目前使用zookeeper 3.8版本貌似无法替换用户名,超级管理员用户名称暂时推荐使用admin。
1.6.1 图形化界面

image-20241015011102970

1. 启动服务
java -jar zkWeb-v1.2.1.jar

# 该工具适配的是jdk8版本
# 使用绝对路径的jdk  使用与多个jdk版本  并且该版本为配置为系统环境变量
3.启动服务 
[root@elk94 ~]# /oldboyedu/softwares/jdk1.8.0_291/bin/java -jar zkWeb-v1.2.1.jar 

温馨提示:
	不要使用我们高版本的JDK
[root@elk94 ~]# java --version
java 17.0.8 2023-07-18 LTS
Java(TM) SE Runtime Environment (build 17.0.8+9-LTS-211)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.8+9-LTS-211, mixed mode, sharing)
[root@elk94 ~]# 
[root@elk94 ~]# which java
/oldboyedu/softwares/jdk//bin/java
[root@elk94 ~]# 


	4.检查端口是否监听
[root@elk94 ~]# ss -nlt | grep 8099
LISTEN 0      100                     *:8099             *:*          
[root@elk94 ~]# 

	
	5.访问zkWeb UI
http://10.0.0.94:8099/#


1.7 zookeeper 启动脚本编写

1.停止zookeeper集群
[root@elk94 ~]# zkServer.sh stop

[root@elk95 ~]# zkServer.sh stop

[root@elk96 ~]# zkServer.sh stop

2.所有节点编写启动脚本
cat > /lib/systemd/system/zk.service << EOF
[Unit]
Description=oldboyedu linux zookeeper server daemon
Documentation=www.oldboyedu.com
After=network.target

[Service]
Type=forking
Environment=JAVA_HOME=/oldboyedu/softwares/jdk
ExecStart=/oldboyedu/softwares/zk/bin/zkServer.sh start

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable --now zk
systemctl status zk

温馨提示:
	注意,此处的类型一定要设置为"forking",因为咱们案例中ExecStart对应的命令执行完成后就结束了,不会阻塞。

1.8 jvm 调优

- zookeeper的JVM调优
	1.查看默认的堆内存为1GB
[root@elk94 ~]# ps -ef | grep zookeeper | grep Xmx
root        4793       1  0 18:01 ?        00:00:03 /oldboyedu/softwares/jdk/bin/java ... -Xmx1000m ...
[root@elk94 ~]# 


	2.配置ZK的堆内存
[root@elk94 ~]# cat > /oldboyedu/softwares/zk/conf/java.env <<'EOF'
#!/bin/bash

# 指定JDK的按住路径
export JAVA_HOME=/oldboyedu/softwares/jdk

# 指定zookeeper的堆内存大小
export JVMFLAGS="-Xms128m -Xmx128m $JVMFLAGS"
EOF

	3.同步文件到其他节点
[root@elk94 ~]# data_rsync.sh /oldboyedu/softwares/zk/conf/java.env

	4.重启集群 
[root@elk94 ~]# systemctl restart zk

[root@elk95 ~]# systemctl restart zk

[root@elk96 ~]# systemctl restart zk


	5.验证配置
[root@elk94 ~]# ps -ef | grep zookeeper | grep Xmx
root        4793       1  0 18:01 ?        00:00:03 /oldboyedu/softwares/jdk/bin/java ...-Xmx1000m -Xms128m -Xmx128m  ...
[root@elk94 ~]# 
	
	
温馨提示:
	- 此处堆内存大小是128MB,但是会发现默认会有一个"-Xmx 1000m",理论上后面的配置会覆盖前面的配置。	
		如果担心冲突,可以考虑修改脚本,将默认的1000注释掉。

[root@elk94 ~]# vim /oldboyedu/softwares/apache-zookeeper-3.8.3-bin/bin/zkEnv.sh 
...
# 注释zk的堆内存默认配置为1000MB。大概在138行。
# ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-1000}"
ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-128}"

[root@elk94 ~]# 
[root@elk94 ~]# data_rsync.sh /oldboyedu/softwares/apache-zookeeper-3.8.3-bin/bin/zkEnv.sh 

	
	- 生产环境堆内存修改也不需要设置过大,基本上2~4GB足以。

1.9 zookeeper 监控

- zookeeper的监控
	1.启动zookeeper的JMX端口
		1.1 修改配置文件
vim /oldboyedu/softwares/zk/conf/zoo.cfg  
# 添加下面的一行,启动zk的4字监控命令
4lw.commands.whitelist=*

		1.2 修改zk的启动脚本
vim /oldboyedu/softwares/zk/bin/zkServer.sh 
...
# 如果修改上面的方式不生效,则需修改zkServer.sh脚本中77行之后ZOOMAIN的值即可。
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"


		1.3 可选步骤,开启zk节点的JMX
[root@elk94 ~]# vim /oldboyedu/softwares/zk/bin/zkEnv.sh 
...
# Add by yinzhengjie for enable zookeeper JMX
JMXLOCALONLY=false
JMXHOSTNAME=10.0.0.94
JMXPORT=9999
JMXSSL=false
JMXLOG4J=false

参考链接:
	http://www.kafka-eagle.org/articles/docs/quickstart/metrics.html




	2.验证四字命令
		2.1 校验服务是否存活
[root@elk96 ~]# echo ruok | nc 10.0.0.96 2181
imok

		2.2 查看zookeeper集群的配置
[root@elk96 ~]# echo conf | nc 10.0.0.94 2181
clientPort=2181
secureClientPort=-1
dataDir=/oldboyedu/data/zk/version-2
dataDirSize=536873745
dataLogDir=/oldboyedu/data/zk/version-2
dataLogSize=536873745
tickTime=2000
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
clientPortListenBacklog=-1
serverId=94
initLimit=5
syncLimit=2
electionAlg=3
electionPort=6888
quorumPort=5888
peerType=0
membership: 
server.96=10.0.0.96:5888:6888:participant
server.94=10.0.0.94:5888:6888:participant
server.95=10.0.0.95:5888:6888:participant
version=0

		2.3 检查集群的模式,比如谁是leader和follower
[root@elk96 ~]# echo srvr | nc 10.0.0.94 2181
Zookeeper version: 3.8.3-6ad6d364c7c0bcf0de452d54ebefa3058098ab56, built on 2023-10-05 10:34 UTC
Latency min/avg/max: 0/0.0/0
Received: 4
Sent: 3
Connections: 1
Outstanding: 0
Zxid: 0xe0000000c
Mode: follower
Node count: 10
[root@elk96 ~]# 
[root@elk96 ~]# 
[root@elk96 ~]# echo srvr | nc 10.0.0.95 2181
Zookeeper version: 3.8.3-6ad6d364c7c0bcf0de452d54ebefa3058098ab56, built on 2023-10-05 10:34 UTC
Latency min/avg/max: 0/0.0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x1000000000
Mode: leader
Node count: 10
Proposal sizes last/min/max: -1/-1/-1
[root@elk96 ~]# 
[root@elk96 ~]# 
[root@elk96 ~]# 
[root@elk96 ~]# echo srvr | nc 10.0.0.96 2181
Zookeeper version: 3.8.3-6ad6d364c7c0bcf0de452d54ebefa3058098ab56, built on 2023-10-05 10:34 UTC
Latency min/avg/max: 0/0.0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0xe0000000c
Mode: follower
Node count: 10
[root@elk96 ~]# 


	3.验证JMX端口是否监听成功
		3.1 服务端检查端口
[root@elk94 ~]# ss -ntl | grep 9999
LISTEN 0      50                      *:9999             *:*          
[root@elk94 ~]# 

		3.2 windows安装JDK环境并切换到安装目录,先敲击cmd
		
C:\Users\yinzhengjie>cd C:\Program Files\Java\jdk-1.8\bin\

C:\Program Files\Java\jdk-1.8\bin>
C:\Program Files\Java\jdk-1.8\bin>jconsole.exe

2. kafka

2.1 MQ

2.1.1 MQ概述

什么是mq

MQ的全称是“Message Queue” 即 消息队列

消息队列是在消息传输过程中保存消息的容器,多用于分布式系统之间通信

类似与一个水管 能将水源抽往目的地

2.1.2 MQ的优势

(1) 应用解耦

image-20220520094258129

(2) 异步提速

image-20220520100031751

(3) 削峰填谷

将消息全部储存到消息队列中,慢慢的消费,这样就抗住了数据量高峰,但是可能导致延迟

image-20220520101832944

2.1.3 MQ的劣势

  1. 系统的可用性降低
  • 系统引入的瓦斯不依赖越多,系统的稳定性越差,一旦MQ宕机,就会对业务造成影响,如果MQ能实现高可用,那问题就自然也就解决了
  1. 系统的复杂性提升
  • 对用户而言,提升了学习成本,也存在数据丢失的风险

综上所述:我们希望MQ有以下几点特征

  • 吞吐量高
  • 支持高可用集群
  • 数据安全:数据访问安全,数据存储安全

2.1.4 常见的MQ产品

image-20241015233826780

2.1.5 MQ的工作模式

(1) MQ的点对点模式(1对1模式,消费者主动拉取数据,消息收到后MQ会将消息清除)

image-20220520111444399

(2) MQ发布/订阅模式(1对多,消费者消费数据后不会清除消息)

image-20220520111505518

一对多,消费者消费数据后不会清除消息。
如下图所示,消费者可以主动去Broker服务器去拉取数据,当然,也可以是Broker主动推送数据。
拉取(pull):
优点:
消费者程序可以根据自身的硬件配置去broker消费。
缺点:
消费者需要长期执行一个进程来询问broker是否有数据。

推(push):
优点:
无需客户端主动拉去数据,而是由服务端主动发送数据,典型的应用场景就是我们的"公众号"等业务。
缺点:
(1)broker推送数据给消费者是,可能因为消费者消费能力不足,直接导致客户端程序崩溃掉;
(2)broker内部需要维护一个订阅者列表,当订阅者较多时,可能会很占用内存哟;

值得注意的是,kafka broker仅支持拉取(pull)等工作方式。

2.2 kafka单点搭建

(1) 部署zookeeper集群

(2) 解压软件包

tar xf kafka_2.13-3.7.0.tgz -C /oldboy/softwares/

(3) 修改配置文件

[root@zookeeper3 /softwares]#cd /oldboy/softwares/kafka_2.13-3.7.0/
[root@zookeeper3 /oldboy/softwares/kafka_2.13-3.7.0]#vim config/server.properties

...
# 指定kafka实例的唯一编号,集群中该编号要唯一。
# broker.id=0
broker.id=203
...
# 指定数据的存储路径
#log.dirs=/tmp/kafka-logs
log.dirs=/oldboyedu/data/kafka
...
# 指定zookeeper集群地址
# zookeeper.connect=localhost:2181
zookeeper.connect=10.0.0.201:2181,10.0.0.202:2181,10.0.0.203:2181/oldboy-kafka370

(4) 配置环境变量

[root@zookeeper3 ~]#vim /etc/profile.d/kafka.sh
[root@zookeeper3 ~]#cat /etc/profile.d/kafka.sh
#!/bin/bash
export KAFKA_HOME=/oldboy/softwares/kafka_2.13-3.7.0
export PATH=$PATH:$KAFKA_HOME/bin
[root@zookeeper3 ~]#source /etc/profile.d/kafka.sh 

(5) 启动kafka

  • 必须启动zookeeper集群
[root@zookeeper3 ~]#kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@zookeeper3 ~]#jps
2225 Kafka
2258 Jps
1806 QuorumPeerMain

[root@zookeeper3 /oldboyedu/data/kafka]#ss -ntl |grep 9092
LISTEN     0      50        [::]:9092                  [::]:*  

# 查看zookeeper的数据
[root@zookeeper3 /oldboyedu/data/kafka]#zkCli.sh
。。。
[zk: localhost:2181(CONNECTED) 0] ls /
[linux90, oldboy-kafka370, zookeeper]

2.3 jvm调优(调内存)

- kafka的JVM调优及JMX端口开启
	1.停止kafka服务
[root@elk94 ~]# kafka-server-stop.sh 
[root@elk94 ~]# 
[root@elk94 ~]# jps | grep Kafka
[root@elk94 ~]# 

	2.修改启动脚本
[root@elk94 ~]# vim `which kafka-server-start.sh ` +28
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
	...
	# 将原有的注释掉,更改的是if  fi里面的export
	# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
	export KAFKA_HEAP_OPTS="-server -Xmx256m -Xms256m  -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
	# windows使用jconsole验证JMX端口是否启用,可以不使用
   export JMX_PORT="8888"
fi



温馨提示:
	生成环境建议大家将其设置为6GB即可。物理内存建议32GB+,磁盘根据实际数据数据周期而定。
	
3.启动服务
[root@elk94 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

温馨提示:
	-daemon是放在后台运行,如果发现kafka启动失败且没有日志生成,则考虑去掉该参数,观察报错信息即可。
	
	4.检查堆内存大小
[root@elk94 ~]# ps -ef | grep kafka | grep Xms
root       11938    1653 20 10:44 pts/0    00:00:04 /oldboyedu/softwares/jdk//bin/java -server -Xmx256m -Xms256m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 ...


	5.检查JMX端口
[root@elk94 ~]# ss -ntl | grep 8888
LISTEN 0      50                      *:8888             *:*          
[root@elk94 ~]# 


windows使用jconsole验证JMX端口是否启用
C:\Program Files\Java\jdk-1.8\bin>jconsole

2.4 kafka启动脚本

1. 停止kafka
[root@zookeeper3 /oldboyedu/data/kafka]#kafka-server-stop.sh

2. 编写脚本
cat > /lib/systemd/system/kafka.service <<EOF
[Unit]
Description=oldboyedu linux kafka server daemon
Documentation=www.oldboyedu.com
After=network.target

[Service]
Type=forking
Environment=JAVA_HOME=/oldboy/softwares/jdk8
ExecStart=/oldboy/softwares/kafka_2.13-3.7.0/bin/kafka-server-start.sh  -daemon /oldboy/softwares/kafka_2.13-3.7.0/config/server.properties

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable --now kafka 
systemctl status kafka

2.5 kafka集群部署

与单点搭建一样即可,逐个单点操作即可

- kafka集群搭建
	1.同步配置文件,软件包,环境变量,启动脚本到其他节点
[root@elk94 ~]# data_rsync.sh /oldboyedu/softwares/kafka_2.13-3.7.0/

[root@elk94 ~]# data_rsync.sh /etc/profile.d/kafka.sh 

[root@elk94 ~]# data_rsync.sh /lib/systemd/system/kafka.service


	2.其他节点修改配置文件
[root@elk95 ~]# source /etc/profile.d/kafka.sh
[root@elk95 ~]# 
[root@elk95 ~]# vim $KAFKA_HOME/config/server.properties 
[root@elk95 ~]# 
[root@elk95 ~]# grep ^broker.id $KAFKA_HOME/config/server.properties 
broker.id=95
[root@elk95 ~]# 


[root@elk96 ~]# source /etc/profile.d/kafka.sh
[root@elk96 ~]# 
[root@elk96 ~]# vim $KAFKA_HOME/config/server.properties 
[root@elk96 ~]# 
[root@elk96 ~]# grep ^broker.id $KAFKA_HOME/config/server.properties 
broker.id=96
[root@elk96 ~]# 



	3.其他节点启动kafka服务
[root@elk95 ~]# systemctl daemon-reload
[root@elk95 ~]# 
[root@elk95 ~]# systemctl enable --now kafka
[root@elk95 ~]# 
[root@elk95 ~]# systemctl status kafka


[root@elk96 ~]# systemctl daemon-reload
[root@elk96 ~]# 
[root@elk96 ~]# systemctl enable --now kafka
[root@elk96 ~]# 
[root@elk96 ~]# systemctl status kafka

	4.zookeeper检查broker节点是否上线
[zk: localhost:2181(CONNECTED) 13] ls /oldboyedu-kafka370/brokers/ids 
[94, 95, 96]
[zk: localhost:2181(CONNECTED) 14] 

5.验证停止kafka服务是否会导致临时zookeeper node自动删除
温馨提示:
	broker节点启动时会连接zookeeper集群,并注册一个临时zookeeper node,名称为对应broker.id。
		
		5.1 检查zookeeper环境
[zk: localhost:2181(CONNECTED) 14] stat /oldboyedu-kafka370/brokers/ids/94
cZxid = 0x1000000061
ctime = Wed Feb 28 02:57:45 UTC 2024
mZxid = 0x1000000061
mtime = Wed Feb 28 02:57:45 UTC 2024
pZxid = 0x1000000061
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x6000007437480003
dataLength = 196
numChildren = 0
[zk: localhost:2181(CONNECTED) 15] 
[zk: localhost:2181(CONNECTED) 15] 
[zk: localhost:2181(CONNECTED) 15] stat /oldboyedu-kafka370/brokers/ids/95
cZxid = 0x1000000088
ctime = Wed Feb 28 03:36:02 UTC 2024
mZxid = 0x1000000088
mtime = Wed Feb 28 03:36:02 UTC 2024
pZxid = 0x1000000088
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x5f0000743cca0002
dataLength = 196
numChildren = 0
[zk: localhost:2181(CONNECTED) 16] 
[zk: localhost:2181(CONNECTED) 16] stat /oldboyedu-kafka370/brokers/ids/96
cZxid = 0x100000009b
ctime = Wed Feb 28 03:37:47 UTC 2024
mZxid = 0x100000009b
mtime = Wed Feb 28 03:37:47 UTC 2024
pZxid = 0x100000009b
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x5f0000743cca0003
dataLength = 196
numChildren = 0
[zk: localhost:2181(CONNECTED) 17] 

		5.2 让96节点下线
[root@elk96 ~]# systemctl stop kafka


		5.3 观察zookeeper node的变化
[zk: localhost:2181(CONNECTED) 17] ls /oldboyedu-kafka370/brokers/ids 
[94, 95]
[zk: localhost:2181(CONNECTED) 18] 


		5.4 让96节点上线
[root@elk96 ~]# systemctl start kafka


		5.5 再次观察zookeeper node的变化
[zk: localhost:2181(CONNECTED) 22] ls  /oldboyedu-kafka370/brokers/ids
[94, 95, 96]
[zk: localhost:2181(CONNECTED) 23] 
[zk: localhost:2181(CONNECTED) 23] stat  /oldboyedu-kafka370/brokers/ids/96
cZxid = 0x10000000ae
ctime = Wed Feb 28 03:40:48 UTC 2024
mZxid = 0x10000000ae
mtime = Wed Feb 28 03:40:48 UTC 2024
pZxid = 0x10000000ae
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x5f0000743cca0004
dataLength = 196
numChildren = 0
[zk: localhost:2181(CONNECTED) 24] 
[zk: localhost:2181(CONNECTED) 24] 



- 使用systemctl管理zkweb服务
	1.移动jar包到指定目录
[root@elk94 ~]# mv zkWeb-v1.2.1.jar /oldboyedu/softwares/

	2.编写启动脚本
cat > /lib/systemd/system/zkWeb.service <<EOF
[Unit]
Description=oldboyedu linux zkWeb server daemon
Documentation=www.oldboyedu.com
After=network.target

[Service]
ExecStart=/oldboyedu/softwares/jdk1.8.0_291/bin/java -Xms128m -Xmx128m  -jar /oldboyedu/softwares/zkWeb-v1.2.1.jar

[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable --now zkWeb
systemctl status zkWeb


	3.检查服务是否正常启动
[root@elk94 ~]# ss -ntl | grep 8099
LISTEN 0      100                     *:8099             *:*          
[root@elk94 ~]# 
[root@elk94 ~]# systemctl status zkWeb
● zkWeb.service - oldboyedu linux zkWeb server daemon
    Loaded: loaded (/lib/systemd/system/zkWeb.service; enabled; vendor preset: enabled)
    Active: active (running) since Wed 2024-02-28 11:54:55 CST; 57s ago
  Main PID: 16975 (java)
     Tasks: 38 (limit: 4515)
    Memory: 205.6M
       CPU: 10.772s
    CGroup: /system.slice/zkWeb.service
            └─16975 /oldboyedu/softwares/jdk1.8.0_291/bin/java -Xms128m -Xmx128m -jar /oldboyedu/softwares/zkWeb-v1.2.1.jar

Feb 28 11:55:00 elk94 java[16975]: [2024-02-28 03:55:00 INFO  main  MBeanExporter.java:433] o.s.j.e.a.AnnotationMBeanExporter --> Registering beans for JMX >
Feb 28 11:55:00 elk94 java[16975]: [2024-02-28 03:55:00 INFO  main  DirectJDKLog.java:180] o.a.coyote.http11.Http11NioProtocol --> Starting ProtocolHandler >
Feb 28 11:55:00 elk94 java[16975]: [2024-02-28 03:55:00 INFO  main  DirectJDKLog.java:180] o.a.tomcat.util.net.NioSelectorPool --> Using a shared selector f>
Feb 28 11:55:00 elk94 java[16975]: [2024-02-28 03:55:00 INFO  main  DirectJDKLog.java:180] o.a.c.c.C.[Tomcat].[localhost].[/] --> Initializing Spring Framew>
Feb 28 11:55:00 elk94 java[16975]: [2024-02-28 03:55:00 INFO  main  FrameworkServlet.java:494] o.s.web.servlet.DispatcherServlet --> FrameworkServlet 'dispa>
Feb 28 11:55:00 elk94 java[16975]: [2024-02-28 03:55:00 INFO  main  FrameworkServlet.java:509] o.s.web.servlet.DispatcherServlet --> FrameworkServlet 'dispa>
Feb 28 11:55:00 elk94 java[16975]: [2024-02-28 03:55:00 INFO  main  TomcatWebServer.java:206] o.s.b.w.e.tomcat.TomcatWebServer --> Tomcat started on port(s)>
Feb 28 11:55:00 elk94 java[16975]: [2024-02-28 03:55:00 INFO  main  StartupInfoLogger.java:59] c.y.zkweb.ZkWebSpringBootApplication --> Started ZkWebSpringB>
Feb 28 11:55:38 elk94 java[16975]: [2024-02-28 03:55:38 WARN  main-SendThread(elk95:2181)  ClientCnxn.java:1108] org.apache.zookeeper.ClientCnxn --> Client >
Feb 28 11:55:50 elk94 java[16975]: [2024-02-28 03:55:50 WARN  main-SendThread(elk96:2181)  ClientCnxn.java:1108] org.apache.zookeeper.ClientCnxn --> Client >
[root@elk94 ~]# 
[root@elk94 ~]# /oldboyedu/softwares/jdk1.8.0_291/bin/jmap -heap 16975 | grep MaxHeapSize
  MaxHeapSize              = 134217728 (128.0MB)
[root@elk94 ~]# 

2.6 kafka 工作原理补充

副本 分区

2.7 topic的基础管理

(1) 创建topic

方式1
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092,10.0.0.95:9092,10.0.0.96:9092 --topic oldboyedu-linux --partitions 3 --replication-factor 1 --create

创建一个3分区1副本对应的topic名称为"oldboyedu-linux"。

方式2 
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092 --topic oldboyedu-linux --partitions 3 --replication-factor 1 --create


[root@elk94 ~]#  kafka-topics.sh --bootstrap-server 10.0.0.94:9092,10.0.0.95:9092,10.0.0.96:9092 --topic oldboyedu-linux-001 --partitions 3 --replication-factor 3 --create

创建一个3分区3副本对应的topic名称为"oldboyedu-linux-001"。
	

温馨提示:
	创建的副本数量不得大于存活的broker数量。

(2) 查看topic列表

查看topic的名称列表
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092,10.0.0.95:9092,10.0.0.96:9092 --list
oldboyedu-linux
oldboyedu-linux-001
[root@elk94 ~]# 

单节点查看
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092 --list
oldboyedu-linux
oldboyedu-linux-001
[root@elk94 ~]# 

2.2 查看指定topic的详细列表
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092,10.0.0.95:9092,10.0.0.96:9092 --topic oldboyedu-linux --describe
Topic: oldboyedu-linux	TopicId: eVLUPmeFTGOapg9W-U5aow	PartitionCount: 3	ReplicationFactor: 1	Configs: 
	Topic: oldboyedu-linux	Partition: 0	Leader: 94	Replicas: 94	Isr: 94
	Topic: oldboyedu-linux	Partition: 1	Leader: 95	Replicas: 95	Isr: 95
	Topic: oldboyedu-linux	Partition: 2	Leader: 96	Replicas: 96	Isr: 96
[root@elk94 ~]# 

2.3 查看所有topic的详细信息
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092,10.0.0.95:9092,10.0.0.96:9092  --describe
Topic: oldboyedu-linux-001	TopicId: 3YcpgAgSTgy77eu15OGHLg	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: oldboyedu-linux-001	Partition: 0	Leader: 96	Replicas: 96,94,95	Isr: 96,94,95
	Topic: oldboyedu-linux-001	Partition: 1	Leader: 94	Replicas: 94,95,96	Isr: 94,95,96
	Topic: oldboyedu-linux-001	Partition: 2	Leader: 95	Replicas: 95,96,94	Isr: 95,96,94
Topic: oldboyedu-linux	TopicId: eVLUPmeFTGOapg9W-U5aow	PartitionCount: 3	ReplicationFactor: 1	Configs: 
	Topic: oldboyedu-linux	Partition: 0	Leader: 94	Replicas: 94	Isr: 94
	Topic: oldboyedu-linux	Partition: 1	Leader: 95	Replicas: 95	Isr: 95
	Topic: oldboyedu-linux	Partition: 2	Leader: 96	Replicas: 96	Isr: 96
[root@elk94 ~]# 

(3) 修改topic

3.1 修改topic的分区数
[root@elk94 ~]# kafka-topics.sh --bootstrap-server  10.0.0.96:9092 --topic oldboyedu-linux --alter --partitions 5

3.2 查看修改后的分区数量
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092,10.0.0.95:9092,10.0.0.96:9092 --describe --topic oldboyedu-linux
Topic: oldboyedu-linux	TopicId: eVLUPmeFTGOapg9W-U5aow	PartitionCount: 5	ReplicationFactor: 1	Configs: 
	Topic: oldboyedu-linux	Partition: 0	Leader: 94	Replicas: 94	Isr: 94
	Topic: oldboyedu-linux	Partition: 1	Leader: 95	Replicas: 95	Isr: 95
	Topic: oldboyedu-linux	Partition: 2	Leader: 96	Replicas: 96	Isr: 96
	Topic: oldboyedu-linux	Partition: 3	Leader: 94	Replicas: 94	Isr: 94
	Topic: oldboyedu-linux	Partition: 4	Leader: 95	Replicas: 95	Isr: 95
[root@elk94 ~]# 
	
	
温馨提示:
	分区数只能调大,不能调小。
[root@elk94 ~]# kafka-topics.sh --bootstrap-server  10.0.0.96:9092 --topic oldboyedu-linux --alter --partitions 2
Error while executing topic command : Topic currently has 5 partitions, which is higher than the requested 2.
[2024-02-28 07:09:04,590] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 5 partitions, which is higher than the requested 2.
(org.apache.kafka.tools.TopicCommand)
[root@elk94 ~]# 

3.3 修改topic的副本数量[了解即可]
https://www.cnblogs.com/yinzhengjie/p/9808125.html

(4) 删除topic

4.1 查看现有的topic列表
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092,10.0.0.95:9092,10.0.0.96:9092 --list
oldboyedu-linux
oldboyedu-linux-001
[root@elk94 ~]# 
		
4.2 删除指定的topic
[root@elk94 ~]# kafka-topics.sh --bootstrap-server  10.0.0.96:9092 --topic oldboyedu-linux --delete


4.3 再次查看现有的topic列表
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092,10.0.0.95:9092,10.0.0.96:9092 --list
oldboyedu-linux-001
[root@elk94 ~]# 

	

常见的错误分析:
	1.Replication factor: 4 larger than available brokers: 3.
错误原因:
	创建的副本数量大于存活的节点数量。
	
解决方案:
	将副本数量小于存活的节点数量或者加节点。
	
	

2.Topic currently has 5 partitions, which is higher than the requested 2
错误原因:
	分区数只能调大,不能调小。
	
解决方案:
	分区数只能比现有的分区数大,不能比现有的分区数小。

2.8 生产者和消费者

  1. 启动生产者
[root@zookeeper1 ~]#kafka-console-producer.sh --bootstrap-server 10.0.0.201:9092 --topic oldboyedu-linux-001
>11111111111111
>22222222222222
>33333333333333
>

温馨提示:
	此时不会产生任何的新的topic,因为此时写入的topic我们已经提前创建过。
  1. 启动消费者
[root@zookeeper2 /oldboy/softwares/kafka_2.13-3.7.0]#kafka-console-consumer.sh --bootstrap-server 10.0.0.202:9092 --topic oldboyedu-linux-001 --from-beginning
11111111111111
22222222222222
33333333333333
55555555555
6666666666


--from-beginning  从开头开始消费
不加  就是消费最新的   该消费者启动时,如果生产者  生产了数据才会消费

馨提示:
	此时启动消费者后,会多出来一个新的topic,名称为"__consumer_offsets"的topic。
	该topic由系统维护,默认有50个分区,1个副本。该topic里面存储了每个消费者offset相关消费记录。
	
[root@zookeeper2 /oldboy/softwares/kafka_2.13-3.7.0]#kafka-topics.sh --bootstrap-server 10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092 --list
__consumer_offsets
oldboyedu-linux-001

# 查看该主题
[root@zookeeper2 /oldboy/softwares/kafka_2.13-3.7.0]#kafka-topics.sh --bootstrap-server 10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092 --describe --topic __consumer_offsets
Topic: __consumer_offsets       TopicId: ySdEquYYToSxjoGm4wT9aw PartitionCount: 50      ReplicationFactor: 1    Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
       Topic: __consumer_offsets       Partition: 0    Leader: 203     Replicas: 203   Isr: 203
       Topic: __consumer_offsets       Partition: 1    Leader: 202     Replicas: 202   Isr: 202
       Topic: __consumer_offsets       Partition: 2    Leader: 201     Replicas: 201   Isr: 201
       Topic: __consumer_offsets       Partition: 3    Leader: 203     Replicas: 203   Isr: 203
       Topic: __consumer_offsets       Partition: 4    Leader: 202     Replicas: 202   Isr: 202
       Topic: __consumer_offsets       Partition: 5    Leader: 201     Replicas: 201   Isr: 201
  1. 消费者组概念(consumer groups)

(1) 消费者组包含一个或多个消费者

(2) 消费者:负责从kafka拉取数据的角色

(3) offset: 偏移量,记录了消费者组消费的数组位置

  • 这个偏移量存储在kafka集群内置的topic,我们称之为"__consumer_offsets"。
    早期kafka 0.9.0-版本将这个偏移量存储在zookeeper集群。

(4) rebalance: 重平衡,指的是当一个消费组的消费者数量发生变化时,消费者重新分配partitons的动作。

3.1 在消费数据的时候添加消费者组

1. 启动生产者
此时不存在oldboyedu-linux 主题  ,直接指定会自动创建
温馨提示:
	写入数据到kafka集群时,可能会出现以上的报错信息,但是其会自动创建topic,我们忽略即可。
	生产环境中,我强烈建议大家将其关闭。
[2024-10-20 18:29:15,651] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {oldboyedu-linux=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)


[root@zookeeper1 ~]#kafka-console-producer.sh --bootstrap-server 10.0.0.201:9092 --topic oldboyedu-linux
>AAA
[2024-10-20 18:29:15,651] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {oldboyedu-linux=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

2. 启动消费者
(1) 基于配置文件指定消费者组
	①:修改配置文件,linux90  指定消费者组名称为linux90
[root@zookeeper2 /oldboy/softwares/kafka_2.13-3.7.0/config]#vim consumer.properties 
[root@zookeeper2 /oldboy/softwares/kafka_2.13-3.7.0/config]#grep ^group.id consumer.properties 
group.id=linux90
	②:启动消费者时指定啊配置文件
[root@zookeeper2 /oldboy/softwares/kafka_2.13-3.7.0/config]#kafka-console-consumer.sh --bootstrap-server 10.0.0.203:9092 --consumer.config
$KAFKA_HOME/config/consumer.properties --topic oldboyedu-linux --from-beginning
AAA
^CProcessed a total of 1 messages

--bootstrap-server 10.0.0.203:9092 随机指定或全部指定都可以
(2) 基于配置属性指定消费者组启动消费者
kafka-console-consumer.sh --bootstrap-server 10.0.0.95:9092 --topic  oldboyedu-linux --consumer-property group.id=linux90 



3. 查看消费者组的详细信息
[root@zookeeper1 ~]#kafka-consumer-groups.sh  --bootstrap-server 10.0.0.203:9092 --describe  --group linux90

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
linux90         oldboyedu-linux 0          1               1               0               console-consumer-d7b34f12-6ebc-4a24-b2c4-5d00ea16497c /10.0.0.202     console-consumer
# 查看消费者组
[root@zookeeper1 ~]#kafka-consumer-groups.sh  --bootstrap-server 10.0.0.203:9092 --list
linux90

4. 停止消费者,并往生产者写入两条数据
# 停止消费者
	4.1 停止消费者
将95节点的kafka-console-consumer.sh终止运行。

	4.2 94节点发送2条测试数据
[root@elk94 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.94:9092 --topic oldboyedu-linux
...
>123456
>78910jqk
>

	4.3 再次查看linux90这个消费者的信息,发现有2条数据延迟。
[root@elk94 ~]# kafka-consumer-groups.sh  --bootstrap-server 10.0.0.95:9092 --describe  --group linux90

Consumer group 'linux90' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG              CONSUMER-ID     HOST            CLIENT-ID
linux90         oldboyedu-linux 0          2               4               2               -               -               -
[root@elk94 ~]# 

	
温馨提示:
	上面的“Consumer group 'linux90' has no active members”提示,说明当前环境的linux90这个组没有消费者拿数据。注意,此时有2条数据延迟未消费。
	
	5.再次基于配置属性启动消费者进行数据消费
		5.1 在命令行中指定消费者去kafka节点拉取数据
[root@elk96 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.95:9092 --topic  oldboyedu-linux --group linux90 --from-beginning 
123456
78910jqk

温馨提示:
	尽管我使用"--from-beginning",也不会直接从头开始拿数据,因为此时在kafka集群中记录了linux90这个消费者组最新的CURRENT-OFFSET为2。
	
		5.2 再次查看消费组信息,发现数据被消费了。
[root@elk94 ~]# kafka-consumer-groups.sh  --bootstrap-server 10.0.0.95:9092 --describe  --group linux90

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
linux90         oldboyedu-linux 0          4               4               0               console-consumer-7d073e4e-c7be-48b9-b114-803f3e5377f2 /10.0.0.96      console-consumer
[root@elk94 ~]# 


	6.停止消费者,并往生产者写入2条测试数据
		6.1 停止消费者
将96节点的kafka-console-consumer.sh终止运行。


		6.2 94节点发送2条测试数据
[root@elk94 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.94:9092 --topic oldboyedu-linux
...
>ABCDEFG
>abcdefg
>

		6.3 再次查看linux90这个消费者的信息,发现有2条数据延迟。
[root@elk94 ~]# kafka-consumer-groups.sh  --bootstrap-server 10.0.0.95:9092 --describe  --group linux90

Consumer group 'linux90' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
linux90         oldboyedu-linux 0          4               6               2               -               -               -
[root@elk94 ~]# 


	7.再次启动消费者并执行消费者组
		7.1 基于配置属性启动消费者
[root@elk96 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.95:9092 --topic  oldboyedu-linux --consumer-property group.id=linux90 
ABCDEFG
abcdefg


		7.2 再次查看消费者的信息
[root@elk94 ~]# kafka-consumer-groups.sh  --bootstrap-server 10.0.0.95:9092 --describe  --group linux90

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
linux90         oldboyedu-linux 0          6               6               0               console-consumer-9bdf881c-1c1e-481f-846d-51ceefa533d6 /10.0.0.96      console-consumer
[root@elk94 ~]# 


image-20241020174720371

2.9 kafka消费者rebalance案例

- kafka消费者rebalance案例
	1.创建topic
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092 --topic oldboyedu-linux90 --partitions 3 --replication-factor 1 --create
Created topic oldboyedu-linux90.
[root@elk94 ~]# 
[root@elk94 ~]# kafka-topics.sh --bootstrap-server 10.0.0.94:9092 --topic oldboyedu-linux90 --describe
Topic: oldboyedu-linux90	TopicId: 1ORIw6K4R--Te4AhxTOCpQ	PartitionCount: 3	ReplicationFactor: 1	Configs: 
	Topic: oldboyedu-linux90	Partition: 0	Leader: 95	Replicas: 95	Isr: 95
	Topic: oldboyedu-linux90	Partition: 1	Leader: 96	Replicas: 96	Isr: 96
	Topic: oldboyedu-linux90	Partition: 2	Leader: 94	Replicas: 94	Isr: 94
[root@elk94 ~]# 

	
	2.启动生产者并写入测试数据
[root@elk94 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.94:9092 --topic oldboyedu-linux90
>123
>456
>

	
	3.启动第一个消费者
[root@elk95 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.95:9092 --topic oldboyedu-linux90 --group jasonyin --from-beginning
123
456
	
	4.观察消费者组信息
[root@elk94 ~]# kafka-consumer-groups.sh  --bootstrap-server 10.0.0.95:9092 --describe  --group jasonyin

GROUP           TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
jasonyin        oldboyedu-linux90 0          0               0               0               console-consumer-62560ece-0fbf-474d-a3e4-919064661433 /10.0.0.95      console-consumer
jasonyin        oldboyedu-linux90 1          2               2               0               console-consumer-62560ece-0fbf-474d-a3e4-919064661433 /10.0.0.95      console-consumer
jasonyin        oldboyedu-linux90 2          0               0               0               console-consumer-62560ece-0fbf-474d-a3e4-919064661433 /10.0.0.95      console-consumer
[root@elk94 ~]# 


温馨提示:
	不难发现,此时"jasonyin"这个消费者组仅有一个ID为"console-consumer-62560ece-0fbf-474d-a3e4-919064661433"的消费者。
	
	
	5.启动第二个消费者
[root@elk96 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.96:9092 --topic oldboyedu-linux90 --group jasonyin

	
	6.再次观察消费者组信息
[root@elk94 ~]# kafka-consumer-groups.sh  --bootstrap-server 10.0.0.95:9092 --describe  --group jasonyin

GROUP           TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
jasonyin        oldboyedu-linux90 0          0               0               0               console-consumer-62560ece-0fbf-474d-a3e4-919064661433 /10.0.0.95      console-consumer
jasonyin        oldboyedu-linux90 1          2               2               0               console-consumer-62560ece-0fbf-474d-a3e4-919064661433 /10.0.0.95      console-consumer
jasonyin        oldboyedu-linux90 2          0               0               0               console-consumer-b0f4f974-57d2-4597-bd88-110e80f2a1ef /10.0.0.96      console-consumer
[root@elk94 ~]# 


温馨提示:
	不难发现,此时对于"jasonyin"这个消费者组而言,有2个消费者,消费者数量发生了变化,因此会出发rebalance,也就是重新分配partition。
	其中"console-consumer-62560ece-0fbf-474d-a3e4-919064661433"占用0和1对应的partition。
	而"console-consumer-b0f4f974-57d2-4597-bd88-110e80f2a1ef"占用2对应的partition
	
	
	
	
	
	7.再次启动多个生产者写入测试数据
[root@elk94 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.94:9092 --topic oldboyedu-linux90
>11111111111111111111111
>2222222222222222222222222
>
	
	
温馨提示:
	可能启动2个生产者就能看到效果,也可以需要多启动几个生产者才能在96节点看到输出的效果哟~
[root@elk96 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.96:9092 --topic oldboyedu-linux90 --group jasonyin
11111111111111111111111
2222222222222222222222222


	8.再次观察消费者组信息
[root@elk94 ~]# kafka-consumer-groups.sh  --bootstrap-server 10.0.0.95:9092 --describe  --group jasonyin

GROUP           TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
jasonyin        oldboyedu-linux90 0          0               0               0               console-consumer-62560ece-0fbf-474d-a3e4-919064661433 /10.0.0.95      console-consumer
jasonyin        oldboyedu-linux90 1          20              20              0               console-consumer-62560ece-0fbf-474d-a3e4-919064661433 /10.0.0.95      console-consumer
jasonyin        oldboyedu-linux90 2          2               2               0               console-consumer-b0f4f974-57d2-4597-bd88-110e80f2a1ef /10.0.0.96      console-consumer
[root@elk94 ~]# 

标签:oldboyedu,10.0,zk,kafka,2181,CONNECTED,root
From: https://www.cnblogs.com/xyff/p/18487670

相关文章

  • 程序员必须了解的消息队列之王-Kafka
    1.Kafka概述1.1定义Kafka是由Apache软件基金会开发的一个开源流处理平台。Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。1.2消息队列1.2.1传统消息队列的应用场景1.2.2为什么需要消息队列解耦:允许你独立的扩展或......
  • k8s部署Kafka集群超详细讲解
    准备部署环境Kubernetes集群信息NAMEVERSIONk8s-masterv1.29.2k8s-node01v1.29.2k8s-node02v1.29.2Kafka:3.7.1版本Zookeeper:3.6.3版本准备StorageClass#kubectlgetscNAMEPROVISIONERRECLAIMPOLICYVOLUMEBINDINGMODEALLOWVOLUMEEXPAN......
  • Kafka原理剖析之「Purgatory(炼狱 | 时间轮)」
    一、前言本文介绍一下Kafka赫赫有名的组件Purgatory,相信做Kafka的朋友或多或少都对其有一定的了解,至少是听过它的名字。那它的作用是什么呢,用来解决什么问题呢?官网confluent早就有文章对其做了阐述https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=3483946......
  • Kafka快速入门
    Kafka简介:ApacheKafka是一个分布式流处理平台,由LinkedIn开发并开源,后来成为Apache软件基金会的顶级项目。Kafka主要用于构建实时数据管道和流式应用程序。它具有高吞吐量、可扩展性和容错性,能够处理数百万级别的读写请求。Kafka的核心特性包括:发布-订阅消息队列:Kaf......
  • Kafka集群以开启客户端鉴权
    在Kubernetes环境中,如果您使用的是StrimziKafkaOperator来管理您的Kafka集群,您可以通过CustomResourceDefinitions(CRD)来配置Kafka集群以开启客户端鉴权。以下是使用API接口创建Kafka集群并开启客户端鉴权的步骤:1.安装StrimziKafkaOperator首先,确保您已经在Kubernet......
  • Apache Kafka消息传递策略
    kafka消息传递策略微信公众号:阿俊的学习记录空间小红书:ArnoZhangwordpress:arnozhang1994博客园:arnozhangCSDN:ArnoZhang1994现在我们了解了一些关于生产者和消费者的工作原理,接下来讨论Kafka在生产者和消费者之间提供的策略保证。显然,消息传递可以提供多种保证:最多一次......
  • Apache Kafka设计思考
    kafka设计微信公众号:阿俊的学习记录空间小红书:ArnoZhangwordpress:arnozhang1994博客园:arnozhangCSDN:ArnoZhang1994一、目标能够作为一个统一的平台,处理大型公司可能拥有的所有实时数据流。(更像是数据库日志)高吞吐量:Kafka必须具有高吞吐量,以支持高容量的事件流,例如实时......
  • Apache Kafka 使用示例
    Kafka快速入门指南微信公众号:阿俊的学习记录空间小红书:ArnoZhangwordpress:arnozhang1994博客园:arnozhangCSDN:ArnoZhang1994第一步:获取Kafka下载2.13-3.8.0版本的Kafka版本并解压:$tar-xzfkafka_2.13-3.8.0.tgz$cdkafka_2.13-3.8.0第二步:启动Kafka环境注意:你的......
  • Apache Kafka的生态
    Kafka生态系统微信公众号:阿俊的学习记录空间小红书:ArnoZhangwordpress:arnozhang1994博客园:arnozhangCSDN:ArnoZhang1994以下是与Kafka集成的工具列表,涵盖了不同领域的工具和扩展。这些工具并非全部经过验证使用,部分可能不兼容或存在问题。KafkaConnectKafk......
  • Apache Kafka各Api模块说明
    KafkaAPI微信公众号:阿俊的学习记录空间小红书:ArnoZhangwordpress:arnozhang1994博客园:arnozhangCSDN:ArnoZhang1994Kafka包含五个核心API:ProducerAPI允许应用程序将数据流发送到Kafka集群中的topic。ConsumerAPI允许应用程序从Kafka集群中的topic读取数据流......