首页 > 系统相关 >linux-kafka

linux-kafka

时间:2023-06-25 21:45:16浏览次数:46  
标签:10.0 zk -- com kafka linux root

kafka

一、单点部署

docker-compose创建参考地址

https://gitee.com/jasonyin2020/docker-compose/tree/master

1.下载kafka软件包

[[email protected] ~]# ll
-rw-r--r--  1 root root 103956099 Apr 10 16:09 kafka_2.13-3.2.1.tgz

2. 解压软件包

[[email protected] ~]# tar xf kafka_2.13-3.2.1.tgz -C /es/softwares/

3. 创建符号连接

[[email protected] ~]# cd /es/softwares/ && ln -svf kafka_2.13-3.2.1 kafka
‘kafka’ -> ‘kafka_2.13-3.2.1’

4. 配置环境变量

[[email protected] /es/softwares]# cat /etc/profile.d/kafka.sh
#!/bin/bash

export ZK_HOME=/es/softwares/zk
export PATH=$PATH:$ZK_HOME/bin
export KAFKA_HOME=/es/softwares/kafka
export PATH=$PATH:$KAFKA_HOME/bin

[[email protected] /es/softwares]# source /etc/profile.d/kafka.sh

5.修改配置文件

[[email protected] ~]# egrep -nv '^$|^#' /es/softwares/kafka/config/server.properties
#修改这一行
24:broker.id=101
44:num.network.threads=3
47:num.io.threads=8
50:socket.send.buffer.bytes=102400
53:socket.receive.buffer.bytes=102400
56:socket.request.max.bytes=104857600
62:log.dirs=/tmp/kafka-logs
67:num.partitions=1
71:num.recovery.threads.per.data.dir=1
76:offsets.topic.replication.factor=1
77:transaction.state.log.replication.factor=1
78:transaction.state.log.min.isr=1
105:log.retention.hours=168
112:log.segment.bytes=1073741824
116:log.retention.check.interval.ms=300000
#修改这一行
125:zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/linux-zk-kafka321
128:zookeeper.connection.timeout.ms=18000
138:group.initial.rebalance.delay.ms=0

6.启动kafka单点

[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

7. 验证zookeeper的源数据信息

nohup java -jar zkWeb-v1.2.1.jar &

image-20230410213619143

补充:

如果装错了,重新启动的话,要删除kafaka的日志/tmp/kafka-logs

二、部署kafka集群

1.停止kafka单点服务

[[email protected] ~]# kafka-server-stop.sh

2.修改配置文件

[[email protected] ~]# egrep -v '^#|^$' /es/softwares/kafka/config/server.properties
# 唯一标识kafka节点的数字编号,随便写,同一个集群中节点的id编号不重复即可。
broker.id=101
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 指定kafka的数据存储路径。
log.dirs=/es/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# kafka连接zookeeper集群地址
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/linux-zk-kafka321
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

3.同步kafka软件包相关文件

[[email protected] ~]# data_rsync.sh /es/softwares/kafka
===== rsyncing elk102.com: kafka =====
命令执行成功!
===== rsyncing elk103.com: kafka =====
命令执行成功!
[[email protected] ~]# data_rsync.sh /es/softwares/kafka_2.13-3.2.1/
===== rsyncing elk102.com: kafka_2.13-3.2.1 =====
命令执行成功!
===== rsyncing elk103.com: kafka_2.13-3.2.1 =====
命令执行成功!
[[email protected] ~]# data_rsync.sh /etc/profile.d/kafka.sh 
===== rsyncing elk102.com: kafka.sh =====
命令执行成功!
===== rsyncing elk103.com: kafka.sh =====
命令执行成功!

4.其他2个节点修改配置文件

[[email protected] ~]# vim /es/softwares/kafka/config/server.properties
...
broker.id=102

[[email protected] ~]# vim /es/softwares/kafka/config/server.properties
...
broker.id=103

5.所有节点启动kafka环境

[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

[[email protected] ~]# source /etc/profile.d/kafka.sh
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

[[email protected] ~]# source /etc/profile.d/kafka.sh 
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

6.查看zookeeper的源数据

[[email protected] ~]# zkCli.sh -server 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
Connecting to 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
....
[zk: 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181(CONNECTED) 1] ls /linux-zk-kafka321/brokers/ids 
[101, 102, 103]

三、kafka的常用术语

	kafka cluster(broker list):
		kafka集群。
		
	kafka Server (broker):
		指的是kafka集群的某个节点。
		
	Producer:
		生产者,即往kafka集群写入数据的角色。

	Consumer:
		消费者,即从kafka集群中读取数据的角色。一个消费者隶属于一个消费者组。
		
	Concumer Group:
		消费者组,里面有一个或多个消费者。
		
	Topic:
		主题,是一个逻辑概念,用于区分业务,一个主题最少要有1个分区和一个副本。
		
	Partition:
		分区,分区可以暂时理解为分区编号。
	
	replica:
		副本,副本是实际存储数据的地方,分为两种角色,即leader和follower。
		   leader:
				负责读写。
		   follower:
				负责从leader节点同步数据,无法对集群外部提供任何服务。当leader无法访问时,follower会接管leader的角色。
		
		AR:
			所有的副本,包含leader和follower副本。
		ISR:
			表示和leader同步的所有副本集合。
		OSR:
			表示和leader不同步的所有副本即可。
			
	zookeeper集群:
		kafka 0.9之前的版本维护消费者组的offset,之后kafka内部的topic进行维护。
		协调kafka的leader选举,控制器协调者选举等....

	
client:
    consumer API:
        即消费者,指的是从boker拉取数据的角色。
		每个消费者均隶属于一个消费者组(consumer Group),一个消费者组内可以有多个消费者。
		
    producer API:
		即生产者,指的是往broker写入数据的角色。
		
	admin API:
		集群管理的相关API,包括topic,parititon,replica等管理。
		
	stream API:
		数据流处理等API,提供给Spark,Flink,Storm分布式计算框架提供数据流管道。
		
	connect API:
		连接数据库相关的API,例如将MySQL导入到kafka。
		
		

		
常见问题:
Q1: 分区和副本有啥区别?
	分区可以暂时理解为分区编号,它包含该分区编号的所有副本,和磁盘的分区没关系。
	副本是实际存储数据的地方,
	
Q2: offset存储在kafka集群,客户端在kafka集群任意一个节点如何获取偏移量。
	通过内部的消费者组的偏移量读取即可。("__consumer_offsets")

image-20230411200414672

QQ图片20230411104400

四、topic管理

1.查看topic

#查看topic列表
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --list

#查看指定topic的详细信息
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --describe --topic linux-zk
Topic: linux-zk	TopicId: Bi3swYr7SVWeseyhi8hJFA	PartitionCount: 3 
ReplicationFactor: 2	Configs: segment.bytes=1073741824
	Topic: linux-zk	Partition: 0	Leader: 102	Replicas: 102,101	Isr: 102,101
	Topic: linux-zk	Partition: 1	Leader: 101	Replicas: 101,103	Isr: 101,103
	Topic: linux-zk	Partition: 2	Leader: 103	Replicas: 103,102	Isr: 103,102
					#分区			   #副本领导者   #副本追随者			与领导者同步的副本
					
					
#查看所有的topic详细信息
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --describe 
Topic: linux-zk	TopicId: Bi3swYr7SVWeseyhi8hJFA	PartitionCount: 3 
ReplicationFactor: 2	Configs: segment.bytes=1073741824
	Topic: linux-zk	Partition: 0	Leader: 102	Replicas: 102,101	Isr: 102,101
	Topic: linux-zk	Partition: 1	Leader: 101	Replicas: 101,103	Isr: 101,103
	Topic: linux-zk	Partition: 2	Leader: 103	Replicas: 103,102	Isr: 103,102
Topic: linux-zk-test	TopicId: EU4xyfjoSL2lYgmVKPb-YQ	PartitionCount: 3	ReplicationFactor: 2	Configs: segment.bytes=1073741824
	Topic: linux-zk-test Partition: 0	Leader: 101	Replicas: 101,103	Isr: 101,103
	Topic: linux-zk-test Partition: 1	Leader: 103	Replicas: 103,102	Isr: 103,102
	Topic: linux-zk-test Partition: 2	Leader: 102	Replicas: 102,101	Isr: 102,101

2.创建topic

一个topic是生产者(producer)和消费者(consumer)进行逻辑的通信单元。
底层存储数据的是对应一个或多个分区(partition)副本(replica)。
#创建一个名为"linux-zk",分区数为3,副本数量为2的topic。
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --create  --partitions 3 --replication-factor 2 --topic linux-zk
Created topic linux-zk.

#replication-factor副本数量不能大于主机数量
#创建一个名为"linux-zk-test",分区数为3,副本数量为2的topic。
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --create  --partitions 3 --replication-factor 2 --topic linux-zk-test
Created topic linux-zk-test.

3.修改topic

(分区数量可以调大,但不可以调小!)

#修改"linux-zk"的分区大小为5
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --alter --topic linux-zk --partitions 5

#查看修改后的linux-zk
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --describe --topic linux-zk
Topic: linux-zk	TopicId: Bi3swYr7SVWeseyhi8hJFA	PartitionCount: 5	
ReplicationFactor: 2	Configs: segment.bytes=1073741824
	Topic: linux-zk	Partition: 0	Leader: 102	Replicas: 102,101	Isr: 102,101
	Topic: linux-zk	Partition: 1	Leader: 101	Replicas: 101,103	Isr: 101,103
	Topic: linux-zk	Partition: 2	Leader: 103	Replicas: 103,102	Isr: 103,102
	Topic: linux-zk	Partition: 3	Leader: 102	Replicas: 102,103	Isr: 102,103
	Topic: linux-zk	Partition: 4	Leader: 103	Replicas: 103,101	Isr: 103,101

4.删除topic

[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --delete --topic linux-zk-test

[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --list
linux-zk

补充

kafka关于修改副本数和分区的数的案例实战:
https://www.cnblogs.com/yinzhengjie/p/9808125.html

5.创建生产者

[[email protected] ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.103:9092 --topic linux-zk
>1111

6.创建消费者

[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk --from-beginning
1111


#从头开始读取 --from-beginning

7.消费者组案例

7.1 创建topic

[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092 --create  --partitions 3 --replication-factor 2 --topic linux-zk-test
Created topic linux-zk-test.

7.2 启动生产者

[[email protected] ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.103:9092 --topic linux-zk-test
>

7.3 启动消费者加入同一个消费者组

#通过配置文件加入
[[email protected] ~]# egrep -v '^$|^#' /es/softwares/kafka/config/consumer.properties
bootstrap.servers=localhost:9092
#修改下边这一行,起一个名字
group.id=zk
#启动消费者
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk-test --consumer.config  /es/softwares/kafka/config/consumer.properties --from-beginning


#通过命令行加入
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk-test --consumer-property group.id=zk --from-beginning

7.4 生产者写入数据查看

#生产者写入
[[email protected] ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.103:9092 --topic linux-zk-test
>1111
>2222

#配置文件的消费者读取
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk-test --consumer.config  /es/softwares/kafka/config/consumer.properties --from-beginning
1111

#命令行的消费者读取
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk-test --consumer-property group.id=zk --from-beginning
2222


[[email protected] ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --describe --group zk
GROUP TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG 
#组名  #topic			 											#延迟
CONSUMER-ID                                           HOST            CLIENT-ID
#consumerID											主机				 client-id
zk    linux-zk-test   0          1               1               0               console-consumer-be33c37b-f6cf-4787-a528-05aa549ed407 /10.0.0.101     console-consumer
zk    linux-zk-test   1          0               0               0               console-consumer-be33c37b-f6cf-4787-a528-05aa549ed407 /10.0.0.101     console-consumer
zk    linux-zk-test   2          1               1               0               console-consumer-c75187c0-7533-4267-a81b-ea96e840d490 /10.0.0.102     console-consumer

8.查看内置的"__consumer_offsets "的数据

[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092  --topic __consumer_offsets  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning  | grep zk

五、kafka堆内存调优

1.修改启动脚本

[[email protected] ~]# vim `which kafka-server-start.sh ` +28
[[email protected] ~]# cat /es/softwares/kafka_2.13-3.2.1/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
#注释掉这一行
#    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
	export KAFKA_HEAP_OPTS="-server -Xmx256m -Xms256m -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
	
    export JMX_PORT="8888"
fi

2.同步集群启动脚本

[[email protected] ~]# data_rsync.sh `which kafka-server-start.sh `
===== rsyncing elk102.com: kafka-server-start.sh =====
命令执行成功!
===== rsyncing elk103.com: kafka-server-start.sh =====
命令执行成功!

3.重启kafka集群

要滚动重启(重要重要重要)

#要滚动重启(重要重要重要)
#kafka停止的时候,会很慢,要等一会儿
#要一台一台重启
[[email protected] ~]# kafka-server-stop.sh
[[email protected] ~]# jps
1456 Elasticsearch
2294 QuorumPeerMain
4059 Jps
1455 Elasticsearch
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

[[email protected] ~]# kafka-server-stop.sh
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

[[email protected] ~]# kafka-server-stop.sh
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

4.验证kafka集群的内存大小

[[email protected] ~]# jmap -heap `jps | awk '/Kafka/{print $1}'`
Attaching to process ID 8591, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.291-b10

using thread-local object allocation.
Garbage-First (G1) GC with 8 thread(s)

Heap Configuration:
   MinHeapFreeRatio         = 40
   MaxHeapFreeRatio         = 70
   MaxHeapSize              = 268435456 (256.0MB)

六、kafka开源监控组件-kafka-eagle

1.启动kafka的JXM端口

这个步骤上边已经操作,可以不操作

1.1 所有节点停止kafka

kafka-server-stop.sh

1.2所有节点修改kafka的配置文件

#这个步骤上边已经操作,可以不操作
vim `which kafka-server-start.sh`
...
 #  export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"  # 注视掉该行,并将下面2行复制即可
    export KAFKA_HEAP_OPTS="-server -Xmx256M -Xms256M -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="8888"

1.3 所有节点启动kafka服务

kafka-server-start.sh -daemon /es/softwares/kafka/config/server.properties 

2.启动zookeeper的JMX端口

2.1 修改zookeeper配置文件

部署的时候已经操作,可以不操作

vim /es/softwares/zk/conf/zoo.cfg
# 添加下面的一行,启动zk的4字监控命令
4lw.commands.whitelist=*

2.2修改zookeeper的启动脚本

[[email protected] ~]# vim /es/softwares/zk/bin/zkServer.sh +77
[[email protected] ~]# cat /es/softwares/zk/bin/zkServer.sh
...
    if [ "x$JMXHOSTNAME" = "x" ]
    then
      ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.man
agement.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
    else
      echo "ZooKeeper remote JMX Hostname set to $JMXHOSTNAME" >&2
      ZOOMAIN="-Dcom.sun.management.jmxremote -Djava.rmi.server.hostname=$JMXHOSTNAME -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxre
mote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMa
in"       
		#加这一行
        ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
          
      fi 

2.3 修改zookeeper环境变量开启JMX

[[email protected] ~]# cat /es/softwares/zk/bin/zkEnv.sh
....
#最后加这几行
JMXLOCALONLY=false
JMXPORT=21812
JMXSSL=false
JMXLOG4J=false

2.4同步脚本

[[email protected] ~]# data_rsync.sh /es/softwares/zk/bin/zkServer.sh
===== rsyncing elk102.com: zkServer.sh =====
命令执行成功!
===== rsyncing elk103.com: zkServer.sh =====
命令执行成功!
[[email protected] ~]# data_rsync.sh /es/softwares/zk/bin/zkEnv.sh
===== rsyncing elk102.com: zkEnv.sh =====
命令执行成功!
===== rsyncing elk103.com: zkEnv.sh =====
命令执行成功!

2.5 启动zk

[[email protected] ~]# zkManager.sh restart

2.6 使用jconsole验证是否能连接JMX端口

image-20230411205017069

image-20230411205142741

3.安装mariadb

3.1 安装mariadb

#如果有mariadb机器的话,可以用自己的
[[email protected] ~]# yum -y install mariadb-server

3.2 配置mariadb的配置文件

[[email protected] ~]# cat /etc/my.cnf
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

#加了下边这一行
skip-name-resolve=1

3.3 启动服务并设置开机自启动

[[email protected] ~]# systemctl enable mariadb --now

3.4 创建数据库

[[email protected] ~]# mysql
MariaDB [(none)]> CREATE DATABASE kafka DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

3.5 创建授权用户

MariaDB [(none)]> CREATE USER admin IDENTIFIED  BY 'admin';
MariaDB [(none)]> GRANT ALL ON kafka.* TO admin;
MariaDB [(none)]> SHOW GRANTS FOR admin;

3.6 测试连接

[[email protected] ~]# mysql -u admin -padmin -h 10.0.0.102

4.部署kafka-eagle监控

4.1 下载kafka-eagle软件

[[email protected] ~]# ll
-rw-r--r--  1 root root  81087419 Apr 10 16:09 kafka-eagle-bin-2.0.8.zip

4.2 解压软件包

[[email protected] ~]# unzip kafka-eagle-bin-2.0.8.zip
[[email protected] ~]# tar xf efak-web-2.0.8-bin.tar.gz -C /es/softwares/

4.3 修改配置文件

[[email protected] ~]# cat /es/softwares/efak-web-2.0.8/conf/system-config.properties
#修改了这个名字,下边的是这个名字的也要修改
efak.zk.cluster.alias=linux-zk
#需要和kafka的名字一样
linux-zk.zk.list=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/linux-zk-kafka321
linux-zk.efak.broker.size=20
kafka.zk.limit.size=32
efak.webui.port=8048
linux-zk.efak.offset.storage=kafka
linux-zk.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
efak.metrics.charts=true
efak.metrics.retain=15
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
#在执行某些操作的时候进行验证时要输入
efak.topic.token=linuxzk
efak.driver=com.mysql.cj.jdbc.Driver
#修改成自己的数据库地址连接
efak.url=jdbc:mysql://10.0.0.102:3306/kafka?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=admin
efak.password=admin

4.4 配置环境变量

[[email protected] ~]# cat >> /etc/profile.d/kafka.sh <<'EOF'
> export KE_HOME=/es/softwares/efak-web-2.0.8
> export PATH=$PATH:$KE_HOME/bin
> EOF
[[email protected] ~]# cat /etc/profile.d/kafka.sh
#!/bin/bash

export ZK_HOME=/es/softwares/zk
export PATH=$PATH:$ZK_HOME/bin
export KAFKA_HOME=/es/softwares/kafka
export PATH=$PATH:$KAFKA_HOME/bin
export KE_HOME=/es/softwares/efak-web-2.0.8
export PATH=$PATH:$KE_HOME/bin
[[email protected] ~]# source /etc/profile.d/kafka.sh

4.5 修改堆内存大小

[[email protected] ~]# vim `which ke.sh `
#修改下边的这一行,把1G修改为256m
export KE_JAVA_OPTS="-server -Xmx256m -Xms256m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRe
gionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

4.6 启动服务

[[email protected] ~]# ke.sh start
....
....
....
[2023-04-11 21:10:53] INFO: Status Code[0]
[2023-04-11 21:10:53] INFO: [Job done!]
Welcome to
    ______    ______    ___     __ __
   / ____/   / ____/   /   |   / //_/
  / __/     / /_      / /| |  / ,<   
 / /___    / __/     / ___ | / /| |  
/_____/   /_/       /_/  |_|/_/ |_|  
( Eagle For Apache Kafka® )

Version 2.0.8 -- Copyright 2016-2021
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://10.0.0.101:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************

4.7 测试访问

http://10.0.0.101:8048

image-20230411211157626

image-20230411211206530

七、filebeat将数据写入到Kafka实战

[[email protected] /es/softwares/filebeat-7.17.5-linux-x86_64]# cat config/stdin-to-kafka.yaml
filebeat.inputs:
- type: stdin


# 将数据输出到kafka
output.kafka:
  # 指定kafka主机列表
  hosts:
  - 10.0.0.101:9092
  - 10.0.0.102:9092
  - 10.0.0.103:9092
  # 指定kafka的topic
  topic: "linux-zk-kafka"

#启动filebeat
[[email protected] /es/softwares/filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/stdin-to-kafka.yaml


#filebeat输入
....
....
2023-04-11T21:19:12.054+0800	INFO	[stdin.harvester]	log/harvester.go:309	Harvester started for paths: []	{"harvester_id": "75c94ca6-3307-4bdd-bcd8-39449590e0ac"}
1111   #输入内容
2023-04-11T21:19:22.182+0800	INFO	[publisher_pipeline_output]	pipeline/output.go:143	Connecting to kafka(10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092)


#kafka创建消费者读取
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk-kafka --consumer-property group.id=zk --from-beginning

{"@timestamp":"2023-04-11T13:19:21.165Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.5"},"input":{"type":"stdin"},"ecs":{"version":"1.12.0"},"host":{"name":"elk103.com"},"agent":{"hostname":"elk103.com","ephemeral_id":"e77e2788-be08-4880-ad6f-02a572194e83","id":"21611ec8-4839-4bb7-8161-b55a15616548","name":"elk103.com","type":"filebeat","version":"7.17.5"},"log":{"file":{"path":""},"offset":0},"message":"1111"}

八、logstash从kafka拉取数据并解析json格式案例

[[email protected] ~]# cat config/kafka-to-stdout.conf
input {
  kafka {
    # 指定kafka集群地址
    bootstrap_servers => "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092"
    # 指定消费的topic
    topics => ["linux-zk-kafka"]
    # 指定消费者组
    group_id => "linux-zk-kafka-logstash"
    # 指定消费的偏移量,"earliest"表示从头读取数据,"latest"表示从最新的位置读取数据.
    auto_offset_reset => "earliest"
  }
}

filter {
  json {
    # 对指定字段进行json格式解析。
    source => "message"
  }

  mutate {
     remove_field => [ "agent","log","input","host","ecs","tags" ]
  }
}


output { 
  stdout {} 
}

#启动logstash
[[email protected] ~]# logstash -rf config/kafka-to-stdout.conf
#输出
{
    "@timestamp" => 2023-04-11T13:19:21.165Z,
      "@version" => "1",
       "message" => "1111"
}

标签:10.0,zk,--,com,kafka,linux,root
From: https://www.cnblogs.com/world-of-yuan/p/17504013.html

相关文章

  • linux的Screen
       Screen: 可以在多个进程之间复用一个物理终端的窗口管理器有会话的概念可以在一个screen会话中创建多个screen窗口  参考:   http://www.ibm.com/developerworks/cn/linux/l-cn-screen/    http://www.turbolinux.com.cn/turbo/wiki/doku.php?id=%E5%91%BD%E4%B......
  • Linux开机启动项
    系统启动时需要加载的配置文件/etc/profile/root/.bash_profile/etc/bashrc/root/.bashrc/etc/profile.d/*.sh/etc/profile.d/lang.sh/etc/sysconfig/i18n/etc/rc.loacl/etc/rc.d/rc.local修改配置文件,再配置文件中加入即可。通过命令将脚本加入开机启动项:chkconfig-......
  • linux下使用scp远程传输自动输入密码
    由于需要将A服务器的文件远程传输到B服务器但是scp命令每次都要手动输入密码这样脚本执行太繁琐,所以讲A服务器和B服务器互信即可,具体操作如下: 首先在A服务器配置:mkdir-p~/.sshchmod700~/.ssh 然后在~/.ssh目录生成密钥文件:cd~/.shhssh-keygen-trsa-P""......
  • 【问题记录】Linux虚拟机的tomcat访问不了
    问题起因是电脑没电自动关机,虚拟机当时还在运行。第二天重启虚拟机的tomcat,主机访问不了,zookeeper注册中心可以正常使用防火墙,tomcat端口都检查过没问题,重装tomcat,重启服务器也没用重启时出现smbushostcontrollernotenable,四台机器都出现处理方法:查明装入模块的确切......
  • Linux常见命令
    1.列出文件列表:lsllls:显示当前路径下的所有文件;ls-a:显示所有文件到货目录(包含隐藏的文件);ls-l(ll):显示当前路径下的所有文件的详细信息;2.切换目录命令:cdcd/:切换到系统根目录;cd/文件夹:切换到绝对路径的文件夹下;cd文件夹:切换到文件夹下的文件夹下......
  • Linux VM通过NFS3.0挂载Azure Blob Storage Container后访问共享文件夹Permission den
    问题描述如图所示,/root-squash是一个BlobStorageContainer的挂载点。ls-al查看该目录的权限为:drwxr-xr--2rootroot0Jun2323:15root-squash当前用户身份为root,但在尝试进入该目录时失败,报错信息为:-bash:cd:root-squash:Permissiondenied调查过程猜......
  • linux 服务优化与防盗链
    摘要:网页安全至关重要,所有设置隐藏版本号,防止黑客攻击,日志切割方便维护,网页压缩方便响应速度,防盗链防止重要文件泄露。 目录一、隐藏版本号二、修改缓存时间三、日志切割四、连接超时五、工作进程数六、网页压缩七、防盗链八、总结  一、隐藏版本号1.curl查......
  • Linux 文件系统 | mount & umount
    Linux中一切皆文件并且所有文件都统一在/根目录下面类比windows系统,插入U盘,或者硬盘等存储设备,可以直接看到并访问里面的内容。而在Linux系统中,需要进行挂载,将外来设备加入到系统管理中,才可以正常访问。上面的挂载就需要用到mount命令查看/etc/fstab配置文件,......
  • Linux扩展篇-shell编程(八)-shell字符串截取
    shell字符串截取,一般包含从指定位置和从指定字符截取。一、从指定位置截取从字符串左边开始计数格式:${string:start:length}从string字符串的左边第start个字符开始,向右截取length个字符。${string:start}从string字符串的左边第start个字符开始截取,直到最......
  • linux文件系统和设备驱动+file结构体
    1,文件系统和设备驱动之间的关系 1)应用程序和VFS之间是系统调用;2)VFS与文件系统以及设备文件之间的接口是file_operations结构体成员函数,这个结构体可以对文件进行打开,读写,定位,控制等操作;如下图所示: 3)由于字符设备的上层没有类似磁盘的ext2等文件系统,所以字符设备的file_opte......