首页 > 编程语言 >【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

时间:2023-02-10 18:36:52浏览次数:57  
标签:partition group confluent Python lag kafka topic SSL offset

问题描述

使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢?

 

问题解决

执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,需要使用Event Hub Namespace级别的连接字符串(Connection String).

【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误_SSL

在Event Hub中,获取方式为: (1: Shared access policies ---> 2: RootManageSharedAccessKey or ..----> 3: Connection String )

【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误_kafka_02


 完整的示例代码:

import confluent_kafka

topics = ["<Your_topic_name>"]
broker = "<Eventhub-namespace-name>.servicebus.chinacloudapi.cn:9093"
group_name = "<Consumer-group-name>"
sasl_password = "<Connection-string>"

# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': broker,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': sasl_password,
'group.id': group_name})

print("%-50s %9s %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)

for topic in topics:
# Get the topic's partitions
metadata = consumer.list_topics(topic, timeout=10)
if metadata.topics[topic].error is not None:
raise confluent_kafka.KafkaException(metadata.topics[topic].error)

# Construct TopicPartition list of partitions to query
partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]

# Query committed offsets for this group and the given partitions
committed = consumer.committed(partitions, timeout=10)

for partition in committed:
# Get the partitions low and high watermark offsets.
(lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)

if partition.offset == confluent_kafka.OFFSET_INVALID:
offset = "-"
else:
offset = "%d" % (partition.offset)

if hi < 0:
lag = "no hwmark" # Unlikely
elif partition.offset < 0:
# No committed offset, show total message count as lag.
# The actual message count may be lower due to compaction
# and record deletions.
lag = "%d" % (hi - lo)
else:
lag = "%d" % (hi - partition.offset)

print("%-50s %9s %9s" % (
"{} [{}]".format(partition.topic, partition.partition), offset, lag))


consumer.close()


参考文档


confluent-kafka-python : ​https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/list_offsets.py​

 


当在复杂的环境中面临问题,格物之道需:浊而静之徐清,安以动之徐生。 云中,恰是如此!

标签:partition,group,confluent,Python,lag,kafka,topic,SSL,offset
From: https://blog.51cto.com/u_13773780/6049667

相关文章

  • python的垃圾回收机制
    1.垃圾回收机制的算法分类python垃圾回收算法通常有三类:引用计数,标记清除和分代回收,主要以引用计数为主,标记清除和分代回收为辅 2.对象的存储方式——refchain环......
  • Python爬虫-第四章-5-高效抓取视频网站视频资源至本地
    本章内容:  91看剧抓取影视资源  流程:    1.获取影片播放页面源码    2.获取m3u8链接地址    3.下载m3u8文件    4.读取m3u8......
  • Python爬虫-第五章-1-超级鹰插件实现自动填写识别码并登录12306网站
    功能:自动打开浏览器,定位到网站登录界面,输入账户密码,填写识别码并登录到网站内部#DemoDescribe:12306登录案例importtimefromselenium.webdriverimportChromefromsele......
  • 基于Python的天气API
    ██████╗███████╗██████╗██╗██╗███████╗██╔═══██╗██╔════╝██╔══██╗╚██╗██╔╝██╔═══......
  • Python-知识点2 类型转换
    数据类型转换字符串-->整型(带小数点时将报错)语法:int(变量名)浮点型-->整型(去掉小数部分)语法:int(变量名)字符串-->浮点型语法:float(变量名)整型-->浮点型......
  • 【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误
    问题描述使用PythonSDK(Confluent)相关方法获取offset或lag时,提示SSL相关错误,是否有更清晰的实例以便参考呢? 问题解决执行代码,因为一直连接不成功,所以检查conflue......
  • 【Python】获取项目根目录
    以new_didi项目为例:  在任意路径下的文件内获取根目录信息  代码if__name__=="__main__":cur_path=os.path.abspath(os.path.dirname(__file__))......
  • Python3,我只用一段代码,就写了个词云生成器,YYDS!
    1、引言小鱼:小屌丝,你在干啥呢?小屌丝:鱼哥,你看,我的PPT写的高大尚不。小鱼:这有啥高大尚的啊,小屌丝:你仔细看,往下翻一页小鱼:额。你这那是PPT,就是浴皇大帝、昂科旗等车系......
  • python 对unicode字符进行normalized
    参考:https://blog.csdn.net/weixin_42401159/article/details/112187778在处理一些自然语言文字的过程中,会遇到一些表面很奇怪的现象。比如两个单词人肉眼看着一模一样,......
  • Python numpy数组操作(分割数组) 
    分割数组函数数组及操作split将一个数组分割为多个子数组hsplit将一个数组水平分割为多个子数组(按列)vsplit将一个数组垂直分割为多个子数......