首页 > 其他分享 >zookeeper examples within kazoo

zookeeper examples within kazoo

时间:2023-02-07 11:44:14浏览次数:147  
标签:node zookeeper name zk kazoo within logger children ZK

zookeeper样例,包含:

  • CRUD
  • retry
  • watcher
  • transaction

docker部署zookeeper

docker run -id --name zk -p 2181:2181 zookeeper:latest

样例代码

import sys
import logging
import os
import pdb
from datetime import datetime

import kazoo.exceptions as zkException
from kazoo.client import KazooClient, KazooState, KeeperState
from kazoo.retry import KazooRetry


logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger()

ZK_WORKDIR = "/workspace"
NODE_NAME = ""

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()


@zk.add_listener
def watch_for_ro(state):
    """
    zk read only mode watcher
    """
    if state == KeeperState.CONNECTED:
        if zk.client_state == KeeperState.CONNECTED_RO:
            logger.info("Read only mode!")
        else:
            logger.info("Read/Write mode!")


@zk.ChildrenWatch(ZK_WORKDIR)
def watch_workdir_children(children):
    """
    workdir children watcher
    """
    logger.info("Workdir children has been changed, children list: {}".format(children))


@zk.DataWatch(ZK_WORKDIR)
def watch_workdir_node(data, stat):
    """
    workdir node watcher
    """
    logger.info("Workdir node has been changed, data: {}".format(data.decode("utf-8")))


def zk_create():
    """
    zk create/ensure_path example
    """
    zk.ensure_path(ZK_WORKDIR)
    # generate current time
    global NODE_NAME
    NODE_NAME = datetime.now().strftime("%Y%m%d__%H_%M_%S")
    zk.create(os.path.join(ZK_WORKDIR, NODE_NAME), bytes(NODE_NAME, "utf-8"))

    zk.set(ZK_WORKDIR, bytes(NODE_NAME, "utf-8"))


def zk_get():
    """
    zk get/exists/get_children example
    """
    full_node_name = os.path.join(ZK_WORKDIR, NODE_NAME)
    if zk.exists(full_node_name):
        logger.info("Node {} exists.".format(NODE_NAME))
    else:
        logger.info("Node {} not exists.".format(NODE_NAME))

    # get node data
    data, stat = zk.get(full_node_name)
    logger.info("Node version: {}, data: {}".format(stat.version, data.decode("utf-8")))

    # list of workspace
    children = zk.get_children(ZK_WORKDIR)
    logger.info("There are {} children with names {}".format(len(children), children))


def zk_set():
    """
    zk set example
    """
    full_node_name = os.path.join(ZK_WORKDIR, NODE_NAME)

    data, stat = zk.get(full_node_name)
    source_data = data.decode("utf-8")

    zk.set(full_node_name, bytes(source_data + "-edited", "utf-8"))

    data, stat = zk.get(full_node_name)
    logger.info("Node version: {}, data: {}".format(stat.version, data.decode("utf-8")))


def zk_del():
    """
    zk delete example
    """
    full_node_name = os.path.join(ZK_WORKDIR, NODE_NAME)
    children = zk.get_children(ZK_WORKDIR)
    logger.info("There are {} children with names {}".format(len(children), children))

    zk.delete(full_node_name, recursive=True)

    children = zk.get_children(ZK_WORKDIR)
    logger.info("There are {} children with names {}".format(len(children), children))


def zk_crud():
    """
    zk crud example
    """
    # zk create/ensure_path
    zk_create()
    # zk get/exists/get_children
    zk_get()
    # zk set
    zk_set()
    # zk del
    zk_del()


def zk_retry():
    """
    zk retry example
    """
    kr = KazooRetry(max_tries=3, ignore_expire=False)
    result = kr(zk.get_children, ZK_WORKDIR)
    logger.info("Retry result: {}".format(result))


def zk_transaction():
    """
    zk transaction example
    """
    transaction = zk.transaction()
    transaction.check(ZK_WORKDIR, version=1)
    node_name = datetime.now().strftime("transaction__%Y%m%d__%H_%M_%S")
    transaction.create(node_name, bytes(node_name, "utf-8"))
    results = transaction.commit()
    logger.info("transaction result: {}".format(results))

    # check each result
    for result in results:
        if isinstance(result, zkException.BadVersionError):
            logger.error("BadVersionError occurred.")
        elif isinstance(result, zkException.RuntimeInconsistency):
            logger.error("RuntimeInconsistency occurred")


def main():
    """
    main logic
    """
    # zk crud example
    zk_crud()
    # zk retry
    # a better retry example maybe distributed lock
    zk_retry()
    # zk watcher example can be seen above
    # zk transaction
    zk_transaction()

    zk.stop()


if __name__ == '__main__':
    main()

标签:node,zookeeper,name,zk,kazoo,within,logger,children,ZK
From: https://www.cnblogs.com/JHSeng/p/17097867.html

相关文章

  • <<从Paxos到Zookeeper-分布式一致性原理与实践>>
    <<从Paxos到Zookeeper-分布式一致性原理与实践>>1分布式特点分布性对等性并发性缺乏全局时钟故障总会发生2分布式问题通信异常网络分区三态单体应用中一次......
  • [LeetCode] 2452. Words Within Two Edits of Dictionary
    Youaregiventwostringarrays, queries and dictionary.AllwordsineacharraycompriseoflowercaseEnglishlettersandhavethesamelength.Inone edi......
  • 基础Dubbo+基础zookeeper
    服务的提供方1.引入依赖2.改成war项目3.换成dubbo的@service注解4.配置文件 服务的消费者自动注入,换成dubbo的@reference远程调用,其他的一样把提供服务的接口,抽成一......
  • 【转载】 基于Zookeeper的分布式锁与领导选举
    【转载】基于Zookeeper的分布式锁与领导选举原创文章,转载请务必将下面这段话置于文章开头处。本文转发自技术世界,原文链接http://www.jasongj.com/zookeeper/distribu......
  • 57-Zookeeper集群和kafka消息队列集群
    MartinFowler发现所有成功的微服务都遵循了通用的模式-MonolithFirst(单体优先):几乎所有成功的微服务故事,都是从一个变得太大而被分解的单体开始的。几乎所有我听说过的从......
  • apache-zookeeper-3.7.1 安装部署
    apache-zookeeper-3.7.1安装部署下载地址:https://mirrors.bfsu.edu.cn/apache/zookeeper/apache-zookeeper-3.7.11.下载直接解压,进入../conf/目录下复制一份zoo_sample.......
  • Zookeeper不打印运行日志
     zookeeper的配置文件目录中缺少log4j.properties文件#Copyright2012TheApacheSoftwareFoundation##LicensedtotheApacheSoftwareFoundation(ASF)und......
  • Zookeeper算法基础
    第一章算法基础思考:Zookeeper是如何保证数据一致性的?这也是困扰分布式系统框架的一个难题。1.1拜占庭将军问题拜占庭将军问题是一个协议问题,拜占庭帝国军队的将军们......
  • Dubbo 中 Zookeeper 注册中心原理分析
    vivo互联网服务器团队-LiWanghong本文通过分析Dubbo中ZooKeeper注册中心的实现ZooKeeperResitry的继承体系结构,自顶向下分析了AbstractRegistry(提供了服务数据的本地......
  • 基于k8s的zookeeper搭建
    1.官方文档  https://kubernetes.io/zh-cn/docs/tutorials/stateful-application/zookeeper/2.k8s部署2.1.部署文件zookeeper.yamlapiVersion:v1kind:Service......