首页 > 其他分享 >【ALL】Kafka从抬脚到入门

【ALL】Kafka从抬脚到入门

时间:2024-04-12 13:12:20浏览次数:18  
标签:入门 ## broker server kafka -- sh Kafka 抬脚

一、Kafka简介

1.1、定义

  • 旧定义
    Kafka 是一个分布式的基于发布/订阅模式的消息队列。
  • 新定义
    Kafka 是一个开源的分布式事件流平台,用于数据管道、流分析、数据集成和关键任务的应用。

1.2、使用场景

主要用于大数据实时处理领域。

  • 缓冲: 有助于控制和优化数据流经过系统的速度。
  • 消峰: 在访问量剧增的情况下,保证应用不会因突发的超负荷请求而崩溃。
  • 解耦: 保证程序的可扩展性。
  • 异步通信:

1.3、使用模式

  1. 点对点: 消费者主动拉取消息,消息收到后清除消息。
  2. 发布/订阅模式: 可以有多个topic主题;消息被消费者消费后不会删除;每个消费者相互独立,都可以消费到数据。

1.4、基础架构及概念

最简单的流程:生产者 -> 消息主题 -> 消费者。

考虑到一台主机可能存不下 海量的消息 数据,引入分区,将topic进行 分区
考虑到分区数据的高可用,引入副本,可为每个分区创建副本;
考虑到一个消费者消费信息的瓶颈,引入 消费者组

  1. Producer: 消息生产者,向 Kafka broker 发送消息。
  2. Consumer: 消息消费者,从 Kafka broker 消费消息。
  3. Consumer Group(CG):消费者组,由多个Consumer组成。
    消费者组与消费者组之间对消息的消费互不影响;
    消费者组内的消费者之间,不能消费同一个Topic的同一分区数据,即一个Topic的一个分区只能由同一消费者组内的一个消费者消费。
  4. Broker: 一台Kafka服务器。一个broker可容纳多个topic。一个集群由多个broker组成。
  5. Topic: 消息主题,Kafka将一组消息抽象归纳为一个主题。主题就是对消息的分类,生产者将消息发送到特定的主题,消费者订阅主题或是主题的某些分区进行消费。
  6. Parition: 一个topic可分为多个Parition,每个partition是一个有序队列。
  7. Replica: 分区副本。每个Parition可设置若干副本。 每个分区的所有副本有且只有一个Leader,其他为Follower。
  8. Leader: 分区Leader副本,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
  9. Follower:分区Leader副本,实时从Leader中同步数据,当Leader故障时,某个follower将成为新的Leader。
  10. Message:消息。消息是Kafka通信的基本单位,由固定长度的消息头和可变长度的消息体构成。在java重新实现的客户端中,Message被称之为Record。

二、Kafka安装

2.1、准备环境

  1. Linux操作系统
  2. Java运行环境(1.8或以上)
  3. zookeeper 集群环境,可参照 Zookeeper集群部署
  4. 服务器列表:

2.2、准备安装介质

分别登录server1、server2、server3执行,操作、配置相同:

##更新或安装wget命令
yum -y install wget
##创建安装目录
mkdir -p /usr/local/services/kafka
##获取安装包kafka_2.12-3.0.0.tgz
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.0/kafka_2.12-3.0.0.tgz
##解压缩kafka_2.12-3.0.0.tgz
tar -zxvf kafka_2.12-3.0.0.tgz

2.3、修改环境配置

  1. 分别登录server1、server2、server3添加主机名,操作、配置相同:
vi /etc/hosts

##添加如下内容
168.5.7.75 server1
168.5.7.76 server2
168.5.7.77 server3

:wq
  1. 分别登录server1、server2、server3添加kafka环境变量,操作、配置相同
vi ~/.bash_profile

##添加如下内容
export KAFKA_HOME=/usr/local/services/kafka/kafka_2.12-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin

:wq

source /etc/profile

说明:再任一路径下输入 kafka 按 Tab 键后会补全 Kafka 相关脚本.sh,即Kafka 环境变量配置成功。 因 Kafka 脚本运行时会加载 /config 路径下的相关配置文件,故当不在 Kafka 安装目录 bin 下执行相关脚本时, 需要指定配置文件绝对路径

2.4、修改Kafka配置

登录server1执行操作:

cd $KAFKA_HOME/config
cp server.properties server.properties.$(date +%Y%m%d)
vi server.properties

## 修改文件内配置如下
## broker 的全局唯一编号,不能重复,只能是数字
broker.id=1
## kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/data/kafka-logs
port=9093
## 配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=server1:2181,server2:2181,server3:2181/kafka

:wq

登录server2执行操作:

cd $KAFKA_HOME/config
cp server.properties server.properties.$(date +%Y%m%d)
vi server.properties

## 修改文件内配置如下
## broker 的全局唯一编号,不能重复,只能是数字
broker.id=2
## kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/data/kafka-logs
port=9093
## 配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=server1:2181,server2:2181,server3:2181/kafka

:wq

登录server3执行操作:

cd $KAFKA_HOME/config
cp server.properties server.properties.$(date +%Y%m%d)
vi server.properties

## 修改文件内配置如下
## broker 的全局唯一编号,不能重复,只能是数字
broker.id=3
## kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/data/kafka-logs
port=9093
## 配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=server1:2181,server2:2181,server3:2181/kafka

:wq

2.5、Kafka服务启停

## 默认 zookeeper 集群启动成功,查看命令:./zkServer.sh status

## 启动命令
sh  ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties

## 停止命令
sh ${KAFKA_HOME}/bin/kafka-server-stop.sh 

2.6、Kafka集群启停脚本

说明:本脚本基于SSH服务器免密登录,如集群未配置SSH,参照:《SSH安装配置》 。

  • 启动脚本:start-kafka-cluster.sh
#!/bin/bash
brokers="server1 server2 server3"
KAFKA_HOME="/usr/local/services/kafka/kafka_2.11-2.3.0"
KAFKA_NAME="kafka_2.11-2.3.0"

echo "INFO : Begin to start kafka cluster ..."

for broker in $brokers
do
  echo "INFO : Starting ${KAFKA_NAME} on ${broker} ..."
  ssh ${broker} -C "source /etc/profile; sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties"
  if [[ $? -eq 0 ]]; then
      echo "INFO:[${broker}] Start successfully"
  fi
done
echo "INFO:Kafka cluster starts successfully !"

为脚本添加执行权限:

chmod a+x start-kafka-cluster.sh
  • 停止脚本:stop-kafka-cluster.sh
#!/bin/bash
brokers="server1 server2 server3"
KAFKA_HOME="/usr/local/services/kafka/kafka_2.11-2.3.0"
KAFKA_NAME="kafka_2.11-2.3.0"

echo "INFO : Begin to stop kafka cluster ..."
for broker in $brokers
do
  echo "INFO : Shut down ${KAFKA_NAME} on ${broker} ..."
  ssh ${broker} "source /etc/profile;bash ${KAFKA_HOME}/bin/kafka-server-stop.sh"
  if [[ $? -ne 0 ]]; then
      echo "INFO : Shut down ${KAFKA_NAME} on ${broker} is down"
  fi
done

echo "INFO : kafka cluster shut down completed!"

为脚本添加执行权限:

chmod a+x stop-kafka-cluster.sh

三、Kafka-Shell API

3.1、主题相关

${KAFKA_HOME}/bin/kafka-topics.sh
## 参数详解
## --bootstrap-server <io:port>  指定连接 kafka-broker 节点
## --topic <topic_name>          指定操作的 主题
## --create                      指定对主题的操作:新增
## --delete                      指定对主题的操作:删除
## --alter                       指定对主题的操作:编辑
## --list                        指定对主题的操作:查看列表
## --describe                    指定对主题的操作:查看详情
## --partitions <num>            设置主题分区数。
## -- replication-factor <num>   设置分区副本数。 
## --config <key-value>          更改系统默认设置

## 案例:查看集群中所有 topic
kafka-topics.sh --bootstrap-server 168.5.7.75:9092 --list
## 案例:创建一个名为 hello 的主题,其分区数为:1,副本数为:3。
kafka-topics.sh --bootstrap-server 168.5.7.75:9092 --create --partitions 1 --replication-factor 3 --topic hello
## 案例:查看主题 hello 详情
kafka-topics.sh --bootstrap-server 168.5.7.75:9092 --describe  --topic hello
## 案例:修改主题 hello 分区数(注意:通过alter命令只能增加分区数,不能减少)
kafka-topics.sh --bootstrap-server 168.5.7.75:9092 --alter --topic hello --partitions 3
##  案例:删除主题 hello
kafka-topics.sh --bootstrap-server 168.5.7.75:9092 --delete --topic hello

3.2、生产者相关

${KAFKA_HOME}/bin/kafka-console-producer.sh
## 参数详解
## --bootstrap-server <io:port>  指定连接 kafka-broker 节点
## --topic <topic_name>          指定操作的 主题

##  案例:生产者连接 broker 生产消息
kafka-console-producer.sh --bootstrap-server 168.5.7.75:9092 --topic hello
## 进入命令行后发送消息即可
>

3.3、消费者相关

${KAFKA_HOME}/bin/kafka-console-consumer.sh
## 参数详解
## --bootstrap-server <io:port>  指定连接 kafka-broker 节点
## --topic <topic_name>          指定操作的 主题
## –from-beginning               从头开始消费。
## –group <group_id>             指定消费者组名称。

##  案例:消费者连接 broker 消费消息,默认只消费连接之后生产的新消息
kafka-console-consumer.sh --bootstrap-server 168.5.7.75:9092 --topic hello
##  案例:消费者连接 broker 消费消息,指定从主题中的第一条消息开始消费
kafka-console-consumer.sh --bootstrap-server 168.5.7.75:9092 --from-beginning --topic hello

四、Kafka-Java API

TODO:参考gitee项目地址。

五、Kafka详解

5.1、生产者

标签:入门,##,broker,server,kafka,--,sh,Kafka,抬脚
From: https://www.cnblogs.com/DeepInThought/p/18130238

相关文章

  • 网工入门-中小型网络系统综合实验
    一、实验要求1.网络中有3个不同部门,均可自动获取地址2.各部门可互相访问,也可访问内网服务器172.16.100.13.PC1不允许访问互联网,PC2和PC3可以访问互联网4.内网服务器对外发布的地址为64.1.1.3,互联网用户可以访问这台服务器5.内网服务器的域名是www.aaa.co......
  • 网工入门-基础操作
    路由器系统视图:system-view选择网口:interfaceGigabitEthernet0/0/1配置ip:ipaddress192.168.1.1255.255.255.0保存配置:save打开dhcp:dhcpenable选择网口:interfaceGigabitEthernet0/0/1dhcp网口:dhcpselectinterface查看路由表:displayiprouting-table修改设备名......
  • Kafka做消息队列的原理
    Kafka作为消息队列的实现原理主要基于其分布式架构和日志式存储机制。以下是Kafka作为消息队列工作的核心原理:1.分布式架构与分区:Kafka采用分布式架构,将数据分布存储在多个节点(称为Broker)上,以实现数据的水平扩展和并行处理。Kafka中的消息流被组织成主题(Topic),每个主题可以包......
  • LaTeX语法入门
    引言TeX是由DonaldKnuth创造的基于底层编程语言的电子排版系统(TEX是Honeywell公司在1980年为其TextExecutive文本处理系统注册的商标,它与TeX是两回事)。使用TeX可以对版面做精细的操作,生成精美的文档。TeX提供版面编辑的底层命令,但是使用时操作复杂,工作效率不高.TeX允许用这......
  • Docker从入门到精通:ubuntu系统安装docker
    简介上一篇文章我们介绍了docker的发展历史,以及docker的强大应用,本篇文章我们将正式进入docker的学习,讲解如何安装docker。docker基本组成Image(镜像):镜像就好比一个模板,我们可以通过这个模板来创建容器服务,mysql镜像=>run>mysql5.2容器(提供服务器),通过这个镜像可以创建多个容......
  • Kaggle自然语言处理入门 推特灾难文本分类 Natural Language Processing with Disaste
    和新闻按照标题分类差不多,用的朴素贝叶斯#导入必要的包importrandomimportsysfromsklearnimportmodel_selectionfromsklearn.naive_bayesimportMultinomialNBimportjoblibimportre,stringimportpandasaspdimportnumpyasnpdeftext_to_words(file_path)......
  • 消息中间件RabbitMQ_RabbitMQ快速入门3
    一、入门程序需求:使用简单模式完成消息传递步骤:1.创建工程(生成者、消费者)2.分别添加依赖3.编写生产者发送消息4.编写消费者接收消息 二、小结上述的入门案例中其实使用的是如下的简单模式:在上图的模型中,有以下概念:P:生产者,也就是要发送消......
  • 【C++】C++入门
    C++入门什么是缺省参数?缺省参数的语法示例:使用缺省参数注意事项什么是函数重载?函数重载的语法示例:使用函数重载注意事项什么是引用?引用的语法引用的特点示例:使用引用注意事项什么是内联函数?内联函数的优势内联函数的语法示例:使用内联函数注意事项什么是缺省参数......
  • Shell脚本编程入门技能
    Shell脚本编程入门技能Shell脚本的概念Shell是一个命令解释器,它的作用是解释执行用户命令及程序等,用户每输入一条命令,shell就执行一条。这种从键盘输入命令,就可以得到回应的对话方式,称为交互的方式。当命令或程序语句不在命令行下执行,而是通过一个程序文件来执行时,该程序......
  • docker-compose部署kafka
    docker-compose.ymlversion:'2'services:zookeeper:image:develop-harbor.geostar.com.cn/3rd/zookeeper:3.5.5ports:-"2181:2181"kafka:image:develop-harbor.geostar.com.cn/3rd/wurstmeister/kafka:2.12-2.2.1......