首页 > 编程语言 >【源码阅读】4. Stream Load 导入任务的执行流程

【源码阅读】4. Stream Load 导入任务的执行流程

时间:2023-06-25 18:55:48浏览次数:43  
标签:Load load stream Stream header 源码 StreamLoadAction 执行 PlanFragmentExecutor

 

FE

起手路由

在访问curl --location-trusted -u root: -T test.csv -H "column_separator:," http://127.0.0.1:8030/api/demo/example_tbl/_stream_load时,FE如下操作:

● 检查用户名密码

● 检查权限

● 随机选择一个BE,Redirect到这个BE上

 

BE

_stream_load的handler是StreamLoadAction。处理一个标准http请求过程中的相关的回调方法有以下(Doris实现了具体的回调动作):

● StreamLoadAction.on_header

● StreamLoadAction.on_chunk_data - 接收数据

● StreamLoadAction.handle - 收尾工作

关键类:

StreamLoadAction:处理_stream_load的处理程序

StreamLoadContext:StreamLoad执行的上下文类

StreamLoadExecutor:

● begin_txn(), commit_txn(), rollback_txn()

● excute_plan_fragment()

StreamLoadPipe:是一个数据通路,主要实现了append()和read()两个方法;LoadStreamMgr:管理StreamLoadPipe的类

FragmentMgr:管理所有的fragment执行

PlanFragmentExecutor:真正Fragment的执行器

 

 

on_header

● 创建StreamLoadContext

● 向FE请求事务

● 生成StreamLoadPipe并向LoadStreamMgr注册

● 向FE请求执行计划

● 执行执行计划

○ 构建QueryFragmentsCtx

○ 构建FragmentExecState,并于其中构建PlanFragmentExecutor

○ 执行FragmentExecState.prepare,并执行PlanFragmentExecutor.prepare

○ 执行FragmentExecState.execute,并执行PlanFragmentExecutor.open

 

PlanFragmentExecutor.prepare

 

 

PlanFragmentExecutor.open

 

vectorized::NewFileScanNode

stream_load::VOlapTableSink

 

 

 

on_chunk_data

此步为接收数据,并往StreamLoadPipe中append数据

 

handle

● 首先future.get()阻塞,等待on_header中对future进行解除

● 如果执行成功则调用StreamLoadExecutor.commit_txn进行提交

● 向客户端发送StreamLoad的相关统计信息

● 记录StreamLoad相关信息

 

 

 

● on_header函数会调用到StreamLoadAction,并进行_on_header和_process_put方法的调用,最后调用到execute_plan_fragment函数将任务加入到线程池_thread_pool中;

● on_chunk_data函数进行导入数据的append;

● handle函数中通过ctx->future等待stream load线程结束;

 

标签:Load,load,stream,Stream,header,源码,StreamLoadAction,执行,PlanFragmentExecutor
From: https://www.cnblogs.com/xutaoustc/p/17503709.html

相关文章

  • Bert Pytorch 源码分析:二、注意力层
    #注意力机制的具体模块#兼容单头和多头classAttention(nn.Module):"""Compute'ScaledDotProductAttention""" #QKV尺寸都是BS*ML*ES #(或者多头情况下是BS*HC*ML*HS,最后两维之外的维度不重要) #从输入计算QKV的过程可以统一处理,不必......
  • 【源码阅读】2. Catalog和Database
     Catalog创建|KW_CREATEKW_CATALOGopt_if_not_exists:ifNotExistsident:catalogNameopt_properties:properties{:RESULT=newCreateCatalogStmt(ifNotExists,catalogName,null,properties);:}|KW_CREATEKW_CATALOGopt_if_not_......
  • 【源码阅读】1. 配置、VARIABLE与用户PROPERTY
     配置初始化在FE启动时:● Config类ConfField注解标记的静态属性反射出Field存储到内存confFields,作为一个可读取和修改的属性列表(真正的值存储在Config类的静态属性中,反射出Field并存储到confFields只是一个读取和修改指针而已)● 读取配置文件,根据配置文件内容,设置Confi......
  • Java 反序列化之 XStream 反序列化
    0x01XStream基础XStream简介XStream是一个简单的基于Java库,Java对象序列化到XML,反之亦然(即:可以轻易的将Java对象和XML文档相互转换)。使用XStream实现序列化与反序列化下面看下如何使用XStream进行序列化和反序列化操作的。先定义接口类IPerson.javapublicinterf......
  • Java 反序列化之 XStream 反序列化
    0x01XStream基础XStream简介XStream是一个简单的基于Java库,Java对象序列化到XML,反之亦然(即:可以轻易的将Java对象和XML文档相互转换)。使用XStream实现序列化与反序列化下面看下如何使用XStream进行序列化和反序列化操作的。先定义接口类IPerson.javapubli......
  • Bert PyTorch 源码分析:一、嵌入层
    #标记嵌入就是最普通的嵌入层#接受单词ID输出单词向量#直接转发给了`nn.Embedding`classTokenEmbedding(nn.Embedding):def__init__(self,vocab_size,embed_size=512):super().__init__(vocab_size,embed_size,padding_idx=0) #片段嵌入实际上是......
  • GPT-Gstreamer操作调查
    gstreamer是一个开源的多媒体框架,可以用来实现音视频的编解码、处理、播放和转码等功能。本文将介绍如何用gstreamer完成多码率视频转换与生成、音视频编解码的基本步骤和原理。多码率视频转换与生成多码率视频转换与生成是一种常见的视频处理需求,它可以根据不同的网络环境和设......
  • 谁与争锋!手机直播源码知识分享之主播PK功能
    今天我要分享的知识与PK有关,PK是指某些人分成几方进行对决、对抗,直到分出胜负。PK的方式有很多,在现实生活中,人们可以通过智力、力量等进行PK,方式可以是搏斗、扳手腕、现场智力问答等;而在网络中,人们可以通过游戏、网络智力问答的方式进行PK。我今天要讲的这个功能也是网络中的PK,这个......
  • 2.nacos-client源码及查看
    nacos-client.2.2.1-RC.SDK查看源码官网JAVASDK链接主要内容<dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-client</artifactId><version>${version}</version></dependency>问题:1.获取配置api是获取快照......
  • Invalid character found in the request target [/api/hsFile/download?filePath=E:
    java.lang.IllegalArgumentException:Invalidcharacterfoundintherequesttarget[/api/hsFile/download?filePath=E:\\%E4%B8%B4%E6%97%B6%E6%96%87%E4%BB%B6&fileName=N230508A0002.xlsx].ThevalidcharactersaredefinedinRFC7230andRFC39861、原因:/a......