FE
起手路由
在访问curl --location-trusted -u root: -T test.csv -H "column_separator:," http://127.0.0.1:8030/api/demo/example_tbl/_stream_load时,FE如下操作:
● 检查用户名密码
● 检查权限
● 随机选择一个BE,Redirect到这个BE上
BE
_stream_load的handler是StreamLoadAction。处理一个标准http请求过程中的相关的回调方法有以下(Doris实现了具体的回调动作):
● StreamLoadAction.on_header
● StreamLoadAction.on_chunk_data - 接收数据
● StreamLoadAction.handle - 收尾工作
关键类:
StreamLoadAction:处理_stream_load的处理程序
StreamLoadContext:StreamLoad执行的上下文类
StreamLoadExecutor:
● begin_txn(), commit_txn(), rollback_txn()
● excute_plan_fragment()
StreamLoadPipe:是一个数据通路,主要实现了append()和read()两个方法;LoadStreamMgr:管理StreamLoadPipe的类
FragmentMgr:管理所有的fragment执行
PlanFragmentExecutor:真正Fragment的执行器
on_header
● 创建StreamLoadContext
● 向FE请求事务
● 生成StreamLoadPipe并向LoadStreamMgr注册
● 向FE请求执行计划
● 执行执行计划
○ 构建QueryFragmentsCtx
○ 构建FragmentExecState,并于其中构建PlanFragmentExecutor
○ 执行FragmentExecState.prepare,并执行PlanFragmentExecutor.prepare
○ 执行FragmentExecState.execute,并执行PlanFragmentExecutor.open
PlanFragmentExecutor.prepare
PlanFragmentExecutor.open
vectorized::NewFileScanNode
stream_load::VOlapTableSink
on_chunk_data
此步为接收数据,并往StreamLoadPipe中append数据
handle
● 首先future.get()阻塞,等待on_header中对future进行解除
● 如果执行成功则调用StreamLoadExecutor.commit_txn进行提交
● 向客户端发送StreamLoad的相关统计信息
● 记录StreamLoad相关信息
● on_header函数会调用到StreamLoadAction,并进行_on_header和_process_put方法的调用,最后调用到execute_plan_fragment函数将任务加入到线程池_thread_pool中;
● on_chunk_data函数进行导入数据的append;
● handle函数中通过ctx->future等待stream load线程结束;
标签:Load,load,stream,Stream,header,源码,StreamLoadAction,执行,PlanFragmentExecutor From: https://www.cnblogs.com/xutaoustc/p/17503709.html