前言
视频链接
https://www.bilibili.com/video/BV1Xy4y1G7zA
一、kafka介绍
1、为什么使用消息队列
实现异步通信
2、消息队列的流派
消息队列解决了通信问题
(1)、有broker(类似消息中转站)
a.重topic:kafka、activemq、rocketmq
b.轻topic:rabbitmq
(2)、无broker:zeromq
3、kafka安装(linux)
(1)下载地址
https://kafka.apache.org/downloads
// 使用2.4.0
(2)、解压目录
drwxr-xr-x 3 root root 4096 12月 10 2019 bin
drwxr-xr-x 2 root root 4096 12月 10 2019 config
drwxr-xr-x 2 root root 4096 8月 28 15:46 libs
-rw-r--r-- 1 root root 32216 12月 10 2019 LICENSE
-rw-r--r-- 1 root root 337 12月 10 2019 NOTICE
drwxr-xr-x 2 root root 4096 12月 10 2019 site-docs
(3)、配置文件
vi /config/server.properties
(4)、修改内容
listeners=PLAINTEXT://172.30.0.3:9092
// 当前主机地址
advertised.listeners=PLAINTEXT://公网ip:9092
// 云服务器公网ip,外部连接需要使用公网
log.dirs=../kafka-logs
// 日志地址 即消息地址 默认保存七天
zookeeper.connect=123.45.6.78:2181
// zk地址
(5)、启动服务
sh kafka-server-start.sh -daemon ../config/server.properties
// -daemon为后台启动
(6)、观察进程和日志
ps -ef|grep kafka
// tail -f logs/server.log
[2023-08-28 16:00:21,294] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2023-08-28 16:00:21,295] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2023-08-28 16:00:21,295] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2023-08-28 16:00:21,311] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-08-28 16:00:21,324] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2023-08-28 16:00:21,483] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2023-08-28 16:00:21,485] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-08-28 16:00:21,485] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
[2023-08-28 16:00:21,485] INFO Kafka startTimeMs: 1693209621484 (org.apache.kafka.common.utils.AppInfoParser)
[2023-08-28 16:00:21,488] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
(7)、观察zk中有kafka节点信息
./zkCli.sh
ls /brokers/ids
[0]
二、kafka基本使用
1、kafka基本概念
名称 | 解释 |
---|---|
broker | 消息中间件处理节点,一个kafka节点就是一个broker,多个broker组成一个集群 |
topic | kafka通过topic对消息进行归类,发布到kafka集群的每条消息都需要指定一个topic |
producer | 消息生产者,向broker发送消息的客户端 |
consumer | 消息消费者,从broker读取消息的客户端 |
2、创建主题topic
(1)、使用脚本创建主题
sh kafka-topics.sh --create --zookeeper 123.45.6.78:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
(2)、使用脚本查看创建是否成功
sh kafka-topics.sh --list --zookeeper 123.45.6.78:2181
test
(3)、在zk查看是否创建成功
./zkCli.sh
ls /brokers/topics
[test]
(4)、topic等信息存储在zk,保证kafka是无状态的
3、发送消息
sh kafka-console-producer.sh --broker-list 172.30.0.3:9092 --topic test
>123
>456
>qwe
4、接收消息
(1)、从最后一条消息的偏移量+1消费
sh kafka-console-consumer.sh --bootstrap-server 172.30.0.3:9092 --topic test
111
(2)、从开头开始消费
sh kafka-console-consumer.sh --bootstrap-server 172.30.0.3:9092 --from-beginning --topic test
123
456
qwe
111
几个需要注意的点