首页 > 编程语言 >Python连接Etcd集群基础教程

Python连接Etcd集群基础教程

时间:2024-06-22 12:54:36浏览次数:28  
标签:基础教程 Etcd Python lock self watch etcd key print

1、背景介绍

最近接手了一个项目,项目是使用Python开发的,其中使用到了Etcd,但是项目之前开发的方式,只能够支持单节点连接Etcd,不能够在Etcd节点发生故障时,自动转移。因此需要实现基于现有etcd sdk 开发一个能够实现故障转移的功能,或者更换etcd sdk来实现故障转移等功能。

先来看看项目之前使用到的 etcd 库,即 python-etcd3,通过给出的示例,没有看到可以连接多节点的方式,深入到源码后,也没有发现可以连接多节点的方式,基本上可以断定之前使用到的 etcd sdk 不支持集群方式了。因为项目中不仅仅是使用到了简单的 get、put、delete 等功能,还用到了 watch、lock等功能,所以最好是找到一个可以替换的 sdk,这样开发周期可以缩短,并且也可以减少工作量。

2、寻找可替换的SDK

网上搜了下,发现用的比较多的几个库都不支持集群方式连接,而且也蛮久没有更新了。比如: etcd3-pypython-etcd3

那重新找一个 etcd 的sdk 吧,然后在 github 上面搜索,最开始按照默认推荐顺序看了好几个源代码,都是不支持集群方式连接的。

都有点心灰意冷了,突然想到可以换一下 github 的推荐顺序,换成最近有更新的,然后我换成了 Recently updated 搜索,然后从前往后看,在第二页看到了一个库,点击去看了下源代码,发现是通过 grpc 方式调用的 etcd server,点进去看 client.py 文件,看到有一个类是: MultiEndpointEtcd3Client,突然眼前一亮,难道可以,然后更加文档安装了对于的 sdk ,测试发现可以集群连接。

发现可以集群连接后,接下来就是看看项目中用到的其他功能,可以正常使用不,比如: watch、lock 。测试发现都可以正常使用。

接下来就是集成到项目中了,这里就不仔细介绍,大家根据自己实际情况自行调整。

3、etcd-sdk-python 连接集群

官方教程

etcd-sdk-python 连接集群方式比较简单,需要先创建 Endpoint,然后作为参数,传给 MultiEndpointEtcd3Client。

from pyetcd import MultiEndpointEtcd3Client, Endpoint
from pyetcd.exceptions import ConnectionFailedError

# time_retry 的意思是,当这个节点连接失败后,多少秒后再次去尝试连接
e1 = Endpoint(host="192.168.91.66", port=12379, secure=False, time_retry=30)
e2 = Endpoint(host="192.168.91.66", port=22379, secure=False, time_retry=30)
e3 = Endpoint(host="192.168.91.66", port=32379, secure=False, time_retry=30)

# failover 的意思是,当节点发生故障时,是否进行故障转移,这个参数一定要设置为True,否则当一个节点发生故障时,会报错
c = MultiEndpointEtcd3Client([e1, e2, e3], failover=True)

l = c.lease(10)

data = {"data": 8000}
c.put("/test_ttl", json.dumps(data).encode("utf-8"), lease=l)

time.sleep(5)
b = c.get("/test_ttl")
print(dir(b))

print(dir(b[0]))
print(dir(b[1]))
print(b[1].lease_id)

4、实现一个简约的自动续约的分布式锁

import math
from threading import Thread
import time

from pyetcd import MultiEndpointEtcd3Client, Endpoint
from pyetcd.exceptions import ConnectionFailedError

e1 = Endpoint(host="192.168.91.66", port=12379, secure=False, time_retry=2)
e2 = Endpoint(host="192.168.91.66", port=22379, secure=False, time_retry=2)
e3 = Endpoint(host="192.168.91.66", port=32379, secure=False, time_retry=2)

c = MultiEndpointEtcd3Client([e1, e2, e3], failover=True)


class EtcdGlobalMutex(object):

    def __init__(self, etcd_client, lock_key, ttl=5, acquire_timeout=2):
        """

        :param etcd_client: 已连接的etcd客户端
        :param lock_key: 分布式锁key
        :param ttl: key的有效期
        :param acquire_timeout: 尝试获取锁的最长等待时间
        """
        self.etcd_client = etcd_client
        self.lock_key = lock_key
        self.ttl = ttl if ttl else 5
        self.acquire_timeout = acquire_timeout if acquire_timeout else 2
        self.locker = etcd_client.lock(lock_key, ttl)

    def _acquire(self):
        self.locker.acquire(timeout=self.acquire_timeout)

    def _refresh_lock(self):
        """
        刷新lock,本质上就是更新 key 的ttl
        :return:
        """
        # 向上取整
        seconds = math.ceil(self.ttl / 2)
        if seconds == 1 and self.ttl == 1:
            seconds = 0.5

        while True:
            try:
                self.locker.refresh()
            except ConnectionFailedError as e:
                # 测试发现,当etcd集群一个节点故障时,可能会出现这个错误
                print(f"refresh_lock. lock_key:{self.lock_key}. ConnectionFailedError, err:{e}")

            except Exception as e1:
                # 非期望错误,退出,防止线程不能退出
                print(f"refresh_lock. lock_key:{self.lock_key}. unexpected error. err:{e1}")
                return

            time.sleep(seconds)

    def try_lock(self):
        """
        尝试获取锁,当获取不到锁时,会监听对应的key,当key消失时,会再次尝试获取锁
        :return:
        """
        try:
            self._acquire()
        except ConnectionFailedError as e:
            print(f"try_lock. lock_key:{self.lock_key}. ConnectionFailedError. err:{e}")
            time.sleep(1)

            self.try_lock()

        if self.locker.is_acquired():
            print(f"try_lock. lock_key:{self.lock_key}. Lock acquired successfully")
            # 启动刷新锁的线程
            t1 = Thread(target=self._refresh_lock)
            t1.start()
        else:
            print(f"try_lock. lock_key:{self.lock_key}. Failed to acquire lock")
            self._watch_key()

    def _watch_key(self):
        """
        监听 key
        :return: 
        """
        # 写入etcd的key
        real_key = f"/locks/{self.lock_key}"
        cancel = None
        try:
            print(f"watch_key. lock_key:{self.lock_key}")
            # watch 需要捕获异常,这样当一个etcd节点挂掉后,还能够正常 watch
            events_iterator, cancel = self.etcd_client.watch(real_key)
            for event in events_iterator:
                print(f"watch_key. lock_key:{self.lock_key}. event: {event}")
                cancel()
                break
        except ConnectionFailedError as e:
            print(f"watch_key. lock_key:{self.lock_key}, ConnectionFailedError err:{e}")
            if cancel:
                cancel()
            time.sleep(1)
            
            self.etcd_client._clear_old_stubs()

            self._watch_key()

        self.try_lock()


def main():
    name = 'lock_name'
    e = EtcdGlobalMutex(c, name, ttl=10)
    e.try_lock()

    while True:
        print("Main thread sleeping")
        time.sleep(2)


if __name__ == "__main__":
    main()


5、watch key 如何实现?

如果只是单纯的实现一个 watch key 功能,没啥好说的,看看官方给的 api 就可以,因为测试的时候,发现如果一个 etcd 节点挂掉,而这个节点有正好是连接的节点,会出现报错,这个时候需要做一些异常捕获处理。

import math
from threading import Thread
import time

from pyetcd import MultiEndpointEtcd3Client, Endpoint
from pyetcd.exceptions import ConnectionFailedError
from pyetcd.events import PutEvent

e1 = Endpoint(host="192.168.91.66", port=12379, secure=False, time_retry=2)
e2 = Endpoint(host="192.168.91.66", port=22379, secure=False, time_retry=2)
e3 = Endpoint(host="192.168.91.66", port=32379, secure=False, time_retry=2)

c = MultiEndpointEtcd3Client([e1, e2, e3], failover=True)

look_key = "look_key"

def watch(self):
    print('MonitorEqp is watching')
    cancel = None
    try:
        events_iterator, cancel = c.watch_prefix(look_key)

        self.watch_key(events_iterator)
    except  ConnectionFailedError as e:
        # 重点就是这里的异常处理
        print(f"MonitorEqp. ConnectionFailedError, err:{e}")
        if cancel:
            cancel()
        time.sleep(1)

        c._clear_old_stubs()
        watch()
    except Exception as e1:
        # 非期望错误,退出,防止线程不能退出
        print(f"MonitorEqp.  unexpected error. err:{e1}")
        if cancel:
            cancel()

        return


def watch_key(self, events_iterator):
    print("coming watch_key")
    for watch_msg in events_iterator:
        print(watch_msg)
        if type(watch_msg) != PutEvent:
            # 如果不是watch响应的Put信息, 忽略
            continue

        # xxx 处理监听到的信息

通过上面的学习,对 etcd-sdk-python 有一个基础的认识。

哈哈,再次感谢大佬的贡献!

6、部署 etcd 集群

集群部署可以看我之前写的文章 02、etcd单机部署和集群部署

标签:基础教程,Etcd,Python,lock,self,watch,etcd,key,print
From: https://www.cnblogs.com/huageyiyangdewo/p/18261962

相关文章

  • 【深度学习】python之人工智能应用篇——图像生成
    图像生成是计算机视觉和计算机图形学领域的一个重要研究方向,它指的是通过计算机算法和技术生成或合成图像的过程。随着深度学习、生成模型等技术的发展,图像生成领域取得了显著的进步,并在多个应用场景中发挥着重要作用。概述图像生成技术主要依赖于各种生成模型和算法,用于从文......
  • Pycharm或cmd在Terminal中运行tensorboard、pip等python包
    这个主要是添加python包的路径到环境变量里因为装了anaconda,所以我们要找的是对应虚拟环境里的包路径,一般是放在anaconda安装路径下的anaconda3\envs\环境名\Scripts里然后找到环境变量找到Path把文件路径添加这样就可以运行pip、tensorboard等包了......
  • 【MySQL连接器(Python)指南】02-MySQL连接器(Python)版本与实现
    文章目录前言MySQL连接器(Python)版本MySQL连接器(Python)实现总结前言MySQL连接器(Python),用于让Python程序能够访问MySQL数据库。要想让Python应用程序正确高效地使用MySQL数据,就需要深入了解MySQL连接器的特性和使用方法。MySQL连接器(Python)版本下表......
  • 【猫狗识别系统】图像识别Python+TensorFlow+卷积神经网络算法+人工智能深度学习
    猫狗识别系统。通过TensorFlow搭建MobileNetV2轻量级卷积神经算法网络模型,通过对猫狗的图片数据集进行训练,得到一个进度较高的H5格式的模型文件。然后使用Django框架搭建了一个Web网页端可视化操作界面。实现用户上传一张图片识别其名称。一、前言本研究中,我们开发了一个基于深......
  • 双基回文数(python练习)
    编写一个程序来检查一个数字是否是双基回文数。回文是指从前往后读和从后往前读都一样的字母、数字的序列。双基回文数是指在十进制和二进制表示中都是回文的数字。例如:585=1001001001是一个双基回文,其二进制是回文形式,十进制也是回文形式。定义函数check_double_base_......
  • 聪明办法学 Python第5节:循环
    作业链接:https://hydro.ac/d/datawhale_p2s/user/53146for循环和循环范围for循环的特点基于提供的范围,重复执行特定次数的操作defsumFromMToN(m,n):total=0#注意:range(x,y)是左闭右开区间,包含x,不包含yforxinrange(m,n+1):total......
  • 「Python爬虫」最细致的讲解Python爬虫之Python爬虫入门(一)
    一、认识爬虫1.1、什么是爬虫?爬虫:一段自动抓取互联网信息的程序,从互联网上抓取对于我们有价值的信息。1.2、Python爬虫架构调度器:相当于一台电脑的CPU,主要负责调度URL管理器、下载器、解析器之间的协调工作。URL管理器:包括待爬取的URL地址和已爬取的URL地址,防止重复抓取UR......
  • Python进大厂比赛中的特征工程与模型训练
    Python进大厂比赛中的特征工程与模型训练一、引言二、技术概述特征工程模型训练三、技术细节特征工程模型训练四、实战应用五、优化与改进特征工程模型训练六、常见问题特征工程模型训练七、总结与展望一、引言Python作为数据科学界的明星语言,其在机器学习、数......
  • 超越datetime:Arrow,Python中的日期时间管理大师
    介绍Arrow是一个Python库,它提供了一种合理且对人类友好的方法来创建、操作、格式化和转换日期、时间和时间戳。它实现了对datetime类型的更新,填补了功能上的空白,提供了一个智能的模块API,支持许多常见的创建场景。简单来说,它可以帮助您使用更少的导入和更少的代码来处理日期和时间......
  • python期末考试(个人理解)主要内容为函数和文件与数据格式化(三)持续更新
    如有错误,敬请更新!!!函数的概述:将一串代码打包成一个包,为了以后方便使用函数的定义:使用关键字(保留字)def来定义defmy_function():  #函数体  print("Hello,World!")函数的调用#定义一个函数,它接受两个参数并返回它们的和defadd_numbers(a,b):  result=......