目录
Nonebot2插件高级
一、 工作流程
1、 概念
hook:钩子函数,它们可以在Nonebot处理事件的不同时刻进行拦截、修改或者扩展。在Nonebot中,事件钩子函数分为事件预处理、运行预处理、运行后预处理和事件后处理
Matcher: Matcher
并不是一个具体的实例 instance
,而是一个具有特定属性的类 class
。只有当 Matcher
响应事件时,才会实例化为具体的 instance
,也就是 matcher
。matcher
可以认为是 NoneBot 处理 Event
的基本单位,运行 matcher
是 NoneBot 工作的主要内容。
handler:事件处理函数,它们可以认为是 NoneBot 处理 Event
的最小单位。在不考虑 hook
的情况下,运行 matcher 就是顺序运行 matcher.handlers,这句话换种表达方式就是,handler
只有添加到 matcher.handlers
时,才可以参与到 NoneBot 的工作中来。
2、 简介
NoneBot2 是一个可扩展的 Python 异步机器人框架,它会对机器人收到的事件进行解析和处理,并以插件化的形式,按优先级分发给事件所对应的事件响应器,来完成具体的功能。
在实际应用中,NoneBot 会充当一个高性能,轻量级的 Python 微服务框架。协议端可以通过 http、websocket 等方式与之通信,这个通信往往是双向的:一方面,协议端可以上报数据给 NoneBot,NoneBot 会处理数据并返回响应给协议端;另一方面,NoneBot 可以主动推送数据给协议端。而 NoneBot 便是围绕双向通信进行工作的。
Nonebot启动之前的准备工作:
- 运行
nonebot.init()
,读取配置文件,初始化Nonebot和Driver对象 - 注册协议适配器Adapter
- 加载插件
准备工作完成后,Nonebot会利用异步进行启动,并且会运行on_startup
钩子函数
随后,协议端与Nonebot进行连接,Driver将数据提交给Adapter,然后,实例化Bot,Nonebot会利用Bot开始工作:
- 事件处理:Bot会将协议端上报的数据转换为Event事件,之后Nonebot根据一套既定的流程来处理事件
- 调用API:在事件处理的过程中,Nonebot可以通过Bot调用协议端指定的API来获取更多数据,或者反馈响应给协议端;Nonebot也可以通过调用API向协议端主动请求数据获取主动推送数据
3、 事件处理
首先,我们来看一下事件处理的流程图:
总的可以分为三个阶段:
-
Driver 接收上报数据
协议端会通过 websocket 或 http 等方式与 NoneBot 的后端驱动
Driver
连接,协议端上报数据后,Driver
会将原始数据交给Adapter
处理 -
Adapter 处理原始数据
Adapter
检查授权许可,并获取self-id
作为唯一识别 id- 根据
self-id
实例化Adapter
相应的Bot
- 根据
Event Model
将原始数据转化为 NoneBot 可以处理的Event
对象 Bot
和Event
交由 NoneBot 进一步处理
-
Nonebot 处理Event
- 执行事件预处理 hook
- 按优先级升序选出同一优先级的 Matcher
- 根据 Matcher 定义的 Rule、Permission 判断是否运行
- 实例化 matcher 并执行运行预处理 hook
- 顺序运行 matcher 的所有 handlers
- 执行运行后处理 hook
- 判断是否停止事件传播
- 执行事件后处理 hook
同时,Nonebot中还有一些特殊异常处理,可以自行到官网查看:https://v2.nonebot.dev/docs/advanced/#%E7%89%B9%E6%AE%8A%E5%BC%82%E5%B8%B8%E5%A4%84%E7%90%86
4、 调用协议端接口
NoneBot 调用 API
会有如下过程:
- 调用
calling_api_hook
预处理钩子 adapter
将信息处理为原始数据,并转交driver
,driver
交给协议端处理driver
接收协议端的结果,交给adapter
处理之后将结果反馈给 NoneBot- 调用
called_api_hook
后处理钩子
一般来说,我们可以用 bot.*
来调用 API
(*是 API
的 action
或者 endpoint
)。
对于发送消息而言,一方面可以调用既有的 API
;另一方面 NoneBot 实现了两个便捷方法,bot.send(event, message, **kwargs)
方法和可以在 handler
中使用的 Matcher.send(message, **kwargs)
方法,来向事件主体发送消息。
二、 定时任务
1、 安装插件
这个插件是在APScheduler
的基础上进行开发的,如果需要更加详细的使用方式,可以通过官方文档进行学习:https://apscheduler.readthedocs.io/en/3.x/,后面我会加一篇关于这个库的使用方式,欢迎大家关注哦!
这个定时任务的实现,是通过一个Nonebot插件来实现的
nb plugin install nonebot_plugin_apscheduler
使用poetry来安装:
poetry add nonebot-plugin-apscheduler
2、 快速使用
- 在需要设置定时任务的插件中,通过
nonebot.require
声明插件依赖 - 从 nonebot_plugin_apscheduler 导入
scheduler
对象 - 在该对象的基础上,根据
APScheduler
的使用方法进一步配置定时任务
from nonebot import require
require("nonebot_plugin_apscheduler")
from nonebot_plugin_apscheduler import scheduler
@scheduler.scheduled_job("cron", hour="*/2", id="xxx", args=[1], kwargs={"arg2": 2})
async def run_every_2_hour(arg1, arg2):
pass
scheduler.add_job(run_every_day_from_program_start, "interval", days=1, id="xxx") # 使用这种方法添加也可以
可以使用装饰器来添加定时任务,也可以通过函数来添加定时任务
3、 配置插件
根据项目的 .env
文件设置,向 .env.*
或 bot.py
文件添加 nonebot_plugin_apscheduler 的可选配置项
apscheduler_autostart
:
- 类型:bool
- 默认值:True
- 是否自动启动
APScheduler
apscheduler_config
:
- 类型:dict
- 默认值:
{"apscheduler.timezone": "Asia/Shanghai"}
APScheduler
相关配置。修改/增加其中配置项需要确保prefix: apscheduler
。
三、 匹配规则
1、 创建规则
匹配规则可以是一个 Rule
对象,也可以是一个 RuleChecker
类型。Rule
是多个 RuleChecker
的集合,只有当所有 RuleChecker
检查通过时匹配成功。RuleChecker
是一个返回值为 Bool
类型的依赖函数,即,RuleChecker
支持依赖注入。
2、 创建RuleCheck
async def user_checker(event: Event) -> bool:
return event.get_user_id() == "123123" # 检测触发事件的用户的id是否为123123
matcher = on_message(rule=user_checker)
3、 创建Rule
async def user_checker(event: Event) -> bool:
return event.get_user_id() == "123123" # 检测触发事件的用户的id是否为123123
async def message_checker(event: Event) -> bool:
return event.get_plaintext() == "hello" # 检测用户输入的内容是否为hello
rule = Rule(user_checker, message_checker) # 将两个RuleCheck进行合并
matcher = on_message(rule=rule)
使用rule参数来注册到事件响应器当中
4、 合并匹配规则
在定义匹配规则时,我们往往希望将规则进行细分,来更好地复用规则。而在使用时,我们需要合并多个规则。除了使用 Rule
对象来组合多个 RuleChecker
外,我们还可以对 Rule
对象进行合并
rule1 = Rule(foo_checker)
rule2 = Rule(bar_checker)
rule = rule1 & rule2
rule = rule1 & bar_checker
rule = foo_checker & rule2
Rule会自动忽略合并的None值
四、 权限控制
1、 注册使用
如同 Rule
一样,Permission
可以在定义事件响应器时添加 permission
参数来加以应用,这样 NoneBot2 会在事件响应时检测事件主体的权限
from nonebot.permission import SUPERUSER # 检测是否为超级管理员发送的信息
from nonebot import on_command
matcher = on_command("测试超管", permission=SUPERUSER)
@matcher.handle()
async def _():
await matcher.send("超管命令测试成功")
@matcher.got("key1", "超管提问")
async def _():
await matcher.send("超管命令 got 成功")
Permission
与Rule
的表现并不相同,Rule
只会在初次响应时生效,在余下的对话中并没有限制事件;但是Permission
会持续生效,在连续对话中一直对事件主体加以限制。
2、 主动调用
Permission
除了可以在注册事件响应器时加以应用,还可以在编写事件处理函数 handler
时主动调用,我们可以利用这个特性在一个 handler
里对不同权限的事件主体进行区别响应
from nonebot import on_command
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent
from nonebot.adapters.onebot.v11 import GROUP_ADMIN, GROUP_OWNER
matcher = on_command("测试权限")
@matcher.handle()
async def _(bot: Bot, event: GroupMessageEvent):
if await GROUP_ADMIN(bot, event): # 判断是否为群管理员调用的命令
await matcher.send("管理员测试成功")
elif await GROUP_OWNER(bot, event): # 判断是否为群主调用的命令
await matcher.send("群主测试成功")
else:
await matcher.send("群员测试成功")
3、 自定义
如同 Rule
一样,Permission
也是由非负数个 PermissionChecker
组成的,但只需其中一个返回 True
时就会匹配成功
from nonebot.adapters import Bot, Event
from nonebot.permission import Permission
async def async_checker(bot: Bot, event: Event) -> bool:
return True
def sync_checker(bot: Bot, event: Event) -> bool:
return True
def check(arg1, arg2):
async def _checker(bot: Bot, event: Event) -> bool:
return bool(arg1 + arg2)
return Permission(_checker) # 使用函数封装来生成一个自定义权限控制
# 组合使用
from nonebot.permission import Permission
Permission(async_checker1) | sync_checker | async_checker2
同样地,如果想用
Permission(*checkers)
包裹构造Permission
,函数必须是异步的;但是在利用|
(或符号)连接构造时,NoneBot2 会自动包裹同步函数为异步函数
五、 钩子函数
1、 全局钩子函数
全局钩子函数是指 NoneBot2 针对其本身运行过程的钩子函数。
这些钩子函数是由其后端驱动 Driver
来运行的,故需要先获得全局 Driver
对象:
from nonebot import get_driver
driver=get_driver()
1.1 启动准备
这个钩子函数会在Nonebot启动时运行
@driver.on_startup
async def do_something():
pass
1.2 终止处理
这个钩子函数会在Nonebot终止时运行
@driver.on_shutdown
async def do_something():
pass
1.3 bot 连接处理
这个钩子函数会在 Bot
通过 websocket 连接到 NoneBot2 时运行。
@driver.on_bot_connectasync
def do_something(bot: Bot):
pass
1.4 bot 断开处理
这个钩子函数会在 Bot
断开与 NoneBot2 的 websocket 连接时运行。
@driver.on_bot_disconnectasync
def do_something(bot: Bot): pass
1.5 api调用钩子
这个钩子函数会在 Bot
调用 API 时运行。
from nonebot.adapters import Bot
@Bot.on_calling_apiasync
def handle_api_call(bot: Bot, api: str, data: Dict[str, Any]):
pass
1.6 api调用后钩子
这个钩子函数会在 Bot
调用 API 后运行。
from nonebot.adapters import Bot
@Bot.on_called_apiasync
def handle_api_result(bot: Bot, exception: Optional[Exception], api: str, data: Dict[str, Any], result: Any):
pass
2、 事件钩子函数
2.1 事件预处理
这个钩子函数会在 Event
上报到 NoneBot2 时运行
from nonebot.message import event_preprocessor
@event_preprocessorasync
def do_something():
pass
2.2 事件后处理
这个钩子函数会在 NoneBot2 处理 Event
后运行
from nonebot.message import event_postprocessor
@event_postprocessorasync
def do_something():
pass
2.3 运行预处理
这个钩子函数会在 NoneBot2 运行 matcher
前运行。
from nonebot.message import run_preprocessor
@run_preprocessorasync
def do_something():
pass
2.4 运行后处理
这个钩子函数会在 NoneBot2 运行 matcher
后运行。
from nonebot.message import run_postprocessor
@run_postprocessorasync
def do_something():
pass
六、 依赖注入
1、 简介
在软件工程中,依赖注入(dependency injection)的意思为,给予调用方它所需要的事物。 “依赖”是指可被方法调用的事物。依赖注入形式下,调用方不再直接使用“依赖”,取而代之是“注入” 。“注入”是指将“依赖”传递给调用方的过程。在“注入”之后,调用方才会调用该“依赖。 传递依赖给调用方,而不是让让调用方直接获得依赖,这个是该设计的根本需求。
依赖注入往往起到了分离依赖和调用方的作用,这样一方面能让代码更为整洁可读,一方面可以提升代码的复用性。
2、 使用依赖注入
from nonebot import on_command
from nonebot.params import Depends # 1.引用 Depends
from nonebot.adapters.onebot.v11 import MessageEvent
test = on_command("123")
async def depend(event: MessageEvent): # 2.编写依赖函数
return {"uid": event.get_user_id(), "nickname": event.sender.nickname}
@test.handle()
async def _(x: dict = Depends(depend)): # 3.在事件处理函数里声明依赖项
print(x["uid"], x["nickname"])
依赖注入流程:
- 引用
Depends
。- 编写依赖函数。依赖函数和普通的事件处理函数并无区别,同样可以接收
bot
,event
,state
等参数,你可以把它当作一个普通的事件处理函数,但是去除了装饰器(没有使用matcher.handle()
等来装饰),并且可以返回任何类型的值。- 在这里我们接受了
event
,并以onebot
的MessageEvent
作为类型标注,返回一个新的字典,包括uid
和nickname
两个键值。- 在事件处理函数中声明依赖项。依赖项必须要
Depends
包裹依赖函数作为默认值。事实上,bot、event、state 它们本身只是依赖注入的一个特例,它们无需声明这是依赖即可注入。
虽然声明依赖项的方式和其他参数如 bot
, event
并无二样,但他的参数有一些限制,必须是可调用对象,函数自然是可调用对象,类和生成器也是。
一般来说,当接收到事件时,NoneBot2
会进行以下处理:
- 准备依赖函数所需要的参数
- 调用依赖函数并获得返回值
- 将返回值作为事件处理函数中的参数值传入
3、 依赖缓存
在使用 Depends
包裹依赖函数时,有一个参数 use_cache
,它默认为 True
,这个参数会决定 Nonebot2
在依赖注入的处理中是否使用缓存。
import random
from nonebot import on_command
from nonebot.params import Depends
test = on_command("123")
async def always_run():
return random.randint(1, 100)
@test.handle()
async def _(x: int = Depends(always_run, use_cache=False)):
print(x)
缓存是针对单次事件处理来说的,在事件处理中
Depends
第一次被调用时,结果存入缓存,在之后都会直接返回缓存中的值,在事件处理结束后缓存就会被清除。
当使用缓存时,依赖注入会这样处理:
- 查询缓存,如果缓存中有相应的值,则直接返回
- 准备依赖函数所需要的参数
- 调用依赖函数并获得返回值
- 将返回值存入缓存
- 将返回值作为事件处理函数中的参数值传入
我们在编写依赖函数时,可以简单的使用同步函数
4、 类和生成器
4.1 类作为依赖
from nonebot import on_command
from nonebot.params import Depends # 1.引用 Depends
from nonebot.adapters.onebot.v11 import MessageEvent
test = on_command("123")
class DependClass: # 2.编写依赖类
def __init__(self, event: MessageEvent):
self.uid = event.get_user_id()
self.nickname = event.sender.nickname
@test.handle()
async def _(x: DependClass = Depends(DependClass)): # 3.在事件处理函数里声明依赖项
print(x.uid, x.nickname)
依然可以用三步说明如何用类作为依赖项:
- 引用
Depends
。 - 编写依赖类。类的
__init__
函数可以接收bot
,event
,state
等参数,在这里我们接受了event
,并以onebot
的MessageEvent
作为类型标注。 - 在事件处理函数中声明依赖项。当用类作为依赖项时,它会是一个对应的实例,在这里
x
就是DependClass
实例。
4.2 生成器作为依赖
import httpx
from nonebot import on_command
from nonebot.params import Depends # 1.引用 Depends
test = on_command("123")
async def get_client(): # 2.编写异步生成器函数
async with httpx.AsyncClient() as client:
yield client
print("调用结束")
@test.handle()
async def _(x: httpx.AsyncClient = Depends(get_client)): # 3.在事件处理函数里声明依赖项
resp = await x.get("https://v2.nonebot.dev")
# do something
我们用
yield
代码段作为生成器函数的“返回”,在事件处理函数里用返回出来的client
做自己需要的工作。在NoneBot2
结束事件处理时,会执行yield
之后的代码
4.3 类函数作为依赖
from typing import Type
from nonebot.log import logger
from nonebot.params import Depends # 1.引用 Depends
from nonebot import on_command
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent
test = on_command("123")
class EventChecker: # 2.编写需要的类
def __init__(self, EventClass: Type[MessageEvent]):
self.event_class = EventClass
def __call__(self, event: MessageEvent) -> bool:
return isinstance(event, self.event_class)
checker = EventChecker(GroupMessageEvent) # 3.将类实例化
@test.handle()
async def _(x: bool = Depends(checker)): # 4.在事件处理函数里声明依赖项
if x:
print("这是群聊消息")
else:
print("这不是群聊消息")
标签:QQ,Nonebot2,插件,依赖,函数,nonebot,import,event,def From: https://www.cnblogs.com/liuzhongkun/p/16782882.html这是判断
onebot
的消息事件是不是群聊消息事件的一个例子,我们可以用四步来说明这个例子:
- 引用
Depends
。- 编写需要的类。类的
__init__
函数接收参数EventClass
,它将接收事件类本身。类的__call__
函数将接受消息事件对象,并返回一个bool
类型的判定结果。- 将类实例化。我们传入群聊消息事件作为参数实例化
checker
。- 在事件处理函数里声明依赖项。
NoneBot2
将会调用checker
的__call__
方法,返回给参数x
相应的判断结果