首页 > 其他分享 >Kafka的研究&实战

Kafka的研究&实战

时间:2023-04-18 20:31:31浏览次数:33  
标签:实战 研究 分区 192.168 kafka topic -- Kafka

一、Kafka简介

Kafka是一个高吞吐量的分布式的发布--订阅消息系统,可以处理大量的数据,并将消息从一个端点传递到另一个端点。同时Kafka还能将消息保存在磁盘上并在集群内复制以防数据丢失。

二、Kafka的优势

  • 可靠性:Kafka是分布式、分区、复制和容错的。
  • 扩展性:可结合Zookeeper实现动态扩容。
  • 高吞吐量:Kafka对于发布和订阅消息都有很高的吞吐量,即使在非常廉价的机器上,Kafka也能做到每秒处理几十万条消息,而它的延迟最低只有几毫秒。
  • 持久性:Kafka可以将消息直接持久化在普通磁盘上,且磁盘读写性能优异。
  • 容错性:Kafka会将数据备份到多台服务器节点中,即使Kafka集群中的某一台Kafka服务节点宕机,也不会影响整个系统的功能。

三、Kafka的角色

Broker

Kafka 集群包含一个或多个服务器,服务器节点称为broker。

Topic

每条发布到Kafka集群的消息都有一个类别(主题),这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。Topic在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的Topic才能消费Topic中的消息。

Parition

topic中的数据可分割为一个或多个partition,每个topic至少有一个partition,而每个parition只属于一个topic。分区的作用是做负载,提升kafka的吞吐量。创建topic时可指定parition数量,每个paritiion对应于一个文件夹,该文件夹下存储该parition的数据和索引文件。为了实现数据的高可用,比如将分区01的数据分散到不同的kafka节点,对于每个分区都有一个broker作为leader,其它的broker作为follower。同一个 Topic 在不同的分区的数据是不重复的。建议分区的数量正好等于服务器总数。

分区的优势:
1.实现存储空间的横向扩容,将多个kafka节点的空间结合利用。

2.提升性能,多服务器读写。

3.实现高可用。通常分区的leader分布在不同的节点,比如01分区的leader为01节点,则对于分区01而言,02节点和03节点为01节点的follower;02分区的leader为02节点,则对于分区02而言,01节点和03节点为02节点的follower;03分区的leader为03节点,则对于分区03而言,01节点和02节点为03节点的follower。

      打个比方:一个Topic里面有3G的数据,分3个分区存储,每个分区平均存1G数据。分区分散在3个节点,且每个分区存储的数据不相同。如果每个分区配置2个副本,则这3G的数据要写3份(2份副本+1份原始数据),共占据9G的空间。
      具体可参考下图

Kafka的研究&实战_kafka

Replication

副本,是Kafka保证数据高可用的方式,Kafka同一Partition的数据可以在多Broker上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在broker崩溃或发生网络异常,Kafka会重新选择新的主副本对外提供读写服务。

Producer

生产者,负责发布消息到broker。

Consumer

消费者,负责消费消息。每个Comsumer属于一个Consumer group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。使用comsumer high level API时,同一分区的一条消息只能被同一个consumer group内的一个consumer消费,但多个comsumer group可同时消费这一消息。同一个消费者组的消费者可以消费同一个 topic 的不同分区的数据。
      还是结合上图打个比方:生产者在一个topic里面发布了3条消息,分别为消息1~3,这3条消息分别隶属3个分区,3个分区可分别命名为01、02、03。现在有两个消费者组A、B要消费消息。当消息1(隶属01分区)被A组的消费者01消费时,A组的其它消费者不能再消费消息1,但B组的其中一个消费者也可以消费消息1。而A组的消费者可以消费02和03分区的消息(消息2和消息3)。

四、Kafka集群部署

4.1 各个节点部署Kafka

建议Kafka与Zookeeper最好部署在同一批服务器上。仍然使用三台机器,IP地址为192.168.131.11~13,与部署Zookeeper使用同一批机器

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.4.0/kafka_2.12-3.4.0.tgz
tar -xf kafka_2.12-3.4.0.tgz
mv kafka_2.12-3.4.0 /usr/local/kafka
cd /usr/local/kafka/config
#创建数据目录
mkdir /usr/local/kafka/data
##修改主配置文件
vim server.properties
#每个broker在集群中的唯一标识(不同机器id不一样)
broker.id=1
#监听地址(本机ip:9092)
listeners=PLAINTEXT://192.168.131.11:9092
#Kafka保存数据的目录
log.dirs=/usr/locak/kafka/data
#设置新的topic的默认分区数
num.partitinotallow=3
#设置Kafka的数据保存时间,默认为168h即7天
log.retention.hours=168
#指定连接的Zookeeper地址,Kafka基于Zookeeper实现高可用
zookeeper.cnotallow=192.168.131.11:2181,192.168.131.12:2181,192.168.131.13:2181
#设置连接Zookeeper的超时时间(5s)
zookeeper.connection.timeout.ms=5000

4.2 各节点启动Kafka

注意:先启动Zookeeper在启动Kafka

#以后台方式启动
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

4.3 验证Zookeeper中的Kafka元数据

  • Broker依赖于Zookeeper,每个Broker的id和Topic、Parition这些元数据信息都会写入Zookeeper的ZNode节点中
  • 在Kafka0.9之前,Consumer每消费完一条消息,会将产生的offset保存到Zookeeper中,下次消费在当前offset往后继续消费;Kafka0.9之后offset保存在本地

Kafka的研究&实战_kafka_02

4.4 测试Kafka读写数据

4.4.1 创建topic

#创建名为wxd,分区为3,每个分区的副本数为3的topic
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.131.11:9092 --partitions 3 --replication-factor 3 --topic wxd

新版Kafka的启动方式是加--bootstrap-server,以往的--zookeeper启动方式不再适用了。这也是zookeeper is not a recognized option错误产生的原因。

4.4.2 查看已有的topic

/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.131.11:9092
wxd

4.4.3 查询topic的详细信息

/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 192.168.131.11:9092 --topic wxd

Kafka的研究&实战_kafka_03

状态说明:名为wxd的topic有3个分区分别为0、1、2,这三个分区的Leader均为3(这里的3指broker.id,对应192.168.131.13机器)。每个分区都有三个副本,且状态均为lsr(ln-sync,表示可以参加选举成为leader)

4.4.4 测试向指定topic发送消息

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.131.11:9092,192.168.131.12:9092,192.168.131.13:9092 --topic wxd

Kafka的研究&实战_kafka_04

4.4.5 测试获取指定topic的消息

#--from-beginning:从最早的消息开始获取。如果不加该选项则获取最新数据
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.131.13:9092 --topic wxd --from-beginning

Kafka的研究&实战_kafka_05

Kafka的研究&实战_kafka_06

4.4.6 删除topic

/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 192.168.131.11:9092 --topic wxd

五、拓展:用Python批量向Kafka写入数据

脚本内容如下

from pykafka import KafkaClient
#Kafka地址
client = KafkaClient(hosts="192.168.131.11:9092,192.168.131.12:9092,192.168.131.13:9092")
#topic的名字
topic = client.topics['wxd']
with topic.get_sync_producer() as producer:
	for i in range(10):
  	producer.produce(('test message ' + str(i)).encode())

标签:实战,研究,分区,192.168,kafka,topic,--,Kafka
From: https://blog.51cto.com/u_15796303/6204040

相关文章

  • Go接入kafka
    需要借助的库github.com/Shopify/sarama//kafka主要的库*github.com/bsm/sarama-cluster//kafka消费组生产者packageproducerimport( "fmt" "github.com/HappyTeemo7569/teemoKit/tlog" "github.com/Shopify/sarama" "kafkaDemo/define"......
  • vue3微信公众号商城项目实战系列(7)自定义底部tabbar组件
    在开始之前,先看看官方对组件的定义: vue3的生态非常丰富,有各种各样的开源组件库可以拿来就用,比如vant、element-ui等,本系列不使用任何第3方组件,完全使用原生的语法来写,只为聚焦vue3技术本身,本篇写一个自定义tabbar组件,效果如下图所示:要实现如下功能:1.底部tab项固定3个:首页......
  • 【敲敲云】零代码实战,主子表汇总统计—免费的零代码产品
    近来很多朋友在使用敲敲云时,不清楚如何使用主子表,及如何在主表中统计子表数据;下面我们就以《订单》表及《订单明细》表来设计一下吧,用到的组件有“设计子表”、“公式”、“汇总”等。《订单》表展示总金额=订单明细中“小计”求和小计=单价*数量首选我们打开敲敲云......
  • Sentinel实战
    一、Sentinel简介Sentinel是阿里开源的面向服务流量治理的框架,官方原文是Sentinel是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性......
  • 【LeetCode动态规划#07】01背包问题一维写法(状态压缩)实战,其二(目标和、零一和)
    目标和(放满背包的方法有几种)力扣题目链接(opensnewwindow)难度:中等给定一个非负整数数组,a1,a2,...,an,和一个目标数,S。现在你有两个符号+和-。对于数组中的任意一个整数,你都可以从+或-中选择一个符号添加在前面。返回可以使最终数组和为目标数S的所有添加符号的......
  • 3、ShardingSphere实战(三)
    一、前言:本项目按照时间字段进行分表,需要提前将主表写入数据库优势:1、实现自动建表,且不需要配置SQL2、范围分表查询时自动排除不存在的表 二、项目实战:1、创建主表:CREATETABLE`t_user`(`id`bigint(32)NOTNULL,`name`varchar(255)DEFAULTNULL,`create_a......
  • ssh基于key验证的实战
    实现目标实现同网段4-254的所有主机打通基于key验证的ssh登录点击查看代码PASS=123#设置网段最后的地址,4-255之间,越小扫描越快END=254IP=`ipaseth0|awk-F'[/]+''NR==3{print$3}'`NET=${IP%.*}../etc/os-releaserm-f/root/.ssh/id_rsa[-e./SCANIP.log......
  • 面试题百日百刷-kafka篇(四)
    锁屏面试题百日百刷,每个工作日坚持更新面试题。****请看到最后就能获取你想要的,接下来的是今日的面试题:1.为什么kafka可以实现高吞吐?单节点kafka的吞吐量也比其他消息队列大,为什么?Kafka是分布式消息系统,需要处理海量的消息,Kafka的设计是把所有的消息都写入速度低容量大的硬盘,......
  • 基于遗传算法的最优潮流 以IEEE30节点的输电网为研究对象 以系统发电成本最小为目标函
    基于遗传算法的最优潮流 以IEEE30节点的输电网为研究对象以系统发电成本最小为目标函数以机组出力为优化变量其中出力与成本的关系是经典的二次函数关系 通过优化求解得到最佳机组出力ID:2550672838253871......
  • vue3微信公众号商城项目实战系列(6)用户登录
    1.一个商城要实现购物的功能,需要能识别用户的身份,这样才能完成加购物车,下单,付款等操作。但微信公众号商城和PC端商城有些不一样,区别在于微信公众号商城使用微信支付的时候需要一个openid的参数(以后再具体讲)这个参数必须访问微信公众号提供的接口才能获取到,基于这个原因,用户登录......