一、Kafka简介
Kafka是一个高吞吐量的分布式的发布--订阅消息系统,可以处理大量的数据,并将消息从一个端点传递到另一个端点。同时Kafka还能将消息保存在磁盘上并在集群内复制以防数据丢失。
二、Kafka的优势
- 可靠性:Kafka是分布式、分区、复制和容错的。
- 扩展性:可结合Zookeeper实现动态扩容。
- 高吞吐量:Kafka对于发布和订阅消息都有很高的吞吐量,即使在非常廉价的机器上,Kafka也能做到每秒处理几十万条消息,而它的延迟最低只有几毫秒。
- 持久性:Kafka可以将消息直接持久化在普通磁盘上,且磁盘读写性能优异。
- 容错性:Kafka会将数据备份到多台服务器节点中,即使Kafka集群中的某一台Kafka服务节点宕机,也不会影响整个系统的功能。
三、Kafka的角色
Broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。
Topic
每条发布到Kafka集群的消息都有一个类别(主题),这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。Topic在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的Topic才能消费Topic中的消息。
Parition
topic中的数据可分割为一个或多个partition,每个topic至少有一个partition,而每个parition只属于一个topic。分区的作用是做负载,提升kafka的吞吐量。创建topic时可指定parition数量,每个paritiion对应于一个文件夹,该文件夹下存储该parition的数据和索引文件。为了实现数据的高可用,比如将分区01的数据分散到不同的kafka节点,对于每个分区都有一个broker作为leader,其它的broker作为follower。同一个 Topic 在不同的分区的数据是不重复的。建议分区的数量正好等于服务器总数。
分区的优势:
1.实现存储空间的横向扩容,将多个kafka节点的空间结合利用。
2.提升性能,多服务器读写。
3.实现高可用。通常分区的leader分布在不同的节点,比如01分区的leader为01节点,则对于分区01而言,02节点和03节点为01节点的follower;02分区的leader为02节点,则对于分区02而言,01节点和03节点为02节点的follower;03分区的leader为03节点,则对于分区03而言,01节点和02节点为03节点的follower。
打个比方:一个Topic里面有3G的数据,分3个分区存储,每个分区平均存1G数据。分区分散在3个节点,且每个分区存储的数据不相同。如果每个分区配置2个副本,则这3G的数据要写3份(2份副本+1份原始数据),共占据9G的空间。
具体可参考下图
Replication
副本,是Kafka保证数据高可用的方式,Kafka同一Partition的数据可以在多Broker上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在broker崩溃或发生网络异常,Kafka会重新选择新的主副本对外提供读写服务。
Producer
生产者,负责发布消息到broker。
Consumer
消费者,负责消费消息。每个Comsumer属于一个Consumer group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。使用comsumer high level API时,同一分区的一条消息只能被同一个consumer group内的一个consumer消费,但多个comsumer group可同时消费这一消息。同一个消费者组的消费者可以消费同一个 topic 的不同分区的数据。
还是结合上图打个比方:生产者在一个topic里面发布了3条消息,分别为消息1~3,这3条消息分别隶属3个分区,3个分区可分别命名为01、02、03。现在有两个消费者组A、B要消费消息。当消息1(隶属01分区)被A组的消费者01消费时,A组的其它消费者不能再消费消息1,但B组的其中一个消费者也可以消费消息1。而A组的消费者可以消费02和03分区的消息(消息2和消息3)。
四、Kafka集群部署
4.1 各个节点部署Kafka
建议Kafka与Zookeeper最好部署在同一批服务器上。仍然使用三台机器,IP地址为192.168.131.11~13,与部署Zookeeper使用同一批机器
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.4.0/kafka_2.12-3.4.0.tgz
tar -xf kafka_2.12-3.4.0.tgz
mv kafka_2.12-3.4.0 /usr/local/kafka
cd /usr/local/kafka/config
#创建数据目录
mkdir /usr/local/kafka/data
##修改主配置文件
vim server.properties
#每个broker在集群中的唯一标识(不同机器id不一样)
broker.id=1
#监听地址(本机ip:9092)
listeners=PLAINTEXT://192.168.131.11:9092
#Kafka保存数据的目录
log.dirs=/usr/locak/kafka/data
#设置新的topic的默认分区数
num.partitinotallow=3
#设置Kafka的数据保存时间,默认为168h即7天
log.retention.hours=168
#指定连接的Zookeeper地址,Kafka基于Zookeeper实现高可用
zookeeper.cnotallow=192.168.131.11:2181,192.168.131.12:2181,192.168.131.13:2181
#设置连接Zookeeper的超时时间(5s)
zookeeper.connection.timeout.ms=5000
4.2 各节点启动Kafka
注意:先启动Zookeeper在启动Kafka
#以后台方式启动
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
4.3 验证Zookeeper中的Kafka元数据
- Broker依赖于Zookeeper,每个Broker的id和Topic、Parition这些元数据信息都会写入Zookeeper的ZNode节点中
- 在Kafka0.9之前,Consumer每消费完一条消息,会将产生的offset保存到Zookeeper中,下次消费在当前offset往后继续消费;Kafka0.9之后offset保存在本地
4.4 测试Kafka读写数据
4.4.1 创建topic
#创建名为wxd,分区为3,每个分区的副本数为3的topic
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.131.11:9092 --partitions 3 --replication-factor 3 --topic wxd
新版Kafka的启动方式是加--bootstrap-server,以往的--zookeeper启动方式不再适用了。这也是zookeeper is not a recognized option错误产生的原因。
4.4.2 查看已有的topic
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.131.11:9092
wxd
4.4.3 查询topic的详细信息
/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 192.168.131.11:9092 --topic wxd
状态说明:名为wxd的topic有3个分区分别为0、1、2,这三个分区的Leader均为3(这里的3指broker.id,对应192.168.131.13机器)。每个分区都有三个副本,且状态均为lsr(ln-sync,表示可以参加选举成为leader)
4.4.4 测试向指定topic发送消息
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.131.11:9092,192.168.131.12:9092,192.168.131.13:9092 --topic wxd
4.4.5 测试获取指定topic的消息
#--from-beginning:从最早的消息开始获取。如果不加该选项则获取最新数据
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.131.13:9092 --topic wxd --from-beginning
4.4.6 删除topic
/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 192.168.131.11:9092 --topic wxd
五、拓展:用Python批量向Kafka写入数据
脚本内容如下
from pykafka import KafkaClient
#Kafka地址
client = KafkaClient(hosts="192.168.131.11:9092,192.168.131.12:9092,192.168.131.13:9092")
#topic的名字
topic = client.topics['wxd']
with topic.get_sync_producer() as producer:
for i in range(10):
producer.produce(('test message ' + str(i)).encode())
标签:实战,研究,分区,192.168,kafka,topic,--,Kafka
From: https://blog.51cto.com/u_15796303/6204040