首页 > 数据库 >[Kogel.Subscribe.Mssql]SQL Server增量订阅,数据库变更监听

[Kogel.Subscribe.Mssql]SQL Server增量订阅,数据库变更监听

时间:2022-11-28 22:22:43浏览次数:72  
标签:builder Nest Server Subscribe SQL new message public

此框架是SQL Server增量订阅,用来监听增删改数据库数据变更

目前仅支持SQL Server,Nuget上可以下载安装

或者使用Nuget命令添加包

dotnet add package Kogel.Subscribe.Mssql --version 0.0.0.1

 

(一)定义需要监听表的实体类 

 /// <summary>
    /// 
    /// </summary>
    [Display(Rename = "t_oms_order_detail")]
    [ElasticsearchType(RelationName = "t_oms_order_detail", IdProperty = "Id")]
    public class OmsOrderDetail : IBaseEntity<OmsOrderDetail, int>
    {
        /// <summary>
        /// 
        /// </summary>
        [Identity]
        [Display(Rename = "id")]
        [Nest.PropertyName("id")]
        public override int Id { get; set; }

        /// <summary>
        /// 
        /// </summary>
        [Display(Rename = "name")]
        [Nest.PropertyName("name")]
        public string Name { get; set; }

        /// <summary>
        /// 
        /// </summary>
        [Display(Rename = "trade_id")]
        [Nest.PropertyName("trade_id")]
        public int? TradeId { get; set; }

        /// <summary>
        /// 
        /// </summary>
        [Display(Rename = "descption")]
        [Nest.PropertyName("descption")]
        public string Descption { get; set; }

        /// <summary>
        /// 
        /// </summary>
        [Display(Rename = "create_time")]
        [Nest.PropertyName("create_time")]
        public DateTime CreateTime { get; set; }
    }

[Display]和[Identity]属于Kogel.Dapper.Extension的特性如果[想了解更多请点击],[ElasticsearchType]和[Nest.PropertyName]属于Elasticsearch特性,如果没用到可以忽略

 

(二)定义表订阅

    /// <summary>
    /// 定义表订阅
    /// </summary>
    public class OmsOrderDetailSubscribe : Subscribe<OmsOrderDetail>
    {
        /// <summary>
        /// 设置连接配置
        /// </summary>
        /// <param name="builder"></param>
        public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder)
        {
            //此连接字符串账号需要有管理员权限
            builder.BuildConnection("数据库连接字符串");
        }
    }

如果需要此表对应多张分表可以设置

//配置所有表分片
builder.BuildShards(new List<string>
            {
                "t_oms_order_detail_1",
                "t_oms_order_detail_2",
                "t_oms_order_detail_3"
            })

 

(1).如果想推送订阅到RabbitMQ中

builder.BuilderRabbitMQ(new RabbitMQ.Client.ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            })

可以通过BuildTopic设置交换机名称

builder.BuildTopic("kogel_subscribe_order_detail")

 

(2).如果想推送订阅到Kafka中

builder.BuildKafka(new ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                Acks = Acks.None
            })

可以通过BuildTopic设置Topic名称

builder.BuildTopic("kogel_subscribe_order_detail")

 

(3).如果想推送订阅到Elasticsearch中

 builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>
            {
                Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")),
            })

如果有设置Basic授权

builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>
            {
                Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/"))
                    .BasicAuthentication("账号","密码")
            })

如果想根据自己定义的分片逻辑插入到多个ES索引中可以通过WriteInterceptor

/// <summary>
        /// 设置连接配置
        /// </summary>
        /// <param name="builder"></param>
        public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder)
        {
            //此连接字符串账号需要有管理员权限
            builder.BuildConnection("数据库连接字符串");
            //定义推送ES
            builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>
            {
                Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/"))
                    .BasicAuthentication("账号", "密码"),
                WriteInterceptor = message => WriteInterceptor(message)
            });
        }

        /// <summary>
        /// 定义自己的索引逻辑
        /// </summary>
        /// <param name="messages"></param>
        /// <returns></returns>
        private EsSubscribeMessage<OmsOrderDetail> WriteInterceptor(SubscribeMessage<OmsOrderDetail> message)
        {
            string esIndexName;
            //这里写自己索引分片的业务逻辑
            if (message.Result.Id % 3 == 0)
            {
                esIndexName = $"kogel_orders_2";
            }
            else
            {
                esIndexName = $"kogel_orders_1";
            }
            return message.ToEsSubscribeMessage(esIndexName);
        }

并且ES索引不存在的时候会动态创建

 

(4).如果想自定义实现订阅逻辑,在可以Subscribe订阅类中重写

/// <summary>
        /// 订阅变更 (每一次sql的执行会触发一次Subscribe)
        /// </summary>
        /// <param name="messageList">消息列表表示所有影响到的数据变更(会受BuildLimit限制,没有查询完成的会在下一次查出)</param>
        public override void Subscribes(List<SubscribeMessage<T>> messageList)
        {
            foreach (var message in messageList)
            {
                Console.WriteLine($"执行动作:{message.Operation},更新的表:{message.TableName},更新的id:{message.Result.GetId()}");
            }
        }

 

以上订阅的优先级:

 

(三)订阅启动

启动监听所有继承自Subscribe<T>的类,在应用程序启动时执行即可

ApplicationProgram.Run();

如果是基础BaseSubscribe<T>中间基类需要定义成abstract,例如

  /// <summary>
    /// 基础配置类需要定义成abstract
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class BaseSubscribe<T> : Subscribe<T>
        where T : class, IBaseEntity
    {
    }

关闭监听,在应用程序退出时执行即可

ApplicationProgram.Close();

 

(四)其他配置

builder.BuildCdcConfig(new CdcConfig
            {
                //扫描间隔(每次扫描变更表的间隔,单位毫秒) 默认10000毫秒/10秒
                ScanInterval = 10000,

                //变更捕捉文件在DB保存的时间(默认三天)
                Retention = 60 * 24 * 3,

                //是否首次扫描表全部数据再监听变更(默认false)
                IsFirstScanFull = false,

                //每次检索的变更量(默认10条)
                Limit = 10,

                //变更扫描的偏移量位置(默认从最后中止处开始)
                OffsetPosition = OffsetPositionEnum.Abort
            })

 

 

框架开源,完整框架源码可以去Github上下载:

https://github.com/a935368322/Kogel.Subscribe.Mssql

如有问题也可以加QQ群讨论:

技术群 710217654

标签:builder,Nest,Server,Subscribe,SQL,new,message,public
From: https://www.cnblogs.com/kogel/p/16933691.html

相关文章

  • 多表查询两种方法,小知识点补充,可视化软件Navicat,多表查询练习题,python操作MySQL
    目录多表查询两种方法,小知识点补充,可视化软件Navicat,多表查询练习题,python操作MySQL今日内容概要今日内容详细多表查询的两种方法小知识点补充说明可视化软件Navicat多表查......
  • MYSQL中取当前年份的第一天和当前周,月,季度的第一天/最后一天
    mysql获取当年第一天的年月日格式:SELECTDATE_SUB(CURDATE(),INTERVALdayofyear(now())-1DAY);MySQL里获取当前week、month、quarter的第一天/最后一天摘自:http://hideto......
  • Navicat及python第三方模块pymysql
    目录可视化软件Navicatpython操作MySQLpymysql补充说明可视化软件Navicat​ 可视化软件就是第三方开发的用来充当数据库客户端的简单快捷的操作界面,底层的本质还是SQL,第......
  • MySQL补充知识及pymysql基础使用
    昨日内容回顾数据查询关键字where后跟多种条件,支持逻辑运算符,支持模糊查找(使用like关键字)groupby按照指定字段对数据进行分组,可以使用聚合函数筛选数据h......
  • 多表查询的两种方法、Navicat、python操作MySQL
    1.多表查询的两种方法1.连表操作: 1.1:innerjoin:内连接,将两张表共同的部分连接起来生成一张新表。拼接顺序是把后面的表拼在前面的表,如果颠倒位置结果不同。sele......
  • python 之路 37 Navicat 可视化软件使用、 pymysql模块使用
    多表查询的两种方法方式1:连表操作innerjoin内连接select*fromempinnerjoindeponemp.dep_id=dep.id;只连接两张表中公有的数据部分le......
  • MySQL数据库:python操作MySQL
    目录python操作MySQL一、pymysql模块二、pymysql补充说明1.获取数据2.增删改查三、注册登录1.注册2.登录python操作MySQL一、pymysql模块importpymysql#连接mysql......
  • MySQL数据库基础5
    今日内容概要多表查询的两种方法小知识点补充可视化软件Navicat多表查询练习题python操作MySQL今日内容详细多表查询的两种方法方式1:连表操作 innerjoin......
  • MySQL 5
    今日内容详细多表查询的两种方法方式1:连表操作 innerjoin 内连接 select*fromempinnerjoindeponemp.dep_id=dep.id; 只连接两张表中公有的数据部分......
  • MySQl数据库:多表查询方法、MySQL补充方法
    目录一、多表查询的思路二、多表查询的两种方法1.方式一:连表查询(1)innerjoin内连接(2)leftjoin(3)rightjoin(4)union2.方式二:子查询(1)例子(2)in与notinany和in运算符的不同之处:三......