首页 > 数据库 >Flink系列-SQL connector扩展以及DataGenTableSourceFactory源码走读

Flink系列-SQL connector扩展以及DataGenTableSourceFactory源码走读

时间:2024-08-26 13:55:19浏览次数:11  
标签:走读 Flink public add 源码 rowDataType DataGenConnectorOptions numberOfRows options

一 、说明

        通常我们直接使用Flink的sql进行实时任务开发,经常会遇到扩展新的数据源端或者目标端的场景,或者需要了解connector的一些源码机制,方便开发和定位问题。 

      如何扩展新增Sql connector呢?

扩展Apache Flink的新SQL Connector主要涉及以下几个步骤:

  1. 定义动态工厂类:自定义Factory需要继承DynamicTableSinkFactoryDynamicTableSourceFactory接口,实现相关的方法,如createDynamicTableSinkfactoryIdentifierrequiredOptionsoptionalOptions等。

  2. 实现具体的Source和Sink:根据需要连接的数据存储或服务,实现具体的DynamicTableSinkDynamicTableSource。这可能涉及到编写自定义的SinkFunctionSourceFunction

  3. 使用Java SPI机制:通过在项目的resources目录下创建META-INF.services文件,并添加org.apache.flink.table.factories.Factory的实现类全路径,使得Flink SQL客户端能够识别并加载自定义的Connector。

二、Flink原生自带的DataGenTableSourceFactory如何实现的呢?

    DataGen作为Flink提供的source数据生成器,实现的为DynamicTableSourceFactory,即构建数据源端,核心实现如下四个部分:

1)factoryIdentifier 方法,返回唯一标识,用于区分与其他connector的配置;

2)requiredOptions 必选配置项,无

3)optionalOptions  可选配置项,用于配置产数据数据的条数,字段类型、边界限制等内容;

4)构建DynamicTableSource  此为最核心实现,此过程包括create source datagen 的sql语句中schema读取,配置读取,参数校验与转换等内容

public class DataGenTableSourceFactory implements DynamicTableSourceFactory {

    public static final String IDENTIFIER = "datagen";

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(DataGenConnectorOptions.ROWS_PER_SECOND);
        options.add(DataGenConnectorOptions.NUMBER_OF_ROWS);
        options.add(DataGenConnectorOptions.SOURCE_PARALLELISM);

        // Placeholder options
        options.add(DataGenConnectorOptions.FIELD_KIND);
        options.add(DataGenConnectorOptions.FIELD_MIN);
        options.add(DataGenConnectorOptions.FIELD_MAX);
        options.add(DataGenConnectorOptions.FIELD_MAX_PAST);
        options.add(DataGenConnectorOptions.FIELD_LENGTH);
        options.add(DataGenConnectorOptions.FIELD_START);
        options.add(DataGenConnectorOptions.FIELD_END);
        options.add(DataGenConnectorOptions.FIELD_NULL_RATE);
        options.add(DataGenConnectorOptions.FIELD_VAR_LEN);

        return options;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        Configuration options = new Configuration();
        context.getCatalogTable().getOptions().forEach(options::setString);

        DataType rowDataType = context.getPhysicalRowDataType();
        DataGenerator<?>[] fieldGenerators = new DataGenerator[DataType.getFieldCount(rowDataType)];
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();

        List<String> fieldNames = DataType.getFieldNames(rowDataType);
        List<DataType> fieldDataTypes = DataType.getFieldDataTypes(rowDataType);
        for (int i = 0; i < fieldGenerators.length; i++) {
            String name = fieldNames.get(i);
            DataType type = fieldDataTypes.get(i);

            ConfigOption<String> kind =
                    key(DataGenConnectorOptionsUtil.FIELDS
                                    + "."
                                    + name
                                    + "."
                                    + DataGenConnectorOptionsUtil.KIND)
                            .stringType()
                            .defaultValue(DataGenConnectorOptionsUtil.RANDOM);
            DataGeneratorContainer container =
                    createContainer(name, type, options.get(kind), options);
            fieldGenerators[i] = container.getGenerator();

            optionalOptions.add(kind);
            optionalOptions.addAll(container.getOptions());
        }

        FactoryUtil.validateFactoryOptions(requiredOptions(), optionalOptions, options);

        Set<String> consumedOptionKeys = new HashSet<>();
        consumedOptionKeys.add(CONNECTOR.key());
        consumedOptionKeys.add(DataGenConnectorOptions.ROWS_PER_SECOND.key());
        consumedOptionKeys.add(DataGenConnectorOptions.NUMBER_OF_ROWS.key());
        consumedOptionKeys.add(DataGenConnectorOptions.SOURCE_PARALLELISM.key());
        optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
        FactoryUtil.validateUnconsumedKeys(
                factoryIdentifier(), options.keySet(), consumedOptionKeys);

        String name = context.getObjectIdentifier().toString();
        return new DataGenTableSource(
                fieldGenerators,
                name,
                rowDataType,
                options.get(DataGenConnectorOptions.ROWS_PER_SECOND),
                options.get(DataGenConnectorOptions.NUMBER_OF_ROWS),
                options.getOptional(DataGenConnectorOptions.SOURCE_PARALLELISM).orElse(null));
    }

2.1 构建DynamicTableSource 

对于DataGen实现scanTableSource接口即可,不存在lookup查询的场景;

其中getScanRuntimeProvider方法为数据生成器SourceFunction的实现逻辑提供者

@Internal
public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown {

    private final DataGenerator<?>[] fieldGenerators;
    private final String tableName;
    private final DataType rowDataType;
    private final long rowsPerSecond;
    private Long numberOfRows;
    private final @Nullable Integer parallelism;

    public DataGenTableSource(
            DataGenerator<?>[] fieldGenerators,
            String tableName,
            DataType rowDataType,
            long rowsPerSecond,
            Long numberOfRows,
            Integer parallelism) {
        this.fieldGenerators = fieldGenerators;
        this.tableName = tableName;
        this.rowDataType = rowDataType;
        this.rowsPerSecond = rowsPerSecond;
        this.numberOfRows = numberOfRows;
        this.parallelism = parallelism;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        boolean isBounded = numberOfRows != null;
        return SourceFunctionProvider.of(createSource(), isBounded, parallelism);
    }

    @VisibleForTesting
    public DataGeneratorSource<RowData> createSource() {
        return new DataGeneratorSource<>(
                new RowDataGenerator(fieldGenerators, DataType.getFieldNames(rowDataType), 0),
                rowsPerSecond,
                numberOfRows);
    }

    @Override
    public DynamicTableSource copy() {
        return new DataGenTableSource(
                fieldGenerators, tableName, rowDataType, rowsPerSecond, numberOfRows, parallelism);
    }

    @Override
    public String asSummaryString() {
        return "DataGenTableSource";
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public void applyLimit(long limit) {
        this.numberOfRows = limit;
    }
}

2.2 DataGeneratorSource

其中open方法为初次构建对象实例的初始化操作;

其中run方法为生成数据的实现逻辑,由实现逻辑可以看出,由DataGenerator否则生成数据,run中逻辑控制每秒生成的数据量和如果有总量配置的情况的数据生成截至配置等逻辑

public class DataGeneratorSource<T> extends RichParallelSourceFunction<T>
        implements CheckpointedFunction {

    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);

        if (numberOfRows != null) {
            final int stepSize = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
            final int taskIdx = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();

            final int baseSize = (int) (numberOfRows / stepSize);
            toOutput = (numberOfRows % stepSize > taskIdx) ? baseSize + 1 : baseSize;
        }
    }


    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        double taskRowsPerSecond =
                (double) rowsPerSecond
                        / getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
        long nextReadTime = System.currentTimeMillis();

        while (isRunning) {
            for (int i = 0; i < taskRowsPerSecond; i++) {
                if (isRunning
                        && generator.hasNext()
                        && (numberOfRows == null || outputSoFar < toOutput)) {
                    synchronized (ctx.getCheckpointLock()) {
                        outputSoFar++;
                        ctx.collect(this.generator.next());
                    }
                } else {
                    return;
                }
            }

            nextReadTime += 1000;
            long toWaitMs = nextReadTime - System.currentTimeMillis();
            while (toWaitMs > 0) {
                Thread.sleep(toWaitMs);
                toWaitMs = nextReadTime - System.currentTimeMillis();
            }
        }
    }

标签:走读,Flink,public,add,源码,rowDataType,DataGenConnectorOptions,numberOfRows,options
From: https://blog.csdn.net/qq_43462685/article/details/141460021

相关文章

  • Java计算机毕业设计研究生-导师任务管理系统(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景在高等教育领域,研究生教育与导师之间的有效沟通与合作是确保研究质量、促进学生成长的关键环节。然而,随着研究生招生规模的扩大和科研项目的复杂化,传......
  • Java计算机毕业设计线上养老院管理系统(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着社会的老龄化进程加速,养老问题日益成为社会关注的焦点。传统养老模式面临资源分配不均、服务效率低下等挑战,难以满足老年人日益增长的多样化需求......
  • Java计算机毕业设计医院固定资产系统(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景:随着医疗技术的不断进步和医院规模的持续扩大,医院固定资产的数量与种类日益增多,管理难度也随之加大。传统的固定资产管理模式往往依赖于人工记录与核......
  • Java计算机毕业设计阳光幼儿园信息管理系统(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着教育信息化的快速发展,幼儿园作为儿童启蒙教育的重要阶段,其管理模式的智能化与信息化已成为提升教育质量、加强家园共育的关键。当前,许多幼儿园仍......
  • Java计算机毕业设计框架的白果园网上水果超市的设计与实现(开题+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展和人们生活节奏的加快,电子商务已成为现代消费不可或缺的一部分。在食品零售领域,尤其是水果市场,传统的线下购买方式逐渐难以......
  • 免费分享一套Java协同过滤推荐算法的SpringBoot+Vue(图书)商城系统【论文+源码+SQL脚
    大家好,我是java1234_小锋老师,看到一个不错的Java协同过滤推荐算法的SpringBoot+Vue(图书)商城系统,分享下哈。项目视频演示【免费】Java协同过滤推荐算法的SpringBoot+Vue(图书)商城系统Java毕业设计_哔哩哔哩_bilibili项目介绍伴随着Internet的蓬勃发展,电子商务也取得了......
  • 短剧分销系统搭建教程,源码分享+部署上线指南
    一、短剧分销系统是什么?简单来说就是用来分销推广短剧的系统,系统对接他人短剧小程序片源,仅推广分销用户看剧充值在第三方小程序,佣金为第三方打款。短剧分销系统变现方式简述:付费观看:单剧付费或会员订阅,直接获取用户收入。分销佣金:推广者通过销售获得分成。打赏机制:用户打赏......
  • 源码搭建说明
    云分发系统安装说明【服务器以及环境】系统CentOS7以上然后安装宝塔JAVAPHP7.2Mysql5.6SSL证书JAVA安装说明打开宝塔终端,输入yuminstalljava-1.8.0-openjdk-devel.x86_64,中间如果有yn选项的话,选择y,意思就是继续,最后等待安装完成即可。本产品禁止用于含木马、病毒......
  • 【java计算机毕设】车联网位置信息管理系统MySQL springcloud vue maven项目设计源码
    目录1项目功能2项目介绍3项目地址 1项目功能【java计算机毕设】车联网位置信息管理系统MySQLspringcloudvuemaven项目设计源码前后端可分离也可不分离 2项目介绍系统功能:车联网位置信息管理系统包括管理员、用户两种角色。管理员功能包括个人中心模块用......
  • 【精品】超市售货管理平台小程序(源码+辅导+设计)
    博主介绍:  ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W+粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台的优质作者。通过长期分享和实战指导,我致力于帮助更多学生......