首页 > 数据库 >Python实战项目-9 Redis/celery-基础使用

Python实战项目-9 Redis/celery-基础使用

时间:2023-03-08 19:22:05浏览次数:59  
标签:name Python Redis redis celery res print conn

Redis介绍与安装

Redis->缓存数据库【大部分时间用来做缓存,不仅仅可以做缓存】
也是称为非关系型数据库,区别与Mysql关系型数据库
-noSql:泛指非关系型数据库,not only Sql

redis数据存储在内存中
	在取值,放值速度非常快
快的原因:
	1.纯内存操作
	2.网络模型使用的是I/O多路复用(epoll)
	用了这模型,处理的请求数更多
	3.6.x之前,单进程/单线程架构(没有进程线程间切换,更少的消耗资源)
redis是用c语言写的服务(监听端口)用来存储数据,数据是存储在内存中 取值/放值速度非常快
qps == 10w

安装

-mac 源码编译安装
-linux 源码编译安装
-win 微软自己基于源码改动,编译为安装包

一路下一步,安装完释放出两个命令,会把redis自动加入到服务中

版本

最新:7.x
公司里 5.x 比较多
  • 启动服务端,启动客户端

redis-cli redis-server

  • 客户端和服务端在同一台机器上
  • 本地的客户端可以连接远程的服务器

客户端连接redis

方式一:
redis-cli 默认连接本地的6379端口
方式二:
redis-cli -h 地址 -p 端口 自己可以自定义地址端口连接
方式三:
使用图形化客户端操作
Redis Desktop Manager : 开源软件,原先免费,后续收费了.. 推荐用(mac,win,linux 都有)
Qt5 qt是个平台,专门用来做图形化界面的
-可以使用c++写
-可以使用python写 pyqt5 使用python写图形化界面 (少量公司再用)
-resp-2022.1.0.0.exe 一路下一步,安装完启动起来
Redis Client 小众
图形化界面,连接redis 输入地址和端口,点击连接即可

redis默认有16个库,默认连进去就是第0

扩展:

mysql是cs架构软件

pymysql 是 mysql的客户端
Navicate 是mysql的客户端
客户端连接-cmd中使用redis-cli
图形化界面,
python的redis模块操作redis

python连接redis

普通连接
连接池连接
django使用mysql连接池 目的:-->防止连接数过高,服务挂掉

进程线程协程

进程:资源分配的最小单位,一个程序运行起来可能是一个进程,或多个进程
线程:CPU调度的最小单位,程序要执行,是线程在执行,Cpu调度的最小单位,一个进程里面可以有许多线程,->操作系统控制切换
协程:单线程下的并发,程序层面控制,遇到io操作,切换到别的任务执行

Redis之本地连接和连接池

python 相当于redis的客户端进行操作redis
我们操作redis只需要安装响应模块即可

pip install redis

django中操作mysql是没有连接池的,一个请求就是一个mysql连接
但是这样可能会处问题,并发数过高,导致mysql连接数过高,影响mysql性能
参考:

普通连接

在python中安装 redis 模块

  • 导入模块Redis类

from redis import Redis

  • 实例化得到对象

conn = Redis(host='127.0.0.1',port=6379)

  • 使用conn操作redis

res = conn.get('name') # 返回数据是bytes格式

  • 设置值

conn.set('age',19)
conn.close()

连接池连接

pool.py
import redis
POOL = redis.ConnerctionPool(max_connections=10,host='127.0.0.1',port=6379)
# 创建一个大小为10的redis连接池

# 测试
import redis
from threading imoport Thread
from pool import POOL
def task():
    # 做成模块后,导入,无论导入多少次,导入的都那一个POOL对象
    conn = redis.Redis(connection_pool=POOL)
    # 报错的原因是拿连接,连接池池里连接不够了,没有等待,线程报错  可以设置等待参数
    print(conn.get('name'))
for i in range(1000):
    t = Thread(target=task)   # 每次都是一个新的连接,会导致 的连接数过多
	t.start  

redis之列表

列表方法示例

lpush(name, values)
image.png
rpush(name,values) 与lpush相反
lpushx(name,value) 如果键存在,则从左侧插入,反之右插入,键不存在不做操作
rpushx(name, value) 表示从右向左操作
llen(name) 获取键长度 获取不到返回0
linsert(name, where, refvalue, value))
例:
** **linsert('girls','before','迪丽热巴','古力娜扎') 在迪丽热巴前面插入古力娜扎
linsert('girls','after','小红','小绿') 在小红后面插入小绿
如果插入的数据不存在,插不进去返回-1
lset(name, index, value) 按照位置修改值
例:
conn.lset('girls',1,'xiaoming') 在girls键对应的列表中 索引为1的值修改为xiaoming
lrem(name, value, num) 删除值
conn.lrem('girls',1,'xxx') # 从左侧开始,删除1个xxx
conn.lrem('girls',-1,'xxx') # 从右侧开始,删除1个xxx
conn.lrem('girls',0,'xxx') # 从左开始,删除全部xxx
lpop(name)弹出列表左侧第一个值
rpop(name)弹出列表右侧第一个值
lindex(name, index) 获取索引对应位置的值,如果没有返回None
lrange(name, start, end) 切片返回列表
ltrim(name, start, end) 修剪不在这个区间的所有值
rpoplpush(src,dst)
conn.rpoplpush("girls", 'boys')

  • 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端。
  • 将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。

blpop(keys, timeout) 可以做消息队列使用 阻塞式弹出,如果没有,就阻塞
res=conn.blpop('boys')
print(res)
brpoplpush(src,dst,timeout=0)

  • 命令从列表中取出最后一个元素,并插入到另外一个列表的头部; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

redis之字符串

rediskey-value形式存储
redis 数据存放在内存中,如果断点,数据丢失--->需要有持久化的方案

# 5 种数据类型,value类型
	-字符串:用的最多,做缓存;做计数器
    -列表: 简单的消息队列
    -字典(hash):缓存
    -集合:去重
    -有序集合:排行榜

字符串方法示例

设置值
1 set(name, value, ex=None, px=None, nx=False, xx=False)
参数:
ex,过期时间(秒)
px,过期时间(毫秒)
nx,如果设置为True,则只有name不存在时,当前set操作才执行, 值存在,就修改不了,执行没效果
xx,如果设置为True,则只有name存在时,当前set操作才执行,值存在才能修改,值不存在,不会设置新值
2 setnx(name, value)
等同于:conn.set('name','xiaoming',nx=True)
conn.setnx('name', '刘亦菲')
3 setex(name, value, time)
等同于:conn.set('name','xiaoming',ex=3)
conn.setex('wife', 3, '刘亦菲')
4 psetex(name, time_ms, value)
参数:
    time_ms,过期时间(数字毫秒 或 timedelta对象
5 mset(*args, **kwargs)
批量设置
如:
    mset(k1='v1', k2='v2')
    或
    mget({'k1': 'v1', 'k2': 'v2'})
6 get(name)
获取值
7 mget(keys, *args)
批量获取值
8 getset(name, value)
设置新值并获取原来的值
9 getrange(key, start, end)
获取子序列(根据字节获取,非字符)
参数:
    name,Redis 的 name
    start,起始位置(字节)
    end,结束位置(字节)
如: "刘亦菲" ,0-3表示 "刘"
''''
10 setrange(name, offset, value)
修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)
# 参数:
    offset,字符串的索引,字节(一个汉字三个字节)
    value,要设置的值
11 setbit(name, offset, value)
# 对name对应值的二进制表示的位进行操作
 
参数:
    name,redis的name
    offset,位的索引(将值变换成二进制后再进行索引)
    value,值只能是 1 或 0
 
注:如果在Redis中有一个对应: n1 = "foo",
        那么字符串foo的二进制表示为:01100110 01101111 01101111
    所以,如果执行 setbit('n1', 7, 1),则就会将第7位设置为1,
        那么最终二进制则变成 01100111 01101111 01101111,即:"goo"
12 getbit(name, offset)
获取name对应的值的二进制表示中的某位的值 (0或1)
13 bitcount(key, start=None, end=None)
获取name对应的值的二进制表示中 1 的个数
参数:
    key,Redis的name
    start,位起始位置
    end,位结束位置
14 bitop(operation, dest, *keys)
获取多个值,并将值做位运算,将最后的结果保存至新的name对应的值
 
参数:
    operation,AND(并) 、 OR(或) 、 NOT(非) 、 XOR(异或)
    dest, 新的Redis的name
    *keys,要查找的Redis的name
 
# 如:
    bitop("AND", 'new_name', 'n1', 'n2', 'n3')
    获取Redis中n1,n2,n3对应的值,然后讲所有的值做位运算(求并集),然后将结果保存 new_name 对应的值中
15 strlen(name)
返回name对应值的字节长度(一个汉字3个字节)
16 incr(self, name, amount=1)
自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。
 
参数:
    name,Redis的name
    amount,自增数(必须是整数)
 
# 注:同incrby
17 incrbyfloat(self, name, amount=1.0)
自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。
 
参数:
    name,Redis的name
    amount,自增数(浮点型)
18 decr(self, name, amount=1)
自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。
 
参数:
    name,Redis的name
    amount,自减数(整数)
19 append(key, value)
# 在redis name对应的值后面追加内容
 
参数:
    key, redis的name
    value, 要追加的字符串

redis之hash

import redis
conn = redis.Redis()
conn.hset('userinfo','name','xiaoming')
conn.hset('userinfo',mapping={'name':'xiaoming'})

1 hset(name, key, value)
res = conn.hget('userinfo','name')
print(res)  # b'xiaoming'

2 hmset(name, mapping)
res = conn.hmget('userinfo',['name'])
print(res)  # b'xiaoming'


3 hget(name,key) # 获取值

5 hgetall(name)
res = conn.hgetall('userinfo')
print(res)   # 全部拿出来,但是要慎用 因为数据量可能过大

6 hlen(name)
res = conn.hlen('userinfo') # 获取键对应字典长度

7 hkeys(name)
res = conn.hkeys('userinfo')
print(res)  # 拿出所有的key 二进制类型

9 hexists(name, key)
res = conn.hexists('userinfo','name')
print(res)   # true 如果name不存在,则False

10 hdel(name,*keys)
res = conn.hdel('userinfo','age')
print(res)  # 删除 如果没有就报错

11 hincrby(name, key, amount=1)
conn.hincrby('userinfo','age',2) # 自增


12hscan(name, cursor=0, match=None, count=None)
# 一次性全部取出,效率低,可能占内存很多 

14 hscan_iter(name, match=None, count=None) # 分批获取
    # generator 只要函数中有yield关键字,这个函数执行的结果就是生成器 
    #生成器就是迭代器,可以被for循环

redis管道

redis支持事务.但是并不是很严谨,因为可能满足不了持久性的要求(几率小,但是不代表没有)

事务四大特性:
	原子性
	一致性
	隔离性
	持久性
使用:
import redis
conn = redis.Redis()
p=conn.pipeline(transaction=True)
p.multi()
p.decr('zhangsan_je', 100)
# raise Exception('崩了')
p.incr('lisi_je', 100)

p.execute()
conn.close()

redis其他操作

''' 通用操作,不指定类型,所有类型都支持
1 delete(*names)
2 exists(name)
3 keys(pattern='*')
4 expire(name ,time)
5 rename(src, dst)
6 move(name, db))
7 randomkey()
8 type(name)
'''

import redis

conn = redis.Redis()
1 delete(*names)
conn.delete('name', 'userinfo2')
conn.delete(['name', 'userinfo2'])  # 不能用它
conn.delete(*['name', 'userinfo2'])  # 可以用它


2 exists(name)
res=conn.exists('userinfo')
print(res)


3 keys(pattern='*')
res=conn.keys('w?e')  #  ?表示一个字符,   * 表示多个字符
print(res)


4 expire(name ,time)
conn.expire('userinfo',3)

5 rename(src, dst)
conn.rename('hobby','hobby111')

6 move(name, db))
conn.move('hobby111',8)
7 randomkey()
res=conn.randomkey()
print(res)
8 type(name)
print(conn.type('girls'))
print(conn.type('age'))
conn.close()

django中使用redis

1.自定义包方案(通用方案,不针对与django其他也可以用)

1.写一个pool.py
2.在以后使用的地方直接使用即可

import redis
POOL = redis.Connectionpool(max_connections=100)
conn = redis.Redis(connection_pool=Pool)
conn.incr('count')
res = conn.get('count')
return JsonResponse({'count':"今日访问次数%s"%res})

2.django中使用redis作为缓存【推荐使用】

django的缓存使用redis  【推荐使用】
    	-settings.py 中配置
        CACHES = {
            "default": {
                "BACKEND": "django_redis.cache.RedisCache",
                "LOCATION": "redis://127.0.0.1:6379",
                "OPTIONS": {
                    "CLIENT_CLASS": "django_redis.client.DefaultClient",
                    "CONNECTION_POOL_KWARGS": {"max_connections": 100} 最大连接数100
                    # "PASSWORD": "123",
                }
            }
        }
        
-在使用redis的地方:cache.set('count',  res+1)
-如果是对象的话是通过pickle序列化后,存入的

3.第三方:django-redis模块

from django_redis import get_redis_connection
    def test_redis(request):
        conn=get_redis_connection()
        print(conn.get('count'))
        return JsonResponse({'count': '今天这个接口被访问的次数为:%s'}, json_dumps_params={'ensure_ascii': False})

celery介绍与安装

celery 是什么
	翻译过来是 "芹菜" 
	框架:服务,python的框架跟django无关
	能用来做:
	1.异步任务
	2.定时任务
	3.延时任务

# 理解celery的运行原理
"""
1)可以不依赖任何服务器,通过自身命令,启动服务
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""

image.png

celery架构
	1.任务中间件 Broker(中间件),其他服务提交的异步任务,放在里面排队
    	redis  rabbitmq  (需要借助第三方)
	2.任务执行单元 Worker  真正执行异步任务的进程
    	celery 提供的
	3.结果存储 Backend  结果存储m函数的返回结果 存到backend中
    	需要借助于第三方
    	redis,mysql

 # 使用场景
    异步执行:解决耗时任务
    延迟执行:解决延迟任务
    定时执行:解决周期(周期)任务

celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成

celery快速使用

1.新建main.py
from celery import Celery

  • 提交的异步任务,放在里面

broker = 'redis://127.0.0.1:6379/1' redis协议,存储在1号库中

  • 执行完的结果,放在这里

backend = 'redis://127.0.0.1:6379/2'

app = Celery('test', broker=broker, backend=backend)
@app.task
def add(a, b):
    import time
    time.sleep(3)
    print('------',a + b)
    return a + b

2.其他程序提交任务

res = add.delay(5,6)   # 原来add的参数,直接放在delay中传入即可
print(res)  # f150d8a5-c955-478d-9343-f3b60d0d5bdb

3.启动worker

# 启动worker命令,windows需要安装eventlet
	win:
       -4.x之前版本
		celery worker -A main -l info -P eventlet
       -4.x之后
    	celery  -A main  worker -l info -P eventlet
	mac:
        celery  -A main  worker -l info

4.worker会执行消息中间件的任务,把结果存起来
5.可以拿到执行的结果

from main import app
from celery.result import AsyncResult
id = '51611be7-4914-4bd2-992d-749008e9c1a6' # 第三步返回的任务id
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  # 执行完了
        result = a.get()  #
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

celery包结构

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py  	# 添加任务
    └── get_result.py   # 获取结果
    
    
############# 第一步:新建包 celery_task #############
# 在包下新建[必须叫celery]的py文件,celery.py 写代码
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])


##### 第二步:在包内部,写task,任务异步任务####
# order_task
from .celery import app
import time
@app.task
def add(a, b):
    print('-----', a + b)
    time.sleep(2)
    return a + b

# user_task
from .celery import app
import time
@app.task
def send_sms(phone, code):
    print("给%s发送短信成功,验证码为:%s" % (phone, code))
    time.sleep(2)
    return True

####第三步:启动worker ,包所在目录下
	celery  -A celery_task  worker -l info -P eventlet
    
    
###第四步:其他程序 提交任务,被提交到中间件中,等待worker执行,因为worker启动了,就会被worker执行
from celery_task import send_sms
res=send_sms.delay('1999999', 8888)
print(res)  # 7d39033c-4cc7-4af2-8d78-e62c277db183


### 第五步:worker执行完,结果存到backend中

### 第六步:我们查看结构
from celery_task import app
from celery.result import AsyncResult
id = '7d39033c-4cc7-4af2-8d78-e62c277db183'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  # 执行完了
        result = a.get()  #
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

标签:name,Python,Redis,redis,celery,res,print,conn
From: https://www.cnblogs.com/ddsuifeng/p/17195809.html

相关文章

  • Redis使用
    Redis1.Redis介绍安装redis:缓存数据库--大部分时间做缓存但是不仅仅能做缓存非关系型数据库--区别于mysql关系型数据库#存储方式:k:v形式存储......
  • 1. python+Django开发前的程序环境搭建
    程序环境的搭建1.Python3.9的安装下载地址:https://www.python.org/downloads/,最好是下载安装版的。安装过程勾选环境变量为了使用pip下载器,在环境变量添加Pyth......
  • Python第六天
    append()在列表的末尾添加一个元素extend()在列表的末尾为至少添加一个元素insert()在列表的任意位置添加一个元素remove()一次删除一个元素,重复元素只删除第一个 pop()......
  • python单例模式处理多线程问题
    #单例模式处理多线程的问题importthreadingimporttimeclassSingle:instance=Nonedef__init__(self,name):self.name=namedef__new__(cls,......
  • .Net Core redis 调用报错 '6000 Redis requests per hour' 解决 6000 此调用限制
    .NetCoreredis调用报错'6000Redisrequestsperhour'解决6000此调用限制 阅读目录问题描述仅升级动态库ServiceStsck.redis.dll报错解决('get_Db') 新......
  • python FastAPI sqlalchemy 数据库模型基类通用模型
    作用用于所有表都需要使用的字段或者方法实现代码base.py#!/usr/bin/python#-*-coding:utf-8-*-#@time:2023/2/1317:43#@author:pugongying#@de......
  • Redis缓存数据库-快速入门
    目录Redis数据库快速入门一、Redis数据库1、redis的安装与运行2、RESP图形化操作文件二、pycharm操作redis1、Redis普通连接和连接池2、Redis数据类型2、1.String类型2、2.......
  • python 通过API操作阿里云oss
    catpython_oss.py#!/usr/bin/python#-*-coding:utf-8-*-#@time:2023/2/2314:29#@author:pugongying#@description:#pipinstallalibabacloud_os......
  • django中使用redis
    ##方式一:自定义包方案(通用的,不针对与框架,所有框架都可以用)-第一步:写一个pool.pyimportredisPOOL=redis.ConnectionPool(max_connections=100)-......
  • 我又和redis超时杠上了
    我又和redis超时杠上了服务监控系列文章服务监控系列视频背景经过上次redis超时排查,并联系云服务商解决之后,redis超时的现象好了一阵子,但是最近又有超时现象报出,但与上......