
Mongo

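A hand-rolled MongoDB sink connector for the Flink Table API: two configuration classes, a batching RichSinkFunction, a DynamicTableSink, and the DynamicTableSinkFactory that ties them together.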

MongodbConf
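A serializable holder for the basic connection settings: database name, collection name, connection URI, and the maximum idle time for a pooled connection.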

package com.citi.conf;

import java.io.Serializable;

public class MongodbConf implements Serializable {
    private static final long serialVersionUID = 1L;
    private String database;
    private String collection;
    private String uri;
    private int maxConnectionIdleTime;

    public MongodbConf(String database, String collection, String uri, int maxConnectionIdleTime) {
        this.database = database;
        this.collection = collection;
        this.uri = uri;
        this.maxConnectionIdleTime = maxConnectionIdleTime;
    }

    public String getDatabase() {
        return this.database;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public String getCollection() {
        return this.collection;
    }

    public void setCollection(String collection) {
        this.collection = collection;
    }

    public String getUri() {
        return this.uri;
    }

    public void setUri(String uri) {
        this.uri = uri;
    }

    public int getMaxConnectionIdleTime() {
        return this.maxConnectionIdleTime;
    }

    public void setMaxConnectionIdleTime(int maxConnectionIdleTime) {
        this.maxConnectionIdleTime = maxConnectionIdleTime;
    }

    @Override
    public String toString() {
        return "MongodbConf{database='" + this.database + '\''
                + ", collection='" + this.collection + '\''
                + ", uri='" + this.uri + '\''
                + ", maxConnectionIdleTime=" + this.maxConnectionIdleTime + '}';
    }
}

MongodbSinkConf
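Extends MongodbConf with the sink-specific batchSize: the number of buffered documents that triggers a bulk insert.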

package com.citi.conf;

public class MongodbSinkConf extends MongodbConf {
    private final int batchSize;

    public MongodbSinkConf(String database, String collection, String uri, int maxConnectionIdleTime, int batchSize) {
        super(database, collection, uri, maxConnectionIdleTime);
        this.batchSize = batchSize;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    @Override
    public String toString() {
        return "MongodbSinkConf{" + super.toString() + ", batchSize=" + this.batchSize + '}';
    }
}

MongodbBaseSinkFunction
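The abstract base sink buffers one Document per incoming record and writes the buffer with insertMany once batchSize is reached. Implementing CheckpointedFunction means the buffer is also flushed on every checkpoint, which gives at-least-once delivery: after a failure, records since the last checkpoint are replayed and may be inserted twice. Note that maxConnectionIdleTime from the conf is carried along but never actually applied to the client settings in open().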

package com.citi;

import com.citi.conf.MongodbSinkConf;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;

public abstract class MongodbBaseSinkFunction<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
    private final MongodbSinkConf mongodbSinkConf;
    private transient MongoClient client;
    private transient List<Document> batch;

    protected MongodbBaseSinkFunction(MongodbSinkConf mongodbSinkConf) {
        this.mongodbSinkConf = mongodbSinkConf;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        MongoClientSettings clientSettings = MongoClientSettings.builder()
                .applyConnectionString(new ConnectionString(this.mongodbSinkConf.getUri()))
                .build();
        // For a TLS-only cluster, configure SSL on the builder instead of the plain build() above:
        //        .applyToSslSettings(builder -> {
        //            builder.enabled(true);
        //            builder.invalidHostNameAllowed(true);
        //        }).build();
        this.client = MongoClients.create(clientSettings);
        this.batch = new ArrayList<>();
    }

    @Override
    public void close() throws Exception {
        // Guard against a failed open(): buffer and client may never have been created.
        if (this.batch != null) {
            this.flush();
        }
        super.close();
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }

    @Override
    public void invoke(IN value, SinkFunction.Context context) throws Exception {
        this.batch.add(this.invokeDocument(value, context));
        if (this.batch.size() >= this.mongodbSinkConf.getBatchSize()) {
            this.flush();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
        // Flush on every checkpoint so all records consumed so far are persisted
        // before the checkpoint completes (at-least-once delivery).
        this.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) {
        // Nothing to restore: the buffer is flushed before every snapshot.
    }

    private void flush() {
        if (!this.batch.isEmpty()) {
            MongoDatabase mongoDatabase = this.client.getDatabase(this.mongodbSinkConf.getDatabase());
            MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(this.mongodbSinkConf.getCollection());
            mongoCollection.insertMany(this.batch);
            this.batch.clear();
        }
    }

    abstract Document invokeDocument(IN var1, SinkFunction.Context var2) throws Exception;
}

MongodbDynamicTableSink
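The DynamicTableSink implementation: it accepts an insert-only changelog and hands the planner a MongodbUpsertSinkFunction built from the sink configuration, the physical field names, and a converter from internal RowData to external Row.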

package com.citi;

import com.citi.conf.MongodbSinkConf;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;

public class MongodbDynamicTableSink implements DynamicTableSink {
    private final MongodbSinkConf mongodbSinkConf;
    private final TableSchema tableSchema;

    public MongodbDynamicTableSink(MongodbSinkConf mongodbSinkConf, TableSchema tableSchema) {
        this.mongodbSinkConf = mongodbSinkConf;
        this.tableSchema = tableSchema;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        // Only INSERT rows are accepted; updates and deletes are not supported.
        return ChangelogMode.insertOnly();
    }

    @Override
    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        DynamicTableSink.DataStructureConverter converter =
                context.createDataStructureConverter(this.tableSchema.toRowDataType());
        return SinkFunctionProvider.of(
                new MongodbUpsertSinkFunction(this.mongodbSinkConf, this.tableSchema.getFieldNames(), converter));
    }

    @Override
    public DynamicTableSink copy() {
        return new MongodbDynamicTableSink(this.mongodbSinkConf, this.tableSchema);
    }

    @Override
    public String asSummaryString() {
        return "MongoDB";
    }
}

MongodbDynamicTableSinkFactory
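The factory registers the connector under the identifier mongodb and declares its options. All option keys are declared in lower case, and normalizeContext/normalizeOptionCaseAsFactory below map user-supplied keys onto them case-insensitively, so both 'batchSize' and 'batchsize' are accepted in the DDL.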

package com.citi;

import com.citi.conf.MongodbSinkConf;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.stream.Collectors;

public class MongodbDynamicTableSinkFactory implements DynamicTableSinkFactory {
    private static final Logger LOG = LoggerFactory.getLogger(MongodbDynamicTableSinkFactory.class);

    static final String IDENTIFIER = "mongodb";
    // All option keys are declared in lower case; normalizeOptionCaseAsFactory below
    // matches user-supplied keys against them case-insensitively.
    static final ConfigOption<String> DATABASE = ConfigOptions.key("database".toLowerCase())
            .stringType().noDefaultValue().withDescription("The database to connect to.");
    static final ConfigOption<String> URI = ConfigOptions.key("uri".toLowerCase())
            .stringType().noDefaultValue().withDescription("The URI to connect to.");
    static final ConfigOption<String> COLLECTION_NAME = ConfigOptions.key("collection".toLowerCase())
            .stringType().noDefaultValue().withDescription("The name of the collection to write to.");
    static final ConfigOption<Integer> MAX_CONNECTION_IDLE_TIME = ConfigOptions.key("maxConnectionIdleTime".toLowerCase())
            .intType().defaultValue(60000).withDescription("The maximum idle time for a pooled connection.");
    static final ConfigOption<Integer> BATCH_SIZE = ConfigOptions.key("batchSize".toLowerCase())
            .intType().defaultValue(1024).withDescription("The number of buffered documents that triggers a bulk insert.");

    public MongodbDynamicTableSinkFactory() {}

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        DynamicTableFactory.Context normalizedContext = normalizeContext(this, context);
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, normalizedContext);
        helper.validate();
        MongodbSinkConf mongodbSinkConf = new MongodbSinkConf(
                helper.getOptions().get(DATABASE),
                helper.getOptions().get(COLLECTION_NAME),
                helper.getOptions().get(URI),
                helper.getOptions().get(MAX_CONNECTION_IDLE_TIME),
                helper.getOptions().get(BATCH_SIZE));
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(normalizedContext.getCatalogTable().getSchema());
        LOG.info("Create dynamic mongoDB table sink: {}.", mongodbSinkConf);
        return new MongodbDynamicTableSink(mongodbSinkConf, physicalSchema);
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(DATABASE);
        requiredOptions.add(COLLECTION_NAME);
        requiredOptions.add(URI);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionals = new HashSet<>();
        optionals.add(MAX_CONNECTION_IDLE_TIME);
        optionals.add(BATCH_SIZE);
        return optionals;
    }

    public static DynamicTableFactory.Context normalizeContext(DynamicTableFactory factory, DynamicTableFactory.Context context) {
        Map<String, String> catalogOptions = context.getCatalogTable().getOptions();
        Map<String, String> convertedOptions = normalizeOptionCaseAsFactory(factory, catalogOptions);
        return new FactoryUtil.DefaultDynamicTableContext(
                context.getObjectIdentifier(),
                context.getCatalogTable().copy(convertedOptions),
                context.getEnrichmentOptions(),
                context.getConfiguration(),
                context.getClassLoader(),
                context.isTemporary());
    }

    public static Map<String, String> normalizeOptionCaseAsFactory(Factory factory, Map<String, String> options) {
        Map<String, String> normalizedOptions = new HashMap<>();
        Map<String, String> requiredKeys = factory.requiredOptions().stream()
                .collect(Collectors.toMap(option -> option.key().toLowerCase(), ConfigOption::key));
        Map<String, String> optionalKeys = factory.optionalOptions().stream()
                .collect(Collectors.toMap(option -> option.key().toLowerCase(), ConfigOption::key));

        for (Map.Entry<String, String> entry : options.entrySet()) {
            String lowerKey = entry.getKey().toLowerCase();
            // Prefer the factory's declared spelling; fall back to the key as given.
            String normalizedKey = requiredKeys.containsKey(lowerKey)
                    ? requiredKeys.get(lowerKey)
                    : optionalKeys.getOrDefault(lowerKey, entry.getKey());
            normalizedOptions.put(normalizedKey, entry.getValue());
        }

        return normalizedOptions;
    }
}
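For the planner to discover the factory at runtime, it must be registered through Java's ServiceLoader mechanism: add the factory's fully qualified class name to a provider-configuration file on the classpath.

# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
com.citi.MongodbDynamicTableSinkFactory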

MongodbUpsertSinkFunction
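Converts each RowData to an external Row, copies the fields into a map keyed by field name (turning Java arrays into lists so the default BSON codecs can encode them), and wraps the map in a Document. Despite its name, the function only inserts; no upsert or replace is performed.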

package com.citi;

import com.citi.conf.MongodbSinkConf;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.bson.Document;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class MongodbUpsertSinkFunction extends MongodbBaseSinkFunction<RowData> {
    private final DynamicTableSink.DataStructureConverter converter;
    private final String[] fieldNames;

    public MongodbUpsertSinkFunction(MongodbSinkConf mongodbSinkConf, String[] fieldNames, DynamicTableSink.DataStructureConverter converter) {
        super(mongodbSinkConf);
        this.fieldNames = fieldNames;
        this.converter = converter;
    }

    @Override
    Document invokeDocument(RowData value, SinkFunction.Context context) {
        Row row = (Row) this.converter.toExternal(value);
        Map<String, Object> map = new HashMap<>();

        for (int i = 0; i < this.fieldNames.length; ++i) {
            Object fieldValue = row.getField(i);
            // The default BSON codecs encode Lists but not Java arrays, so convert.
            // Guard against null: getField returns null for SQL NULL values.
            if (fieldValue != null && fieldValue.getClass().isArray()) {
                fieldValue = Arrays.asList((Object[]) fieldValue);
            }
            map.put(this.fieldNames[i], fieldValue);
        }

        return new Document(map);
    }
}
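With the factory registered and the classes on the job classpath, the sink can be used straight from Flink SQL. A minimal sketch (table, source, and field names are illustrative):

CREATE TABLE mongo_sink (
    id   STRING,
    name STRING,
    tags ARRAY<STRING>
) WITH (
    'connector'  = 'mongodb',
    'uri'        = 'mongodb://localhost:27017',
    'database'   = 'test',
    'collection' = 'users',
    'batchsize'  = '500'
);

INSERT INTO mongo_sink SELECT id, name, tags FROM some_source;

'batchsize' and 'maxconnectionidletime' are optional, defaulting to 1024 and 60000; thanks to the case normalization above, camel-cased keys such as 'batchSize' work as well.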

From: https://www.cnblogs.com/INnoVationv2/p/17004850.html
