此处直接调用官方的 JdbcSink 示例,只执行 insert;更新和删除事件的处理逻辑还需要另行判断并实现。
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder.SqlServerIncrementalSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
 * Flink job that reads change events from the SQL Server table {@code dbo.c2}
 * (database {@code flinkcdc}) through the CDC connector and writes them to the
 * {@code c2_sink} table of the {@code sink_dw} database through the JDBC sink.
 *
 * <p>The sink statement is a plain {@code INSERT}; nothing in this class
 * distinguishes update or delete change events. NOTE(review): whether
 * {@code StrToC2} filters or maps those events is not visible here — confirm
 * before relying on this job for anything other than inserts.
 */
public class SqlserverCDCSourceSink {

    /**
     * Program entry point; runs the single-parallelism pipeline.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SingleTest();
    }

    /**
     * Builds and executes the pipeline: CDC source, {@code StrToC2} mapping, JDBC sink.
     *
     * @throws IllegalStateException if the job fails to execute; the original
     *         failure is attached as the cause so the caller (and the process
     *         exit status) reflects the failure instead of it only being printed
     */
    public static void SingleTest() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use parallelism 1 so the sink sees messages in their original order.
        env.setParallelism(1);

        DataStreamSource<String> ds = env.addSource(buildSource());
        SingleOutputStreamOperator<C2> map = ds.map(new StrToC2());

        JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
                .withBatchSize(1)           // batch size, in records
                .withBatchIntervalMs(1000)  // maximum time to wait for a batch, in milliseconds
                .withMaxRetries(1)          // number of retries
                .build();

        JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .withUrl("jdbc:sqlserver://localhost:1433;databaseName=sink_dw")
                .withUsername("sa")
                .withPassword("123456")
                .build();

        map.addSink(JdbcSink.sink(
                "insert into c2_sink(id2,name2) values (?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id2);
                    ps.setString(2, t.name2);
                },
                executionOptions,
                connectionOptions));

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of only printing, so a failed job is not reported as success.
            throw new IllegalStateException("SQL Server CDC to JDBC sink job failed", e);
        }
    }

    /** Creates the SQL Server CDC source that emits each change event as a JSON string. */
    private static SourceFunction<String> buildSource() {
        return SqlServerSource.<String>builder()
                .hostname("localhost")
                .port(1433)
                .database("flinkcdc")   // monitored SQL Server database
                .tableList("dbo.c2")    // monitored table
                .username("sa")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
    }
}
标签:flink,sqlserver,SqlserverCDCSourceSink,org,apache,import,com From: https://www.cnblogs.com/huft/p/18259704