首页 > 其他分享 >基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】

基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】

时间:2023-04-12 14:44:43浏览次数:51  
标签:COMMENT STRING vars 纯钧 博客园 Metrics chunjun customReporter

基于chunjun纯钧的增量数据同步

目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程中也遇到了很多问题,本次记录下在SQL模式的情况下怎么支持增量的数据同步

chunjun的官网文档对增量同步已经做出了一定的说明

纯钧官方
根据文档我编写了一个SQL脚本

create table `source` (
        `sfzh` STRING COMMENT '公民身份号码',
        `xm` STRING COMMENT '姓名',
        `xb` STRING COMMENT '性别',
        `xbdm` STRING COMMENT '性别代码',
        `jzdz` STRING COMMENT '居住地址',
        `fzrq` DATE COMMENT '发证日期',
        `dsc_biz_record_id` STRING COMMENT '唯一自增序列号'
) with (
        'connector' = 'mysql-x',
        'url' = 'jdbc:mysql://192.168.14.236:3306/zxk?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true',
        'table-name' = 'ods_gsq_yjrcjzzxx',
        'username' = 'root',
        'password' = 'taotao0226.?',
        'scan.fetch-size' = '1024',
        'scan.increment.column' = 'fzrq',
        --'scan.increment.column-type' = 'date',
        'scan.start-location' = '1659974400000'
);

create table `sink` (
        `sfzh` STRING COMMENT '公民身份号码',
        `xm` STRING COMMENT '姓名',
        `xb` STRING COMMENT '性别',
        `xbdm` STRING COMMENT '性别代码',
        `jzdz` STRING COMMENT '居住地址',
        `fzrq` DATE COMMENT '发证日期',
        `dsc_biz_record_id` STRING COMMENT '唯一自增序列号',
        PRIMARY KEY (`dsc_biz_record_id`) NOT ENFORCED
) with (
        'connector' = 'stream-x'
);

然后提交任务的时候发现已经记录了start-locationstart-location的指标信息了,但是并没有上报到Prometheus!

在本地调试源码解决问题的大致过程

在类 com.dtstack.chunjun.source.format.BaseRichInputFormat中有一个成员变量

/** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */
protected transient CustomReporter customReporter;

该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法

    @Override
    public void openInputFormat() throws IOException {
        Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
        if (vars != null) {
            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
            jobId = vars.get(Metrics.JOB_NAME);
            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
        }

        LOG.info("是否使用自定义报告 {}", useCustomReporter());
        if (useCustomReporter()) {

            customReporter =
                    DataSyncFactoryUtil.discoverMetric(
                            config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
            customReporter.open();
            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
        }

        startTime = System.currentTimeMillis();
    }

通过排查useCustomReporter方法得知 jdbcConf.getInitReporter()是false,而在JdbcConfig类里面这个对象默认是true

 /** 使用自定义的指标输出器把增量指标打到普罗米修斯 */
    @Override
    protected boolean useCustomReporter() {
        return jdbcConf.isIncrement() && jdbcConf.getInitReporter();
    }

    /** 增量同步或者间隔轮询时,是否初始化外部存储 */
    protected Boolean initReporter = true;

经过查找 initReporter 属性的set方法调用,找到了下面的问题
在类 com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource 中有个地方说暂时不支持SQL的方式

尝试一下将false修改为true,然后在本地进行测试,测试的时候将pushgateway的host和port写到代码里面,执行任务发现pushgateway里面已经有数据了

那么可以开始打包了,由于改了源代码,所以要先格式化代码 mvn spotless:apply 再打包 mvn clean package -DskipTests

后续问题

打包到虚拟机进行测试,我使用的是yarn-per-job模式,提交任务后发现报找不到Prometheus报告类的异常,通过异常信息发现在前面提到的方法里有classloader

public void openInputFormat() throws IOException {
        Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
        if (vars != null) {
            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
            jobId = vars.get(Metrics.JOB_NAME);
            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
        }

        LOG.info("是否使用自定义报告 {}", useCustomReporter());
        if (useCustomReporter()) {

            customReporter =
                    DataSyncFactoryUtil.discoverMetric(
                            config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
            customReporter.open();
            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
        }

        startTime = System.currentTimeMillis();
    }

    public static CustomReporter discoverMetric(
            ChunJunCommonConf commonConf,
            RuntimeContext context,
            boolean makeTaskFailedWhenReportFailed) {
        try {
            String pluginName = commonConf.getMetricPluginName();
            // 这里获取到了类的全限定名 com.dtstack.chunjun.metrics.prometheus.PrometheusReport
            String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.metric);
            MetricParam metricParam =
                    new MetricParam(
                            context, makeTaskFailedWhenReportFailed, commonConf.getMetricProps());

            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            Class<?> clazz = classLoader.loadClass(pluginClassName);
            Constructor<?> constructor = clazz.getConstructor(MetricParam.class);

            return (CustomReporter) constructor.newInstance(metricParam);
        } catch (Exception e) {
            throw new ChunJunRuntimeException(e);
        }
    }

在本地的时候这里加载类的时候是没问题的,但是在线上的时候出现了了找不到类的异常,猜测是相关的jar没有加载到flink jvm进程里面,所以将项目里面的 chunjun-metrics-prometheus.jar 放到了flink的lib目录下,再次启动任务 问题得以解决!

标签:COMMENT,STRING,vars,纯钧,博客园,Metrics,chunjun,customReporter
From: https://www.cnblogs.com/sxxs/p/17309745.html

相关文章

  • 基于博客园搭建Typora免费图床
    title:基于博客园搭建Typora免费图床date:2023-04-1016:10:59categories:小技能tags:-免费图床-博客园-Typora本地需要有python3环境,复制以下代码到一个.py的文件中,如我本地命名为upd_pic_to_cnblog.pyimportosimportsysimportxmlrpc.clientasclientimp......
  • 博客园皮肤以及看板娘
    记录一下,从下午四点到晚上九点,配置好了博客园的界面,博客园里申请博客和申请JS非常迅速。主要时间用于熟悉博客园后台以及github上查看各皮肤。首先非常感谢各位大佬的无私付出,分享自己的技术与知识,供社区的大家交流学习。我也会努力分享自己微不足道的经验,为开源社区尽一份力。......
  • 博客园-王垠主题
    主题创建动机因为从一开始看到王垠大佬的博客,就觉得不是很花哨,能让人很安心的静下心来细细的去品每一个文字,所以想跟着大佬一样使用这个主题,在网上找了一圈也没有找到有博客园的主题,之前有使用过由smallyunet提供的hexo-theme-yinwang主题,所以想使用hexo主题来改一改,因为博客园的......
  • 我搬来博客园啦
    一直很喜欢CSDN的,让我学习了很多,也很感谢CSDN这样的平台,后续也将继续使用,但不会再在该平台更新博文。 感觉用的多了发现了以下问题:1、资源积分下载,再不就得开会员,恰饭的话接广告卖周边这都可以,但是在技术分享这里卡一手真的太让人难受了。2、审核,过去特别多的老文章已经不能......
  • 博客园中TinyMCE编辑器的快捷键
    ctrl+z 撤消ctrl+y 重做ctrl+b 粗体ctrl+i 斜体ctrl+u 下划线ctrl + 1-6 h1-h6ctrl+7 pctrl+8 divctrl+9 地址......
  • 关于博客园绝境求商的一点点感想!
      作为一个非常古老(80后)的面向百度开发的程序员,我用百度非常多,大概在前几年的时候,搜技术关键字的时候,博客园上面的问题在百度首页出现的机会非常多,另外还有iteye这样的网站,但是在近几年发现越来越少了,首页基本上都是csdn的帖子,很多帖子都是无意义的复制,重复。虽然csdn的界面......
  • 详解 Flink Catalog 在 ChunJun 中的实践之路
    我们知道Flink有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,相对于这些耳熟能详的概念,Flink里还有一个Catalog(目录)的概念。本文将为大家带来FlinkCatalog的介绍以及FlinkCatalog在ChunJun中的实践之路。FlinkCatalog简介Catalog提供元数据,如数据......
  • 详解 Flink Catalog 在 ChunJun 中的实践之路
    我们知道Flink有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,相对于这些耳熟能详的概念,Flink里还有一个Catalog(目录)的概念。本文将为大家带来FlinkCatalog的介绍以及FlinkCatalog在ChunJun中的实践之路。FlinkCatalog简介Catalog提供元数据,如......
  • 为博客园加上PWA支持
    在这篇博客文章中,我将分享如何将您的博客首页作为PWAApp固定到iOS设备的主屏幕上。这样,博客将以全屏模式打开,提供更好的用户体验。在查阅了iOS如何添加网页到主屏幕可以全屏打开后,我了解到只需在网页的head内添加以下标签即可实现该功能:<metaname="apple-mobile-web-app-capa......
  • 博客园主题美化配置(Awescnb)
    平时比较喜欢捣鼓主题,但是没办法见一个爱一个,或者有时候就一个主题用腻了,想尝试另一个主题,此处记录下是为了切换是更快的还原当初的配置,同时可以给广大网友提供参考~当前主题为awescnb系列主题中的geek主题 Awescnb文档Awescnb作者博客配置步骤一、准备工作申请js......