首页 > 数据库 > SqlserverCDCSourceSink

SqlserverCDCSourceSink

时间:2024-06-20 23:53:37 浏览次数:31
标签:flink sqlserver SqlserverCDCSourceSink org apache import com

此处的 sink 直接沿用官方 JdbcSink 示例,只实现了插入(insert);更新和删除事件的处理逻辑还需要另行判断并实现。

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder.SqlServerIncrementalSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Demo job: captures change events from a SQL Server table with the Flink CDC
 * source and writes them to another SQL Server table through {@link JdbcSink}.
 *
 * <p>The sink statement is a plain {@code INSERT}; every record produced by the
 * {@code StrToC2} mapper is inserted as a new row. Update and delete events get
 * no special handling here.
 * NOTE(review): {@code C2} and {@code StrToC2} are declared outside this file
 * section — confirm how they treat update/delete change events.
 */
public class SqlserverCDCSourceSink {

    /**
     * Entry point; runs the single-threaded CDC-to-JDBC demo job.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SingleTest();
    }

    /**
     * Builds and executes the pipeline with parallelism 1:
     * SQL Server CDC source (JSON strings) -> {@code StrToC2} map -> JDBC insert
     * into {@code c2_sink}.
     *
     * @throws IllegalStateException if job execution fails; the original
     *         exception is preserved as the cause so the failure is not
     *         silently reduced to a stack trace on stderr
     */
    public static void SingleTest() {
        SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
                .hostname("localhost")
                .port(1433)
                .database("flinkcdc") // database to monitor
                .tableList("dbo.c2") // table to monitor
                .username("sa")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // use parallelism 1 so the sink keeps message ordering
        env.setParallelism(1);

        DataStreamSource<String> ds = env.addSource(sourceFunction);
        SingleOutputStreamOperator<C2> map = ds.map(new StrToC2());

        map.addSink(JdbcSink.sink(
                "insert into c2_sink(id2,name2) values (?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id2);
                    ps.setString(2, t.name2);
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1) // batch size, in number of records
                        .withBatchIntervalMs(1000) // maximum time to wait before flushing a batch
                        .withMaxRetries(1) // number of retries
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                        .withUrl("jdbc:sqlserver://localhost:1433;databaseName=sink_dw")
                        .withUsername("sa")
                        .withPassword("123456")
                        .build()));

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of printing and returning normally, so a failed
            // job terminates the program with an error rather than a clean exit.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }

}

标签:flink,sqlserver,SqlserverCDCSourceSink,org,apache,import,com
From: https://www.cnblogs.com/huft/p/18259704

相关文章