首页 > 其他分享 >kedro package 项目运行内部处理

kedro package 项目运行内部处理

时间:2024-09-26 08:55:08浏览次数:1  
标签:run cli package project kedro str 运行

kedro package 会将开发的data pipeline 项目构建为一个标准的python whl 格式包(build 模块),之后我们就可以直接基于项目模块运行开发的pipeline 了,以下简单说明下内部处理

项目结构

为了将kedro pipeline 项目提供为一个可以通过模块直接运行的,kedro starter 包含了一个__main__.py 的文件,这样就可以直接运行了,当然为了方便
通过cli 模式kedro 也提供了scripts 方便cli 使用

  • 参考代码结构
./spaceflights
├── README.md
├── conf
│   ├── README.md
│   ├── base
│   │   ├── catalog.yml
│   │   ├── parameters.yml
│   │   ├── parameters_data_processing.yml
│   │   ├── parameters_data_science.yml
│   │   └── parameters_reporting.yml
│   └── local
│       └── credentials.yml
├── data
│   ├── 01_raw
│   │   ├── companies.csv
│   │   ├── reviews.csv
│   │   └── shuttles.xlsx
│   ├── 02_intermediate
│   ├── 03_primary
│   ├── 04_feature
│   ├── 05_model_input
│   ├── 06_models
│   ├── 07_model_output
│   └── 08_reporting
├── notebooks
├── pyproject.toml
├── requirements.txt
└── src
    └── spaceflights
        ├── __init__.py
        ├── __main__.py  # 通过python -m <projectname> 模式可以运行必备的
        ├── pipeline_registry.py
        ├── pipelines
        └── settings.py
  • cli 模式
    pyproject.toml 配置了定义
[project.scripts]
spaceflights = "spaceflights.__main__:main"

内部处理

核心是__main__.py ,调用了内部的配置,以及pipeline 发现以及基于kedro 框架的cli 运行(具体会由不同的runner 执行)

  • main.py
"""spaceflights file for ensuring the package is executable
as `spaceflights` and `python -m spaceflights`
"""
from pathlib import Path
 
from kedro.framework.cli.utils import find_run_command
from kedro.framework.project import configure_project
   
def main(*args, **kwargs):
    package_name = Path(__file__).parent.name
   # 项目配置
    configure_project(package_name)
    # 运行项目
    run = find_run_command(package_name)
    run(*args, **kwargs)
 
if __name__ == "__main__":
    main()
  • configure_project 配置处理
    此出的处理上并不是kedro 使用的omegaconf,而且基于dynaconf 的配置管理,基于omegaconf 是在具体pipeline 运行中使用到
    主要是对于实际运行中conf 以及data 的处理,会结合项目中的settings.py 的一些配置处理
def configure_project(package_name: str) -> None:
    """Configure a Kedro project by populating its settings with values
    defined in user's settings.py and pipeline_registry.py.
    """
    settings_module = f"{package_name}.settings"
    settings.configure(settings_module)
 
    pipelines_module = f"{package_name}.pipeline_registry"
    pipelines.configure(pipelines_module)
 
    # Once the project is successfully configured once, store PACKAGE_NAME as a
    # global variable to make it easily accessible. This is used by validate_settings()
    # below, and also by ParallelRunner on Windows, as package_name is required every
    # time a new subprocess is spawned.
    global PACKAGE_NAME  # noqa: PLW0603
    PACKAGE_NAME = package_name
 
    if PACKAGE_NAME:
        LOGGING.set_project_logging(PACKAGE_NAME)
  • find_run_command 处理
def find_run_command(package_name: str) -> Callable:
    """Find the run command to be executed.
       This is either the default run command defined in the Kedro framework or a run command defined by
       an installed plugin.
 
    Args:
        package_name: The name of the package being run.
 
    Raises:
        KedroCliError: If the run command is not found.
 
    Returns:
        Run command to be executed.
    """
    try:
      # 首先看项目级别的cli 
        project_cli = importlib.import_module(f"{package_name}.cli")
        # fail gracefully if cli.py does not exist
    except ModuleNotFoundError as exc:
        if f"{package_name}.cli" not in str(exc):
            raise
        # 加载entry_points 的几个定义
        plugins = load_entry_points("project")
        run = _find_run_command_in_plugins(plugins) if plugins else None
        if run:
            # use run command from installed plugin if it exists
            return run  # type: ignore[no-any-return]
        # use run command from `kedro.framework.cli.project`
       #  没有找到使用默认project 包中的run 模块
        from kedro.framework.cli.project import run
 
        return run  # type: ignore[no-any-return]
    # fail badly if cli.py exists, but has no `cli` in it
    if not hasattr(project_cli, "cli"):
        raise KedroCliError(f"Cannot load commands from {package_name}.cli")
    return project_cli.run  # type: ignore[no-any-return]

默认entry_point 查找的类型

ENTRY_POINT_GROUPS = {
    "global": "kedro.global_commands",
    "project": "kedro.project_commands",
    "init": "kedro.init",
    "line_magic": "kedro.line_magic",
    "hooks": "kedro.hooks",
    "cli_hooks": "kedro.cli_hooks",
    "starters": "kedro.starters",
}

project run 中的处理

def run(  # noqa: PLR0913
    tags: str,
    env: str,
    runner: str,
    is_async: bool,
    node_names: str,
    to_nodes: str,
    from_nodes: str,
    from_inputs: str,
    to_outputs: str,
    load_versions: dict[str, str] | None,
    pipeline: str,
    config: str,
    conf_source: str,
    params: dict[str, Any],
    namespace: str,
) -> None:
    """Run the pipeline."""
 
    runner_obj = load_obj(runner or "SequentialRunner", "kedro.runner")
    tuple_tags = tuple(tags)
    tuple_node_names = tuple(node_names)
   # 创建session,具体有session 中的run 运行
    with KedroSession.create(
        env=env, conf_source=conf_source, extra_params=params
    ) as session:
        session.run(
            tags=tuple_tags,
            runner=runner_obj(is_async=is_async),
            node_names=tuple_node_names,
            from_nodes=from_nodes,
            to_nodes=to_nodes,
            from_inputs=from_inputs,
            to_outputs=to_outputs,
            load_versions=load_versions,
            pipeline_name=pipeline,
            namespace=namespace,
        )

说明

官方有一个参考架构的图,结合此图以及项目结构代码看运行机制就更加清晰了

参考资料

https://docs.kedro.org/en/0.19.5/tutorial/package_a_project.html
https://docs.kedro.org/en/stable/kedro_project_setup/settings.html
https://docs.kedro.org/en/stable/extend_kedro/architecture_overview.html
https://python-poetry.org/docs/pyproject/#scripts

标签:run,cli,package,project,kedro,str,运行
From: https://www.cnblogs.com/rongfengliang/p/18352103

相关文章

  • 基于微信小程序的社区团购+ssm(lw+演示+源码+运行)
    摘要社会的发展和科学技术的进步,互联网技术越来越受欢迎。手机也逐渐受到广大人民群众的喜爱,也逐渐进入了每个会员的使用。手机具有便利性,速度快,效率高,成本低等优点。因此,构建符合自己要求的操作系统是非常有意义的。本文从管理员、商家、会员的功能要求出发,社区团购系统中......
  • 基于微信小程序的商品展示+ssm(lw+演示+源码+运行)
    商品展示系统摘要随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,微信小程序被用户普遍使用,为方便用户能够可以随时进行小程序的相应信息内容的管理,特开发了基于微信小程序的......
  • 本地部署运行 Google Gemma 开源大模型
    Google开源了Gemma大模型,有7B和2B两个版本,7B模型的能力已经是开源模型中的领先水平。Gemma可以轻松的在本地部署运行,如果你的显存在8G以上,可以体验7B版本,8G以下的话可以试试2B版本。部署过程如下:1、使用ollama运行Gemma模型2、使用Chatbox作为UI客户端......
  • 《守望先锋2》libcef.dll文件丢失无法运行?快速定位并解决《守望先锋2》libcef.dll文件
    《守望先锋2》在运行时提示libcef.dll文件丢失,这确实是一个可能导致游戏无法正常运行的问题。以下是一些快速定位并解决这一问题的步骤:一、了解libcef.dll文件libcef.dll文件通常与ChromiumEmbeddedFramework(CEF)相关,它对于游戏内嵌的网页内容展示与交互至关重要,如登录界面......
  • JMeter的运行
    简介JMeter运行环境要求Java版本:JMeter是基于Java开发的,所以需要安装Java运行环境(JRE)或Java开发工具包(JDK)。推荐使用Java8或更新版本。系统要求:JMeter可以运行在Windows、Mac和Linux等操作系统上。具体的要求可以根据JMeter的官方文档来确定。内存要求:JM......
  • 性能测试 | JMeter的运行
    简介JMeter运行环境要求Java版本:JMeter是基于Java开发的,所以需要安装Java运行环境(JRE)或Java开发工具包(JDK)。推荐使用Java8或更新版本。系统要求:JMeter可以运行在Windows、Mac和Linux等操作系统上。具体的要求可以根据JMeter的官方文档来确定。内存要求:JMeter在......
  • Docker 运行 MongoDB
    Docker运行MongoDB实验环境宿主机:CentOSStreamrelease9Docker:DockerEngine27.3.1MongoDB:7.0.14Mongosh;2.3.1配置文件mongod.conf[karma@localhostkarmamongodb]$catmongod.conf|grep-vE"#|^$"storage:dbPath:/var/lib/mongodbsystemLog:des......
  • 编译安装redis运行注册服务脚本sh install_server.sh时报错。
    在编译安装redis的时候,运行注册服务脚本shinstall_server.sh时,报错。WelcometotheredisserviceinstallerThisscriptwillhelpyoueasilysetuparunningredisserverThissystemsseemstousesystemd.Pleasetakealookattheprovidedexampleserviceunitfi......
  • vscode解决运行程序无法从控制台输入问题
    在vscode中运行一些简单的程序代码,需要从控制台接受输入参数,发现不能通过键盘输入。本章教程,提供该问题的解决方法。解决办法由于我是使用的CodeRunner这个插件,CodeRunner插件支持运行多种编程语言,很方便。打开CodeRunner的插件设置,找到一下选项,并勾选即可解决该问题。如果遇......
  • python调用另一个.py文件中的类和函数或直接运行另一个.py文件
    同一文件夹下的调用1.调用函数A.py文件如下:defadd(x,y):print('和为:%d'%(x+y))在B.py文件中调用A.py的add函数如下:importAA.add(1,2)或fromAimportaddadd(1,2)2.调用类A.py文件如下:classA:def__init__(self,xx,yy):self.x=xxself.y=y......