代码片段:
1 using DataSync.Core; 2 using Furion.Logging.Extensions; 3 using Microsoft.Data.SqlClient; 4 using Microsoft.Extensions.Logging; 5 using System.Data; 6 namespace DataSync.Application.DataSync.Services 7 { 8 public class DataSyncServices : IDataSyncData, ITransient 9 { 10 private readonly object lockObj = new object(); 11 /// <summary> 12 /// 客户端向服务端同步 13 /// </summary> 14 /// <param name="clientConn"></param> 15 /// <param name="serviceConn">目标数据库</param> 16 /// <returns></returns> 17 public string SyncDataForClient(string clientConn, string serviceConn) 18 { 19 return SyncData(clientConn, serviceConn); 20 } 21 /// <summary> 22 /// 服务端向客户端同步 23 /// </summary> 24 /// <param name="serviceConn"></param> 25 /// <param name="clientConn"></param> 26 /// <returns></returns> 27 public string SyncDataForServer(string serviceConn, string clientConn) 28 { 29 return SyncData(serviceConn, clientConn); 30 } 31 /// <summary> 32 /// 数据同步 33 /// </summary> 34 private string SyncData(string sourceConn, string targetConn) 35 { 36 try 37 { 38 39 //源数据库 数据源链接 40 SqlSugarScope sourceDb = new SqlSugarScope(new ConnectionConfig() 41 { 42 DbType = SqlSugar.DbType.SqlServer, 43 ConnectionString = sourceConn, 44 IsAutoCloseConnection = true, 45 AopEvents = new AopEvents 46 { 47 OnLogExecuting = (sql, ps) => 48 { 49 #if DEBUG 50 Log.Information($"语句:{sql},参数:{(ps.Any() ? "[" : string.Empty) + string.Join("|", ps.Select(m => $"{m.ParameterName}={m.Value}")) + (ps.Any() ? "]" : string.Empty)}"); 51 #endif 52 } 53 } 54 }); 55 //目标数据库 数据源链接 56 SqlSugarScope targetDb = new SqlSugarScope(new ConnectionConfig() 57 { 58 DbType = SqlSugar.DbType.SqlServer, 59 ConnectionString = targetConn, 60 IsAutoCloseConnection = true, 61 AopEvents = new AopEvents 62 { 63 OnLogExecuting = (sql, ps) => 64 { 65 #if DEBUG 66 Log.Information($"语句:{sql},参数:{(ps.Any() ? "[" : string.Empty) + string.Join("|", ps.Select(m => $"{m.ParameterName}={m.Value}")) + (ps.Any() ? "]" : string.Empty)}"); 67 #endif 68 } 69 } 70 }); 71 //使用sqlsugar 初始化目标数据库 72 targetDb.DbMaintenance.CreateDatabase(); 73 var tableNames = sourceDb.DbMaintenance.GetTableInfoList(false).Select(t => t.Name).ToList(); // 调用函数获取所有表名 74 var syncBlackTable = App.GetConfig<List<string>>("SyncBlackTable"); 75 tableNames = tableNames.Except(syncBlackTable).ToList(); 76 //多线程 77 Parallel.ForEach(tableNames, tableName => 78 { 79 lock (lockObj) 80 { 81 // 根据表名从源数据库中获取数据并存入 DataTable 82 var targetdataTable = DataTableHelper.FetchDataFromTable(tableName, sourceDb); 83 //判断数据表在目标库是否存在 84 var flagTargetTab = targetDb.DbMaintenance.IsAnyTable(tableName); 85 if (!flagTargetTab) 86 { 87 // 创建表的SQL语句 88 var createTableSql = $"CREATE TABLE {tableName} ("; 89 if (targetdataTable != null && targetdataTable.Rows.Count > 0) 90 { 91 //目标数据库写入-先移除数据同步标识 92 DataBaseInfoService.DatatableRemoveCloumns(targetdataTable); 93 // 遍历DataTable的列 94 foreach (DataColumn column in targetdataTable.Columns) 95 { 96 string columnName = column.ColumnName; 97 string dataType = DataBaseInfoService.GetSqlDataType(column.DataType); 98 99 createTableSql += $"{columnName} {dataType}, "; 100 } 101 createTableSql = createTableSql.TrimEnd(',', ' ') + ")"; 102 // 创建表 103 targetDb.Ado.ExecuteCommand(createTableSql); 104 105 ("TargetTable : " + tableName + ",创建成功").LogInformation(); 106 // } 107 //} 108 } 109 } 110 //AppSys 111 if (tableName.ToUpper().Equals("APPSYS")) 112 { 113 AppSysDataSync.SyncData(tableName, sourceDb, targetDb); 114 } 115 var selectCountSql = $"SELECT COUNT(*) FROM {tableName} "; 116 117 var sourceCount = sourceDb.Ado.GetInt(selectCountSql); 118 119 var middleCount = targetDb.Ado.GetInt(selectCountSql); 120 //增量 121 if (sourceCount > middleCount) 122 { 123 // commandTarget.Connection = connTarget; 124 // commandTarget.CommandType = CommandType.Text; 125 //查询数据 126 var selectTableSql = $"SELECT * FROM {tableName}"; 127 //创建datatable(源数据) 128 var sourceDataTable = sourceDb.Ado.GetDataTable(selectTableSql); 129 130 if (sourceDataTable != null && sourceDataTable.Rows.Count > 0) 131 { 132 //新增列 MD5 133 DataBaseInfoService.DataTableAddColumsMd5(sourceDataTable); 134 } 135 //创建datatable(目标表数据) 136 var targetDataTable = targetDb.Ado.GetDataTable(selectTableSql); 137 138 if (targetDataTable != null && targetDataTable.Rows.Count > 0) 139 { 140 //新增列 MD5 141 DataBaseInfoService.DataTableAddColumsMd5(targetDataTable); 142 } 143 // 计算差集 144 var tempTable = new DataTable(); 145 var tempExceptTable = (from source in sourceDataTable.AsEnumerable() 146 where 147 !(from target in targetDataTable.AsEnumerable() select target.Field<string>("MD5")).Contains( 148 source.Field<string>("MD5")) 149 select source); 150 if (tempExceptTable != null && tempExceptTable.Count() > 0) 151 { 152 tempTable = tempExceptTable.CopyToDataTable(); 153 } 154 //批量插入数据 155 if (tempTable != null && tempTable.Rows.Count > 0) 156 { 157 //目标数据库写入-先移除数据同步标识,MD5标识 158 DataBaseInfoService.DatatableRemoveCloumns(tempTable); 159 var connTarget = new SqlConnection(targetConn); 160 DataBaseInfoService.DataBulkCopy(connTarget, tableName, tempTable); 161 162 // TargetDataScope.Db.Fastest<DataTable>().AS(tableName).BulkCopy(tempTable); 163 } 164 } 165 //删除 166 else if (sourceCount < middleCount) 167 { 168 //查询数据 169 var selectTableSql = $"SELECT * FROM {tableName}"; 170 //创建datatable(源数据) 171 var sourceDataTable = sourceDb.Ado.GetDataTable(selectTableSql); 172 173 if (sourceDataTable != null && sourceDataTable.Rows.Count > 0) 174 { 175 //新增列 MD5 176 DataBaseInfoService.DataTableAddColumsMd5(sourceDataTable); 177 } 178 //创建datatable 179 var taergetTable = targetDb.Ado.GetDataTable(selectTableSql); 180 if (taergetTable != null && taergetTable.Rows.Count > 0) 181 { 182 //新增列 MD5 183 DataBaseInfoService.DataTableAddColumsMd5(taergetTable); 184 } 185 // 计算差集 186 var tempTable = new DataTable(); 187 var tempExceptTable = (from target in taergetTable.AsEnumerable() 188 where 189 !(from source in sourceDataTable.AsEnumerable() select source.Field<string>("MD5")).Contains( 190 target.Field<string>("MD5")) 191 select target); 192 if (tempExceptTable != null && tempExceptTable.Count() > 0) 193 { 194 tempTable = tempExceptTable.CopyToDataTable(); 195 } 196 if (tempTable != null && tempTable.Rows.Count > 0) 197 { 198 //获取主键字段 199 var PrimaryKeyName = targetDb.DbMaintenance.GetPrimaries(tableName); 200 //DataTableHelper.GetPrimaryKeyFieldName(tableName, connTarget); 201 //获取自增列 202 var Identities = targetDb.DbMaintenance.GetIsIdentities(tableName); 203 if (PrimaryKeyName != null && PrimaryKeyName.Count > 0) 204 { 205 foreach (DataRow row in tempTable.Rows) 206 { 207 var deleteDataSql = DataTableHelper.ConstructDeleteSql(tableName, PrimaryKeyName, Identities, row); 208 //$"DELETE FROM {tableName} WHERE {PrimaryKeyName} ='{row[PrimaryKeyName[0]]}'"; 209 //目标数据数据操作对象 210 targetDb.Ado.ExecuteCommand(deleteDataSql); 211 212 } 213 } 214 } 215 } 216 //更新 217 else 218 { 219 //判断是否存在需要更新的记录 220 //和目标表比较取差集 221 //查询数据 222 var selectTableSql = $"SELECT * FROM {tableName}"; 223 //创建datatable(源数据) 224 var sourceDataTable = sourceDb.Ado.GetDataTable(selectTableSql); 225 if (sourceDataTable != null && sourceDataTable.Rows.Count > 0) 226 { 227 //新增列 MD5 228 DataBaseInfoService.DataTableAddColumsMd5(sourceDataTable); 229 } 230 //创建datatable(目标表数据) 231 var targetDataTable = targetDb.Ado.GetDataTable(selectTableSql); 232 if (targetDataTable != null && targetDataTable.Rows.Count > 0) 233 { 234 //新增列 MD5 235 DataBaseInfoService.DataTableAddColumsMd5(targetDataTable); 236 } 237 // 计算差集 238 var tempTable = new DataTable(); 239 var tempExceptTable = (from source in sourceDataTable.AsEnumerable() 240 where 241 !(from target in targetDataTable.AsEnumerable() select target.Field<string>("MD5")).Contains( 242 source.Field<string>("MD5")) 243 select source); 244 if (tempExceptTable != null && tempExceptTable.Count() > 0) 245 { 246 tempTable = tempExceptTable.CopyToDataTable(); 247 } 248 if (tempTable != null && tempTable.Rows.Count > 0) 249 { 250 //删除标识列和MD5列 251 DataBaseInfoService.DatatableRemoveCloumns(tempTable); 252 //获取目标表主键字段 253 var PrimaryKeyName = targetDb.DbMaintenance.GetPrimaries(tableName); 254 //获取自增列 255 var Identities = targetDb.DbMaintenance.GetIsIdentities(tableName); 256 //DataTableHelper.GetPrimaryKeyFieldName(tableName, connTarget); 257 foreach (DataRow dataRow in tempTable.Rows) 258 { 259 var updateDataSql = DataTableHelper.ConstructUpdateSql(tableName, PrimaryKeyName, Identities, dataRow); 260 targetDb.Ado.ExecuteCommand(updateDataSql); 261 262 } 263 } 264 } 265 } 266 }); 267 } 268 catch (Exception ex) 269 { 270 ("Error occurred while connecting to database or fetching data from tables.\n" + ex.Message).LogError(); 271 return "同步失败。详见错误日志!"; 272 } 273 return "同步完成!"; 274 } 275 276 } 277 }
Gitee下载地址:https://gitee.com/ltf_free/sync-data.git
标签:string,tempTable,数据库,tableName,targetDb,数据表,var,服务器,sourceDataTable From: https://www.cnblogs.com/100234ltf/p/17997538