首页 > 数据库 >第四章 Python操作redis(操作案例)

第四章 Python操作redis(操作案例)

时间:2024-08-22 20:37:47浏览次数:8  
标签:task Python redis msg key print 操作 id

一、python对redis基本操作

(1)连接redis

# 方式1
import redis

r = redis.Redis(host='127.0.0.1', port=6379)
r.set('foo', 'Bar')
print(r.get('foo'))


# 方式2
import redis

pool = redis.ConnectionPool(host='127.0.0.1', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('bar', 'Foo')
print(r.get('bar'))

通常情况下, 当我们需要做redis操作时, 会创建一个连接, 并基于这个连接进行redis操作, 操作完成后, 释放连接,一般情况下, 这是没问题的, 但当并发量比较高的时候, 频繁的连接创建和释放对性能会有较高的影响。于是, 连接池就发挥作用了。连接池的原理是, 通过预先创建多个连接, 当进行redis操作时, 直接获取已经创建的连接进行操作, 而且操作完成后, 不会释放, 用于后续的其他redis操作。这样就达到了避免频繁的redis连接创建和释放的目的, 从而提高性能。

(2)数据类型操作

import redis

pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)

# (1)字符串操作:不允许对已经存在的键设置值
ret = r.setnx("name", "yuan")
print(ret)  # False
# (2)字符串操作:设置键有效期
r.setex("good_1001", 10, "2")
# (3)字符串操作:自增自减
r.set("age", 20)
r.incrby("age", 2)
print(r.get("age"))  # b'22'

# (4)hash操作:设置hash
r.hset("info", "name", "rain")
print(r.hget("info", "name"))  # b'rain'
r.hmset("info", {"gedner": "male", "age": 22})
print(r.hgetall("info"))  # {b'name': b'rain', b'gender': b'male', b'age': b'22'}

# (5)list操作:设置list
r.rpush("scores", "100", "90", "80")
r.rpush("scores", "70")
r.lpush("scores", "120")
print(r.lrange("scores", 0, -1))  # ['120', '100', '90', '80', '70']
r.linsert("scores", "AFTER", "100", 95)
print(r.lrange("scores", 0, -1))  # ['120', '100', '95', '90', '80', '70']
print(r.lpop("scores"))  # 120
print(r.rpop("scores"))  # 70
print(r.lindex("scores", 1)) # '95'

# (6)集合操作
# key对应的集合中添加元素
r.sadd("name_set", "zhangsan", "lisi", "wangwu")
# 获取key对应的集合的所有成员
print(r.smembers("name_set"))  # {'lisi', 'zhangsan', 'wangwu'}
# 从key对应的集合中随机获取 numbers 个元素
print(r.srandmember("name_set", 2))
r.srem("name_set", "lisi")
print(r.smembers("name_set"))  # {'wangwu', 'zhangsan'}

# (7)有序集合操作
# 在key对应的有序集合中添加元素
r.zadd("jifenbang", {"yuan": 78, "rain": 20, "alvin": 89, "eric": 45})
# 按照索引范围获取key对应的有序集合的元素
# zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)
print(r.zrange("jifenbang", 0, -1))  # ['rain', 'eric', 'yuan', 'alvin']
print(r.zrange("jifenbang", 0, -1, withscores=True))  # ['rain', 'eric', 'yuan', 'alvin']
print(r.zrevrange("jifenbang", 0, -1, withscores=True))  # ['rain', 'eric', 'yuan', 'alvin']

print(r.zrangebyscore("jifenbang", 0, 100))
print(r.zrangebyscore("jifenbang", 0, 100, start=0, num=1))

# 删除key对应的有序集合中值是values的成员
print(r.zrem("jifenbang", "yuan"))  # 删除成功返回1
print(r.zrange("jifenbang", 0, -1))  # ['rain', 'eric', 'alvin']

# (8)键操作
r.delete("scores")
print(r.exists("scores"))
print(r.keys("*"))
r.expire("name",10)

二、关于redis的实战案例

(1)案例1:KV缓存

img

第1个是最基础也是最常?的就是KV功能,我们可以用Redis来缓存用户信息、会话信息、商品信息等等。下面这段代码就是通过缓存读取逻辑。

import redis

pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=6, decode_responses=True)
r = redis.Redis(connection_pool=pool)


def get_user(user_id):
    user = r.get(user_id)
    if not user:
        user = UserInfo.objects.get(pk=user_id)
        r.setex(user_id, 3600, user)

    return user

(2)案例2:分布式锁

什么是分布式锁

分布式锁其实就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。

提到Redis的分布式锁,很多小伙伴马上就会想到setnx+ expire命令。即先用setnx来抢锁,如果抢到之后,再用expire给锁设置一个过期时间,防止锁忘记了释放。

SETNX 是SET IF NOT EXISTS的简写.日常命令格式是SETNX key value,如果 key不存在,则SETNX成功返回1,如果这个key已经存在了,则返回0。

假设某电商网站的某商品做秒杀活动,key可以设置为key_resource_id,value设置任意值,伪代码如下:

方案1

import redis

pool = redis.ConnectionPool(host='127.0.0.1')
r = redis.Redis(connection_pool=pool)
ret = r.setnx("key_resource_id", "ok")
if ret:
    r.expire("key_resource_id", 5)  # 设置过期时间
    print("抢购成功!")
    r.delete("key_resource_id")  # 释放资源
else:
    print("抢购失败!")

但是这个方案中,setnxexpire两个命令分开了,「不是原子操作」。如果执行完setnx加锁,正要执行expire设置过期时间时,进程crash或者要重启维护了,那么这个锁就“长生不老”了,「别的线程永远获取不到锁啦」

方案2:SETNX + value值是(系统时间+过期时间)

为了解决方案一,「发生异常锁得不到释放的场景」,可以把过期时间放到setnx的value值里面。如果加锁失败,再拿出value值校验一下即可。加锁代码如下:

import time


def foo():
    expiresTime = time.time() + 10
    ret = r.setnx("key_resource_id", expiresTime)
    if ret:
        print("当前锁不存在,加锁成功")
        return True

    oldExpiresTime = r.get("key_resource_id")
    if float(oldExpiresTime) < time.time():  # 如果获取到的过期时间,小于系统当前时间,表示已经过期
        # 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
        newExpiresTime = r.getset("key_resource_id", expiresTime)
        if oldExpiresTime == newExpiresTime:
            #  考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才可以加锁
            return True  # 加锁成功

    return False  # 其余情况加锁皆失败


foo()

方案3

实际上,我们还可以使用Py的redis模块中的set函数来保证原子性(包含setnx和expire两条指令)代码如下:

r.set("key_resource_id", "1", nx=True, ex=10)

(3)案例4:延迟队列

延时队列可以通过Redis的zset(有序列表)来实现。我们将消息序列化为一个字符串作为zset的值。这个消息的到期时间处理时间作为score,然后用多个线程轮询zset获取到期的任务进行处理,多线程时为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。因为有多个线程,所有需要考虑并发争抢任务,确保任务不能被多次执行。

img

import time
import uuid

import redis

pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True)
r = redis.Redis(connection_pool=pool)


def delay_task(task_name, delay_time):
    # 保证value唯一
    task_id = task_name + str(uuid.uuid4())

    retry_ts = time.time() + delay_time
    r.zadd("delay-queue", {task_id: retry_ts})


def loop():
    print("循环监听中...")
    while True:
        # 最多取1条
        task_list = r.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)

        if not task_list:
            # 延时队列空的,休息1s
            print("cost 1秒钟")
            time.sleep(1)
            continue
        task_id = task_list[0]
        success = r.zrem("delay-queue", task_id)
        if success:
            # 处理消息逻辑函数
            handle_msg(task_id)

def handle_msg(msg):
    """消息处理逻辑"""
    print(f"消息{msg}已经被处理完成!")


import threading

t = threading.Thread(target=loop)
t.start()

delay_task("任务1延迟5", 5)
delay_task("任务2延迟2", 2)
delay_task("任务3延迟3", 3)
delay_task("任务4延迟10", 10)

redis的zrem方法是对多线程争抢任务的关键,它的返回值决定了当前实例有没有抢到任务,因为loop方法可能会被多个线程、多个进程调用, 同一个任务可能会被多个进程线程抢到,通过zrem来决定唯一的属主。同时,一定要对handle_msg进行异常捕获, 避免因为个别任务处理问题导致的循环异常退出。

(4)案例5:发布订阅

subscribe channel # 订阅
publish channel mes # 发布消息
import threading

import redis

r = redis.Redis(host='127.0.0.1')


def recv_msg():
    pub = r.pubsub()

    pub.subscribe("fm104.5")
    pub.parse_response()

    while 1:
        msg = pub.parse_response()
        print(msg)


def send_msg():
    msg = input(">>>")
    r.publish("fm104.5", msg)


t = threading.Thread(target=send_msg)
t.start()

recv_msg()

(5)案例3:定时任务

利用 Redis 也能实现订单30分钟自动取消。

用户下单之后,在规定时间内如果不完成付款,订单自动取消,并且释放库存使用技术:Redis键空间通知(过期回调)用户下单之后将订单id作为key,任意值作为值存入redis中,给这条数据设置过期时间,也就是订单超时的时间启用键空间通知

开启过期key监听

from redis import StrictRedis

redis = StrictRedis(host='localhost', port=6379)

# 监听所有事件
# pubsub = redis.pubsub()
# pubsub.psubscribe('__keyspace@0__:*')
#
# print('Starting message loop')
# while True:
#     message = pubsub.get_message()
#     if message:
#         print(message)

# 监听过期key
def event_handler(msg):
    print("sss",msg)
    thread.stop()

pubsub = redis.pubsub()
pubsub.psubscribe(**{'__keyevent@0__:expired': event_handler})
thread = pubsub.run_in_thread(sleep_time=0.01)

标签:task,Python,redis,msg,key,print,操作,id
From: https://www.cnblogs.com/Tmars/p/18374698

相关文章

  • Python 基础:编程概念
    在黑客和网络安全领域,这通常意味着BASH和Python脚本。Python脚本在网络安全专业人士中最受欢迎,因为它拥有丰富的库和模块,可用于网络安全(你可以使用任何编程语言进行网络安全,但如果有人已经用Python等语言编写了轮子,那么你的生活就会轻松得多)。如果你检查Kali中的工具,你会......
  • 3.2 key操作
    key操作redis中所有的数据都是通过key(键)来进行操作,这里我们学习一下关于任何数据类型都通用的命令。(1)查找键参数支持简单的正则表达式keyspattern查看所有键keys*例子:#查看名称中包含`a`的键keys*a*#查看以a开头的键keysa*#查看以a结尾的键keys*a(2)判断键......
  • 队列操作(深入理解FreeRTOS队列之队列实战)
    文章目录一、队列的操作二、学习总结在FreeRTOS中,队列的本质是环形缓冲区。一、队列的操作1、创建队列2、写队列3、读队列详细可看此篇博客:FreeRTOS——队列(基于百问网DshanMCU-F103实现挡球板游戏改造)-CSDN博客基于链表解析队列的使用:代码示例:#include"......
  • python——concurrent.futures
    concurrent.futures是Python标准库中用于并行编程的高级模块,它提供了一种高级别的接口来管理线程和进程。通过这个模块,你可以轻松地利用多线程和多进程来并行执行任务,进而提高程序的执行效率。1.concurrent.futures概述concurrent.futures提供了两种执行器类型:Thre......
  • 第三章 redis数据类型
    redis数据类型redis可以理解成一个全局的大字典,key就是数据的唯一标识符。根据key对应的值不同,可以划分成5个基本数据类型。redis={"name":"yuan","scors":["100","89","78"],"info":{"name":"rain"......
  • 操作系统-线程
    一、线程介绍线程是操作系统能内够进行运算、执行的最小单位,它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。​ 总结:线程是进程的一部分,是进程内负责执行的单位,进程是由资源单位(......
  • 【精选】基于Python的热门旅游景点数据分析系统的设计与实现(南京旅游,北京旅游,旅游网站
    目录: 系统简介:  关键技术介绍2.1PYTHON语言简介2.2MySql数据库2.3DJANGO框架2.4Hadoop介绍2.5Scrapy介绍2.6B/S架构 系统总功能结构设计系统详细实现:6系统测试系统测试的目的软件测试过程测试用例为什么选择我: 博主介绍:  ✌我是阿龙,一名......
  • Python系列(7)| 命名空间、作用域
     1.命名空间(Namespace)   Python中的命名空间(Namespace)和作用域是密切相关的概念。Python命名空间(Namespace)可以视为一个字典,其中键是变量名,值是与之关联的对象。   各个命名空间是独立的,同一个命名空间中不能有重名(重名的以后一个为准),不同的命名空间是......
  • PCA原理与水果成熟状态数据分析实例:Python中PCA-LDA 与卷积神经网络CNN
    全文链接:https://tecdat.cn/?p=37450 主成分分析(PCA)作为数据科学中用于可视化和降维的重要工具,在处理具有大量特征的数据集时非常有用。就像我们难以找到时间阅读一本1000页的书,而更倾向于2到3页的总结以抓住整体概貌一样,当数据集中特征过多时,PCA可以帮助我们减少维度,提......
  • 大厂面试官:你知道Redis如何实现分布式锁么?
    常见面试题,看完基本也没啥问题了Redis如何实现分布式锁分布式锁是用于分布式环境下并发控制的一种机制,用于控制某个资源在同一时刻只能被一个应用所使用。如下图所示:Redis本身可以被多个客户端共享访问,正好就是一个共享存储系统,可以用来保存分布式锁,而且Redis的读写......