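A retrospective on syncing data between MySQL, canal, and Elasticsearch in a container environment
Installing and initializing MySQL and Elasticsearch is not shown here; the versions are as follows:
The key columns of the SQL table are as follows: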
CREATE TABLE `fault_code` (
  `title` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL,
  `description` varchar(512) CHARACTER SET utf8mb4 DEFAULT NULL,
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
The goal is mainly to sync title and description.
The Elasticsearch mapping:
PUT /fault_code
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "description": {
        "type": "text",
        "analyzer": "ik_smart"
      }
    }
  }
}
Create a dedicated database user for canal:
CREATE USER 'canal'@'%' IDENTIFIED BY 'testcanal';
GRANT SELECT, REPLICATION SLAVE, SHOW DATABASES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
Pull the canal image. Since the MySQL and Elasticsearch versions above are already fixed, canal 1.1.5 was chosen based on the official documentation:
docker pull canal/canal-server:v1.1.5
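Create the /mydata/canal/conf directory in advance, then copy the configuration file out of a container.
Start a temporary canaltest container so the configuration file can be copied out and edited:
docker run --name canaltest \
-p 11111:11111 \
-id canal/canal-server:v1.1.5
Copy the configuration file out: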
docker cp canaltest:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf/
Then edit instance.properties:
# Must not be the same as the MySQL master's server id
canal.instance.mysql.slaveId=22
# MySQL master address; set it as needed, a port exposed by k8s also works
canal.instance.master.address=192.168.56.10:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=testcanal
# Tables to sync; regular expressions are supported
canal.instance.filter.regex=test.fault_code
These are the main properties to change.
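Before mounting the edited file into the container, it can help to confirm that none of these properties was left out or emptied by accident. The following is a minimal sketch, not part of canal; the helper names `parse_properties` and `missing_keys` are made up for illustration, and the list of required keys is just the five properties edited above.

```python
# Properties edited in this post; adjust the list to your own setup.
REQUIRED_KEYS = (
    "canal.instance.mysql.slaveId",
    "canal.instance.master.address",
    "canal.instance.dbUsername",
    "canal.instance.dbPassword",
    "canal.instance.filter.regex",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def missing_keys(text, required=REQUIRED_KEYS):
    """Return the required keys that are absent or have an empty value."""
    props = parse_properties(text)
    return [key for key in required if not props.get(key)]


sample = """
# must differ from the MySQL master's server id
canal.instance.mysql.slaveId=22
canal.instance.master.address=192.168.56.10:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=testcanal
canal.instance.filter.regex=test.fault_code
"""
print(missing_keys(sample))  # prints []
```

To check the real file, pass the contents of /mydata/canal/conf/instance.properties to `missing_keys` instead of the inline sample.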
Remove the temporary canal container:
docker rm -f canaltest
Start the canal service:
docker run --name canal115 \
-p 11111:11111 \
-v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-id canal/canal-server:v1.1.5
Enter the canal container:
docker exec -it canal115 /bin/bash
Go to the log directory and check the log:
cd canal-server/logs/example/
tail -50 example.log
The first look at the log revealed a problem:
CanalParseException: command : 'show master status' has an error!
Caused by: java.io.IOException: ErrorPacket [errorNumber=1227, fieldCount=-1, message=Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation, sqlState=42000, sqlStateMarker=#]
with command: show master status
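The error shows that the canal account lacks the REPLICATION CLIENT privilege, which `show master status` requires. After re-granting the privileges and restarting MySQL, canal worked normally.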
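To see which privilege is missing without waiting for canal to fail, the output of `SHOW GRANTS FOR 'canal'@'%'` can be compared against the privileges canal's QuickStart grants to its MySQL account (SELECT, REPLICATION SLAVE, REPLICATION CLIENT). This is a small sketch under that assumption; the function names are hypothetical, and it only looks at a single global `GRANT ... ON *.*` line.

```python
import re

# Privileges that canal's QuickStart grants to the MySQL account it uses.
REQUIRED = {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}


def granted_privileges(show_grants_line):
    """Extract the privilege names from one line of SHOW GRANTS output."""
    match = re.match(r"GRANT\s+(.*?)\s+ON\s", show_grants_line, re.IGNORECASE)
    if not match:
        return set()
    return {p.strip().upper() for p in match.group(1).split(",")}


def missing_privileges(show_grants_line):
    """Return the required privileges that the line does not grant."""
    granted = granted_privileges(show_grants_line)
    if "ALL PRIVILEGES" in granted:
        return set()
    return REQUIRED - granted


def grant_statement(missing, user="'canal'@'%'"):
    """Build a GRANT statement that adds the missing global privileges."""
    return "GRANT {} ON *.* TO {};".format(", ".join(sorted(missing)), user)


# The grant used earlier in this post.
line = "GRANT SELECT, REPLICATION SLAVE, SHOW DATABASES ON *.* TO 'canal'@'%'"
print(grant_statement(missing_privileges(line)))
# prints: GRANT REPLICATION CLIENT ON *.* TO 'canal'@'%';
```

Running the printed statement as a privileged MySQL user adds the privilege named in the error message.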
Pull the slpcat/canal-adapter:v1.1.5-jdk8 image:
docker pull slpcat/canal-adapter:v1.1.5-jdk8
Start a canal adapter container:
docker run --name canaladapter \
-p 8081:8081 \
-id slpcat/canal-adapter:v1.1.5-jdk8
Copy the two configuration files out.
First the canal adapter configuration file:
docker cp canaladapter:/opt/canal-adapter/conf/application.yml /mydata/canaladapter/conf/
Then the ES mapping file:
docker cp canaladapter:/opt/canal-adapter/conf/es7/faultcode.yml /mydata/canaladapter/conf/
Edit both configuration files.
application.yml:
The following lines need to change:
canal.tcp.server.host: 127.0.0.1:11111
The source data source:
srcDataSources:
  defaultDS:
    url: jdbc:mysql://192.168.56.10:3306/test?useUnicode=true
    username: canal
    password: testcanal
The Elasticsearch adapter:
- name: es
  hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
  properties:
    mode: rest # or rest
    # security.auth: test:123456 # only used for rest mode
    cluster.name: elasticsearch
faultcode.yml:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: fault_code
  _id: id
  fieldMapping:
    title: title # map the MySQL title column to the Elasticsearch title field
    description: description # map the MySQL description column to the Elasticsearch description field
  sql: "select fc.id, fc.title, fc.description from fault_code fc"
  commitBatch: 3000
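To make the intent of this mapping concrete, here is a conceptual sketch of what one row returned by the `sql` statement turns into on the Elasticsearch side. It is not canal adapter's implementation; `row_to_es_action` is a hypothetical helper, the sample row is made up, and it assumes the column named by `_id` is used only as the document id while the remaining selected columns become document fields.

```python
def row_to_es_action(row, index="fault_code", id_column="id"):
    """Turn one SQL result row (a dict) into an index action.

    The column named by id_column becomes the document _id; every other
    selected column becomes a field of the document body.
    """
    doc = {col: val for col, val in row.items() if col != id_column}
    return {"_index": index, "_id": str(row[id_column]), "_source": doc}


# Made-up sample row with the three columns selected in faultcode.yml.
row = {"id": 1, "title": "E101", "description": "Sensor timeout"}
action = row_to_es_action(row)
print(action["_index"], action["_id"], action["_source"])
```

This is also why the `sql` statement has to select the id column: without it there would be nothing to use as the document id.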
Remove the old canaladapter container:
docker rm -f canaladapter
Start the new canaladapter container:
docker run --name canaladapter \
-p 8081:8081 \
-v /mydata/canaladapter/conf/application.yml:/opt/canal-adapter/conf/application.yml \
-v /mydata/canaladapter/conf/faultcode.yml:/opt/canal-adapter/conf/es7/faultcode.yml \
-id slpcat/canal-adapter:v1.1.5-jdk8
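The adapter failed to start.
The problem was most likely incorrect formatting of the configuration file. After adjusting the format, restart canal adapter:
docker restart 029aa4c345af
After the restart the canal adapter container came up, but the adapter log inside the container showed another error: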
docker exec -it cd0b5d5 /bin/bash
cd logs/adapter/
tail -100 adapter.log
2024-12-26 00:28:00.194 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2024-12-26 00:28:00.194 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2024-12-26 00:28:00.206 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## something goes wrong when starting up the canal client adapters:
java.lang.NullPointerException: null
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:51) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_282]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_282]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_282]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_282]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:353) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1089) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:126) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:117) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_282]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_282]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_282]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_282]
at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:264) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:182) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:144) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:400) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:354) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:888) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na]
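A search confirmed that this is also a formatting error. The format of the canal adapter application.yml was fixed once more and the adapter restarted: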
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.56.10:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.56.10:3306/test?useUnicode=true
      username: canal
      password: testcanal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es
        hosts: 192.168.56.10:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
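Indentation mistakes in this file are easy to make and hard to spot. The post only establishes that the NullPointerException was a formatting problem; assuming it arises when sections such as canalAdapters end up at the top level instead of nested under canal.conf, a rough check like the one below can flag that before the container is restarted. It is a plain-text heuristic using only the standard library, not a YAML parser, and the function names are made up for this sketch.

```python
def top_level_keys(yaml_text):
    """Return the keys that start at column 0, ignoring comments and list items."""
    keys = []
    for line in yaml_text.splitlines():
        if not line.strip() or line.lstrip().startswith("#"):
            continue
        if line[0] in " \t-":
            continue  # indented line or list item, so not a top-level key
        keys.append(line.split(":", 1)[0].strip())
    return keys


def misplaced_sections(
    yaml_text,
    nested=("consumerProperties", "srcDataSources", "canalAdapters"),
):
    """Return sections expected under canal.conf that sit at the top level."""
    top = set(top_level_keys(yaml_text))
    return [name for name in nested if name in top]


good = """\
canal.conf:
  mode: tcp
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.56.10:3306/test?useUnicode=true
  canalAdapters:
  - instance: example
"""

bad = """\
canal.conf:
  mode: tcp
srcDataSources:
  defaultDS:
    url: jdbc:mysql://192.168.56.10:3306/test?useUnicode=true
canalAdapters:
- instance: example
"""

print(misplaced_sections(good))  # prints []
print(misplaced_sections(bad))   # prints ['srcDataSources', 'canalAdapters']
```

A real YAML parser would give a more reliable answer; this version is only meant as a quick sanity check on the host where the file is edited.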
After starting it, there was still a problem:
2024-12-26 00:50:14.161 [Thread-3] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - process error!
com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198) ~[na:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115) ~[na:na]
at com.alibaba.otter.canal.connector.tcp.consumer.CanalTCPConsumer.connect(CanalTCPConsumer.java:59) ~[na:na]
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.process(AdapterProcessor.java:184) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_282]
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method) ~[na:1.8.0_282]
at sun.nio.ch.Net.connect(Net.java:482) ~[na:1.8.0_282]
at sun.nio.ch.Net.connect(Net.java:474) ~[na:1.8.0_282]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647) ~[na:1.8.0_282]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:150) ~[na:na]
... 4 common frames omitted
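The error suggests that one of the connections is misconfigured. Checking the configuration again, canal.tcp.server.host had not been set to the canal server's real address, 192.168.56.10:11111; inside the adapter container, 127.0.0.1 points at the adapter container itself, not at the canal server.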
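A "Connection refused" like this can be narrowed down by testing each endpoint from the place where the adapter actually runs. The sketch below uses only Python's standard `socket` module; `can_connect` and `report` are hypothetical helper names, and the addresses are the ones used in this post.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Endpoints used in this post; adjust them to your environment.
ENDPOINTS = [
    ("canal server", "192.168.56.10", 11111),
    ("MySQL", "192.168.56.10", 3306),
    ("Elasticsearch", "192.168.56.10", 9200),
]


def report(endpoints=ENDPOINTS):
    """Print one reachability line per endpoint."""
    for name, host, port in endpoints:
        status = "reachable" if can_connect(host, port) else "NOT reachable"
        print("{} ({}:{}): {}".format(name, host, port, status))
```

Calling `report()` from the host and again from inside the adapter container shows whether an address that works on the host is also reachable from the container's network.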
Fix it and try again.
This time a new problem appeared:
Extension instance(name: es, class: interface com.alibaba.otter.canal.client.adapter.OuterAdapter) could not be instantiated: class could not be found
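The adapter name has to be changed so that it matches the Elasticsearch version, here es7:
- name: es7
  hosts: 192.168.56.10:9200 # 127.0.0.1:9200 for rest mode
  properties:
    mode: rest # or rest
    # security.auth: test:123456 # only used for rest mode
    cluster.name: elasticsearch
After this change, the complete application.yml looks like this: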
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.56.10:11111 # address and port exposed by the canal server
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.56.10:3306/test?useUnicode=true
      username: canal
      password: testcanal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7 # choose the adapter that matches your ES version
        hosts: 192.168.56.10:9200 # 127.0.0.1:9200 for rest mode; address and port exposed by Elasticsearch
        properties:
          mode: rest # or rest restful
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
docker restart cd0b5d5b98b7
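Entering the container and checking the log shows that the adapter has now started successfully.
Test syncing an insert:
Test syncing an update:
Test syncing a delete: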
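One way to run these three checks is to change a row in MySQL and then read the matching document back from Elasticsearch by id. The sketch below uses Elasticsearch's get-document endpoint (`GET /<index>/_doc/<id>`) through Python's standard `urllib`; the helper names are hypothetical, and it assumes Elasticsearch is reachable over plain HTTP without authentication at the address used in this post.

```python
import json
import urllib.error
import urllib.request


def doc_url(es_host, index, doc_id):
    """Build the URL of Elasticsearch's get-document endpoint."""
    return "http://{}/{}/_doc/{}".format(es_host, index, doc_id)


def fetch_source(es_host, index, doc_id, timeout=5.0):
    """Return the document's _source, or None if Elasticsearch answers 404."""
    url = doc_url(es_host, index, doc_id)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return json.load(resp).get("_source")
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return None  # document (or index) does not exist
        raise


print(doc_url("192.168.56.10:9200", "fault_code", 2))
# prints: http://192.168.56.10:9200/fault_code/_doc/2
```

After an insert or update in fault_code, `fetch_source("192.168.56.10:9200", "fault_code", <id>)` should return the row's title and description once the adapter has processed the change; after a delete it should return None.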
From: https://www.cnblogs.com/wbstudy/p/18632202