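1. Overview
When developing real-time jobs directly in Flink SQL, we often need to add support for a new source or sink, or to understand how connectors work internally so that development and troubleshooting are easier.
How do you add a new SQL connector?
Extending Apache Flink with a new SQL connector mainly involves the following steps: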
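- Define a dynamic table factory: the custom factory implements the DynamicTableSourceFactory and/or DynamicTableSinkFactory interfaces (they are interfaces, so they are implemented rather than extended) and their methods, such as createDynamicTableSource / createDynamicTableSink, factoryIdentifier, requiredOptions and optionalOptions.
- Implement the concrete source and sink: depending on the storage system or service being connected, implement DynamicTableSource and/or DynamicTableSink. This may involve writing a custom SourceFunction or SinkFunction; note that these legacy interfaces are deprecated in recent Flink versions in favor of the unified Source and Sink APIs.
- Register the factory through Java SPI: under the project's resources directory, create the file META-INF/services/org.apache.flink.table.factories.Factory and list the fully qualified class name of the factory implementation in it, so that Flink (including the SQL client) can discover and load the custom connector.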
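The discovery step can be modeled in plain Java. The sketch below is a simplified, hypothetical model (SimpleFactory and DiscoverySketch are illustrative names, not Flink API): in Flink the candidate factories are loaded by ServiceLoader from META-INF/services/org.apache.flink.table.factories.Factory, and the factory whose factoryIdentifier() equals the 'connector' option is selected, with an error when none or more than one matches. Here the candidate list is simply passed in.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for org.apache.flink.table.factories.Factory.
interface SimpleFactory {
    String factoryIdentifier();
}

class DiscoverySketch {
    // Select the single factory whose identifier matches the 'connector' option.
    // In Flink the candidates would come from ServiceLoader; here they are passed in directly.
    static SimpleFactory discover(List<SimpleFactory> candidates, String connector) {
        List<SimpleFactory> matches = candidates.stream()
                .filter(f -> f.factoryIdentifier().equals(connector))
                .collect(Collectors.toList());
        if (matches.isEmpty()) {
            throw new IllegalArgumentException("No factory found for connector '" + connector + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalArgumentException("Multiple factories found for connector '" + connector + "'");
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        List<SimpleFactory> factories = List.of(() -> "datagen", () -> "my-connector");
        System.out.println(discover(factories, "my-connector").factoryIdentifier());
    }
}
```

This is why factoryIdentifier must be unique on the classpath: two factories returning the same identifier make the lookup ambiguous.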
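2. How is Flink's built-in DataGenTableSourceFactory implemented?
DataGen is Flink's built-in data generator on the source side, so it implements DynamicTableSourceFactory. Its core consists of four parts:
1) factoryIdentifier: returns the unique identifier ("datagen") that is matched against the 'connector' option and distinguishes this connector from all others;
2) requiredOptions: the required options; DataGen has none;
3) optionalOptions: the optional options, used to configure the number of rows to generate, the generation rate, the generator kind of each field, value bounds, and so on;
4) createDynamicTableSource: builds the DynamicTableSource. This is the core of the implementation: it reads the schema from the CREATE TABLE statement that uses datagen, reads the options, and validates and converts them.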
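Per-field options follow a fields.<field name>.<option> key layout; for example, the generator kind of a field is read from fields.<name>.kind and falls back to random. The following sketch assumes the options arrive as a plain Map<String, String> (a simplification of Flink's Configuration); FieldKindSketch and its methods are illustrative names, and only the key construction and the default are shown.

```java
import java.util.Map;

class FieldKindSketch {
    // Same key layout as in createDynamicTableSource: "fields" + "." + name + "." + "kind".
    static String fieldKindKey(String fieldName) {
        return "fields." + fieldName + ".kind";
    }

    // Falls back to "random" when the WITH clause does not set a kind for this field.
    static String resolveKind(Map<String, String> withOptions, String fieldName) {
        return withOptions.getOrDefault(fieldKindKey(fieldName), "random");
    }

    public static void main(String[] args) {
        Map<String, String> with = Map.of(
                "connector", "datagen",
                "fields.id.kind", "sequence",
                "fields.id.start", "1",
                "fields.id.end", "100");
        System.out.println(resolveKind(with, "id"));   // sequence
        System.out.println(resolveKind(with, "name")); // random
    }
}
```

Because these keys depend on the column names, they cannot be listed statically in optionalOptions; the factory builds them per field at runtime, which is exactly what the loop in createDynamicTableSource does.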
// Excerpt from Flink's DataGenTableSourceFactory; imports and the private helper createContainer(...) are omitted.
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {

    // Matched against the 'connector' option in the WITH clause
    public static final String IDENTIFIER = "datagen";

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    // DataGen has no required options
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(DataGenConnectorOptions.ROWS_PER_SECOND);
        options.add(DataGenConnectorOptions.NUMBER_OF_ROWS);
        options.add(DataGenConnectorOptions.SOURCE_PARALLELISM);

        // Placeholder options
        options.add(DataGenConnectorOptions.FIELD_KIND);
        options.add(DataGenConnectorOptions.FIELD_MIN);
        options.add(DataGenConnectorOptions.FIELD_MAX);
        options.add(DataGenConnectorOptions.FIELD_MAX_PAST);
        options.add(DataGenConnectorOptions.FIELD_LENGTH);
        options.add(DataGenConnectorOptions.FIELD_START);
        options.add(DataGenConnectorOptions.FIELD_END);
        options.add(DataGenConnectorOptions.FIELD_NULL_RATE);
        options.add(DataGenConnectorOptions.FIELD_VAR_LEN);
        return options;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Copy the WITH-clause options into a Configuration
        Configuration options = new Configuration();
        context.getCatalogTable().getOptions().forEach(options::setString);

        // Physical schema declared in the CREATE TABLE statement
        DataType rowDataType = context.getPhysicalRowDataType();
        DataGenerator<?>[] fieldGenerators = new DataGenerator[DataType.getFieldCount(rowDataType)];
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();

        List<String> fieldNames = DataType.getFieldNames(rowDataType);
        List<DataType> fieldDataTypes = DataType.getFieldDataTypes(rowDataType);

        // One generator per field, chosen by fields.<name>.kind (default: random)
        for (int i = 0; i < fieldGenerators.length; i++) {
            String name = fieldNames.get(i);
            DataType type = fieldDataTypes.get(i);

            ConfigOption<String> kind =
                    key(DataGenConnectorOptionsUtil.FIELDS
                                    + "."
                                    + name
                                    + "."
                                    + DataGenConnectorOptionsUtil.KIND)
                            .stringType()
                            .defaultValue(DataGenConnectorOptionsUtil.RANDOM);
            DataGeneratorContainer container =
                    createContainer(name, type, options.get(kind), options);
            fieldGenerators[i] = container.getGenerator();

            // Remember the per-field keys so they are accepted by the validation below
            optionalOptions.add(kind);
            optionalOptions.addAll(container.getOptions());
        }

        FactoryUtil.validateFactoryOptions(requiredOptions(), optionalOptions, options);

        // Every key in the WITH clause must be consumed; unknown keys are rejected
        Set<String> consumedOptionKeys = new HashSet<>();
        consumedOptionKeys.add(CONNECTOR.key());
        consumedOptionKeys.add(DataGenConnectorOptions.ROWS_PER_SECOND.key());
        consumedOptionKeys.add(DataGenConnectorOptions.NUMBER_OF_ROWS.key());
        consumedOptionKeys.add(DataGenConnectorOptions.SOURCE_PARALLELISM.key());
        optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
        FactoryUtil.validateUnconsumedKeys(
                factoryIdentifier(), options.keySet(), consumedOptionKeys);

        String name = context.getObjectIdentifier().toString();
        return new DataGenTableSource(
                fieldGenerators,
                name,
                rowDataType,
                options.get(DataGenConnectorOptions.ROWS_PER_SECOND),
                options.get(DataGenConnectorOptions.NUMBER_OF_ROWS),
                options.getOptional(DataGenConnectorOptions.SOURCE_PARALLELISM).orElse(null));
    }
}
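The final validation step rejects unknown keys: every key in the WITH clause must belong to the set the factory declares it consumes, so a misspelled option fails fast instead of being silently ignored. Below is a simplified model of that check, not FactoryUtil's actual code; the class name and the error message are illustrative.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class UnconsumedKeysSketch {
    // Throws if any key supplied by the user is not in the consumed set
    // (simplified model of the idea behind FactoryUtil.validateUnconsumedKeys).
    static void validateUnconsumedKeys(String identifier, Set<String> allKeys, Set<String> consumedKeys) {
        Set<String> unconsumed = new TreeSet<>(allKeys); // sorted for a stable message
        unconsumed.removeAll(consumedKeys);
        if (!unconsumed.isEmpty()) {
            throw new IllegalArgumentException(
                    "Unsupported options found for '" + identifier + "': " + unconsumed);
        }
    }

    public static void main(String[] args) {
        Set<String> consumed = Set.of("connector", "rows-per-second", "number-of-rows", "fields.id.kind");
        // "fields.id.knd" is a deliberate typo
        Map<String, String> with = Map.of("connector", "datagen", "fields.id.knd", "sequence");
        try {
            validateUnconsumedKeys("datagen", with.keySet(), consumed);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is also why the factory first collects the dynamically built per-field keys into optionalOptions: without them, valid options such as fields.id.kind would be reported as unsupported.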
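2.1 Building the DynamicTableSource
DataGen only needs to implement ScanTableSource, because it is never used for lookup queries (LookupTableSource).
The getScanRuntimeProvider method supplies the runtime implementation, i.e. the SourceFunction that actually generates the data. DataGenTableSource also implements SupportsLimitPushDown, so a LIMIT in the query is written into numberOfRows and makes the source bounded.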
@Internal
public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown {

    private final DataGenerator<?>[] fieldGenerators;
    private final String tableName;
    private final DataType rowDataType;
    private final long rowsPerSecond;
    // Not final: applyLimit() overwrites it when a LIMIT is pushed down
    private Long numberOfRows;
    private final @Nullable Integer parallelism;

    public DataGenTableSource(
            DataGenerator<?>[] fieldGenerators,
            String tableName,
            DataType rowDataType,
            long rowsPerSecond,
            Long numberOfRows,
            Integer parallelism) {
        this.fieldGenerators = fieldGenerators;
        this.tableName = tableName;
        this.rowDataType = rowDataType;
        this.rowsPerSecond = rowsPerSecond;
        this.numberOfRows = numberOfRows;
        this.parallelism = parallelism;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // A known row count makes the source bounded
        boolean isBounded = numberOfRows != null;
        return SourceFunctionProvider.of(createSource(), isBounded, parallelism);
    }

    @VisibleForTesting
    public DataGeneratorSource<RowData> createSource() {
        return new DataGeneratorSource<>(
                new RowDataGenerator(fieldGenerators, DataType.getFieldNames(rowDataType), 0),
                rowsPerSecond,
                numberOfRows);
    }

    @Override
    public DynamicTableSource copy() {
        return new DataGenTableSource(
                fieldGenerators, tableName, rowDataType, rowsPerSecond, numberOfRows, parallelism);
    }

    @Override
    public String asSummaryString() {
        return "DataGenTableSource";
    }

    // Generated rows are always inserts
    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    // SupportsLimitPushDown: the planner passes the query's LIMIT here
    @Override
    public void applyLimit(long limit) {
        this.numberOfRows = limit;
    }
}
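The interplay between applyLimit and getScanRuntimeProvider reduces to a few lines. This hypothetical class (LimitPushDownSketch, not a Flink type) keeps only the numberOfRows field: null means an unbounded stream, and pushing down LIMIT n assigns n unconditionally, as applyLimit does above, which turns the source into a bounded one.

```java
class LimitPushDownSketch {
    // null means "no row count configured", i.e. an unbounded source
    private Long numberOfRows;

    LimitPushDownSketch(Long numberOfRows) {
        this.numberOfRows = numberOfRows;
    }

    // Mirrors DataGenTableSource.applyLimit: the pushed-down LIMIT becomes the row count
    void applyLimit(long limit) {
        this.numberOfRows = limit;
    }

    // Mirrors the isBounded computation in getScanRuntimeProvider
    boolean isBounded() {
        return numberOfRows != null;
    }

    Long numberOfRows() {
        return numberOfRows;
    }

    public static void main(String[] args) {
        LimitPushDownSketch source = new LimitPushDownSketch(null);
        System.out.println(source.isBounded()); // false
        source.applyLimit(5);
        System.out.println(source.isBounded() + " " + source.numberOfRows()); // true 5
    }
}
```

In practice this means a query such as SELECT * FROM a datagen table with LIMIT 5 can finish on its own even when number-of-rows is not set.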
2.2 DataGeneratorSource
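The open method is the initialization hook, called once on each parallel instance before any data is produced; when number-of-rows is set, it computes this subtask's share of the total (toOutput).
The run method contains the generation logic. The DataGenerator is responsible for producing each record, while run limits output to the configured rows per second (split across subtasks) and, if a total row count is configured, stops once this subtask's share has been emitted.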
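The per-subtask quota computed in open can be checked on its own. This sketch reuses the same arithmetic, with long instead of the int cast in the original; RowSplitSketch and rowsForSubtask are illustrative names.

```java
class RowSplitSketch {
    // Same split as DataGeneratorSource.open(): the first (numberOfRows % parallelism)
    // subtasks get one extra row, so the shares add up exactly to numberOfRows.
    static long rowsForSubtask(long numberOfRows, int parallelism, int taskIdx) {
        long baseSize = numberOfRows / parallelism;
        return (numberOfRows % parallelism > taskIdx) ? baseSize + 1 : baseSize;
    }

    public static void main(String[] args) {
        long total = 0;
        for (int i = 0; i < 3; i++) {
            long share = rowsForSubtask(10, 3, i);
            System.out.println("subtask " + i + " -> " + share);
            total += share;
        }
        System.out.println("total = " + total); // total = 10
    }
}
```

With number-of-rows = 10 and parallelism 3, subtask 0 emits 4 rows and subtasks 1 and 2 emit 3 each.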
public class DataGeneratorSource<T> extends RichParallelSourceFunction<T>
        implements CheckpointedFunction {

    // Fields (generator, rowsPerSecond, numberOfRows, toOutput, outputSoFar, isRunning),
    // the constructor, the checkpoint hooks and cancel() are omitted from this excerpt.

    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        if (numberOfRows != null) {
            // Split the total row count across the parallel subtasks;
            // the first (numberOfRows % stepSize) subtasks take one extra row
            final int stepSize = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
            final int taskIdx = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            final int baseSize = (int) (numberOfRows / stepSize);
            toOutput = (numberOfRows % stepSize > taskIdx) ? baseSize + 1 : baseSize;
        }
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        // Each subtask produces its share of the global rate
        double taskRowsPerSecond =
                (double) rowsPerSecond
                        / getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
        long nextReadTime = System.currentTimeMillis();

        while (isRunning) {
            for (int i = 0; i < taskRowsPerSecond; i++) {
                if (isRunning
                        && generator.hasNext()
                        && (numberOfRows == null || outputSoFar < toOutput)) {
                    // Emit under the checkpoint lock so the counter and the output stay consistent
                    synchronized (ctx.getCheckpointLock()) {
                        outputSoFar++;
                        ctx.collect(this.generator.next());
                    }
                } else {
                    // Cancelled, generator exhausted, or this subtask's quota reached
                    return;
                }
            }

            // Wait until the next one-second slot begins
            nextReadTime += 1000;
            long toWaitMs = nextReadTime - System.currentTimeMillis();
            while (toWaitMs > 0) {
                Thread.sleep(toWaitMs);
                toWaitMs = nextReadTime - System.currentTimeMillis();
            }
        }
    }
}
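One detail of run is easy to miss: the inner loop compares an int counter against a double rate, so each subtask emits ceil(rowsPerSecond / parallelism) rows per one-second cycle rather than the exact fraction. The sketch below isolates that loop condition, without sleeping or emitting real records, to make the effect measurable; ThrottleSketch and its methods are illustrative names, and the aggregate figure assumes each cycle's emission finishes within one second.

```java
class ThrottleSketch {
    // Rows one subtask emits per one-second cycle, using the same loop condition as run():
    // an int counter compared against a double per-subtask rate.
    static int rowsPerCycle(long rowsPerSecond, int parallelism) {
        double taskRowsPerSecond = (double) rowsPerSecond / parallelism;
        int emitted = 0;
        for (int i = 0; i < taskRowsPerSecond; i++) {
            emitted++;
        }
        return emitted;
    }

    // Approximate aggregate rate across all subtasks (assumes every cycle completes within a second).
    static long aggregateRowsPerSecond(long rowsPerSecond, int parallelism) {
        return (long) rowsPerCycle(rowsPerSecond, parallelism) * parallelism;
    }

    public static void main(String[] args) {
        System.out.println(rowsPerCycle(10, 3));           // 4
        System.out.println(aggregateRowsPerSecond(10, 3)); // 12
        System.out.println(rowsPerCycle(10, 2));           // 5
    }
}
```

So rows-per-second = 10 with parallelism 3 yields roughly 12 rows/s in total; the configured rate is met exactly only when it is divisible by the parallelism, and every subtask emits at least one row per cycle.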
From: https://blog.csdn.net/qq_43462685/article/details/141460021