提示:
OracleConnectorTask
是一个 Debezium 连接器的具体实现,用于捕获 Oracle 数据库中的数据变化,并将这些变化以 Kafka 消息的形式发布出去。
文章目录
前言
提示:OracleConnectorTask
是一个重要的组件,它负责从 Oracle 数据库中捕获数据变更事件,并将这些事件转换为 Kafka 消息,从而使得下游应用能够实时地消费这些变更事件。
提示:以下是本篇文章正文内容
一、核心功能
核心功能详细说明
1. 初始化和配置组件
- 功能: 初始化和配置与 Oracle 数据库交互所需的组件。
- 作用: 确保所有必需的服务和配置都准备好,以便能够开始捕获数据变更事件。
2. 创建协调器 (ChangeEventSourceCoordinator
)
- 功能: 创建一个协调器实例,用于管理数据变更事件的源。
- 作用: 协调器负责管理数据变更事件的捕获过程,包括启动、停止以及处理事件流。
3. 检查归档日志目的地
- 功能: 验证数据库归档日志目的地的有效性。
- 作用: 确保归档日志配置正确,避免因配置问题导致无法捕获数据变更事件。
4. 获取心跳连接
- 功能: 创建一个新的
OracleConnection
实例用于心跳检测。 - 作用: 心跳连接用于定期向数据库发送心跳信号,确保连接的有效性和可用性。
5. 轮询数据变更事件
- 功能: 从队列中获取数据变更事件。
- 作用: 定期从事件队列中获取新的数据变更事件,并将其转换为
SourceRecord
对象。
6. 停止操作
- 功能: 关闭 JDBC 连接及相关的资源。
- 作用: 当任务不再需要运行时,释放占用的资源,避免资源泄露。
7. 获取所有配置字段
- 功能: 返回
OracleConnectorConfig
中的所有字段。 - 作用: 提供所有配置项的列表,这有助于调试和配置验证。
8. 验证重做日志配置
- 功能: 检查数据库是否处于归档模式。
- 作用: 确认数据库配置支持 Debezium 所需的重做日志功能,以保证能够正确捕获数据变更事件。
二、代码分析
// 创建并配置一个协调器实例,用于管理数据变更事件的源
ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffsets, // 已有的偏移量信息
errorHandler, // 错误处理策略
OracleConnector.class, // 当前连接器的类类型
connectorConfig, // 连接器配置
new OracleChangeEventSourceFactory( // 创建 Oracle 数据变更事件源工厂
connectorConfig, // 连接器配置
connectionFactory, // 数据库连接工厂
errorHandler, // 错误处理策略
dispatcher, // 事件分发器
clock, // 时钟服务
schema, // 架构管理器
jdbcConfig, // JDBC 配置
taskContext, // 任务上下文
streamingMetrics, // 流式度量指标
snapshotterService), // 快照服务
new OracleChangeEventSourceMetricsFactory(streamingMetrics), // 创建 Oracle 数据变更事件源度量指标工厂
dispatcher, // 事件分发器
schema, // 架构管理器
signalProcessor, // 信号处理器
notificationService, // 通知服务
snapshotterService); // 快照服务
// 启动协调器
coordinator.start(taskContext, this.queue, metadataProvider);
// 返回协调器实例
return coordinator;
}
// 检查归档日志目的地的有效性
private void checkArchiveLogDestination(OracleConnection connection, String destinationName) {
try {
// 如果指定了归档目的地名称
if (!Strings.isNullOrBlank(destinationName)) {
// 验证指定的归档目的地是否有效
if (!connection.isArchiveLogDestinationValid(destinationName)) {
LOGGER.warn("Archive log destination '{}' may not be valid, please check the database.", destinationName);
}
} else {
// 如果没有指定归档目的地名称
// 检查是否存在多个有效的归档目的地
if (!connection.isOnlyOneArchiveLogDestinationValid()) {
LOGGER.warn("There are multiple valid archive log destinations. " +
"Please add '{}' to the connector configuration to avoid log availability problems.",
OracleConnectorConfig.ARCHIVE_DESTINATION_NAME.name());
}
}
} catch (SQLException e) {
// 如果检查过程中发生 SQL 异常
throw new DebeziumException("Error while checking validity of archive log configuration", e);
}
}
// 创建一个新的 OracleConnection 实例用于心跳检测
private OracleConnection getHeartbeatConnection(OracleConnectorConfig connectorConfig, JdbcConfiguration jdbcConfig) {
// 创建 OracleConnection 实例
final OracleConnection connection = new OracleConnection(jdbcConfig);
// 如果指定了 PDB 名称,则设置会话到该 PDB
if (!Strings.isNullOrBlank(connectorConfig.getPdbName())) {
connection.setSessionToPdb(connectorConfig.getPdbName());
}
// 返回连接实例
return connection;
}
// 轮询数据变更事件
@Override
public List<SourceRecord> doPoll() throws InterruptedException {
// 从队列中获取数据变更事件
List<DataChangeEvent> records = queue.poll();
// 将数据变更事件转换为 SourceRecord 列表
List<SourceRecord> sourceRecords = records.stream()
.map(DataChangeEvent::getRecord)
.collect(Collectors.toList());
// 返回 SourceRecord 列表
return sourceRecords;
}
// 停止操作
@Override
public void doStop() {
// 尝试关闭 JDBC 连接
try {
if (jdbcConnection != null) {
jdbcConnection.close();
}
} catch (SQLException e) {
// 如果关闭 JDBC 连接时发生异常
LOGGER.error("Exception while closing JDBC connection", e);
}
// 尝试关闭 JDBC bean registry 连接
try {
if (beanRegistryJdbcConnection != null) {
beanRegistryJdbcConnection.close();
}
} catch (SQLException e) {
// 如果关闭 JDBC bean registry 连接时发生异常
LOGGER.error("Exception while closing JDBC bean registry connection", e);
}
// 关闭架构管理器
if (schema != null) {
schema.close();
}
}
// 获取所有配置字段
@Override
protected Iterable<Field> getAllConfigurationFields() {
// 返回 OracleConnectorConfig 中的所有字段
return OracleConnectorConfig.ALL_FIELDS;
}
// 验证重做日志配置
private void validateRedoLogConfiguration(OracleConnectorConfig config, SnapshotterService snapshotterService) {
// 检查数据库是否处于归档模式
final boolean archivelogMode = jdbcConnection.isArchiveLogMode();
// 如果数据库不是归档模式
if (!archivelogMode) {
// 如果重做日志对于当前配置是必需的
if (redoLogRequired(config, snapshotterService)) {
// 抛出异常,因为缺少归档模式会导致无法正常工作
throw new DebeziumException("The Oracle server is not configured to use a archive log LOG_MODE, which is "
+ "required for this connector to work properly. Change the Oracle configuration to use a "
+ "LOG_MODE=ARCHIVELOG and restart the connector.");
} else {
// 发出警告,但继续执行(如果重做日志不是严格必需的)
LOGGER.warn("Failed the archive log check but continuing as redo log isn't strictly required");
}
}
}
// 检查重做日志是否对于当前配置是必需的
private static boolean redoLogRequired(OracleConnectorConfig config, SnapshotterService snapshotterService) {
// 如果配置要求流式传输或者事务快照边界模式为 ALL,则重做日志是必需的
return snapshotterService.getSnapshotter().shouldStream() ||
config.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.ALL;
}
OracleConnectorTask
类展示了良好的面向对象设计原则,特别是在封装方面。它将与 Oracle 数据库交互的逻辑封装在一个类中,提供了对外部友好的接口,如 doPoll()
和 doStop()
方法,同时隐藏了内部实现细节。
组件化与依赖注入
-
组件化: 该类通过组合其他组件来完成复杂的任务,例如使用
ChangeEventSourceCoordinator
来管理数据变更事件的捕获过程。这种组件化的思想有助于提高代码的可读性和可维护性。 -
依赖注入: 通过构造函数和方法参数传递依赖,而不是在类内部创建这些依赖。例如,在创建
ChangeEventSourceCoordinator
时,它接收了许多外部组件作为参数。这种方式遵循了依赖倒置原则,提高了代码的灵活性和可测试性。
职责单一原则
- 职责单一:
OracleConnectorTask
类的主要职责是管理与 Oracle 数据库的交互,包括捕获数据变更事件、处理这些事件并将它们转换为 Kafka 消息。它不承担与这些职责无关的其他功能,这体现了单一职责原则。
代码质量
-
清晰的命名: 类名、方法名和变量名都非常直观,易于理解。例如,
checkArchiveLogDestination
和validateRedoLogConfiguration
方法名清楚地表达了它们的功能。 -
异常处理: 该类在处理可能发生的异常时采用了明确的错误处理策略,例如在
checkArchiveLogDestination
方法中抛出DebeziumException
。这有助于提高系统的健壮性。 -
模块化: 通过将不同的功能分解成独立的方法,提高了代码的可读性和可维护性。例如,
checkArchiveLogDestination
和validateRedoLogConfiguration
方法都是独立的功能单元,易于理解和测试。
总结
- 配置对象 (
connectorConfig
): 存储连接器配置信息的对象。 - 事件队列 (
queue
): 用于存储待处理的数据变更事件的队列。 - 错误处理器 (
errorHandler
): 处理捕获过程中出现的错误。 - 架构管理器 (
schema
): 用于管理数据库架构的对象。 - 通知服务 (
notificationService
): 用于发送通知的服务。 - 信号处理器 (
signalProcessor
): 用于处理信号的处理器。 - 快照服务 (
snapshotterService
): 用于执行快照操作的服务。 - JDBC 连接 (
jdbcConnection
): 与 Oracle 数据库通信的 JDBC 连接。 - JDBC 配置 (
jdbcConfig
): 存储 JDBC 连接配置信息的对象。 - 任务上下文 (
taskContext
): 包含任务相关信息的上下文对象。 - 时钟服务 (
clock
): 提供时间服务的对象。 - 分发器 (
dispatcher
): 用于分发数据变更事件的对象。 - 元数据提供者 (
metadataProvider
): 提供元数据信息的对象。 - 主题命名策略 (
topicNamingStrategy
): 用于确定 Kafka 主题名称的策略。 - 元数据提供者 (
metadataProvider
): 提供元数据信息的对象。 - 度量指标 (
streamingMetrics
): 用于收集度量指标的对象。
方法
ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> coordinator
- 功能: 创建一个
ChangeEventSourceCoordinator
实例,用于管理数据变更事件的源。 - 作用: 协调器负责管理数据变更事件的捕获过程,包括启动、停止以及处理事件流。
private void checkArchiveLogDestination(OracleConnection connection, String destinationName)
- 功能: 验证数据库归档日志目的地的有效性。
- 作用: 确保归档日志配置正确,避免因配置问题导致无法捕获数据变更事件。
private OracleConnection getHeartbeatConnection(OracleConnectorConfig connectorConfig, JdbcConfiguration jdbcConfig)
- 功能: 创建一个新的
OracleConnection
实例用于心跳检测。 - 作用: 心跳连接用于定期向数据库发送心跳信号,确保连接的有效性和可用性。
@Override public List<SourceRecord> doPoll() throws InterruptedException
- 功能: 从队列中获取数据变更事件。
- 作用: 定期从事件队列中获取新的数据变更事件,并将其转换为
SourceRecord
对象。
@Override public void doStop()
- 功能: 关闭 JDBC 连接及相关的资源。
- 作用: 当任务不再需要运行时,释放占用的资源,避免资源泄露。
@Override protected Iterable<Field> getAllConfigurationFields()
- 功能: 返回
OracleConnectorConfig
中的所有字段。 - 作用: 提供所有配置项的列表,这有助于调试和配置验证。
private void validateRedoLogConfiguration(OracleConnectorConfig config, SnapshotterService snapshotterService)
- 功能: 检查数据库是否处于归档模式。
- 作用: 确认数据库配置支持 Debezium 所需的重做日志功能,以保证能够正确捕获数据变更事件。
private static boolean redoLogRequired(OracleConnectorConfig config, SnapshotterService snapshotterService)
- 功能: 检查重做日志是否对于当前配置是必需的。
- 作用: 如果配置要求流式传输或者事务快照边界模式为 ALL,则重做日志是必需的。