Kafka学习
一、kafka所需的命令
启动kafka要先启动zookeeper,zookeeper学习可以参考bilibili的尚硅谷的教程:07_尚硅谷_zk_本地_安装_哔哩哔哩_bilibili。
启动kafka需要执行命令,进入到/opt/module/zookeeper-3.5.7路径下,执行
bin/zkServer.sh start
将start改为status可以查看zookeeper的状态。
kafka启动命令:进入到kafka目录中,/opt/module/kafka。使用命令起到kafka:
bin/kafka-server-start.sh -daemon config/server.properties
停止kafka命令
bin/kafka-server-stop.sh
二、kafka的connect尝试
同步文件
尝试使用connect自带的文件source和sink测试文件的同步更新。
- standalone模式启动connect:
其中的一些配置文件说明如下,首先是config目录下的connect-standalone.properties
配置文件:
#设置需要连接到的Kafka节点
bootstrap.servers=localhost:9092
# key和value的转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 是否也转换schema,如果设置为false的话,就只会转换payload
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# offset保存于的文件
offset.storage.file.filename=/tmp/connect.offsets
# 自动刷新Offset的时间,单位时毫秒
offset.flush.interval.ms=10000
因为我们要测试文件的同步,所以我们先编辑source的配置文件connect-file-source.properties
,内容如下:
# Source的名字
name=local-file-source
# Source对应的类型
connector.class=FileStreamSource
# Task的数量,因为我们是standalone模式,所以只能是1个。在分布式部署时,可以设置为多个
tasks.max=1
# 需要读取的文件
file=/root/access.log
# 读取消息后存入的主题,这个主题最好提前创建出来,可以提前规划好分区和副本因子
topic=connect-test
对应的sink的配置文件是connect-file-sink.properties
:
# Sink的名字
name=local-file-sink
# Sink的类型,因为我们是文件的connect,所以这个类不能动
connector.class=FileStreamSink
# Task的数量
tasks.max=1
# sink输出的文件
file=/root/access2.log.sink
# 消费的主题,必须与source的主题一致
topics=connect-test
connect-test
主题的创建,如果我们不自己创建的话,connect是会自动创建的,我们就默认由connect自动创建。接着启动connect,使用如下命令进行启动:
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
对应的三个配置文件分别就是我们刚刚介绍的三个properties文件,运行命令后,查看jps
命令,可以看到多了一个ConnectStandalone
进程出来。
这就是一个Worker
了。下面我测试是否可以进行同步,多开一个ssh选项卡,使用tail -f /root/access2.log.sink
命令追踪该文件内容,可以发现,/root/access2.log.sink
文件自动创建出来了,但是source
的文件/root/access.log
并没有自动创建,下面我们创建该文件touch /root/access.log
,并向其中追加内容。通过echo
命令新增内容。
如果我修改,将“123”改成“321”的话:
access2.log.sink
的内容不会修改。所以该方式是适合日志同步的,因为日志是追加编辑的,并且一般来说不会修改已经写入的内容。