首页 > 其他分享 >如何安装配置kafka

如何安装配置kafka

时间:2023-01-20 14:31:32浏览次数:69  
标签:log -- 配置 bigdata kafka topic 安装 kfk


最近项目需要用到kafa进行数据流处理,下面将安装部署kafka的方法简单介绍下。

1:配置java环境

修改/etc/bashrc文件,添加JAVA_HOME
cat /etc/bashrc
export JAVA_HOME=/root/jdk-11.0.16.1
export PATH=如何安装配置kafka_kafkaJAVA_HOME/bin:.

2:下载Kafka

​https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz​

3:解压缩

tar -xzf kafka_2.13-3.3.1.tgz

cd kafka_2.13-3.3.1

4:启动zookeeper

Cd zookeeper的目录,执行如下命令
$ bin/zookeeper-server-start.sh config/zookeeper.properties

5:单机版kafka启动

bin/kafka-server-start.sh config/server.properties
成功启动所有服务后,您将拥有一个基本的 Kafka 环境运行并可供使用。

6:配置kafka集群

cd config/
vi server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
############################# Socket Server Settings #############################

#The address the socket server listens on. If not configured, the host name will be equal to the value of
#java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#FORMAT:
#listeners = listener_name://host_name:port
#EXAMPLE:
#listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
#Listener name, hostname and port the broker will advertise to clients.
#If not set, it uses the value for “listeners”.
#advertised.listeners=PLAINTEXT://your.host.name:9092

#Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

#The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

#The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

#The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

#The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

#The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################

#A comma separated list of directories under which to store log files
log.dirs=/home/kfk/kafka_2.13-3.3.1/logs

#The default number of log partitions per topic. More partitions allow greater
#parallelism for consumption, but this will also result in more files across
#the brokers.
num.partitions=1

#The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
#This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
#The replication factor for the group metadata internal topics “__consumer_offsets” and “__transaction_state”
#For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

#Messages are immediately written to the filesystem but by default we only fsync() to sync
#the OS cache lazily. The following configurations control the flush of data to disk.
#There are a few important trade-offs here:
#1. Durability: Unflushed data may be lost if you are not using replication.
#2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
#The settings below allow one to configure the flush policy to flush data after a period of time or
############################# Log Retention Policy #############################

#The following configurations control the disposal of log segments. The policy can
#be set to delete segments after a period of time, or after a given size has accumulated.
#A segment will be deleted whenever either of these criteria are met. Deletion always happens
#from the end of the log.

#The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

#A size-based retention policy for logs. Segments are pruned from the log unless the remaining
#segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

#The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

#The interval at which log segments are checked to see if they can be deleted according
#to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
#Zookeeper connection string (see zookeeper docs for details).
#This is a comma separated host:port pairs, each corresponding to a zk
#server. e.g. “127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”.
#You can also append an optional chroot string to the urls to specify the
#root directory for all kafka znodes.
#配置连接Zookeeper集群地址
zookeeper.connect=bigdata-pro01:2181,bigdata-pro02:2181,bigdata-pro03:2181,bigdata-pro04:2181,bigdata-pro05:2181

#Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

#The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
#The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
#The default value for this is 3 seconds.
#We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
#However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

6.1 配置KAFKA_HOME变量

export KAFKA_HOME=/home/kfk/kafka_2.13-3.3.1

6.2 分发配置好的kafa包

tar cvf kafka_2.13-3.3.1.tar kafka_2.13-3.3.1
scp kafka_2.13-3.3.1.ta kfk@bigdata-pro02:/home/kfk/
scp kafka_2.13-3.3.1.tar kfk@bigdata-pro02:/home/kfk/
scp kafka_2.13-3.3.1.tar kfk@bigdata-pro03:/home/kfk/
scp kafka_2.13-3.3.1.tar kfk@bigdata-pro04:/home/kfk/
scp kafka_2.13-3.3.1.tar kfk@bigdata-pro05:/home/kfk/

6.2 在其他节点修改broker.id

将broker.id 顺序递增

6.3 启动集群

在集群的服务器中进入到kafka的安装目录执行如下命令:
bin/kafka-server-start.sh -daemon config/server.properties
这样就完成了kafa的启动

6.4 关闭集群

在集群服务器中进入到kafka的安装目录执行如下命令:
bin/kafka-server-stop.sh stop
就可以关闭kafka服务。

7:kafa基本操作

1)查看当前服务器中的所有topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper bigdata-pro01:2181 --list

2)创建topic

[kfk@bigdata-pro01 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
选项说明:
–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数

3)删除topic

[kfk@bigdata-pro01 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除。

4)发送消息

[kfk@bigdata-pro01 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first

hello world
atguigu atguigu

5)消费消息

[kfk@bigdata-pro01 kafka]$ bin/kafka-console-consumer.sh
–zookeeper hadoop102:2181 --topic first

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh
–bootstrap-server hadoop102:9092 --topic first

[kfk@bigdata-pro01 kafka]$ bin/kafka-console-consumer.sh
–bootstrap-server hadoop102:9092 --from-beginning --topic first
–from-beginning:会把主题中以往所有的数据都读取出来。

6)查看某个Topic的详情

[akfk@bigdata-pro01 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first

7)修改分区数

[kfk@bigdata-pro01 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6


标签:log,--,配置,bigdata,kafka,topic,安装,kfk
From: https://blog.51cto.com/u_15942605/6020648

相关文章

  • Nginx配置将二级域名解析到Linux服务器指定端口
    最近几天用Python写了个接口后台服务,把它部在了服务器的300端口上。之后,我又要把一个二级域名解析到该服务器的300端口上,此时就要用到NGINX了。因为之前对NGINX的使用并不......
  • WSL安装Ubuntu 20.04
    1.      安装WSLWSL是适用于Linux的Windows子系统可让开发人员按原样运行GNU/Linux环境-包括大多数命令行工具、实用工具和应用程序-且不会产生传统虚拟......
  • Rocky Linux 9安装PostgreSQL 12和PostGIS
    一、安装和启用EPEL、CRB、PostgreSQL仓库dnf-yinstallepel-releasednf-yinstallhttps://download.postgresql.org/pub/repos/yum/reporpms/EL-9-x86_64/pgdg-red......
  • Harbor企业级镜像仓库的安装
    1、概述Harbor是VMware公司开源的一个企业级DockerRegistry项目,项目地址:https://github.com/goharbor/harborHarbor作为一个企业级私有Registry服务器,提供......
  • 一种面向业务配置基于JSF广播定时生效的工具
    作者:京东物流王北永姚再毅李振1背景目前,ducc实现了实时近乎所有配置动态生效的场景,但是配置是否实时生效,不能直观展示每个机器上jvm内对象对应的参数是否已变更为准......
  • vue2安装
    参考https://blog.csdn.net/HoD_DoH/article/details/1246093511-安装node2-下载vueclinpminstall-g@vue/cli3-创建工程vuecreateproject_name......
  • JDK下载与配置
    下载链接:JavaDownloads|Oracle配置:变量名:JAVA_HOME变量值:C:\ProgramFiles\Java\jdk1.7.0变量名:CLASSPATH变量值:.;%JAVA_HOME%\lib\dt.jar;%JAVA_HO......
  • Linux下安装DataX和DataX-web
    Linux下安装DataX和DataX-web---------------前言---------------......
  • node.js安装
    node.js安装1.官网下载安装包下载地址:https://nodejs.org/en/download/根据自己的电脑系统选择对应的安装包,由于我用的是windows电脑(64位),下载这个安装包,是一个.msi文......
  • 虚拟机克隆并配置
    虚拟机完整克隆能够克隆出一摸一样的虚拟机,包括登录用户名和密码,主机名、mac地址、ip地址等,所以如果当我们同时打开原虚拟机和克隆虚拟机时就会出现错误(会发生冲突),这还需......