首页 > 系统相关 >kafka集群安装(CentOS7 + kafka 2.7.1)

kafka集群安装(CentOS7 + kafka 2.7.1)

时间:2023-09-05 14:33:37浏览次数:56  
标签:topic -- root 192.168 kafka CentOS7 2.7

Linux系统-部署-运维系列导航

 

kafka介绍

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。 主要应用场景是:日志收集系统和消息系统。 Kafka主要设计目标如下:
  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展
  kafka架构

 

在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。
概念/对象 简单说明
Broker Kafka节点
Topic 主题,用来承载消息
Partition 分区,用于主题分片存储
Producer 生产者,向主题发布消息的应用
Consumer 消费者,从主题订阅消息的应用
Consumer Group 消费者组,由多个消费者组成
kafka版本  在下载 Kafka 时会发现 Kafka 的版本号大概是这样的:
Scala 2.12 - kafka_2.12-2.7.1.gz
Scala 2.13 - kafka_2.13-2.7.1.gz
前面的版本号是编译 Kafka 源代码的 Scala 编译器版本,如 2.12 或者 2.13,除非特殊场景明确版本,或运行异常,否则建议选择2.12版本。 Kafka 服务器端的代码完全由 Scala 语言编写,后面 2.7.1 才是 Kafka 的版本号。由 3 个部分构成,即“大版本号 - 小版本号 - Patch 号“。 不论你用的是哪个版本,都请尽量保持服务器端版本和客户端版本一致,否则你将损失很多 Kafka 为你提供的性能优化收益。 还有就是在生产环境不要贸然升级到最新版本,新版本多多少少都存在一些小问题,至少要在测试环境确认没问题后再升级。  

kafka安装

架构设计
机器名称   IP 服务器角色 备注
localhost 192.168.11.64 节点0 CentOS7 + kafka 2.7.1
localhost 192.168.11.67 节点1 CentOS7 + kafka 2.7.1
localhost 192.168.11.73 节点2 CentOS7 + kafka 2.7.1
  kafka官方支持二进制安装。   组件安装操作步骤参考 组件安装部署手册模板,根据不同组件的安装目标,部分操作可以省略。 本文将按照该参考步骤执行。

一、获取组件可执行程序库,包括主程序,此为组件的基本文件

1.官网下载 kafka安装包 创建目录 /usr/local/kafka,将源码包下载到该目录下,支持wget获取  

 

2.安装依赖组件 kafka组件依赖 JDK 与 zookeeper请参考   3.解压安装
[root@localhost kafka]# tar -zxvf kafka_2.13-2.7.1.tgz 
[root@localhost kafka]# cd kafka_2.13-2.7.1
[root@localhost kafka_2.13-2.7.1]# ll
总用量 40
drwxr-xr-x 3 root root  4096 4月   8 2021 bin
drwxr-xr-x 2 root root  4096 2月  14 17:31 config
drwxr-xr-x 2 root root  8192 2月  14 14:44 libs
-rw-r--r-- 1 root root 14535 4月   8 2021 LICENSE
drwxr-xr-x 2 root root   236 4月   8 2021 licenses
-rw-r--r-- 1 root root   953 4月   8 2021 NOTICE
drwxr-xr-x 2 root root    44 4月   8 2021 site-docs

 

二、安装系统服务

kafka默认没有安装系统服务,通过主程序运行。 kafka启动时需要指定配置文件,不建议安装系统服务  

三、主程序加入到环境变量

1..将kafka目录添加到环境变量
[root@localhost bin]# vim /etc/profile
#kafka env
export KAFKA_HOME=/usr/local/kafka/kafka_2.13-2.7.1
export PATH=$PATH:$KAFKA_HOME/bin

[root@localhost bin]# source /etc/profile

 

四、配置文件

1.创建主配置文件 kafka主程序启动时,需要指定配置文件,默认配置文件路径为 /usr/local/kafka/kafka_2.13-2.7.1/config/server.properties
[root@localhost config]# vim server.properties
核心配置项有4个,如下

############################# Server Basics #############################
# broker id,3个节点此处分别设置为:broker.id=0,broker.id=1,broker.id=2
broker.id=0

############################# Socket Server Settings #############################
# 服务监听端口,即kafka服务对外提供服务的端口,格式为 listeners = listener_name://host_name:port
listeners=PLAINTEXT://192.168.11.64:8092

############################# Log Basics #############################
# 数据日志目录,默认为/tmp/kafka-logs,可能会被定时清除,设置专用目录
log.dirs=/data/kafka/logs

############################# Zookeeper #############################
# zookeeper地址及端口号
# 如果是集群,格式为:hostname1:port1,hostname2:port2,hostname3:port3
# 如果是带有根目录,格式为:hostname1:port1,hostname2:port2,hostname3:port3/root-path
zookeeper.connect=192.168.11.63:8181,192.168.11.66:8181,192.168.11.72:8181

# 如根目录为kafka:在末尾增加 /kafka,因为注册到ZooKeeper的除了kafka可能还有其他服务,增加根目录便于区分
# zookeeper.connect=192.168.11.63:8181,192.168.11.66:8181,192.168.11.72:8181/kafka

 

2.其他常用配置项请参考 kafka 配置文件参数详解 初始采用默认配置如下,根据项目需要调整
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# broker id,3个节点此处分别设置为:broker.id=0,broker.id=1,broker.id=2
broker.id=1

############################# Zookeeper #############################
# zookeeper地址及端口号
# 如果是集群,格式为:hostname1:port1,hostname2:port2,hostname3:port3
# 如果是带有根目录,格式为:hostname1:port1,hostname2:port2,hostname3:port3/root-path
# 不指定kafka根目录,则默认使用zk根目录/
zookeeper.connect=192.168.11.171:8181,192.168.11.173:8181,192.168.11.177:8181

# 指定目录为kafka:在末尾增加 /kafka,因为注册到ZooKeeper的除了kafka可能还有其他服务,增加根目录便于区分
# zookeeper.connect=192.168.11.63:8181,192.168.11.66:8181,192.168.11.72:8181/kafka

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
# kafka真实监听的地址,内网地址,格式为 listeners = listener_name://host_name:port
# kafka对外开放的地址,会注册到zk,供集群、生产者、消费者使用,可以单独配置:advertised.listeners,如果未配置,则使用 listeners
# 以下listeners为新版本(>0.10)配置方式,老版本分为 host.name、port、advertised.host.name、advertised.port
listeners=PLAINTEXT://192.168.11.173:8092

# broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3

# broker处理磁盘IO 的线程数 ,数值应该大于硬盘数
num.io.threads=8

# 套接字发送缓冲区大小
socket.send.buffer.bytes=102400

# socket接收缓冲区大小
socket.receive.buffer.bytes=102400

# socket请求的最大数值,防止server OOM,如果设置message.max.bytes,必然要小于socket.request.max.bytes
# 该参数会被topic创建时的指定参数覆盖
# 默认为 100 * 1024 * 1024
socket.request.max.bytes=104857600

############################# Log Basics #############################
# 数据日志目录,默认为/tmp/kafka-logs,可能会被定时清除,设置专用目录
log.dirs=/data/kafka/logs

# 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
# 建议根据项目实际数据量、客户端负载并发量评估,避免分区过少造成负载无法扩展
num.partitions=9

# 自动创建的topic分区的副本个数,影响客户端负载均衡默认为1,如果broker宕机,则其上分区将失效,建议配置大于1,如3
# 特别注意,该配置不能大于集群内节点个数
default.replication.factor=3

############################# Internal Topic Settings  #############################
# kafka内部topic__consumer_offsets,影响消费者offset信息的高可用
# 开发和测试期间,可以设置为1,生产环境设置应该大于1,建议3
# 特别注意,offsets.topic.replication.factor不能大于集群内节点个数
offsets.topic.num.partitions=3
offsets.topic.replication.factor=3

############################# Log Retention Policy #############################
# segment文件保留的最长时间,默认保留7天(168小时)
log.retention.hours=24

# segment文件默认会被保留7天的时间,超时的话就会被清理,那么清理这件事情就需要有一些线程来做。
# 这里就是用来设置恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

# 日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

# 日志文件中每个segment大小的检查周期
log.retention.check.interval.ms=300000

# zookeeper连接超时时间
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
# 指定rebalance延迟时间,单位毫秒,默认3s,即3000
# 当消费组加入消费者成员时,组协调器(GroupCoordinator)默认立即执行再平衡(rebalance),所以多个消费者同时加入消费者组时(如应用启动),组协调器将多次处理rebalance,造成性能浪费
# 开发和测试期间,可以设置为0,提高rebalance效率,但生产环境建议设置默认值3s,可以根据消费端应用启动时间评估合理的延迟时间
# 注意:该参数默认不超过rebalance超时时间
group.initial.rebalance.delay.ms=3000

# rebalance超时时间,单位毫秒,默认5分钟,即300000,超时后,服务端默认消费组为空(没有消费者),将重新发送数据
# 如果消费者应用启动时间超过该超时时间,则将收到重复数据
max.poll.interval.ms=300000

 

特别关注:
  • 配置文件中相关副本数replication.factor,不能大于集群内节点个数,即不允许副本同时在一台节点上;
  • 分区数partitions配置不受限制。
  特别关注:配置文件中指定的所有路径,请在启动前确保已存在   特别关注:kafka各命令的帮助信息,可以执行 bin 目录下相应命令,不带任何参数,即可打印帮助信息  

五、运行用户

默认使用root运行即可。  

六、开机启动

请参考教程 Linux开机启动方案   

七、服务启动运行

以主程序运行为例(已经将主目录添加环境变量) 1.启动 
# -daemon 指定后台启动
[root@localhost kafka_2.13-2.7.1]# ./bin/kafka-server-start.sh -daemon ./config/server.properties

 

2.启动日志可以查看
/usr/local/kafka/kafka_2.13-2.7.1/logs/kafkaServer.out 
/usr/local/kafka/kafka_2.13-2.7.1/logs/server.log

 

3.停止
[root@localhost kafka_2.13-2.7.1]# ./bin/kafka-server-stop.sh

 

4.验证集群信息 登录zookeeper,查看kafka节点信息
#连接zk节点
[root@localhost conf]# zkCli.sh -server 192.168.11.72:8181  

#查看kafka节点,1,2,3即为3台kafka节点server.properties中配置的broker.id的值
[zk: 192.168.11.72:8181(CONNECTED) 0] ls /brokers/ids 
[0, 1, 2]               

 

5.创建topic
#在11.64执行创建topic:testtopic,指定6个分区,每个分区有3个备份
[root@localhost kafka_2.13-2.7.1]# bin/kafka-topics.sh --create --bootstrap-server 192.168.11.64:8092 --replication-factor 3 --partitions 6 --topic testtopic
Created topic testtopic.

#在11.67、11.73上查看topic,确认topic:testtopic有6个分区,每个分区有3个备份
[root@localhost kafka_2.13-2.7.1]# bin/kafka-topics.sh --describe --bootstrap-server 192.168.11.67:8092 --topic testtopic
Topic: testtopic    PartitionCount: 6    ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: testtopic    Partition: 0    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: testtopic    Partition: 1    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: testtopic    Partition: 2    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: testtopic    Partition: 3    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: testtopic    Partition: 4    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: testtopic    Partition: 5    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1 

 

6.生产者发送消息
#在11.64上生产消息
[root@localhost kafka_2.13-2.7.1]# bin/kafka-console-producer.sh --bootstrap-server 192.168.11.64:8092 --topic testtopic
>hello
>wolrd
>!
>^C

 

7.消费者消费消息
#在11.67、11.73上消费消息
[root@localhost kafka_2.13-2.7.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.64:8092 --topic testtopic --from-beginning
!
wolrd
hello

 

8.模拟集群节点故障
#模拟11.67节点故障宕机
[root@localhost kafka_2.13-2.7.1]# bin/kafka-server-stop.sh

#11.64继续生产消息
[root@localhost kafka_2.13-2.7.1]# bin/kafka-console-producer.sh --bootstrap-server 192.168.11.64:8092 --topic testtopic
>new message 1
>new message 2

#11.73正常消费消息
[root@server73 kafka_2.13-2.7.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.64:8092 --topic testtopic --from-beginning
!
wolrd
hello
new
new message 2
new message 1

 

特别关注:kafka的数据日志在配置文件中调整 log.retention 相关参数支持自动清理,但kafka的运行日志(默认在kafka安装目录下 logs),没有自动清理功能,需要人工维护,可以结合crontab定时清理,示例如下,保存最近7天运行日志
crontab -e
#定时清理日志
1 0 * * * find /usr/local/kafka/kafka_2.13-2.7.1/logs/ -mtime +7 | xargs rm -rf {}

更多日志管理,请参考 Linux日志管理经验总结(crontab+logrotate)

  特别关注:系统默认启用了SELinux内核模块(安全子系统),所以在服务绑定/监听某些端口时,提示无访问权限,此时需要禁用SELinux,修改 /etc/selinux/config 文件,设置SELINUX=disabled
Can't start server: Bind on TCP/IP port: Permission denied
  特别关注:selinux设置完成需要重启生效,如果当前不方便重启,可以执行 setenforce 0 临时关闭selinux,下次重启是配置再生效   特别关注:系统默认启用了防火墙,请在启动服务前关闭防火墙,或在防火墙中添加服务端口

 

附录:关于默认分区数与默认分区副本数

更多说明,请参考附录:Kafka的分区重分配
  1. 分区数影响客户端负载均衡,即最大支持分区数量的客户端负载
  2. 副本数影响数据高可用,即最大支持副本数的broker宕机,只要在存活broker中有足够的副本组成完整topic即可
  3. 分区数与副本数可以在配置文件 server.properties 中指定,也可以在创建topic脚本中指定
  4. 如果topic在使用之前没有创建,则会根据配置文件指定的属性自动创建topic
  5. 分区数支持在线命令调整,但只能扩展,不能减少,命令:kafka-topics.sh --alter
  6. 副本数不支持在线命令直接调整,但可以通过重新分区方案,大致流程如下
1.导出当前topic列表,保存为topics.json,文件格式通过执行 kafka-reassign-partitions.sh 查看帮助 --topics-to-move-json-file选项
bin/kafka-topics.sh --bootstrap-server 192.168.11.63:8092 --list 

2.根据topic列表生成重新分区模板,将重新分区后的内容保存reassign.json
kafka-reassign-partitions.sh --bootstrap-server 192.168.11.63:8092 --generate --topics-to-move-json-file topics.json --broker-list 0,1,2

3.修改各分区模板,调整各topic各分区副本,注意各分区副本列表replicas不能设置一致(id顺序),否则所有分区leader将在同一台broker上,数据和流量不平均
{
    "version": 1,
    "partitions": [
        {
            "topic": "testtopic",
            "partition": 0,
            "replicas": [2,1,0],
            "log_dirs": ["any","any","any"]
        },
        {
            "topic": "testtopic",
            "partition": 1,
            "replicas": [1,2,0],
            "log_dirs": ["any","any","any"]
        },
        {
            "topic": "testtopic",
            "partition": 2,
            "replicas": [0,1,2],
            "log_dirs": ["any","any","any"]
        }
    ]
}
  
4.执行重新分区设置
kafka-reassign-partitions.sh --bootstrap-server 192.168.11.63:8092 --reassignment-json-file reassign.json --execute

5.查看重新分区结果
kafka-reassign-partitions.sh --bootstrap-server 192.168.11.63:8092 --reassignment-json-file reassign.json --verify
kafka-topics.sh --bootstrap-server 192.168.11.63:8092 --topic testtopic --describe

 

 

附录:参考资料

1.Kafka学习之路 (一)Kafka的简介 2.【kafka学习笔记】Kafka的演进历史 3.KAFKA集群部署指南 4.kafka架构和原理 5.Kafka Broker配置 6.kafka控制台工具使用 7.Kafka的分区重分配

标签:topic,--,root,192.168,kafka,CentOS7,2.7
From: https://www.cnblogs.com/xiaoyaozhe/p/17671556.html

相关文章

  • MySQL安装--rpm(CentOS7 + MySQL 5.7.35)
    Linux系统-部署-运维系列导航 MySQL常用安装方式有3种:rpm安装、yum安装、二进制文件安装。本文介绍rpm安装方式。 组件安装操作步骤参考 组件安装部署手册模板,根据不同组件的安装目标,部分操作可以省略。本文将按照该参考步骤执行。 一、获取组件可执行程序库,包括主程......
  • zookeeper集群安装(CentOS7 + zookeeper 3.7.0)
    Linux系统-部署-运维系列导航 zookeeper介绍ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。 zo......
  • nginx安装 - rpm安装(CentOS7 + nginx 1.20.2)
    Linux系统-部署-运维系列导航 Nginx介绍官方网站为:http://nginx.org/。它是一款免费开源的高性能HTTP代理服务器及反向代理服务器(ReverseProxy)产品,同时它还可以提供IMAP/POP3邮件代理服务等功能。它高并发性能很好,官方测试能够支撑5万的并发量;运行时内存和CPU占用率......
  • nginx安装 - yum安装(CentOS7 + nginx 1.20.2)
    Linux系统-部署-运维系列导航 Nginx介绍官方网站为:http://nginx.org/。它是一款免费开源的高性能HTTP代理服务器及反向代理服务器(ReverseProxy)产品,同时它还可以提供IMAP/POP3邮件代理服务等功能。它高并发性能很好,官方测试能够支撑5万的并发量;运行时内存和CPU占用率......
  • nginx安装 - 二进制源码编译安装(CentOS7 + nginx 1.20.2)
    Linux系统-部署-运维系列导航 Nginx介绍官方网站为:http://nginx.org/。它是一款免费开源的高性能HTTP代理服务器及反向代理服务器(ReverseProxy)产品,同时它还可以提供IMAP/POP3邮件代理服务等功能。它高并发性能很好,官方测试能够支撑5万的并发量;运行时内存和CPU占用率......
  • 【Kafka系列】(一)Kafka入门
    有的时候博客内容会有变动,首发博客是最新的,其他博客地址可能会未同步,认准https://blog.zysicyj.top首发博客地址系列文章地址Kafka是什么?一句话概括:ApacheKafka是一款开源的消息引擎系统什么是消息引擎系统?消息引擎系统(MessageBrokerSystem)是一种中间件软件或服务,用......
  • Go语言实现Kafka消费者的示例代码
    Kafka是一种分布式流处理平台,由Facebook于2011年推出,现在已经成为Apache项目的一部分。Kafka提供了高可用性、可扩展性和低延迟的消息传递服务,适用于处理实时和离线数据。Kafka的主要功能包括生产者-消费者通信、批处理和实时数据流处理。Kafka基于发布/订阅模型,允许消息发布者将数......
  • 远程连接centos7
    首先第一步,将虚拟机网络设置成NAT模式。第二步,通过vi编辑该文件:  vi/etc/sysconfig/network-scripts/ifcfg-ens33  首先修改BOOTPROTO的值: BOOTPROTO="static" 然后在文件最后加上IP配置:(三个值与前面VMWarefusion网络配置中的Subn......
  • linux centos7分区
    boot: swap: /: ......
  • kafka原理与应用
    架构图BrokerKafka集群包含多个服务器,服务器节点称为BrokerBroker存储Topic数据如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个br......