首页 > 数据库 >linux下试验中间件canal的example示例-binlog日志的实时获取显示以及阿里巴巴中间件canal.kafka将mysql-bin日志直接传入kafka消息队列

linux下试验中间件canal的example示例-binlog日志的实时获取显示以及阿里巴巴中间件canal.kafka将mysql-bin日志直接传入kafka消息队列

时间:2024-08-25 13:25:53浏览次数:14  
标签:canal 10 中间件 kafka 2018 mysql root

一、linux下试验中间件canal的example示例-binlog日志的实时获取显示

    今天重装mysql后,进行了canal的再次试验,原来用的mysql5.7, 今天重装直接换了5.6算了。反正测试服务器的mysql也不常用。canal启动后日志显示examplep repare to find start position just show master status 开始寻找开始点,因为canal的配置中canal.instance.master.position项没有填写,所以这个如果mysql没有数据变化canal会一直停留在这里直到mysql数据库有变化,也即是从启动canal开始认为是数据变化的起始点 publish:October 23, 2018 -Tuesday。

==> canal/canal.log <==
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
2018-10-23 17:05:17.093 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2018-10-23 17:05:17.183 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2018-10-23 17:05:17.186 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2018-10-23 17:05:17.325 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.90.123:11111]

==> example/example.log <==
2018-10-23 17:05:18.261 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2018-10-23 17:05:18.268 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2018-10-23 17:05:18.675 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2018-10-23 17:05:18.804 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2018-10-23 17:05:18.805 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2018-10-23 17:05:19.164 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2018-10-23 17:05:19.741 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2018-10-23 17:05:19.977 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2018-10-23 17:05:19.977 [destination = example , address = /192.168.90.123:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status

==> canal/canal.log <==
2018-10-23 17:05:20.090 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

==> example/example.log <==
2018-10-23 17:06:09.908 [destination = example , address = /192.168.90.123:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - find start position : EntryPosition[included=false,journalName=mysql-bin.000007,position=4961,serverId=1,gtid=<null>,timestamp=1540285200000]

        数据发生变化后,example日志里显示连接到了主库的日志及position等数据,然后这里就不再显示其它东西了,原来我以为这里也会显示mysql的bin日志,怪不得必须要启动canal的client客户端连接canal。如之前文章所述:https://linge.blog.csdn.net/article/details/141367606 最后下载canal.example压缩包:https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.example-1.1.0.tar.gz 

        解压到/opt/modules/canal_example目录,启动后查看日志: 

cd /opt/modules/canal_example
bin/startup.sh
cd logs
tail -f canal/entry.log  example/entry.log 
#在mysql进行数据变更后,这里会显示mysql的bin日志。
****************************************************
* Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2018-10-23 17:25:35
* Start : [mysql-bin.000007:6180:1540286735000(2018-10-23 17:25:35)] 
* End : [mysql-bin.000007:6356:1540286735000(2018-10-23 17:25:35)] 
****************************************************

================> binlog[mysql-bin.000007:6180] , executeTime : 1540286735000(2018-10-23 17:25:35) , gtid : () , delay : 393ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6311] , name[canal,canal_table] , eventType : DELETE , executeTime : 1540286735000(2018-10-23 17:25:35) , gtid : () , delay : 393 ms
id : 8    type=int(10) unsigned
name : 512    type=varchar(255)
----------------
 END ----> transaction id: 249
================> binlog[mysql-bin.000007:6356] , executeTime : 1540286735000(2018-10-23 17:25:35) , gtid : () , delay : 394ms

****************************************************
* Batch Id: [8] ,count : [3] , memsize : [149] , Time : 2018-10-23 17:27:49
* Start : [mysql-bin.000007:6387:1540286869000(2018-10-23 17:27:49)] 
* End : [mysql-bin.000007:6563:1540286869000(2018-10-23 17:27:49)] 
****************************************************

================> binlog[mysql-bin.000007:6387] , executeTime : 1540286869000(2018-10-23 17:27:49) , gtid : () , delay : 976ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6518] , name[canal,canal_table] , eventType : INSERT , executeTime : 1540286869000(2018-10-23 17:27:49) , gtid : () , delay : 976 ms
id : 21    type=int(10) unsigned    update=true
name : aaa    type=varchar(255)    update=true
----------------
 END ----> transaction id: 250
================> binlog[mysql-bin.000007:6563] , executeTime : 1540286869000(2018-10-23 17:27:49) , gtid : () , delay : 977ms

****************************************************
* Batch Id: [9] ,count : [3] , memsize : [161] , Time : 2018-10-23 17:28:22
* Start : [mysql-bin.000007:6594:1540286902000(2018-10-23 17:28:22)] 
* End : [mysql-bin.000007:6782:1540286902000(2018-10-23 17:28:22)] 
****************************************************

================> binlog[mysql-bin.000007:6594] , executeTime : 1540286902000(2018-10-23 17:28:22) , gtid : () , delay : 712ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6725] , name[canal,canal_table] , eventType : UPDATE , executeTime : 1540286902000(2018-10-23 17:28:22) , gtid : () , delay : 712 ms
id : 21    type=int(10) unsigned
name : aaac    type=varchar(255)    update=true
----------------
 END ----> transaction id: 252
================> binlog[mysql-bin.000007:6782] , executeTime : 1540286902000(2018-10-23 17:28:22) , gtid : () , delay : 713ms

 

        从日志里可以明显看到几个重要字段:

eventType表示了此次行类类别,如UPDATE,INSERT,以及涉及的ID。
name显示了是操作了哪个数据库的哪张表,如上面即是操作了canal数据库的canal_table表。如此也算是成功走完了canal的功能流程。 

二、阿里巴巴中间件canal.kafka将mysql-bin日志直接传入kafka消息队列

    publish:October 25, 2018 -Thursday 不需要另外再单独安装canal,阿里巴巴有一个包含canal服务端和kafka对接的包:canal.kafka-1.1.0.tar.gz。github主页:

https://github.com/alibaba/canal/releases 

    从v1.0.26 alpha 4开始释放此包。canal.kafka-1.1.0解压后启动更改配置后直接和kafka相结合,将获取的mysql的bin日志传入kafka消息队列供后续各种使用。Canal服务端与消息队列Kafka以及RocketMQ快速开始QuickStart文档见github:

https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

    先说一下实现mysql的bin日志向kafka的消息队列传送有了canal.kafka后不再需要canal独立服务器安装。只需要:canal.kafka + mysql + kafka服务。启动了canal_kafka了,相当于同时启动了canal服务端以及与kafka的传输通道,接下来只要启动kafka和消费端即可。实际canal.kafka就相当于kafka的生产端。

#下载安装canal.kafka-1.1.0
[root@123 download]# mwget https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.kafka-1.1.0.tar.gz
[root@123 download]# mkdir /opt/modules/canal_kafka 
[root@123 canal_kafka]# tar zxvf canal.kafka-1.1.0.tar.gz -C /opt/modules/canal_kafka
[root@123 canal_kafka]# cd /opt/modules/canal_kafka/
[root@123 canal_kafka]# ll
total 16
drwxr-xr-x 2 root root 4096 Oct 24 15:47 bin
drwxr-xr-x 5 root root 4096 Oct 24 15:47 conf
drwxr-xr-x 2 root root 4096 Oct 24 15:47 lib
drwxrwxrwx 2 root root 4096 Aug 20 13:55 logs
[root@123 canal_kafka]# ll conf/
total 24
-rwxrwxrwx 1 root root 3528 Aug 20 13:31 canal.properties
drwxrwxrwx 2 root root 4096 Oct 24 15:47 example
-rwxrwxrwx 1 root root  403 Aug 20 13:31 kafka.yml
-rwxrwxrwx 1 root root 3094 Aug 20 13:31 logback.xml
drwxrwxrwx 2 root root 4096 Oct 24 15:47 metrics
drwxrwxrwx 3 root root 4096 Oct 24 15:47 spring
[root@123 canal_kafka]# vim conf/example/instance.properties
## mysql serverId , v1.0.26+ will autoGen
#编辑一个ID不要的mysql的server-id以及其它的canal的id一样即可
canal.instance.mysql.slaveId=3

# position info:mysql的IP和端口
canal.instance.master.address=192.168.90.123:3306

# username/password:供canal连接mysql的账号
canal.instance.dbUsername=你的username
canal.instance.dbPassword=你的password
canal.instance.connectionCharset=UTF-8
#修改canal_kafka中的kafka配置,主要就是修改servers和topic(和后面启动的kafka消费队列topic一致)
[root@123 canal_kafka]# vim conf/kafka.yml 
servers: 127.0.0.1:9092
canalDestinations:
  - canalDestination: example
    topic: kermitMQ
    partition:
[root@123 canal_kafka]# vim conf/canal.properties
#这个文件不用动,基本使用默认即可。
canal.id= 3
canal.ip=
canal.port=11111
canal.metrics.pull.port=11112
#然后就可以启动canal_kafka了,相当于同时启动了canal服务端以及向kafka传输数据的功能,接下来就是启动kafka了。
[root@123 canal_kafka]# bin/startup.sh 
#在另一个端口进入kafka的目录(我这里之前已经安装好了kafka)
[root@123 canal_kafka]# cd /opt/modules/kafka_2.11-2.0.0/bin
#在终端启动一个消费队列,topic
[root@123 bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kermitMQ

        接下来变更mysql数据库里的数据后,在上面的消费队列命令终端就会出现数据,流程上是走通了,只不过显示出来就和直接查看mysql-bin日志一样是乱码,截图如下:

        在实际业务阿里巴巴中有吃饭对应的MQ数据消费的样例工程,包含数据编解码的功能,详细见下方文档链接:

    kafka模式: com.alibaba.otter.canal.client.running.kafka.CanalKafkaClientExample
    rocketMQ模式: com.alibaba.otter.canal.client.running.rocketmq.CanalRocketMQClientExample
https://github.com/alibaba/canal/blob/master/client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java

标签:canal,10,中间件,kafka,2018,mysql,root
From: https://blog.csdn.net/weixin_47792780/article/details/141373630

相关文章

  • rocketmq 是参考了 kafka架构, 为什么rocketmq吞吐量是10万/秒, kafka吞吐量是17万/秒
    我们都知道,为了防止消息在服务器丢失,一般都是进行持久化(保存在磁盘),在发送消失时那就涉及到从磁盘拷贝到内核空间,从内核空间到用户态,再从用户态到socket缓存区,从socket缓存区到网卡四次拷贝。kafka使用的是零拷贝-sendfile,把内核态数据发送到网卡,减少两次拷......
  • kafka
    消息队列的流派MQ是什么MessageQueue(MQ)是一种消息队列中间件。MQ的主要作用是通过分离消息的发送和接收来实现应用程序的异步和解耦。然而,MQ的核心目的是通信:它屏蔽了底层复杂的通信协议,并定义了一套更简单的应用层通信协议。在分布式系统中,模块间通信通常使用HTTP......
  • 【NextJS】中间件实战介绍
    原创洞窝技术使用Next.js中间件实现高性能个性化在当今的数字时代,用户期望获得量身定制的在线体验。个性化已经从一个奢侈品变成了必需品,尤其是对于希望在竞争激烈的市场中脱颖而出的企业来说。然而,实现高性能的个性化往往是一个挑战,需要在用户体验和系统性能之间取得......
  • centos7安装Kafka单节点环境部署三-安装Logstash
    1、下载Logstashwgethttps://artifacts.elastic.co/downloads/logstash/logstash-7.17.7-linux-x86_64.tar.gz2、解压到/usr/local/mkdir-p/usr/local/logstash7.17tar-zxflogstash-7.17.7-linux-x86_64.tar.gz-C/usr/local/logstash7.17/--strip-components=1#--......
  • 重生之我要当前端大王--node篇--02express路由,中间件
    重生之我要当前端大王–node篇第一篇章后端服务篇–nodeJS启动!02express路由,中间件前言阅读本章可学习到将接口抽离到独立模块,减少耦合,以及中间件的使用一、路由是什么,有什么用?路由是Express应用中用于处理客户端请求的规则和处理程序。每个路由可以定义一个特定......
  • [消息队列]kafka
    Kafka如何保证消息的消费顺序?我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了2个消息,这2个消息对应的操作分别对应的数据库操作是:更改用户会员等级。根据会员等级计算订单价格。假如这两条消息的消费顺序不一样造成的最终结果就会......
  • 阿里巴巴中间件canal的搭建和使用以及linux命令下使用mail发送html格式的邮件
    一、阿里巴巴中间件canal的搭建和使用    canal可以用来监控数据库数据的变化(binlog日志),从而获得指定数据的变化。canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求时开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅......
  • 浅谈Kafka(一)
    浅谈Kafka(一)文章目录浅谈Kafka(一)Kafa的设计是什么样的数据传输的事务定义消息队列的应用场景Kafka怎么样判断节点是否存活Kafka的消息是采用pull模式还是push模式Kafka在磁盘上的消息格式Kafka高效文件存储设计特点Kafka与传统消息系统之间的区别Kafka的分区数据怎样保......
  • centos7安装Kafka单节点环境部署一-ZooKeeper安装与配置
    由于Kafka运行需要zookeeper配合,zookeeper需要运行在JVM上,所以需要安装JDK,zookeeper。Kafka从2.0.0版本开始就不再支持JDK7及以下版本,就以CentOS764位JDK8为例1、下载ZooKeeperwgethttps://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.ta......
  • canal同步mysql,监听单实例,多实例配置
    1、下载canal安装包canal.adapter-1.1.7.tar.gzcanal.deployer-1.1.7.tar.gz2、修改涉及的文件canal_deployer:/conf/canal.properties/conf/example/instance.propertiescanal_adapter:/conf/application.yml/conf/rdb在rdb中添加对应的数据配置表,即便监听多个mysq......