Python package used
pip install confluent-kafka
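After installation, a quick sanity check of the client and the bundled librdkafka versions can look like this (a minimal sketch):
import confluent_kafka

# version() reports the Python client version, libversion() the librdkafka version.
print(confluent_kafka.version())
print(confluent_kafka.libversion())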
Create topics && expand partitions
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time:2023/2/6 16:48
# @Software:PyCharm
__author__ = "JentZhang"
import json
from confluent_kafka.admin import AdminClient, NewPartitions, NewTopic
client = AdminClient({
    'bootstrap.servers': "172.25.114.8:9093,172.25.114.14:9094,172.25.114.57:9095",
})
topics = client.list_topics()
print(topics.topics)
# Create topics
new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in ["topic1", "topic2"]]
# Note: In a multi-broker production scenario, it is more typical to use a replication_factor of 3 for durability.
# Call create_topics to asynchronously create topics. A dict
# of <topic,future> is returned.
fs = client.create_topics(new_topics)
# Wait for each operation to finish.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))
# print(json.dumps(topics.topics, indent=4))
# Add new partitions (expand test01 to 10 partitions in total)
client.create_partitions([NewPartitions("test01", 10)])
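Like create_topics, create_partitions completes asynchronously and returns a dict of <topic, future>, which the call above discards. A hedged sketch of capturing the futures, waiting for them, and then confirming the new partition count via cluster metadata (repeating the call here only to capture its return value):
fs = client.create_partitions([NewPartitions("test01", 10)])
for topic, f in fs.items():
    try:
        f.result()  # None on success
        print("Additional partitions created for {}".format(topic))
    except Exception as e:
        print("Failed to add partitions to {}: {}".format(topic, e))

# Confirm the topic now reports the expected number of partitions.
metadata = client.list_topics(topic="test01", timeout=10)
print(len(metadata.topics["test01"].partitions))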
Produce data synchronously
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time:2023/2/6 13:40
# @Software:PyCharm
__author__ = "JentZhang"
import logging
# Import the producer and consumer classes
import random
from confluent_kafka import Consumer, Producer
# Import TopicPartition and the Kafka error type
from confluent_kafka import TopicPartition, KafkaError
import json
conf = {'bootstrap.servers': "172.25.114.8:9093,172.25.114.14:9094,172.25.114.57:9095", 'client.id': "zt01"}
# Initialize the Producer
producer = Producer(conf)
# Produce data: the first argument is the topic, the second is the payload to send
# Delivery report callback used for error handling of produced messages
def __publish_delivery_report(err, msg) -> None:
    """
    Delivery report for a published message.
    :param err: error information for the message, if any
    :param msg: the message that was produced
    :return: None
    """
    try:
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    except Exception as e:
        print(e.args)
for i in range(100000):
    need_push = {
        "name": f"张三-{i}",
        "age": random.randint(19, 59)
    }
    # Serialize the dict to a JSON string
    data = json.dumps(need_push)
    # Produce to topic "test01"; callback receives the delivery report
    producer.produce("test01", data, callback=__publish_delivery_report)
    producer.poll(0)  # serve delivery report callbacks; 0 means do not block
producer.flush()
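When messages need to land on a consistent partition or carry metadata, produce() also accepts key and headers arguments. A minimal hedged sketch using the same topic and callback as above (the header name "source" is just an illustration):
# Keyed produce: messages with the same key hash to the same partition.
record = json.dumps({"name": "张三-0", "age": 30})
producer.produce(
    "test01",
    value=record,
    key="张三-0",                      # partitioning key
    headers=[("source", b"demo")],     # optional message headers
    callback=__publish_delivery_report,
)
producer.flush()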
Produce data asynchronously
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time:2023/2/6 13:40
# @Software:PyCharm
__author__ = "JentZhang"
import random
import asyncio
from threading import Thread
# Import the producer and consumer classes
from confluent_kafka import Consumer, Producer, KafkaException
# Import TopicPartition and the Kafka error type
from confluent_kafka import TopicPartition, KafkaError
import json
class AIOProducer:
    """Producer wrapper that turns delivery reports into asyncio futures."""

    def __init__(self, configs, loop=None):
        self._loop = loop or asyncio.get_event_loop()
        self._producer = Producer(configs)
        self._cancelled = False
        # A background thread keeps calling poll() so delivery callbacks fire.
        self._poll_thread = Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic, value):
        """Produce a message and return an asyncio future resolved on delivery."""
        result = self._loop.create_future()

        def ack(err, msg):
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, KafkaException(err))
            else:
                self._loop.call_soon_threadsafe(result.set_result, msg)

        self._producer.produce(topic, value, on_delivery=ack)
        return result
if __name__ == '__main__':
    conf = {'bootstrap.servers': "172.25.114.8:9093,172.25.114.14:9094,172.25.114.57:9095", 'client.id': "zt01"}

    async def main():
        producer = AIOProducer(conf)
        try:
            for i in range(10000):
                need_push = {
                    "name": f"张三-{i}",
                    "age": random.randint(19, 59)
                }
                # Serialize the dict to a JSON string
                data = json.dumps(need_push)
                # Await the delivery report of each message
                await producer.produce("test01", data)
        finally:
            # Stop the background poll thread so the process can exit
            producer.close()

    asyncio.run(main())
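Because produce() returns an asyncio future, delivery reports can also be awaited in batches rather than one at a time. A minimal sketch under the same AIOProducer definition (the helper name produce_batch is ours, not from the original post):
async def produce_batch(producer: AIOProducer, n: int = 100) -> None:
    # Fire off n messages, then wait for all delivery reports at once.
    futures = [
        producer.produce("test01", json.dumps({"name": f"张三-{i}", "age": random.randint(19, 59)}))
        for i in range(n)
    ]
    results = await asyncio.gather(*futures)
    print(f"{len(results)} messages delivered")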
Consume data
import logging
import json
# Import the producer and consumer classes
from confluent_kafka import Consumer, Producer
# Import TopicPartition and the Kafka error type
from confluent_kafka import TopicPartition, KafkaError
conf = {'bootstrap.servers': "172.25.114.8:9093,172.25.114.14:9094,172.25.114.57:9095",  # broker list, host:port
        'group.id': "test-group-id",  # consumer group id
        'enable.auto.commit': True,  # commit offsets automatically in the background
        'topic.metadata.refresh.interval.ms': "3000",
        'auto.commit.interval.ms': "3000",
        'default.topic.config': {'auto.offset.reset': 'smallest'}  # where to start when no committed offset exists
        # 'default.topic.config': {'auto.offset.reset': 'latest'}
        }
subscriber = Consumer(conf)
subscriber.subscribe(["test01"])
while True:
    msg = subscriber.poll(1)
    if msg is None:
        continue
    if msg.error():
        print(msg.error())
    else:
        message = msg.value()
        print(msg.partition(), msg.offset())
        print(json.loads(message))
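With enable.auto.commit set to True, offsets are committed in the background; for at-least-once processing it is common to disable that and commit manually after each message is handled. A minimal hedged sketch reusing the brokers and topic above (the group id test-group-id-manual is just an example):
conf_manual = {'bootstrap.servers': "172.25.114.8:9093,172.25.114.14:9094,172.25.114.57:9095",
               'group.id': "test-group-id-manual",
               'enable.auto.commit': False,        # commit explicitly after processing
               'auto.offset.reset': 'earliest'}
consumer = Consumer(conf_manual)
consumer.subscribe(["test01"])
try:
    while True:
        msg = consumer.poll(1)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue
        print(json.loads(msg.value()))
        # Synchronously commit the offset of the message just processed.
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()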
From: https://www.cnblogs.com/JentZhang/p/17532043.html