最近看到微信自动化回复,觉得很有意思,想接通大模型,自动回复好友消息。以下文章将对代码进行详细解释,文章末尾附源码
1.在抖音扣子平台创建发布一个大模型智能问答助手,获取API-key等。在扣子平台有详细文档。
2.wxauto安装。pip install wxauto
作者有详细文档介绍。接收消息以及发送消息。
3.代码基本原理:利用wxauto接收到好友消息,以网络请求方式发送到扣子,调用大模型,获取回答,解析回答内容,利用wxauto发送至好友,以此实现自动回复流程。
4.代码如下,很简短。token需要自己从扣子平台获取。不会的可以留言。
标签:Python,微信,answers,content,print,msg,wxauto,response From: https://blog.csdn.net/m0_66980894/article/details/141217696import requests
import json
import re
import uuid
from wxauto import WeChat
# 使用Coze API获取AI生成的回答
def coze_chat(query):
print(f"发送到Coze的消息内容: {query}") # 调试信息
url = 'https://api.coze.cn/open_api/v2/chat'
token = '扣子平台获取'
bot_id = "7380746246962692096"
stream = False # True: 流式返回 False: 非流式返回
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
'Accept': '*/*',
'Host': 'api.coze.cn',
'Connection': 'keep-alive'
}
data = {
"conversation_id": "123",
"bot_id": bot_id,
"user": str(uuid.uuid4()),
"query": query,
"user_photo": None,
"stream": stream,
}
response = requests.post(url, headers=headers, data=json.dumps(data))
response = str(response.content, encoding='utf-8')
if stream:
answers = extract_answers(response)
else:
answers = extract_answers_non_stream(response)
#print(f"从Coze接收到的回复: {answers}")
return answers
# 流式返回提取模型回答内容
def extract_answers(response):
lines = response.split('\n')
answers_parts = []
content_re = re.compile(r'"type":"answer".*?"content":"(.*?)"')
for line in lines:
if line.startswith('data:'):
match = content_re.search(line)
if match:
answers_parts.append(match.group(1))
full_answer = ''.join(answers_parts)
return full_answer
# 非流式返回提取模型回答内容
def extract_answers_non_stream(response):
data = json.loads(response)
answers_parts = []
for message in data['messages']:
if message['type'] == 'answer':
answers_parts.append(message['content'])
full_answer = ''.join(answers_parts)
return full_answer
# 主函数,自动回复
def main(wx, msg_content):
print(f"收到的微信消息: {msg_content}")
response_msg = coze_chat(msg_content)
#发送给备注名是一的
wx.SendMsg(msg=response_msg + "--此内容为AI生成", who="一")
print(f"发送给一的消息: {response_msg}")
if __name__ == '__main__':
wx = WeChat()
while True:
msgs = wx.GetAllMessage()
#print(f"收到的消息列表: {msgs}")
if msgs:
last_msg = msgs[-1]
# print(f"最新一条消息: {last_msg}")
# print(f"消息备注名: {last_msg.sender_remark}") # 打印备注名
# print(f"消息类型: {last_msg.type}") # 打印消息类型
# 判断是否是好友消息,并且备注名是“一”
if last_msg.type == "friend" and last_msg.sender_remark == "一":
print("符合条件的消息,准备处理...")
main(wx, last_msg.content)