模拟需要用到kafka的包,需要pip安装,但注意pip install kafka不适用于python3.x的某个版本以上,均已经换成kafka-python
推荐使用版本2.0.2,目前稳定
pip没有的问题
如果是windows环境,可通过直接去官网下载python版本,指定版本会顺带安装pip
如果是linux环境,
有节点是不带pip的,可使用yum install python3-pip安装pip3,然后通过pip3进行安装
如果是现网环境,无法连接到内网地址,使用官网https://pypi.org/project/kafka-python/#files下载kafka_python-2.0.2-py2.py3-none-any.whl
指令pip install kafka_python-2.0.2-py2.py3-none-any.whl
非现网环境
pip install kafka-python==2.0.2 -i 公司镜像地址
Kafka需要在两个地方进行配置
先选一台节点上安装kafka
注意如果有的节点之前装过未卸载干净可能导致安装失败
步骤 1 软件包
https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz
步骤 2 上传软件包至/opt目录下,解压缩,重命名目录
使用pass用户上传至/home/paas
将软件包移动到/opt目录下
mv /home/paas/kafka_2.13-3.7.1.tgz /opt
cd /opt;tar -zxvf kafka_2.13-3.7.1.tgz
mv kafka_2.13-3.7.1 kafka
步骤 3 启动脚本和基础配置,上传至/opt
(这里没传zip,这个zip放的是start.sh和server.properties以及zookeeper.properties)
使用pass用户上传至/home/paas
将软件包移动到/opt目录下
mv /home/paas/kafka.zip /opt
cd /opt; unzip -o kafka.zip
替换配置文件中的监听ip
sed -i "s/LOCAL_IP/$(ip addr|grep eth0|grep inet|awk '{print $2}'|awk -F/ '{print $1}')/g" /opt/kafka/config/server.properties
启动服务sh /opt/kafka/start.sh
安装完以后,如果有其他地方有kafka消息推送过来,可使用这个指令
cd /opt/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 10.72.128.205:9092 --topic bridge-message-topic
如果是json的数据可直接通过这种读到
from kafka import KafkaConsumer
import json
KafkaServerList = ['10.243.1.144:9092']
Topics = 'manual_drive_truck_rtk_msg'
consumer = KafkaConsumer(Topics, bootstrap_servers=KafkaServerList, value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for message in consumer:
print(message.value['latitude'])
print(message.value)
南向配置里面需要对应kafka配置节点
配置的节点就是安装的位置
如果南向配置里面没有kafka,需要找运维
配置好以后,如果出现连接失败也没有影响,只是页面显示连接失败,实际上是连接成功
标签:opt,python,kafka,3.7,pip,Kafka,安装,模拟 From: https://www.cnblogs.com/immersed-in-the-deep-sea/p/18520124