首页 > 数据库 >SqlserverCDCSource DataStream方式

SqlserverCDCSource DataStream方式

时间:2024-06-19 23:54:37浏览次数:27  
标签:DataStream 方式 cdc StreamExecutionEnvironment SqlserverCDCSource ververica env im

  1. org.apache.flink 没有jar包,要换为 com.ververica.cdc
    2.com.ververica.cdc 最新的也只有 3.0.1,3.1.1的没有 主要 mvnrepository 仓库没找到
    如下是单并行度和多并行度的demo
    ==============================================================================================

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder.SqlServerIncrementalSource;

public class SqlserverCDCSource {
public static void main(String[] args) {
singleTest();
//mutiTest();
}

public static void mutiTest(){
    SqlServerIncrementalSource<String> sqlServerSource =
            new SqlServerSourceBuilder()
                    .hostname("localhost")
                    .port(1433)
                    .databaseList("flinkcdc")
                    .tableList("dbo.c2")
                    .username("qsa")
                    .password("qxx")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .startupOptions(StartupOptions.initial())
                    .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    // set the source parallelism to 2
    env.fromSource(
                    sqlServerSource,
                    WatermarkStrategy.noWatermarks(),
                    "SqlServerIncrementalSource")
            .setParallelism(2)
            .print()
            .setParallelism(1);

try {
env.execute("Print SqlServer Snapshot + Change Stream");
}
catch (Exception e) {
e.printStackTrace();
}
}

//Single Thread Reading
public static void singleTest(){
    SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
            .hostname("localhost")
            .port(1433)
            .database("flinkcdc") // monitor sqlserver database
            .tableList("dbo.c2") // monitor products table
            .username("daa")
            .password("1qxxsa")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(sourceFunction)
            .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    try{
        env.execute();
    }catch (Exception e){
        e.printStackTrace();
    }
}

}

标签:DataStream,方式,cdc,StreamExecutionEnvironment,SqlserverCDCSource,ververica,env,im
From: https://www.cnblogs.com/huft/p/18257813

相关文章

  • 3种方式自动化控制APP
    自动化控制APP不管是在工作还是生活方面,都可以帮助我们高效地完成任务,节省时间和精力。本文主要介绍自动化控制APP的3种常用方式。1、Python+adb这种方式需要对Android有一些基本的了解。adb是一种用于调试Android应用程序的工具。使用Python和adb可以轻松实现自动化控......
  • ChatGPT Plus GPT-4o Claude 3 Opus合租拼车全新方式
    无需自己搭建,登录即可用,国内直连访问,聚合多家最强大模型,随意选择使用。立即体验datapipe.top支持OpenAI最新GPT-4o,获得快速高质量的对话,保证可用配额。支持多种大模型,GPT-4o,Claude-3,Llama3等最强模型。AskInternet联网搜索用于替代传统搜索,更高效。支持GPT-3.5/K......
  • WPF/C#:显示分组数据的两种方式
    前言本文介绍自己在遇到WPF对数据进行分组显示的需求时,可以选择的两种方案。一种方案基于ICollectionView,另一种方案基于IGrouping。基于ICollectionView实现相关cs代码:[ObservableProperty]privateObservableCollection<People>people;publicGroupDemoViewModel(){......
  • 程序猿大战Python——文件操作、异常、模块——常见处理异常方式
    快速入门异常==目标:==掌握异常的快速入门使用。当程序中遇到了异常时,通常程序会出现崩溃情况。为了不让程序崩溃,就可以使用异常来快速处理。异常处理语法:try: 可能发生异常的代码except: 如果出现异常时,执行的代码说明:try、except都是关键字,用于处理异......
  • IPv6转换技术是什么?浅谈IPv6转换的两种技术方式
    与双栈技术和隧道技术相比,IPv6转换技术具备改造周期短、成本低、部署灵活等优势,是目前各大政企网站进行IPv6升级改造的主要方式。采用协议转换实现IPv4到IPv6过渡的优点是不需要进行IPv4、IPv6节点的升级改造,缺点是用来实现IPv4节点和IPv6节点相互访问的方法比较复杂,网络设备......
  • 255页10万字大数据中心架构、存储、基础设施建设和运维方案WORD(文末附123相关资料下载
    原文《大数据中心架构、大数据存储、数据中心基础设施建设和运维方案》更多参考资料及相关文档下载见文末​大数据中心架构是一个集数据存储、处理、分析和管理于一体的综合性平台。其设计旨在实现高效的数据吞吐、稳定的运行性能和灵活的资源扩展。整个架构采用分层设计,......
  • 离散制造业数字化智能工厂及MES一站式生产运营管理平台建设方案(文末附120份相关资料下
    原文《离散制造业数字化智能工厂及MES一站式生产运营管理平台建设方案》PPT格式,主要从生产方式智能化、产品与服务智能化、生产装备智能化、供应链仓储智能化、智能工厂逻辑架构、智能工厂总体架构、智能工厂整体应用方案、智能工厂基础建设、智能工厂生产控制、智能工厂生产......
  • 哈希查找(按个位取余的方式)
    步骤:1.构建哈希表进行分类2.进行哈希查找(算法)处理冲突:按个位取余发现有两个或多个数重复,如:会发现15,25,55,重叠了,对此进行冲突处理,冲突处理有三种:1.地址偏移法以上图为例,将和25按个位取余后重复的数填到25后面的位置,如果相邻的位置满了,在向后填。2.再哈希法 再次使......
  • NetMvc通过亚马逊方式服务器端和客户端上传MinIO顺利解决
    前言:1、由于项目是.NETFramework4.7MVCLayUI,所以需要找一个资源站点存放项目中静态资源文件;2、需要支持服务端和客户端都支持上传文件方式;3、调用简单,涉及库越少越好。结果:调用AWSSDK.S3和AWSSDK.Core实现文件上传到MinIO;调用MimeMapping获取文件ContentType......
  • 现代分布式数据库 数据分布方式 Round-Robin、Range、List 和 Hash
    现代分布式数据库中,常见的数据分布方式有如下几种:Round-Robin、Range、List和Hash。如下图所示: 数据分布|StarRockshttps://docs.starrocks.io/zh/docs/table_design/Data_distribution/StarRocks的数据分布方式​StarRocks支持单独和组合使用数据分布方式。说明除......