首页 > 数据库 >SqlserverCDCcrudSourceSink mssql数据实时同步demo

SqlserverCDCcrudSourceSink mssql数据实时同步demo

时间:2024-06-22 16:55:17浏览次数:9  
标签:c2r demo id2 SqlserverCDCcrudSourceSink sink import apache org mssql

package org.hu.fk.datastream_connector;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// 捕获变更数据,同步目标库
//根据crud类型,分类同步
public class SqlserverCDCcrudSourceSink {
public static void main(String[] args) {
SingleTest();
}

//Single Thread Reading
public static void SingleTest(){
    SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
            .hostname("localhost")
            .port(1433)
            .database("flinkcdc") // monitor sqlserver database
            .tableList("dbo.c2") // monitor products table
            .username("sa")
            .password("123456")
            .startupOptions(StartupOptions.latest())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);
    DataStreamSource<String> ds = env.addSource(sourceFunction);// use parallelism 1 for sink to keep message ordering
    SingleOutputStreamOperator<C2r> map = ds.map(new MyMap());

    OutputTag<C2r> opc=new OutputTag<>("opc", Types.POJO(C2r.class));
    OutputTag<C2r> opu=new OutputTag<>("opu", Types.POJO(C2r.class));
    OutputTag<C2r> opd=new OutputTag<>("opd", Types.POJO(C2r.class));

    SingleOutputStreamOperator<C2r> process = map.process(new ProcessFunction<C2r, C2r>() {

        @Override
        public void processElement(C2r c2r
                , ProcessFunction<C2r, C2r>.Context context
                , Collector<C2r> collector) throws Exception {

            String op = c2r.getOp();

            System.out.println("processElement "+c2r);
            if(c2r.getId2()>=1) {
                if ("c".equals(op)) {
                    context.output(opc, c2r);
                } else if ("u".equals(op)) {
                    context.output(opu, c2r);
                } else if ("d".equals(op)) {
                    context.output(opd, c2r);
                }
            } else {
                collector.collect(c2r);
            }
        }
    });

    SideOutputDataStream<C2r> opcc = process.getSideOutput(opc);
    SideOutputDataStream<C2r> opcu = process.getSideOutput(opu);
    SideOutputDataStream<C2r> opcd = process.getSideOutput(opd);

    opcc.addSink(JdbcSink.sink(
            "insert into c2_sink(id2,name2) values (?,?)",
            //"update c2_sink set name2=? where id2=?",
            // "delete from c2_sink where id2=?",
            (ps, t) -> {
                ps.setInt(1, t.id2);
                ps.setString(2, t.name2);
                //ps.setInt(1, t.id2);
            },
            JdbcExecutionOptions.builder()
                    .withBatchSize(1) // 批次大小,条数
                    .withBatchIntervalMs(1000) // 批次最大等待时间
                    .withMaxRetries(1) // 重复次数
                    .build()
            ,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                    .withUrl("jdbc:sqlserver://localhost:1433;databaseName=sink_dw")
                    .withUsername("sa")
                    .withPassword("123456")
                    .build()
                   ));

    opcu.addSink(JdbcSink.sink(
            //"insert into c2_sink(id2,name2) values (?,?)",
            "update c2_sink set name2=? where id2=?",
            // "delete from c2_sink where id2=?",
            (ps, t) -> {
                ps.setInt(2, t.id2);
                ps.setString(1, t.name2);
               // ps.setInt(1, t.id2);
            },
            JdbcExecutionOptions.builder()
                    .withBatchSize(1) // 批次大小,条数
                    .withBatchIntervalMs(1000) // 批次最大等待时间
                    .withMaxRetries(1) // 重复次数
                    .build()
            ,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                    .withUrl("jdbc:sqlserver://localhost:1433;databaseName=sink_dw")
                    .withUsername("sa")
                    .withPassword("123456")
                    .build()
    ));

    opcd.addSink(JdbcSink.sink(
            //"insert into c2_sink(id2,name2) values (?,?)",
           // "update c2_sink set name2=? where id2=?",
             "delete from c2_sink where id2=?",
            (ps, t) -> {
                //ps.setInt(2, t.id2);
                //ps.setString(1, t.name2);
                 ps.setInt(1, t.id2);
            },
            JdbcExecutionOptions.builder()
                    .withBatchSize(1) // 批次大小,条数
                    .withBatchIntervalMs(1000) // 批次最大等待时间
                    .withMaxRetries(1) // 重复次数
                    .build()
            ,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                    .withUrl("jdbc:sqlserver://localhost:1433;databaseName=sink_dw")
                    .withUsername("sa")
                    .withPassword("123456")
                    .build()
    ));


    try{
        env.execute();
    }catch (Exception e){
        e.printStackTrace();
    }
}

static class MyMap implements MapFunction<String, C2r> {
    @Override
    public C2r map(String s) throws Exception {
        JSONObject source = JSONObject.parseObject(s);
        String op = source.getString("op");
        C2r c2r = new C2r(0, "", op);

        if ("d".equals(op)) {
            parseJsonField(source, "before", c2r);
        }

        if ("c".equals(op) || "u".equals(op)) {
            parseJsonField(source, "after", c2r);
        }

        c2r.setOp(op);
        return c2r;
    }

    private void parseJsonField(JSONObject source, String field, C2r c2r) {
        JSONObject jsonField = JSON.parseObject(source.getString(field));
        c2r.setId2(jsonField.getInteger("id2")==null?0:jsonField.getInteger("id2"));
        c2r.setName2(jsonField.getString("name2")==null?"":jsonField.getString("name2"));
    }
}

}

标签:c2r,demo,id2,SqlserverCDCcrudSourceSink,sink,import,apache,org,mssql
From: https://www.cnblogs.com/huft/p/18262486

相关文章

  • 米尔全志T527系列加推工控板和工控机,更多工业场景DEMO
    自米尔首发基于全志T527系列核心板以来,这款基于八核CPU的高性能国产核心板得到广大客户的好评。这款产品支持Android13、Linux5.15操作系统,还将适配Ubuntu系统,满足开发者们更灵活地开发各种创新应用。米尔为满足不同的客户需求,推出基于全志T527的全系列的产品:米粉派T527、MYD-LT5......
  • WPF绘制3D小demo
    试过了WPF原生3D和HelixToolkit.Wpf,感觉还是SharpDX的效果比较好,所以使用了HelixToolkit.Wpf.SharpDX初学,仅供参考,没搞懂怎么双面渲染,所以每个面用了2个三角形分别显示正面和反面 <Grid><hx:Viewport3DXEnableSwapChainRendering="True"FXAALevel="Low"Backg......
  • DEMO_02:随机数获取;数组集合遍历;整型与字符串转换;字符串字符遍历;数组/集合排序
    /***考核点:随机数获取;数组集合遍历;整型与字符串转换;字符串字符遍历;数组/集合排序*<p>*题目:*1.使用while循环获取20个五位数随机数并打印;*2.遍历20个数,筛选出随机数中3的倍数,并统计个数;*3.符合2的数中,找出五位数中3的倍数和位置*4.符合2的数中,把这五位数......
  • H5移动端加载预览pdf文件——demo
    前言:正常情况下需要在HTML中嵌入本地docx或pdf文件时,我们会有以下解决办法:1.使用<iframe>标签2.使用<embed>标签3.使用<object>标签4.使用第三方库(如PDF.js)当实际操作时,会发现前三种方式在pc端支持,但在移动端不支持,因为这些标签在移动端浏览器中的支持并不统一。为了兼容移......
  • DEMO_01:List数据存储,回调函数,集合转字符串,元素去重
    *题目:*1.构建属性结构List<DemoNode>data,根据本包的data.png中数据结构图将数据存入data中(字就是nodeName)*2.将树形结构List<DemoNode>里面的元素全部遍历出来存放到List<String>list中,输出结果转换成字符串:粉粉碎机被粉碎机粉碎了怎么办*3.将list里元素去重后......
  • 实现一个实时数据平台的小型demo
    近期自己梳理了一下自己所属业务线上的数据中台技术栈,以常见的实时链路为例,从最初的埋点到数据服务层查询到结果,依次经过:1、埋点上报2、写入消息队列3、flink读取队列4、flink写入clickhouse或hbase5、spring项目提供查询和接口返回搭建个简易版的实时数据平台流程跑......
  • Net6 EFCore 基于MSSQL & T4 自动生成字段注释
    文件模板代码<#@templatelanguage="C#"#><#@outputextension=".cs"#><#@assemblyname="System.Core"#><#@importnamespace="System.IO"#><#@importnamespace="System.Linq"#>......
  • Demo | 利用机器学习构建作物模型的Python源码
    作物模型提出很早,但应用有限。看起来复杂,其实解决的是环境与表型间的关联,可参考前期推文:作物生长模型CropGrow。环境组的复杂,关键在于数据的准确性获取。对于数据分析人员来说,如果不care数据准确性,分析其实很简单的,就是经典的机器学习流程。这里提供一段伪代码仅供参考。1.导库......
  • clion+msvc+qml demo
    CMake设置-DCMAKE_PREFIX_PATH=C:\Qt\6.6.2\msvc2019_64demo工程结构:├───CMakeLists.txt└───main.cpp└───Main.qml└───MyObject.cpp└───MyObject.h└───MyRectangle.qmlCMakeLists.txtcmake_minimum_required(VERSION3.16)project(qmltest02......
  • youcomplete的vimrc配置文件demo
    离开youcompleteme,vim的使用体验差很多。下面是vimrc文件中ycm相关配置demo。对于C/C++语言的语法补齐需要安装clangd.setnusetexpandtabsettabstop=4setshiftwidth=4setmouse=asetshell=bashsethlssetencoding=utf-8"YoucomPleteMe:语句补全插件",reftoh......