zookeeper样例,包含:
- CRUD
- retry
- watcher
- transaction
docker部署zookeeper
docker run -id --name zk -p 2181:2181 zookeeper:latest
样例代码
import sys
import logging
import os
import pdb
from datetime import datetime
import kazoo.exceptions as zkException
from kazoo.client import KazooClient, KazooState, KeeperState
from kazoo.retry import KazooRetry
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger()
ZK_WORKDIR = "/workspace"
NODE_NAME = ""
zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
@zk.add_listener
def watch_for_ro(state):
"""
zk read only mode watcher
"""
if state == KeeperState.CONNECTED:
if zk.client_state == KeeperState.CONNECTED_RO:
logger.info("Read only mode!")
else:
logger.info("Read/Write mode!")
@zk.ChildrenWatch(ZK_WORKDIR)
def watch_workdir_children(children):
"""
workdir children watcher
"""
logger.info("Workdir children has been changed, children list: {}".format(children))
@zk.DataWatch(ZK_WORKDIR)
def watch_workdir_node(data, stat):
"""
workdir node watcher
"""
logger.info("Workdir node has been changed, data: {}".format(data.decode("utf-8")))
def zk_create():
"""
zk create/ensure_path example
"""
zk.ensure_path(ZK_WORKDIR)
# generate current time
global NODE_NAME
NODE_NAME = datetime.now().strftime("%Y%m%d__%H_%M_%S")
zk.create(os.path.join(ZK_WORKDIR, NODE_NAME), bytes(NODE_NAME, "utf-8"))
zk.set(ZK_WORKDIR, bytes(NODE_NAME, "utf-8"))
def zk_get():
"""
zk get/exists/get_children example
"""
full_node_name = os.path.join(ZK_WORKDIR, NODE_NAME)
if zk.exists(full_node_name):
logger.info("Node {} exists.".format(NODE_NAME))
else:
logger.info("Node {} not exists.".format(NODE_NAME))
# get node data
data, stat = zk.get(full_node_name)
logger.info("Node version: {}, data: {}".format(stat.version, data.decode("utf-8")))
# list of workspace
children = zk.get_children(ZK_WORKDIR)
logger.info("There are {} children with names {}".format(len(children), children))
def zk_set():
"""
zk set example
"""
full_node_name = os.path.join(ZK_WORKDIR, NODE_NAME)
data, stat = zk.get(full_node_name)
source_data = data.decode("utf-8")
zk.set(full_node_name, bytes(source_data + "-edited", "utf-8"))
data, stat = zk.get(full_node_name)
logger.info("Node version: {}, data: {}".format(stat.version, data.decode("utf-8")))
def zk_del():
"""
zk delete example
"""
full_node_name = os.path.join(ZK_WORKDIR, NODE_NAME)
children = zk.get_children(ZK_WORKDIR)
logger.info("There are {} children with names {}".format(len(children), children))
zk.delete(full_node_name, recursive=True)
children = zk.get_children(ZK_WORKDIR)
logger.info("There are {} children with names {}".format(len(children), children))
def zk_crud():
"""
zk crud example
"""
# zk create/ensure_path
zk_create()
# zk get/exists/get_children
zk_get()
# zk set
zk_set()
# zk del
zk_del()
def zk_retry():
"""
zk retry example
"""
kr = KazooRetry(max_tries=3, ignore_expire=False)
result = kr(zk.get_children, ZK_WORKDIR)
logger.info("Retry result: {}".format(result))
def zk_transaction():
"""
zk transaction example
"""
transaction = zk.transaction()
transaction.check(ZK_WORKDIR, version=1)
node_name = datetime.now().strftime("transaction__%Y%m%d__%H_%M_%S")
transaction.create(node_name, bytes(node_name, "utf-8"))
results = transaction.commit()
logger.info("transaction result: {}".format(results))
# check each result
for result in results:
if isinstance(result, zkException.BadVersionError):
logger.error("BadVersionError occurred.")
elif isinstance(result, zkException.RuntimeInconsistency):
logger.error("RuntimeInconsistency occurred")
def main():
"""
main logic
"""
# zk crud example
zk_crud()
# zk retry
# a better retry example maybe distributed lock
zk_retry()
# zk watcher example can be seen above
# zk transaction
zk_transaction()
zk.stop()
if __name__ == '__main__':
main()
标签:node,zookeeper,name,zk,kazoo,within,logger,children,ZK
From: https://www.cnblogs.com/JHSeng/p/17097867.html