package org.hu.fk.datastream_connector;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
// 捕获变更数据,同步目标库
//根据crud类型,分类同步
// Captures change data (CDC) from SQL Server and replicates it to a target
// database, routing create/update/delete events to dedicated JDBC sinks.
public class SqlserverCDCcrudSourceSink {

    // Target (sink) database connection settings shared by all three sinks.
    private static final String SINK_URL = "jdbc:sqlserver://localhost:1433;databaseName=sink_dw";
    private static final String SINK_USER = "sa";
    private static final String SINK_PASSWORD = "123456";

    public static void main(String[] args) {
        SingleTest();
    }

    /**
     * Builds and runs the pipeline single-threaded (parallelism 1) so change
     * events reach the sinks in commit order.
     */
    public static void SingleTest() {
        // CDC source: monitors table dbo.c2 of database "flinkcdc", emitting each
        // change event as a Debezium-style JSON string. latest() skips the initial
        // snapshot and captures only changes made after the job starts.
        SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
                .hostname("localhost")
                .port(1433)
                .database("flinkcdc")
                .tableList("dbo.c2")
                .username("sa")
                .password("123456")
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // SourceRecord -> JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // parallelism 1 keeps message ordering end to end

        DataStreamSource<String> ds = env.addSource(sourceFunction);
        SingleOutputStreamOperator<C2r> map = ds.map(new MyMap());

        // One side output per change type so each can drive its own SQL statement.
        OutputTag<C2r> opc = new OutputTag<>("opc", Types.POJO(C2r.class));
        OutputTag<C2r> opu = new OutputTag<>("opu", Types.POJO(C2r.class));
        OutputTag<C2r> opd = new OutputTag<>("opd", Types.POJO(C2r.class));

        SingleOutputStreamOperator<C2r> process = map.process(new ProcessFunction<C2r, C2r>() {
            @Override
            public void processElement(C2r c2r,
                                       ProcessFunction<C2r, C2r>.Context context,
                                       Collector<C2r> collector) throws Exception {
                String op = c2r.getOp();
                System.out.println("processElement " + c2r); // debug trace for the demo
                // Records with id2 < 1 (e.g. events whose row image was missing)
                // go to the main output, which has no sink attached.
                if (c2r.getId2() >= 1) {
                    if ("c".equals(op) || "r".equals(op)) { // insert / snapshot read
                        context.output(opc, c2r);
                    } else if ("u".equals(op)) {            // update
                        context.output(opu, c2r);
                    } else if ("d".equals(op)) {            // delete
                        context.output(opd, c2r);
                    }
                } else {
                    collector.collect(c2r);
                }
            }
        });

        // INSERT sink for create (and snapshot-read) events.
        process.getSideOutput(opc).addSink(JdbcSink.sink(
                "insert into c2_sink(id2,name2) values (?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id2);
                    ps.setString(2, t.name2);
                },
                executionOptions(),
                connectionOptions()
        ));
        // UPDATE sink.
        process.getSideOutput(opu).addSink(JdbcSink.sink(
                "update c2_sink set name2=? where id2=?",
                (ps, t) -> {
                    ps.setString(1, t.name2);
                    ps.setInt(2, t.id2);
                },
                executionOptions(),
                connectionOptions()
        ));
        // DELETE sink.
        process.getSideOutput(opd).addSink(JdbcSink.sink(
                "delete from c2_sink where id2=?",
                (ps, t) -> ps.setInt(1, t.id2),
                executionOptions(),
                connectionOptions()
        ));

        try {
            env.execute();
        } catch (Exception e) {
            // Fail loudly instead of swallowing the error with printStackTrace().
            throw new RuntimeException("Flink CDC job failed", e);
        }
    }

    /** Per-record execution options shared by all sinks (flush every record). */
    private static JdbcExecutionOptions executionOptions() {
        return JdbcExecutionOptions.builder()
                .withBatchSize(1)          // batch size in records
                .withBatchIntervalMs(1000) // max wait before an incomplete batch flushes
                .withMaxRetries(1)         // retry count on failure
                .build();
    }

    /** JDBC connection options for the sink database, shared by all sinks. */
    private static JdbcConnectionOptions connectionOptions() {
        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .withUrl(SINK_URL)
                .withUsername(SINK_USER)
                .withPassword(SINK_PASSWORD)
                .build();
    }

    /**
     * Parses a Debezium JSON change event into a {@code C2r}: the row image is
     * read from "before" for deletes and from "after" for creates/updates/reads.
     */
    static class MyMap implements MapFunction<String, C2r> {
        @Override
        public C2r map(String s) throws Exception {
            JSONObject source = JSONObject.parseObject(s);
            String op = source.getString("op");
            C2r c2r = new C2r(0, "", op); // defaults kept when the row image is absent
            if ("d".equals(op)) {
                parseJsonField(source, "before", c2r);
            } else if ("c".equals(op) || "u".equals(op) || "r".equals(op)) {
                parseJsonField(source, "after", c2r);
            }
            return c2r;
        }

        // Copies id2/name2 out of the given row image; tolerates a missing or
        // null image (e.g. tombstone events) by keeping the defaults (0, "").
        private void parseJsonField(JSONObject source, String field, C2r c2r) {
            JSONObject row = source.getJSONObject(field);
            if (row == null) {
                return;
            }
            Integer id2 = row.getInteger("id2");
            c2r.setId2(id2 == null ? 0 : id2);
            String name2 = row.getString("name2");
            c2r.setName2(name2 == null ? "" : name2);
        }
    }
}
// 标签:c2r,demo,id2,SqlserverCDCcrudSourceSink,sink,import,apache,org,mssql From: https://www.cnblogs.com/huft/p/18262486