- org.apache.flink 没有jar包,要换为 com.ververica.cdc
2.com.ververica.cdc 最新的也只有 3.0.1,3.1.1的没有 主要 mvnrepository 仓库没找到
如下是单并行度和多并行度的demo
==============================================================================================
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder.SqlServerIncrementalSource;
public class SqlserverCDCSource {
public static void main(String[] args) {
singleTest();
//mutiTest();
}
public static void mutiTest(){
SqlServerIncrementalSource<String> sqlServerSource =
new SqlServerSourceBuilder()
.hostname("localhost")
.port(1433)
.databaseList("flinkcdc")
.tableList("dbo.c2")
.username("qsa")
.password("qxx")
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
// set the source parallelism to 2
env.fromSource(
sqlServerSource,
WatermarkStrategy.noWatermarks(),
"SqlServerIncrementalSource")
.setParallelism(2)
.print()
.setParallelism(1);
try {
env.execute("Print SqlServer Snapshot + Change Stream");
}
catch (Exception e) {
e.printStackTrace();
}
}
//Single Thread Reading
public static void singleTest(){
SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
.hostname("localhost")
.port(1433)
.database("flinkcdc") // monitor sqlserver database
.tableList("dbo.c2") // monitor products table
.username("daa")
.password("1qxxsa")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
try{
env.execute();
}catch (Exception e){
e.printStackTrace();
}
}
}
标签:DataStream,方式,cdc,StreamExecutionEnvironment,SqlserverCDCSource,ververica,env,im From: https://www.cnblogs.com/huft/p/18257813