首页 > 数据库 >Flink oracle-cdc 配置

Flink oracle-cdc 配置

时间:2022-10-25 16:57:21浏览次数:52  
标签:flink dbzuser GRANT Flink cdc oracle alter SELECT

一、 oracle开启备份日志:

shutdown immediate 

startup mount

alter database archivelog;

archive log list;

show parameter db_recovery_file_dest_size;
alter system set db_recovery_file_dest_size=10G;  #日志缓冲区大小,根据业务和磁盘情况定

alter database open;

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; 
ALTER TABLE HEALTHEHR.EHR_PIR ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

alter tablespace HEALTHEHR begin backup;

备注:归档日志会占用磁盘空间,注意及时情理,linux 系统可以配置归档日志定时情理,

开启归档日志同步后,重启数据库需要先停止日志同步,如:alter tablespace HEALTHEHR end backup;

二、 Flink连接设置

java.util.Properties properties = new Properties();
        properties.put("database.tablename.case.insensitive","false");//11g数据库适配
     properties.setProperty("database.connection.adapter", "logminer");
     // 要同步快,这个配置必须加,不然非常慢
     properties.setProperty("log.mining.strategy", "online_catalog");
properties.setProperty("log.mining.continuous.mine", "true");
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("")
                .port(1521)
                .database("ORCL") // monitor XE database
                .schemaList("HEHR") // monitor inventory schema
                .tableList("HEAR.ehr_pir") // monitor products table
                .username("hhr")
                .password("hear")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .debeziumProperties(properties)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSink<String> stringDataStreamSource =
        env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
        //stringDataStreamSource.
        env.execute();

 

引入的pom 如下:

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oracle-cdc</artifactId>
            <!-- the dependency is available only for stable releases. -->
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

 

 

三、官方建议新创建一个用户专门用于数据同步,如下

CREATE TABLESPACE logminer_tbs DATAFILE '/data/db/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    
  CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs  ;

  GRANT CREATE SESSION TO dbzuser  ;
  GRANT SET CONTAINER TO dbzuser  ;
  GRANT SELECT ON V_$DATABASE to dbzuser  ;
  GRANT FLASHBACK ANY TABLE TO dbzuser  ;
  GRANT SELECT ANY TABLE TO dbzuser  ;
  GRANT SELECT_CATALOG_ROLE TO dbzuser  ;
  GRANT EXECUTE_CATALOG_ROLE TO dbzuser  ;
  GRANT SELECT ANY TRANSACTION TO dbzuser  ;
  GRANT LOGMINING TO dbzuser  ;

  GRANT CREATE TABLE TO dbzuser  ;
  GRANT LOCK ANY TABLE TO dbzuser  ;
  GRANT CREATE SEQUENCE TO dbzuser  ;

  GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser  ;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser  ;

  GRANT SELECT ON V_$LOG TO dbzuser  ;
  GRANT SELECT ON V_$LOG_HISTORY TO dbzuser  ;
  GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser  ;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser  ;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser  ;
  GRANT SELECT ON V_$LOGFILE TO dbzuser  ;
  GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser  ;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser  ;
  GRANT SELECT ON V_$TRANSACTION TO dbzuser  ;

  exit;

四、备注官方文档:

https://ververica.github.io/flink-cdc-connectors/release-2.1/content/connectors/oracle-cdc.html#oracle-cdc-connector

https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/

https://gitee.com/harveyTuan/flink/

https://blog.csdn.net/z3191595/article/details/123072101

五、错误处理

ORA-19809: 超出了恢复文件数的限制      -->  调整次参数大小: alter system set db_recovery_file_dest_size=10G;

 

标签:flink,dbzuser,GRANT,Flink,cdc,oracle,alter,SELECT
From: https://www.cnblogs.com/leolzi/p/16825422.html

相关文章

  • RocketMQ Flink Catalog 设计与实践
    摘要:本文为RocketMQFlinkCatalog使用指南。主要内容包括:Flink和FlinkCatalogRocketMQFlinkConnectorRocketMQFlinkCatalog作者:李晓双,ApacheRocketMQContribut......
  • DBeaver连接Oracle数据库报错
    DBeaver,免费的多平台数据库工具,适用于开发人员,数据库管理员,分析师和所有需要使用数据库的人员。支持所有流行的数据库:MySQL,PostgreSQL,SQLite,Oracle,DB2,SQLServer,Sybase,MSA......
  • FlinkSql的窗口使用以及运用案例
    1flinkSQL窗口概述1.1窗口定义:可理解为时间轴,可将无界流切分成有界流1.2窗口分类:TimeWindow:通过时间切割窗口,但是不知道窗口有多少数据滑动窗口滚动窗口......
  • Oracle动态监听及静态监听区别
    作者:IT邦德中国DBA联盟(ACDU)成员,目前从事DBA及程序编程(Web\java\Python)工作,主要服务于生产制造现拥有Oracle11gOCP/OCM、Mysql、Oceanbase(OBCA)认证分布式TBase\TDSQL数......
  • Oracle的服务器端和客户端同时安装Sqlplus无法登陆的处理
    现象:1.在Server2012安装完数据库,可正常登陆,服务器认证如下正常2.可是安装完客户端后,Sqlplus无法登陆,如下报错2、问题解决自己分析原因:应该是环境变量中自动调用的oracle......
  • Oracle数据库的两种授权收费方式介绍!
    首发微信公众号:SQL数据库运维原文链接:https://mp.weixin.qq.com/s?__biz=MzI1NTQyNzg3MQ==&mid=2247485212&idx=1&sn=450e9e94fa709b5eeff0de371c62072b&chksm=ea37536cdd......
  • Oracle-11g静默安装-netca.rsp
    响应模板文件netca.rsp,可以静默模式运行netca命令,配置并启动Oracle网络监听listener.ora和网络服务tnsnames.ora。##复制文件以备不时之需cp/home/oracle/database/respons......
  • 第3章 flink安装与部署
    2.1flink集群部署stanalone模式是指在裸机上运行flink,通过自身的资源调度器来运行,一般这种方式不推荐,flink集群一般与其它集群,像spark,Hadoop等共存,所以在底层需要有一套资......
  • 第2章 Flink快速上手
    Wordcount在大数据中有点像HelloWorld,当我们输出HelloWorld的时候,就说明程序执行成功了,同样在大数据项目中如果成功的统计出了文本或者socket流中的单词数量,也相当于成功......
  • Oracle JDK 和 OpenJDK 有什么区别?
    OpenJDK是Sun在2006年末把Java开源而形成的项目,这里的“开源”是通常意义上的源码开放形式,即源码是可被复用的,例如IcedTea、UltraViolet都是从OpenJDK源码衍生出的发行版。......