实现效果:
设计思路:
1. 开启数据库及表的cdc,定时查询cdc表数据,封装sql语句(通过执行类型,主键;修改类型的cdc数据只取最后更新的记录),添加到离线数据表;
2. 线程定时查询离线数据表,更新远程库数据;
3. 远程库数据被更改又会产生cdc数据,对此数据进行拦截;
配置文件说明:
{
"AsyncInterval": 30000,
"Drivers": [
{
"RefreshTime": 5000,
"Enable": 1,
"SrcConnect": "Data Source=192.168.8.77;Initial Catalog=master;User ID=sa;Pwd=Nflg1234",
"SrcMap": [ "dbsync2|student,table1,table2,table3", "dbsync3|*" ],
"SrcUpdateCDC": 1,
"DstConnect": [ "Data Source=192.168.8.81;Initial Catalog=master;User ID=sa;Pwd=Nflg1234" ]
}
]
}
或
{
"AsyncInterval": 25000,
"Drivers": [
{
"RefreshTime": 10000,
"Enable": 1,
"SrcConnect": "Data Source=192.168.8.77;Initial Catalog=master;User ID=sa;Pwd=Nflg1234",
"SrcMap": [ "testsync1|*"],
"SrcUpdateCDC": 1,
"DstConnect": [ "Data Source=192.168.8.81;Initial Catalog=master;User ID=sa;Pwd=Nflg1234" ]
},
{
"RefreshTime": 10000,
"Enable": 1,
"SrcConnect": "Data Source=192.168.8.81;Initial Catalog=master;User ID=sa;Pwd=Nflg1234",
"SrcMap": [ "testsync1|*" ],
"SrcUpdateCDC": 1,
"DstConnect": [ "Data Source=192.168.8.77;Initial Catalog=master;User ID=sa;Pwd=Nflg1234" ]
}
]
}
1. 设置同步间隔时间
2. 根据不同的配置文件,加载不同的模式,多驱动(Drivers 1主1备-单向同步,1主1主-双向同步,2主1备-多库汇总),多机同步(DstConnect),多库同多表同步(SrcMap,dbsync2|*表示监听该数据库下的所有表),设置刷新时间(RefreshTime),是否启用(Enable),是否重置cdc数据(SrcUpdateCDC)
数据表说明:
async_data 离线数据表
id 主键自增 INTEGER
connect_str 连接字符串 NVARCHAR(255)
excute_sql 需要同步的sql语句 NVARCHAR(255)
cdc_time cdc时间 DATETIME
event_time event时间 DATETIME
db_name 数据库名 NVARCHAR(255)
table_name 表名 NVARCHAR(255)
table_pk 表主键 NVARCHAR(255)
excute_type 执行类型(I/U/D) NVARCHAR(255)
sqlserver cdc表(日志表)中如果一条id多次更新,取最新一条数据
sqlite asy_data表(离线数据表),入库时,查dbname + table + pk,无记录则添加,有记录比较cdc记录时间,如果时间更新则更新sql语句
特殊数据处理:
uniqueidentifier类型的数据转为NULL,数据中含有'的替换''
核心代码:
using SqlServerAsync.Util.config; using SqlServerAsync.Util.sqlite; using SqlServerAsync.Util.sqlite.model; using System; using System.Collections.Generic; using System.Data; using System.Threading; namespace SqlServerAsync.Util { public class SqlServerCDC { public void Listen(Driver driver) { var update_cdc = driver.SrcUpdateCDC == 1 ? true : false; var enable = driver.Enable == 1 ? true : false; foreach (var map in driver.SrcMap) { StartCDC(driver.SrcConnect, driver.DstConnect, driver.RefreshTime, enable, update_cdc, map); } } void StartCDC(string srcconnect, List<string> dstconnect, int refreshTime, bool enable, bool update_cdc, string map) { try { var freeSql = new FreeSql.FreeSqlBuilder() .UseConnectionString(FreeSql.DataType.SqlServer, srcconnect) .UseNoneCommandParameter(true)// 不使用参数化 .UseAutoSyncStructure(false)// 不同步表结构 .Build(); var arrayMap = map.Split('|'); var db = arrayMap[0]; var tbs = arrayMap[1]; string[] arrayTB = null; var dstStr = string.Join("#", dstconnect); if (!enable) { Program.AddLog($"禁用监听,来源={srcconnect},目标数={dstconnect.Count},目标={dstStr},db={db},Tables={tbs}"); return; } string sql = string.Empty; Dictionary<string, Table> dicTable = new Dictionary<string, Table>(); Program.AddLog($"启用监听,来源={srcconnect},目标数={dstconnect.Count},目标={dstStr},db={db},Tables={tbs}"); if ("*" == tbs) { // 查询db下所有表名 sql = $"use {db};select TABLE_NAME from {db}.information_schema.tables where TABLE_SCHEMA='dbo' and TABLE_NAME not in('systranschemas','sysdiagrams')"; DataTable dtAll = freeSql.Ado.ExecuteDataTable(sql); var rowCount = dtAll.Rows.Count; if (rowCount > 0) arrayTB = new string[rowCount]; for (int i = 0; i < rowCount; i++) { arrayTB[i] = dtAll.Rows[i]["TABLE_NAME"].ToString(); } } else { arrayTB = tbs.Split(','); } if (null == arrayTB || 0 == arrayTB.Length) { Program.AddLog($"数据库{db},查无数据表 ×"); return; } // 开启SQL Server数据库CDC sql = $"use {db};if exists(select 1 from {db}.sys.databases where name='{db}' and is_cdc_enabled=0)\n" + "begin\n" + $"exec {db}.sys.sp_cdc_enable_db\n" + "end"; freeSql.Ado.ExecuteNonQuery(sql); // 查询库cdc是否开启成功 sql = $"use {db};select is_cdc_enabled from {db}.sys.databases where name='{db}'"; DataTable dtCDC_DB = freeSql.Ado.ExecuteDataTable(sql); if (dtCDC_DB.Rows.Count <= 0 || !Convert.ToBoolean(dtCDC_DB.Rows[0]["is_cdc_enabled"])) { Program.AddLog($"数据库CDC开启失败({db}) ×"); return; } Program.AddLog($"数据库CDC开启成功({db}) √"); foreach (var table in arrayTB) { if (string.IsNullOrEmpty(table)) continue; if (update_cdc) { // 关闭单张表的CDC功能 sql = $"use {db};if exists(select 1 from {db}.sys.tables where name='{table}' AND is_tracked_by_cdc=1)\n" + "begin\n" + $"exec {db}.sys.sp_cdc_disable_table @source_schema='dbo',@source_name='{table}',@capture_instance='dbo_{table}'" + "end"; freeSql.Ado.ExecuteNonQuery(sql); } // 开启单张表的CDC功能 sql = $"use {db};if exists(select 1 from {db}.sys.tables where name='{table}' AND is_tracked_by_cdc=0)\n" + "begin\n" + $"exec {db}.sys.sp_cdc_enable_table\n" + "@source_schema='dbo',\n" + $"@source_name='{table}',\n" + "@capture_instance=NULL,\n" + "@supports_net_changes=1,\n" + "@role_name=NULL\n" + "end"; freeSql.Ado.ExecuteNonQuery(sql); // 查询表cdc是否开启成功 sql = $"use {db};select is_tracked_by_cdc from {db}.sys.tables WHERE name='{table}'"; DataTable dtCDC_TB = freeSql.Ado.ExecuteDataTable(sql); if (dtCDC_TB.Rows.Count <= 0 || !Convert.ToBoolean(dtCDC_TB.Rows[0]["is_tracked_by_cdc"])) { Program.AddLog($"数据表CDC开启失败({table}) ×"); continue; } Program.AddLog($"数据表CDC开启成功({table}) √"); Table tb = new Table() { Name = table }; // 获取字段名,是否主键,字段类型 sql = $"use {db};SELECT distinct col.name AS 'Name', idx.is_primary_key as 'IsPK',TYPE_NAME(system_type_id) as 'Type'\n" + $"FROM sys.columns col\n" + $"LEFT JOIN sys.index_columns idxcol ON col.object_id=idxcol.object_id AND col.column_id=idxcol.column_id\n" + $"LEFT JOIN sys.indexes idx ON idxcol.object_id=idx.object_id AND idxcol.index_id=idx.index_id\n" + $"WHERE col.object_id=OBJECT_ID('{table}')"; List<Field> lstField = freeSql.Ado.Query<Field>(sql); foreach (var field in lstField) { var ispk = Convert.ToBoolean(field.IsPK); if (ispk) { tb.LstPKField.Add(field);// 主键,用于更新删除 } else { tb.LstDataField.Add(field); } } dicTable.Add(table, tb); } Program.AddLog($"监听成功,{db}"); // 定时轮询 ThreadPool.QueueUserWorkItem(delegate { Dictionary<string, string> dicTBUpdatePK = new Dictionary<string, string>(); while (true) { try { foreach (var item in dicTable) { dicTBUpdatePK.Clear(); var table_name = item.Key; var tableEntity = item.Value; // cdc表查询 //__$start_lsn :与相应更改的提交事务关联的日志序列号(LSN) //__$end_lsn : (在 SQL Server 2008中,此列始终为 NULL) //__$seqval :对事务内的行更改顺序 //__$operation :源表DML操作 var cdc_table_name = $"{db}.cdc.dbo_{table_name}_CT"; sql = $"use {db};select sys.fn_cdc_map_lsn_to_time(__$start_lsn) as cdctime,* from {cdc_table_name}";// 查询cdc时间 var dt = freeSql.Ado.ExecuteDataTable(sql); table_name = $"{db}.dbo." + table_name; for (int i = 0; i < dt.Rows.Count; i++) { var row = dt.Rows[i]; var lstPKField = tableEntity.LstPKField; var lstDataField = tableEntity.LstDataField; var cdctime = Convert.ToDateTime(row["cdctime"]); var operation = Convert.ToInt32(row["__$operation"]); var seqval = (byte[])(row["__$seqval"]);// __$start_lsn代表事件时间,并发时,会有相同的情况,改用__$seqval var str_seqval = BitConverter.ToString(seqval, 0).Replace("-", string.Empty); if (3 == operation) { continue; } var sql_cdc_execute = string.Empty; string table_pk = string.Empty; foreach (var field1 in lstPKField) { table_pk += field1.Name + "='" + row[field1.Name] + "' and "; } table_pk = table_pk.Substring(0, table_pk.Length - 5); string cdc_dic_pk = table_name + ";" + table_pk; // cdc表中过滤多条表中一条记录多次更新,取最新一条数据(查询过的数据利用字典存储) string str_seqval1 = string.Empty; if (4 == operation) { if (dicTBUpdatePK.ContainsKey(cdc_dic_pk)) { str_seqval1 = dicTBUpdatePK[cdc_dic_pk]; } else { // 查询多次更新后的最新值 sql = $"use {db};select top 1 __$seqval from {cdc_table_name} where {table_pk} and __$operation=4 order by __$seqval desc"; var dtlsn = freeSql.Ado.ExecuteDataTable(sql); var seqval1 = (byte[])(dtlsn.Rows[0]["__$seqval"]); str_seqval1 = BitConverter.ToString(seqval1, 0).Replace("-", string.Empty); dicTBUpdatePK.Add(cdc_dic_pk, str_seqval1); } } // 删除cdc表数据 sql = $"use {db};delete from {cdc_table_name} where __$seqval=CONVERT(BINARY(10), '{str_seqval}', 2)"; freeSql.Ado.ExecuteNonQuery(sql); string excute_type = string.Empty; switch (operation) { case 1: // 删除 excute_type = BaseEnum.Delete; sql_cdc_execute = $"delete from {table_name} where {table_pk}"; break; case 2: // 插入 excute_type = BaseEnum.Insert; string insertField = string.Empty; string insertValue = string.Empty; foreach (var field1 in lstPKField) { insertField += field1.Name + ","; insertValue += HandleSpecialData(field1.Type, row[field1.Name]) + ","; } foreach (var field2 in lstDataField) { insertField += field2.Name + ","; insertValue += HandleSpecialData(field2.Type, row[field2.Name]) + ","; } insertField = insertField.Substring(0, insertField.Length - 1); insertValue = insertValue.Substring(0, insertValue.Length - 1); sql_cdc_execute = $"insert into {table_name} ({insertField}) values({insertValue})"; break; case 3: break; case 4: // 修改 if (str_seqval == str_seqval1)// 最新的数据 { excute_type = BaseEnum.Update; string updateData = string.Empty; foreach (var field2 in lstDataField) { updateData += field2.Name + "=" + HandleSpecialData(field2.Type, row[field2.Name]) + ","; } updateData = updateData.Substring(0, updateData.Length - 1); sql_cdc_execute = $"update {table_name} set {updateData} where {table_pk}"; } break; } if (!string.IsNullOrEmpty(sql_cdc_execute)) { foreach (var dst in dstconnect) { bool add = true; string key1 = srcconnect + "_" + table_name + "_" + table_pk; // A同步B,B更新后,CDC日志返回A,这边做截取 if (Program.DicExecuted.ContainsKey(key1)) { add = false; string removedValue; Program.DicExecuted.TryRemove(key1, out removedValue); } else { // 修改以最后时间的数据为准 var entity = SqliteHelper.GetUpdateAsyncData(db, table_name, table_pk); if (null == entity) { var asyncdata = new AsyncData() { ConnectStr = dst, ExcuteSQL = sql_cdc_execute, CDCTime = cdctime, EventTime = DateTime.Now, DBName = db, TableName = table_name, TablePK = table_pk, ExcuteType = excute_type }; SqliteHelper.InsertAsyncData(asyncdata); } else { // 比较时间 if (DateTime.Compare(entity.CDCTime, cdctime) < 0) { SqliteHelper.UpdateAsyncData(dst, sql_cdc_execute, entity.Id); } else { add = false; } } if (add) { if (dst.Contains("192.168.8.81")) { Console.WriteLine("111"); } Program.AddLog($"添加,dst:{dst},sql:{sql_cdc_execute}"); } } } } } } } catch (Exception ex) { Program.AddLog($"Listen Error,ex:{ex.Message}"); } Thread.Sleep(refreshTime); } }); } catch (Exception ex) { Program.AddLog($"[Error] 初始化CDC异常,errmsg:{ex.Message}"); } } /// <summary> /// 特殊数据类型处理 1. uniqueidentifier为空时,设置为NULL;2. 单引号,转成双号 /// </summary> /// <param name="val"></param> /// <returns></returns> public string HandleSpecialData(string type, object val) { if (null == val) return string.Empty; string ret = val.ToString(); bool special = false; if ("uniqueidentifier" == type.ToLower())// 特殊数据类型处理 { if (string.IsNullOrEmpty(ret)) { special = true; ret = "NULL"; } } if (!special) { if (ret.Contains("'")) { ret = ret.Replace("'", "''");// 把单引号转成双引号 } ret = $"'{ret}'"; } return ret; } } public class Table { public string Name { get; set; } public List<Field> LstPKField { get; set; } = new List<Field>(); public List<Field> LstDataField { get; set; } = new List<Field>(); } public class Field { public string Name { get; set; } public string IsPK { get; set; } public string Type { get; set; }// GUID,uniqueidentifier为空时,改为NULL } }
标签:string,cdc,C#,db,数据库,SqlServer,sql,var,table From: https://www.cnblogs.com/chen1880/p/17610733.html