canal服务端
编辑my.ini文件,保存后重启mysql,执行show variables like 'log_bin'; 显示on代表开启
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replication需要定义,不要和canal的slaveId重复
server_id=1
#binlog文件最大值
max_binlog_size = 1000M
配置一个canal使用的账户,不配置也行,可以使用现有mysql账户。
CREATE USER canal@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
下载 deployer-1.1.7,编辑deployer-1.1.7\conf\example\instance.properties文件
#刷新日志文件
flush logs;
#查看现在使用的binlog名称及偏移量,记录下来配置到canal中
show master status;
# position info
canal.instance.master.address=127.0.0.1:3306 #监听的mysql地址
# username/password #mysql账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
#canal.instance.filter.regex=.*\\..* #默认是所有库所有表
canal.instance.filter.regex=mydata\\..* #监听mydata库下所有表
# 指定binlog文件名和位置
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=1234
deployer-1.1.7\bin\目录下有启动文件,windows双击bat文件即可
不是首次启动的话,如果想修改目前读取的binlog位置,需要将deployer-1.1.7\conf\example\(即实例)目录下meta.dat,h2.mv.db文件删掉,里面记录了目前binlog的读取位置。
canal客户端
将canal读取到的数据转成sql写入文件,并上传ftp
package com.canal.conf;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.canal.constants.Constants;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.net.ftp.FTPClient;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Log4j2
public class CanalClient {
//每次执行sql队列
private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
//sql队列最大值
private static int BACH_SIZE = 1000;
public static void exportFtp() {
try {
int count;
do{
//读取数据
startCanal();
count = SQL_QUEUE.size();
//有数据,创建sql文件并上传ftp
if (count > 0) {
buildQueueSqlFile();
uploadFileToFtp();
}
}while (count == BACH_SIZE);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void startCanal() {
//连接canal
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1"
, 11111), "example", "", "");
try {
//订阅Desctinstion
connector.connect();
connector.subscribe();
connector.rollback();
try {
//取数据
Message message = connector.getWithoutAck(BACH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId != -1 && size != 0) {
dataHandle(message.getEntries());
}
connector.ack(batchId);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} finally {
connector.disconnect();
}
}
/**
* 队列里面的sql语句
*/
public static void buildQueueSqlFile() {
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(Constants.FILE_PATH, true),
StandardCharsets.UTF_8))) {
//获取队列
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
if (sql != null) {
writer.write(sql + ";");
writer.newLine();
}
}
} catch (FileNotFoundException e) {
log.error("File not found: ", e);
} catch (IOException e) {
log.error("IO error: ", e);
}
}
//上传文件到ftp
private static void uploadFileToFtp() throws IOException {
File file = new File(Constants.PATH);
if (file.length() == 0) {
return;
}
//上传并删除原文件
FTPClient ftpClient = new FTPClient();
try {
String fileName = System.currentTimeMillis() + Constants.FILE_SUFFIX;
ftpClient.connect(Constants.HOST, Constants.PORT);
ftpClient.enterLocalPassiveMode();
ftpClient.login(Constants.USERNAME, Constants.PASSWORD);
FileInputStream fileInputStream = new FileInputStream(Constants.PATH);
ftpClient.changeWorkingDirectory(Constants.FOLDER_PATH);
ftpClient.storeFile(fileName, fileInputStream);
fileInputStream.close();
ftpClient.logout();
ftpClient.disconnect();
file.delete();
} finally {
if (ftpClient.isConnected()) {
ftpClient.disconnect();
}
}
}
/**
* 处理读取到的数据
* @param entrys
*/
private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
for (CanalEntry.Entry entry : entrys) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == CanalEntry.EventType.INSERT) {
saveInsertSql(entry);
}
}
}
}
/**
* 更新语句
* @param entry
*/
private static void saveUpdateSql(CanalEntry.Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<Column> newColumnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++) {
sql.append(" " + newColumnList.get(i).getName()
+ " = " + formatValue(newColumnList.get(i)));
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List<Column> oldColumnList = rowData.getBeforeColumnsList();
for (Column column : oldColumnList) {
if (column.getIsKey()) {
sql.append(column.getName() + " = " + formatValue(column));
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 删除语句
* @param entry
*/
private static void saveDeleteSql(CanalEntry.Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
for (Column column : columnList) {
if (column.getIsKey()) {
sql.append(column.getName() + " = " + formatValue(column));
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 插入语句
* @param entry
*/
private static void saveInsertSql(CanalEntry.Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
for (int i = 0; i < columnList.size(); i++) {
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i++) {
sql.append(formatValue(columnList.get(i)));
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(")");
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 格式化参数值
*/
private static String formatValue(Column column) {
//空直接返回
if(column.getIsNull()){
return "NULL";
}
//字符串处理
return sqlParam(column.getValue().replace("'", "‘").replace("\"", "“"));
}
private static String sqlParam(String value) {
return "'" + value + "'";
}
}
另一个网络中的应用读取该sql文件并执行其中的语句即可
package com.canal.conf;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
@Log4j2
public class Import {

    // Directory the downloaded sql files land in.
    @Value("${filePath}")
    private String filePath;

    // Filename suffix of the sql files to import, e.g. ".sql".
    @Value("${fileSuffix}")
    private String fileSuffix;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Imports the oldest sql file from {@code filePath} into the database, statement by
     * statement (';'-terminated lines), then deletes the file — all in one transaction,
     * so a failed delete rolls the inserts back and the file is retried next run.
     *
     * @return true when further files remain to be processed, false otherwise
     * @throws Exception when the processed file cannot be deleted (forces rollback)
     */
    @Transactional(rollbackFor = Exception.class)
    public Boolean ftpDataImport() throws Exception {
        boolean moreFilesPending = false;
        File directory = new File(filePath);
        File[] files = directory.listFiles((dir, name) -> name.endsWith(fileSuffix));
        if (files == null || files.length == 0) {
            log.info("没有新数据,任务结束");
            return false;
        }
        // process files oldest-first (names are millisecond timestamps); when several
        // files are present, report that more work remains after this one
        if (files.length > 1) {
            Arrays.sort(files, Comparator.comparing(File::getName));
            moreFilesPending = true;
        }
        File file = files[0];
        log.info("正在读取文件:{}", file.getName());
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
            StringBuilder sql = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                String trimmed = line.trim();
                // skip blank lines so they cannot produce an empty trailing statement
                if (trimmed.isEmpty()) {
                    continue;
                }
                sql.append(trimmed).append(" ");
                // a trailing ';' ends the current statement; strip it before executing
                if (trimmed.endsWith(";")) {
                    jdbcTemplate.execute(sql.toString().trim().replaceFirst(";\\s*$", ""));
                    sql.setLength(0);
                }
            }
            // run a final statement that was not ';'-terminated
            String rest = sql.toString().trim();
            if (!rest.isEmpty()) {
                jdbcTemplate.execute(rest);
            }
        }
        log.info("文件入库完毕,删除中");
        if (file.delete()) {
            log.info("删除成功{}", file);
        } else {
            log.info("删除失败{}", file);
            throw new Exception("删除失败,事务回滚");
        }
        return moreFilesPending;
    }
}
标签:canal,ftp,entry,跨网,sql,new,import,size
From: https://blog.csdn.net/m0_70720259/article/details/142338144