首页 > 其他分享 >Debezium系列之:Debezium 通知

Debezium系列之:Debezium 通知

时间:2023-09-11 10:00:38浏览次数:37  
标签:系列 通知 connector 连接器 快照 type Debezium



Debezium系列之:Debezium 通知

  • 一、概述
  • 二、Debezium 通知格式
  • 三、可用的通知
  • 四、启用 Debezium 通知
  • 五、访问 Debezium JMX 通知
  • 六、自定义通知渠道
  • 七、配置自定义通知渠道
  • 八、Debezium 核心模块依赖项
  • 九、部署自定义通知渠道
  • 十、配置连接器以使用自定义通知通道


一、概述

Debezium 通知提供了一种获取有关连接器状态信息的机制。通知可以发送到以下渠道:

  • 接收器通知通道:通过 Connect API 将通知发送到配置的主题。
  • 日志通知通道:通知会附加到日志中。
  • JmxNotificationChannel:通知作为 JMX bean 中的属性公开。
  • Custom:通知将发送到您实施的自定义渠道。

二、Debezium 通知格式

通知消息包含以下信息:

属性

描述

id

分配给通知的唯一标识符。对于增量快照通知,该 ID 与使用执行快照信号发送的 ID 相同。

aggregate_type

与通知相关的聚合根的数据类型。在领域驱动设计中,导出的事件应始终引用聚合。

type

提供有关在aggregate_type 字段中指定的事件的状态信息。

additional_data

包含有关通知的详细信息的 Map<String,String>。有关示例,请参阅有关增量快照进度的 Debezium 通知。

三、可用的通知

Debezium 通知提供有关初始快照或增量快照进度的信息。

初始快照的状态
以下示例显示了提供初始快照状态的典型通知:

{
    "id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED" 
}

类型字段可以包含以下值之一:

  • COMPLETED
  • ABORTED
  • SKIPPED

Debezium 有关增量快照进度的通知
下表显示了报告增量快照状态的通知中可能存在的不同负载的示例:

Start:

{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Incremental Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      }
}

Paused:

{
      "id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51",
      "aggregate_type":"Incremental Snapshot",
      "type":"PAUSED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      }
}

Resumed:

{
   "id":"a9468204-769d-430f-96d2-b0933d4839f3",
   "aggregate_type":"Incremental Snapshot",
   "type":"RESUMED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2"
   }
}

Stopped:

{
   "id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c",
   "aggregate_type":"Incremental Snapshot",
   "type":"ABORTED",
   "additional_data":{
      "connector_name":"my-connector"
   }
}

Processing chunk:

{
   "id":"d02047d6-377f-4a21-a4e9-cb6e817cf744",
   "aggregate_type":"Incremental Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1",
      "maximum_key":"100",
      "last_processed_key":"50"
   }
}

Snapshot completed for a table:

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED" 
   }
}

可能的值为:

  • EMPTY - 表为空
  • NO_PRIMARY_KEY - 表没有快照所需的主键
  • SKIPPED - 不支持此类表的快照,请检查日志以了解详细信息
  • SQL_EXCEPTION - 处理快照时捕获 SQL 异常
  • SUCCEEDED - 快照成功完成
  • UNKNOWN_SCHEMA - 找不到表的架构,请检查日志以获取已知表的列表

Completed:

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"COMPLETED",
   "additional_data":{
      "connector_name":"my-connector"
   }
}

四、启用 Debezium 通知

要使 Debezium 能够发出通知,请通过设置 notification.enabled.channels 配置属性来指定通知通道列表。默认情况下,以下通知渠道可用:

  • sink
  • log
  • jmx

要使用接收器通知通道,您还必须将 notification.sink.topic.name 配置属性设置为您希望 Debezium 发送通知的主题的名称。

五、访问 Debezium JMX 通知

要使 Debezium 能够报告通过 JMX beans 公开的事件,请完成以下配置步骤:

  • 启用 JMX MBean 服务器以公开通知 bean。
  • 将 jmx 添加到连接器配置中的 notification.enabled.channels 属性中。
  • 将您首选的 JMX 客户端连接到 MBean 服务器。

通知通过名称为 debezium.<connector-type>.management.notifications.<server> 的 bean 的“Notifications”属性公开。

下图显示了报告增量快照开始的通知:

Debezium系列之:Debezium 通知_自定义


要放弃通知,请对 bean 调用重置操作。

通知还公开为 debezium.notification 类型的 JMX 通知。要使应用程序能够侦听 MBean 发出的 JMX 通知,请为应用程序订阅通知。

六、自定义通知渠道

通知机制被设计为可扩展的。您可以根据需要实施渠道,以最适合您的环境的方式传递通知。添加通知通道涉及几个步骤:

  • 为通道创建一个Java项目来实现通道,并添加Debezium Core作为依赖项。
  • 部署通知通道。
  • 通过修改连接器配置,使连接器能够使用自定义通知通道。

七、配置自定义通知渠道

自定义通知通道是实现 io.debezium.pipeline.notification.channels.NotificationChannel 服务提供者接口 (SPI) 的 Java 类。例如:

public interface NotificationChannel {

    String name(); 

    void init(CommonConnectorConfig config); 

    void send(Notification notification); 

    void close(); 
}
  1. 频道的名称。要使 Debezium 能够使用该通道,请在连接器的 notification.enabled.channels 属性中指定此名称。
  2. 初始化通道所需的特定配置、变量或连接。
  3. 在频道上发送通知。 Debezium 调用此方法来报告其状态。
  4. 关闭所有分配的资源。 Debezium 在连接器停止时调用此方法。

八、Debezium 核心模块依赖项

自定义通知通道 Java 项目具有对 Debezium 核心模块的编译依赖项。您必须将这些编译依赖项包含在项目的 pom.xml 文件中,如以下示例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> 
</dependency>

${version.debezium} 表示 Debezium 连接器的版本。

在 META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel 文件中声明您的实现。

九、部署自定义通知渠道

先决条件:

  • 您有一个自定义通知通道 Java 程序。

程序

  • 要将通知通道与 Debezium 连接器结合使用,请将 Java 项目导出到 JAR 文件,然后将该文件复制到包含要与其一起使用的每个 Debezium 连接器的 JAR 文件的目录。
  • 例如,在典型部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 位于其自己的子目录中 (/kafka/connect/debezium-connector-db2、/kafka /connect/debezium-connector-mysql 等)。要将信号通道与连接器一起使用,请将转换器 JAR 文件添加到连接器的子目录中。

十、配置连接器以使用自定义通知通道

将自定义通知通道的名称添加到 notification.enabled.channels 配置属性中。


标签:系列,通知,connector,连接器,快照,type,Debezium
From: https://blog.51cto.com/u_12080573/7432229

相关文章

  • SonarQube系列-架构与外部集成
    介绍Sonar是一个代码质量管理的开源平台,基于Java开发的,用于管理源代码的质量,通过插件形式,可以支持包括java、C#、JavaScript等二十余种编程语言的代码质量管理与检测。它具有免费的社区版本和其他付费版本。SonarQube之采购选型参考利用SonarQube的主要好处是:它集成了数千种......
  • MySQL入门系列7-多表查询
    在我们日常开发中,表与表之间的关系通常分为以下几种:一对多(多对一)、’多对多、一对一。一、一对多(多对一)我们之前的部门和员工表就是一个一对多的关系,一个部门有多个员工,部门就是一方,员工就是多方。员工表的外键关联了部门表二、多对多学生和课程的关系可以理解为多对多的关系,......
  • 【愚公系列】2023年09月 WPF控件专题 DockPanel控件详解
    (文章目录)前言WPF控件是WindowsPresentationFoundation(WPF)中的基本用户界面元素。它们是可视化对象,可以用来创建各种用户界面。WPF控件可以分为两类:原生控件和自定义控件。原生控件是由Microsoft提供的内置控件,如Button、TextBox、Label、ComboBox等。这些控件都是WPF中常见......
  • Java实现关系型数据库工具类JdbcUtils系列九:通用DAO
    Java实现关系型数据库工具类JdbcUtils系列九:通用DAO一、创建对应数据库表的实体类二、数据库连接池Druid工具类三、DAO类四、BaseDAO五、DatabaseInfoDao六、通用DAO测试类一、创建对应数据库表的实体类数据库表结构CREATETABLE`databaseInfo`(`id`bigint(11)NOTNULLAU......
  • 深入浅出理解数据分析系列之:Python安装Excel文档库openpyxl和Pycharm为项目安装Excel
    深入浅出理解数据分析系列之:Python安装Excel文档库openpyxl和Pycharm为项目安装Excel文档库openpyxl一、Python安装openpyxl二、Pycharm为项目安装openpyxl一、Python安装openpyxlpip3installopenpyxlCollectingopenpyxlDownloadingopenpyxl-3.0.9-py2.py3-none-any.whl......
  • Python系列之:argparse和vars
    Python系列之:argparse和vars一、argparse用法示例二、add_argument()方法常用参数详解三、vars用法示例一、argparse用法示例argparse是Python模块,主要用于命令行选项、参数和子命令解析器。vars是Python模块,主要用于返回对象object的属性和属性值的字典对象第一步:创建解析器Arg......
  • debezium报错:no longer available on the server. Reconfigure the connector to use
    debezium报错:nolongeravailableontheserver.Reconfiguretheconnectortouseasnapshotwhenneede完整报错如下:-“trace”:"io.debezium.DebeziumException:TheconnectoristryingtoreadbinlogstartingatSourceInfo[currentGtid=null,currentBinlogFilename......
  • debezium报错:Caused by: io.debezium.DebeziumException:whose schema isn‘t known t
    debezium报错:Causedby:io.debezium.DebeziumException:whoseschemaisn’tknowntothisconnector“trace”:"org.apache.kafka.connect.errors.ConnectException:Anexceptionoccurredinthechangeeventproducer.Thisconnectorwillbestopped.Causedby:io.......
  • Java基础知识面试题系列五:41~50题
    Java基础知识面试题系列三:41~50题41.值传递与引用传递有哪些区别42.不同数据类型的转换有哪些规则43.强制类型转换的注意事项有哪些44.Math类中round、ceil和floor方法的功能是什么45.++i与i++有什么区别46."<<"运算符与">>"运算符有什么异同47.char型变量中是否可以存储一个中文汉......
  • Java基础知识面试题系列三:21~30题
    Java基础知识面试题系列三:21~30题21.抽象类(abstractclass)与接口(interface)有什么异同22.内部类有哪些23.如何获取父类的类名24.this与super有什么区别25.break、continue以及return有什么区别26.final、finally和finalize有什么区别27.JDK中哪些类是不能继承的28.assert有什么......