首页 > 其他分享 >基于jprofiler 的一个简单dremio 查询处理学习

基于jprofiler 的一个简单dremio 查询处理学习

时间:2022-10-07 17:35:32浏览次数:80  
标签:dremio java jprofiler 查询处理 sabot fragment run null

一个dremio 查询简单调用链的说明

参考命令

  • arthas watch
watch com.dremio.sabot.exec.fragment.FragmentExecutor$AsyncTaskImpl run '{params, target, returnObj, throwExp}' -x 2
  • jprofiler
    可以直接附加就行了

参考调用图

基于jprofiler 的一个简单dremio 查询处理学习_任务调度

 

 

基于jprofiler 的一个简单dremio 查询处理学习_任务调度_02

 

 

代码处理

  • run 处理
  private void run(){
assert taskState != State.DONE : "Attempted to run a task with state of DONE.";
assert eventProvider != null : "Attempted to run without an eventProvider";

if (!activateResource.isActivated()) {
// All tasks are expected to begin in a runnable state. So, switch to the BLOCKED state on the
// first call.
taskState = State.BLOCKED_ON_SHARED_RESOURCE;
return;
}
stats.runStarted();

// update thread name.
final Thread currentThread = Thread.currentThread();
final String originalName = currentThread.getName();
currentThread.setName(originalName + " - " + name);

try {

// if we're already done, we're finishing clean up. No core method
// execution is necessary, simply exit this block so we finishRun() below.
// We do this because a failure state will put us in a situation.
if(state == FragmentState.CANCELLED || state == FragmentState.FAILED || state == FragmentState.FINISHED) {
return;
}

// if there are any deferred exceptions, exit.
if(deferredException.hasException()) {
transitionToFailed(null);
return;
}

// if cancellation is requested, that is always the top priority.
if (cancelled.isDone()) {
Optional<Throwable> failedReason = eventProvider.getFailedReason();
if (failedReason.isPresent() || foremanDead) {
// check if it was failed due to an external reason (eg. by heap monitor).
// foremanDead is true, foremanDeadException must be non null.
assert(!foremanDead || (foremanDeadException != null));
transitionToFailed(failedReason.isPresent() ? failedReason.get() : foremanDeadException);
return;
}

transitionToCancelled();
taskState = State.DONE;
return;
}

// setup the execution if it isn't setup.
if(!isSetup){
stats.setupStarted();
try {
if (memoryArbiter != null) {
memoryArbiter.acquireMemoryGrant(this, getMemoryToAcquire());
}
setupExecution();
} finally {
stats.setupEnded();
}
// exit since we just did setup which could be a non-trivial amount of work. Allow the scheduler to decide whether we should continue.
return;
}

// workQueue might contain OOBMessages, which should be held and processed after the setup.
// This piece should always execute after the setup is done.
final Runnable work = workQueue.poll();
if (work != null) {
// we don't know how long it will take to process one work unit, we rely on the scheduler to execute
// this fragment again if it didn't run long enough
work.run();
return;
}

// handle any previously sent fragment finished messages.
FragmentHandle finishedFragment;
while ((finishedFragment = eventProvider.pollFinishedReceiver()) != null) {
pipeline.getTerminalOperator().receivingFragmentFinished(finishedFragment);
}

if (memoryArbiter != null) {
memoryArbiter.acquireMemoryGrant(this, getMemoryToAcquire());
}
// pump the pipeline
taskState = pumper.run();

// if we've finished all work, let's wrap up.
if(taskState == State.DONE){
transitionToFinished();
}

injector.injectChecked(executionControls, INJECTOR_DO_WORK, OutOfMemoryError.class);

} catch (OutOfMemoryError | OutOfMemoryException e) {
// handle out of memory errors differently from other error types.
if (e instanceof OutOfDirectMemoryError || e instanceof OutOfMemoryException || "Direct buffer memory".equals(e.getMessage()) || INJECTOR_DO_WORK.equals(e.getMessage())) {
transitionToFailed(UserException.memoryError(e)
.addContext(MemoryDebugInfo.getDetailsOnAllocationFailure(new OutOfMemoryException(e), allocator))
.buildSilently());
} else {
// we have a heap out of memory error. The JVM in unstable, exit.
ProcessExit.exitHeap(e);
}
} catch (Throwable e) {
transitionToFailed(e);
} finally {

try {
finishRun(originalName);
} finally {
stats.runEnded();
}
}

}
setupExecution 处理
参考代码

void setupExecution() throws Exception{
// drill 的模式以及官方的proflier 可以学习到FragmentMajor 以及FragmentMinor
final PlanFragmentMajor major = fragment.getMajor();
final PlanFragmentMinor minor = fragment.getMinor();

logger.debug("Starting fragment {}:{} on {}:{}", major.getHandle().getMajorFragmentId(), getHandle().getMinorFragmentId(), minor.getAssignment().getAddress(), minor.getAssignment().getUserPort());
outputAllocator = ticket.newChildAllocator("output-frag:" + QueryIdHelper.getFragmentId(getHandle()),
fragmentOptions.getOption(ExecConstants.OUTPUT_ALLOCATOR_RESERVATION),
Long.MAX_VALUE);
contextCreator.setFragmentOutputAllocator(outputAllocator);

final PhysicalOperator rootOperator = reader.readFragment(fragment);
contextCreator.setMinorFragmentEndpointsFromRootSender(rootOperator);
FunctionLookupContext functionLookupContextToUse = functionLookupContext;
if (fragmentOptions.getOption(PlannerSettings.ENABLE_DECIMAL_V2)) {
functionLookupContextToUse = decimalFunctionLookupContext;
}
pipeline = PipelineCreator.get(
new FragmentExecutionContext(major.getForeman(), sources, cancelled, major.getContext()),
buffers,
opCreator,
contextCreator,
functionLookupContextToUse,
rootOperator,
tunnelProvider,
new SharedResourcesContextImpl(sharedResources)
);

pipeline.setup();

clusterCoordinator.getServiceSet(ClusterCoordinator.Role.COORDINATOR).addNodeStatusListener(crashListener);

transitionToRunning();
isSetup = true;
}

说明

dremio 实际运行的时候包含了一个社区办的任务调度包dremio-ce-sabot-scheduler,实际执行就是依赖了这个,基于了动态类加载以及配置管理
基于此链路大家再学习以及分析dremio就比较方便了,毕竟dremio 依赖的组件还是比较多的,同时内部还是比较复杂的,后续慢慢详细说明

参考资料

sabot/kernel/src/main/java/com/dremio/sabot/task/TaskPools.java
sabot/kernel/src/main/java/com/dremio/sabot/exec/TaskPoolInitializer.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
sabot/kernel/src/main/java/com/dremio/sabot/exec/fragment/FragmentExecutor.java
sabot/kernel/src/main/java/com/dremio/sabot/driver/PipelineCreator.java

标签:dremio,java,jprofiler,查询处理,sabot,fragment,run,null
From: https://blog.51cto.com/rongfengliang/5734728

相关文章

  • dremio openjdk 11 docker 镜像
    dremio官方也说明了,已经支持openjdk11了,但是默认官方的还是openjdk8,为了体验jdk11所以基于官方的搞了一个openjdk11的镜像,很简单dockerfileARGJAVA_IMAGE="openjdk......
  • 性能测试中用Jprofiler分析响应时间过长问题【杭州多测师】【杭州多测师_王sir】
    1、采用30个并发、在Linux中用jmeter-n-ttest.jme进行压测2、发现tps只有80/sec、左右但是接口的响应时间上升到了400毫秒左右3、通过dstat-tcmnd--disk-util命令查......
  • graylog jprofiler docker 镜像
    主要是添加jprifler方便学习参考dockerfile很简单,添加文件就行了,具体jprofiler官方下载解压就行了FROMgraylog/graylog:4.3COPYjprofiler13.0.3//op......
  • 查询处理
    查询处理概述关系数据库管理系统查询处理可以分为4个阶段:查询分析、查询检查、查询优化和查询执行。1.查询分析:对用户提交的查询语句进行扫描、词法分析和语法分析,判断......
  • dremio 22.1.1 发布
    这次dremio的发布从功能上更多是bug修复比较多,对于增强上主要是调整了对于嵌入nessie历史数据的处理,允许对于非分片列的运行时过滤优化bug修复还是比较多的,具体可以参......
  • macOS最强Java开发分析软件JProfiler for Mac中文永久版
    mac软件下载:https://mac.macxf.com/mac/808.html?id=NjU2MTE%3DJProfiler11破解版可以快速的帮助用户这进行使用的过程中,分析出您的操作错误这存在的错误,以此让开发者进......
  • kettle pack + jprofiler
    官方最新镜像不是最新版本,这里构建最新版本镜像,并添加jprofiler,便于服务异常监控.下载最新版本kettlepack后获取其部署文件并替换当前镜像中的文件.Dockerfile:FROM......