一、前提条件
已经有mysql实例 自建或者云上都可以
已经在阿里云的kakfa创建topic 默认不自动创建topic
二、配置要监控的instance
先修改example的配置文件 conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address={mysql地址加端口}
......
# username/password
canal.instance.dbUsername={{ 数据库用户名 }}
canal.instance.dbPassword={{ 数据库密码 }}
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex={{ 数据库正则匹配规则 }}
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# mq config
canal.mq.topic={{ topic主题 需要提前到阿里云kafka创建 }}
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic= {{ 主题规则 }}
三、修改canal的配置文件
canal.properties
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.serverMode = kafka {{ 重要 }}
。。。。。。
canal.destinations = {{ 文件夹名称 会去这个文件夹读取instance配置文件 }}
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = {{ kafka公网地址加端口 }}
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.ssl.truststore.location= ../conf/only.4096.client.truststore.jks {{ 重要}}
kafka.ssl.truststore.password= KafkaOnsClient {{ 重要}}
kafka.security.protocol= SASL_SSL {{ 重要}}
kafka.sasl.mechanism = PLAIN {{ 重要}}
kafka.ssl.endpoint.identification.algorithm = {{ 重要}}
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
!!!阿里云的kafka要记得配置白名单
四、在conf文件夹创建kafka_client_producer_jaas.conf文件 用来存储阿里云kafka的用户名密码
内容为
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="用户名"
password="密码";
};
然后修改bin/startup.sh启动脚本 在JAVA_OPTS里面添加-Djava.security.auth.login.config=/root/canal/conf/kafka_client_producer_jaas.conf
五、only.4096.client.truststore.jks文件上传到conf文件夹
六、启动canal 进bin目录
sh startup.sh
看下日志
七、修改规则中的数据库数据 触发canal 推送到阿里云kafka 测试没有问题
阿里云kafka文档: https://help.aliyun.com/document_detail/273086.html#table-r3w-zhj-i8v
标签:canal,xml,spring,对接,kafka,instance,mq From: https://www.cnblogs.com/seablogs/p/17050273.html