首页 > 其他分享 >Kafka知识总结之集群环境搭建

Kafka知识总结之集群环境搭建

时间:2022-12-19 14:02:02浏览次数:35  
标签:副本 -- zookeeper broker kafka topic 集群 Kafka 搭建


简述

Kafka是一个分布式流平台,本质是一个消息队列。消息队列的三个作用:异步消峰解耦

一. 安装zookeeper

1.1. 下载并解压

# 下载
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz

# 解压
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz

1.2. 修改配置

这里我们需要将解压目录中的​​config​​​目录里面的​​zoo_sample.cfg​​​复制一份为​​zoo.cfg​​。

Kafka知识总结之集群环境搭建_kafka

这个配置文件就是​​zookeeper​​​的配置文件,整体来说我们不需要修改,但这里我们别为了不影响其他的文件目录,只为了我们测试使用,需要修改下​​zookeeper​​的数据目录。

# CS通信心跳时间
tickTime=2000

# 集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)
initLimit=10

# 集群中flower服务器(F)跟leader(L)服务器之间的请求和答应最多能容忍的心跳数
syncLimit=5

# 该属性对应的目录是用来存放myid信息跟一些版本,日志,跟服务器唯一的ID信息等
dataDir=/tmp/zookeeper

# 客户端连接的接口,客户端连接zookeeper服务器的端口,zookeeper会监听这个端口,接收客户端的请求访问!这个端口默认是2181
clientPort=2181

#maxClientCnxns=60

#autopurge.snapRetainCount=3

#autopurge.purgeInterval=1

## Metrics Providers

#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

我们这里修改​​dataDir=/tmp/zookeeper​​为刚才我们解压的目录!

1.3. 启动zookeeper

进入bin目录,执行​​./zkServer.sh start​

Kafka知识总结之集群环境搭建_集群搭建_02

二. 安装kafka

2.1. 下载并解压

# 下载
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
# 解压
tar -zxf kafka_2.12-2.6.0.tgz

2.2. 修改配置文件

我们修改三个位置:

配置项


描述

listeners

PLAINTEXT://192.168.31.26:9092

broker 服务器要监听的地址及端口 . 默认是 localhost:9092 ,​​0.0.0.0​​的话 ,表示监听本机的所有ip地址.

advertiesd.listeners

PLAINTEXT://192.168.31.26:9092

这个是对外提供的地址 , 当​​client​​请求到kafka时, 会分发这个地址.

log.dirs

/home/long/kakfa_test/kafka_data

kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-eogs-1,/tmp/kafka-logs-2

# 每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=0

############################# Socket Server Settings #############################
listeners=PLAINTEXT://192.168.31.26:9092

advertiesd.listeners=PLAINTEXT://192.168.31.26:9092

# broker处理消息的最大线程数,一般情况下数量为cpu核数
num.network.threads=3

# broker处理磁盘IO的线程数,数值为cpu核数2倍
num.io.threads=8

# socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes=102400

# socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes=102400

# socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600


############################# Log Basics #############################
# kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs=/home/long/kakfa_test/kafka_data

num.partitions=1

num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
# 检查是否需要固化到硬盘的时间间隔
#log.flush.interval.messages=10000

#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# 数据文件保留多长时间
log.retention.hours=168

#log.retention.bytes=1073741824

# topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=1073741824

# 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
# zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
# ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

2.3. 启动kafka

执行 ​​bin/kafka-server-start.sh config/server.properties​​​ ,就可以启动​​kafka​​了。

Kafka知识总结之集群环境搭建_zookeeper_03

上面的方法是阻塞执行的,我们可以通过​​-daemon​​ 进行守护执行。

bin/kafka-server-start.sh -daemon config/server.properties

停止 ​​kafka​​​ ,需要执行 ​​bin/kafka-server-stop.sh​

Kafka知识总结之集群环境搭建_zookeeper_04

三、kafka的基本使用

3.1. kafka中的一些概念


描述

topic

一个虚拟的概念,由1到多个partition组成

partition

实际消息存储单位

producer

消息生产者

consumer

消息消费者

3.2. 创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic long-topic

参数解释:

参数

解释

–create

创建topic动作指令

–zookeeper

制定kafka所连接的zookeeper服务地址

–topic

指定了所创建的主题名称

–replication-factor

指定副本因子

–partitions

指定分区个数

Kafka知识总结之集群环境搭建_集群搭建_05

3.3. 查看所有topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

Kafka知识总结之集群环境搭建_数据_06

3.4. 删除topic

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic long-topic

3.5. 详情topic

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic long-topic

3.6. 发送和接受消息

发送消息

bin/kafka-console-producer.sh --broker-list 192.168.31.26:9092 --topic long-topic

Kafka知识总结之集群环境搭建_集群搭建_07

接受消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.26:9092 --topic long-topic --from-beginning

Kafka知识总结之集群环境搭建_zookeeper_08

四. kafka涉及名称解释

根据下面的图了解下Kafka涉及到相关名词:

Kafka知识总结之集群环境搭建_zookeeper_09

重要名词

名词

解释

Producer

消息生产者,该角色将消息发布到kafka的topic中,broker接受到生产者发送的消息后,broker将该消息追加到当前追加数据的segment文件中。生产者发送的消息,存储到一个parition中,生产者也可以指定数据存储到parition

Consumer

消息消费者,向Kafka Broker取消息的客户端。消费者可以消费多个topic中的数据。

Consumer Group

消费组,由多个Consumer组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

Broker

一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。

Topic

话题,可以理解为一个队列, 生产者和消费者面向的都是一个 topic。可以将kafka看作是一个数据库,topic相当于数据库中的一张表,topic相当于表名。

Partition

为了实现扩展性,一个非常大的 topic 可以分割成一个或者多个parition。每一个topic至少拥有一个parition。每一个parition中的数据使用多个segment文件存储。parition是有序的,但是多个parition之间是没有顺序的,在要保证消息的消费顺序的时候,需要将parition数目设置为1。

partition offset

每条消息都有一个当前parition下唯一的64字节的offset,它指明了这条消息的起始位置。

Replica

副本(Replication),为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 Kafka仍然能够继续工作, Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。replicas是不会被消费者消费的。

Leader

每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。

Follower

每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。 leader 发生故障时,某个 Follower 会成为新的 leader。

zookeeper

负责维护和协调broker。当kafka系统增加broker或者broker发生故障失效时,由zookeeper通知生产者和消费者,生产者和消费者依据zookeeper的broker状态信息于broker协调数据的发布和订阅任务

AR(Assigned Replicas)

分区中所有的副本统称为AR

ISR(In Sync Replicas)

所有与Leader部分保持一定程度的副本(包括leader的副本在内)组成ISR

OSR

与Leader副本同步滞后过多的副本

HW(High Watermark)

高水位,标识一个特定的offset,消费者只能拉取到这个offset之前的消息

LEO(Log End Offset)

即日志末端位移,记录了该副本底层日志中下一条消息的位移值。注意是下一条消息!

五. Kafka特点

特性:

  • 高吞吐、低延迟:kafka每秒可以处理几十万消息,延迟最低只有几毫秒,每个主体可以分为多个分区,消费组对分区进行消费操作
  • 可扩展性,kafka集群支持热扩展
  • 持久性、可靠性,消息可以被持久到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性,允许集群结点失败(若副本数量为n,则允许失败n-1个结点)
  • 高并发:支持数千个客户端同时读写

六. 使用场景

场景:

  • 日志收集:kafka可以收集各种服务的log,通过kafka以统一接口服务开放给各种消费者
  • 消息系统:解藕生产者和消费者、缓冲消息等
  • 用户活动跟踪:kafka可以用来记录web用户或者app的各种活动操作,做实时监控分析,或者装载到hadoop、数据仓库中做离线分析和数据挖掘
  • 运营指标:kafka也可以用来记录运营监控数据。
  • 流式处理:storm

七. 技术优势

可伸缩性:

  1. kafka在运行期间可以轻松的扩展和收缩(可以添加和删除代理),而不会宕机
  2. 可以扩展一个kafka主题包来包含更多的分区;由于一个分区无法扩展到多个代理,所以它的容量收到代理磁盘空间限制,能够增加分区和代理的数量意味着当个主题可以存储的数据量没有限制。

容错性和可靠性:

kafka的设计方式使某一个代理的故障能够被集群中其他的代理监测到,由于每一个主体都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此故障中继续运行。

吞吐量:

代理能够以快速有效的存储和检索数据

八. Topic、Parition、Broker、Replica、leader和follower之间的关系

  • 在​​Kafka​​​中是以​​Broker​​​区分集群内服务器的;同一个​​Topic​​​下,多个​​Parition​​​经​​Hash​​​分布到不同的​​Broker​​;
  • 一个​​Topic​​​(主题)对应多个​​Parition​​​(分区),这里​​Parition​​​分布在不同的​​Broker​​​上,多个​​Broker​​​一起提供​​Kafka​​​服务;Parition默认是1,不可以减少Parition的数量,但是可以增加;如果想要减少就需要删除原先的​​Topic​​​,然后创建新​​Topic​​,重新设置分区数
  • 同一个​​Topic​​​中的不同​​Parition​​​中数据有顺序性,但是​​Parition​​之间不存在数据顺序性;
  • 每个​​Parition​​​都会有多个数据​​Replica​​​(副本),这些​​Replica​​​分布于不同的​​Broker​​​中,这些副本中会一个副本是​​Leader​​​,其他的副本是​​Follower​​;
  • 当​​producer​​​或者​​consumer​​​发往​​Parition​​​的请求,都是通过​​leader​​​数据副本所在​​broker​​​进行处理,当​​leader​​​所在的​​Broker​​​发生故障,这个​​Broker​​​会成暂时不可用,​​kakfa​​​会自动从其他副本选择一个​​leader​​用于接收客户端请求;
  • 保证在​​broker​​​之间平均分布​​partition​​​副本,每个副本分布在不同的​​broker​​​上,​​broker​​分布可用轮询或哈希。

九. Java操作Kafka

引入依赖:

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>

生产者:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

/**
* @author kirin.麒麟
* @version 1.0.0
* @classname KafkaProducer
* @desc Kafka生产者
* @date 2022/1/9 4:21 下午
*/
public class Producer {

/**
* 主题
*/
public final static String Topic = "test-topic";

public static void main(String[] args) throws InterruptedException {

Properties properties = new Properties();
// 集群通过逗号分割
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.138:9092");
// 设置key和value的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
try {
for (int i = 0; i < 100; i++) {
String msg = "Hello, " + new Random().nextInt(100);
// 构建消息
ProducerRecord<String, String> record = new ProducerRecord<>(Topic, msg);
// 发送
kafkaProducer.send(record);
System.out.println("消息发送:" + msg);
Thread.sleep(500);
}
} finally {
// 执行完之后关闭
kafkaProducer.close();
}
}

}

十. Kafka集群搭建

这里使用一个​​zookeeper​​​+三个​​kafka​​​组成​​kafka​​​集群,如果​​zookeeper​​​也需要集群的话可以从前面文章的​​zookeeper​​​集群搭建获取​​shell​​脚本。

这里需要注意下面几点:

节点

kafka-1

kafka-2

kafka-3

broker.id

1

2

3

listeners

PLAINTEXT://192.168.0.117:9091

PLAINTEXT://192.168.0.117:9092

PLAINTEXT://192.168.0.117:9093

advertised.listeners

PLAINTEXT://192.168.0.117:9091

PLAINTEXT://192.168.0.117:9092

PLAINTEXT://192.168.0.117:9093

log.dirs

kafka-1/data

kafka-2/data

kafka-3/data

zookeeper.connect

192.168.0.xxx:2181

192.168.0.xxx:2181

192.168.0.xxx:2181

搭建脚本附上:

install_kafka() {
echo "安装kafka集群"
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -zxvf kafka_2.12-2.8.1.tgz
mv kafka_2.12-2.8.1 kafka-1
cp -r kafka-1 kafka-2
cp -r kafka-1 kafka-3
mkdir kafka-{1,2,3}/data
BasePath=$(pwd)
for (( i = 1; i <= 3; i++ )); do
#statements
sed -i "/^broker.id/cbroker.id=$i" kafka-$i/config/server.properties
# #listeners=PLAINTEXT://:9092
sed -i "/^#listeners/clisteners=PLAINTEXT://192.168.0.117:909$i" kafka-$i/config/server.properties
# #advertised.listeners=PLAINTEXT://your.host.name:9092
sed -i "/^#advertised.listeners/cadvertised.listeners=PLAINTEXT://192.168.0.117:909$i" kafka-$i/config/server.properties
# log.dirs=/tmp/kafka-logs
sed -i "/^log.dirs/clog.dirs=$BasePath\/kafka-$i\/data" kafka-$i/config/server.properties
# zookeeper.connect=localhost:2181
sed -i '/^zookeeper.connect=/czookeeper.connect=192.168.0.117:2181' kafka-$i/config/server.properties
done
}

start() {
echo "启动kafka集群"
for (( i = 1; i <= 3; i++ )); do
./kafka-$i/bin/kafka-server-start.sh -daemon ./kafka-$i/config/server.properties
done
}

stop() {
echo "停止kafka集群"
for (( i = 1; i <= 3; i++ )); do
./kafka-$i/bin/kafka-server-stop.sh
done
}

echo "Kafka集群脚本"
case $1 in
install)
install_kafka
;;
start)
start
;;
stop)
stop
;;
esac

上面演示的是伪集群搭建,学习Kafka足够了,在实际使用的推荐先搭建zookeeper集群在搭建kafka集群。


标签:副本,--,zookeeper,broker,kafka,topic,集群,Kafka,搭建
From: https://blog.51cto.com/luckyqilin/5952261

相关文章

  • Kafka知识总结之Broker原理总结
    简介这篇文章介绍Kafka的Broker工作流程,包括其中控制器的选举过程;kafka副本的leader选举以及leader和follower故障流程;简单讲述了生产环境中如何调整分区副本;kafka的文件存......
  • Kafka知识总结之生产者简单使用
    一.测试环境搭建引入依赖:<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency>创......
  • Centos7搭建maven私服nexus3
    安装难度: ★☆☆☆☆繁琐程度:★☆☆☆☆易错程度:★☆☆☆☆一准备二安装下载安装包:方式一:nexus: ​​https://www.sonatype.com/download-oss-sonatype​​方式二:......
  • 本地搭建bootlin elixir查阅内核代码
    转自:https://barryx.cn/build_bootlin_elixir平时经常使用elixir.bootlin.com查看内核源码,很方便。但是苦于该站点服务器在国外,国内用起来很卡很慢,所以想着自己在本地搭......
  • Kafka数据可靠性探究
    概述Kafka作为商业级消息中间件,消息的可靠性保障是非常重要。那Kafka是怎么保障消息的可靠性的呢?上图是Kafka的消息发送基础架构,一条消息的完整生命周期是:生产者发送消息至K......
  • 互联网医院系统开发搭建,互联网医疗行业有几大趋势呢?|互联网医院系统源码
    随着互联网医院系统的开发搭建,互联网医疗行业有几大趋势呢?疫情期间,互联网全面“战役”,在线问诊量大增,行业逆势大涨,“互联网+医疗”展现了新的发展趋势,互联网医疗市场发展迅速......
  • 服务器集群使用过程中遇到的一些问题
    自己参与开发的项目,在测试环境只有一台服务器,每次部署的时候只需要部署单台服务器。可是生产环境则不一样,生产环境部署了10台左右的应用服务,跑起来的效果和单台服务......
  • PyTorch中利用LSTMCell搭建多层LSTM实现时间序列预测
    OverridetheentrypointofanimageIntroducedinGitLabandGitLabRunner9.4.Readmoreaboutthe extendedconfigurationoptions.Beforeexplainingtheav......
  • Golang 环境搭建
    简述『Golang』(Go语言,以下简称Go)是Google开发的一种编译型、可并行化、并具有垃圾回收功能的编程语言。罗布·派克(RobPike)、罗伯特·格瑞史莫(RobertGriesemer)、及肯·......
  • frp服务器搭建
    想弄一个自己的访问学校内网资源的代理服务器,于是乎...求助群友..不啦不啦第一步用自己前几天买的服务器搭建frp内网穿透完全按照这个博客搞的.记得在服务器管理界面开......