1、程序作用:从多个 topic 中读取数据 → 处理数据 → 写入新的 Kafka topic。

依赖安装:pip3 install kafka-python
import json

from kafka import KafkaConsumer, KafkaProducer

# Single Kafka broker used by both the consumer and the producer
# (originally hard-coded in two places).
BOOTSTRAP_SERVERS = ['100.98.100.186:9092']


# Consumer
def comsum():
    """Consume JSON messages from the source topics, transform each record,
    and forward it to the destination topic via kfk_produce().

    Runs indefinitely: iterating a KafkaConsumer blocks waiting for new
    messages.  Malformed messages are logged and skipped instead of
    crashing the loop (BUG FIX: the original raised an unhandled
    ValueError/KeyError on bad payloads and died).
    """
    print('start consumer')
    consumer = KafkaConsumer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
    )
    print(consumer)
    consumer.subscribe(['test_topic', 'topic2'])
    for message in consumer:
        # message.value is raw bytes; json.loads accepts bytes on Python 3.6+.
        try:
            rs = json.loads(message.value)
            # Build the transformed record.  NOTE(review): assumes every
            # incoming message carries 'name' and 'msg' keys — confirm with
            # the producers feeding test_topic / topic2.
            dict1 = {'name': rs['name'], 'msg': rs['msg']}
        except (ValueError, KeyError, TypeError) as exc:
            # Skip bad input rather than killing the whole consumer.
            print('skipping malformed message: %r (%s)' % (message.value, exc))
            continue
        print(dict1)
        # Produce the new record.
        kfk_produce(dict1)
    print('end consumer')


# Producer
def kfk_produce(data_dict, topic='new_topic'):
    """Serialize *data_dict* as JSON and send it to *topic*, partition 0.

    Backward-compatible generalization: *topic* defaults to 'new_topic'
    (the original hard-coded value).

    BUG FIXES vs. original:
    - the annotation claimed the payload was ``str``, but
      ``json.dumps(...).encode('utf-8')`` yields ``bytes`` (which is what
      KafkaProducer expects);
    - ``send()`` is asynchronous, so ``flush()`` is now called explicitly
      before ``close()`` to guarantee delivery, and ``close()`` runs in a
      ``finally`` block so the connection is released even if send fails.

    NOTE(review): creating a new KafkaProducer per call opens a fresh TCP
    connection each time, which is expensive; kept as-is to preserve the
    function's self-contained interface.
    """
    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
    try:
        msg: bytes = json.dumps(data_dict).encode('utf-8')
        producer.send(topic, msg, partition=0)
        producer.flush()  # block until the async send is actually delivered
    finally:
        producer.close()


if __name__ == '__main__':
    comsum()

# 2、运行环境
docker run --name python-cron -d wukc/python3-cron:latest

3、执行方式
nohup python pyhon_kafka.py >kafka.log 2>&1 &
标签:__,dict1,python,kafka,topic,处理,consumer From: https://www.cnblogs.com/wukc/p/17242876.html