MongodbConf
package com.citi.conf;

import java.io.Serializable;

public class MongodbConf implements Serializable {
    private static final long serialVersionUID = 1L;

    private String database;
    private String collection;
    private String uri;
    private int maxConnectionIdleTime;

    public MongodbConf(String database, String collection, String uri, int maxConnectionIdleTime) {
        this.database = database;
        this.collection = collection;
        this.uri = uri;
        this.maxConnectionIdleTime = maxConnectionIdleTime;
    }

    public String getDatabase() {
        return this.database;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public String getCollection() {
        return this.collection;
    }

    public void setCollection(String collection) {
        this.collection = collection;
    }

    public String getUri() {
        return this.uri;
    }

    public void setUri(String uri) {
        this.uri = uri;
    }

    public int getMaxConnectionIdleTime() {
        return this.maxConnectionIdleTime;
    }

    public void setMaxConnectionIdleTime(int maxConnectionIdleTime) {
        this.maxConnectionIdleTime = maxConnectionIdleTime;
    }

    @Override
    public String toString() {
        return "MongodbConf{database='" + this.database + '\'' +
                ", collection='" + this.collection + '\'' +
                ", uri='" + this.uri + '\'' +
                ", maxConnectionIdleTime=" + this.maxConnectionIdleTime + '}';
    }
}
MongodbSinkConf
package com.citi.conf;

public class MongodbSinkConf extends MongodbConf {
    private final int batchSize;

    public MongodbSinkConf(String database, String collection, String uri, int maxConnectionIdleTime, int batchSize) {
        super(database, collection, uri, maxConnectionIdleTime);
        this.batchSize = batchSize;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    @Override
    public String toString() {
        return "MongodbSinkConf{" + super.toString() + ", batchSize=" + this.batchSize + '}';
    }
}
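A quick construction check. Everything below is illustrative: MongodbSinkConfDemo is a hypothetical driver class, and the URI, database, and collection values are placeholders rather than defaults required by the connector.

package com.citi.conf;

public class MongodbSinkConfDemo {
    public static void main(String[] args) {
        MongodbSinkConf conf = new MongodbSinkConf(
                "analytics",                 // database
                "events",                    // collection
                "mongodb://localhost:27017", // uri
                60000,                       // maxConnectionIdleTime (ms)
                256);                        // batchSize
        // Prints: MongodbSinkConf{MongodbConf{database='analytics', ...}, batchSize=256}
        System.out.println(conf);
    }
}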
MongodbBaseSinkFunction
package com.citi;

import com.citi.conf.MongodbSinkConf;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;

public abstract class MongodbBaseSinkFunction<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
    private final MongodbSinkConf mongodbSinkConf;
    private transient MongoClient client;
    private transient List<Document> batch;

    protected MongodbBaseSinkFunction(MongodbSinkConf mongodbSinkConf) {
        this.mongodbSinkConf = mongodbSinkConf;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        MongoClientSettings clientSettings = MongoClientSettings.builder()
                .applyConnectionString(new ConnectionString(this.mongodbSinkConf.getUri()))
                .build();
        // To enable TLS, apply SSL settings before build():
        // .applyToSslSettings(builder -> {
        //     builder.enabled(true);
        //     builder.invalidHostNameAllowed(true);
        // }).build();
        this.client = MongoClients.create(clientSettings);
        this.batch = new ArrayList<>();
    }

    @Override
    public void close() throws Exception {
        // Write out any buffered documents before releasing resources.
        this.flush();
        super.close();
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }

    @Override
    public void invoke(IN value, SinkFunction.Context context) throws Exception {
        this.batch.add(this.invokeDocument(value, context));
        if (this.batch.size() >= this.mongodbSinkConf.getBatchSize()) {
            this.flush();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
        // Flushing on every checkpoint gives at-least-once delivery.
        this.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) {
    }

    private void flush() {
        if (this.batch != null && !this.batch.isEmpty()) {
            MongoDatabase mongoDatabase = this.client.getDatabase(this.mongodbSinkConf.getDatabase());
            MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(this.mongodbSinkConf.getCollection());
            mongoCollection.insertMany(this.batch);
            this.batch.clear();
        }
    }

    abstract Document invokeDocument(IN value, SinkFunction.Context context) throws Exception;
}
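Besides the table-planner subclass shown later (MongodbUpsertSinkFunction), the base class can back a plain DataStream job. A minimal sketch, assuming each incoming record is a JSON string; MongodbJsonSinkFunction is a hypothetical name, placed in com.citi so it can override the package-private invokeDocument:

package com.citi;

import com.citi.conf.MongodbSinkConf;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.bson.Document;

public class MongodbJsonSinkFunction extends MongodbBaseSinkFunction<String> {

    public MongodbJsonSinkFunction(MongodbSinkConf mongodbSinkConf) {
        super(mongodbSinkConf);
    }

    @Override
    Document invokeDocument(String value, SinkFunction.Context context) {
        // Document.parse turns a JSON string into a BSON document.
        return Document.parse(value);
    }
}

It plugs into a job the usual way, e.g. stream.addSink(new MongodbJsonSinkFunction(conf)).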
MongodbDynamicTableSink
package com.citi;

import com.citi.conf.MongodbSinkConf;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;

public class MongodbDynamicTableSink implements DynamicTableSink {
    private final MongodbSinkConf mongodbSinkConf;
    private final TableSchema tableSchema;

    public MongodbDynamicTableSink(MongodbSinkConf mongodbSinkConf, TableSchema tableSchema) {
        this.mongodbSinkConf = mongodbSinkConf;
        this.tableSchema = tableSchema;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.insertOnly();
    }

    @Override
    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        DynamicTableSink.DataStructureConverter converter =
                context.createDataStructureConverter(this.tableSchema.toRowDataType());
        return SinkFunctionProvider.of(
                new MongodbUpsertSinkFunction(this.mongodbSinkConf, this.tableSchema.getFieldNames(), converter));
    }

    @Override
    public DynamicTableSink copy() {
        return new MongodbDynamicTableSink(this.mongodbSinkConf, this.tableSchema);
    }

    @Override
    public String asSummaryString() {
        return "MongoDB";
    }
}
MongodbDynamicTableSinkFactory
package com.citi;

import com.citi.conf.MongodbSinkConf;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MongodbDynamicTableSinkFactory implements DynamicTableSinkFactory {
    private static final Logger LOG = LoggerFactory.getLogger(MongodbDynamicTableSinkFactory.class);

    static final String IDENTIFIER = "mongodb";

    // Option keys are registered in lower case so that user-supplied keys can be
    // matched case-insensitively (see normalizeOptionCaseAsFactory below).
    static final ConfigOption<String> DATABASE = ConfigOptions.key("database".toLowerCase())
            .stringType().noDefaultValue().withDescription("The database to connect to.");
    static final ConfigOption<String> URI = ConfigOptions.key("uri".toLowerCase())
            .stringType().noDefaultValue().withDescription("The URI to connect to.");
    static final ConfigOption<String> COLLECTION_NAME = ConfigOptions.key("collection".toLowerCase())
            .stringType().noDefaultValue().withDescription("The name of the collection to write to.");
    static final ConfigOption<Integer> MAX_CONNECTION_IDLE_TIME = ConfigOptions.key("maxConnectionIdleTime".toLowerCase())
            .intType().defaultValue(60000).withDescription("The maximum idle time for a pooled connection.");
    static final ConfigOption<Integer> BATCH_SIZE = ConfigOptions.key("batchSize".toLowerCase())
            .intType().defaultValue(1024).withDescription("The batch size used when the sink flushes.");

    public MongodbDynamicTableSinkFactory() {}

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        DynamicTableFactory.Context normalizedContext = normalizeContext(this, context);
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, normalizedContext);
        helper.validate();
        MongodbSinkConf mongodbSinkConf = new MongodbSinkConf(
                helper.getOptions().get(DATABASE),
                helper.getOptions().get(COLLECTION_NAME),
                helper.getOptions().get(URI),
                helper.getOptions().get(MAX_CONNECTION_IDLE_TIME),
                helper.getOptions().get(BATCH_SIZE));
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(normalizedContext.getCatalogTable().getSchema());
        LOG.info("Create dynamic MongoDB table sink: {}.", mongodbSinkConf);
        return new MongodbDynamicTableSink(mongodbSinkConf, physicalSchema);
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(DATABASE);
        requiredOptions.add(COLLECTION_NAME);
        requiredOptions.add(URI);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionals = new HashSet<>();
        optionals.add(MAX_CONNECTION_IDLE_TIME);
        optionals.add(BATCH_SIZE);
        return optionals;
    }

    // Rebuilds the factory context with option keys lower-cased, so that DDL
    // options such as 'batchSize' match the lower-case keys registered above.
    public static DynamicTableFactory.Context normalizeContext(DynamicTableFactory factory, DynamicTableFactory.Context context) {
        Map<String, String> catalogOptions = context.getCatalogTable().getOptions();
        Map<String, String> convertedOptions = normalizeOptionCaseAsFactory(factory, catalogOptions);
        return new FactoryUtil.DefaultDynamicTableContext(
                context.getObjectIdentifier(),
                context.getCatalogTable().copy(convertedOptions),
                context.getEnrichmentOptions(),
                context.getConfiguration(),
                context.getClassLoader(),
                context.isTemporary());
    }

    public static Map<String, String> normalizeOptionCaseAsFactory(Factory factory, Map<String, String> options) {
        Map<String, String> normalizedOptions = new HashMap<>();
        Map<String, String> requiredOptionKeysLowerCaseToOriginal = factory.requiredOptions().stream()
                .collect(Collectors.toMap(option -> option.key().toLowerCase(), ConfigOption::key));
        Map<String, String> optionalOptionKeysLowerCaseToOriginal = factory.optionalOptions().stream()
                .collect(Collectors.toMap(option -> option.key().toLowerCase(), ConfigOption::key));
        for (Map.Entry<String, String> entry : options.entrySet()) {
            String catalogOptionKey = entry.getKey();
            String catalogOptionValue = entry.getValue();
            String lowerCaseKey = catalogOptionKey.toLowerCase();
            String normalizedKey = requiredOptionKeysLowerCaseToOriginal.containsKey(lowerCaseKey)
                    ? requiredOptionKeysLowerCaseToOriginal.get(lowerCaseKey)
                    : optionalOptionKeysLowerCaseToOriginal.getOrDefault(lowerCaseKey, catalogOptionKey);
            normalizedOptions.put(normalizedKey, catalogOptionValue);
        }
        return normalizedOptions;
    }
}
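Note on discovery: Flink locates DynamicTableSinkFactory implementations through Java's ServiceLoader, so the connector jar must ship a service file listing the factory's fully qualified class name (path shown for a standard Maven layout):

src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
    com.citi.MongodbDynamicTableSinkFactory

Without this entry, a CREATE TABLE with 'connector' = 'mongodb' fails at validation time because no factory matches the identifier.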
MongodbUpsertSinkFunction
package com.citi;

import com.citi.conf.MongodbSinkConf;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.bson.Document;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class MongodbUpsertSinkFunction extends MongodbBaseSinkFunction<RowData> {
    private final DynamicTableSink.DataStructureConverter converter;
    private final String[] fieldNames;

    public MongodbUpsertSinkFunction(MongodbSinkConf mongodbSinkConf, String[] fieldNames, DynamicTableSink.DataStructureConverter converter) {
        super(mongodbSinkConf);
        this.fieldNames = fieldNames;
        this.converter = converter;
    }

    @Override
    Document invokeDocument(RowData value, SinkFunction.Context context) {
        Row row = (Row) this.converter.toExternal(value);
        Map<String, Object> map = new HashMap<>();
        for (int i = 0; i < this.fieldNames.length; ++i) {
            Object fieldValue = row.getField(i);
            // Guard against null fields; BSON cannot embed raw Java arrays, so convert them to lists.
            if (fieldValue != null && fieldValue.getClass().isArray()) {
                fieldValue = Arrays.asList((Object[]) fieldValue);
            }
            map.put(this.fieldNames[i], fieldValue);
        }
        return new Document(map);
    }
}
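Putting it together, a minimal end-to-end sketch: MongodbSinkExample is a hypothetical driver class, and the URI, database, and collection values are placeholders. Option keys in the DDL may be written in any case ('batchSize' here) because normalizeOptionCaseAsFactory lower-cases them before validation. Also note that despite the "Upsert" naming, the sink's changelog mode is insert-only, so it appends documents via insertMany rather than performing true upserts.

package com.citi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MongodbSinkExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The identifier "mongodb" selects MongodbDynamicTableSinkFactory,
        // provided the SPI service file above is on the classpath.
        tEnv.executeSql(
                "CREATE TABLE mongo_sink (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'mongodb'," +
                "  'database' = 'analytics'," +
                "  'collection' = 'events'," +
                "  'uri' = 'mongodb://localhost:27017'," +
                "  'batchSize' = '256'" +
                ")");

        // Submit a small insert job and wait for it to finish.
        tEnv.executeSql("INSERT INTO mongo_sink VALUES (1, 'alice'), (2, 'bob')").await();
    }
}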
From: https://www.cnblogs.com/INnoVationv2/p/17004850.html