首页 > 数据库 >canal+ftp实现mysql数据跨网同步

canal+ftp实现mysql数据跨网同步

时间:2024-09-18 17:52:41浏览次数:3  
标签:canal ftp entry 跨网 sql new import size

canal服务端

编辑my.ini文件,保存后重启mysql,执行show variables like 'log_bin';  显示on代表开启

# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1
#binlog文件最大值
max_binlog_size = 1000M

配置一个canal使用的账户,不配置也行,可以使用现有mysql账户。

CREATE USER canal@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

下载 deployer-1.1.7,编辑deployer-1.1.7\conf\example\instance.properties文件

#刷新日志文件
flush logs;
#查看现在使用的binlog名称及偏移量,记录下来配置到canal中
show master status

# position info
canal.instance.master.address=127.0.0.1:3306  #监听的mysql地址

# username/password                           #mysql账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

#canal.instance.filter.regex=.*\\..*          #默认是所有库所有表
canal.instance.filter.regex=mydata\\..*       #监听mydata库下所有表

# 指定binlog文件名和位置
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=1234

deployer-1.1.7\bin\目录下有启动文件,windows双击bat文件即可

不是首次启动的话,如果想修改目前读取的binlog位置,需要将deployer-1.1.7\conf\目录下meta.dat,h2.mv.db文件删掉,里面记录了目前binlog的读取位置。

canal客户端

将canal读取到的数据转成sql写入文件,并上传ftp

package com.canal.conf;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.canal.constants.Constants;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.net.ftp.FTPClient;

import java.io.*;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Log4j2
public class CanalClient {
    //每次执行sql队列
    private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
    //sql队列最大值
    private static int BACH_SIZE = 1000;

    public static void exportFtp() {
        try {
            int count;
            do{
                //读取数据
                startCanal();
                count = SQL_QUEUE.size();
                //有数据,创建sql文件并上传ftp
                if (count > 0) {
                    buildQueueSqlFile();
                    uploadFileToFtp();
                }
            }while (count == BACH_SIZE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void startCanal() {
        //连接canal
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1"
                , 11111), "example", "", "");
        try {
            //订阅Desctinstion
            connector.connect();
            connector.subscribe();
            connector.rollback();
            try {
                //取数据
                Message message = connector.getWithoutAck(BACH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId != -1 && size != 0) {
                    dataHandle(message.getEntries());
                }
                connector.ack(batchId);
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 队列里面的sql语句
     */
    public static void buildQueueSqlFile() {
        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
                new FileOutputStream(Constants.FILE_PATH, true),
                StandardCharsets.UTF_8))) {
            //获取队列
            int size = SQL_QUEUE.size();
            for (int i = 0; i < size; i++) {
                String sql = SQL_QUEUE.poll();
                if (sql != null) { 
                    writer.write(sql + ";");
                    writer.newLine();
                }
            }
        } catch (FileNotFoundException e) {
            log.error("File not found: ", e);
        } catch (IOException e) {
            log.error("IO error: ", e);
        }
    }


    //上传文件到ftp
    private static void uploadFileToFtp() throws IOException {

        File file = new File(Constants.PATH);
        if (file.length() == 0) {
            return;
        }
        //上传并删除原文件
        FTPClient ftpClient = new FTPClient();
        try {
            String fileName = System.currentTimeMillis() + Constants.FILE_SUFFIX;
            ftpClient.connect(Constants.HOST, Constants.PORT);
            ftpClient.enterLocalPassiveMode();
            ftpClient.login(Constants.USERNAME, Constants.PASSWORD);

            FileInputStream fileInputStream = new FileInputStream(Constants.PATH);
            ftpClient.changeWorkingDirectory(Constants.FOLDER_PATH);
            ftpClient.storeFile(fileName, fileInputStream);

            fileInputStream.close();
            ftpClient.logout();
            ftpClient.disconnect();
            file.delete();
        } finally {
            if (ftpClient.isConnected()) {
                ftpClient.disconnect();
            }
        }
    }

    /**
     * 处理读取到的数据
     * @param entrys
     */
    private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                CanalEntry.EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * 更新语句
     * @param entry
     */
    private static void saveUpdateSql(CanalEntry.Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = " + formatValue(newColumnList.get(i)));
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        sql.append(column.getName() + " = " + formatValue(column));
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 删除语句
     * @param entry
     */
    private static void saveDeleteSql(CanalEntry.Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        sql.append(column.getName() + " = " + formatValue(column));
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 插入语句
     * @param entry
     */
    private static void saveInsertSql(CanalEntry.Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(formatValue(columnList.get(i)));
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 格式化参数值
     */
    private static String formatValue(Column column) {
        //空直接返回
        if(column.getIsNull()){
            return "NULL";
        }
        //字符串处理
        return sqlParam(column.getValue().replace("'", "‘").replace("\"", "“"));
    }

    private static String sqlParam(String value) {
        return "'" + value + "'";
    }
}

另一个网络读取sql文件执行就可以

package com.canal.conf;

import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.*;
import java.util.Arrays;
import java.util.Comparator;

@Component
@Log4j2
public class Import {

    @Value("${filePath}")
    private String filePath;
    @Value("${fileSuffix}")
    private String fileSuffix;
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional(rollbackFor = Exception.class)
    public Boolean ftpDataImport() throws Exception {
        Boolean result = false;
        File directory = new File(filePath);
        File[] files = directory.listFiles((dir, name) -> name.endsWith(fileSuffix));
        if (files == null || files.length == 0) {
            log.info("没有新数据,任务结束");
            return result;
        }
        //如果有多个文件,只处理第一个
        if (files.length > 1) {
            Arrays.sort(files, Comparator.comparing(File::getName));
            result = true;
        }
        File file = files[0];
        log.info("正在读取文件:" + file.getName());
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"))) {                String line;
            StringBuilder sql = new StringBuilder();
            while ((line = reader.readLine()) != null) {
                // 去除行尾空格和可能的分号,然后添加到SQL构建器中
                sql.append(line.trim()).append(" ");
                //当前行以分号结束,则认为SQL语句结束
                if (line.trim().endsWith(";")) {
                    jdbcTemplate.execute(sql.toString().trim().replaceFirst(";\\s*$", ""));
                    sql = new StringBuilder(); // 重置SQL构建器
                }
            }
            // 添加最后一条SQL(如果没有以分号结束)
            if (sql.length() > 0) {
                jdbcTemplate.execute(sql.toString().trim());
            }
        }
        log.info("文件入库完毕,删除中");
        Boolean flag = file.delete();
        if (flag) {
            log.info("删除成功" + file);
        } else {
            log.info("删除失败" + file);
            throw new Exception("删除失败,事务回滚");
        }
        return result;
    }
}

标签:canal,ftp,entry,跨网,sql,new,import,size
From: https://blog.csdn.net/m0_70720259/article/details/142338144

相关文章

  • FTP和TFP
    文件传输协议FTP文件传输协议类型:ASCII码(传输文本类型)、二进制(传输非文本类型)、采用C/S架构,需要设置账号密码进程分类控制进程:建立FTP会话,基于TCP21数据进程:传输数据,基于TCP20思科命令用途ipftpusernamename客户端创建FTP账户ipftppasswordAA123456......
  • 常用的运维工具:文件传输工具详解(SCP, SFTP)
    在信息技术(IT)运维中,文件传输是日常工作中不可或缺的一部分。运维工程师需要高效、安全地在不同服务器之间传输文件,以确保系统的正常运行和数据的完整性。本文将详细介绍两种常用的文件传输工具——SCP(SecureCopyProtocol)和SFTP(SecureFileTransferProtocol),帮助读者更好......
  • sftp 传输文件
    简介SFTP(SSHFileTransferProtocol)是一种通过安全外壳(SSH)传输文件的协议。它提供了一种安全的方式在网络上进行文件传输。命令解释sftp:这是命令的主要部分,表示你想使用SFTP程序进行文件传输。root:这是你想要连接到远程服务器上的用户名。在这个例子中,使用的是 root 用......
  • centos8 搭建NFS、Samba 和 FTP 共享服务
    centos8搭建NFS、Samba和FTP共享服务1.搭建NFS共享服务1:安装NFS服务器2:启动并设置NFS服务3:配置NFS共享4:创建并设置共享目录的权限5:重新导出文件系统6:配置防火墙7:测试NFS共享2.搭建Samba共享服务1:安装Samba2:启动并设置Samba服务3......
  • FTP、HTTP上传
    1.设置FTP上传打开控制面板—>程序—>启动或关闭windows功能,找到互联网信息服务勾选✔其中的ftp服务器,web管理,万维网服务,系统就会安装IIS服务管理器了,安装过程可能需要等待几分钟。回到电脑桌面,右击“计算机”,点击管理,进入计算机管理界面。在这里,我们就可以看到刚刚添加的II......
  • 网络编程基础项目一:TCP实现FTP功能
    目录FTP核心原理项目功能介绍: 大致思路复习stat函数stat获取当前路径下文件的属性代码服务器客户端 总结FTP核心原理客户端连接服务器后,向服务器发送一个文件。文件名可以通过参数指定,服务器端接收客户端传来的文件(文件名随意),如果文件不存在自动创建文件,如果......
  • 威联通NAS指南丨SMB、FTP、WebDAV等协议
    随着时代的发展,手机屏幕越来越大,拍照越来越清晰,影视画质更高清......同时也会遇到一些问题,拍照清晰了,占用内存也变大了;视频画质更好了,网盘容量跟不上了;大家对自己的数据隐私问题也更加敏感了。这时在家配置一台NAS是不错的选择,可将手机中的照片、视频备份到NAS中,告别手机内存......
  • sftp连接失败
    故障现象:正常连接方式:sftp-oPort=22sftp部署之后正常连接没问题,换了地方之后连接失败,其他设备连接正常检查:防火墙清除selinux关闭网络ping可通ssh-vvv发现卡住了 解决:sftp-oCiphers=+aes128-ctr 用户名@域名或IP 解释:这个命令行参数用于指定SFTP(SecureFil......
  • FTP
    FTPFTP的主动模式(ActiveMode)和被动模式(PassiveMode)的主要区别在于数据连接的建立方式,这涉及到客户端和服务器之间如何相互连接以传输数据。以下是两种模式的详细比较:主动模式(ActiveMode)连接建立:客户端首先连接到服务器的21端口建立控制连接。数据端口:客户端在本地选择一个......
  • 怎样的跨网文件安全交换系统,能让企业投入产出比最高?
    网络安全技术越来越成为企业进行数据保护的手段,企业有数据安全的意识,且进行网络安全防护建设,是企业信息化建设和安全意识增强的表现。但对于很多小而精的企业来说,进行网络隔离建设成本可控,但随之也会带来一系列其他的成本。如网络隔离后,需要考虑跨网文件安全交换系统的建设。......