
Canal Data Sync to Kafka: Canal Configuration, Part 2


Configure Canal

vim /opt/module/canal/conf/canal.properties
#################################################
#########      common argument      #############
#################################################
# tcp bind ip
canal.id= 10
canal.ip = 192.168.200.106
# register ip to zookeeper
canal.register.ip  =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# switch to kafka mode to have Canal push binlog events to Kafka directly
# canal.serverMode = kafka

# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size, default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode, either MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########       destinations        #############
#################################################
# destination (instance) names to sync; each matches a directory under conf/
canal.destinations = example

# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########             MQ             #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud" if you want to enable the message trace feature on Aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
#########            Kafka           #############
##################################################
# Kafka producer settings (used when canal.serverMode = kafka)
# kafka.bootstrap.servers = hadoop106:9092,hadoop107:9092,hadoop108:9092
# kafka.acks = -1
# kafka.compression.type = none
# kafka.batch.size = 16384
# kafka.linger.ms = 1
# kafka.max.request.size = 1048576
# kafka.buffer.memory = 33554432
# kafka.max.in.flight.requests.per.connection = 1
# kafka.retries = 0
# kafka.kerberos.enable = false
# kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
# kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
#########     Kafka Kerberos Info    #############
##################################################

canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
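
The file above leaves canal.serverMode = tcp, which is what the Java client below expects. If you instead want Canal itself to push binlog events into Kafka, a minimal change would be the following; this is a sketch assuming the canal.mq.* keys in this file are the active producer settings for this Canal build, and the broker list is a placeholder for your own cluster:

canal.serverMode = kafka
# brokers are placeholders; point this at your own Kafka cluster
canal.mq.servers = hadoop106:9092,hadoop107:9092,hadoop108:9092
# emit flat JSON messages (already enabled above)
canal.mq.flatMessage = true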

Configure the instance

vim /opt/module/canal/conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1234


# position info
canal.instance.master.address=192.168.200.106:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# added: default database to connect to
canal.instance.defaultDatabaseName=FlinkEtl
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
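
Canal reads the binlog by posing as a MySQL replica, so the source instance (192.168.200.106 here) must have row-based binary logging enabled, and the account in canal.instance.dbUsername needs replication privileges. A typical setup, with values as examples only, looks like this:

# /etc/my.cnf on the source MySQL
[mysqld]
server-id     = 1
log-bin       = mysql-bin
binlog-format = ROW

-- MySQL 5.7 syntax; the instance above reuses root/root, but a dedicated
-- canal user with these grants is the usual choice
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;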

Java Canal client

java.canal.bean.RowData 
package canal.bean;

import canal.protobuf.ProtoBufable;
import com.alibaba.fastjson.JSON;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Data;
import lombok.NoArgsConstructor;
import proto.CanalModel;
import java.util.HashMap;
import java.util.Map;
import static proto.CanalModel.RowData.parseFrom;

@Data
@NoArgsConstructor
public class RowData implements ProtoBufable {
    private String logfilename;           // binlog file name
    private Long logfileoffset;           // binlog offset
    private Long executeTime;             // execution timestamp
    private String schemaName;            // database name
    private String tableName;             // table name
    private String eventType;             // event type (insert/update/delete)
    private Map<String, String> columns;  // column name -> value
    public RowData(Map map) {
        this.logfilename = map.get("logfileName").toString();
        this.logfileoffset = Long.parseLong(map.get("logfileOffset").toString());
        this.executeTime = Long.parseLong(map.get("executeTime").toString());
        this.schemaName = map.get("schemaName").toString();
        this.tableName = map.get("tableName").toString();
        this.eventType = map.get("eventType").toString();
        this.columns = (Map<String, String>)map.get("columns");
    }
    public RowData(String logfilename,
                   Long logfileoffset,
                   Long executeTime,
                   String schemaName,
                   String tableName,
                   String eventType,
                   Map<String, String> columns) {
        this.logfilename = logfilename;
        this.logfileoffset = logfileoffset;
        this.executeTime = executeTime;
        this.schemaName = schemaName;
        this.tableName = tableName;
        this.eventType = eventType;
        this.columns = columns;
    }
    // deserializing constructor
    public RowData(byte[] bytes) {
        try {
            CanalModel.RowData rowData = parseFrom(bytes); // parseFrom is a static method of CanalModel.RowData
            this.logfilename = rowData.getLogfileName();
            this.logfileoffset = rowData.getLogfileOffset();
            this.executeTime = rowData.getExecuteTime();
            this.schemaName = rowData.getSchemaName(); // was missing: restore the schema name too
            this.tableName = rowData.getTableName();
            this.eventType = rowData.getEventType();
            // copy all column values into a mutable HashMap
            this.columns = new HashMap<String, String>();
            columns.putAll(rowData.getColumnsMap());
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    }
    // serialization method
    @Override
    public byte[] toByte() {
        CanalModel.RowData.Builder builder = CanalModel.RowData.newBuilder();
        builder.setLogfileName(this.logfilename);
        builder.setLogfileOffset(this.logfileoffset);
        builder.setExecuteTime(this.executeTime);
        builder.setSchemaName(this.schemaName); // was missing: serialize the schema name too
        builder.setTableName(this.tableName);
        builder.setEventType(this.eventType);
        for (String key : this.columns.keySet()) {
            builder.putColumns(key, this.columns.get(key));
        }
        return builder.build().toByteArray();
    }
    @Override
    public String toString() {
        return JSON.toJSONString(this);
//        return this.logfilename;
    }
}
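
A quick way to sanity-check the bean is a round trip through toByte() and the byte[] constructor. The class and values below are a hypothetical smoke test, assuming the generated proto.CanalModel is on the classpath:

package canal.bean;

import java.util.HashMap;
import java.util.Map;

// Hypothetical smoke test: encode a RowData to protobuf bytes and decode it
// back; every field, including schemaName, should survive the round trip.
public class RowDataRoundTrip {
    public static void main(String[] args) {
        Map<String, String> cols = new HashMap<String, String>();
        cols.put("id", "1");
        cols.put("name", "canal");
        RowData in = new RowData("mysql-bin.000003", 4L, System.currentTimeMillis(),
                "FlinkEtl", "t_user", "insert", cols);
        byte[] bytes = in.toByte();        // serialize with protobuf
        RowData out = new RowData(bytes);  // deserialize
        System.out.println(out);           // printed as JSON via toString()
    }
}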
java.canal.protobuf.ProtoBufable 
package canal.protobuf;

/**
 * ProtoBuf serialization interface.
 * Every bean that can be serialized with ProtoBuf should implement it.
 */
public interface ProtoBufable {
    /**
     * Converts this object to a byte array.
     * @return the byte array
     */
    byte[] toByte();
}
java.canal.protobuf.ProtoBufSerializer 
package canal.protobuf;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;

// protobuf-backed serializer for Kafka values
public class ProtoBufSerializer implements Serializer<ProtoBufable> {

    @Override
    public void configure(Map<String, ?> map, boolean b) { }

    @Override
    public byte[] serialize(String s, ProtoBufable protoBufable) {
        return protoBufable.toByte();
    }

    @Override
    public void close() { }

}
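
ProtoBufSerializer only covers the producer side. A consumer reading these messages needs the inverse; the class below is a sketch of such a deserializer (the class name is my own, not from the original project), built on RowData's byte[] constructor:

package canal.protobuf;

import canal.bean.RowData;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;

// Sketch of the consumer-side counterpart to ProtoBufSerializer.
public class ProtoBufDeserializer implements Deserializer<RowData> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public RowData deserialize(String topic, byte[] data) {
        // RowData's byte[] constructor parses the protobuf payload
        return data == null ? null : new RowData(data);
    }

    @Override
    public void close() { }
}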
java.canal_client.kafka.KafkaSender 
package canal_client.kafka;

import canal.bean.RowData;
import canal_client.util.ConfigUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaSender {

    private Properties kafkaProps = new Properties();
    private KafkaProducer<String, RowData> kafkaProducer;
    // configure Kafka producer parameters
    public KafkaSender() {
        kafkaProps.put("bootstrap.servers", ConfigUtil.kafkaBootstrap_servers_config());
        kafkaProps.put("acks", ConfigUtil.kafkaAcks());
        kafkaProps.put("retries", ConfigUtil.kafkaRetries());
        kafkaProps.put("batch.size", ConfigUtil.kafkaBatch_size_config());
        kafkaProps.put("key.serializer", ConfigUtil.kafkaKey_serializer_class_config());
        kafkaProps.put("value.serializer", ConfigUtil.kafkaValue_serializer_class_config());
        kafkaProducer = new KafkaProducer<String, RowData>(kafkaProps);
    }

    // send a message
    public void send(RowData rowData) {
        String string = rowData.toString();
        System.out.println("have sent: " + string);
        kafkaProducer.send(new ProducerRecord<String, RowData>(ConfigUtil.kafkaTopic(), rowData));
    }
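
    // Hedged variant, not in the original post: send() above is fire-and-forget,
    // but kafka-clients also accepts a Callback, so broker-side failures surface
    // instead of disappearing silently.
    public void sendWithCallback(RowData rowData) {
        kafkaProducer.send(
                new ProducerRecord<String, RowData>(ConfigUtil.kafkaTopic(), rowData),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace(); // delivery failed
                    } else {
                        System.out.println("acked: " + metadata.topic()
                                + "-" + metadata.partition() + "@" + metadata.offset());
                    }
                });
    }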

}
java.canal_client.util.ConfigUtil 
package canal_client.util;

import java.io.IOException;
import java.util.Properties;

// Statically loads runtime parameters before the program starts.
// Reads them from the config file.
public class ConfigUtil {
    private static Properties properties;
    // load the config file through the class loader
    // static block: runs when the class is first loaded, and only once
    static {
        try {
            properties = new Properties();
            // the class loader opens the config file as a stream
            properties.load(ConfigUtil.class.getClassLoader().getResourceAsStream("config.properties"));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // accessors for the config file properties
    public static String canalServerIp() {
        return properties.getProperty("canal.server.ip");
    }
    public static int canalServerPort() {
        return Integer.parseInt(properties.getProperty("canal.server.port"));
    }

    public static String canalServerDestination() {
        return properties.getProperty("canal.server.destination");
    }

    public static String canalSubscribeFilter() {
        return properties.getProperty("canal.subscribe.filter");
    }

    public static String zookeeperServerIp() {
        return properties.getProperty("zookeeper.server.ip");
    }

    public static String kafkaBootstrap_servers_config() {
        return properties.getProperty("kafka.bootstrap_servers_config");
    }

    public static String kafkaBatch_size_config() {
        return properties.getProperty("kafka.batch_size_config");
    }

    public static String kafkaAcks() {
        return properties.getProperty("kafka.acks");
    }

    public static String kafkaRetries() {
        return properties.getProperty("kafka.retries");
    }

    public static String kafkaBatch() {
        return properties.getProperty("kafka.batch");
    }

    public static String kafkaClient_id_config() {
        return properties.getProperty("kafka.client_id_config");
    }

    public static String kafkaKey_serializer_class_config() {
        return properties.getProperty("kafka.key_serializer_class_config");
    }

    public static String kafkaValue_serializer_class_config() {
        return properties.getProperty("kafka.value_serializer_class_config");
    }

    public static String kafkaTopic() {
        return properties.getProperty("kafka.topic");
    }

    // quick self-test
    public static void main(String[] args) {
        System.out.println(canalServerIp());
        System.out.println(canalServerPort());
        System.out.println(canalServerDestination());

    }
}
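
ConfigUtil reads config.properties from the classpath, but the post never shows that file. From the keys the accessors above look up, a matching src/main/resources/config.properties might look like this; all values are placeholders matching the hosts used earlier:

# keys match the accessors in ConfigUtil; values are examples only
canal.server.ip=192.168.200.106
canal.server.port=11111
canal.server.destination=example
canal.subscribe.filter=FlinkEtl\\..*
zookeeper.server.ip=192.168.200.106:2181
kafka.bootstrap_servers_config=hadoop106:9092,hadoop107:9092,hadoop108:9092
kafka.acks=all
kafka.retries=0
kafka.batch_size_config=16384
kafka.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
kafka.value_serializer_class_config=canal.protobuf.ProtoBufSerializer
kafka.topic=example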
java.canal_client.CanalClient 
package canal_client;

import canal.bean.RowData;
import canal_client.kafka.KafkaSender;
import canal_client.util.ConfigUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/*
 * Client class that connects to the Canal server.
 */

public class CanalClient {

    // number of binlog entries to read per batch
    private static final int BATCH_SIZE = 50;
    // Canal client connector
    private CanalConnector canalConnector;
    // Kafka sender
    private KafkaSender kafkaSender;

    public CanalClient() {
        // initialize a single-node TCP connection (ip + port + destination) to the Canal server
        canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(ConfigUtil.canalServerIp(), ConfigUtil.canalServerPort()),
                ConfigUtil.canalServerDestination(), "", "");
        kafkaSender = new KafkaSender();
    }

    // start listening
    public void start() {
        try {
            while (true) {
                // establish the connection
                canalConnector.connect();
                // roll back the last get request so the data is fetched again
                canalConnector.rollback();
                // subscribe to the matching binlog
                canalConnector.subscribe(ConfigUtil.canalSubscribeFilter());
                while (true) {
                    // pull one batch of binlog entries
                    Message message = canalConnector.getWithoutAck(BATCH_SIZE);
                    // batch id
                    long batchId = message.getId();
                    // number of binlog entries in this batch
                    int size = message.getEntries().size();
                    if (batchId != -1 && size > 0) {
                        // parse the binlog entries into a map
                        Map binlogMsgMap = binlogMessageToMap(message);
                        System.out.println("RowData ===> " + binlogMsgMap);
                        // the map stays empty when the batch held only transaction
                        // begin/end entries, so guard before building the RowData
                        if (binlogMsgMap.size() > 0) {
                            kafkaSender.send(new RowData(binlogMsgMap));
                        }
                    }
                    // acknowledge that this batchId has been consumed
                    canalConnector.ack(batchId);
                }
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            // disconnect
            canalConnector.disconnect();
        }
    }

    /**
     * Converts a binlog Message into a Map.
     * @param message the Canal message
     * @return a map of binlog header fields plus a "columns" map
     */
    private Map binlogMessageToMap(Message message) throws InvalidProtocolBufferException {

        Map rowDataMap = new HashMap();
        // 1. iterate over all binlog entries in the message
        for (CanalEntry.Entry entry : message.getEntries()) {
            // skip transaction begin/end markers; only row changes carry data
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // binlog file name
            String logfileName = entry.getHeader().getLogfileName();
            // offset within the binlog file
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // execution timestamp of the SQL statement
            long executeTime = entry.getHeader().getExecuteTime();
            // database name
            String schemaName = entry.getHeader().getSchemaName();
            // table name
            String tableName = entry.getHeader().getTableName();
            // event type: insert/update/delete
            String eventType = entry.getHeader().getEventType().toString().toLowerCase();

            rowDataMap.put("logfileName", logfileName);
            rowDataMap.put("logfileOffset", logfileOffset);
            rowDataMap.put("executeTime", executeTime);
            rowDataMap.put("schemaName", schemaName);
            rowDataMap.put("tableName", tableName);
            rowDataMap.put("eventType", eventType);

            // collect the column changes for every row
            Map<String, String> columnDataMap = new HashMap<String, String>();
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> columnDataList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : columnDataList) {
                if(eventType.equals("insert") || eventType.equals("update")) {
                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                        columnDataMap.put(column.getName(), column.getValue().toString());
                    }
                }
                else if(eventType.equals("delete")) {
                    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                        columnDataMap.put(column.getName(), column.getValue().toString());
                    }
                }
            }

            rowDataMap.put("columns", columnDataMap);
        }

        return rowDataMap;
    }
}
java.canal_client.Entrance 
package canal_client;
public class Entrance {
    public static void main(String[] args) {
        CanalClient canalClient = new CanalClient();
        canalClient.start();
    }
}

pom

<dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.19.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
</dependencies>

<build>
        <plugins>

          <!-- Protobuf插件 -->
                <plugin>
                    <groupId>org.xolstice.maven.plugins</groupId>
                    <artifactId>protobuf-maven-plugin</artifactId>
                    <version>0.5.0</version>
                    <configuration>
                        <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
                        <protocArtifact>com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}</protocArtifact>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
        </plugins>
</build>
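
${os.detected.classifier} in the protocArtifact is supplied by the os-maven-plugin build extension, so the build section presumably also needs something like this (version is an example):

<!-- added inside <build>; resolves ${os.detected.classifier} -->
<extensions>
    <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.6.2</version>
    </extension>
</extensions>

The CanalModel.proto source is not reproduced in the post, but its shape can be read back from the generated class below. A reconstruction consistent with those field numbers (proto3, string/uint64 fields plus a string map) would be:

syntax = "proto3";

option java_package = "proto";
option java_outer_classname = "CanalModel";

/* row data */
message RowData {
    uint64 executeTime   = 1;
    string schemaName    = 2;
    string tableName     = 3;
    string eventType     = 4;
    /* column data */
    map<string, string> columns = 5;
    uint64 logfileOffset = 14;
    string logfileName   = 15;
}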
target\generated-sources\protobuf\java\proto\CanalModel.java
// Generated by the protocol buffer compiler.  DO NOT EDIT!
// source: CanalModel.proto

package proto;

public final class CanalModel {
  private CanalModel() {}
  public static void registerAllExtensions(
      com.google.protobuf.ExtensionRegistryLite registry) {
  }

  public static void registerAllExtensions(
      com.google.protobuf.ExtensionRegistry registry) {
    registerAllExtensions(
        (com.google.protobuf.ExtensionRegistryLite) registry);
  }
  public interface RowDataOrBuilder extends
      // @@protoc_insertion_point(interface_extends:RowData)
      com.google.protobuf.MessageOrBuilder {

    /**
     * <code>optional string logfileName = 15;</code>
     */
    java.lang.String getLogfileName();
    /**
     * <code>optional string logfileName = 15;</code>
     */
    com.google.protobuf.ByteString
        getLogfileNameBytes();

    /**
     * <code>optional uint64 logfileOffset = 14;</code>
     */
    long getLogfileOffset();

    /**
     * <code>optional uint64 executeTime = 1;</code>
     */
    long getExecuteTime();

    /**
     * <code>optional string schemaName = 2;</code>
     */
    java.lang.String getSchemaName();
    /**
     * <code>optional string schemaName = 2;</code>
     */
    com.google.protobuf.ByteString
        getSchemaNameBytes();

    /**
     * <code>optional string tableName = 3;</code>
     */
    java.lang.String getTableName();
    /**
     * <code>optional string tableName = 3;</code>
     */
    com.google.protobuf.ByteString
        getTableNameBytes();

    /**
     * <code>optional string eventType = 4;</code>
     */
    java.lang.String getEventType();
    /**
     * <code>optional string eventType = 4;</code>
     */
    com.google.protobuf.ByteString
        getEventTypeBytes();

    /**
     * <pre>
     * column data
     * </pre>
     *
     * <code>map&lt;string, string&gt; columns = 5;</code>
     */
    int getColumnsCount();
    /**
     * <pre>
     * column data
     * </pre>
     *
     * <code>map&lt;string, string&gt; columns = 5;</code>
     */
    boolean containsColumns(
        java.lang.String key);
    /**
     * Use {@link #getColumnsMap()} instead.
     */
    @java.lang.Deprecated
    java.util.Map<java.lang.String, java.lang.String>
    getColumns();
    /**
     * <pre>
     * column data
     * </pre>
     *
     * <code>map&lt;string, string&gt; columns = 5;</code>
     */
    java.util.Map<java.lang.String, java.lang.String>
    getColumnsMap();
    /**
     * <pre>
     * column data
     * </pre>
     *
     * <code>map&lt;string, string&gt; columns = 5;</code>
     */

    java.lang.String getColumnsOrDefault(
        java.lang.String key,
        java.lang.String defaultValue);
    /**
     * <pre>
     * column data
     * </pre>
     *
     * <code>map&lt;string, string&gt; columns = 5;</code>
     */

    java.lang.String getColumnsOrThrow(
        java.lang.String key);
  }
  /**
   * <pre>
   * row data
   * </pre>
   *
   * Protobuf type {@code RowData}
   */
  public  static final class RowData extends
      com.google.protobuf.GeneratedMessageV3 implements
      // @@protoc_insertion_point(message_implements:RowData)
      RowDataOrBuilder {
    // Use RowData.newBuilder() to construct.
    private RowData(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
      super(builder);
    }
    private RowData() {
      logfileName_ = "";
      logfileOffset_ = 0L;
      executeTime_ = 0L;
      schemaName_ = "";
      tableName_ = "";
      eventType_ = "";
    }

    @java.lang.Override
    public final com.google.protobuf.UnknownFieldSet
    getUnknownFields() {
      return com.google.protobuf.UnknownFieldSet.getDefaultInstance();
    }
    private RowData(
        com.google.protobuf.CodedInputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      this();
      int mutable_bitField0_ = 0;
      try {
        boolean done = false;
        while (!done) {
          int tag = input.readTag();
          switch (tag) {
            case 0:
              done = true;
              break;
            default: {
              if (!input.skipField(tag)) {
                done = true;
              }
              break;
            }
            case 8: {

              executeTime_ = input.readUInt64();
              break;
            }
            case 18: {
              java.lang.String s = input.readStringRequireUtf8();

              schemaName_ = s;
              break;
            }
            case 26: {
              java.lang.String s = input.readStringRequireUtf8();

              tableName_ = s;
              break;
            }
            case 34: {
              java.lang.String s = input.readStringRequireUtf8();

              eventType_ = s;
              break;
            }
            case 42: {
              if (!((mutable_bitField0_ & 0x00000040) == 0x00000040)) {
                columns_ = com.google.protobuf.MapField.newMapField(
                    ColumnsDefaultEntryHolder.defaultEntry);
                mutable_bitField0_ |= 0x00000040;
              }
              com.google.protobuf.MapEntry<java.lang.String, java.lang.String>
              columns__ = input.readMessage(
                  ColumnsDefaultEntryHolder.defaultEntry.getParserForType(), extensionRegistry);
              columns_.getMutableMap().put(
                  columns__.getKey(), columns__.getValue());
              break;
            }
            case 112: {

              logfileOffset_ = input.readUInt64();
              break;
            }
            case 122: {
              java.lang.String s = input.readStringRequireUtf8();

              logfileName_ = s;
              break;
            }
          }
        }
      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
        throw e.setUnfinishedMessage(this);
      } catch (java.io.IOException e) {
        throw new com.google.protobuf.InvalidProtocolBufferException(
            e).setUnfinishedMessage(this);
      } finally {
        makeExtensionsImmutable();
      }
    }
    public static final com.google.protobuf.Descriptors.Descriptor
        getDescriptor() {
      return proto.CanalModel.internal_static_RowData_descriptor;
    }

    @SuppressWarnings({"rawtypes"})
    protected com.google.protobuf.MapField internalGetMapField(
        int number) {
      switch (number) {
        case 5:
          return internalGetColumns();
        default:
          throw new RuntimeException(
              "Invalid map field number: " + number);
      }
    }
    protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
        internalGetFieldAccessorTable() {
      return proto.CanalModel.internal_static_RowData_fieldAccessorTable
          .ensureFieldAccessorsInitialized(
              proto.CanalModel.RowData.class, proto.CanalModel.RowData.Builder.class);
    }

    private int bitField0_;
    public static final int LOGFILENAME_FIELD_NUMBER = 15;
    private volatile java.lang.Object logfileName_;
    /**
     * <code>optional string logfileName = 15;</code>
     */
    public java.lang.String getLogfileName() {
      java.lang.Object ref = logfileName_;
      if (ref instanceof java.lang.String) {
        return (java.lang.String) ref;
      } else {
        com.google.protobuf.ByteString bs = 
            (com.google.protobuf.ByteString) ref;
        java.lang.String s = bs.toStringUtf8();
        logfileName_ = s;
        return s;
      }
    }
    /**
     * <code>optional string logfileName = 15;</code>
     */
    public com.google.protobuf.ByteString
        getLogfileNameBytes() {
      java.lang.Object ref = logfileName_;
      if (ref instanceof java.lang.String) {
        com.google.protobuf.ByteString b = 
            com.google.protobuf.ByteString.copyFromUtf8(
                (java.lang.String) ref);
        logfileName_ = b;
        return b;
      } else {
        return (com.google.protobuf.ByteString) ref;
      }
    }

    public static final int LOGFILEOFFSET_FIELD_NUMBER = 14;
    private long logfileOffset_;
    /**
     * <code>optional uint64 logfileOffset = 14;</code>
     */
    public long getLogfileOffset() {
      return logfileOffset_;
    }

    public static final int EXECUTETIME_FIELD_NUMBER = 1;
    private long executeTime_;
    /**
     * <code>optional uint64 executeTime = 1;</code>
     */
    public long getExecuteTime() {
      return executeTime_;
    }

    public static final int SCHEMANAME_FIELD_NUMBER = 2;
    private volatile java.lang.Object schemaName_;
    /**
     * <code>optional string schemaName = 2;</code>
     */
    public java.lang.String getSchemaName() {
      java.lang.Object ref = schemaName_;
      if (ref instanceof java.lang.String) {
        return (java.lang.String) ref;
      } else {
        com.google.protobuf.ByteString bs = 
            (com.google.protobuf.ByteString) ref;
        java.lang.String s = bs.toStringUtf8();
        schemaName_ = s;
        return s;
      }
    }
    /**
     * <code>optional string schemaName = 2;</code>
     */
    public com.google.protobuf.ByteString
        getSchemaNameBytes() {
      java.lang.Object ref = schemaName_;
      if (ref instanceof java.lang.String) {
        com.google.protobuf.ByteString b = 
            com.google.protobuf.ByteString.copyFromUtf8(
                (java.lang.String) ref);
        schemaName_ = b;
        return b;
      } else {
        return (com.google.protobuf.ByteString) ref;
      }
    }

    public static final int TABLENAME_FIELD_NUMBER = 3;
    private volatile java.lang.Object tableName_;
    /**
     * <code>optional string tableName = 3;</code>
     */
    public java.lang.String getTableName() {
      java.lang.Object ref = tableName_;
      if (ref instanceof java.lang.String) {
        return (java.lang.String) ref;
      } else {
        com.google.protobuf.ByteString bs = 
            (com.google.protobuf.ByteString) ref;
        java.lang.String s = bs.toStringUtf8();
        tableName_ = s;
        return s;
      }
    }
    /**
     * <code>optional string tableName = 3;</code>
     */
    public com.google.protobuf.ByteString
        getTableNameBytes() {
      java.lang.Object ref = tableName_;
      if (ref instanceof java.lang.String) {
        com.google.protobuf.ByteString b = 
            com.google.protobuf.ByteString.copyFromUtf8(
                (java.lang.String) ref);
        tableName_ = b;
        return b;
      } else {
        return (com.google.protobuf.ByteString) ref;
      }
    }

    public static final int EVENTTYPE_FIELD_NUMBER = 4;
    private volatile java.lang.Object eventType_;
    /**
     * <code>optional string eventType = 4;</code>
     */
    public java.lang.String getEventType() {
      java.lang.Object ref = eventType_;
      if (ref instanceof java.lang.String) {
        return (java.lang.String) ref;
      } else {
        com.google.protobuf.ByteString bs = 
            (com.google.protobuf.ByteString) ref;
        java.lang.String s = bs.toStringUtf8();
        eventType_ = s;
        return s;
      }
    }
    /**
     * <code>optional string eventType = 4;</code>
     */
    public com.google.protobuf.ByteString
        getEventTypeBytes() {
      java.lang.Object ref = eventType_;
      if (ref instanceof java.lang.String) {
        com.google.protobuf.ByteString b = 
            com.google.protobuf.ByteString.copyFromUtf8(
                (java.lang.String) ref);
        eventType_ = b;
        return b;
      } else {
        return (com.google.protobuf.ByteString) ref;
      }
    }

    public static final int COLUMNS_FIELD_NUMBER = 5;
    private static final class ColumnsDefaultEntryHolder {
      static final com.google.protobuf.MapEntry<
          java.lang.String, java.lang.String> defaultEntry =
              com.google.protobuf.MapEntry
              .<java.lang.String, java.lang.String>newDefaultInstance(
                  proto.CanalModel.internal_static_RowData_ColumnsEntry_descriptor, 
                  com.google.protobuf.WireFormat.FieldType.STRING,
                  "",
                  com.google.protobuf.WireFormat.FieldType.STRING,
                  "");
    }
    private com.google.protobuf.MapField<
        java.lang.String, java.lang.String> columns_;
    private com.google.protobuf.MapField<java.lang.String, java.lang.String>
    internalGetColumns() {
      if (columns_ == null) {
        return com.google.protobuf.MapField.emptyMapField(
            ColumnsDefaultEntryHolder.defaultEntry);
      }
      return columns_;
    }

    public int getColumnsCount() {
      return internalGetColumns().getMap().size();
    }
    /**
     * <pre>
     * column data
     * </pre>
     *
     * <code>map&lt;string, string&gt; columns = 5;</code>
     */

    public boolean containsColumns(
        java.lang.String key) {
      if (key == null) { throw new java.lang.NullPointerException(); }
      return internalGetColumns().getMap().containsKey(key);
    }
    /**
     * Use {@link #getColumnsMap()} instead.
     */
    @java.lang.Deprecated
    public java.util.Map<java.lang.String, java.lang.String> getColumns() {
      return getColumnsMap();
    }
    /**
     * <pre>
     * column data
     * </pre>
     *
     * <code>map&lt;string, string&gt; columns = 5;</code>
     */

    public java.util.Map<java.lang.String, java.lang.String> getColumnsMap() {
      return internalGetColumns().getMap();
    }
    /**
     * <pre>
     * column data
     * </pre>
     *
     * <code>map&lt;string, string&gt; columns = 5;</code>
     */

    public java.lang.String getColumnsOrDefault(
        java.lang.String key,
        java.lang.String defaultValue) {
      if (key == null) { throw new java.lang.NullPointerException(); }
      java.util.Map<java.lang.String, java.lang.String> map =
          internalGetColumns().getMap();
      return map.containsKey(key) ? map.get(key) : defaultValue;
    }
    /**
     * <pre>
     * column data
     * </pre>
     *
     * <code>map&lt;string, string&gt; columns = 5;</code>
     */

    public java.lang.String getColumnsOrThrow(
        java.lang.String key) {
      if (key == null) { throw new java.lang.NullPointerException(); }
      java.util.Map<java.lang.String, java.lang.String> map =
          internalGetColumns().getMap();
      if (!map.containsKey(key)) {
        throw new java.lang.IllegalArgumentException();
      }
      return map.get(key);
    }

    private byte memoizedIsInitialized = -1;
    public final boolean isInitialized() {
      byte isInitialized = memoizedIsInitialized;
      if (isInitialized == 1) return true;
      if (isInitialized == 0) return false;

      memoizedIsInitialized = 1;
      return true;
    }

    public void writeTo(com.google.protobuf.CodedOutputStream output)
                        throws java.io.IOException {
      if (executeTime_ != 0L) {
        output.writeUInt64(1, executeTime_);
      }
      if (!getSchemaNameBytes().isEmpty()) {
        com.google.protobuf.GeneratedMessageV3.writeString(output, 2, schemaName_);
      }
      if (!getTableNameBytes().isEmpty()) {
        com.google.protobuf.GeneratedMessageV3.writeString(output, 3, tableName_);
      }
      if (!getEventTypeBytes().isEmpty()) {
        com.google.protobuf.GeneratedMessageV3.writeString(output, 4, eventType_);
      }
      com.google.protobuf.GeneratedMessageV3
        .serializeStringMapTo(
          output,
          internalGetColumns(),
          ColumnsDefaultEntryHolder.defaultEntry,
          5);
      if (logfileOffset_ != 0L) {
        output.writeUInt64(14, logfileOffset_);
      }
      if (!getLogfileNameBytes().isEmpty()) {
        com.google.protobuf.GeneratedMessageV3.writeString(output, 15, logfileName_);
      }
    }

    public int getSerializedSize() {
      int size = memoizedSize;
      if (size != -1) return size;

      size = 0;
      if (executeTime_ != 0L) {
        size += com.google.protobuf.CodedOutputStream
          .computeUInt64Size(1, executeTime_);
      }
      if (!getSchemaNameBytes().isEmpty()) {
        size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, schemaName_);
      }
      if (!getTableNameBytes().isEmpty()) {
        size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, tableName_);
      }
      if (!getEventTypeBytes().isEmpty()) {
        size += com.google.protobuf.GeneratedMessageV3.computeStringSize(4, eventType_);
      }
      for (java.util.Map.Entry<java.lang.String, java.lang.String> entry
           : internalGetColumns().getMap().entrySet()) {
        com.google.protobuf.MapEntry<java.lang.String, java.lang.String>
        columns__ = ColumnsDefaultEntryHolder.defaultEntry.newBuilderForType()
            .setKey(entry.getKey())
            .setValue(entry.getValue())
            .build();
        size += com.google.protobuf.CodedOutputStream
            .computeMessageSize(5, columns__);
      }
      if (logfileOffset_ != 0L) {
        size += com.google.protobuf.CodedOutputStream
          .computeUInt64Size(14, logfileOffset_);
      }
      if (!getLogfileNameBytes().isEmpty()) {
        size += com.google.protobuf.GeneratedMessageV3.computeStringSize(15, logfileName_);
      }
      memoizedSize = size;
      return size;
    }

    private static final long serialVersionUID = 0L;
    @java.lang.Override
    public boolean equals(final java.lang.Object obj) {
      if (obj == this) {
       return true;
      }
      if (!(obj instanceof proto.CanalModel.RowData)) {
        return super.equals(obj);
      }
      proto.CanalModel.RowData other = (proto.CanalModel.RowData) obj;

      boolean result = true;
      result = result && getLogfileName()
          .equals(other.getLogfileName());
      result = result && (getLogfileOffset()
          == other.getLogfileOffset());
      result = result && (getExecuteTime()
          == other.getExecuteTime());
      result = result && getSchemaName()
          .equals(other.getSchemaName());
      result = result && getTableName()
          .equals(other.getTableName());
      result = result && getEventType()
          .equals(other.getEventType());
      result = result && internalGetColumns().equals(
          other.internalGetColumns());
      return result;
    }

    @java.lang.Override
    public int hashCode() {
      if (memoizedHashCode != 0) {
        return memoizedHashCode;
      }
      int hash = 41;
      hash = (19 * hash) + getDescriptorForType().hashCode();
      hash = (37 * hash) + LOGFILENAME_FIELD_NUMBER;
      hash = (53 * hash) + getLogfileName().hashCode();
      hash = (37 * hash) + LOGFILEOFFSET_FIELD_NUMBER;
      hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
          getLogfileOffset());
      hash = (37 * hash) + EXECUTETIME_FIELD_NUMBER;
      hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
          getExecuteTime());
      hash = (37 * hash) + SCHEMANAME_FIELD_NUMBER;
      hash = (53 * hash) + getSchemaName().hashCode();
      hash = (37 * hash) + TABLENAME_FIELD_NUMBER;
      hash = (53 * hash) + getTableName().hashCode();
      hash = (37 * hash) + EVENTTYPE_FIELD_NUMBER;
      hash = (53 * hash) + getEventType().hashCode();
      if (!internalGetColumns().getMap().isEmpty()) {
        hash = (37 * hash) + COLUMNS_FIELD_NUMBER;
        hash = (53 * hash) + internalGetColumns().hashCode();
      }
      hash = (29 * hash) + unknownFields.hashCode();
      memoizedHashCode = hash;
      return hash;
    }

    public static proto.CanalModel.RowData parseFrom(
        com.google.protobuf.ByteString data)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data);
    }
    public static proto.CanalModel.RowData parseFrom(
        com.google.protobuf.ByteString data,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data, extensionRegistry);
    }
    public static proto.CanalModel.RowData parseFrom(byte[] data)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data);
    }
    public static proto.CanalModel.RowData parseFrom(
        byte[] data,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data, extensionRegistry);
    }
    public static proto.CanalModel.RowData parseFrom(java.io.InputStream input)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseWithIOException(PARSER, input);
    }
    public static proto.CanalModel.RowData parseFrom(
        java.io.InputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseWithIOException(PARSER, input, extensionRegistry);
    }
    public static proto.CanalModel.RowData parseDelimitedFrom(java.io.InputStream input)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseDelimitedWithIOException(PARSER, input);
    }
    public static proto.CanalModel.RowData parseDelimitedFrom(
        java.io.InputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
    }
    public static proto.CanalModel.RowData parseFrom(
        com.google.protobuf.CodedInputStream input)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseWithIOException(PARSER, input);
    }
    public static proto.CanalModel.RowData parseFrom(
        com.google.protobuf.CodedInputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseWithIOException(PARSER, input, extensionRegistry);
    }

    public Builder newBuilderForType() { return newBuilder(); }
    public static Builder newBuilder() {
      return DEFAULT_INSTANCE.toBuilder();
    }
    public static Builder newBuilder(proto.CanalModel.RowData prototype) {
      return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
    }
    public Builder toBuilder() {
      return this == DEFAULT_INSTANCE
          ? new Builder() : new Builder().mergeFrom(this);
    }

    @java.lang.Override
    protected Builder newBuilderForType(
        com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
      Builder builder = new Builder(parent);
      return builder;
    }
    /**
     * <pre>
      * row data
     * </pre>
     *
     * Protobuf type {@code RowData}
     */
    public static final class Builder extends
        com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
        // @@protoc_insertion_point(builder_implements:RowData)
        proto.CanalModel.RowDataOrBuilder {
      public static final com.google.protobuf.Descriptors.Descriptor
          getDescriptor() {
        return proto.CanalModel.internal_static_RowData_descriptor;
      }

      @SuppressWarnings({"rawtypes"})
      protected com.google.protobuf.MapField internalGetMapField(
          int number) {
        switch (number) {
          case 5:
            return internalGetColumns();
          default:
            throw new RuntimeException(
                "Invalid map field number: " + number);
        }
      }
      @SuppressWarnings({"rawtypes"})
      protected com.google.protobuf.MapField internalGetMutableMapField(
          int number) {
        switch (number) {
          case 5:
            return internalGetMutableColumns();
          default:
            throw new RuntimeException(
                "Invalid map field number: " + number);
        }
      }
      protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
          internalGetFieldAccessorTable() {
        return proto.CanalModel.internal_static_RowData_fieldAccessorTable
            .ensureFieldAccessorsInitialized(
                proto.CanalModel.RowData.class, proto.CanalModel.RowData.Builder.class);
      }

      // Construct using proto.CanalModel.RowData.newBuilder()
      private Builder() {
        maybeForceBuilderInitialization();
      }

      private Builder(
          com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
        super(parent);
        maybeForceBuilderInitialization();
      }
      private void maybeForceBuilderInitialization() {
        if (com.google.protobuf.GeneratedMessageV3
                .alwaysUseFieldBuilders) {
        }
      }
      public Builder clear() {
        super.clear();
        logfileName_ = "";

        logfileOffset_ = 0L;

        executeTime_ = 0L;

        schemaName_ = "";

        tableName_ = "";

        eventType_ = "";

        internalGetMutableColumns().clear();
        return this;
      }

      public com.google.protobuf.Descriptors.Descriptor
          getDescriptorForType() {
        return proto.CanalModel.internal_static_RowData_descriptor;
      }

      public proto.CanalModel.RowData getDefaultInstanceForType() {
        return proto.CanalModel.RowData.getDefaultInstance();
      }

      public proto.CanalModel.RowData build() {
        proto.CanalModel.RowData result = buildPartial();
        if (!result.isInitialized()) {
          throw newUninitializedMessageException(result);
        }
        return result;
      }

      public proto.CanalModel.RowData buildPartial() {
        proto.CanalModel.RowData result = new proto.CanalModel.RowData(this);
        int from_bitField0_ = bitField0_;
        int to_bitField0_ = 0;
        result.logfileName_ = logfileName_;
        result.logfileOffset_ = logfileOffset_;
        result.executeTime_ = executeTime_;
        result.schemaName_ = schemaName_;
        result.tableName_ = tableName_;
        result.eventType_ = eventType_;
        result.columns_ = internalGetColumns();
        result.columns_.makeImmutable();
        result.bitField0_ = to_bitField0_;
        onBuilt();
        return result;
      }

      public Builder clone() {
        return (Builder) super.clone();
      }
      public Builder setField(
          com.google.protobuf.Descriptors.FieldDescriptor field,
          Object value) {
        return (Builder) super.setField(field, value);
      }
      public Builder clearField(
          com.google.protobuf.Descriptors.FieldDescriptor field) {
        return (Builder) super.clearField(field);
      }
      public Builder clearOneof(
          com.google.protobuf.Descriptors.OneofDescriptor oneof) {
        return (Builder) super.clearOneof(oneof);
      }
      public Builder setRepeatedField(
          com.google.protobuf.Descriptors.FieldDescriptor field,
          int index, Object value) {
        return (Builder) super.setRepeatedField(field, index, value);
      }
      public Builder addRepeatedField(
          com.google.protobuf.Descriptors.FieldDescriptor field,
          Object value) {
        return (Builder) super.addRepeatedField(field, value);
      }
      public Builder mergeFrom(com.google.protobuf.Message other) {
        if (other instanceof proto.CanalModel.RowData) {
          return mergeFrom((proto.CanalModel.RowData)other);
        } else {
          super.mergeFrom(other);
          return this;
        }
      }

      public Builder mergeFrom(proto.CanalModel.RowData other) {
        if (other == proto.CanalModel.RowData.getDefaultInstance()) return this;
        if (!other.getLogfileName().isEmpty()) {
          logfileName_ = other.logfileName_;
          onChanged();
        }
        if (other.getLogfileOffset() != 0L) {
          setLogfileOffset(other.getLogfileOffset());
        }
        if (other.getExecuteTime() != 0L) {
          setExecuteTime(other.getExecuteTime());
        }
        if (!other.getSchemaName().isEmpty()) {
          schemaName_ = other.schemaName_;
          onChanged();
        }
        if (!other.getTableName().isEmpty()) {
          tableName_ = other.tableName_;
          onChanged();
        }
        if (!other.getEventType().isEmpty()) {
          eventType_ = other.eventType_;
          onChanged();
        }
        internalGetMutableColumns().mergeFrom(
            other.internalGetColumns());
        onChanged();
        return this;
      }

      public final boolean isInitialized() {
        return true;
      }

      public Builder mergeFrom(
          com.google.protobuf.CodedInputStream input,
          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
          throws java.io.IOException {
        proto.CanalModel.RowData parsedMessage = null;
        try {
          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
          parsedMessage = (proto.CanalModel.RowData) e.getUnfinishedMessage();
          throw e.unwrapIOException();
        } finally {
          if (parsedMessage != null) {
            mergeFrom(parsedMessage);
          }
        }
        return this;
      }
      private int bitField0_;

      private java.lang.Object logfileName_ = "";
      /**
       * <code>optional string logfileName = 15;</code>
       */
      public java.lang.String getLogfileName() {
        java.lang.Object ref = logfileName_;
        if (!(ref instanceof java.lang.String)) {
          com.google.protobuf.ByteString bs =
              (com.google.protobuf.ByteString) ref;
          java.lang.String s = bs.toStringUtf8();
          logfileName_ = s;
          return s;
        } else {
          return (java.lang.String) ref;
        }
      }
      /**
       * <code>optional string logfileName = 15;</code>
       */
      public com.google.protobuf.ByteString
          getLogfileNameBytes() {
        java.lang.Object ref = logfileName_;
        if (ref instanceof String) {
          com.google.protobuf.ByteString b = 
              com.google.protobuf.ByteString.copyFromUtf8(
                  (java.lang.String) ref);
          logfileName_ = b;
          return b;
        } else {
          return (com.google.protobuf.ByteString) ref;
        }
      }
      /**
       * <code>optional string logfileName = 15;</code>
       */
      public Builder setLogfileName(
          java.lang.String value) {
        if (value == null) {
          throw new NullPointerException();
        }
        logfileName_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>optional string logfileName = 15;</code>
       */
      public Builder clearLogfileName() {
        
        logfileName_ = getDefaultInstance().getLogfileName();
        onChanged();
        return this;
      }
      /**
       * <code>optional string logfileName = 15;</code>
       */
      public Builder setLogfileNameBytes(
          com.google.protobuf.ByteString value) {
        if (value == null) {
          throw new NullPointerException();
        }
        checkByteStringIsUtf8(value);
        logfileName_ = value;
        onChanged();
        return this;
      }

      private long logfileOffset_ ;
      /**
       * <code>optional uint64 logfileOffset = 14;</code>
       */
      public long getLogfileOffset() {
        return logfileOffset_;
      }
      /**
       * <code>optional uint64 logfileOffset = 14;</code>
       */
      public Builder setLogfileOffset(long value) {
        
        logfileOffset_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>optional uint64 logfileOffset = 14;</code>
       */
      public Builder clearLogfileOffset() {
        
        logfileOffset_ = 0L;
        onChanged();
        return this;
      }

      private long executeTime_ ;
      /**
       * <code>optional uint64 executeTime = 1;</code>
       */
      public long getExecuteTime() {
        return executeTime_;
      }
      /**
       * <code>optional uint64 executeTime = 1;</code>
       */
      public Builder setExecuteTime(long value) {
        
        executeTime_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>optional uint64 executeTime = 1;</code>
       */
      public Builder clearExecuteTime() {
        
        executeTime_ = 0L;
        onChanged();
        return this;
      }

      private java.lang.Object schemaName_ = "";
      /**
       * <code>optional string schemaName = 2;</code>
       */
      public java.lang.String getSchemaName() {
        java.lang.Object ref = schemaName_;
        if (!(ref instanceof java.lang.String)) {
          com.google.protobuf.ByteString bs =
              (com.google.protobuf.ByteString) ref;
          java.lang.String s = bs.toStringUtf8();
          schemaName_ = s;
          return s;
        } else {
          return (java.lang.String) ref;
        }
      }
      /**
       * <code>optional string schemaName = 2;</code>
       */
      public com.google.protobuf.ByteString
          getSchemaNameBytes() {
        java.lang.Object ref = schemaName_;
        if (ref instanceof String) {
          com.google.protobuf.ByteString b = 
              com.google.protobuf.ByteString.copyFromUtf8(
                  (java.lang.String) ref);
          schemaName_ = b;
          return b;
        } else {
          return (com.google.protobuf.ByteString) ref;
        }
      }
      /**
       * <code>optional string schemaName = 2;</code>
       */
      public Builder setSchemaName(
          java.lang.String value) {
        if (value == null) {
          throw new NullPointerException();
        }
        schemaName_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>optional string schemaName = 2;</code>
       */
      public Builder clearSchemaName() {
        
        schemaName_ = getDefaultInstance().getSchemaName();
        onChanged();
        return this;
      }
      /**
       * <code>optional string schemaName = 2;</code>
       */
      public Builder setSchemaNameBytes(
          com.google.protobuf.ByteString value) {
        if (value == null) {
          throw new NullPointerException();
        }
        checkByteStringIsUtf8(value);
        schemaName_ = value;
        onChanged();
        return this;
      }

      private java.lang.Object tableName_ = "";
      /**
       * <code>optional string tableName = 3;</code>
       */
      public java.lang.String getTableName() {
        java.lang.Object ref = tableName_;
        if (!(ref instanceof java.lang.String)) {
          com.google.protobuf.ByteString bs =
              (com.google.protobuf.ByteString) ref;
          java.lang.String s = bs.toStringUtf8();
          tableName_ = s;
          return s;
        } else {
          return (java.lang.String) ref;
        }
      }
      /**
       * <code>optional string tableName = 3;</code>
       */
      public com.google.protobuf.ByteString
          getTableNameBytes() {
        java.lang.Object ref = tableName_;
        if (ref instanceof String) {
          com.google.protobuf.ByteString b = 
              com.google.protobuf.ByteString.copyFromUtf8(
                  (java.lang.String) ref);
          tableName_ = b;
          return b;
        } else {
          return (com.google.protobuf.ByteString) ref;
        }
      }
      /**
       * <code>optional string tableName = 3;</code>
       */
      public Builder setTableName(
          java.lang.String value) {
        if (value == null) {
          throw new NullPointerException();
        }
        tableName_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>optional string tableName = 3;</code>
       */
      public Builder clearTableName() {
        
        tableName_ = getDefaultInstance().getTableName();
        onChanged();
        return this;
      }
      /**
       * <code>optional string tableName = 3;</code>
       */
      public Builder setTableNameBytes(
          com.google.protobuf.ByteString value) {
        if (value == null) {
          throw new NullPointerException();
        }
        checkByteStringIsUtf8(value);
        tableName_ = value;
        onChanged();
        return this;
      }

      private java.lang.Object eventType_ = "";
      /**
       * <code>optional string eventType = 4;</code>
       */
      public java.lang.String getEventType() {
        java.lang.Object ref = eventType_;
        if (!(ref instanceof java.lang.String)) {
          com.google.protobuf.ByteString bs =
              (com.google.protobuf.ByteString) ref;
          java.lang.String s = bs.toStringUtf8();
          eventType_ = s;
          return s;
        } else {
          return (java.lang.String) ref;
        }
      }
      /**
       * <code>optional string eventType = 4;</code>
       */
      public com.google.protobuf.ByteString
          getEventTypeBytes() {
        java.lang.Object ref = eventType_;
        if (ref instanceof String) {
          com.google.protobuf.ByteString b = 
              com.google.protobuf.ByteString.copyFromUtf8(
                  (java.lang.String) ref);
          eventType_ = b;
          return b;
        } else {
          return (com.google.protobuf.ByteString) ref;
        }
      }
      /**
       * <code>optional string eventType = 4;</code>
       */
      public Builder setEventType(
          java.lang.String value) {
        if (value == null) {
          throw new NullPointerException();
        }
        eventType_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>optional string eventType = 4;</code>
       */
      public Builder clearEventType() {
        
        eventType_ = getDefaultInstance().getEventType();
        onChanged();
        return this;
      }
      /**
       * <code>optional string eventType = 4;</code>
       */
      public Builder setEventTypeBytes(
          com.google.protobuf.ByteString value) {
        if (value == null) {
          throw new NullPointerException();
        }
        checkByteStringIsUtf8(value);
        eventType_ = value;
        onChanged();
        return this;
      }

      private com.google.protobuf.MapField<
          java.lang.String, java.lang.String> columns_;
      private com.google.protobuf.MapField<java.lang.String, java.lang.String>
      internalGetColumns() {
        if (columns_ == null) {
          return com.google.protobuf.MapField.emptyMapField(
              ColumnsDefaultEntryHolder.defaultEntry);
        }
        return columns_;
      }
      private com.google.protobuf.MapField<java.lang.String, java.lang.String>
      internalGetMutableColumns() {
        onChanged();
        if (columns_ == null) {
          columns_ = com.google.protobuf.MapField.newMapField(
              ColumnsDefaultEntryHolder.defaultEntry);
        }
        if (!columns_.isMutable()) {
          columns_ = columns_.copy();
        }
        return columns_;
      }

      public int getColumnsCount() {
        return internalGetColumns().getMap().size();
      }
      /**
       * <pre>
       * Column data
       * </pre>
       *
       * <code>map&lt;string, string&gt; columns = 5;</code>
       */

      public boolean containsColumns(
          java.lang.String key) {
        if (key == null) { throw new java.lang.NullPointerException(); }
        return internalGetColumns().getMap().containsKey(key);
      }
      /**
       * Use {@link #getColumnsMap()} instead.
       */
      @java.lang.Deprecated
      public java.util.Map<java.lang.String, java.lang.String> getColumns() {
        return getColumnsMap();
      }
      /**
       * <pre>
       * Column data
       * </pre>
       *
       * <code>map&lt;string, string&gt; columns = 5;</code>
       */

      public java.util.Map<java.lang.String, java.lang.String> getColumnsMap() {
        return internalGetColumns().getMap();
      }
      /**
       * <pre>
       * Column data
       * </pre>
       *
       * <code>map&lt;string, string&gt; columns = 5;</code>
       */

      public java.lang.String getColumnsOrDefault(
          java.lang.String key,
          java.lang.String defaultValue) {
        if (key == null) { throw new java.lang.NullPointerException(); }
        java.util.Map<java.lang.String, java.lang.String> map =
            internalGetColumns().getMap();
        return map.containsKey(key) ? map.get(key) : defaultValue;
      }
      /**
       * <pre>
       * Column data
       * </pre>
       *
       * <code>map&lt;string, string&gt; columns = 5;</code>
       */

      public java.lang.String getColumnsOrThrow(
          java.lang.String key) {
        if (key == null) { throw new java.lang.NullPointerException(); }
        java.util.Map<java.lang.String, java.lang.String> map =
            internalGetColumns().getMap();
        if (!map.containsKey(key)) {
          throw new java.lang.IllegalArgumentException();
        }
        return map.get(key);
      }

      public Builder clearColumns() {
        getMutableColumns().clear();
        return this;
      }
      /**
       * <pre>
       * Column data
       * </pre>
       *
       * <code>map&lt;string, string&gt; columns = 5;</code>
       */

      public Builder removeColumns(
          java.lang.String key) {
        if (key == null) { throw new java.lang.NullPointerException(); }
        getMutableColumns().remove(key);
        return this;
      }
      /**
       * Use alternate mutation accessors instead.
       */
      @java.lang.Deprecated
      public java.util.Map<java.lang.String, java.lang.String>
      getMutableColumns() {
        return internalGetMutableColumns().getMutableMap();
      }
      /**
       * <pre>
       * Column data
       * </pre>
       *
       * <code>map&lt;string, string&gt; columns = 5;</code>
       */
      public Builder putColumns(
          java.lang.String key,
          java.lang.String value) {
        if (key == null) { throw new java.lang.NullPointerException(); }
        if (value == null) { throw new java.lang.NullPointerException(); }
        getMutableColumns().put(key, value);
        return this;
      }
      /**
       * <pre>
       * Column data
       * </pre>
       *
       * <code>map&lt;string, string&gt; columns = 5;</code>
       */

      public Builder putAllColumns(
          java.util.Map<java.lang.String, java.lang.String> values) {
        getMutableColumns().putAll(values);
        return this;
      }
      public final Builder setUnknownFields(
          final com.google.protobuf.UnknownFieldSet unknownFields) {
        return this;
      }

      public final Builder mergeUnknownFields(
          final com.google.protobuf.UnknownFieldSet unknownFields) {
        return this;
      }


      // @@protoc_insertion_point(builder_scope:RowData)
    }

    // @@protoc_insertion_point(class_scope:RowData)
    private static final proto.CanalModel.RowData DEFAULT_INSTANCE;
    static {
      DEFAULT_INSTANCE = new proto.CanalModel.RowData();
    }

    public static proto.CanalModel.RowData getDefaultInstance() {
      return DEFAULT_INSTANCE;
    }

    private static final com.google.protobuf.Parser<RowData>
        PARSER = new com.google.protobuf.AbstractParser<RowData>() {
      public RowData parsePartialFrom(
          com.google.protobuf.CodedInputStream input,
          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
          throws com.google.protobuf.InvalidProtocolBufferException {
          return new RowData(input, extensionRegistry);
      }
    };

    public static com.google.protobuf.Parser<RowData> parser() {
      return PARSER;
    }

    @java.lang.Override
    public com.google.protobuf.Parser<RowData> getParserForType() {
      return PARSER;
    }

    public proto.CanalModel.RowData getDefaultInstanceForType() {
      return DEFAULT_INSTANCE;
    }

  }

  private static final com.google.protobuf.Descriptors.Descriptor
    internal_static_RowData_descriptor;
  private static final 
    com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
      internal_static_RowData_fieldAccessorTable;
  private static final com.google.protobuf.Descriptors.Descriptor
    internal_static_RowData_ColumnsEntry_descriptor;
  private static final 
    com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
      internal_static_RowData_ColumnsEntry_fieldAccessorTable;

  public static com.google.protobuf.Descriptors.FileDescriptor
      getDescriptor() {
    return descriptor;
  }
  private static  com.google.protobuf.Descriptors.FileDescriptor
      descriptor;
  static {
    java.lang.String[] descriptorData = {
      "\n\020CanalModel.proto\"\334\001\n\007RowData\022\023\n\013logfil" +
      "eName\030\017 \001(\t\022\025\n\rlogfileOffset\030\016 \001(\004\022\023\n\013ex" +
      "ecuteTime\030\001 \001(\004\022\022\n\nschemaName\030\002 \001(\t\022\021\n\tt" +
      "ableName\030\003 \001(\t\022\021\n\teventType\030\004 \001(\t\022&\n\007col" +
      "umns\030\005 \003(\0132\025.RowData.ColumnsEntry\032.\n\014Col" +
      "umnsEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028" +
      "\001B\023\n\005protoB\nCanalModelb\006proto3"
    };
    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
        new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
          public com.google.protobuf.ExtensionRegistry assignDescriptors(
              com.google.protobuf.Descriptors.FileDescriptor root) {
            descriptor = root;
            return null;
          }
        };
    com.google.protobuf.Descriptors.FileDescriptor
      .internalBuildGeneratedFileFrom(descriptorData,
        new com.google.protobuf.Descriptors.FileDescriptor[] {
        }, assigner);
    internal_static_RowData_descriptor =
      getDescriptor().getMessageTypes().get(0);
    internal_static_RowData_fieldAccessorTable = new
      com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
        internal_static_RowData_descriptor,
        new java.lang.String[] { "LogfileName", "LogfileOffset", "ExecuteTime", "SchemaName", "TableName", "EventType", "Columns", });
    internal_static_RowData_ColumnsEntry_descriptor =
      internal_static_RowData_descriptor.getNestedTypes().get(0);
    internal_static_RowData_ColumnsEntry_fieldAccessorTable = new
      com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
        internal_static_RowData_ColumnsEntry_descriptor,
        new java.lang.String[] { "Key", "Value", });
  }

  // @@protoc_insertion_point(outer_class_scope)
}
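The generated class above is the wire model that gets published to Kafka: a RowData message carrying the binlog position (logfileName, logfileOffset), the execute time, the schemaName / tableName / eventType of the change, and a map<string, string> of column values. A minimal producer-side sketch follows, showing how the generated builder assembles a message, serializes it, and sends the bytes to Kafka. This is an illustration only: the topic name canal-rowdata, the broker address, and the sample field values are assumptions, not part of the generated code.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import proto.CanalModel;

public class RowDataProducerDemo {
    public static void main(String[] args) throws Exception {
        // Assemble a RowData with the generated builder; all values are sample data.
        CanalModel.RowData rowData = CanalModel.RowData.newBuilder()
                .setLogfileName("mysql-bin.000001")
                .setLogfileOffset(4L)
                .setExecuteTime(System.currentTimeMillis())
                .setSchemaName("test")
                .setTableName("user")
                .setEventType("INSERT")
                .putColumns("id", "1")
                .putColumns("name", "zhangsan")
                .build();

        // Serialize to the compact protobuf wire format.
        byte[] payload = rowData.toByteArray();

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.200.106:9092"); // assumed Kafka address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // Key by schema.table so all rows of one table land in one partition
            // and keep their binlog order.
            String key = rowData.getSchemaName() + "." + rowData.getTableName();
            producer.send(new ProducerRecord<>("canal-rowdata", key, payload)).get();
        }
    }
}

Keying the record by schemaName.tableName is a deliberate choice in this sketch: Kafka only guarantees ordering within a partition, so pinning each table to one partition preserves the INSERT/UPDATE/DELETE order that downstream consumers replay.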

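On the consuming side the same generated class decodes the payload via RowData.parseFrom. A sketch of a consumer loop, again with an assumed broker address, group id, and topic name:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import proto.CanalModel;

public class RowDataConsumerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.200.106:9092"); // assumed Kafka address
        props.put("group.id", "canal-rowdata-demo");             // assumed group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("canal-rowdata"));
            while (true) {
                ConsumerRecords<String, byte[]> records =
                        consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Decode the protobuf bytes back into a RowData message.
                    CanalModel.RowData row = CanalModel.RowData.parseFrom(record.value());
                    System.out.printf("%s %s.%s -> %s%n",
                            row.getEventType(), row.getSchemaName(),
                            row.getTableName(), row.getColumnsMap());
                }
            }
        }
    }
}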

From: https://www.cnblogs.com/chang09/p/16596516.html
