首页 > 数据库 >python下使用redis分布式锁

python下使用redis分布式锁

时间:2023-07-07 10:55:55浏览次数:49  
标签:加锁 python redis 线程 timeout time 分布式

python下使用redis分布式锁

1.什么场景需要分布式锁?

我们在写业务逻辑的时候,如果多个线程同时访问某个共享变量,一般是对变量进行上锁或者使用 queue.Queue() 实现,以做到线程安全保证数据不被污染。

在单机部署的情况下这样做完全没问题,但是随着业务规模的发展,某些单机部署的系统需要进化成分布式集群系统,需要部署到多台服务器上去做负载均衡,这就会使得以前的并发控制策略失效,这个时候就需要一种跨机器的互斥机制来控制共享资源的访问,以保证线程安全,这个时候就需要使用到分布式锁。

2.分布式锁应该具备的条件:

① 在分布式系统环境下,同一时间只能有一个客户端能持有锁,且保证加锁和解锁的必须是同一个客户端

② 为了防止死锁,需要具备锁失效机制,即使一个客户端在持有锁期间崩溃没有主动释放锁,也能保证后续其他客户端能正常加锁

③ 具备非堵塞特性,没有获取到锁则直接返回获取锁失败。

3.redis 分布式锁的原理

redis 中的 SETNX 命令可以实现「key不存在才插入」:

如果 key 不存在,则显示插入成功,用来表示加锁成功

如果 key 存在,则会显示插入失败,用来表示加锁失败

通过 setnx 设置分布式锁,拿到这个锁的线程就可以执行业务代码,没有拿到的则只能进行等待,执行完业务代码后的线程需要通过 del key 释放锁,让其他线程能够重新获取,这样就能实现在分布式系统并发情况下,始终只有一个线程在执行业务代码。

4.demo

'''
	pip install redis
'''

import time
import uuid
import redis
from threading import Thread

# redis 存字符串返回的是byte, 指定 decode_responses=True 可以解决
pool = redis.ConnectionPool(host="127.0.0.1", port=6379, socket_connect_timeout=3, decode_responses=True)
redis_cli = redis.Redis(connection_pool=pool)


# 加锁
def acquire_lock(lock_name, acquire_timeout=4, lock_timeout=7):
    """
    param lock_name: 锁名称
    param acquire_timeout: 客户端获取锁的超时时间
    param lock_timeout: 锁过期时间, 超过这个时间锁自动释放
    """
    identifier = str(uuid.uuid4())
    end_time = time.time() + acquire_timeout  # 客户端获取锁的结束时间
    while time.time() <= end_time:
        # setnx(key, value) 只有 key 不存在情况下将 key 设置为 value 返回 True
        # 若 key 存在则不做任何动作,返回 False
        if redis_cli.setnx(lock_name, identifier):
            redis_cli.expire(lock_name, lock_timeout)  # 设置锁的过期时间,防止线程获取锁后崩溃导致死锁
            return identifier  # 返回锁唯一标识
        elif redis_cli.ttl(lock_name) == -1:  # 当锁未被设置过期时间时,重新设置其过期时间
            redis_cli.expire(lock_name, lock_timeout)
        time.sleep(0.001)
    return False  # 获取超时返回 False


# 释放锁
def release_lock(lock_name, identifier):
    """
    param lock_name:   锁名称
    param identifier:  锁标识
    """
    # 解锁操作需要在一个 redis 事务中进行,python 中 redis 事务通过 pipeline 封装实现
    with redis_cli.pipeline() as pipe:
        while True:
            try:
                # 使用 WATCH 监听锁,如果删除过程中锁自动失效又被其他客户端拿到,即锁标识被其他客户端修改
                # 此时设置了 WATCH 事务就不会再执行,这样就不会出现删除了其他客户端锁的情况
                pipe.watch(lock_name)
                id = pipe.get(lock_name)
                if id and id == identifier:  # 判断解锁与加锁线程是否一致
                    pipe.multi()  # 开启事务
                    pipe.delete(lock_name)  # 标识相同,在事务中删除锁
                    pipe.execute()  # 执行EXEC命令后自动执行UNWATCH
                    return True
                pipe.unwatch()
                break
            except redis.WatchError:
                pass
        return False


# 发售商品件数
goods = 10


def exec_test(thread_name):
    identifier = acquire_lock('jaye')
    if identifier:  # 如果获取到锁,则执行业务逻辑
        print(f'{thread_name}获取 redis 分布式锁成功!')
        time.sleep(3)  # 模拟业务耗时
        global goods
        goods = goods - 1
        print(f'------------------------{thread_name}抢到了东西------------------------------')
        res = release_lock('jaye', identifier)  # 处理完之后释放锁
        print(f'{thread_name} 释放状态: {res}')
    else:
        print(f'{thread_name}获取 redis 分布式锁失败, 其他线程正在使用')


if __name__ == '__main__':
    for i in range(1000):
        t_name = f'thread_{i}'
        t = Thread(target=exec_test, args=(t_name,))
        t.start()
    time.sleep(10)
    print('还剩下{}'.format(goods))

5.注意

'''
    ps:redis主从模式中数据的复制是异步的,如果在主节点获取到锁后,没来得及同步到从节点就宕机了,哨兵重新选举新的主节点后依然可以获取锁,也就导致多个服务可以同时拿到锁操作资源,这种情况下可以使用Redlock。
'''

标签:加锁,python,redis,线程,timeout,time,分布式
From: https://www.cnblogs.com/chunyouqudongwuyuan/p/17534228.html

相关文章

  • python wincon32 word复制
    defword_copy(f1,f2):app=win32com.client.Dispatch('Word.Application')#打开word,经测试要是绝对路径doc=app.Documents.Open(f1)#复制word的所有内容doc.Content.Copy()#关闭worddoc.Close()word=win32com.client.Dispatc......
  • 这100道Python面试题,你会做几道?【21~25题】
    二十一、请介绍下TCP和UDP的区别TCP(TransmissionControlProtocol)和UDP(UserDatagramProtocol)是两种常见的传输层协议,用于在计算机网络中传输数据。它们在数据传输方面有以下区别:连接导向vs无连接:TCP是面向连接的协议,它在通信之前建立了一个可靠的连接。连接的建立过程包......
  • Centos7安装python
    1.yum方式默认已经安装2.7.5版本#若没有执行[root@master~]#yuminstall-ypython#查看版本[root@master~]#python-VPython2.7.52.安装python3linux版本下载网址#安装编译python3所用到的相关依赖yuminstallzlibzlib-develbzip2-developenssl-develncur......
  • python新发地每日菜价提取
    importrequestsimportcsvimporttimeclassprice_spider(object):def__init__(self):self.headers={"User-Agent":"Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome......
  • R语言和Python用泊松过程扩展:霍克斯过程Hawkes Processes分析比特币交易数据订单到达
    全文下载链接:http://tecdat.cn/?p=25880 最近我们被客户要求撰写关于泊松过程的研究报告,包括一些图形和统计输出。本文描述了一个模型,该模型解释了交易的聚集到达,并展示了如何将其应用于比特币交易数据。这是很有趣的,原因很多。例如,对于交易来说,能够预测在短期内是否有更多的买......
  • python:导入库、模块失败
    一般发生在程序开始部分:frompymodbus.client.syncimportModbusSerialClientfrompymodbus.payloadimportBinaryPayloadDecoderfrompymodbus.constantsimportEndianfrompymodbus.compatimportiteritemsimporttimeimportthreadingimportjsonfromdeviceimpor......
  • Redis学习笔记(上)
    Redisremotedictionaryserver远程字典服务是一个开源的使用ANSIC语言编写、支持网络、可基于内存亦可持久化的日志型、key-value数据库,并提供多种语言的API。和memcached一样,为了保证效率,数据都是缓存在内存中的。区别的是redis会周期性的把更新的数据写入磁盘或者修......
  • SRGAN图像超分重建算法Python实现(含数据集代码)
    摘要:本文介绍深度学习的SRGAN图像超分重建算法,使用Python以及Pytorch框架实现,包含完整训练、测试代码,以及训练数据集文件。博文介绍图像超分算法的原理,包括生成对抗网络和SRGAN模型原理和实现的代码,同时结合具体内容进行解释说明,完整代码资源文件请转至文末的下载链接。完整......
  • python: using pdfplumber Lib read pdf file
     fromopenpyxlimportWorkbookfromopenpyxl.stylesimportPatternFill,Side,Borderimportpdfplumberl=[]defvisitDir(path):ifnotos.path.isdir(path):print('Error:"',path,'"isnotadirectoryordoesnotexi......
  • redis的订阅发布功能中,前端如何监听到消息,并修改前端页面。
    ......