首页 > 其他分享 >MetaGPT day03 MetaGPT 订阅智能体

MetaGPT day03 MetaGPT 订阅智能体

时间:2024-01-22 18:22:57浏览次数:30  
标签:订阅 day03 self await MetaGPT url import async def

订阅智能体

基本介绍

# 什么是订阅智能体
MetaGPT提供了Agent的开发能力,Agent可以协助我们解决日常的一些问题,其中一个应用场景就是我们可以让Agent为我们关注某些想关注的信息,当有我们关注的事件发生时,Agent获取信息并进行处理,然后通过一些如邮件、微信、discord等通知渠道将处理后的信息发送给我们,我们将这类Agent称为订阅智能体。

# 订阅智能体的实现要素
从例子可以知道订阅智能体的实现主要有3个要素,分别是Role、Trigger、Callback,即智能体本身、触发器、数据回调。如果之前已经学过了Agent的开发,那么对Role的实现肯定非常熟悉了,这边的Role就是我们的Agent的实际代码实现,所以订阅智能体在代码实现上并没有什么特别的地方,可以看到下面的例子实际运行的就是MetaGPT中的Searcher Role,是官方提供的角色。

基本示例

metagpt提供了订阅智能体的简单实现示例,也就是SubscriptionRunner类。

import asyncio
from metagpt.subscription import SubscriptionRunner
from metagpt.roles import Searcher
from metagpt.schema import Message

async def trigger():
    while True:
        yield Message("the latest news about OpenAI")
        await asyncio.sleep(3600 * 24)

async def callback(msg: Message):
    print(msg.content)

async def main():
    pb = SubscriptionRunner()
    await pb.subscribe(Searcher(), trigger(), callback)
    await pb.run()

asyncio.run(main())

SubscriptionRunner.subscribe

# 做了什么事?
1.根据传入的Role、Tigger、Callback,写一个协程函数,然后将这个协程函数使用create_task变成一个协程Task对象,传入事件循环。
2.将这个协程Task对象保存起来,SubscriptionRunner类有个task属性,是一个字典。将传入的Role作为键,协程Task对象作为值,存放到task字典内。方便后续检查运行状态和异常处理,

类似:
	self.task = {'role-1': task_obj1, 'role-2':task_obj2,...}

这个协程Task对象是按照如下创建的:
    loop = asyncio.get_running_loop()
    async def _start_role():
        async for msg in trigger:
            resp = await role.run(msg)
            await callback(resp)
    self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

可见当这个Task对象运行时,会执行_start_role内的代码,然后拿到trigger中yield后面的Message,然后运行角色、得到角色的返回值之后执行回调函数:
async def trigger():
    while True:
        yield Message("the latest news about OpenAI")
        await asyncio.sleep(3600 * 24)

SubscriptionRunner.run

# 做了什么事?
1秒一次,循环检查,task字典中所有的协程Task对象。也就是检查我们订阅智能体的状态,异常了就报错。
self.task = {'role-1': task_obj, 'role-2':task_obj,...}

async def run(self, raise_exception: bool = True):
    """运行所有已订阅的任务并处理它们的完成或异常。
    args:
        raise_exception (bool): 如果为True,在任何已订阅的任务遇到异常时引发异常。默认为True。
    raises:
        task.exception: 如果`raise_exception`为True,则抛出一个由于task引发的异常
    """
    while True:
        # 遍历任务及其对应的角色
        for role, task in self.tasks.items():
            # 检查任务是否完成
            if task.done():
                # 检查任务是否遇到异常
                if task.exception():
                    # 如果指定了raise_exception,引发异常
                    if raise_exception:
                        raise task.exception()
                    # 记录带有任务异常的错误消息
                    logger.opt(exception=task.exception()).error(f"任务 {task.get_name()} 运行错误")
                else:
                    # 如果任务在没有异常的情况下完成,记录警告
                    logger.warning(
                        f"任务 {task.get_name()} 已完成。"
                        "如果这是意外的行为,请检查触发函数。"
                    )
                # 从任务字典中删除任务
                self.tasks.pop(role)
                break
        else:
            # 如果for没有被break中断,则任务未完成,等待1秒后再次检查
            await asyncio.sleep(1)

实现微信、DISCORD订阅智能体

import asyncio
import os
import time
from typing import Any, AsyncGenerator, Awaitable, Callable, Optional
import aiohttp
import discord
from aiocron import crontab
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from pytz import BaseTzInfo
from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message


# 订阅模块,可以from metagpt.subscription import SubscriptionRunner导入,这里贴上代码供参考
class SubscriptionRunner(BaseModel):
    """A simple wrapper to manage subscription tasks for different roles using asyncio.
    Example:
        >>> import asyncio
        >>> from metagpt.subscription import SubscriptionRunner
        >>> from metagpt.roles import Searcher
        >>> from metagpt.schema import Message
        >>> async def trigger():
        ...     while True:
        ...         yield Message("the latest news about OpenAI")
        ...         await asyncio.sleep(3600 * 24)
        >>> async def callback(msg: Message):
        ...     print(msg.content)
        >>> async def main():
        ...     pb = SubscriptionRunner()
        ...     await pb.subscribe(Searcher(), trigger(), callback)
        ...     await pb.run()
        >>> asyncio.run(main())
    """

    tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)

    class Config:
        arbitrary_types_allowed = True

    async def subscribe(
            self,
            role: Role,
            trigger: AsyncGenerator[Message, None],
            callback: Callable[
                [
                    Message,
                ],
                Awaitable[None],
            ],
    ):
        """Subscribes a role to a trigger and sets up a callback to be called with the role's response.
        Args:
            role: The role to subscribe.
            trigger: An asynchronous generator that yields Messages to be processed by the role.
            callback: An asynchronous function to be called with the response from the role.
        """
        loop = asyncio.get_running_loop()

        async def _start_role():
            async for msg in trigger:
                resp = await role.run(msg)
                await callback(resp)

        self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

    async def unsubscribe(self, role: Role):
        """Unsubscribes a role from its trigger and cancels the associated task.
        Args:
            role: The role to unsubscribe.
        """
        task = self.tasks.pop(role)
        task.cancel()

    async def run(self, raise_exception: bool = True):
        """Runs all subscribed tasks and handles their completion or exception.
        Args:
            raise_exception: _description_. Defaults to True.
        Raises:
            task.exception: _description_
        """
        while True:
            for role, task in self.tasks.items():
                if task.done():
                    if task.exception():
                        if raise_exception:
                            raise task.exception()
                        logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")
                    else:
                        logger.warning(
                            f"Task {task.get_name()} has completed. "
                            "If this is unexpected behavior, please check the trigger function."
                        )
                    self.tasks.pop(role)
                    break
            else:
                await asyncio.sleep(1)


# actions 的实现
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending Analyst, aiming to provide users with insightful and personalized recommendations based on the latest
GitHub Trends. Based on the context, fill in the following missing information, generate engaging and informative titles, 
ensuring users discover repositories aligned with their interests.

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example

```
# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
```

---
# Github Trending
{trending}
"""


# 分析结果 action
class AnalysisOSSTrending(Action):

    async def run(
            self,
            trending: Any
    ):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))


# 爬github action
class CrawlOSSTrending(Action):

    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url, proxy=CONFIG.global_proxy) as response:
                response.raise_for_status()
                html = await response.text()

        soup = BeautifulSoup(html, 'html.parser')

        repositories = []

        for article in soup.select('article.Box-row'):
            repo_info = {}

            repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")
            repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()

            # Description
            description_element = article.select_one('p')
            repo_info['description'] = description_element.text.strip() if description_element else None

            # Language
            language_element = article.select_one('span[itemprop="programmingLanguage"]')
            repo_info['language'] = language_element.text.strip() if language_element else None

            # Stars and Forks
            stars_element = article.select('a.Link--muted')[0]
            forks_element = article.select('a.Link--muted')[1]
            repo_info['stars'] = stars_element.text.strip()
            repo_info['forks'] = forks_element.text.strip()

            # Today's Stars
            today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
            repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None

            repositories.append(repo_info)

        return repositories


# Role实现
class OssWatcher(Role):
    def __init__(
            self,
            name="Codey",
            profile="OssWatcher",
            goal="Generate an insightful GitHub Trending analysis report.",
            constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name, profile, goal, constraints)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # By choosing the Action by order under the hood
        # todo will be first SimpleWriteCode() then SimpleRunCode()
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]  # find the most k recent messages
        result = await todo.run(msg.content)

        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)
        return msg


# Trigger
class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)


class GithubTrendingCronTrigger():

    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()
        return Message(self.url, OssInfo(url=self.url))


# discord callback
async def discord_callback(msg: Message):
    intents = discord.Intents.default()
    intents.message_content = True
    client = discord.Client(intents=intents, proxy=CONFIG.global_proxy)
    token = os.environ["DISCORD_TOKEN"]
    channel_id = int(os.environ["DISCORD_CHANNEL_ID"])
    async with client:
        await client.login(token)
        channel = await client.fetch_channel(channel_id)
        lines = []
        for i in msg.content.splitlines():
            if i.startswith(("# ", "## ", "### ")):
                if lines:
                    await channel.send("\n".join(lines))
                    lines = []
            lines.append(i)

        if lines:
            await channel.send("\n".join(lines))


# wx callback
async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)


class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
            self,
            content,
            summary: Optional[str] = None,
            content_type: int = 1,
            topic_ids: Optional[list[int]] = None,
            uids: Optional[list[int]] = None,
            verify: bool = False,
            url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()


# 运行入口
async def main(spec: str = "39 0 * * *", discord: bool = True, wxpusher: bool = True):
    callbacks = []
    if discord:
        callbacks.append(discord_callback)

    if wxpusher:
        callbacks.append(wxpusher_callback)

    if not callbacks:
        async def _print(msg: Message):
            print(msg.content)

        callbacks.append(_print)
        
	# 这个回调函数,相当于OssWatcher返回信息,直接产生两个task,同时向微信和discord推送消息。
    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
    await runner.run()


if __name__ == "__main__":
    os.environ["DISCORD_CHANNEL_ID"] = 'YOUR DISCORD_CHANNEL_ID'
    os.environ["DISCORD_TOKEN"] = 'YOUR DISCORD_TOKEN'
    os.environ["WXPUSHER_UIDS"] = 'YOUR WXPUSHER_UIDS'
    os.environ["WXPUSHER_TOKEN"] = 'YOUR WXPUSHER_TOKEN'

    import fire
    fire.Fire(main)

实现 Huggingface paper 定义智能体

import asyncio
import os
import time
from typing import Any, Optional, List
import aiohttp
from aiocron import crontab
from lxml import etree
from pydantic import BaseModel, Field
from pytz import BaseTzInfo
from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message

# actions 的实现
from metagpt.subscription import SubscriptionRunner

Huggingface_Paper_PROMPT = """
从HuggingFace Papers获取到的论文内容如下:
{content}

论文内容是一个数组,数组中有多篇论文,每一篇论文都是json格式,其中title内容是论文标题,abstract内容是论文摘要。
需要将摘要内容进行总结,并将总结的内容以中文输出。
需要将标题翻译为中文,放在原标题后面。

输出格式如下(Markdown),包括标题和总结:
# HuggingFace Papers
## 论文标题1:标题 (中文标题)
### 总结
## 论文标题2:标题 (中文标题)
### 总结
## 论文标题3:标题 (中文标题)
### 总结
## 论文标题4:标题 (中文标题)
### 总结
## 论文标题5:标题 (中文标题)
### 总结
"""


# 分析结果 action
class AnalysisOSSTrending(Action):

    async def run(
            self,
            content: Any
    ):
        return await self._aask(Huggingface_Paper_PROMPT.format(content=content))


# 爬huggingface action
class CrawlHuggingfacePaper(Action):
    @staticmethod
    async def fetch_html(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url, proxy=CONFIG.global_proxy) as response:
                return await response.text()

    async def parse_huggingface_paper(self, html):
        tree = etree.HTML(html)
        div_list = tree.xpath('/html/body/div/main/div[2]/section/div[2]/div')
        div_list.pop()
        paper_url = ['https://huggingface.co' + div.xpath('.//article/div/div/div[2]/h3/a/@href')[0] for div in
                     div_list]
        papers = []
        for url in paper_url:
            html = await self.fetch_html(url)
            await asyncio.sleep(0)
            tree = etree.HTML(html)
            title = tree.xpath('/html/body/div/main/div/section[1]/div/div[1]/h1/text()')[0]
            abstract = tree.xpath('/html/body/div/main/div/section[1]/div/div[2]/p/text()')[0]
            papers.append({'title': title, 'abstract': abstract})
        return papers

    async def run(self, url: str = 'https://huggingface.co/papers') -> List:
        html = await self.fetch_html(url)
        papers = await self.parse_huggingface_paper(html)
        return papers


# Role实现
class OssWatcher(Role):
    def __init__(
            self,
            name="Codey",
            profile="OssWatcher",
            goal="Generate an Huggingface Paper analysis report.",
            constraints="Only analyze based on the Huggingface Paper title and abstract.",
    ):
        super().__init__(name, profile, goal, constraints)
        self._init_actions([CrawlHuggingfacePaper, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # By choosing the Action by order under the hood
        # todo will be first SimpleWriteCode() then SimpleRunCode()
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]  # find the most k recent messages
        result = await todo.run(msg.content)

        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)
        return msg


# Trigger
class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)


class CronTrigger():

    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()
        return Message(self.url, OssInfo(url=self.url))


# wx callback
async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)


class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
            self,
            content,
            summary: Optional[str] = None,
            content_type: int = 1,
            topic_ids: Optional[list[int]] = None,
            uids: Optional[list[int]] = None,
            verify: bool = False,
            url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()


# 运行入口
async def main(spec: str = "04 18 * * *", wxpusher: bool = True):
    callbacks = []

    if wxpusher:
        callbacks.append(wxpusher_callback)

    if not callbacks:
        async def _print(msg: Message):
            print(msg.content)

        callbacks.append(_print)

    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), CronTrigger(spec, url='https://huggingface.co/papers'), callback)
    await runner.run()


if __name__ == "__main__":
    os.environ["WXPUSHER_UIDS"] = 'your apikey'
    os.environ["WXPUSHER_TOKEN"] = 'your apikey'
    import fire

    fire.Fire(main)

标签:订阅,day03,self,await,MetaGPT,url,import,async,def
From: https://www.cnblogs.com/passion2021/p/17980708

相关文章

  • MetaGPT day02: MetaGPT Role源码分析
    MetaGPT源码分析思维导图MetaGPT版本为v0.4.0,如下是frommetagpt.rolesimportRole,Role类执行Role.run时的思维导图:概述其中最重要的部分是_react,里面包含了一个循环,在循环中交替执行_think和_act,也就是让llm先思考再行动。_think中决定了llm下一个执行的动作是什么,这个动作......
  • day03 转义字符和注释
    转义字符注意:韩顺平教育/r北京输出结果:北京平教育。意思是回车回到当前行韩顺平教育/r/n北京输出结果:韩顺平教育换行后北京注释单行注释//xxx多行注释/*xxx*/文档注释,对类进行注释,用javadoc可以生成HTML文件/***xxx*/java代码规范......
  • MetaGPT day01: MetaGPT作者代码走读、软件公司初始示例
    LLM发展历史-2013年word2vec提出,但效果不好-2017年Transformer结构提出,降低网络复杂度-2018年BERT预训练语言模型效果显著提升-2019年GPT-3推出,采用大规模预训练-2020年InstructionTuning提出,实现零样本学习-2022年InstructGPT解决模型毒性问题-当前GPT-4成本......
  • 简单的.NET 8 Web API使用Kafka 发布订阅模式,示例api示例
    简单的.NET8WebAPI使用Kafka发布订阅模式,示例api示例kafka当使用Kafka时,我们需要使用Kafka的客户端库来与Kafka集群进行通信。在.NETCore中,可以使用Confluent.Kafka客户端库来实现与Kafka的集成。首先,我们需要在项目中添加Confluent.Kafka库的引用。首先,使用NuGet包管......
  • redis 发布订阅 的简单用法
    背景目前所使用的爬虫管理平台在自动获取git代码时会有拉不下来代码的情况,导致服务器上运行的不是最新的代码解决方案代码合并到master分支上之后,通过webhook触发消息发布接口,服务器在收到监听到消息开始从git的master分支上拉去代码因为拉取的代码存放在宿主机中,dock......
  • 无涯教程-Redis - 发布订阅命令
    RedisPub/Sub实现了消息传递系统,其中发件人(用redis术语称为发布者)发送消息,而接收者(订户)接收消息,消息传输所通过的链接称为channel。在Redis中,客户端可以订阅任意数量的频道。PublishSubscribe-示例以下示例说明了发布订户概念的工作方式。在以下示例中,一个客户订阅......
  • mqtt 共享订阅简单说明
    mqtt是一个比较常用的iot协议,5.0支持共享订阅,共享订阅是一个特别方便的功能,可以实现业务的高可用,以及提升消息的处理参考格式共享订阅格式$share/{ShareName}/{TopicFilter}ShareName是一个共享名称,可以实现业务组的能力,TopicFilter}可以对于订阅消息的过滤处理参考玩法简......
  • day03 代码随想录算法训练营 203. 移除链表元素
    题目:203.移除链表元素我的感悟:题目里的节点是已经给好的,创建虚拟节点,是为了方便处理头节点。加油,我可以的!!!!!理解难点:节点已经给好的创建虚拟节点代码难点:p是临时变量,类似于foriinrange(10)这里的i,本身是用完就扔的。返回值为什么不能是p.next?因为p是一个指针,......
  • openGauss学习笔记-179 openGauss 数据库运维-逻辑复制-发布订阅
    openGauss学习笔记-179openGauss数据库运维-逻辑复制-发布订阅发布和订阅基于逻辑复制实现,其中有一个或者更多订阅者订阅一个发布者节点上的一个或者更多发布。订阅者从它们所订阅的发布拉取数据。发布者上的更改会被实时发送给订阅者。订阅者以与发布者相同的顺序应用那些数据......
  • 代码随想录day03 链表删除 链表类的实现 反转链表
    首先是链表的删除操作热身题这里使用了一个新的头指针这样在删除头指针的时候就不需要进行额外的判断然后是链表类的实现需要一点背诵加上深刻理解有时候理解了但是写起来还是会有些指针边界的小问题应该多写写多记一下就会好了还有就是手写链表要常复习吧学习新......