首页 > 数据库 >Oracle-OracleConnectorTask

Oracle-OracleConnectorTask

时间:2024-08-11 09:22:56浏览次数:15  
标签:归档 数据库 配置 事件 OracleConnectorTask Oracle 日志 变更

提示:OracleConnectorTask 是一个 Debezium 连接器的具体实现,用于捕获 Oracle 数据库中的数据变化,并将这些变化以 Kafka 消息的形式发布出去。

文章目录


前言

提示:OracleConnectorTask 是一个重要的组件,它负责从 Oracle 数据库中捕获数据变更事件,并将这些事件转换为 Kafka 消息,从而使得下游应用能够实时地消费这些变更事件。


提示:以下是本篇文章正文内容

一、核心功能

核心功能详细说明

1. 初始化和配置组件

  • 功能: 初始化和配置与 Oracle 数据库交互所需的组件。
  • 作用: 确保所有必需的服务和配置都准备好,以便能够开始捕获数据变更事件。

2. 创建协调器 (ChangeEventSourceCoordinator)

  • 功能: 创建一个协调器实例,用于管理数据变更事件的源。
  • 作用: 协调器负责管理数据变更事件的捕获过程,包括启动、停止以及处理事件流。

3. 检查归档日志目的地

  • 功能: 验证数据库归档日志目的地的有效性。
  • 作用: 确保归档日志配置正确,避免因配置问题导致无法捕获数据变更事件。

4. 获取心跳连接

  • 功能: 创建一个新的 OracleConnection 实例用于心跳检测。
  • 作用: 心跳连接用于定期向数据库发送心跳信号,确保连接的有效性和可用性。

5. 轮询数据变更事件

  • 功能: 从队列中获取数据变更事件。
  • 作用: 定期从事件队列中获取新的数据变更事件,并将其转换为 SourceRecord 对象。

6. 停止操作

  • 功能: 关闭 JDBC 连接及相关的资源。
  • 作用: 当任务不再需要运行时,释放占用的资源,避免资源泄露。

7. 获取所有配置字段

  • 功能: 返回 OracleConnectorConfig 中的所有字段。
  • 作用: 提供所有配置项的列表,这有助于调试和配置验证。

8. 验证重做日志配置

  • 功能: 检查数据库是否处于归档模式。
  • 作用: 确认数据库配置支持 Debezium 所需的重做日志功能,以保证能够正确捕获数据变更事件。

二、代码分析

// 创建并配置一个协调器实例,用于管理数据变更事件的源
ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
        previousOffsets, // 已有的偏移量信息
        errorHandler, // 错误处理策略
        OracleConnector.class, // 当前连接器的类类型
        connectorConfig, // 连接器配置
        new OracleChangeEventSourceFactory( // 创建 Oracle 数据变更事件源工厂
                connectorConfig, // 连接器配置
                connectionFactory, // 数据库连接工厂
                errorHandler, // 错误处理策略
                dispatcher, // 事件分发器
                clock, // 时钟服务
                schema, // 架构管理器
                jdbcConfig, // JDBC 配置
                taskContext, // 任务上下文
                streamingMetrics, // 流式度量指标
                snapshotterService), // 快照服务
        new OracleChangeEventSourceMetricsFactory(streamingMetrics), // 创建 Oracle 数据变更事件源度量指标工厂
        dispatcher, // 事件分发器
        schema, // 架构管理器
        signalProcessor, // 信号处理器
        notificationService, // 通知服务
        snapshotterService); // 快照服务

// 启动协调器
coordinator.start(taskContext, this.queue, metadataProvider);

// 返回协调器实例
return coordinator;
}

// 检查归档日志目的地的有效性
private void checkArchiveLogDestination(OracleConnection connection, String destinationName) {
    try {
        // 如果指定了归档目的地名称
        if (!Strings.isNullOrBlank(destinationName)) {
            // 验证指定的归档目的地是否有效
            if (!connection.isArchiveLogDestinationValid(destinationName)) {
                LOGGER.warn("Archive log destination '{}' may not be valid, please check the database.", destinationName);
            }
        } else {
            // 如果没有指定归档目的地名称
            // 检查是否存在多个有效的归档目的地
            if (!connection.isOnlyOneArchiveLogDestinationValid()) {
                LOGGER.warn("There are multiple valid archive log destinations. " +
                        "Please add '{}' to the connector configuration to avoid log availability problems.",
                        OracleConnectorConfig.ARCHIVE_DESTINATION_NAME.name());
            }
        }
    } catch (SQLException e) {
        // 如果检查过程中发生 SQL 异常
        throw new DebeziumException("Error while checking validity of archive log configuration", e);
    }
}

// 创建一个新的 OracleConnection 实例用于心跳检测
private OracleConnection getHeartbeatConnection(OracleConnectorConfig connectorConfig, JdbcConfiguration jdbcConfig) {
    // 创建 OracleConnection 实例
    final OracleConnection connection = new OracleConnection(jdbcConfig);
    // 如果指定了 PDB 名称,则设置会话到该 PDB
    if (!Strings.isNullOrBlank(connectorConfig.getPdbName())) {
        connection.setSessionToPdb(connectorConfig.getPdbName());
    }
    // 返回连接实例
    return connection;
}

// 轮询数据变更事件
@Override
public List<SourceRecord> doPoll() throws InterruptedException {
    // 从队列中获取数据变更事件
    List<DataChangeEvent> records = queue.poll();

    // 将数据变更事件转换为 SourceRecord 列表
    List<SourceRecord> sourceRecords = records.stream()
            .map(DataChangeEvent::getRecord)
            .collect(Collectors.toList());

    // 返回 SourceRecord 列表
    return sourceRecords;
}

// 停止操作
@Override
public void doStop() {
    // 尝试关闭 JDBC 连接
    try {
        if (jdbcConnection != null) {
            jdbcConnection.close();
        }
    } catch (SQLException e) {
        // 如果关闭 JDBC 连接时发生异常
        LOGGER.error("Exception while closing JDBC connection", e);
    }

    // 尝试关闭 JDBC bean registry 连接
    try {
        if (beanRegistryJdbcConnection != null) {
            beanRegistryJdbcConnection.close();
        }
    } catch (SQLException e) {
        // 如果关闭 JDBC bean registry 连接时发生异常
        LOGGER.error("Exception while closing JDBC bean registry connection", e);
    }

    // 关闭架构管理器
    if (schema != null) {
        schema.close();
    }
}

// 获取所有配置字段
@Override
protected Iterable<Field> getAllConfigurationFields() {
    // 返回 OracleConnectorConfig 中的所有字段
    return OracleConnectorConfig.ALL_FIELDS;
}

// 验证重做日志配置
private void validateRedoLogConfiguration(OracleConnectorConfig config, SnapshotterService snapshotterService) {
    // 检查数据库是否处于归档模式
    final boolean archivelogMode = jdbcConnection.isArchiveLogMode();
    // 如果数据库不是归档模式
    if (!archivelogMode) {
        // 如果重做日志对于当前配置是必需的
        if (redoLogRequired(config, snapshotterService)) {
            // 抛出异常,因为缺少归档模式会导致无法正常工作
            throw new DebeziumException("The Oracle server is not configured to use a archive log LOG_MODE, which is "
                    + "required for this connector to work properly. Change the Oracle configuration to use a "
                    + "LOG_MODE=ARCHIVELOG and restart the connector.");
        } else {
            // 发出警告,但继续执行(如果重做日志不是严格必需的)
            LOGGER.warn("Failed the archive log check but continuing as redo log isn't strictly required");
        }
    }
}

// 检查重做日志是否对于当前配置是必需的
private static boolean redoLogRequired(OracleConnectorConfig config, SnapshotterService snapshotterService) {
    // 如果配置要求流式传输或者事务快照边界模式为 ALL,则重做日志是必需的
    return snapshotterService.getSnapshotter().shouldStream() ||
            config.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.ALL;
}

OracleConnectorTask 类展示了良好的面向对象设计原则,特别是在封装方面。它将与 Oracle 数据库交互的逻辑封装在一个类中,提供了对外部友好的接口,如 doPoll()doStop() 方法,同时隐藏了内部实现细节。

组件化与依赖注入
  • 组件化: 该类通过组合其他组件来完成复杂的任务,例如使用 ChangeEventSourceCoordinator 来管理数据变更事件的捕获过程。这种组件化的思想有助于提高代码的可读性和可维护性。

  • 依赖注入: 通过构造函数和方法参数传递依赖,而不是在类内部创建这些依赖。例如,在创建 ChangeEventSourceCoordinator 时,它接收了许多外部组件作为参数。这种方式遵循了依赖倒置原则,提高了代码的灵活性和可测试性。

职责单一原则
  • 职责单一: OracleConnectorTask 类的主要职责是管理与 Oracle 数据库的交互,包括捕获数据变更事件、处理这些事件并将它们转换为 Kafka 消息。它不承担与这些职责无关的其他功能,这体现了单一职责原则。
代码质量
  • 清晰的命名: 类名、方法名和变量名都非常直观,易于理解。例如,checkArchiveLogDestinationvalidateRedoLogConfiguration 方法名清楚地表达了它们的功能。

  • 异常处理: 该类在处理可能发生的异常时采用了明确的错误处理策略,例如在 checkArchiveLogDestination 方法中抛出 DebeziumException。这有助于提高系统的健壮性。

  • 模块化: 通过将不同的功能分解成独立的方法,提高了代码的可读性和可维护性。例如,checkArchiveLogDestinationvalidateRedoLogConfiguration 方法都是独立的功能单元,易于理解和测试。


总结

  1. 配置对象 (connectorConfig): 存储连接器配置信息的对象。
  2. 事件队列 (queue): 用于存储待处理的数据变更事件的队列。
  3. 错误处理器 (errorHandler): 处理捕获过程中出现的错误。
  4. 架构管理器 (schema): 用于管理数据库架构的对象。
  5. 通知服务 (notificationService): 用于发送通知的服务。
  6. 信号处理器 (signalProcessor): 用于处理信号的处理器。
  7. 快照服务 (snapshotterService): 用于执行快照操作的服务。
  8. JDBC 连接 (jdbcConnection): 与 Oracle 数据库通信的 JDBC 连接。
  9. JDBC 配置 (jdbcConfig): 存储 JDBC 连接配置信息的对象。
  10. 任务上下文 (taskContext): 包含任务相关信息的上下文对象。
  11. 时钟服务 (clock): 提供时间服务的对象。
  12. 分发器 (dispatcher): 用于分发数据变更事件的对象。
  13. 元数据提供者 (metadataProvider): 提供元数据信息的对象。
  14. 主题命名策略 (topicNamingStrategy): 用于确定 Kafka 主题名称的策略。
  15. 元数据提供者 (metadataProvider): 提供元数据信息的对象。
  16. 度量指标 (streamingMetrics): 用于收集度量指标的对象。

方法

ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> coordinator
  • 功能: 创建一个 ChangeEventSourceCoordinator 实例,用于管理数据变更事件的源。
  • 作用: 协调器负责管理数据变更事件的捕获过程,包括启动、停止以及处理事件流。
private void checkArchiveLogDestination(OracleConnection connection, String destinationName)
  • 功能: 验证数据库归档日志目的地的有效性。
  • 作用: 确保归档日志配置正确,避免因配置问题导致无法捕获数据变更事件。
private OracleConnection getHeartbeatConnection(OracleConnectorConfig connectorConfig, JdbcConfiguration jdbcConfig)
  • 功能: 创建一个新的 OracleConnection 实例用于心跳检测。
  • 作用: 心跳连接用于定期向数据库发送心跳信号,确保连接的有效性和可用性。
@Override public List<SourceRecord> doPoll() throws InterruptedException
  • 功能: 从队列中获取数据变更事件。
  • 作用: 定期从事件队列中获取新的数据变更事件,并将其转换为 SourceRecord 对象。
@Override public void doStop()
  • 功能: 关闭 JDBC 连接及相关的资源。
  • 作用: 当任务不再需要运行时,释放占用的资源,避免资源泄露。
@Override protected Iterable<Field> getAllConfigurationFields()
  • 功能: 返回 OracleConnectorConfig 中的所有字段。
  • 作用: 提供所有配置项的列表,这有助于调试和配置验证。
private void validateRedoLogConfiguration(OracleConnectorConfig config, SnapshotterService snapshotterService)
  • 功能: 检查数据库是否处于归档模式。
  • 作用: 确认数据库配置支持 Debezium 所需的重做日志功能,以保证能够正确捕获数据变更事件。
private static boolean redoLogRequired(OracleConnectorConfig config, SnapshotterService snapshotterService)
  • 功能: 检查重做日志是否对于当前配置是必需的。
  • 作用: 如果配置要求流式传输或者事务快照边界模式为 ALL,则重做日志是必需的。

标签:归档,数据库,配置,事件,OracleConnectorTask,Oracle,日志,变更
From: https://blog.csdn.net/sinat_33727881/article/details/140967608

相关文章

  • 14、Oracle中的Set运算符
    最近项目要用到Oracle,奈何之前没有使用过,所以在B站上面找了一个学习视频,用于记录学习过程以及自己的思考。视频链接:【尚硅谷】Oracle数据库全套教程,oracle从安装到实战应用如果有侵权,请联系删除,谢谢。学习目标:描述SET操作符将多个查询用SET操作符连接组成一个新的查......
  • 15、Oracle中的高级子查询
    最近项目要用到Oracle,奈何之前没有使用过,所以在B站上面找了一个学习视频,用于记录学习过程以及自己的思考。视频链接:【尚硅谷】Oracle数据库全套教程,oracle从安装到实战应用如果有侵权,请联系删除,谢谢。学习目标:书写多列子查询在FROM子句中使用子查询在SQL中使用单列子......
  • Oracle事务是怎么练成的
    什么是事务事务是数据库管理系统执行过程的一个逻辑单位,由一系列有限的数据库操作序列构成,事务必须满足‌ACID属性。ACID理论是数据库中最重要的概念之一,分别代表原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。原子性是指事务是一个不可分割的......
  • Oracle数据库巡检
    数据库巡检列表序号业务系统1主机名2操作系统4单机/RAC4IP地址5地址类型6数据类型7数据库版本8实例名巡检方案检查方面具体检查内容检查标准集群配置集群软件版本集群软件版本要等于或高于DB软件版本集群服务状态各种服务状态(除GSD外)需是ONLINE注:使用asfforrac的环境下......
  • 读取EXCEL数据导入到oracle
    importpandasaspdimportcx_Oraclefromdatetimeimportdatetime#数据库连接信息username='****'#替换为您的用户名password='****'#替换为您的密码dsn='192.168.10.216:1521/ORCL'#替换为您的数据源名称#读取Excel文件excel_file=......
  • Oracle系列---【磁盘有空间,但是报unable to extend index ... by 128 in tablespace C
    一、Oracle表空间满了的问题可能出现在以下几个方面1.数据文件达到最大大小限制:即使启用了自动扩展,数据文件可能已经达到了其最大大小设置。2.缺乏可用磁盘空间:尽管您提到数据目录有空间,但仍需要确认相关磁盘卷是否有足够的可用空间。3.自动扩展配置问题:检查自动扩展是否配置......
  • ORACLE 查询条件出现关键字:&
    SQLselect1fromdualWHERExxxIN('AAA&SSS')编译器提示原因和解决方法在OracleSQL查询中,‌如果查询条件包含特殊字符如&,‌通常需要进行转义处理,‌以确保查询语句能被正确解析&在Oracle中可能被视作替换变量的一部分,‌因此直接使用时可能导致查询出错为了正常查询......
  • 【Oracle EBS R12】第三章 Primary Ledger Overview(英文版)
    PrimaryLedgerOverview1.TransactionComponentsTransactiondateTransactionDetailsTransactionAmount2.TransactiondateYearType:PeriodType:3.TransactionDetails4.TransactionAmount5.Summry3Cs4Cs6.PrimaryLedger(PL)1.TransactionComp......
  • 在oracle中将一行字符串拆分成多行
    例如,有如下一张表,表名为bk_test。插入了以下数据:CREATETABLEBK_TESK(idvarchar2(10),svarchar2(20));insertintoBK_TESKvalues('A','1,2,3');insertintoBK_TESKvalues('B','4,5,6');insertintoBK_TESKvalues('C','......
  • SqlDbx客户端连接服务器Oracle数据库
    查了很多文章,介绍的不对,走了好多弯路,最后整理一下,供参考一、下载Oracle客户端1、SqlDbx如果是32位的,客户端也要下载32位的2、Oracle客户端版本要和服务端版本一致(本例用的是12.1.0.2.0)3、32位客户端下载地址:https://www.oracle.com/database/technologies/instant-client/mic......