一、 Enable Oracle archive logging:
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ARCHIVE LOG LIST;
SHOW PARAMETER db_recovery_file_dest_size;
-- Size of the fast recovery area (where archived logs land); set it according to workload and disk capacity
ALTER SYSTEM SET db_recovery_file_dest_size = 10G;
ALTER DATABASE OPEN;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE HEALTHEHR.EHR_PIR ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLESPACE HEALTHEHR BEGIN BACKUP;
Note: archived logs consume disk space, so clean them up promptly; on Linux you can schedule periodic archive-log cleanup.
After enabling archive-log-based sync, you must first take the tablespace out of backup mode before restarting the database, e.g.: ALTER TABLESPACE HEALTHEHR END BACKUP;
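The scheduled cleanup mentioned above can be sketched as a small cron-driven shell function. The /data/oracle/arch path, the *.arc suffix, and the 7-day retention are assumptions to adjust to your archive destination; on a real system prefer RMAN's DELETE ARCHIVELOG so the control file's records stay consistent with the files on disk:

```shell
#!/bin/sh
# Sketch of a cron-driven archive-log cleanup. ASSUMPTIONS: archived logs live
# under /data/oracle/arch with an *.arc suffix, and a 7-day retention is fine.
# On a real system prefer RMAN:
#   DELETE NOPROMPT ARCHIVELOG ALL COMPLETED BEFORE 'SYSDATE-7';
cleanup_archlogs() {
    dir="${1:-/data/oracle/arch}"   # assumed default archive directory
    days="${2:-7}"                  # retention window in days
    [ -d "$dir" ] || return 0       # nothing to do if the directory is absent
    # delete files older than the retention window, printing what is removed
    find "$dir" -type f -name '*.arc' -mtime +"$days" -print -delete
}

cleanup_archlogs "$@"
```

A crontab entry such as `30 1 * * * /usr/local/bin/cleanup_archlogs.sh` (path assumed) would run it nightly.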
二、 Flink connector setup
Properties properties = new Properties();
// Oracle 11g compatibility: treat table names as case-sensitive
properties.put("database.tablename.case.insensitive", "false");
properties.setProperty("database.connection.adapter", "logminer");
// Required for fast syncing; without it, mining is very slow
properties.setProperty("log.mining.strategy", "online_catalog");
properties.setProperty("log.mining.continuous.mine", "true");
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
        .hostname("")
        .port(1521)
        .database("ORCL")              // monitor the ORCL database
        .schemaList("HEHR")            // monitor the HEHR schema
        .tableList("HEHR.ehr_pir")     // monitor the ehr_pir table
        .username("hhr")
        .password("hear")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .debeziumProperties(properties)
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSink<String> stringDataStreamSource = env
        .addSource(sourceFunction)
        .print()
        .setParallelism(1); // use parallelism 1 for the sink to keep message ordering
env.execute();
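Each record the source above prints is a Debezium change-event JSON string with the standard envelope layout. The helper below is an illustrative, dependency-free sketch of how one might peek at the operation type (the helper name and string-scanning approach are my own, not part of the connector API; a real job should parse with a JSON library such as Jackson):

```java
// Sketch (assumption): JsonDebeziumDeserializationSchema emits Debezium-style
// envelope JSON such as {"before":null,"after":{...},"source":{...},"op":"c"},
// where op is "c" (insert), "u" (update), "d" (delete) or "r" (snapshot read).
public class DebeziumOpExtractor {
    public static String opOf(String changeJson) {
        int i = changeJson.indexOf("\"op\":\"");
        if (i < 0) return null;             // no op field found
        int start = i + 6;                  // skip past  "op":"
        int end = changeJson.indexOf('"', start);
        return end < 0 ? null : changeJson.substring(start, end);
    }

    public static void main(String[] args) {
        String event = "{\"before\":null,\"after\":{\"ID\":1},\"op\":\"c\"}";
        System.out.println(opOf(event)); // prints: c
    }
}
```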
The required pom dependencies are as follows:
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <!-- the dependency is available only for stable releases. -->
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
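The `${scala.binary.version}` and `${flink.version}` placeholders must be defined in the pom's `<properties>` section. The values below are illustrative assumptions (the 2.1.x line of the CDC connectors targets Flink 1.13), not prescriptions; match them to the Flink distribution you actually deploy on:

```xml
<properties>
    <!-- Assumed example values, not mandated by this post -->
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
```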
三、 The official docs recommend creating a dedicated user for data synchronization, as follows
CREATE TABLESPACE logminer_tbs DATAFILE '/data/db/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs;
GRANT CREATE SESSION TO dbzuser;
GRANT SET CONTAINER TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;
GRANT LOGMINING TO dbzuser;
GRANT CREATE TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
GRANT SELECT ON V_$LOG TO dbzuser;
GRANT SELECT ON V_$LOG_HISTORY TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;
GRANT SELECT ON V_$TRANSACTION TO dbzuser;
exit;
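To confirm the grants took effect, one can connect as the new user and query SESSION_PRIVS (a standard Oracle dynamic view). This sanity check is my suggestion, not part of the official script:

```sql
-- Run in SQL*Plus after connecting as the new user (CONNECT dbzuser/dbz)
SELECT privilege
FROM   session_privs
WHERE  privilege IN ('CREATE SESSION', 'LOGMINING',
                     'SELECT ANY TABLE', 'FLASHBACK ANY TABLE');
```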
四、 Official reference documentation:
https://ververica.github.io/flink-cdc-connectors/release-2.1/content/connectors/oracle-cdc.html#oracle-cdc-connector
https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/
https://gitee.com/harveyTuan/flink/
https://blog.csdn.net/z3191595/article/details/123072101
五、 Troubleshooting
ORA-19809: limit exceeded for recovery files --> increase this parameter: ALTER SYSTEM SET db_recovery_file_dest_size=10G;
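Before (or instead of) simply raising the limit, it can help to see how full the fast recovery area actually is. V$RECOVERY_FILE_DEST is a standard dynamic view; the MB conversion below is just for readability:

```sql
-- How much of the fast recovery area is used, and by how many files
SELECT name,
       ROUND(space_limit       / 1024 / 1024) AS limit_mb,
       ROUND(space_used        / 1024 / 1024) AS used_mb,
       ROUND(space_reclaimable / 1024 / 1024) AS reclaimable_mb,
       number_of_files
FROM   v$recovery_file_dest;
```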
From: https://www.cnblogs.com/leolzi/p/16825422.html