-
在 kafka 的启动配置中修改 num.partitions=2
每一个配置参考文档 https://blog.csdn.net/lizhitao/article/details/25667831 -
python 代码在创建的时候, 将 partion 指定为 10 个
### producer
from kafka.admin import NewTopic
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient
from kafka.errors import TopicAlreadyExistsError
producer = KafkaProducer(bootstrap_servers=['xxx:9092'])
value = {"type": "test", "value": {"requestNo": "1", "auditStatus": "2", "failReason": "3"}}
bytesDict = bytes('{}'.format(value), 'utf-8')
print(bytesDict)
topic = 'test_topic31'
if __name__ == '__main__':
# 让 kafka 自动化创建 topic, 默认的个数在config/server.properties配置 num.partitions=10
# producer.send(topic, bytesDict)
# producer.close()
# 先创建 topic, 然后指定该 topic 的分区为 10
try:
admin_client = KafkaAdminClient(bootstrap_servers=['xxx:9092'])
admin_client.create_topics([NewTopic(topic, 10, 1)])
except TopicAlreadyExistsError:
pass
producer.send(topic, bytesDict)
producer.close()
### consumer
from kafka import KafkaConsumer
topic = 'test_topic31'
consumer = KafkaConsumer(topic, bootstrap_servers=['xxx:9092'])
if __name__ == '__main__':
for m in consumer:
print(m)
print(m.topic)
先启动消费者, 然后启动生产者, 生产者一旦运行就会去创建一个新的 Topic, 去 kafka 中查看该 Topic 是否真的为10 个
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test_topic31