1,检测
参考:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html
from kafka.admin import KafkaAdminClient,NewTopic from kafka import KafkaProducer from kafka import KafkaConsumer from kafka import KafkaClient import time # 配置 Kafka 集群信息 bootstrap_servers = '192.168.0.1:29092' sasl_mechanism = 'PLAIN' security_protocol = 'SASL_PLAINTEXT' sasl_plain_username = 'kafka' sasl_plain_password = 'xxx' topic_name = "test" group_name = "test" # 创建 KafkaAdminClient 对象并连接到 Kafka 集群 admin_client = KafkaAdminClient( bootstrap_servers=bootstrap_servers, security_protocol=security_protocol, sasl_plain_username=sasl_plain_username, sasl_plain_password=sasl_plain_password, sasl_mechanism=sasl_mechanism ) # 打印消费组列表 for consumer_group_name in admin_client.list_consumer_groups(): print(consumer_group_name) # 打印指定消费组 for line in admin_client.list_consumer_group_offsets('group_name'): print(line) client = KafkaClient( bootstrap_servers=bootstrap_servers, security_protocol=security_protocol, sasl_plain_username=sasl_plain_username, sasl_plain_password=sasl_plain_password, sasl_mechanism=sasl_mechanism ) # 打印brokers for broker in client.cluster.brokers(): print(broker.nodeId) # 打印主题列表 for topic_name in admin_client.list_topics(): print(topic_name) # 指定要创建的主题名称和分区数量 topic_names = '{}{}'.format(topic_name, int(time.time())) num_partitions = 3 replication_factors=3 # 创建 NewTopic 对象,并指定主题名称、分区数量和副本因子 new_topic = NewTopic(name=topic_name, num_partitions=3, replication_factor=replication_factors) # 调用 create_topics() 方法来创建主题 admin_client.create_topics(new_topics=[new_topic], validate_only=False) # 创建生产者 producer = KafkaProducer( bootstrap_servers=bootstrap_servers, security_protocol=security_protocol, sasl_plain_username=sasl_plain_username, sasl_plain_password=sasl_plain_password, sasl_mechanism=sasl_mechanism ) # 发送消息 message = b'Hello, Kafka!' producer.send(new_topic, message) # 创建消费者 consumer = KafkaConsumer(new_topic, bootstrap_servers=bootstrap_servers, security_protocol=security_protocol, sasl_plain_username=sasl_plain_username, sasl_plain_password=sasl_plain_password, sasl_mechanism=sasl_mechanism, group_id=group_name) for message in consumer: print(msg.value)
https://www.volcengine.com/theme/4897060-S-7-1
标签:protocol,name,plain,kafka,topic,sasl,维护,password From: https://www.cnblogs.com/tiantao36/p/18102662