
A brief look at how Dremio job profiles are queried

Posted: 2024-03-02 09:02:01
Tags: profile dremio java request query new com jobprofile

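Dremio provides a convenient job profile feature: a profile can be shared and used to analyze the performance of Dremio queries. Below is a brief walkthrough of how the job profile download works.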

Download handling

  • SupportResource.java
  @POST
  @Path("download")
  @Consumes(MediaType.APPLICATION_JSON)
  public Response downloadData(@PathParam("jobId") JobId jobId)
      throws IOException, UserNotFoundException, JobResourceNotFoundException {
    final DownloadDataResponse response;
    try {
      final ImmutableSupportRequest request = new ImmutableSupportRequest.Builder()
        .setUserId(context.getUserPrincipal().getName())
        .setJobId(jobId)
        .build();
      // delegate to BasicSupportService to build the download
      response = supportService.downloadSupportRequest(request);
    } catch (JobNotFoundException e) {
      throw JobResourceNotFoundException.fromJobNotFoundException(e);
    }
    final StreamingOutput streamingOutput = new StreamingOutput() {
      @Override
      public void write(OutputStream output) throws IOException, WebApplicationException {
        IOUtils.copyBytes(response.getInput(), output, 4096, true);
      }
    };
    return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM)
      .header("Content-Disposition", "attachment; filename=\"" + response.getFileName() + "\"").build();
  }
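The endpoint does not return the zip as one in-memory byte array: it returns a StreamingOutput that copies the file's input stream into the HTTP response. The four-argument IOUtils.copyBytes(in, out, 4096, true) call (assumed here to be Hadoop's org.apache.hadoop.io.IOUtils) copies with a 4096-byte buffer and, because the last argument is true, closes both streams when done. The sketch below reproduces that behaviour with the JDK only; the class and method names are hypothetical, and the header helper simply mirrors the Content-Disposition string built above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class DownloadStreamSketch {

  // Copies all bytes from in to out using a fixed-size buffer, then closes both
  // streams (the behaviour of copyBytes(in, out, 4096, true) in SupportResource).
  static long copyAndClose(InputStream in, OutputStream out, int bufferSize) throws IOException {
    try (InputStream src = in; OutputStream dst = out) {
      byte[] buffer = new byte[bufferSize];
      long total = 0;
      int read;
      while ((read = src.read(buffer)) != -1) {
        dst.write(buffer, 0, read);
        total += read;
      }
      dst.flush();
      return total;
    }
  }

  // Same header value SupportResource sets, so the browser treats the body as a file download.
  static String contentDisposition(String fileName) {
    return "attachment; filename=\"" + fileName + "\"";
  }

  public static void main(String[] args) throws IOException {
    byte[] fakeZip = "not really a zip".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream response = new ByteArrayOutputStream();
    long copied = copyAndClose(new ByteArrayInputStream(fakeZip), response, 4096);
    System.out.println(copied + " bytes; Content-Disposition: " + contentDisposition("support.zip"));
  }
}
```

Passing close=true hands ownership of both streams to the copy, so the anonymous StreamingOutput needs no try/finally of its own.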
  • BasicSupportService handling

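generateSupportRequest writes a zip file containing header.json plus one profile_attempt_N.json per attempt. An attempt that has a prepare ID also gets a prepare_profile_attempt_N.json, and a failed attempt also gets a log_attempt_N.json with log information.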

  private Path generateSupportRequest(SupportRequest request, Pointer<Boolean> outIncludesLogs,
                                      Pointer<User> outUserConfig, Pointer<String> outSubmissionId)
      throws IOException, UserNotFoundException, JobNotFoundException {
    final String submissionId = UUID.randomUUID().toString();
    outSubmissionId.value = submissionId;
 
    Files.createDirectories(supportPath);
    Path path = supportPath.resolve(submissionId + ".zip");
    outUserConfig.value = userService.get().getUser(request.getUserId());
    User config = outUserConfig.value;
 
    // inner try to close file once written.
    try(
        FileOutputStream fos = new FileOutputStream(path.toFile());
        BufferedOutputStream dest = new BufferedOutputStream(fos);
        ZipOutputStream zip = new ZipOutputStream(dest);
        ) {
 
      zip.putNextEntry(new ZipEntry("header.json"));
      // write the header JSON
      recordHeader(zip, request, config, submissionId);
 
      final JobSummary jobSummary = obfuscate(getJobSummary(request, config));
      // one profile entry per attempt; extra entries depend on the profile's state
      for(int attemptIndex = 0; attemptIndex < jobSummary.getNumAttempts() ; attemptIndex++) {
        zip.putNextEntry(new ZipEntry(String.format("profile_attempt_%d.json", attemptIndex)));
        QueryProfile profile = recordProfile(zip, request, attemptIndex);
 
        if (profile.hasPrepareId()) {
          final QueryId id = profile.getPrepareId();
          zip.putNextEntry(new ZipEntry(String.format("prepare_profile_attempt_%d.json", attemptIndex)));
          JobId prepareId = new JobId(new UUID(id.getPart1(), id.getPart2()).toString());
          recordProfile(zip, prepareId, request, 0);
        }
 
        // If the query failed, also collect log file information.
        if (profile.getState() == UserBitShared.QueryResult.QueryState.FAILED) {
          zip.putNextEntry(new ZipEntry(String.format("log_attempt_%d.json", attemptIndex)));
          outIncludesLogs.value = recordLog(zip, request.getUserId(), profile.getStart(), profile.getEnd(), request.getJobId(), submissionId);
        }
      }
    }
    return path;
  }
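To make the resulting archive layout concrete, here is a hypothetical JDK-only sketch (Java 16+ for the record) that writes the same entry names as generateSupportRequest, with placeholder {} bodies instead of real obfuscated profiles and logs. The Attempt record is an assumption that keeps only the two facts the loop branches on: whether the attempt has a prepare ID and whether it failed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

final class SupportZipSketch {

  // Hypothetical stand-in for one attempt's profile.
  record Attempt(boolean hasPrepareId, boolean failed) {}

  // Writes the same entry layout as generateSupportRequest, with placeholder JSON bodies.
  static byte[] buildZip(List<Attempt> attempts) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ZipOutputStream zip = new ZipOutputStream(bytes)) {
      writeEntry(zip, "header.json", "{}");
      for (int i = 0; i < attempts.size(); i++) {
        Attempt attempt = attempts.get(i);
        writeEntry(zip, String.format("profile_attempt_%d.json", i), "{}");
        if (attempt.hasPrepareId()) {
          writeEntry(zip, String.format("prepare_profile_attempt_%d.json", i), "{}");
        }
        if (attempt.failed()) {
          writeEntry(zip, String.format("log_attempt_%d.json", i), "{}");
        }
      }
    }
    return bytes.toByteArray();
  }

  private static void writeEntry(ZipOutputStream zip, String name, String body) throws IOException {
    zip.putNextEntry(new ZipEntry(name)); // also closes the previous entry, if any
    zip.write(body.getBytes(StandardCharsets.UTF_8));
  }

  // Reads the archive back and returns the entry names in order.
  static List<String> entryNames(byte[] zipBytes) throws IOException {
    List<String> names = new ArrayList<>();
    try (ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
      ZipEntry entry;
      while ((entry = in.getNextEntry()) != null) {
        names.add(entry.getName());
      }
    }
    return names;
  }

  public static void main(String[] args) throws IOException {
    byte[] zip = buildZip(List.of(new Attempt(false, true), new Attempt(true, false)));
    System.out.println(entryNames(zip));
  }
}
```

For a job whose first attempt failed and whose second attempt had a prepare ID, the entries come out as header.json, profile_attempt_0.json, log_attempt_0.json, profile_attempt_1.json, prepare_profile_attempt_1.json. ZipOutputStream.putNextEntry closes the current entry before starting a new one, which is why the original code never calls closeEntry explicitly.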
  • header.json handling
  private boolean recordHeader(OutputStream output, SupportRequest supportRequest, User user, String submissionId)
      throws UserNotFoundException, IOException, JobNotFoundException {
 
    SupportHeader header = new SupportHeader();
 
    header.setClusterInfo(getClusterInfo());
 
    header.setJob(obfuscate(JobsProtoUtil.getLastAttempt(getJobDetails(supportRequest, user))));
 
    Submission submission = new Submission()
        .setSubmissionId(submissionId)
        .setDate(System.currentTimeMillis())
        .setEmail(user.getEmail())
        .setFirst(user.getFirstName())
        .setLast(user.getLastName());
 
    header.setSubmission(submission);
 
    // record the dremio version that was used to run the query in the header
    // This issues a gRPC request served by LocalJobsService.getProfile, the same path as the per-attempt profiles, just with attempt 0
    header.setDremioVersion(getProfile(supportRequest, 0).getDremioVersion());
 
    ProtostuffUtil.toJSON(output, header, SupportHeader.getSchema(), false);
    return true;
  }

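The resulting header.json has the structure built in the code above: cluster info, the (obfuscated) last job attempt, the submission details and the Dremio version.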

  • Per-attempt profile handling
    This also calls getProfile, once per attempt:
  private QueryProfile recordProfile(OutputStream out, SupportRequest supportRequest, int attempt) throws IOException, JobNotFoundException {
    QueryProfile profile = obfuscate(getProfile(supportRequest, attempt));
    ProtobufUtils.writeAsJSONTo(out, profile);
    return profile;
  }

RPC service handling

The RPC layer has a client side and a server side; the server-side services are registered and started in DACDaemonModule.

  • Job service registration in DACDaemonModule
private void registerJobsServices(final ConduitServiceRegistry conduitServiceRegistry,
                                    final SingletonRegistry registry,
                                    final BootStrapContext bootstrap,
                                    final BufferAllocator jobResultsAllocator,
                                    final Provider<OptionManager> optionManagerProvider) {
    // 1. job adapter
    conduitServiceRegistry.registerService(new JobsServiceAdapter(registry.provider(LocalJobsService.class)));
 
    // 2. chronicle
    conduitServiceRegistry.registerService(new Chronicle(registry.provider(LocalJobsService.class), bootstrap::getExecutor));
 
    // 3. jobs, sys flight producers registered together as CoordinatorFlightProducer, as individual binding is masking one of them
    final BufferAllocator coordFlightAllocator = bootstrap.getAllocator().newChildAllocator(CoordinatorFlightProducer.class.getName(), 0, Long.MAX_VALUE);
    final JobsFlightProducer jobsFlightProducer = new JobsFlightProducer(registry.provider(LocalJobsService.class), coordFlightAllocator);
    final SysFlightProducer sysFlightProducer = new SysFlightProducer(registry.provider(SystemTableManager.class));
    final CoordinatorFlightProducer coordFlightProducer = new CoordinatorFlightProducer(jobsFlightProducer, sysFlightProducer);
    conduitServiceRegistry.registerService(new FlightCloseableBindableService(coordFlightAllocator, coordFlightProducer, null, null));
 
    //4. MaestroGrpcServerFacade
    conduitServiceRegistry.registerService(new MaestroGrpcServerFacade(registry.provider(ExecToCoordStatusHandler.class)));
 
    //5. jobresults
    JobResultsGrpcServerFacade jobResultsGrpcServerFacade = new JobResultsGrpcServerFacade(
      registry.provider(ExecToCoordResultsHandler.class),
      registry.provider(MaestroForwarder.class),
      registry.lookup(Tracer.class));
 
    // Note: To take effect of the change in this option on coordinator side, coordinator needs to be restarted.
    // But the change of option is propagated to executor without restarting executor. Its dynamic change on executor side.
    if (avoidHeapCopy()) {
      logger.info("Using JobResultsBindableService");
      conduitServiceRegistry.registerService(new JobResultsBindableService(jobResultsAllocator, jobResultsGrpcServerFacade));
    } else {
      logger.info("Using JobResultsGrpcServerFacade");
      conduitServiceRegistry.registerService(jobResultsGrpcServerFacade);
    }
  }
  • Client side
    HybridJobsService wraps the jobs client; its getProfile makes the RPC call through the Chronicle blocking stub
  @Override
  public QueryProfile getProfile(QueryProfileRequest request) throws JobNotFoundException {
    try {
      return getChronicleBlockingStub().getProfile(request);
    } catch (StatusRuntimeException e) {
      // TODO (DX-17909): Use request username
      throwSuitableException(e, JobsProtoUtil.toStuff(request.getJobId()), SYSTEM_USERNAME);
      throw new AssertionError(e); // should be unreachable
    }
  }
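The trailing `throw new AssertionError(e)` is there because throwSuitableException always throws, but its signature cannot say so, so the compiler still requires every path out of the catch block to return or throw. The sketch below shows that pattern with hypothetical stand-ins for the gRPC types (no io.grpc dependency); mapping NOT_FOUND to a checked JobNotFoundException is an assumption for illustration, not Dremio's actual mapping table.

```java
final class ClientErrorMappingSketch {

  // Hypothetical subset of gRPC status codes (the real type is io.grpc.Status.Code).
  enum Code { NOT_FOUND, INVALID_ARGUMENT, UNAVAILABLE, INTERNAL }

  // Hypothetical stand-in for io.grpc.StatusRuntimeException.
  static final class RpcException extends RuntimeException {
    final Code code;

    RpcException(Code code, String message) {
      super(message);
      this.code = code;
    }
  }

  // Hypothetical checked exception playing the role of JobNotFoundException.
  static final class JobNotFoundException extends Exception {
    JobNotFoundException(String jobId) {
      super("job " + jobId + " not found");
    }
  }

  // Hypothetical client stub: one blocking call that may fail with RpcException.
  interface ProfileStub {
    String getProfile(String jobId);
  }

  // Always throws, but the signature cannot express "never returns normally".
  static void throwSuitableException(RpcException e, String jobId) throws JobNotFoundException {
    if (e.code == Code.NOT_FOUND) {
      throw new JobNotFoundException(jobId); // assumed mapping, for illustration only
    }
    throw e; // any other status is rethrown unchanged
  }

  static String getProfile(ProfileStub stub, String jobId) throws JobNotFoundException {
    try {
      return stub.getProfile(jobId);
    } catch (RpcException e) {
      throwSuitableException(e, jobId);
      throw new AssertionError(e); // unreachable; without it javac reports "missing return statement"
    }
  }

  public static void main(String[] args) throws JobNotFoundException {
    System.out.println(getProfile(id -> "profile of " + id, "job-1"));
    try {
      getProfile(id -> { throw new RpcException(Code.NOT_FOUND, "no such job"); }, "job-2");
    } catch (JobNotFoundException e) {
      System.out.println("mapped: " + e.getMessage());
    }
  }
}
```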
  • Chronicle server-side getProfile
  public void getProfile(QueryProfileRequest request, StreamObserver<UserBitShared.QueryProfile> responseObserver) {
    // getJobsService() is in fact the LocalJobsService
    handleUnaryCall(getJobsService()::getProfile, request, responseObserver);
  }
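handleUnaryCall adapts an ordinary request/response method to gRPC's observer-style unary call. Dremio's actual helper is not shown in this post, so the following is a hypothetical sketch, with a minimal Observer interface standing in for io.grpc.stub.StreamObserver: on success it emits exactly one onNext followed by onCompleted, and on failure a single onError.

```java
import java.util.ArrayList;
import java.util.List;

final class UnaryCallSketch {

  // Hypothetical minimal version of io.grpc.stub.StreamObserver.
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
  }

  // A service method that may throw a checked exception, like getProfile's JobNotFoundException.
  @FunctionalInterface
  interface ServiceMethod<Q, R> {
    R apply(Q request) throws Exception;
  }

  // Hypothetical handleUnaryCall: call the method once, then report the outcome to the observer.
  static <Q, R> void handleUnaryCall(ServiceMethod<Q, R> method, Q request, Observer<R> observer) {
    final R response;
    try {
      response = method.apply(request);
    } catch (Exception e) {
      observer.onError(e); // single terminal error event
      return;
    }
    observer.onNext(response); // exactly one value for a unary call
    observer.onCompleted();
  }

  // Records callbacks so the sequence can be inspected.
  static final class RecordingObserver<T> implements Observer<T> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(T value) { events.add("next:" + value); }
    @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
    @Override public void onCompleted() { events.add("completed"); }
  }

  public static void main(String[] args) {
    RecordingObserver<String> observer = new RecordingObserver<>();
    handleUnaryCall((String jobId) -> "profile of " + jobId, "job-1", observer);
    System.out.println(observer.events);
  }
}
```

The response is computed before the observer is touched, so an exception thrown by the observer itself is not misreported as a service failure.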
  • LocalJobsService.getProfile, simplified
    Reference code:
  public Optional<QueryProfile> getProfile(String jobId, String username) {
    JobId jobIdObj = new JobId(jobId);
 
    GetJobRequest request = GetJobRequest.newBuilder()
                                         .setJobId(jobIdObj)
                                         .setUserName(username)
                                         .build();
    try {
      Job job = getJob(request);
      UserBitShared.QueryProfile profile =
        getProfileFromJobId(job.getJobId(), job.getAttempts().size() - 1, job.getJobAttempt().getEndpoint());
      return java.util.Optional.of(profile);
    } catch (JobNotFoundException ex) {
      return java.util.Optional.empty();
    }
  }
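  • LocalJobsService getProfileFromJobId
    Although LocalJobsService backs the RPC server, internally it still makes remote requests. It first asks the job telemetry service for the profile by job ID and attempt; that service is implemented by JobTelemetryServiceImpl.
    If that fails and the job ran on a different coordinator (the NodeEndpoint passed in), it forwards the request to that node through RemoteJobServiceForwarder.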
QueryProfile getProfileFromJobId(JobId jobId, int attempt, NodeEndpoint endpoint) {
    final AttemptId attemptId = new AttemptId(JobsServiceUtil.getJobIdAsExternalId(jobId), attempt);
    // Check if the profile for given attempt already exists. Even if the job is not done, it is possible that
    // profile exists for previous attempts
    StatusRuntimeException caughtException;
    try {
      return jobTelemetryServiceStub
        .getQueryProfile(
          GetQueryProfileRequest.newBuilder().setQueryId(attemptId.toQueryId()).build())
        .getProfile();
    } catch (StatusRuntimeException sre) {
      logger.debug("Unable to fetch query profile for {} on Node {}", jobId, identity, sre);
      caughtException = sre;
    }
 
    QueryProfile qp = null;
    try {
      if (!(endpoint.getAddress().equals(identity.getAddress()) &&
        (endpoint.getConduitPort() == identity.getConduitPort()))) {
        logger.debug("Fetching query profile for {} on Node {}", jobId, endpoint);
        final QueryProfileRequest request = QueryProfileRequest.newBuilder()
          .setJobId(JobsProtoUtil.toBuf(jobId))
          .setAttempt(attempt)
          .setUserName(SYSTEM_USERNAME)
          .build();
        qp = forwarder.getProfile(JobsServiceUtil.toPB(endpoint), request);
      }
    } catch (StatusRuntimeException sre) {
      logger.debug("Unable to fetch profile for {} on Node {}", jobId, endpoint, sre);
      if (!sre.getStatus().getCode().equals(Status.UNAVAILABLE.getCode())) {
        caughtException = sre;
      }
    }
    if (qp == null) {
      if (caughtException.getStatus().getCode().equals(Status.INVALID_ARGUMENT.getCode())) {
        logger.error("Unable to get profile for {}", jobId, caughtException);
        throw caughtException;
      } else {
        logger.error("Unable to fetch profile for {} at the moment, try after sometime.", jobId, caughtException);
        throw new StatusRuntimeException(Status.INTERNAL
          .withDescription("Unable to fetch profile at the moment, try after sometime.")
          .withCause(caughtException));
      }
    } else {
      return qp;
    }
  }
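The control flow of getProfileFromJobId is easier to follow without the gRPC and protobuf types. The sketch below keeps the same three steps: ask telemetry, forward to the job's node if it is not this one, then decide which error to surface. It uses hypothetical stand-ins throughout: a Code enum instead of io.grpc.Status codes, String profiles instead of QueryProfile, and small Telemetry/Forwarder interfaces instead of the telemetry stub and RemoteJobServiceForwarder (Java 16+ for the record).

```java
final class ProfileFallbackSketch {

  // Hypothetical subset of gRPC status codes.
  enum Code { INVALID_ARGUMENT, NOT_FOUND, UNAVAILABLE, INTERNAL }

  // Hypothetical stand-in for io.grpc.StatusRuntimeException.
  static final class RpcException extends RuntimeException {
    final Code code;

    RpcException(Code code, String message, Throwable cause) {
      super(message, cause);
      this.code = code;
    }
  }

  // Address + conduit port; record equality matches the original address/port comparison.
  record Endpoint(String address, int conduitPort) {}

  interface Telemetry { String getQueryProfile(String jobId, int attempt); }
  interface Forwarder { String getProfile(Endpoint target, String jobId, int attempt); }

  static String getProfileFromJobId(String jobId, int attempt, Endpoint jobNode, Endpoint self,
                                    Telemetry telemetry, Forwarder forwarder) {
    RpcException caught;
    // 1. Ask the job telemetry service first.
    try {
      return telemetry.getQueryProfile(jobId, attempt);
    } catch (RpcException e) {
      caught = e;
    }
    // 2. If the job ran on another coordinator, forward the request there.
    String profile = null;
    try {
      if (!jobNode.equals(self)) {
        profile = forwarder.getProfile(jobNode, jobId, attempt);
      }
    } catch (RpcException e) {
      // An UNAVAILABLE remote node keeps the telemetry error as the one to report.
      if (e.code != Code.UNAVAILABLE) {
        caught = e;
      }
    }
    if (profile != null) {
      return profile;
    }
    // 3. Bad requests surface as-is; anything else becomes a retryable INTERNAL error.
    if (caught.code == Code.INVALID_ARGUMENT) {
      throw caught;
    }
    throw new RpcException(Code.INTERNAL, "Unable to fetch profile at the moment, try after sometime.", caught);
  }

  public static void main(String[] args) {
    Endpoint self = new Endpoint("10.0.0.1", 7000);
    Endpoint other = new Endpoint("10.0.0.2", 7000);
    Telemetry missing = (id, attempt) -> { throw new RpcException(Code.NOT_FOUND, "no profile yet", null); };
    System.out.println(getProfileFromJobId("job-1", 0, other, self, missing,
        (target, id, attempt) -> "profile from " + target.address()));
  }
}
```

Only INVALID_ARGUMENT is passed through unchanged; every other failure, including "the job ran here and telemetry has nothing", is wrapped as INTERNAL with a "try after sometime" hint, since a profile that is not yet available may appear later.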
  • JobTelemetryServiceImpl.getQueryProfile
  public void getQueryProfile(
    GetQueryProfileRequest request, StreamObserver<GetQueryProfileResponse> responseObserver) {
    try {
      QueryId queryId = request.getQueryId();
      Preconditions.checkNotNull(queryId);
      // reads the profile from the ProfileStore, or builds a merged one
      QueryProfile mergedProfile = fetchOrBuildMergedProfile(queryId);
      responseObserver.onNext(
          GetQueryProfileResponse.newBuilder().setProfile(mergedProfile).build());
      responseObserver.onCompleted();
    } catch (IllegalArgumentException e) {
      responseObserver.onError(
          Status.INVALID_ARGUMENT
              .withDescription("Unable to get query profile. " + e.getMessage())
              .asRuntimeException());
    } catch (Exception ex) {
      logger.error("Unable to get query profile.", ex);
      responseObserver.onError(
          Status.INTERNAL.withDescription(Throwables.getRootCause(ex).getMessage()).asRuntimeException());
    }
  }
  • JobTelemetryServiceImpl.fetchOrBuildMergedProfile
  private QueryProfile fetchOrBuildMergedProfile(QueryId queryId) throws ExecutionException, InterruptedException {
    // first try the full profile already persisted in the ProfileStore
    Optional<QueryProfile> fullProfile = profileStore.getFullProfile(queryId);
    if (fullProfile.isPresent()) {
      return fullProfile.get();
    }
 
    QueryProfile mergedProfile = buildFullProfile(queryId);
    // persist the merged profile, if in a terminal state
    if (isTerminal(mergedProfile.getState())) {
      bgProfileWriter.tryWriteAsync(queryId, mergedProfile);
    }
    return mergedProfile;
  }
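fetchOrBuildMergedProfile is a cache-aside lookup: serve the persisted full profile if one exists, otherwise build the merged profile on demand and, once the query is in a terminal state, persist it in the background so later calls hit the store. The sketch below is a hypothetical in-memory version (Java 16+): a ConcurrentHashMap stands in for ProfileStore, a single daemon-thread executor for bgProfileWriter, and a Function for buildFullProfile. Which states count as terminal is an assumption here.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class MergedProfileCacheSketch {

  // Hypothetical query states; treating everything except RUNNING as terminal is an assumption.
  enum State { RUNNING, COMPLETED, FAILED, CANCELED }

  record Profile(String queryId, State state) {}

  private final Map<String, Profile> fullProfiles = new ConcurrentHashMap<>(); // stands in for ProfileStore
  private final ExecutorService backgroundWriter = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "bg-profile-writer");
    t.setDaemon(true); // do not keep the JVM alive just for pending writes
    return t;
  });
  private final Function<String, Profile> buildFullProfile; // stands in for merging partial profiles
  private volatile CompletableFuture<Void> lastWrite = CompletableFuture.completedFuture(null);
  final AtomicInteger builds = new AtomicInteger();

  MergedProfileCacheSketch(Function<String, Profile> buildFullProfile) {
    this.buildFullProfile = buildFullProfile;
  }

  static boolean isTerminal(State state) {
    return state != State.RUNNING;
  }

  Profile fetchOrBuildMergedProfile(String queryId) {
    // 1. Serve the persisted full profile if there is one.
    Profile stored = fullProfiles.get(queryId);
    if (stored != null) {
      return stored;
    }
    // 2. Otherwise build the merged profile on demand.
    builds.incrementAndGet();
    Profile merged = buildFullProfile.apply(queryId);
    // 3. Persist it asynchronously, but only once the query can no longer change.
    if (isTerminal(merged.state())) {
      lastWrite = CompletableFuture.runAsync(() -> fullProfiles.putIfAbsent(queryId, merged), backgroundWriter);
    }
    return merged;
  }

  // Blocks until the most recent background write finished; with one writer thread, earlier ones are done too.
  void awaitPendingWrites() {
    lastWrite.join();
  }

  public static void main(String[] args) {
    MergedProfileCacheSketch cache = new MergedProfileCacheSketch(id -> new Profile(id, State.COMPLETED));
    cache.fetchOrBuildMergedProfile("q1");
    cache.awaitPendingWrites();
    cache.fetchOrBuildMergedProfile("q1");
    System.out.println("builds: " + cache.builds.get());
  }
}
```

Only terminal profiles are written because a running query's merged profile is still changing; persisting it would make later lookups return a stale snapshot instead of rebuilding.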

Notes

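The above is only a brief walkthrough of how a job profile is queried. A complete picture of job handling would also cover how profiles are stored, and knowing how to read a profile matters just as much.
Dremio's official documentation on reading profiles is worth a look.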

References

dac/backend/src/main/java/com/dremio/dac/resource/JobProfileResource.java
dac/backend/src/main/java/com/dremio/dac/support/SupportResource.java
dac/backend/src/main/java/com/dremio/dac/support/BasicSupportService.java
dac/backend/src/main/java/com/dremio/dac/obfuscate/ObfuscationUtils.java
services/jobs/src/main/java/com/dremio/service/jobs/JobsService.java
services/jobs/target/generated-sources/protobuf/grpc-java/com/dremio/service/job/ChronicleGrpc.java
services/jobs/src/main/java/com/dremio/service/jobs/RemoteJobServiceForwarder.java
services/jobs/src/main/java/com/dremio/service/jobs/LocalJobsService.java
dac/backend/src/main/java/com/dremio/dac/admin/ProfileResource.java
services/jobs/src/main/java/com/dremio/service/jobs/Chronicle.java
services/jobs/src/main/java/com/dremio/service/jobs/HybridJobsService.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
services/jobs/src/main/java/com/dremio/service/jobs/JobsServiceAdapter.java
services/jobtelemetry/common/src/main/java/com/dremio/service/jobtelemetry/JobTelemetryClient.java
services/jobs/src/main/java/com/dremio/service/jobs/JobsStoreCreator.java
services/jobtelemetry/server/src/main/java/com/dremio/service/jobtelemetry/server/JobTelemetryServiceImpl.java
https://www.dremio.com/wp-content/uploads/2024/01/Query-Performance-Analysis-and-Improvement.pdf
https://docs.dremio.com/current/sonar/monitoring/viewing-query-profiles
https://docs.dremio.com/current/sonar/monitoring/overview
https://docs.dremio.com/current/sonar/monitoring/raw-profile/

From: https://www.cnblogs.com/rongfengliang/p/18017726
