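Dremio provides a convenient job profile feature. Profiles can be shared and used to analyze the performance of Dremio queries. Below is a brief walkthrough of how the job profile download works.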
Download handling
- SupportResource.java
```java
@POST
@Path("download")
@Consumes(MediaType.APPLICATION_JSON)
public Response downloadData(@PathParam("jobId") JobId jobId)
    throws IOException, UserNotFoundException, JobResourceNotFoundException {
  final DownloadDataResponse response;
  try {
    final ImmutableSupportRequest request = new ImmutableSupportRequest.Builder()
        .setUserId(context.getUserPrincipal().getName())
        .setJobId(jobId)
        .build();
    // Delegate to BasicSupportService, which builds the zip file
    response = supportService.downloadSupportRequest(request);
  } catch (JobNotFoundException e) {
    throw JobResourceNotFoundException.fromJobNotFoundException(e);
  }
  final StreamingOutput streamingOutput = new StreamingOutput() {
    @Override
    public void write(OutputStream output) throws IOException, WebApplicationException {
      // Copy with a 4096-byte buffer; the final 'true' asks copyBytes to close the streams afterwards
      IOUtils.copyBytes(response.getInput(), output, 4096, true);
    }
  };
  return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM)
      .header("Content-Disposition", "attachment; filename=\"" + response.getFileName() + "\"")
      .build();
}
```
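The endpoint hands the byte copying to `IOUtils.copyBytes(input, output, 4096, true)`. Assuming this is Hadoop's `org.apache.hadoop.io.IOUtils` (whose boolean flag closes both streams at the end), the copy and the `Content-Disposition` header reduce to the plain-JDK sketch below. `DownloadSketch` and its methods are hypothetical names, not Dremio code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical plain-JDK helpers mirroring the download endpoint. */
final class DownloadSketch {
  private DownloadSketch() {}

  /** Copies in to out through a fixed-size buffer; if close is true, closes both streams afterwards. */
  static void copyBytes(InputStream in, OutputStream out, int bufferSize, boolean close) throws IOException {
    try {
      byte[] buf = new byte[bufferSize];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      out.flush();
    } finally {
      if (close) {
        try {
          in.close();
        } finally {
          out.close(); // runs even if closing the input fails
        }
      }
    }
  }

  /** Header value telling the client to save the body as a file with the given name. */
  static String contentDisposition(String fileName) {
    return "attachment; filename=\"" + fileName + "\"";
  }
}
```

Because the body is a `StreamingOutput`, JAX-RS only calls `write()` while sending the response, so the zip is streamed to the client rather than buffered into a byte array first.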
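- BasicSupportService
generateSupportRequest builds a zip file containing header.json plus one profile_attempt_N.json per attempt, and, where applicable, a prepare profile and a log file for that attempt.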
```java
private Path generateSupportRequest(SupportRequest request, Pointer<Boolean> outIncludesLogs,
    Pointer<User> outUserConfig, Pointer<String> outSubmissionId)
    throws IOException, UserNotFoundException, JobNotFoundException {
  final String submissionId = UUID.randomUUID().toString();
  outSubmissionId.value = submissionId;
  Files.createDirectories(supportPath);
  Path path = supportPath.resolve(submissionId + ".zip");
  outUserConfig.value = userService.get().getUser(request.getUserId());
  User config = outUserConfig.value;

  // inner try to close file once written.
  try (
      FileOutputStream fos = new FileOutputStream(path.toFile());
      BufferedOutputStream dest = new BufferedOutputStream(fos);
      ZipOutputStream zip = new ZipOutputStream(dest);
  ) {
    zip.putNextEntry(new ZipEntry("header.json"));
    // Write header.json
    recordHeader(zip, request, config, submissionId);
    final JobSummary jobSummary = obfuscate(getJobSummary(request, config));
    // One set of entries per attempt; what gets written depends on the profile's state
    for (int attemptIndex = 0; attemptIndex < jobSummary.getNumAttempts(); attemptIndex++) {
      zip.putNextEntry(new ZipEntry(String.format("profile_attempt_%d.json", attemptIndex)));
      QueryProfile profile = recordProfile(zip, request, attemptIndex);
      // If the profile references a prepare query, include that profile too
      if (profile.hasPrepareId()) {
        final QueryId id = profile.getPrepareId();
        zip.putNextEntry(new ZipEntry(String.format("prepare_profile_attempt_%d.json", attemptIndex)));
        JobId prepareId = new JobId(new UUID(id.getPart1(), id.getPart2()).toString());
        recordProfile(zip, prepareId, request, 0);
      }
      // If the query failed, collect log file information.
      if (profile.getState() == UserBitShared.QueryResult.QueryState.FAILED) {
        zip.putNextEntry(new ZipEntry(String.format("log_attempt_%d.json", attemptIndex)));
        outIncludesLogs.value = recordLog(zip, request.getUserId(), profile.getStart(),
            profile.getEnd(), request.getJobId(), submissionId);
      }
    }
  }
  return path;
}
```
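The entry layout produced by generateSupportRequest can be reproduced with nothing but `java.util.zip`. The sketch below is a hypothetical model: the `Attempt` record and the placeholder bodies (`"{}"`, `"[]"`) are made up, but the entry names and their order follow the loop above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

final class SupportZipSketch {
  /** Hypothetical per-attempt facts: profile JSON, whether it has a prepare profile, whether it failed. */
  record Attempt(String profileJson, boolean hasPrepare, boolean failed) {}

  private SupportZipSketch() {}

  static byte[] buildZip(String headerJson, List<Attempt> attempts) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ZipOutputStream zip = new ZipOutputStream(bytes)) {
      writeEntry(zip, "header.json", headerJson);
      for (int i = 0; i < attempts.size(); i++) {
        Attempt a = attempts.get(i);
        writeEntry(zip, String.format("profile_attempt_%d.json", i), a.profileJson());
        if (a.hasPrepare()) {
          writeEntry(zip, String.format("prepare_profile_attempt_%d.json", i), "{}");
        }
        if (a.failed()) {
          writeEntry(zip, String.format("log_attempt_%d.json", i), "[]");
        }
      }
    }
    return bytes.toByteArray();
  }

  // Writes one entry; the ZipOutputStream itself must stay open for the next entry.
  private static void writeEntry(ZipOutputStream zip, String name, String body) throws IOException {
    zip.putNextEntry(new ZipEntry(name));
    zip.write(body.getBytes(StandardCharsets.UTF_8));
    zip.closeEntry(); // optional: putNextEntry would close it anyway
  }

  /** Reads back the entry names in order. */
  static List<String> entryNames(byte[] zipBytes) throws IOException {
    List<String> names = new ArrayList<>();
    try (ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
      ZipEntry e;
      while ((e = in.getNextEntry()) != null) {
        names.add(e.getName());
      }
    }
    return names;
  }
}
```

`ZipOutputStream.putNextEntry` closes the previous entry if it is still active, which is why the Dremio code never calls `closeEntry`. What matters is that helpers such as recordProfile leave the zip stream open after writing.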
- header.json handling
```java
private boolean recordHeader(OutputStream output, SupportRequest supportRequest, User user, String submissionId)
    throws UserNotFoundException, IOException, JobNotFoundException {
  SupportHeader header = new SupportHeader();
  header.setClusterInfo(getClusterInfo());
  header.setJob(obfuscate(JobsProtoUtil.getLastAttempt(getJobDetails(supportRequest, user))));
  Submission submission = new Submission()
      .setSubmissionId(submissionId)
      .setDate(System.currentTimeMillis())
      .setEmail(user.getEmail())
      .setFirst(user.getFirstName())
      .setLast(user.getLastName());
  header.setSubmission(submission);
  // record the dremio version that was used to run the query in the header.
  // This issues a gRPC call that ends up in LocalJobsService's getProfile, just like the
  // per-attempt profile handling below, except that the attempt is always 0.
  header.setDremioVersion(getProfile(supportRequest, 0).getDremioVersion());
  ProtostuffUtil.toJSON(output, header, SupportHeader.getSchema(), false);
  return true;
}
```
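The resulting header.json follows the structure built in the code above: cluster info, the job's last attempt, the submission details, and the Dremio version.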
- Per-attempt profile handling
This also calls getProfile:
```java
private QueryProfile recordProfile(OutputStream out, SupportRequest supportRequest, int attempt)
    throws IOException, JobNotFoundException {
  QueryProfile profile = obfuscate(getProfile(supportRequest, attempt));
  ProtobufUtils.writeAsJSONTo(out, profile);
  return profile;
}
```
RPC service handling
The RPC service has a client side and a server side; the server-side services are registered at startup in DACDaemonModule.
- Job service registration in DACDaemonModule
```java
private void registerJobsServices(final ConduitServiceRegistry conduitServiceRegistry,
                                  final SingletonRegistry registry,
                                  final BootStrapContext bootstrap,
                                  final BufferAllocator jobResultsAllocator,
                                  final Provider<OptionManager> optionManagerProvider) {
  // 1. job adapter
  conduitServiceRegistry.registerService(new JobsServiceAdapter(registry.provider(LocalJobsService.class)));

  // 2. chronicle
  conduitServiceRegistry.registerService(
      new Chronicle(registry.provider(LocalJobsService.class), bootstrap::getExecutor));

  // 3. jobs, sys flight producers registered together as CoordinatorFlightProducer, as individual binding is masking one of them
  final BufferAllocator coordFlightAllocator =
      bootstrap.getAllocator().newChildAllocator(CoordinatorFlightProducer.class.getName(), 0, Long.MAX_VALUE);
  final JobsFlightProducer jobsFlightProducer =
      new JobsFlightProducer(registry.provider(LocalJobsService.class), coordFlightAllocator);
  final SysFlightProducer sysFlightProducer = new SysFlightProducer(registry.provider(SystemTableManager.class));
  final CoordinatorFlightProducer coordFlightProducer =
      new CoordinatorFlightProducer(jobsFlightProducer, sysFlightProducer);
  conduitServiceRegistry.registerService(
      new FlightCloseableBindableService(coordFlightAllocator, coordFlightProducer, null, null));

  // 4. MaestroGrpcServerFacade
  conduitServiceRegistry.registerService(
      new MaestroGrpcServerFacade(registry.provider(ExecToCoordStatusHandler.class)));

  // 5. jobresults
  JobResultsGrpcServerFacade jobResultsGrpcServerFacade = new JobResultsGrpcServerFacade(
      registry.provider(ExecToCoordResultsHandler.class),
      registry.provider(MaestroForwarder.class),
      registry.lookup(Tracer.class));
  // Note: for a change to this option to take effect on the coordinator, the coordinator must be restarted.
  // Executors pick up the change dynamically, without a restart.
  if (avoidHeapCopy()) {
    logger.info("Using JobResultsBindableService");
    conduitServiceRegistry.registerService(new JobResultsBindableService(jobResultsAllocator, jobResultsGrpcServerFacade));
  } else {
    logger.info("Using JobResultsGrpcServerFacade");
    conduitServiceRegistry.registerService(jobResultsGrpcServerFacade);
  }
}
```
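Every service above receives `registry.provider(...)` rather than a `LocalJobsService` instance. My reading, which is an assumption not verified against `SingletonRegistry`, is that the provider resolves lazily, so services can be registered with the conduit server before `LocalJobsService` itself has been bound or started. A toy model using `java.util.function.Supplier`; `ToySingletonRegistry`, `FakeJobsService` and `FakeChronicle` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Toy registry: provider() returns a lazy lookup rather than the instance itself. */
final class ToySingletonRegistry {
  private final Map<Class<?>, Object> instances = new HashMap<>();

  <T> void bind(Class<T> type, T instance) {
    instances.put(type, instance);
  }

  <T> Supplier<T> provider(Class<T> type) {
    // The lookup runs on get(), so the binding may be added after the provider is handed out.
    return () -> {
      Object instance = instances.get(type);
      if (instance == null) {
        throw new IllegalStateException("Not bound yet: " + type.getSimpleName());
      }
      return type.cast(instance);
    };
  }
}

/** Hypothetical stand-in for LocalJobsService. */
final class FakeJobsService {
  String getProfile(String jobId) {
    return "profile-of-" + jobId;
  }
}

/** Hypothetical stand-in for Chronicle: holds a provider and resolves it on every call. */
final class FakeChronicle {
  private final Supplier<FakeJobsService> jobsService;

  FakeChronicle(Supplier<FakeJobsService> jobsService) {
    this.jobsService = jobsService;
  }

  String getProfile(String jobId) {
    return jobsService.get().getProfile(jobId);
  }
}
```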
- Client side
HybridJobsService wraps the jobs service on the client side and issues the RPC to Chronicle:
```java
@Override
public QueryProfile getProfile(QueryProfileRequest request) throws JobNotFoundException {
  try {
    return getChronicleBlockingStub().getProfile(request);
  } catch (StatusRuntimeException e) {
    // TODO (DX-17909): Use request username
    throwSuitableException(e, JobsProtoUtil.toStuff(request.getJobId()), SYSTEM_USERNAME);
    throw new AssertionError(e); // should be unreachable
  }
}
```
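The client follows a common pattern: `throwSuitableException` always throws, but the Java compiler cannot know that, so the catch block needs the trailing `throw new AssertionError(e)` to avoid a "missing return statement" error. A self-contained sketch with hypothetical stand-ins for `StatusRuntimeException` and `JobNotFoundException`; the NOT_FOUND mapping is an assumption for illustration.

```java
/** Hypothetical stand-in for io.grpc.Status.Code. */
enum RpcCode { NOT_FOUND, PERMISSION_DENIED, UNAVAILABLE }

/** Hypothetical stand-in for io.grpc.StatusRuntimeException. */
final class RpcException extends RuntimeException {
  final RpcCode code;

  RpcException(RpcCode code) {
    super(code.name());
    this.code = code;
  }
}

/** Hypothetical stand-in for Dremio's checked JobNotFoundException. */
final class JobNotFoundSketchException extends Exception {
  JobNotFoundSketchException(String jobId) {
    super("Job " + jobId + " not found");
  }
}

final class ClientSketch {
  /** Stand-in for the blocking gRPC stub. */
  interface ProfileStub {
    String getProfile(String jobId);
  }

  private ClientSketch() {}

  /** Always throws; the compiler cannot see that, so callers still need a trailing throw. */
  static void throwSuitableException(RpcException e, String jobId) throws JobNotFoundSketchException {
    if (e.code == RpcCode.NOT_FOUND) {
      throw new JobNotFoundSketchException(jobId);
    }
    throw new IllegalStateException("RPC failed: " + e.code, e);
  }

  static String getProfile(ProfileStub stub, String jobId) throws JobNotFoundSketchException {
    try {
      return stub.getProfile(jobId);
    } catch (RpcException e) {
      throwSuitableException(e, jobId);
      throw new AssertionError(e); // unreachable, but required for the method to compile
    }
  }
}
```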
- Chronicle server: getProfile
```java
public void getProfile(QueryProfileRequest request, StreamObserver<UserBitShared.QueryProfile> responseObserver) {
  // getJobsService() is in fact the LocalJobsService
  handleUnaryCall(getJobsService()::getProfile, request, responseObserver);
}
```
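The body of handleUnaryCall is not shown here. Assuming it follows the usual unary gRPC contract (exactly one `onNext` followed by `onCompleted`, or a single `onError`), a minimal version looks like the sketch below. `CallObserver` mirrors only the three callback methods of `io.grpc.stub.StreamObserver`; the real helper may also translate exceptions into a gRPC `Status`, which this sketch skips. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Mirrors the callback shape of io.grpc.stub.StreamObserver. */
interface CallObserver<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

/** A function that may throw a checked exception, like LocalJobsService::getProfile. */
@FunctionalInterface
interface ThrowingFunction<A, R> {
  R apply(A arg) throws Exception;
}

final class UnaryCallSketch {
  private UnaryCallSketch() {}

  /** Hypothetical handleUnaryCall: one response then completion, or a single error. */
  static <A, R> void handleUnaryCall(ThrowingFunction<A, R> fn, A request, CallObserver<R> observer) {
    final R result;
    try {
      result = fn.apply(request);
    } catch (Exception e) {
      observer.onError(e); // passed through unchanged in this sketch
      return;
    }
    observer.onNext(result);
    observer.onCompleted();
  }
}

/** Records callbacks so the sequence can be inspected. */
final class RecordingObserver<T> implements CallObserver<T> {
  final List<String> events = new ArrayList<>();

  public void onNext(T value) { events.add("next:" + value); }

  public void onError(Throwable t) { events.add("error:" + t.getMessage()); }

  public void onCompleted() { events.add("completed"); }
}
```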
- LocalJobsService: getProfile overview
Reference code (one of the getProfile overloads):
```java
public Optional<QueryProfile> getProfile(String jobId, String username) {
  JobId jobIdObj = new JobId(jobId);
  GetJobRequest request = GetJobRequest.newBuilder()
      .setJobId(jobIdObj)
      .setUserName(username)
      .build();
  try {
    Job job = getJob(request);
    // size() - 1 selects the most recent attempt
    UserBitShared.QueryProfile profile =
        getProfileFromJobId(job.getJobId(), job.getAttempts().size() - 1, job.getJobAttempt().getEndpoint());
    return java.util.Optional.of(profile);
  } catch (JobNotFoundException ex) {
    return java.util.Optional.empty();
  }
}
```
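- LocalJobsService: getProfileFromJobId
Although LocalJobsService is what backs the RPC server, it still makes remote calls internally. Given the job ID and attempt, it first asks the job telemetry service for the profile; the profile service itself is implemented by JobTelemetryServiceImpl. If that fails and the attempt was run by a different node (the NodeEndpoint passed in), it forwards the request to that node through RemoteJobServiceForwarder.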
```java
QueryProfile getProfileFromJobId(JobId jobId, int attempt, NodeEndpoint endpoint) {
  final AttemptId attemptId = new AttemptId(JobsServiceUtil.getJobIdAsExternalId(jobId), attempt);

  // Check if the profile for given attempt already exists. Even if the job is not done, it is possible that
  // profile exists for previous attempts
  StatusRuntimeException caughtException;
  try {
    return jobTelemetryServiceStub
        .getQueryProfile(
            GetQueryProfileRequest.newBuilder().setQueryId(attemptId.toQueryId()).build())
        .getProfile();
  } catch (StatusRuntimeException sre) {
    logger.debug("Unable to fetch query profile for {} on Node {}", jobId, identity, sre);
    caughtException = sre;
  }

  QueryProfile qp = null;
  try {
    if (!(endpoint.getAddress().equals(identity.getAddress()) &&
        (endpoint.getConduitPort() == identity.getConduitPort()))) {
      logger.debug("Fetching query profile for {} on Node {}", jobId, endpoint);
      final QueryProfileRequest request = QueryProfileRequest.newBuilder()
          .setJobId(JobsProtoUtil.toBuf(jobId))
          .setAttempt(attempt)
          .setUserName(SYSTEM_USERNAME)
          .build();
      qp = forwarder.getProfile(JobsServiceUtil.toPB(endpoint), request);
    }
  } catch (StatusRuntimeException sre) {
    logger.debug("Unable to fetch profile for {} on Node {}", jobId, endpoint, sre);
    if (!sre.getStatus().getCode().equals(Status.UNAVAILABLE.getCode())) {
      caughtException = sre;
    }
  }

  if (qp == null) {
    if (caughtException.getStatus().getCode().equals(Status.INVALID_ARGUMENT.getCode())) {
      logger.error("Unable to get profile for {}", jobId, caughtException);
      throw caughtException;
    } else {
      logger.error("Unable to fetch profile for {} at the moment, try after sometime.", jobId, caughtException);
      throw new StatusRuntimeException(Status.INTERNAL
          .withDescription("Unable to fetch profile at the moment, try after sometime.")
          .withCause(caughtException));
    }
  } else {
    return qp;
  }
}
```
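The fallback logic in getProfileFromJobId is easier to follow with the gRPC plumbing stripped away. In this sketch all names are hypothetical: endpoints are reduced to `"host:port"` strings (the real code compares address and conduit port separately), `StatusCode` stands in for `io.grpc.Status.Code`, and `ProfileSource` stands in for both the telemetry stub and the forwarder.

```java
import java.util.Objects;

/** Hypothetical subset of gRPC status codes used by the fallback logic. */
enum StatusCode { INVALID_ARGUMENT, UNAVAILABLE, NOT_FOUND, INTERNAL }

/** Hypothetical stand-in for io.grpc.StatusRuntimeException. */
final class StatusError extends RuntimeException {
  final StatusCode code;

  StatusError(StatusCode code, String msg, Throwable cause) {
    super(msg, cause);
    this.code = code;
  }

  StatusError(StatusCode code) {
    this(code, code.name(), null);
  }
}

/** Hypothetical source of profiles: the telemetry service or a remote coordinator. */
interface ProfileSource {
  String fetch(String jobId, int attempt);
}

final class ProfileFallbackSketch {
  private ProfileFallbackSketch() {}

  static String getProfile(String jobId, int attempt, String endpoint, String self,
                           ProfileSource telemetry, ProfileSource forwarder) {
    StatusError caught;
    try {
      return telemetry.fetch(jobId, attempt);  // 1. ask the job telemetry service first
    } catch (StatusError e) {
      caught = e;
    }
    String profile = null;
    if (!Objects.equals(endpoint, self)) {      // 2. attempt ran on another node: forward there
      try {
        profile = forwarder.fetch(jobId, attempt);
      } catch (StatusError e) {
        if (e.code != StatusCode.UNAVAILABLE) {
          caught = e;                           // an unreachable node keeps the first error
        }
      }
    }
    if (profile != null) {
      return profile;
    }
    if (caught.code == StatusCode.INVALID_ARGUMENT) {
      throw caught;                             // 3. bad request: surface as-is
    }
    throw new StatusError(StatusCode.INTERNAL,  //    anything else: "try again later"
        "Unable to fetch profile at the moment, try after sometime.", caught);
  }
}
```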
- JobTelemetryServiceImpl: getQueryProfile
```java
public void getQueryProfile(
    GetQueryProfileRequest request, StreamObserver<GetQueryProfileResponse> responseObserver) {
  try {
    QueryId queryId = request.getQueryId();
    Preconditions.checkNotNull(queryId);
    // Looks the profile up in the ProfileStore, building it if necessary
    QueryProfile mergedProfile = fetchOrBuildMergedProfile(queryId);
    responseObserver.onNext(
        GetQueryProfileResponse.newBuilder().setProfile(mergedProfile).build());
    responseObserver.onCompleted();
  } catch (IllegalArgumentException e) {
    responseObserver.onError(
        Status.INVALID_ARGUMENT
            .withDescription("Unable to get query profile. " + e.getMessage())
            .asRuntimeException());
  } catch (Exception ex) {
    logger.error("Unable to get query profile.", ex);
    responseObserver.onError(
        Status.INTERNAL.withDescription(Throwables.getRootCause(ex).getMessage()).asRuntimeException());
  }
}
```
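getQueryProfile maps exceptions onto two gRPC statuses. The sketch below reproduces that mapping in plain Java; `TelemetryCode` and `MappedError` are hypothetical, and `rootCause` is a simplified equivalent of Guava's `Throwables.getRootCause` that assumes the cause chain has no cycles.

```java
/** Hypothetical stand-ins for the two gRPC status codes used by getQueryProfile. */
enum TelemetryCode { INVALID_ARGUMENT, INTERNAL }

/** The status code and description that would be passed to onError. */
record MappedError(TelemetryCode code, String description) {}

final class TelemetryErrorSketch {
  private TelemetryErrorSketch() {}

  /** Innermost cause, like Guava's Throwables.getRootCause; assumes no cause cycles. */
  static Throwable rootCause(Throwable t) {
    Throwable current = t;
    while (current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }

  /** Mirrors the two catch blocks of getQueryProfile. */
  static MappedError map(Exception e) {
    if (e instanceof IllegalArgumentException) {
      return new MappedError(TelemetryCode.INVALID_ARGUMENT, "Unable to get query profile. " + e.getMessage());
    }
    return new MappedError(TelemetryCode.INTERNAL, rootCause(e).getMessage());
  }
}
```

One consequence: Guava's `Preconditions.checkNotNull` throws `NullPointerException`, not `IllegalArgumentException`, so a failed null check lands in the INTERNAL branch rather than INVALID_ARGUMENT.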
- JobTelemetryServiceImpl: fetchOrBuildMergedProfile
```java
private QueryProfile fetchOrBuildMergedProfile(QueryId queryId) throws ExecutionException, InterruptedException {
  // Return the full profile from the ProfileStore if one has already been persisted
  Optional<QueryProfile> fullProfile = profileStore.getFullProfile(queryId);
  if (fullProfile.isPresent()) {
    return fullProfile.get();
  }
  // Otherwise assemble a full profile on the fly
  QueryProfile mergedProfile = buildFullProfile(queryId);
  // persist the merged profile, if in a terminal state
  if (isTerminal(mergedProfile.getState())) {
    bgProfileWriter.tryWriteAsync(queryId, mergedProfile);
  }
  return mergedProfile;
}
```
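fetchOrBuildMergedProfile behaves like a read-through cache with write-behind for finished queries. A minimal model, assuming a `ConcurrentHashMap` in place of ProfileStore, an injected `Executor` in place of bgProfileWriter, and COMPLETED/FAILED/CANCELED as the terminal states; `ProfileSketch` and `MergedProfileCacheSketch` are hypothetical names.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical profile: query state plus merged content. */
record ProfileSketch(String state, String body) {
  /** Assumed terminal states, for illustration only. */
  boolean isTerminal() {
    return state.equals("COMPLETED") || state.equals("FAILED") || state.equals("CANCELED");
  }
}

final class MergedProfileCacheSketch {
  private final Map<String, ProfileSketch> fullProfiles = new ConcurrentHashMap<>(); // stands in for ProfileStore
  private final Function<String, ProfileSketch> builder; // stands in for buildFullProfile
  private final Executor bgWriter;                       // stands in for bgProfileWriter
  private final AtomicInteger builds = new AtomicInteger();

  MergedProfileCacheSketch(Function<String, ProfileSketch> builder, Executor bgWriter) {
    this.builder = builder;
    this.bgWriter = bgWriter;
  }

  ProfileSketch fetchOrBuild(String queryId) {
    // 1. A persisted full profile wins.
    Optional<ProfileSketch> full = Optional.ofNullable(fullProfiles.get(queryId));
    if (full.isPresent()) {
      return full.get();
    }
    // 2. Otherwise build one on the fly.
    builds.incrementAndGet();
    ProfileSketch merged = builder.apply(queryId);
    // 3. Persist it in the background, but only once the query has finished.
    if (merged.isTerminal()) {
      bgWriter.execute(() -> fullProfiles.putIfAbsent(queryId, merged));
    }
    return merged;
  }

  /** How many times the expensive build ran. */
  int buildCount() {
    return builds.get();
  }
}
```

Injecting the `Executor` lets a test run the write inline (`Runnable::run`), while production code would pass a background pool. Until the asynchronous write lands, a finished query's profile may be built more than once; that is harmless because a terminal profile no longer changes.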
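Notes
The above only sketches how a job profile is queried. A full picture of job handling also covers how profiles are stored, and knowing how to read a profile is just as important.
Dremio's official documentation on reading profiles, listed below, is worth a look.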
References
dac/backend/src/main/java/com/dremio/dac/resource/JobProfileResource.java
dac/backend/src/main/java/com/dremio/dac/support/SupportResource.java
dac/backend/src/main/java/com/dremio/dac/support/BasicSupportService.java
dac/backend/src/main/java/com/dremio/dac/obfuscate/ObfuscationUtils.java
services/jobs/src/main/java/com/dremio/service/jobs/JobsService.java
services/jobs/target/generated-sources/protobuf/grpc-java/com/dremio/service/job/ChronicleGrpc.java
services/jobs/src/main/java/com/dremio/service/jobs/RemoteJobServiceForwarder.java
services/jobs/src/main/java/com/dremio/service/jobs/LocalJobsService.java
dac/backend/src/main/java/com/dremio/dac/admin/ProfileResource.java
services/jobs/src/main/java/com/dremio/service/jobs/Chronicle.java
services/jobs/src/main/java/com/dremio/service/jobs/HybridJobsService.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
services/jobs/src/main/java/com/dremio/service/jobs/JobsServiceAdapter.java
services/jobtelemetry/common/src/main/java/com/dremio/service/jobtelemetry/JobTelemetryClient.java
services/jobs/src/main/java/com/dremio/service/jobs/JobsStoreCreator.java
services/jobtelemetry/server/src/main/java/com/dremio/service/jobtelemetry/server/JobTelemetryServiceImpl.java
https://www.dremio.com/wp-content/uploads/2024/01/Query-Performance-Analysis-and-Improvement.pdf
https://docs.dremio.com/current/sonar/monitoring/viewing-query-profiles
https://docs.dremio.com/current/sonar/monitoring/overview
https://docs.dremio.com/current/sonar/monitoring/raw-profile/