首页 > 其他分享 >kafka维护

kafka维护

时间:2024-03-28 21:34:46浏览次数:28  
标签:protocol name plain kafka topic sasl 维护 password

1,检测

参考:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

from kafka.admin import KafkaAdminClient,NewTopic
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka import KafkaClient
import time

# 配置 Kafka 集群信息
bootstrap_servers = '192.168.0.1:29092'
sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_PLAINTEXT'
sasl_plain_username = 'kafka'
sasl_plain_password = 'xxx'
topic_name = "test"
group_name = "test"

# 创建 KafkaAdminClient 对象并连接到 Kafka 集群
admin_client = KafkaAdminClient(
    bootstrap_servers=bootstrap_servers,
    security_protocol=security_protocol,
    sasl_plain_username=sasl_plain_username,
    sasl_plain_password=sasl_plain_password,
    sasl_mechanism=sasl_mechanism
)
# 打印消费组列表
for consumer_group_name in admin_client.list_consumer_groups():
    print(consumer_group_name)

# 打印指定消费组
for line in admin_client.list_consumer_group_offsets('group_name'):
    print(line)
    
client = KafkaClient(
    bootstrap_servers=bootstrap_servers,
    security_protocol=security_protocol,
    sasl_plain_username=sasl_plain_username,
    sasl_plain_password=sasl_plain_password,
    sasl_mechanism=sasl_mechanism
)
# 打印brokers	
for broker in client.cluster.brokers():
    print(broker.nodeId)

# 打印主题列表
for topic_name in admin_client.list_topics():
    print(topic_name)

# 指定要创建的主题名称和分区数量
topic_names = '{}{}'.format(topic_name, int(time.time()))
num_partitions = 3
replication_factors=3

# 创建 NewTopic 对象,并指定主题名称、分区数量和副本因子
new_topic = NewTopic(name=topic_name, num_partitions=3, replication_factor=replication_factors)

# 调用 create_topics() 方法来创建主题
admin_client.create_topics(new_topics=[new_topic], validate_only=False)

# 创建生产者
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    security_protocol=security_protocol,
    sasl_plain_username=sasl_plain_username,
    sasl_plain_password=sasl_plain_password,
    sasl_mechanism=sasl_mechanism
)

# 发送消息
message = b'Hello, Kafka!'
producer.send(new_topic, message)

# 创建消费者
consumer = KafkaConsumer(new_topic,
                         bootstrap_servers=bootstrap_servers,
                         security_protocol=security_protocol,
                         sasl_plain_username=sasl_plain_username,
                         sasl_plain_password=sasl_plain_password,
                         sasl_mechanism=sasl_mechanism,
                         group_id=group_name)
for message in consumer:
    print(msg.value) 

https://www.volcengine.com/theme/4897060-S-7-1  

标签:protocol,name,plain,kafka,topic,sasl,维护,password
From: https://www.cnblogs.com/tiantao36/p/18102662

相关文章

  • ansible维护
     参考:https://docs.ansible.com/ansible/latest/collections/ansible/builtin/unarchive_module.html1,检测网络-hosts:allgather_facts:nobecome:yestasks:-name:Installtraceroutepackage:name:"{{item}}"state:present......
  • Golang操作kafka遇到网络问题重试的案例
    草稿0、实际中会遇到网络抖动会导致消费者有一小段时间与kafka连接遇到问题~0、如何模拟网络问题?本地跑多个kafka实例直接关掉其中一个kafka服务??怎么模拟断网??1、kafka-go与sarama都演示一下2、一个consumer消费一个topic的例子;模拟网络问题可以把kafka服务关了~观察一下再开启k......
  • 云原生最佳实践系列 5:基于函数计算 FC 实现阿里云 Kafka 消息内容控制 MongoDB DML 操
    方案概述在大数据ETL场景,将Kafka中的消息流转到其他下游服务是很常见的场景,除了常规的消息流转外,很多场景还需要基于消息体内容做判断,然后决定下游服务做何种操作。该方案实现了通过Kafka中消息Key的内容来判断应该对MongoDB做增、删、改的哪种DML 操作。当Kafka......
  • 【Canal】Canal+Kafka实现数据同步
    canal介绍参考:【Canal】Canal快速入门Kafka准备参考:【Kafka】Kafka安装(一) + 【Kafka】Kafka-UI安装启动canal-server参考:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart下载canal,访问 release页面 ,选择需要的包下载,如......
  • kafka命令工具创建查看topic信息
    转载:https://www.jianshu.com/p/6cf6c7f208c9 1、创建topic./bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topicfirst--partitions1--replication-factor1./bin/kafka-topics.sh--create--bootstrap-serverlocalhost:9092--replication-fa......
  • 推荐一个kafka可视化客户端GUI工具(Kafka King)
    KafkaKing,比较新,只需要填写kafka连接地址就行,不需要什么zookeeper。支持的功能也多:查看集群节点列表(完成)创建主题(支持批量)、删除主题、支持根据消费者组统计每个topic的消息积压量(完成)支持查看topic的分区的详细信息,并为主题添加额外的分区(完成)支持查看每个分区的消息offse......
  • 深入理解 Vue 3.0 宏函数:提升组件代码的工程化与可维护性
    Vue3.0宏函数详解:defineProps、defineEmits、defineExpose、defineSlots和defineOptions在Vue3.0中,为了更好地组织和维护组件代码,引入了几个新的宏函数。这些宏函数包括defineProps、defineEmits、defineExpose、defineSlots和defineOptions。本文将详细介绍这五......
  • win10 docker zookeeper和kafka搭建
    好久没用参与大数据之类的开发了,近日接触到一个项目中使用到kafka,因此要在本地搭建一个简易的kafka服务。时间比较紧急,之前有使用docker的经验,因此本次就使用docker来完成搭建。在搭建过程中出现的一些问题,及时记录,以便后期再遇见。环境计算机环境:win1022H2dockerVersio......
  • 安全更新:关于Cybellum维护服务器问题的情况说明(CVE-2023-42419)
    “转载自CybellumTechnologiesLtd.”我们想通知我们的客户一个我们注意到的安全问题,作为我们对产品透明度和持续安全性的承诺。2023年6月21日,一位名叫Delikely的安全研究员向Cybellum的安全团队报告了一个问题,特别针对Cybellum软件的某个发行版。这个问题是在Cybellum的QCOW......
  • 实时数仓之Flink消费kafka消息队列数据入hbase
    一、流程架构图 二、开源框架及本版选择    本次项目中用到的相关服务有:hadoop、zookeeper、kafka、maxwell、hbase、phoenix、flink   三、服务部署完成后,开发Flink主程序  3.1结构图如下:      3.2代码详细内容  3.2.1pom文件<?xml......