SQL Server 数据库同步方案(.NET程序实现)
代码片段:
using DataSync.Core;
using Furion.Logging.Extensions;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using System.Data;

namespace DataSync.Application.DataSync.Services
{
    /// <summary>
    /// Copies table contents from one SQL Server database to another.
    /// </summary>
    /// <remarks>
    /// For every source table that is not listed in the "SyncBlackTable" configuration entry, the
    /// row counts of source and target are compared and exactly one action is taken per run:
    /// more rows in the source => bulk-insert the rows missing from the target;
    /// fewer rows in the source => delete the rows that no longer exist in the source;
    /// equal counts => update the rows whose content differs.
    /// Rows are compared through the "MD5" column that
    /// <c>DataBaseInfoService.DataTableAddColumsMd5</c> adds to the loaded data.
    /// Tables are processed one after another: the previous implementation wrapped the whole body of
    /// a <c>Parallel.ForEach</c> in a single lock, which was sequential execution with extra overhead.
    /// </remarks>
    public class DataSyncServices : IDataSyncData, ITransient
    {
        /// <summary>
        /// Synchronizes from the client database to the server database.
        /// </summary>
        /// <param name="clientConn">Connection string of the client (source) database.</param>
        /// <param name="serviceConn">Connection string of the server (target) database.</param>
        /// <returns>A status message describing success or failure.</returns>
        public string SyncDataForClient(string clientConn, string serviceConn)
        {
            return SyncData(clientConn, serviceConn);
        }

        /// <summary>
        /// Synchronizes from the server database to the client database.
        /// </summary>
        /// <param name="serviceConn">Connection string of the server (source) database.</param>
        /// <param name="clientConn">Connection string of the client (target) database.</param>
        /// <returns>A status message describing success or failure.</returns>
        public string SyncDataForServer(string serviceConn, string clientConn)
        {
            return SyncData(serviceConn, clientConn);
        }

        /// <summary>
        /// Runs the synchronization for all non-blacklisted tables of the source database.
        /// </summary>
        /// <param name="sourceConn">Connection string of the database to read from.</param>
        /// <param name="targetConn">Connection string of the database to write to.</param>
        /// <returns>"同步完成!" on success; a failure message when any exception was caught (it is logged).</returns>
        private string SyncData(string sourceConn, string targetConn)
        {
            try
            {
                SqlSugarScope sourceDb = CreateScope(sourceConn);
                SqlSugarScope targetDb = CreateScope(targetConn);

                // Let SqlSugar initialize the target database.
                targetDb.DbMaintenance.CreateDatabase();

                var tableNames = sourceDb.DbMaintenance.GetTableInfoList(false).Select(t => t.Name).ToList();

                // A missing configuration entry means "no table is excluded" instead of a crash in Except().
                var syncBlackTable = App.GetConfig<List<string>>("SyncBlackTable") ?? new List<string>();
                tableNames = tableNames.Except(syncBlackTable).ToList();

                foreach (var tableName in tableNames)
                {
                    SyncTable(tableName, sourceDb, targetDb, targetConn);
                }
            }
            catch (Exception ex)
            {
                ("Error occurred while connecting to database or fetching data from tables.\n" + ex.Message).LogError();
                return "同步失败。详见错误日志!";
            }
            return "同步完成!";
        }

        /// <summary>
        /// Creates a SqlSugar scope for a SQL Server connection string; in DEBUG builds every executed
        /// statement is logged together with its parameters.
        /// </summary>
        private static SqlSugarScope CreateScope(string connectionString)
        {
            return new SqlSugarScope(new ConnectionConfig()
            {
                DbType = SqlSugar.DbType.SqlServer,
                ConnectionString = connectionString,
                IsAutoCloseConnection = true,
                AopEvents = new AopEvents
                {
                    OnLogExecuting = (sql, ps) =>
                    {
#if DEBUG
                        Log.Information($"语句:{sql},参数:{(ps.Any() ? "[" : string.Empty) + string.Join("|", ps.Select(m => $"{m.ParameterName}={m.Value}")) + (ps.Any() ? "]" : string.Empty)}");
#endif
                    }
                }
            });
        }

        /// <summary>
        /// Synchronizes a single table: creates it in the target when necessary, then inserts, deletes
        /// or updates rows depending on how the row counts compare.
        /// </summary>
        private static void SyncTable(string tableName, SqlSugarScope sourceDb, SqlSugarScope targetDb, string targetConn)
        {
            bool targetTableExists = targetDb.DbMaintenance.IsAnyTable(tableName);
            if (!targetTableExists)
            {
                targetTableExists = TryCreateTargetTable(tableName, sourceDb, targetDb);
            }

            // The AppSys table gets its dedicated synchronization first (as before), then the generic one.
            if (string.Equals(tableName, "APPSYS", StringComparison.OrdinalIgnoreCase))
            {
                AppSysDataSync.SyncData(tableName, sourceDb, targetDb);
            }

            if (!targetTableExists)
            {
                // Counting rows of a table that does not exist would throw and abort the whole run.
                ("TargetTable : " + tableName + " does not exist and was not created (source has no rows); skipped").LogInformation();
                return;
            }

            var selectCountSql = $"SELECT COUNT(*) FROM {tableName} ";
            int sourceCount = sourceDb.Ado.GetInt(selectCountSql);
            int targetCount = targetDb.Ado.GetInt(selectCountSql);

            if (sourceCount > targetCount)
            {
                InsertMissingRows(tableName, sourceDb, targetDb, targetConn);
            }
            else if (sourceCount < targetCount)
            {
                DeleteRemovedRows(tableName, sourceDb, targetDb);
            }
            else
            {
                UpdateChangedRows(tableName, sourceDb, targetDb);
            }
        }

        /// <summary>
        /// Creates the table in the target database from the column layout of the source data.
        /// The generated statement contains only "name type" pairs, i.e. no keys or constraints.
        /// </summary>
        /// <returns><c>true</c> when the table was created; <c>false</c> when the source has no rows to derive it from.</returns>
        private static bool TryCreateTargetTable(string tableName, SqlSugarScope sourceDb, SqlSugarScope targetDb)
        {
            var sourceData = DataTableHelper.FetchDataFromTable(tableName, sourceDb);
            if (sourceData == null || sourceData.Rows.Count == 0)
            {
                return false;
            }

            // Strip the synchronization marker columns before deriving the target layout.
            DataBaseInfoService.DatatableRemoveCloumns(sourceData);

            var columnDefinitions = new List<string>();
            foreach (DataColumn column in sourceData.Columns)
            {
                columnDefinitions.Add($"{column.ColumnName} {DataBaseInfoService.GetSqlDataType(column.DataType)}");
            }

            var createTableSql = $"CREATE TABLE {tableName} ({string.Join(", ", columnDefinitions)})";
            targetDb.Ado.ExecuteCommand(createTableSql);

            ("TargetTable : " + tableName + ",创建成功").LogInformation();
            return true;
        }

        /// <summary>
        /// Bulk-inserts the source rows whose MD5 value is not present in the target table.
        /// </summary>
        private static void InsertMissingRows(string tableName, SqlSugarScope sourceDb, SqlSugarScope targetDb, string targetConn)
        {
            var sourceData = LoadTableWithMd5(sourceDb, tableName);
            var targetData = LoadTableWithMd5(targetDb, tableName);

            List<DataRow> missingRows = FindRowsMissingFrom(sourceData, targetData);
            if (missingRows.Count == 0)
            {
                return;
            }

            var rowsToInsert = missingRows.CopyToDataTable();
            // Remove the synchronization marker and MD5 helper columns before writing to the target.
            DataBaseInfoService.DatatableRemoveCloumns(rowsToInsert);

            // Dispose the connection deterministically; it was previously left to the finalizer.
            using (var connTarget = new SqlConnection(targetConn))
            {
                DataBaseInfoService.DataBulkCopy(connTarget, tableName, rowsToInsert);
            }
        }

        /// <summary>
        /// Deletes the target rows whose MD5 value is not present in the source table.
        /// Rows are addressed through the target table's primary key (or identity column).
        /// </summary>
        private static void DeleteRemovedRows(string tableName, SqlSugarScope sourceDb, SqlSugarScope targetDb)
        {
            var sourceData = LoadTableWithMd5(sourceDb, tableName);
            var targetData = LoadTableWithMd5(targetDb, tableName);

            List<DataRow> removedRows = FindRowsMissingFrom(targetData, sourceData);
            if (removedRows.Count == 0)
            {
                return;
            }

            var primaryKeys = targetDb.DbMaintenance.GetPrimaries(tableName);
            var identities = targetDb.DbMaintenance.GetIsIdentities(tableName);
            if (primaryKeys == null || primaryKeys.Count == 0)
            {
                ("TargetTable : " + tableName + " has no primary key; rows to delete were skipped").LogError();
                return;
            }

            foreach (DataRow row in removedRows)
            {
                var deleteDataSql = DataTableHelper.ConstructDeleteSql(tableName, primaryKeys, identities, row);
                targetDb.Ado.ExecuteCommand(deleteDataSql);
            }
        }

        /// <summary>
        /// Updates the target rows for every source row whose MD5 value is not present in the target table.
        /// </summary>
        private static void UpdateChangedRows(string tableName, SqlSugarScope sourceDb, SqlSugarScope targetDb)
        {
            var sourceData = LoadTableWithMd5(sourceDb, tableName);
            var targetData = LoadTableWithMd5(targetDb, tableName);

            List<DataRow> changedRows = FindRowsMissingFrom(sourceData, targetData);
            if (changedRows.Count == 0)
            {
                return;
            }

            var rowsToUpdate = changedRows.CopyToDataTable();
            // Remove the synchronization marker and MD5 helper columns so they do not end up in the SET list.
            DataBaseInfoService.DatatableRemoveCloumns(rowsToUpdate);

            var primaryKeys = targetDb.DbMaintenance.GetPrimaries(tableName);
            var identities = targetDb.DbMaintenance.GetIsIdentities(tableName);
            if (primaryKeys == null || primaryKeys.Count == 0)
            {
                // Same guard as the delete path: without a key no row can be addressed.
                ("TargetTable : " + tableName + " has no primary key; rows to update were skipped").LogError();
                return;
            }

            foreach (DataRow row in rowsToUpdate.Rows)
            {
                var updateDataSql = DataTableHelper.ConstructUpdateSql(tableName, primaryKeys, identities, row);
                targetDb.Ado.ExecuteCommand(updateDataSql);
            }
        }

        /// <summary>
        /// Loads the complete table and, when it contains rows, adds the MD5 comparison column.
        /// </summary>
        private static DataTable LoadTableWithMd5(SqlSugarScope db, string tableName)
        {
            var data = db.Ado.GetDataTable($"SELECT * FROM {tableName}");
            if (data != null && data.Rows.Count > 0)
            {
                DataBaseInfoService.DataTableAddColumsMd5(data);
            }
            return data;
        }

        /// <summary>
        /// Returns the rows of <paramref name="candidates"/> whose "MD5" value does not occur in
        /// <paramref name="reference"/>. A null or empty table on either side is treated as "no rows".
        /// </summary>
        private static List<DataRow> FindRowsMissingFrom(DataTable candidates, DataTable reference)
        {
            var missing = new List<DataRow>();
            if (candidates == null || candidates.Rows.Count == 0)
            {
                return missing;
            }

            // Hash the reference side once instead of rescanning it for every candidate row.
            var knownHashes = new HashSet<string>();
            if (reference != null && reference.Rows.Count > 0)
            {
                foreach (DataRow row in reference.Rows)
                {
                    knownHashes.Add(row.Field<string>("MD5"));
                }
            }

            foreach (DataRow row in candidates.Rows)
            {
                if (!knownHashes.Contains(row.Field<string>("MD5")))
                {
                    missing.Add(row);
                }
            }
            return missing;
        }
    }
}
//--DataTableHelper
using Furion;
using Furion.Logging.Extensions;
using Microsoft.AspNetCore.DataProtection.KeyManagement;
using Microsoft.Data.SqlClient;
using SqlSugar;
using System;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.Linq;

namespace DataSync.Core
{
    /// <summary>
    /// Helpers for reading table metadata and for building per-row UPDATE / DELETE / INSERT
    /// statements from a <see cref="DataRow"/>.
    /// </summary>
    /// <remarks>
    /// Values are rendered as SQL literals: text is quoted with embedded single quotes doubled,
    /// numbers and dates are formatted culture-independently, booleans become 1/0 and
    /// <see cref="DBNull"/> becomes NULL (or "IS NULL" inside a WHERE clause).
    /// Table and column names are inserted as-is and must come from a trusted source.
    /// </remarks>
    public class DataTableHelper
    {
        /// <summary>Literal used for missing values.</summary>
        private const string NullLiteral = "NULL";

        /// <summary>Date/time literal layout that does not depend on the session's language or DATEFORMAT.</summary>
        private const string SqlDateTimeFormat = "yyyy-MM-ddTHH:mm:ss.fff";

        /// <summary>
        /// Returns the names of all base tables of the connection's current database, sorted by name.
        /// </summary>
        /// <param name="connection">Connection to query; this method does not open it.</param>
        /// <returns>The table names.</returns>
        public static List<string> GetAllTables(SqlConnection connection)
        {
            List<string> tableNames = new List<string>();
            using (SqlCommand command = new SqlCommand())
            {
                command.Connection = connection;
                command.CommandText = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE' AND TABLE_CATALOG=DB_NAME() ORDER BY TABLE_NAME ASC";
                using (SqlDataReader reader = command.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        tableNames.Add((string)reader["TABLE_NAME"]);
                    }
                }
            }
            return tableNames;
        }

        /// <summary>
        /// Loads every row of a table into a <see cref="DataTable"/>.
        /// </summary>
        /// <param name="tableName">Table to read; inserted into the statement as-is.</param>
        /// <param name="scope">SqlSugar scope used to run the query.</param>
        /// <returns>The result of "SELECT * FROM tableName".</returns>
        public static DataTable FetchDataFromTable(string tableName, SqlSugarScope scope)
        {
            var sql = $"SELECT * FROM {tableName}";
            DataTable dataTable = scope.Ado.GetDataTable(sql);
            return dataTable;
        }

        /// <summary>
        /// Returns the key columns of a table, i.e. the columns of constraints whose name starts with "PK".
        /// </summary>
        /// <param name="tableName">Table to inspect; passed as a query parameter.</param>
        /// <param name="connectionstring">Connection to query (despite its name, a connection object); this method does not open it.</param>
        /// <returns>The column names, empty when no matching constraint exists.</returns>
        public static List<string> GetPrimaryKeyFieldName(string tableName, SqlConnection connectionstring)
        {
            var req = new List<string>();
            using (SqlCommand command = new SqlCommand())
            {
                command.Connection = connectionstring;
                // Parameterized so that a table name containing a quote cannot alter the statement.
                command.CommandText = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_NAME=@tableName AND CONSTRAINT_NAME LIKE 'PK%'";
                command.Parameters.AddWithValue("@tableName", (object)tableName ?? DBNull.Value);
                using (SqlDataReader reader = command.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        req.Add((string)reader["COLUMN_NAME"]);
                    }
                }
            }
            return req;
        }

        /// <summary>
        /// Reads the constraints of <paramref name="sourceTable"/> via sp_helpconstraint and issues an
        /// ALTER TABLE ... ADD CONSTRAINT statement against <paramref name="targetTable"/> for each match.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the summary in the original code says "copy primary key", but the generated
        /// statement adds a FOREIGN KEY on [Id] referencing the source table - confirm the intent.
        /// NOTE(review): the reader columns "ConstraintType", "IsPrimaryKey" and "name" are assumed to be
        /// present in the first result set of sp_helpconstraint - verify against the server.
        /// The table names are inserted into the statements as-is.
        /// </remarks>
        /// <param name="sourceTable">Table whose constraints are read.</param>
        /// <param name="targetTable">Table that receives the constraint.</param>
        /// <param name="sourceConn">Connection used for reading; this method does not open it.</param>
        /// <param name="targetConn">Connection used for the ALTER TABLE; this method does not open it.</param>
        public static void CopyPrimaryKeyToTargetTable(string sourceTable, string targetTable, SqlConnection sourceConn, SqlConnection targetConn)
        {
            var copyQuery = $@"EXEC sp_helpconstraint '{sourceTable}'";
            using (var cmd = new SqlCommand(copyQuery, sourceConn))
            {
                using (var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        if ((int)reader["ConstraintType"] == 3 && (bool)reader["IsPrimaryKey"])
                        {
                            var constraintName = (string)reader["name"];
                            var alterQuery = $"ALTER TABLE {targetTable}" + $" ADD CONSTRAINT [{constraintName}] FOREIGN KEY ([Id]) REFERENCES {sourceTable}([Id]);";
                            using (var alterCmd = new SqlCommand(alterQuery, targetConn))
                            {
                                alterCmd.ExecuteNonQuery();
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Builds an UPDATE statement for one row; the WHERE clause is derived from the key columns.
        /// </summary>
        /// <remarks>
        /// With a composite primary key and at least one identity column, the identity columns address
        /// the row; otherwise the primary key columns do. The addressing columns are excluded from the
        /// SET list. Columns of unsupported types are logged and left out of the SET list.
        /// </remarks>
        /// <param name="tableName">Table to update.</param>
        /// <param name="primaryKeyName">Primary key columns of the table.</param>
        /// <param name="Identities">Identity columns of the table; may be null or empty.</param>
        /// <param name="dataRow">Row holding the new values and the key values.</param>
        /// <returns>The UPDATE statement.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="dataRow"/> is null.</exception>
        /// <exception cref="InvalidOperationException">No key column is available, or no column is left to update.</exception>
        public static string ConstructUpdateSql(string tableName, List<string> primaryKeyName, List<string> Identities, DataRow dataRow)
        {
            if (dataRow == null)
            {
                throw new ArgumentNullException(nameof(dataRow));
            }

            List<string> keyColumns = ResolveKeyColumns(primaryKeyName, Identities);
            if (keyColumns.Count == 0)
            {
                // Refuse to build a statement that would be invalid or would touch every row.
                throw new InvalidOperationException("Cannot build an UPDATE for table '" + tableName + "' without key columns.");
            }

            var conditions = new List<string>();
            foreach (string keyColumn in keyColumns)
            {
                object keyValue = dataRow[keyColumn]; // throws when the row has no such column
                conditions.Add(BuildCondition(dataRow.Table.Columns[keyColumn], keyValue, true));
            }

            var assignments = new List<string>();
            foreach (DataColumn column in dataRow.Table.Columns)
            {
                if (keyColumns.Contains(column.ColumnName))
                {
                    continue; // key / identity columns are never overwritten
                }

                string literal = ToSqlLiteral(column.DataType, dataRow[column]);
                if (literal == null)
                {
                    LogUnsupportedType(column);
                    continue;
                }
                assignments.Add(column.ColumnName + " = " + literal);
            }

            if (assignments.Count == 0)
            {
                throw new InvalidOperationException("Cannot build an UPDATE for table '" + tableName + "': no updatable columns.");
            }

            return "UPDATE " + tableName + " SET " + string.Join(", ", assignments) + " WHERE " + string.Join(" AND ", conditions);
        }

        /// <summary>
        /// Legacy overload without key information.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the implementation that used to be here was commented out; what remains
        /// returns the truncated text "UPDATE {tableName} SETWH", which is not executable SQL.
        /// The behavior is kept unchanged for callers outside this file - prefer the overload that
        /// takes the key columns, and consider removing this one.
        /// </remarks>
        /// <param name="tableName">Table name inserted into the text.</param>
        /// <param name="dataRow">Not used.</param>
        /// <returns>The truncated statement text described above.</returns>
        public static string ConstructUpdateSql(string tableName, DataRow dataRow)
        {
            string sql = "UPDATE " + tableName + " SET ";
            string whereStr = "WHERE ";
            sql = sql.Substring(0, sql.Length - 1);
            whereStr = whereStr.Substring(0, whereStr.Length - 4);
            sql = sql + whereStr;
            return sql;
        }

        /// <summary>
        /// Builds a DELETE statement that matches a row by the values of all its supported columns.
        /// </summary>
        /// <remarks>Columns of unsupported types are logged and not part of the WHERE clause.</remarks>
        /// <param name="tableName">Table to delete from.</param>
        /// <param name="dataRow">Row whose values form the WHERE clause.</param>
        /// <returns>The DELETE statement.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="dataRow"/> is null.</exception>
        /// <exception cref="InvalidOperationException">No column could be used for the WHERE clause.</exception>
        public static string ConstructDeleteSql(string tableName, DataRow dataRow)
        {
            if (dataRow == null)
            {
                throw new ArgumentNullException(nameof(dataRow));
            }

            var conditions = new List<string>();
            foreach (DataColumn column in dataRow.Table.Columns)
            {
                string condition = BuildCondition(column, dataRow[column], false);
                if (condition != null)
                {
                    conditions.Add(condition);
                }
            }

            return BuildDelete(tableName, conditions);
        }

        /// <summary>
        /// Builds a DELETE statement that addresses a row through its key columns.
        /// </summary>
        /// <remarks>
        /// With a composite primary key and at least one identity column, the identity columns address
        /// the row; otherwise the primary key columns do. Only key columns that exist in the row's
        /// table contribute to the WHERE clause.
        /// </remarks>
        /// <param name="tableName">Table to delete from.</param>
        /// <param name="primaryKeyName">Primary key columns of the table.</param>
        /// <param name="Identities">Identity columns of the table; may be null or empty.</param>
        /// <param name="dataRow">Row holding the key values.</param>
        /// <returns>The DELETE statement.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="dataRow"/> is null.</exception>
        /// <exception cref="InvalidOperationException">No key column could be used for the WHERE clause.</exception>
        public static string ConstructDeleteSql(string tableName, List<string> primaryKeyName, List<string> Identities, DataRow dataRow)
        {
            if (dataRow == null)
            {
                throw new ArgumentNullException(nameof(dataRow));
            }

            List<string> keyColumns = ResolveKeyColumns(primaryKeyName, Identities);
            var conditions = new List<string>();
            foreach (DataColumn column in dataRow.Table.Columns)
            {
                if (!keyColumns.Contains(column.ColumnName))
                {
                    continue; // only key columns address the row
                }
                // A key column must never be dropped from the WHERE clause, so unsupported types are quoted as text.
                conditions.Add(BuildCondition(column, dataRow[column], true));
            }

            return BuildDelete(tableName, conditions);
        }

        /// <summary>
        /// Builds an INSERT statement for one row.
        /// </summary>
        /// <remarks>
        /// The column list is taken from the row's table so that it always matches the value list
        /// (the previous version hard-coded "AppName,Version,AppType,CreateDate" while emitting one
        /// value per table column). Columns of unsupported types are logged and omitted from both lists.
        /// </remarks>
        /// <param name="tableName">Table to insert into.</param>
        /// <param name="dataRow">Row holding the values.</param>
        /// <returns>The INSERT statement.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="dataRow"/> is null.</exception>
        /// <exception cref="InvalidOperationException">No column could be inserted.</exception>
        public static string ConstructInsertSql(string tableName, DataRow dataRow)
        {
            if (dataRow == null)
            {
                throw new ArgumentNullException(nameof(dataRow));
            }

            var columnNames = new List<string>();
            var values = new List<string>();
            foreach (DataColumn column in dataRow.Table.Columns)
            {
                string literal = ToSqlLiteral(column.DataType, dataRow[column]);
                if (literal == null)
                {
                    LogUnsupportedType(column);
                    continue;
                }
                columnNames.Add(column.ColumnName);
                values.Add(literal);
            }

            if (columnNames.Count == 0)
            {
                throw new InvalidOperationException("Cannot build an INSERT for table '" + tableName + "': no insertable columns.");
            }

            return "INSERT INTO " + tableName + " (" + string.Join(", ", columnNames) + ") VALUES (" + string.Join(", ", values) + ")";
        }

        /// <summary>
        /// Chooses the columns that address a row: the identity columns for a composite primary key
        /// with identities, otherwise the primary key columns; empty when there is no primary key.
        /// </summary>
        private static List<string> ResolveKeyColumns(List<string> primaryKeyName, List<string> identities)
        {
            if (primaryKeyName == null || primaryKeyName.Count == 0)
            {
                return new List<string>();
            }

            bool useIdentities = primaryKeyName.Count > 1 && identities != null && identities.Count > 0;
            return useIdentities ? identities : primaryKeyName;
        }

        /// <summary>
        /// Assembles the DELETE statement from ready-made conditions.
        /// </summary>
        private static string BuildDelete(string tableName, List<string> conditions)
        {
            if (conditions.Count == 0)
            {
                // Never emit an unconditional (or syntactically broken) DELETE.
                throw new InvalidOperationException("Cannot build a DELETE for table '" + tableName + "' without conditions.");
            }
            return "DELETE FROM " + tableName + " WHERE " + string.Join(" AND ", conditions);
        }

        /// <summary>
        /// Builds "column = literal" or "column IS NULL". For an unsupported column type the value is
        /// either quoted as text (<paramref name="quoteUnsupported"/>) or the column is logged and
        /// <c>null</c> is returned.
        /// </summary>
        private static string BuildCondition(DataColumn column, object value, bool quoteUnsupported)
        {
            string literal = ToSqlLiteral(column.DataType, value);
            if (literal == null)
            {
                if (!quoteUnsupported)
                {
                    LogUnsupportedType(column);
                    return null;
                }
                literal = IsNull(value) ? NullLiteral : QuoteText(Convert.ToString(value, CultureInfo.InvariantCulture));
            }

            // Quoted text always starts with an apostrophe, so only a real NULL equals the NULL literal.
            return literal == NullLiteral
                ? column.ColumnName + " IS NULL"
                : column.ColumnName + " = " + literal;
        }

        /// <summary>
        /// Renders a value as a SQL literal, or returns <c>null</c> when the column type is not supported.
        /// Supported: String, Byte, Int16, Int32, Int64, Decimal, DateTime, Boolean and Guid.
        /// </summary>
        private static string ToSqlLiteral(Type dataType, object value)
        {
            if (!IsSupportedType(dataType))
            {
                return null;
            }
            if (IsNull(value))
            {
                return NullLiteral;
            }

            switch (Type.GetTypeCode(dataType))
            {
                case TypeCode.String:
                    return QuoteText(Convert.ToString(value, CultureInfo.InvariantCulture));
                case TypeCode.DateTime:
                    return QuoteText(Convert.ToDateTime(value, CultureInfo.InvariantCulture).ToString(SqlDateTimeFormat, CultureInfo.InvariantCulture));
                case TypeCode.Boolean:
                    return Convert.ToBoolean(value, CultureInfo.InvariantCulture) ? "1" : "0";
                case TypeCode.Object:
                    // Guid is the only supported type reported as TypeCode.Object.
                    return QuoteText(value.ToString());
                default:
                    // Integer and decimal types: invariant formatting keeps '.' as the decimal separator.
                    return Convert.ToString(value, CultureInfo.InvariantCulture);
            }
        }

        /// <summary>
        /// Tells whether values of the given column type can be rendered as a SQL literal.
        /// </summary>
        private static bool IsSupportedType(Type dataType)
        {
            if (dataType == typeof(Guid))
            {
                return true;
            }

            switch (Type.GetTypeCode(dataType))
            {
                case TypeCode.String:
                case TypeCode.Byte:
                case TypeCode.Int16:
                case TypeCode.Int32:
                case TypeCode.Int64:
                case TypeCode.Decimal:
                case TypeCode.DateTime:
                case TypeCode.Boolean:
                    return true;
                default:
                    return false;
            }
        }

        /// <summary>
        /// Wraps text in single quotes and doubles embedded single quotes.
        /// </summary>
        private static string QuoteText(string text)
        {
            return "'" + (text ?? string.Empty).Replace("'", "''") + "'";
        }

        /// <summary>
        /// Tells whether a cell value represents a missing value.
        /// </summary>
        private static bool IsNull(object value)
        {
            return value == null || value == DBNull.Value;
        }

        /// <summary>
        /// Writes the "unhandled type" error log entry for a column.
        /// </summary>
        private static void LogUnsupportedType(DataColumn column)
        {
            ("未处理类型:" + column.DataType.Name.ToUpper()).LogError();
        }
    }
}
Gitee下载地址:https://gitee.com/ltf_free/sync-data.git
热门相关:玫瑰旅馆3:援交告白 玫瑰旅馆3:援交告白 黑骏马国语 那个外星人 替补