首页 > 编程语言 >python-API开发zk客户端

python-API开发zk客户端

时间:2024-04-11 10:02:28浏览次数:27  
标签:10.0 logging zk python kazoo 2181 API 节点

前面于超老师讲完了,zk运维的基本命令行玩法,更多的还是开发需要通过代码和zk结合处理。
大多数场景是java后端去操作。
这里我们以运维更友好的python来学习。

1.kazoo模块

zookeeper是一个用于维护配置信息、命名、提供分布式同步和提供组服务。它自身是高可用的,只要宕机节点不达到半数,zookeeper服务都不会离线。zookeeper为实现分布式锁,分布式栅栏,分布式队列,安全的配置存储交换,在线状态监控,选举提供了坚实的基础。

在hadoop环境中,zookeeper被广泛应用。hadoop 高可用是依赖zookeeper的实现的。Hbase,storm,kafka都强依赖zookeeper,没有zookeeper根本都运行不起来。阿里的微服务治理框架dubbo也是依赖zookeeper的。

python访问zookeeper使用的的模块是kazoo。

模块文档
https://kazoo.readthedocs.io/en/latest/api/client.html

完成功能

1. 会话连接、恢复
2. 节点增删改查
3. watch、acl操作

kazoo获取zk服务端信息

1. 模块安装
pip3 install kazoo

# 于超老师这里的版本
kazoo==2.6.0
zookeeper==version--- (3, 5, 6)


2.链接zk

# -*- coding: UTF-8 -*-
'''
www.yuchaoit.cn with python ,zookeeper
'''
import sys
from kazoo.client import KazooClient, KazooState
import logging

logging.basicConfig(
    level=logging.DEBUG
    , stream=sys.stdout
    , format='query ok---%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s')

# 创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
    # hosts='10.0.0.18:2181'
     hosts='10.0.0.18:2181,10.0.0.19:2181,10.0.0.20:2181'
    , timeout=10.0  # 连接超时时间
    ,logger=logging  # 传一个日志对象进行,方便 输出debug日志

)

# 开始心跳
zk.start()
# 获取子节点
znodes = zk.get_children('/')
print(znodes)


# 开始心跳
zk.start()
# 获取根节点数据和状态
data,stat=zk.get('/')
print('data---',data)
print('stat---',stat)

============
# 输出结果

data--- b''
stat--- ZnodeStat(czxid=0, mzxid=0, ctime=0, mtime=0, version=0, cversion=7, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=3, pzxid=30064771073)



'''
这个是stat的输出:
ZnodeStat(czxid=0, mzxid=0, ctime=0, mtime=0, version=0, cversion=8448, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=4, pzxid=30064771073)
ZnodeState的属性列表:
czxid : 创建这个节点时的zxid
mzxid : 修改这个节点时的zxid
ctime : 创建时间
mtime : 修改时间
version : 数据被修改的次数
cversion: 子节点被修改的次数
aversion: acl被改变的次数
ephemeralOwner:临时节点创建的用户,如果不是临时节点值为0
dataLength:节点数据长度
numChildren:子节点的数量
pzxid:子节点被修改的zxid
'''

#获取根节点的所有子节点,返回的是一个列表,只有子节点的名称
children = zk.get_children("/");
print(children)

#下面是根节点的返回值


#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()

 

增删改查

增、查


# -*- coding: UTF-8 -*-
'''
www.yuchaoit.cn with python ,zookeeper
'''
import sys
from kazoo.client import KazooClient, KazooState
import logging

logging.basicConfig(
    level=logging.DEBUG
    , stream=sys.stdout
    , format='query ok---%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s')

# 创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
    # hosts='10.0.0.18:2181'
     hosts='10.0.0.18:2181,10.0.0.19:2181,10.0.0.20:2181'
    , timeout=10.0  # 连接超时时间
    ,logger=logging  # 传一个日志对象进行,方便 输出debug日志

)

# 开始心跳
zk.start()

# 创建节点
# zk.create('/kazoo/yu1',b'good linux ,www.yuchaoit.cn',makepath=True) # 创建节点:makepath 设置为 True ,父节点不存在则创建,其他参数不填均为默认

znodes=zk.get_children('/kazoo')
print(znodes)
data=zk.get('/kazoo/yu1')
print(data)


#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()

修改节点


# -*- coding: UTF-8 -*-
'''
www.yuchaoit.cn with python ,zookeeper
'''
import sys
from kazoo.client import KazooClient, KazooState
import logging

logging.basicConfig(
    level=logging.DEBUG
    , stream=sys.stdout
    , format='query ok---%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s')

# 创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
    # hosts='10.0.0.18:2181'
     hosts='10.0.0.18:2181,10.0.0.19:2181,10.0.0.20:2181'
    , timeout=10.0  # 连接超时时间
    ,logger=logging  # 传一个日志对象进行,方便 输出debug日志

)

# 开始心跳
zk.start()

# 创建节点
# zk.create('/kazoo/yu1',b'good linux ,www.yuchaoit.cn',makepath=True) # 创建节点:makepath 设置为 True ,父节点不存在则创建,其他参数不填均为默认

# znodes=zk.get_children('/kazoo')
# print(znodes)
# data=zk.get('/kazoo/yu1')
# print(data)

zk.set('/kazoo/yu1',b'not bad linux,www.yuchaoit.cn')
data,stat=zk.get('/kazoo/yu1')
print('节点数据提取:',data)

#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()

删除


# -*- coding: UTF-8 -*-
'''
www.yuchaoit.cn with python ,zookeeper
'''
import sys
from kazoo.client import KazooClient, KazooState
import logging

logging.basicConfig(
    level=logging.DEBUG
    , stream=sys.stdout
    , format='query ok---%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s')

# 创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
    # hosts='10.0.0.18:2181'
     hosts='10.0.0.18:2181,10.0.0.19:2181,10.0.0.20:2181'
    , timeout=10.0  # 连接超时时间
    ,logger=logging  # 传一个日志对象进行,方便 输出debug日志

)

# 开始心跳
zk.start()

# 创建节点
# zk.create('/kazoo/yu1',b'good linux ,www.yuchaoit.cn',makepath=True) # 创建节点:makepath 设置为 True ,父节点不存在则创建,其他参数不填均为默认

# znodes=zk.get_children('/kazoo')
# print(znodes)
# data=zk.get('/kazoo/yu1')
# print(data)

# zk.set('/kazoo/yu1',b'not bad linux,www.yuchaoit.cn')
# data,stat=zk.get('/kazoo/yu1')
# print('节点数据提取:',data)


# 删除节点
# 不删除父节点

zk.delete('/kazoo/yu1',recursive=False)

#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()

watchers事件


# -*- coding: UTF-8 -*-
'''
www.yuchaoit.cn with python ,zookeeper
'''
import sys
from kazoo.client import KazooClient, KazooState
import logging

logging.basicConfig(
    level=logging.DEBUG
    , stream=sys.stdout
    , format='query ok---%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s')

# 创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
    # hosts='10.0.0.18:2181'
     hosts='10.0.0.18:2181,10.0.0.19:2181,10.0.0.20:2181'
    , timeout=10.0  # 连接超时时间
    ,logger=logging  # 传一个日志对象进行,方便 输出debug日志

)

# 开始心跳
zk.start()

# 创建节点
# zk.create('/kazoo/yu1',b'good linux ,www.yuchaoit.cn',makepath=True) # 创建节点:makepath 设置为 True ,父节点不存在则创建,其他参数不填均为默认

# znodes=zk.get_children('/kazoo')
# print(znodes)
# data=zk.get('/kazoo/yu1')
# print(data)

# zk.set('/kazoo/yu1',b'not bad linux,www.yuchaoit.cn')
# data,stat=zk.get('/kazoo/yu1')
# print('节点数据提取:',data)


# # 删除节点
# # 不删除父节点
# #  参数 recursive:若为 False,当需要删除的节点存在子节点,会抛异常 NotEmptyError 。若为True,则删除 此节点 以及 删除该节点的所有子节点
# zk.delete('/kazoo/yu1',recursive=False)


# 监控器
def t1(event):
    print('以触发')

zk.get('/kazoo',watch=t1)
print('第一次获取value')
zk.set('/kazoo',b'www.yuchaoit.cn')
zk.get('/kazoo',watch=t1)
print('第二次获取value')

#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()

'''
输出
第一次获取value
以触发
第二次获取value
'''

遍历所有子节点

import sys
from kazoo.client import KazooClient, KazooState
import logging

from importlib import reload

reload(sys)

logging.basicConfig(
    level=logging.DEBUG
    , stream=sys.stdout
    , format='query ok---%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s')

# 创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
    # hosts='10.0.0.18:2181'
     hosts='10.0.0.18:2181,10.0.0.19:2181,10.0.0.20:2181'
    , timeout=10.0  # 连接超时时间
    ,logger=logging  # 传一个日志对象进行,方便 输出debug日志

)

#递归遍历所有节点的子节点函数,_zk是KazooClient的对象,node是节点名称字符串,func是回调函数
def zk_walk(_zk, node, func):
    data, stat = _zk.get(node)
    children = _zk.get_children(node)
    func(node, data, stat, children)
    if len(children) > 0:
        for sub in children:
            sub_node = ''
            if node != '/':
                sub_node = node + '/' + sub
            else:
                sub_node = '/' + sub
            zk_walk(_zk, sub_node, func)

#测试zk_walk的打印回调函数,只是把所有数据都打印出来
def printZNode(node,  data, stat, children):
    print("node  : " + node)
    print("data  : " + str(data))
    print("stat  : " + str(stat))
    print("child : " + str(children))
    print('-'*50)

#开始心跳
zk.start()

#遍历谋个节点的所有子节点
zk_walk(zk, '/', printZNode)

#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()

python操作zk基本玩法就到这,更多的是如从事python大数据的开发工程师,需要继续看kazoo模块的官网文档,研究其他用法。

标签:10.0,logging,zk,python,kazoo,2181,API,节点
From: https://www.cnblogs.com/sxy-blog/p/18128155

相关文章

  • 华为云发布CodeArts API,为API护航
    本文分享自华为云社区《华为云发布CodeArtsAPI,为API护航》,作者:华为云头条。华为云正式发布API全生命周期管理一体化协作平台CodeArtsAPI,支持开发者高效实现API设计、开发、测试、托管、运维、变现的一站式体验。以API契约为锚点,华为云CodeArtsAPI保证了API各阶段数据高度一......
  • 两个纯数字字符串相加Python实现版
    """两个字符串相加模拟两个大整数相加,但是不能直接相加,采用每一位相加的方式"""defadd_large_numbers(num1,num2):#反转字符串,方便从低位开始相加num1=num1[::-1]num2=num2[::-1]#初始化结果列表和进位result=[]carry=0#......
  • Lumos学习python第九课:VSCode+Anaconda
    注意Anaconda版本和Python版本的对应关系,同一个Anaconda可以支持多个Python版本,注:现在vscode已原生支持jupyternotebook(要求Python版本>=3.6)Anaconda在Python解析器的基础上封装了很多Python包,尤其是涉及科学计算的,不用一个个下载,非常方便,且自带的conda包管理工具比较好用。......
  • 前端学习-vue视频学习015-其他API
    尚硅谷视频教程shallowRefshallowReactive浅层次的响应式数据(仅第一层)shallowRef:只能整体修改person.value可以修改,但是person.value.name无法修改shallowReactive:只能修改对象的第一层数据car.brand可以修改,但是car.options.color无法修改主要用处在于:如果数据量非常......
  • 小程序 Api promise 化
      工具-构建npm-miniprogram_npm下出现miniprogram-api-promise即可正常使用 使用方法:如果提示没有找到miniprogram-api-promise,可以清除缓存重新编译试一下  ......
  • 前后端分离开发和接口文档管理平台YAPI以及前端工程化(Vue-cli)
    前后端分离开发和接口文档管理平台YAPI以及前端工程化(Vue-cli)前后端分离开发需求分析=>接口定义(API接口文档)=>前后端并行开发(遵守规范)=>测试(前端、后端)=>前后端联调测试YApi1.介绍:YApi是高效、易用、功能强大的api管理平台,旨在为开发、产品、测试人员提供更优雅的接口管理服......
  • Python 模块化设计
             模块化设计是一种软件设计方法,它将程序分解成小的、独立的部分,这些部分称为模块。每个模块都有它的功能,并且设计成可以被其他模块重用。在Python中,模块化设计不仅有助于代码的组织和维护,还能提升代码复用性和开发效率。        1.定义模块在Pyth......
  • 【测试开发学习历程】python迭代、可迭代对象、迭代器、生成器
    1迭代Iteration迭代Iteration:所谓迭代就是重复运行一段代码语句块的能力,就好比在一个容器中进行一层一层遍历数据,在应用过程中for循环最为突出。迭代就是从某个容器对象中逐个地读取元素,直到容器中没有元素为止。迭代迭代,更新换代,在上一次基础上更新成新的东西。#使用for循......
  • 【测试开发学习历程】python高阶函数
    目录1map()函数2reduce()函数3filter()函数4sorted()函数1map()函数map()函数语法:map(function,iterable)参数:function:函数iterable:一个或多个序列返回值:迭代器对象作用:map()是Python内置的高阶函数,它接收一个函数function和一个iterable,并通过把......
  • 2024年3月电子学会青少年软件编程 中小学生Python编程等级考试一级真题解析(判断题)
    2024年3月Python编程等级考试一级真题解析判断题(共10题,每题2分,共20分)26、turtle画布的坐标系原点是在画布的左上角答案:错考点分析:考查turtle相关知识,turtle画布坐标系是在画布的中点,答案错误27、Python变量名区分大小写,book和BOOK不是同一个变量答案:对考点分析:考查......