首页 > 数据库 >最新版 redis-py 操作 redis(同步、异步、集群、连接池)

最新版 redis-py 操作 redis(同步、异步、集群、连接池)

时间:2023-08-08 18:14:06浏览次数:46  
标签:最新版 db self py redis host password port

现在的 Python 异步操作 redis,有三种( aredis 、aioredis、asynio_redis) 但是都不推荐

背景

从 redis.py 4.2.0rc1+ 开始,Aioredis 已经集成到 redis-py 中,并且 Aioredis 将不再更新维护, 导入方式:from redis import asyncio as aioredis, 本次验证的是 redis==4.6.0

#!/usr/bin/env python
# -*- coding=utf8 -*-
# 从 redis.py  4.2.0rc1+ 开始,Aioredis 已经集成到 redis.py 中,并且 Aioredis 将不再更新维护,
# 导入:from redis import asyncio as aioredis (只有 2023 版本的 pycharm 才不会报错)

from redis import asyncio as aioredis  # noqa
import redis
from typing import Dict, Tuple, List

# 为了方便测试将配置写死置顶
host = '1.1.1.1'
port = 13310
password = 'guest'
db = 0


class RedisManager(object):
    _pool_dict: Dict[Tuple[str, int, int, str], redis.ConnectionPool] = {}

    def __init__(
            self,
            host: str = '127.0.0.1',
            port: int = 6379,
            db: int = 0,
            password: str = '',
            decode_responses: bool = True,
            max_connections: int = 50
    ) -> None:
        pool_key = (host, port, db, password)
        if pool_key not in self._pool_dict:
            self._pool_dict[pool_key] = redis.ConnectionPool(
                host=host,
                port=port,
                db=db,
                password=password,
                decode_responses=decode_responses,
                max_connections=max_connections
            )
        self._r = redis.Redis(connection_pool=self._pool_dict[pool_key])
        self._ping()

    def get_redis(self) -> redis.Redis:
        return self._r

    def _ping(self):
        try:
            self._r.ping()
        except BaseException as e:
            raise e


class RedisMixin(object):

    def __init__(
            self,
            host="localhost",
            port=6379,
            db=0,
            password=None,
            ssl=False,
    ):
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.ssl = ssl

    @property
    def redis_db_frame(self) -> redis.Redis:
        # 获取单点连接
        return redis.Redis(host=self.host, port=self.port, password=self.password,
                           db=self.db, decode_responses=True)

    @property
    def redis_db_pool(self) -> redis.Redis:
        # 获取连接池
        return RedisManager(host=self.host, port=port, password=self.password,
                            db=self.db, decode_responses=True).get_redis()

    @property
    def redis_cluster_frame(self) -> redis.RedisCluster:
        # 获取 Cluster 连接
        return redis.RedisCluster(host=self.host, port=self.port, password=self.password,
                                  decode_responses=True, ssl=False)  # aws 需要 ssl=True


class AioRedisManager(object):
    _pool_dict: Dict[Tuple[str, int, int, str], redis.ConnectionPool] = {}

    def __init__(
            self,
            host: str = '127.0.0.1',
            port: int = 6379,
            db: int = 0,
            password: str = '',
            decode_responses: bool = True,
            max_connections: int = 50,
    ) -> None:
        pool_key = (host, port, db, password)
        if pool_key not in self._pool_dict:
            self._pool_dict[pool_key] = aioredis.ConnectionPool(
                host=host,
                port=port,
                db=db,
                password=password,
                decode_responses=decode_responses,
                max_connections=max_connections
            )
        self._r = aioredis.Redis(connection_pool=self._pool_dict[pool_key])
        asyncio.create_task(self._ping_async())

    async def get_redis(self) -> aioredis.Redis:
        return self._r

    async def _ping_async(self):
        try:
            await self._r.ping()
        except Exception as e:
            raise e


class AioRedisMixin(object):
    def __init__(
            self,
            host: str = '127.0.0.1',
            port: int = 6379,
            db: int = 0,
            password: str = '',
            decode_responses: bool = True,
            max_connections: int = 50,
            ssl=False,
    ):
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.decode_responses = decode_responses
        self.max_connections = max_connections
        self.ssl = ssl

    @property
    async def redis_db_frame(self) -> aioredis.Redis:
        return aioredis.Redis(host=self.host, port=self.port, password=self.password,
                              db=self.db, decode_responses=self.decode_responses)

    @property
    async def redis_db_pool(self) -> aioredis.Redis:
        return await AioRedisManager(host=self.host, port=port, password=self.password,
                                     db=self.db, decode_responses=self.decode_responses).get_redis()

    @property
    async def redis_cluster_frame(self) -> aioredis.RedisCluster:
        return aioredis.RedisCluster(host=self.host, port=self.port, password=self.password,
                                     decode_responses=self.decode_responses, ssl=self.ssl)  # aws 需要 ssl=True


async def main():
    # 链接单点
    # pool = await aioredis.Redis(host=host, port=port, password=password, decode_responses=True, )
    # res = await pool.get('name')
    # print(res)

    # 集群连接 (集群)
    # redis_host = "172.17.120.83;172.17.120.84;172.17.120.85;172.17.120.86;172.17.120.87;172.17.120.88".split(';')
    # password = "123@2022"
    # 使用方法 1
    # for host in redis_host:
    #     async with aioredis.RedisCluster(host=host, port=6379, password=password, decode_responses=True, ) as conn:
    #         res = await conn.get('name')
    #         print(res)
    # 使用方法 2
    # for host in redis_host:
    #     conn = await aioredis.RedisCluster(host=host, port=6379, password=password, decode_responses=True, )
    #     res = await conn.get('name')
    #     print(res)
    #     # 关闭 RedisCluster 连接
    #     await conn.close()

    # pool 连接
    # pool = aioredis.ConnectionPool(
    #     host=host,
    #     port=port,
    #     db=db,
    #     password=password,
    #     decode_responses=True,
    #     max_connections=50
    # )
    # _r = aioredis.Redis(connection_pool=pool)
    # res = await _r.get('name')
    # print(res)

    # 封装一下
    # 链接单点
    r = AioRedisMixin(host=host, port=port, password=password, db=0)
    rc1 = await r.redis_db_frame
    res = await rc1.get('name')
    print(res)

    # pool 连接 redis_db_pool
    # rc1 = await AioRedisMixin(host=host, port=port, password=password, db=0).redis_db_pool
    # rc = AioRedisMixin(host=host, port=port, password=password, db=0)
    # rc1 = await rc.redis_db_pool
    # res = await rc1.get('name')
    # print(res)

    # 集群连接 (集群)
    # redis_host = "172.17.120.83;172.17.120.84;172.17.120.85;172.17.120.86;172.17.120.87;172.17.120.88".split(';')
    # password = "123@2022xx"
    # # 使用方法 1
    # for host in redis_host:
    #     rc = await AioRedisMixin(host=host, port=6379, password=password, db=0).redis_cluster_frame
    #     async with rc as conn:
    #         res = await conn.get('name')
    #         print(res)
    #     # 使用 pipeline
    #     async with rc as conn:
    #         pipe = conn.pipeline()
    #         for i in range(1, 20):
    #             pipe.get(f'name')
    #         res = await pipe.execute()
    #         print(res)
    # 使用方法 2
    # for host in redis_host:
    #     conn = await AioRedisMixin(host=host, port=6379, password=password).redis_cluster_frame
    #     res = await conn.get('name')
    #     print(res)
    #     # 关闭 RedisCluster 连接
    #     await conn.close()
    #
    #     # 使用 pipeline
    #     pipe = conn.pipeline()
    #     for i in range(1, 20):
    #         pipe.get(f'name')
    #     res = await pipe.execute()
    #     print(res)
    #     # 关闭 RedisCluster 连接
    #     await conn.close()


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())

标签:最新版,db,self,py,redis,host,password,port
From: https://www.cnblogs.com/Dr-wei/p/17615058.html

相关文章

  • Python_GUI(pySide)开发指南(@Like)
    Python_GUI(pySide)开发指南(@Like) 目录一、PythonGUI简介二、PySide6工具安装1.安装VSCode:https://code.visualstudio.com/2.安装Python:https://www.python.org/downloads/3.安装PyCharm:https://www.jetbrains.com/pycharm/4.更新pip: 命令python.exe-mpip......
  • Python | isinstance函数
    isinstance函数isinstance的意思是“判断类型”;isinstance()是一个内置函数,用于判断一个对象是否是一个已知的类型,类似type()。isinstance()与type()区别type()不会认为子类是一种父类类型,不考虑继承关系。isinstance()会认为子类是一种父类类型,考虑继承关系。如果要判......
  • redis应用场景、
    目录1、redis应用场景缓存:会话存储:计数器和统计:实时排行榜:发布订阅系统:地理位置信息:消息队列:分布式锁:数据持久化:分布式系统:1、redis应用场景Redis(RemoteDictionaryServer)是一个开源的内存数据存储系统,它可以被用于多种不同的场景,由于其高性能、低延迟和丰富的数据结构支持,以下......
  • 我的python路-python基础
    以前用的比较多的语言是java,但是自从从事测试行业以来,发现“通用的语言”竟然是python!呜呼~各种评论都说python学习很简单,but一点也不简单好吗,本次分享就是一个记录,给一些小白同学做参考,大神请帮忙指正错误~~本期学习笔记:1、python语言使用变量直接赋值即可,不用声明类型,但是使......
  • python 标准库Enum模块
    1.Enum模块简介枚举(enumeration)在许多编程语言中常被表示为一种基础的数据结构使用,枚举帮助组织一系列密切相关的成员到同一个群组机制下,一般各种离散的属性都可以用枚举的数据结构定义,比如颜色、季节、国家、时间单位等enum规定了一个有限集合的属性,限定只能使用集合内的值,明......
  • Unittest + python + Selenium + HTMLTestRunner 自动化测试
      1.测试框架参数说明 base/base_page.py对selenium方法进行二次封装 config/setting.py基础信息 pageobject/把每个页面的页面元素和操作,放在一个py文件中。测试用例只需引用对应页面的操作 report存放测试报告的 runcase/start_ca......
  • 利用Python Flask蓝图加自定义蓝图划分优雅的目录结构
    我们在用Flask开发网站的时候。经常看到有很多人把所有的路由函数放到了入口文件,这种做法是非常不可取的,如果我们的视图函数有几百个了都写到一个文件里肯定是不行的。还有在实现中我们都在比较大型项目里面我们可能有十几个甚至几十个这种不同模型。我们需要考虑把这些模型分文别......
  • Numpy
    官方文档:https://www.numpy.org.cn/article/numpy中的数组importnumpyasnpmy_array=np.array([1,2,3,4,5])print(my_array,my_array.shape,my_array[0])my_array[0]=-2print(my_array)my_new_array=np.zeros((5))print(my_new_array)my_random_array=np.random.......
  • PyTorch基础知识-新手笔记
    使用NumPy实现机器学习任务使用最原始的的NumPy实现一个有关回归的机器学习任务,不使用PyTorch中的包或类。代码可能会多一点,但每一步都是透明的,有利于理解每一步的工作原理。主要步骤如下:1)首先,给出一个数组x,然后基于表达式y=3x^2+2,加上一些噪声数据到达另一组数据。2)然后,构建......
  • 桌面应用打包:pyinstaller
    1背景在使用python开发一些小工具时,如果其他人电脑中没有python环境或者没有安装相应的第三方库,是没办法运行的,而要求对方安装又不现实,尤其是对方不是技术人员,因此如何将一个独立的python程序,使它成为成为一个不用考虑环境,双击即可运行的桌面应用呢?使用pyinstaller打包是一个不......