首页 > 编程语言 >langchain chatchat运行机制源码解析

langchain chatchat运行机制源码解析

时间:2024-03-19 17:33:06浏览次数:22  
标签:prompt fastchat self messages langchain chatchat 源码 params model

langchain chatchat的简介就不多说了,大家可以去看github官网介绍,虽然当前版本停止了更新,下个版本还没有出来,但作为学习还是很好的。

一、关键启动过程:

1、start_main_server 入口

2、run_controller 启动fastchat controller 端口20001

3、run_openai_api启动fastchat对外提供的类似openai接口的服务,端口20000

4、run_model_worker 创建fastchat的model_worker,其中又执行了以下过程:

         4.1、create_model_worker_app,根据配置文件,创建并初始化对应的model_workder,初始化过程中,model_worker会通过self.init_heart_beat()将自己注册到fastchat controller中,以供fastchat管理调用。最后create_model_worker_app方法取出model_workder的fastaip对象app,将app返回。

        4.2 、uvicorn.run(app, host=host, port=port, log_level=log_level.lower()),启动模型对应的model_workder服务,这里的app来自model_workder的app。

二、chat过程

1、app.post("/chat/chat",              tags=["Chat"],              summary="与llm模型对话(通过LLMChain)",              )(chat) 2、本地模型LLM对话 model = get_ChatOpenAI(             model_name=model_name,             temperature=temperature,             max_tokens=max_tokens,             callbacks=callbacks,         ) get_ChatOpenAI: model = ChatOpenAI(         streaming=streaming,         verbose=verbose,         callbacks=callbacks,         openai_api_key=config.get("api_key", "EMPTY"),         openai_api_base=config.get("api_base_url", fschat_openai_api_address()),         model_name=model_name,         temperature=temperature,         max_tokens=max_tokens,         openai_proxy=config.get("openai_proxy"),         **kwargs     ) 在这里指定了fastchat的openai_api接口地址,这样就获得了指定接口地址的langchain ChatOpenAI对象 然后创建LLMChain: chain = LLMChain(prompt=chat_prompt, llm=model, memory=memory) 后面省略 3、在线模型LLM对话 在线模型的调用并没有直接发起,还是和上面一样,通过获取ChatOpenAI对象,来和fastchat进行交互,但是fastchat是不支持自定义调用在线模型的,langchain chatchat是怎么实现的呢? 原来,对应在线模型调用,langchain chatchat还是通过类似创建本地模型一样创建model_worker,但是对model_worker进行了继承,交互部分进行了重写,如qwen在线调用: class QwenWorker(ApiModelWorker):
而ApiModelWorker来自BaseModelWorker,BaseModelWorker就是fastchat的worker_model的基类。(本地模型实例化时用的ModelWorker本身也是继承自BaseModelWorker)
class ApiModelWorker(BaseModelWorker):
    DEFAULT_EMBED_MODEL: str = None # None means not support embedding

    def __init__(
        self,
        model_names: List[str],
        controller_addr: str = None,
        worker_addr: str = None,
        context_len: int = 2048,
        no_register: bool = False,
        **kwargs,
    ):
        kwargs.setdefault("worker_id", uuid.uuid4().hex[:8])
        kwargs.setdefault("model_path", "")
        kwargs.setdefault("limit_worker_concurrency", 5)
        super().__init__(model_names=model_names,
                        controller_addr=controller_addr,
                        worker_addr=worker_addr,
                        **kwargs)
        import fastchat.serve.base_model_worker
        import sys
        self.logger = fastchat.serve.base_model_worker.logger
        # 恢复被fastchat覆盖的标准输出
        sys.stdout = sys.__stdout__
        sys.stderr = sys.__stderr__

        new_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(new_loop)

        self.context_len = context_len
        self.semaphore = asyncio.Semaphore(self.limit_worker_concurrency)
        self.version = None

        if not no_register and self.controller_addr:
            self.init_heart_beat()


    def count_token(self, params):
        prompt = params["prompt"]
        return {"count": len(str(prompt)), "error_code": 0}

    def generate_stream_gate(self, params: Dict):
        self.call_ct += 1

        try:
            prompt = params["prompt"]
            if self._is_chat(prompt):
                messages = self.prompt_to_messages(prompt)
                messages = self.validate_messages(messages)
            else: # 使用chat模仿续写功能,不支持历史消息
                messages = [{"role": self.user_role, "content": f"please continue writing from here: {prompt}"}]

            p = ApiChatParams(
                messages=messages,
                temperature=params.get("temperature"),
                top_p=params.get("top_p"),
                max_tokens=params.get("max_new_tokens"),
                version=self.version,
            )
            for resp in self.do_chat(p):
                yield self._jsonify(resp)
        except Exception as e:
            yield self._jsonify({"error_code": 500, "text": f"{self.model_names[0]}请求API时发生错误:{e}"})

    def generate_gate(self, params):
        try:
            for x in self.generate_stream_gate(params):
                ...
            return json.loads(x[:-1].decode())
        except Exception as e:
            return {"error_code": 500, "text": str(e)}


    # 需要用户自定义的方法

    def(self, params: ApiChatParams) -> Dict:
        '''
        执行Chat的方法,默认使用模块里面的chat函数。
        要求返回形式:{"error_code": int, "text": str}
        '''
        return {"error_code": 500, "text": f"{self.model_names[0]}未实现chat功能"}

    # def do_completion(self, p: ApiCompletionParams) -> Dict:
    #     '''
    #     执行Completion的方法,默认使用模块里面的completion函数。
    #     要求返回形式:{"error_code": int, "text": str}
    #     '''
    #     return {"error_code": 500, "text": f"{self.model_names[0]}未实现completion功能"}

    def do_embeddings(self, params: ApiEmbeddingsParams) -> Dict:
        '''
        执行Embeddings的方法,默认使用模块里面的embed_documents函数。
        要求返回形式:{"code": int, "data": List[List[float]], "msg": str}
        '''
        return {"code": 500, "msg": f"{self.model_names[0]}未实现embeddings功能"}

    def get_embeddings(self, params):
        # fastchat对LLM做Embeddings限制很大,似乎只能使用openai的。
        # 在前端通过OpenAIEmbeddings发起的请求直接出错,无法请求过来。
        print("get_embedding")
        print(params)

    def make_conv_template(self, conv_template: str = None, model_path: str = None) -> Conversation:
        raise NotImplementedError

    def validate_messages(self, messages: List[Dict]) -> List[Dict]:
        '''
        有些API对mesages有特殊格式,可以重写该函数替换默认的messages。
        之所以跟prompt_to_messages分开,是因为他们应用场景不同、参数不同
        '''
        return messages


    # help methods
    @property
    def user_role(self):
        return self.conv.roles[0]

    @property
    def ai_role(self):
        return self.conv.roles[1]

    def _jsonify(self, data: Dict) -> str:
        '''
        将chat函数返回的结果按照fastchat openai-api-server的格式返回
        '''
        return json.dumps(data, ensure_ascii=False).encode() + b"\0"

    def _is_chat(self, prompt: str) -> bool:
        '''
        检查prompt是否由chat messages拼接而来
        TODO: 存在误判的可能,也许从fastchat直接传入原始messages是更好的做法
        '''
        key = f"{self.conv.sep}{self.user_role}:"
        return key in prompt

    def prompt_to_messages(self, prompt: str) -> List[Dict]:
        '''
        将prompt字符串拆分成messages.
        '''
        result = []
        user_role = self.user_role
        ai_role = self.ai_role
        user_start = user_role + ":"
        ai_start = ai_role + ":"
        for msg in prompt.split(self.conv.sep)[1:-1]:
            if msg.startswith(user_start):
                if content := msg[len(user_start):].strip():
                    result.append({"role": user_role, "content": content})
            elif msg.startswith(ai_start):
                if content := msg[len(ai_start):].strip():
                    result.append({"role": ai_role, "content": content})
            else:
                raise RuntimeError(f"unknown role in msg: {msg}")
        return result

    @classmethod
    def can_embedding(cls):
        return cls.DEFAULT_EMBED_MODEL is not None

  从代码中可以看到ApiModelWorker重写了generate_stream_gate,并且调用了do_chat方法,该方法要求子类去实现实际的chat过程。我们再回到class QwenWorker(ApiModelWorker):

def do_chat(self, params: ApiChatParams) -> Dict:
        import dashscope
        params.load_config(self.model_names[0])
        if log_verbose:
            logger.info(f'{self.__class__.__name__}:params: {params}')

        gen = dashscope.Generation()
        responses = gen.call(
            model=params.version,
            temperature=params.temperature,
            api_key=params.api_key,
            messages=params.messages,
            result_format='message',  # set the result is message format.
            stream=True,
        )

        for resp in responses:
            if resp["status_code"] == 200:
                if choices := resp["output"]["choices"]:
                    yield {
                        "error_code": 0,
                        "text": choices[0]["message"]["content"],
                    }
            else:
                data = {
                    "error_code": resp["status_code"],
                    "text": resp["message"],
                    "error": {
                        "message": resp["message"],
                        "type": "invalid_request_error",
                        "param": None,
                        "code": None,
                    }
                }
                self.logger.error(f"请求千问 API 时发生错误:{data}")
                yield data

  至此,qwen在线模型完成了调用。

三、总结

不得不说,这种设计还是很精妙的,借助fastchat,不仅实现了fastchat支持的几个本地大模型的调用,对于在线模型,即使不同的在线模型有不同的api接口定义,但只需要去定义实现一个新的继承ApiModelWorker的类,就可以屏蔽掉接口之间的差异,通过fastchat对齐接口,统一对外提供类openai api接口服务,这样在langchain不做修改的情况下,langchain就可以正常调用市面上各类接口迥异的在线大模型。

三、后续计划

1、尝试langchain chatchat和ollama的对接
2、Agent应用实践

我建了一个langchain交流群,欢迎加入一起交流学习心得:

 

 

标签:prompt,fastchat,self,messages,langchain,chatchat,源码,params,model
From: https://www.cnblogs.com/srszzw/p/18083487

相关文章

  • 鸿鹄电子招投标系统源码实现与立项流程:基于Spring Boot、Mybatis、Redis和Layui的企业
    随着企业的快速发展,招采管理逐渐成为企业运营中的重要环节。为了满足公司对内部招采管理提升的要求,建立一个公平、公开、公正的采购环境至关重要。在这个背景下,我们开发了一款电子招标采购软件,以最大限度地控制采购成本,提高招投标工作的公开性和透明性,并确保符合国家电子招投标......
  • 高性能、可扩展、支持二次开发的企业电子招标采购系统源码
     在数字化时代,采购管理也正经历着前所未有的变革。全过程数字化采购管理成为了企业追求高效、透明和规范的关键。该系统通过SpringCloud、SpringBoot2、Mybatis等先进技术,打造了从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通过待办消息、招标公告、......
  • 电商系统源码搭建,让你轻松拥有属于自己的网店
    在开发与测试过程中,需要注意以下几个关键细节:前端页面设计与用户体验:确保前端页面符合系统架构设计中确定的功能模块,同时注重页面设计和用户体验。后端接口安全性与性能:开发后端接口时,要确保接口的安全性和性能,确保前端页面与后端数据的有效交互。数据库设计与优化:根据系统需......
  • 基于企业客户管理系统(源码+开题)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着信息技术的迅猛发展,企业客户管理已经成为企业核心竞争力的重要组成部分。在日益激烈的市场竞争中,如何有效管理客户信息、提升客户服务质量、加强......
  • 基于农产品交易系统(源码+开题)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着信息技术的飞速发展,农产品交易系统已成为现代农业产业化的重要组成部分。传统的农产品交易方式存在信息不对称、交易效率低下等问题,严重制约了农......
  • 基于内容的音乐推荐网站(源码+开题)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着信息技术的飞速发展和数字音乐产业的蓬勃兴起,音乐已经成为人们日常生活中不可或缺的一部分。然而,面对海量的音乐资源,如何帮助用户高效、准确地找......
  • 基于Java中的SSM框架实现宝康药房销售管理系统项目【项目源码+论文说明】
    基于Java中的SSM框架实现宝康药房销售管理系统演示摘要随着我国市场经济的蓬勃发展和人们对医药产品需求的迅速增加,医药销售行业正处于一个高速发展的时期。行业的快速发展必然导致竞争的加剧,面对药品销售业日益严酷的竟争现实,加强管理、提高工作效率和改善服务质量成了急......
  • Spring AOP之源码分析
    在研究@Enable*注解的应用之:声明式事务@EnableTransactionManagement详解源码时,配置中context装置了一个TransactionInterceptor的bean。对这个类比较好奇因为Interceptor,因为MyBatis功能点之二(1):MyBatis提供的拦截器平台中也分析过interceptor的使用,这个SpringInterceptor是......
  • 基于Java+Vue的人力资源管理系统设计与实现【附源码+文档】
        前言:eHR人力资源管理系统是一个综合性的、用于优化人力资源管理流程的系统。它涵盖了人力资源管理的多个方面,包括招聘、人事、考勤、绩效、社保公积金以及薪酬管理等。以下是关于这些模块的详细解释:一、招聘管理招聘管理是eHR系统的重要组成部分,它可以帮助企业实现......
  • 基于SpringBoot的“会员制医疗预约服务管理信息系统”的设计与实现(源码+数据库+文档+P
    基于SpringBoot的“会员制医疗预约服务管理信息系统”的设计与实现(源码+数据库+文档+PPT)开发语言:Java数据库:MySQL技术:SpringBoot工具:IDEA/Ecilpse、Navicat、Maven系统展示系统首页界面图医院信息界面图坐诊信息界面图会员注册界面图个人中心界面图......