python下使用redis分布式锁
1.什么场景需要分布式锁?
我们在写业务逻辑的时候,如果多个线程同时访问某个共享变量,一般是对变量进行上锁或者使用 queue.Queue() 实现,以做到线程安全保证数据不被污染。
在单机部署的情况下这样做完全没问题,但是随着业务规模的发展,某些单机部署的系统需要进化成分布式集群系统,需要部署到多台服务器上去做负载均衡,这就会使得以前的并发控制策略失效,这个时候就需要一种跨机器的互斥机制来控制共享资源的访问,以保证线程安全,这个时候就需要使用到分布式锁。
2.分布式锁应该具备的条件:
① 在分布式系统环境下,同一时间只能有一个客户端能持有锁,且保证加锁和解锁的必须是同一个客户端
② 为了防止死锁,需要具备锁失效机制,即使一个客户端在持有锁期间崩溃没有主动释放锁,也能保证后续其他客户端能正常加锁
③ 具备非堵塞特性,没有获取到锁则直接返回获取锁失败。
3.redis 分布式锁的原理
redis 中的 SETNX 命令可以实现「key不存在才插入」:
如果 key 不存在,则显示插入成功,用来表示加锁成功
如果 key 存在,则会显示插入失败,用来表示加锁失败
通过 setnx 设置分布式锁,拿到这个锁的线程就可以执行业务代码,没有拿到的则只能进行等待,执行完业务代码后的线程需要通过 del key 释放锁,让其他线程能够重新获取,这样就能实现在分布式系统并发情况下,始终只有一个线程在执行业务代码。
4.demo
'''
pip install redis
'''
import time
import uuid
import redis
from threading import Thread
# redis 存字符串返回的是byte, 指定 decode_responses=True 可以解决
pool = redis.ConnectionPool(host="127.0.0.1", port=6379, socket_connect_timeout=3, decode_responses=True)
redis_cli = redis.Redis(connection_pool=pool)
# 加锁
def acquire_lock(lock_name, acquire_timeout=4, lock_timeout=7):
"""
param lock_name: 锁名称
param acquire_timeout: 客户端获取锁的超时时间
param lock_timeout: 锁过期时间, 超过这个时间锁自动释放
"""
identifier = str(uuid.uuid4())
end_time = time.time() + acquire_timeout # 客户端获取锁的结束时间
while time.time() <= end_time:
# setnx(key, value) 只有 key 不存在情况下将 key 设置为 value 返回 True
# 若 key 存在则不做任何动作,返回 False
if redis_cli.setnx(lock_name, identifier):
redis_cli.expire(lock_name, lock_timeout) # 设置锁的过期时间,防止线程获取锁后崩溃导致死锁
return identifier # 返回锁唯一标识
elif redis_cli.ttl(lock_name) == -1: # 当锁未被设置过期时间时,重新设置其过期时间
redis_cli.expire(lock_name, lock_timeout)
time.sleep(0.001)
return False # 获取超时返回 False
# 释放锁
def release_lock(lock_name, identifier):
"""
param lock_name: 锁名称
param identifier: 锁标识
"""
# 解锁操作需要在一个 redis 事务中进行,python 中 redis 事务通过 pipeline 封装实现
with redis_cli.pipeline() as pipe:
while True:
try:
# 使用 WATCH 监听锁,如果删除过程中锁自动失效又被其他客户端拿到,即锁标识被其他客户端修改
# 此时设置了 WATCH 事务就不会再执行,这样就不会出现删除了其他客户端锁的情况
pipe.watch(lock_name)
id = pipe.get(lock_name)
if id and id == identifier: # 判断解锁与加锁线程是否一致
pipe.multi() # 开启事务
pipe.delete(lock_name) # 标识相同,在事务中删除锁
pipe.execute() # 执行EXEC命令后自动执行UNWATCH
return True
pipe.unwatch()
break
except redis.WatchError:
pass
return False
# 发售商品件数
goods = 10
def exec_test(thread_name):
identifier = acquire_lock('jaye')
if identifier: # 如果获取到锁,则执行业务逻辑
print(f'{thread_name}获取 redis 分布式锁成功!')
time.sleep(3) # 模拟业务耗时
global goods
goods = goods - 1
print(f'------------------------{thread_name}抢到了东西------------------------------')
res = release_lock('jaye', identifier) # 处理完之后释放锁
print(f'{thread_name} 释放状态: {res}')
else:
print(f'{thread_name}获取 redis 分布式锁失败, 其他线程正在使用')
if __name__ == '__main__':
for i in range(1000):
t_name = f'thread_{i}'
t = Thread(target=exec_test, args=(t_name,))
t.start()
time.sleep(10)
print('还剩下{}'.format(goods))
5.注意
'''
ps:redis主从模式中数据的复制是异步的,如果在主节点获取到锁后,没来得及同步到从节点就宕机了,哨兵重新选举新的主节点后依然可以获取锁,也就导致多个服务可以同时拿到锁操作资源,这种情况下可以使用Redlock。
'''
标签:加锁,python,redis,线程,timeout,time,分布式
From: https://www.cnblogs.com/chunyouqudongwuyuan/p/17534228.html