在上一篇中介绍了基于Dapper的NetCore分表,本篇旨在介绍基于Dapper的NetCore分区,废话不多说开搞吧!
模拟业务场景:基于公司所在地区对表建立分区
设计公司表结构,其中TableAttribute标识表名,PartitionAttribute标识当前表是分区结构,Property代表按照某个属性分区
// Listing: Company

/// <summary>
/// Base type for entities that carry an identifier of type <typeparamref name="TKey"/>.
/// </summary>
/// <typeparam name="TKey">Type of the identifier.</typeparam>
public abstract class Entity<TKey>
{
    /// <summary>Entity identifier, tagged with the <c>ExplicitKey</c> attribute.</summary>
    [ExplicitKey]
    public virtual TKey Id { get; set; }
}

/// <summary>
/// Company row. Stored in the "Company" table and declared as partitioned by <see cref="Region"/>.
/// </summary>
[Table("Company")]
// nameof keeps the partition column in sync with the property if it is ever renamed.
[Partition(property: nameof(Company.Region))]
public class Company : Entity<Guid>
{
    /// <summary>Company name.</summary>
    public string Name { get; set; }

    /// <summary>Region of the company; the property the table is partitioned by.</summary>
    public string Region { get; set; }
}
// Listing: Attribute

/// <summary>
/// Marker attribute for a key property.
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public class ExplicitKey : Attribute
{
}

/// <summary>
/// Declares that the decorated entity type is stored in a partitioned table and
/// names the property whose value selects the partition.
/// </summary>
// The attribute is only ever looked up on the entity type, so restrict it to classes.
[AttributeUsage(AttributeTargets.Class)]
public class PartitionAttribute : Attribute
{
    /// <summary>Name of the entity property used as the partition column.</summary>
    public string Property { get; set; }

    /// <summary>
    /// Creates the attribute.
    /// </summary>
    /// <param name="property">Name of the property to partition by; must not be null or blank.</param>
    /// <exception cref="ArgumentException"><paramref name="property"/> is null, empty or whitespace.</exception>
    public PartitionAttribute(string property)
    {
        if (string.IsNullOrWhiteSpace(property))
        {
            throw new ArgumentException("The partition property name must not be null or blank.", nameof(property));
        }

        this.Property = property;
    }
}
有了表结构之后设计Repository。首先是BaseRepository:除了基本的CRUD,还定义了一个虚属性 protected virtual Func<string, string>? CreateScriptFunc { get; set; },要求子类根据特定的需求提供建表脚本;另外还定义了一个Schema属性,设计的初衷是做数据隔离或者安全隔离。
// Listing: BaseRepository
using Dapper;
using Dapper.Contrib.Extensions;
using System.Data;
using System.Reflection;

namespace API.Dapper.Repository.V2
{
    /// <summary>
    /// Generic CRUD repository that builds its SQL from the templates in <c>Sql</c>,
    /// the connection's schema and the public properties of <typeparamref name="T"/>.
    /// Every command runs on <see cref="DbConnection"/> inside <see cref="Transaction"/>.
    /// </summary>
    /// <typeparam name="T">Entity type; its public property names are used as column names.</typeparam>
    public abstract class BaseRepositoryV2<T> where T : class, new()
    {
        // Reflection results are fixed per closed generic type, so resolve them once
        // instead of on every property access.
        private static readonly string s_tableName =
            typeof(T).GetCustomAttribute<TableAttribute>()?.Name ?? typeof(T).Name;

        private static readonly string[] s_properties =
            typeof(T).GetProperties().Select(p => p.Name).ToArray();

        /// <summary>Source of the connection and of the schema name.</summary>
        protected IDbConnectionSchema DbConnectionSchema { get; set; }

        /// <summary>Schema name substituted for the <c>{schema}</c> placeholder of the SQL templates.</summary>
        protected string Schema { get => DbConnectionSchema.Schema; }

        /// <summary>Transaction every command is enlisted in.</summary>
        protected IDbTransaction Transaction { get; private set; }

        /// <summary>Connection taken from <see cref="DbConnectionSchema"/>.</summary>
        protected IDbConnection DbConnection => DbConnectionSchema.DbConnection;

        /// <summary>Command timeout passed to every Dapper call (30 unless changed).</summary>
        protected Int32 CommandTimeout { get; set; } = 30;

        /// <summary>Name from the <c>Table</c> attribute of <typeparamref name="T"/>, or the type name when absent.</summary>
        protected static string _tableName => s_tableName;

        /// <summary>Names of all public properties of <typeparamref name="T"/>.</summary>
        protected static IEnumerable<string> _properties => s_properties;

        /// <summary>Stores the connection source and the transaction used by all commands.</summary>
        protected BaseRepositoryV2(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction)
        {
            DbConnectionSchema = dbConnectionSchema;
            Transaction = dbTransaction;
        }

        /// <summary>
        /// Optional factory that maps a table name to the script creating that table.
        /// </summary>
        protected virtual Func<string, string>? CreateScriptFunc { get; set; }

        /// <summary>Inserts the entities, writing every public property as a column.</summary>
        /// <param name="entities">Rows to insert.</param>
        /// <param name="tableName">Target table; defaults to <see cref="_tableName"/>.</param>
        /// <returns>The value returned by Dapper's <c>ExecuteAsync</c>.</returns>
        public async Task<Int32> AddAsync(IEnumerable<T> entities, string? tableName = null)
        {
            var columns = string.Join(",", _properties);
            var values = string.Join(",", _properties.Select(p => $"@{p}"));
            var sql = string.Format(Sql.Insert.BuildSchema(Schema), tableName ?? _tableName, columns, values);
            return await DbConnection.ExecuteAsync(sql, entities, Transaction, CommandTimeout, CommandType.Text);
        }

        /// <summary>Selects every row of the table.</summary>
        /// <param name="tableName">Source table; defaults to <see cref="_tableName"/>.</param>
        public async Task<IEnumerable<T>> GetAllAsync(string? tableName = null)
        {
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", _properties), tableName ?? _tableName, string.Empty);
            return await DbConnection.QueryAsync<T>(sql, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>Selects the single row whose <c>Id</c> column equals <paramref name="id"/>.</summary>
        /// <param name="id">Identifier to look up.</param>
        /// <param name="tableName">Source table; defaults to <see cref="_tableName"/>.</param>
        /// <returns>The row, or the default value when no row matches.</returns>
        public async Task<T> GetAsync(Guid id, string? tableName = null)
        {
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", _properties), tableName ?? _tableName, "WHERE Id = @Id");
            var param = new DynamicParameters();
            param.Add("Id", id);
            return await DbConnection.QuerySingleOrDefaultAsync<T>(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>
        /// Selects the given columns, filtered by equality predicates (<c>col=@col</c>) joined with AND.
        /// </summary>
        /// <param name="selectedProperties">Columns to return.</param>
        /// <param name="whereProperties">Columns to filter on; null or empty means no filter.</param>
        /// <param name="tableName">Source table; defaults to <see cref="_tableName"/>.</param>
        /// <param name="sqlParams">Object supplying a value for each <c>@col</c> placeholder.</param>
        public async Task<IEnumerable<T>> GetAsync(IEnumerable<string> selectedProperties, IEnumerable<string> whereProperties, string? tableName = null, object? sqlParams = null)
        {
            var predicates = whereProperties?.Select(p => $"{p}=@{p}").ToList() ?? new List<string>();

            // The Select template has no WHERE keyword of its own, so it must be part of the clause.
            var whereClause = predicates.Count > 0
                ? $"WHERE {string.Join(" AND ", predicates)}"
                : string.Empty;

            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", selectedProperties), tableName ?? _tableName, whereClause);
            return await DbConnection.QueryAsync<T>(sql, sqlParams, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>Selects the given columns using a caller-supplied WHERE clause and parameters.</summary>
        /// <param name="selectedProperties">Columns to return.</param>
        /// <param name="whereClause">Text placed after the table name; inserted verbatim.</param>
        /// <param name="sqlParams">Parameters referenced by <paramref name="whereClause"/>.</param>
        /// <param name="tableName">Source table; defaults to <see cref="_tableName"/>.</param>
        public async Task<IEnumerable<T>> GetAsync(IEnumerable<string> selectedProperties, string whereClause, DynamicParameters sqlParams, string? tableName = null)
        {
            var sql = string.Format(Sql.Select.BuildSchema(Schema), string.Join(",", selectedProperties), tableName ?? _tableName, whereClause);
            return await DbConnection.QueryAsync<T>(sql, sqlParams, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>
        /// Updates the given columns of each entity's row. Only names that are public
        /// properties of <typeparamref name="T"/> are used, in both the SET and WHERE lists.
        /// </summary>
        /// <param name="entities">Entities supplying the parameter values.</param>
        /// <param name="updateProperties">Columns to set.</param>
        /// <param name="whereProperties">Columns compared for equality to locate the row.</param>
        /// <param name="tableName">Target table; defaults to <see cref="_tableName"/>.</param>
        public async Task<Int32> UpdateAsync(IEnumerable<T> entities, IEnumerable<string> updateProperties, IEnumerable<string> whereProperties, string? tableName = null)
        {
            var updateClause = string.Join(",", _properties.Intersect(updateProperties).Select(p => $"{p}=@{p}"));
            var whereClause = $"WHERE {string.Join(" AND ", _properties.Intersect(whereProperties).Select(p => $"{p}=@{p}"))}";
            var sql = string.Format(Sql.Update.BuildSchema(Schema), tableName ?? _tableName, updateClause, whereClause);
            return await DbConnection.ExecuteAsync(sql, entities, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }

        /// <summary>Deletes the rows whose <c>Id</c> is in <paramref name="ids"/>.</summary>
        /// <param name="ids">Identifiers of the rows to delete.</param>
        /// <param name="tableName">Target table; defaults to <see cref="_tableName"/>.</param>
        public async Task<Int32> DeleteAsync(IEnumerable<Guid> ids, string? tableName = null)
        {
            var sql = string.Format(Sql.Delete.BuildSchema(Schema), tableName ?? _tableName, "WHERE Id IN @Ids");
            var param = new DynamicParameters();
            param.Add("Ids", ids);
            return await DbConnection.ExecuteAsync(sql, param, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
        }
    }
}
BaseRepository中基础的CRUD不能满足分区的业务需求,还需要一些特定的逻辑,比如写入数据的时候需要判断是否有分区Function、分区Scheme,以及是否已经创建分区Range values等,所以定义了PartitionRepository继承自BaseRepository完成以上操作。
// Listing: PartitionRepository
using API.Dapper.Attributes;
using API.Dapper.Repository.V2;
using Dapper;
using System.Data;
using System.Reflection;

namespace API.Dapper.Repository;

/// <summary>
/// Repository for entity types marked with <see cref="PartitionAttribute"/>. Before inserting,
/// it creates the table when it is missing and adds a partition boundary for every partition
/// value in the batch that the table does not have yet.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
public abstract class PartitionRepository<T> : BaseRepositoryV2<T> where T : class, new()
{
    public PartitionRepository(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction) : base(dbConnectionSchema, dbTransaction)
    {
    }

    /// <summary>
    /// Inserts the entities. For types carrying <see cref="PartitionAttribute"/> the table and the
    /// required partition boundaries are ensured first.
    /// </summary>
    /// <param name="entities">Rows to insert.</param>
    /// <returns>The value returned by the base insert.</returns>
    /// <exception cref="InvalidOperationException">
    /// The table is missing and <c>CreateScriptFunc</c> yields no script for it.
    /// </exception>
    public async Task<Int32> AddAsync(IEnumerable<T> entities)
    {
        // Materialise once: the batch is read for partition values and again for the insert.
        var batch = entities as ICollection<T> ?? entities.ToList();

        if (typeof(T).GetCustomAttribute<PartitionAttribute>() != null)
        {
            await EnsureTableAsync();

            // Boundaries are checked on every insert, not only right after table creation;
            // otherwise values first seen after the table exists would never get a boundary.
            await EnsurePartitionValuesAsync(batch);
        }

        return await base.AddAsync(batch);
    }

    /// <summary>Runs the create script when the table does not exist yet.</summary>
    private async Task EnsureTableAsync()
    {
        if (await DbConnection.IsTableExist(Schema, _tableName, Transaction, CommandTimeout))
        {
            return;
        }

        var script = this.CreateScriptFunc?.Invoke(_tableName);
        if (string.IsNullOrWhiteSpace(script))
        {
            throw new InvalidOperationException($"No create script is defined for partitioned table '{_tableName}'.");
        }

        await DbConnection.ExecuteAsync(script, transaction: Transaction, commandTimeout: CommandTimeout, commandType: CommandType.Text);
    }

    /// <summary>Creates a partition boundary for each batch value the table does not have yet.</summary>
    private async Task EnsurePartitionValuesAsync(IEnumerable<T> entities)
    {
        var requested = GetPartitionValues(entities);
        var existing = await DbConnection.GetPartitionValues(_tableName, Schema, transaction: Transaction, commandTimeout: CommandTimeout);

        // Except also removes duplicates, so each missing boundary is created once.
        var missing = requested.Except(existing).ToList();
        if (missing.Count > 0)
        {
            await DbConnection.CreatePartitionValues(missing, _tableName, Schema, transaction: Transaction, commandTimeout: CommandTimeout);
        }
    }

    /// <summary>
    /// Reads the partition property named by <see cref="PartitionAttribute"/> from each entity,
    /// formatted as a string. Returns an empty sequence when the attribute or property is not found.
    /// </summary>
    private IEnumerable<string> GetPartitionValues(IEnumerable<T> entities)
    {
        var partitionAttribute = typeof(T).GetCustomAttribute<PartitionAttribute>();
        if (partitionAttribute == null)
        {
            return Enumerable.Empty<string>();
        }

        var propertyInfo = typeof(T).GetProperties().FirstOrDefault(p => p.Name.Equals(partitionAttribute.Property));
        if (propertyInfo == null)
        {
            return Enumerable.Empty<string>();
        }

        return entities.Select(e => propertyInfo.GetValue(e, null)).Select(v => $"{v}");
    }
}
接下来定义CompanyRepository继承自PartitionRepository完成业务Repository
// Listing: CompanyRepository
using API.Dapper.Entity;
using System.Data;

namespace API.Dapper.Repository.V2
{
    /// <summary>
    /// Repository for <see cref="Company"/> that supplies the script creating the
    /// partition function, partition scheme, table and indexes.
    /// </summary>
    public class CompanyRepositoryV2 : PartitionRepository<Company>
    {
        // Boundary value the partition function is created with.
        private static string _defaultPartitionValue => "CHN";

        public CompanyRepositoryV2(IDbConnectionSchema dbConnectionSchema, IDbTransaction dbTransaction)
            : base(dbConnectionSchema, dbTransaction)
        {
        }

        /// <summary>Factory mapping a table name to its creation script.</summary>
        protected override Func<string, string> CreateScriptFunc => BuildCreateScript;

        /// <summary>
        /// Builds the creation script for <paramref name="tableName"/>. Returns an empty string
        /// unless this repository's table is the company table.
        /// </summary>
        private string BuildCreateScript(string tableName)
        {
            if (_tableName != APITable.Company)
            {
                return string.Empty;
            }

            // Object names are derived from the schema and table name, as T-SQL bracketed identifiers.
            var partitionFunction = $"[{Schema}_{tableName}PartitionFun]";
            var partitionScheme = $"[{Schema}_{tableName}PartitonSchema]";
            var qualifiedTable = "[{schema}].".BuildSchema(Schema) + $"[{tableName}]";

            return string.Concat(
                $"CREATE PARTITION FUNCTION {partitionFunction}(nvarchar(256)) AS RANGE RIGHT FOR VALUES ('{_defaultPartitionValue}');",
                $"CREATE PARTITION SCHEME {partitionScheme} AS PARTITION {partitionFunction} ALL TO ([PRIMARY]);",
                $"CREATE TABLE {qualifiedTable}( [Id] [uniqueidentifier] NOT NULL, [Name] [nvarchar](256) NOT NULL, [Region] [nvarchar](256) NOT NULL) ON {partitionScheme}([Region]);",
                $"CREATE CLUSTERED INDEX [Clustered_Region] ON {qualifiedTable}([Region] ASC);",
                $"CREATE NONCLUSTERED INDEX [index_ActivityType] ON {qualifiedTable}([Id] DESC);");
        }
    }
}
其次创建事务管理UnitOfWork
// Listing: UnitOfWork
using System.Data;

namespace API.Dapper.UnitOfWork.V2
{
    /// <summary>
    /// Unit of work that opens the schema's connection, begins a transaction on it,
    /// and disposes both when the unit of work is disposed.
    /// </summary>
    public class UnitOfWorkV2 : IUnitOfWork
    {
        /// <summary>Connection the transaction runs on.</summary>
        public IDbConnection DbConnection { get; set; }

        /// <summary>Transaction started by the constructor.</summary>
        public IDbTransaction DbTransaction { get; set; }

        /// <summary>Begins a transaction with the provider's default isolation level.</summary>
        /// <param name="dbConnectionSchema">Source of the connection.</param>
        public UnitOfWorkV2(IDbConnectionSchema dbConnectionSchema)
        {
            (DbConnection, DbTransaction) = Begin(dbConnectionSchema.DbConnection, null);
        }

        /// <summary>Begins a transaction with the given isolation level.</summary>
        /// <param name="dbConnectionSchema">Source of the connection.</param>
        /// <param name="isolationLevel">Isolation level of the transaction.</param>
        public UnitOfWorkV2(IDbConnectionSchema dbConnectionSchema, IsolationLevel isolationLevel)
        {
            (DbConnection, DbTransaction) = Begin(dbConnectionSchema.DbConnection, isolationLevel);
        }

        /// <summary>Commits the transaction.</summary>
        public void Commit()
        {
            DbTransaction.Commit();
        }

        /// <summary>Disposes the transaction, then the connection.</summary>
        public void Dispose()
        {
            DbTransaction?.Dispose();
            DbConnection?.Dispose();
            GC.SuppressFinalize(this);
        }

        /// <summary>Rolls the transaction back.</summary>
        public void Rollback()
        {
            DbTransaction.Rollback();
        }

        /// <summary>
        /// Opens the connection when it is not open yet and begins a transaction on it.
        /// </summary>
        private static (IDbConnection Connection, IDbTransaction Transaction) Begin(IDbConnection connection, IsolationLevel? isolationLevel)
        {
            // IDbConnection.Open throws on an already-open connection, so only open when needed.
            var openedHere = connection.State != ConnectionState.Open;
            if (openedHere)
            {
                connection.Open();
            }

            try
            {
                var transaction = isolationLevel.HasValue
                    ? connection.BeginTransaction(isolationLevel.Value)
                    : connection.BeginTransaction();
                return (connection, transaction);
            }
            catch
            {
                // A throwing constructor never hands the instance to the caller, so Dispose
                // would not run; release the connection this method opened before rethrowing.
                if (openedHere)
                {
                    connection.Close();
                }

                throw;
            }
        }
    }
}
// Listing: CompanyUnitOfWork
using API.Dapper.Repository.V2;

namespace API.Dapper.UnitOfWork.V2;

/// <summary>
/// Unit of work exposing a <see cref="CompanyRepositoryV2"/> that is bound to
/// this unit of work's transaction.
/// </summary>
public class CompanyUnitOfWorkV2 : UnitOfWorkV2
{
    /// <summary>Company repository created in the constructor.</summary>
    public CompanyRepositoryV2 CompanyRepositoryV2 { get; }

    /// <summary>
    /// Runs the base constructor, then creates the repository from the same connection
    /// source and the resulting <c>DbTransaction</c>.
    /// </summary>
    /// <param name="dbConnectionSchema">Source of the connection and schema.</param>
    public CompanyUnitOfWorkV2(IDbConnectionSchema dbConnectionSchema)
        : base(dbConnectionSchema)
    {
        var repository = new CompanyRepositoryV2(dbConnectionSchema, DbTransaction);
        CompanyRepositoryV2 = repository;
    }
}
IDbConnection扩展方法
// Listing: DbConnectionExtension
#region Partition

/// <summary>
/// Checks whether the table's heap or clustered index is stored on a partition scheme.
/// </summary>
/// <param name="connection">Connection to query.</param>
/// <param name="tableName">Table name, without schema.</param>
/// <param name="schema">Schema name of the table.</param>
/// <param name="transaction">Optional transaction to run in.</param>
/// <param name="commandTimeout">Optional command timeout.</param>
/// <returns><c>true</c> when at least one matching row is found.</returns>
public static async Task<bool> IsPartitionTable(this IDbConnection connection, string tableName, string schema, IDbTransaction transaction = null, int? commandTimeout = null)
{
    // Names are passed as parameters rather than concatenated into the statement.
    const string sql = "SELECT Count(t.object_id) FROM sys.tables t INNER JOIN sys.indexes i ON t.object_id = i.object_id AND i.type IN(0,1) INNER JOIN sys.partition_schemes ps ON i.data_space_id = ps.data_space_id WHERE t.name = @tableName AND SCHEMA_NAME(t.schema_id) = @schema;";
    return await connection.ExecuteScalarAsync<int>(sql, new { tableName, schema }, transaction: transaction, commandTimeout: commandTimeout) > 0;
}

/// <summary>
/// Adds one boundary per value to the table's partition function: for each value it sets
/// the scheme's NEXT USED filegroup to PRIMARY and then splits the range.
/// </summary>
/// <param name="connection">Connection to run on.</param>
/// <param name="rangeValues">Boundary values to add.</param>
/// <param name="tableName">Table name, without schema.</param>
/// <param name="schema">Schema name of the table.</param>
/// <param name="transaction">Optional transaction to run in.</param>
/// <param name="commandTimeout">Optional command timeout.</param>
/// <returns>Sum of the values returned by <c>ExecuteAsync</c> for each statement.</returns>
/// <exception cref="InvalidOperationException">No partition scheme/function is found for the table.</exception>
public static async Task<Int32> CreatePartitionValues(this IDbConnection connection, IEnumerable<string> rangeValues, string tableName, string schema, IDbTransaction transaction = null, int? commandTimeout = null)
{
    var partitions = (await connection.GetPartitionSchemaFunctions(tableName, schema, transaction, commandTimeout))?.ToList();

    // Treat null and empty alike; comparing a null Count() with 0 would let null through.
    if (partitions == null || partitions.Count == 0)
    {
        throw new InvalidOperationException($"The partition scheme and function is null or empty, target table: {tableName}, schema: {schema}.");
    }

    var partition = partitions[0];

    // Escape closing brackets so the names stay inside the bracketed identifier.
    var scheme = $"{partition.Scheme}".Replace("]", "]]");
    var function = $"{partition.Function}".Replace("]", "]]");

    var affectedRows = 0;
    foreach (var rangeValue in rangeValues)
    {
        // Boundary values come from entity data; double single quotes so the value stays a literal.
        var literal = (rangeValue ?? string.Empty).Replace("'", "''");
        var sql = $"ALTER PARTITION SCHEME [{scheme}] Next used 'PRIMARY' ALTER PARTITION FUNCTION [{function}]() SPLIT RANGE('{literal}');";
        affectedRows += await connection.ExecuteAsync(sql, transaction: transaction, commandTimeout: commandTimeout);
    }

    return affectedRows;
}

/// <summary>
/// Returns the partition scheme and function names behind the table's heap or clustered index.
/// </summary>
private static async Task<IEnumerable<PartitionEntity>> GetPartitionSchemaFunctions(this IDbConnection connection, string tableName, string schema, IDbTransaction transaction = null, int? commandTimeout = null)
{
    const string sql = "SELECT ps.name [Scheme], pf.name [Function] FROM sys.partition_functions AS pf JOIN sys.partition_schemes AS ps ON pf.function_id = ps.function_id JOIN sys.indexes AS si ON ps.data_space_id = si.data_space_id AND si.type IN(0,1) JOIN sys.tables AS st ON si.object_id = st.object_id WHERE st.name = @tableName AND SCHEMA_NAME(st.schema_id) = @schema";
    return await connection.QueryAsync<PartitionEntity>(sql, new { tableName, schema }, transaction: transaction, commandTimeout: commandTimeout);
}

/// <summary>
/// Returns the boundary values currently defined for the table's partition function.
/// </summary>
/// <param name="connection">Connection to query.</param>
/// <param name="tableName">Table name, without schema.</param>
/// <param name="schema">Schema name of the table.</param>
/// <param name="transaction">Optional transaction to run in.</param>
/// <param name="commandTimeout">Optional command timeout.</param>
public static async Task<IEnumerable<string>> GetPartitionValues(this IDbConnection connection, string tableName, string schema, IDbTransaction transaction = null, int? commandTimeout = null)
{
    const string sql = "SELECT srv.value FROM sys.tables AS st JOIN sys.indexes AS si ON st.object_id = si.object_id JOIN sys.partitions AS sp ON si.object_id = sp.object_id AND si.index_id = sp.index_id JOIN sys.partition_schemes AS ps ON si.data_space_id = ps.data_space_id JOIN sys.partition_functions AS sf ON ps.function_id = sf.function_id JOIN sys.partition_range_values AS srv ON sf.function_id = srv.function_id and srv.boundary_id = sp.partition_number WHERE si.type <= 1 AND st.name = @tableName AND SCHEMA_NAME(st.schema_id) = @schema";
    return await connection.QueryAsync<string>(sql, new { tableName, schema }, transaction: transaction, commandTimeout: commandTimeout);
}

#endregion
sql
// Listing: Sql
namespace API.Dapper;

/// <summary>
/// SQL statement templates. <c>{schema}</c> is replaced by <see cref="BuildSchema"/>;
/// the numbered placeholders are filled with <c>string.Format</c> afterwards.
/// </summary>
public static partial class Sql
{
    /// <summary>{0} = table name, {1} = column list, {2} = value list.</summary>
    public const string Insert = @"INSERT INTO [{schema}].[{0}]({1})VALUES({2})";

    /// <summary>{0} = column list, {1} = table name, {2} = optional trailing clause (e.g. WHERE …).</summary>
    // The table name is bracketed like in the other templates so that unusual names are quoted consistently.
    public const string Select = @"SELECT {0} FROM [{schema}].[{1}] {2}";

    /// <summary>{0} = table name, {1} = SET list, {2} = trailing clause.</summary>
    public const string Update = @"UPDATE [{schema}].[{0}] SET {1} {2}";

    /// <summary>{0} = table name, {1} = trailing clause.</summary>
    public const string Delete = @"DELETE FROM [{schema}].[{0}] {1}";

    /// <summary>Replaces every <c>{schema}</c> placeholder in <paramref name="sql"/> with <paramref name="schema"/>.</summary>
    /// <param name="sql">Template text.</param>
    /// <param name="schema">Schema name to substitute.</param>
    /// <returns>The template with the schema filled in.</returns>
    public static string BuildSchema(this string sql, string schema) => sql.Replace("{schema}", schema);
}
创建一个NetCore Console Application,引入上面的Library工程,定义CompanyService
// Listing: CompanyService
using API.Dapper.Entity;
using API.Dapper.UnitOfWork.V2;

namespace API.Dapper.Test.Services;

/// <summary>
/// Company operations. Each call creates its own unit of work on a new
/// <c>DemoDbConnectionSchema</c> and disposes it before returning.
/// </summary>
public class CompanyService
{
    /// <summary>Returns all companies.</summary>
    public async Task<IEnumerable<Company>> GetAllAsync()
    {
        using (var work = BeginUnitOfWork())
        {
            var companies = await work.CompanyRepositoryV2.GetAllAsync();
            return companies;
        }
    }

    /// <summary>Returns the company with the given identifier.</summary>
    /// <param name="id">Company identifier.</param>
    public async Task<Company> GetAsync(Guid id)
    {
        using (var work = BeginUnitOfWork())
        {
            var company = await work.CompanyRepositoryV2.GetAsync(id);
            return company;
        }
    }

    /// <summary>Inserts the companies and commits the transaction.</summary>
    /// <param name="companies">Companies to insert.</param>
    public async Task AddAsync(IEnumerable<Company> companies)
    {
        using (var work = BeginUnitOfWork())
        {
            await work.CompanyRepositoryV2.AddAsync(companies);
            work.Commit();
        }
    }

    // Single place where the unit of work and its connection schema are created.
    private static CompanyUnitOfWorkV2 BeginUnitOfWork()
    {
        return new CompanyUnitOfWorkV2(new DemoDbConnectionSchema());
    }
}
在Program中通过DI调用CompanyService测试AddAsync及GetAllAsync
// Listing: Program
// Register CompanyService and build the container.
var services = new ServiceCollection();
services.AddTransient<CompanyService>();

// The provider is disposable; dispose it when the program ends.
using var serviceProvider = services.BuildServiceProvider();

// Initialise the current user context.
UserContext.Current.Init(new UserPrincipal
{
    UserId = Guid.Parse("00000000-0000-0000-0000-000000000001"),
    DisplayName = "Jack",
    Role = Role.Boss,
    HireDate = DateTime.UtcNow.Ticks
});

var companyService = serviceProvider.GetRequiredService<CompanyService>();

// Insert two companies with different regions.
Console.WriteLine("-----------Start to add company to partition table--------------");
await companyService.AddAsync(new List<Company>
{
    new Company { Id = Guid.NewGuid(), Name = "Compnay01", Region = "AU" },
    new Company { Id = Guid.NewGuid(), Name = "Company02", Region = "US" }
});
Console.WriteLine("-----------Add two companies to table----------------------------");

// Read everything back and print it; the message names companies, which is what is listed.
var companies = await companyService.GetAllAsync();
Console.WriteLine($"Get all companies from table \r\n {string.Join("\r\n", companies.Select(c => $"Id: {c.Id}, Name: {c.Name}, Region: {c.Region}"))}");
Console.ReadKey();
运行结果
查看DB table及分区函数
分区Scheme及Function
在database storage中查看数据分布情况
OK,简单的分区即完成。但在真正的生产环境中通常不会把所有分区都放在同一个filegroup[PRIMARY]上,而是会指定多个filegroup/data file,也可能会将多个data file放在不同磁盘上,从而提高IO。当前资源和精力有限,之后有需要会逐步完善。
标签:string,NetCore,分区,tableName,id,sql,var,Dapper,public From: https://www.cnblogs.com/qindy/p/17838857.html