首页 > 其他分享 >Canal

Canal

时间:2022-10-18 23:24:11浏览次数:40  
标签:Canal canal val -- kafka ## mysql

Canal

Canal介绍
    功能:通过读取Mysql的Binlog,实时采集数据库数据的变化写到消息队列。
    原理:将自己伪装成Slave,假装从Master复制数据
    使用场景:1.异地数据库之间的同步  2.更新缓存,读取主库更新,在缓存服务器中更新 3.实时更新
        文档地址:https://github.com/alibaba/canal/wiki
Binlog
    Binlog:记录了DDL和DML的语句,以事件的方式记录,包含了消耗的事件。会有1%性能损耗
    Binlog好处:1.主节点Master开启Binlog,Master把他的二进制日志传递给Slaves来达到主从一致
        2.可以通过Binlog工具恢复数据
    Binlog内容:.index二进制日志索引文件    .00000*记录数据库所有的DDL,DML
    Binlog分类:1.statsment 语句级别:记录每一次写操作的语句 
            节省空间,但是可能会造成数据不一致(当前事件,随机数等)
        2.row 行级别:记录每次操作后每行的变化  数据一致,但是比较占用空间
        3.mixed statsment结合版本  默认使用statsment,特殊情况用row

单机环境安装:https://www.cnblogs.com/wuxiaolong4/p/16548910.html

mysql开启Binlog
    show variables like 'log_%';  log_bin=on ##默认开启
    show variables like 'binlog_format';     ##开启方式
    vim /etc/my.cnf  ##没有验证,mysql是默认开启的
        log_bin=mysql-bin 
        binlog_format=row  
        binlog-do-db=test(也可以不要,监听所有库)
    service mysql stop;service mysql start;
mysql授权canal用户
    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
Canal 安装
    mkdir /opt/canal
    cd /opt/canal
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.7-alpha-1/canal.deployer-1.1.7-SNAPSHOT.tar.gz
    tar -xvf canal.deployer-1.1.7-SNAPSHOT.tar.gz
    mv canal.deployer-1.1.7-SNAPSHOT.tar.gz canal.tar.gz
    mv canal.tar.gz /opt/pkg/

    cd /opt/canal/conf
    example:下面是监控的实例    canal.properties:配置

    vim canal.properties
    canal.serverMode = tcp //数据输出的位置
    canal.destinations = example //监控多个实例
    #canal.admin.user = admin
    #canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

    vim example/instance.properties
    canal.instance.mysql.slaveId=20  //和mysql的ID不一样就行
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
canal 代码依赖、数据类介绍
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.0</version>
    </dependency>

    Message:一次Canal从抓取的信息,可以包含多个SQL的结果
    Entry:对应一个sql结果,一个sql可能会对多行记录造成影响
        TableName EntryType(操作类型DDM,DML) StoreValue(序列化)==>RowChange(反序列化)
    RowChange:EventType(当前数据类型) RowDataList数据集合(RowData:column)
canal 代码
    import java.net.InetSocketAddress

    import scala.collection.JavaConversions._
    import com.alibaba.otter.canal.client.CanalConnectors
    import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, RowChange}

    val canalConnector = CanalConnectors.newSingleConnector(
        new InetSocketAddress("114.116.44.117",11111), "example", "", "");
    canalConnector.connect()

    canalConnector.subscribe("test.*")//订阅数据库

    while(true){
      val message = canalConnector.get(100).getEntries //获取100条数据,非阻塞式
      if(message.size>0){
        for(messageiter <- message){
          val tablename = messageiter.getHeader.getTableName
          val entryType= messageiter.getEntryType//数据类型
          val storeValue = messageiter.getStoreValue//数据二进制
          if(entryType.equals(EntryType.ROWDATA)){//数据类型必须是ROWDATA
            val rowChange = RowChange.parseFrom(storeValue)//反序列化数据
            val eventType= rowChange.getEventType //当前事件的操作类型
            println("eventType:"+eventType)
            val rowData= rowChange.getRowDatasList.iterator() //返回数据集合

            for(data <- rowData){
              val before = data.getBeforeColumnsList
              for(beforeItem <- before){
                println("eventType:"+eventType+"|before:"+beforeItem.getName+":"+beforeItem.getValue)
              }
              val after = data.getAfterColumnsList
              for(afterItem <- after){
                println("eventType:"+eventType+"|before:"+afterItem.getName+":"+afterItem.getValue)
              }
            }
          }
        }
      }else{
        println("nodata")
        Thread.sleep(5000);
      }
    }
mysql kafka环境信息
    cd /opt/mysql/mysql/bin
    ./mysql -u root -p'123456'


    cd /opt/kafka/kafka/bin/
    ##启动ZK
    ./zookeeper-server-start.sh -daemon /opt/kafka/kafka/config/zookeeper.properties  
    ##启动kafka
    ./kafka-server-start.sh -daemon /opt/kafka/kafka/config/server.properties
    ##删除topics
    ./kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic events
    ##创建topics
    ./kafka-topics.sh --create --topic events --bootstrap-server localhost:9092
    ##desc topics
    ./kafka-topics.sh --describe --topic events --bootstrap-server localhost:9092
    ##创建命令行生产者
    ./kafka-console-producer.sh --topic events --bootstrap-server localhost:9092
    ##打印topic
    ./kafka-console-consumer.sh --topic events --from-beginning --bootstrap-server localhost:9092
canal kafka
    canal.properties
        canal.serverMode = kafka
    instance.properties
        canal.mq.topic=events
canal.mq.partitionsNum=3 ##分区数量
canal.mq.partitionHash=test.table:id ##列名作为hash的值

标签:Canal,canal,val,--,kafka,##,mysql
From: https://www.cnblogs.com/wuxiaolong4/p/16804587.html

相关文章

  • DOCKER 部署 CANAL
    DOCKER部署CANAL1、MYSQL开启binlog前提MYSQL已经安装完成,canal采用读取Mysql的binlog日志来实现数据同步,需要修改mysql配置为难my.cnf,并将binlog的格式模式设置为ROW,其......
  • canal全量同步到ES
     参考文档:https://blog.csdn.net/zlt2000/article/details/115291950一、ETL接口adapter 的 ETL 接口为:/etl/{type}/{task}默认web端口为 8081type 为类型(hba......
  • Canal + RabbitMQ 实现监听 MySQL 数据库
    第一步:开启Mysql Biglog日志,Mysql8.0以上默认开启日志(window路径:C:\ProgramData\MySQL\MySQLServer8.0\mysql.ini)1.添加配置[mysqld]log-bin=mysql-bin#开启bi......
  • k8s部署canal-server使用configMap挂载方式报Read Only file System
    k8s部署canal-server使用configMap挂载方式报ReadOnlyfileSystem1.1、问题复现由于部署canal-server时,需要修改主库master的数据库连接信息以及配置zookeeper,所以为了......
  • Canal Install and Usage
    下载并解压到本地wgethttps://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gztarzvfxcanal.deployer-1.1.4.tar.gz配置MySQ......
  • Canal 原理说明和Mysql+Canal+kafaka 按装说明
    Canal原理说明:主要应用场景Mysql与Redis可靠一致性,因为msyql修改变更将数据加到kafka队列可以确保存数据一定会被更新到redis,kafka有重试和可以确保被消费。使用阿里的......
  • CloudCanal x StarRocks 在医疗大健康实时数仓领域的落地与实践
    -#简述本案例为国内某大健康领域头部公司真实案例(因用户保密要求,暂不透露用户相关信息)。希望文章内容对各位读者使用CloudCanal构建实时数仓带来一些帮助。 #业......
  • 5分钟搞定MySQL/PostgreSQL/Oracle到StarRocks数据迁移同步-CloudCanal实战
    ##简述CloudCanal2.1.0.x版本开始支持StarRocks作为对端的数据迁移同步能力本文通过MySQL->StarRocks的数据迁移同步案例简要介绍这个源端的能力。链路特点:-结......
  • canal 同步
    canal从mysql已经能同步到oracle了,现在需要将mysql中的实例换成另外一个实例。改了application.yml和slave中的instance.properties 重启后 在mysql新的实例建......
  • Canal 同步表级DDL 表过滤 正则
    原文:Canal常用配置-EasyCms--博客园(cnblogs.com) 基于日志增量订阅&消费的业务:数据库镜像数据库实时备份多级索引(卖家和买家各自分库索引)searchbuild......