1、在pom.xml中引入canal客户端依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
2、自定义注解【BinLogs】
import java.lang.annotation.*;
/**
 * Marker annotation for methods whose completion should trigger binlog handling.
 * It declares no members, is retained at runtime, and may only be placed on methods.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface BinLogs {
}
3、注解实现类【BinLogsValidator】
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.commons.lang3.ObjectUtils;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;
/**
* 注解实现
*/
@Component
@Aspect
public class BinLogsValidator {
/** Maximum number of binlog entries requested from the canal server per fetch. */
private static final int BATCH_SIZE = 10;
// The connection settings are hard-coded here; they can instead be injected
// from yml configuration by enabling the @Value lines below.
// @Value("${common.canalurl}")
private String canalUrl = "10.0.0.11";
// @Value("${common.canalport}")
// Must be a valid TCP port (0-65535), otherwise InetSocketAddress rejects it
// with IllegalArgumentException. 11111 is the canal server's default port.
private Integer canalport = 11111;
/** Executor on which the binlog consumer task is run. */
@Resource
@Qualifier("taskExecutor")
private ThreadPoolTaskExecutor poolTaskExecutor;
/**
 * Pointcut definition: selects methods annotated with {@code BinLogs}.
 * The body is intentionally empty; the method exists only to carry the
 * pointcut expression so advice can refer to it as {@code noRepeat()}.
 */
@Pointcut("@annotation(BinLogs)")
public void noRepeat() {
}
/**
 * Tracks whether a binlog consumer task is currently submitted or running.
 * The consumer loops until it fails or is interrupted, so starting one per
 * advised call would permanently occupy one pool thread per call.
 */
private static final java.util.concurrent.atomic.AtomicBoolean CONSUMER_RUNNING =
        new java.util.concurrent.atomic.AtomicBoolean(false);

/**
 * After advice: runs once the join point has finished and makes sure a binlog
 * consumer is running on the task executor. If a consumer is already active,
 * this call does nothing.
 */
@After("noRepeat()")
public void after() {
    // Only the caller that flips the flag from false to true starts a consumer.
    if (!CONSUMER_RUNNING.compareAndSet(false, true)) {
        return;
    }
    // Copy the settings into locals so the lambda captures plain values
    // (method arguments could be passed along the same way).
    String url = canalUrl;
    Integer port = canalport;
    try {
        poolTaskExecutor.execute(() -> {
            try {
                dealAop(url, port);
            } finally {
                // Allow a later advised call to start a fresh consumer.
                CONSUMER_RUNNING.set(false);
            }
        });
    } catch (RuntimeException e) {
        // The task never started (e.g. it was rejected); release the flag.
        CONSUMER_RUNNING.set(false);
        throw e;
    }
}
/**
 * Connects to the canal server, subscribes to every table and keeps polling
 * for binlog batches until the thread is interrupted or an error occurs.
 * Each fetched batch is handed to {@code printEntry} and then acknowledged.
 *
 * @param canalUrl  host of the canal server
 * @param canalport port of the canal server
 */
private static void dealAop(String canalUrl, Integer canalport) {
    CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(canalUrl, canalport), "example", "", "");
    try {
        // Open the connection.
        connector.connect();
        // Subscribe to all tables of all schemas.
        connector.subscribe(".*\\..*");
        // Roll back to the last un-acked position so fetching resumes from there.
        connector.rollback();
        // Stop polling once the executor interrupts this thread (e.g. on shutdown).
        while (!Thread.currentThread().isInterrupted()) {
            // Fetch up to BATCH_SIZE entries without acknowledging them yet.
            Message message = connector.getWithoutAck(BATCH_SIZE);
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                // Nothing to process: wait two seconds before polling again.
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and leave the loop instead of
                    // swallowing the interruption and polling forever.
                    Thread.currentThread().interrupt();
                    break;
                }
            } else {
                printEntry(message.getEntries());
            }
            // Confirm the batch; every message with an id <= batchId is acknowledged.
            connector.ack(batchId);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        connector.disconnect();
    }
}
/**
 * Prints the binlog entries delivered by the canal server.
 * Transaction begin/end markers and entries of table {@code sys_binlogs} are
 * skipped; for every other entry the header, an optional DDL statement and,
 * per changed row, the row data plus a reconstructed SQL statement are printed.
 *
 * @param entrys entries of one fetched batch
 * @throws RuntimeException if an entry's payload cannot be parsed as a RowChange
 */
private static void printEntry(List<CanalEntry.Entry> entrys) {
    for (CanalEntry.Entry entry : entrys) {
        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            // Transaction boundary markers carry no row data.
            continue;
        }
        // RowChange describes one change event: whether it is DDL, the DDL sql,
        // and the before/after column images of each affected row.
        CanalEntry.RowChange rowChage;
        try {
            rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
        }
        // Event type, e.g. insert / update / delete.
        CanalEntry.EventType eventType = rowChage.getEventType();
        String tableName = entry.getHeader().getTableName();
        // Binlog file name and offset of this entry.
        String binlog = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset();
        // Print the header information.
        System.out.println("操作的binlog:" + binlog);
        System.out.println("操作的表:" + entry.getHeader().getSchemaName() + "." + tableName);
        System.out.println("操作类型:" + eventType);
        if ("sys_binlogs".equals(tableName)) {
            // Skip only this entry; returning here would silently drop the
            // remaining entries of a batch that is acknowledged afterwards.
            continue;
        }
        if (rowChage.getIsDdl()) {
            System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
        }
        for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == CanalEntry.EventType.DELETE) {
                // A deleted row only has a before image. Dump every column of it
                // rather than filtering on the `updated` flag, which describes
                // changed after-image columns and would leave this object empty.
                JSONObject deleted = new JSONObject();
                for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                    deleted.put(column.getName(), column.getValue());
                }
                System.out.println("删除语句:" + deleted);
                StringBuffer deleteSql = saveDeleteSql(rowData, tableName);
                System.out.println("删除Sql:" + deleteSql);
                // TODO: business logic for deletes can be added here
            } else if (eventType == CanalEntry.EventType.INSERT) {
                JSONObject inserted = printColumn(rowData.getAfterColumnsList());
                System.out.println("新增语句:" + inserted);
                StringBuffer insertSql = saveInsertSql(rowData, tableName);
                System.out.println("新增Sql:" + insertSql);
                // TODO: business logic for inserts can be added here
            } else {
                // Every remaining event type is treated as an update.
                System.out.println("------->; after");
                JSONObject after = printColumn(rowData.getAfterColumnsList());
                System.out.println("变更后的数据:" + after);
                System.out.println("------->; before");
                JSONObject before = printBeforeColumn(rowData.getBeforeColumnsList(), after);
                System.out.println("变更前的数据:" + before);
                StringBuffer updateSql = saveUpdateSql(rowData, tableName);
                System.out.println("更新Sql:" + updateSql);
                // TODO: business logic for updates can be added here
                // (`before` and `after` hold the changed columns)
            }
        }
    }
}
/**
 * Collects the pre-change values of exactly those columns that appear in
 * {@code after}.
 *
 * @param columns before-image columns of one row
 * @param after   changed columns of the row as name/value pairs
 * @return name/value pairs of the old values; empty when {@code after} is
 *         null or empty
 */
private static JSONObject printBeforeColumn(List<CanalEntry.Column> columns, JSONObject after) {
    JSONObject before = new JSONObject();
    if (after == null || after.isEmpty()) {
        return before;
    }
    for (CanalEntry.Column column : columns) {
        // Match on key presence, not on the new value: a column changed to an
        // empty string is still a change and its old value must be reported.
        if (after.containsKey(column.getName())) {
            before.put(column.getName(), column.getValue());
        }
    }
    return before;
}
/**
 * Builds a name/value object from the columns flagged as updated.
 *
 * @param columns columns of one row image
 * @return JSON object holding only the columns whose updated flag is set
 */
private static JSONObject printColumn(List<CanalEntry.Column> columns) {
    JSONObject changed = new JSONObject();
    for (CanalEntry.Column col : columns) {
        if (!col.getUpdated()) {
            // Unchanged columns are not reported.
            continue;
        }
        String name = col.getName();
        String value = col.getValue();
        changed.put(name, value);
    }
    return changed;
}
/**
 * Reconstructs an UPDATE statement for one changed row.
 * The SET clause lists the after-image columns flagged as updated; the WHERE
 * clause matches every primary-key column of the before image (joined with
 * {@code and}), so composite keys identify exactly one row. If the row has no
 * key column the WHERE clause is left empty, as before.
 *
 * @param rowData   row change holding the before and after column images
 * @param tableName name of the table the row belongs to
 * @return the generated statement text
 */
private static StringBuffer saveUpdateSql(CanalEntry.RowData rowData, String tableName) {
    StringBuffer sql = new StringBuffer("update " + tableName + " set ");
    boolean firstAssignment = true;
    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
        if (!column.getUpdated()) {
            continue;
        }
        // Separate assignments by position among the UPDATED columns; keying
        // the comma on the list index left a dangling comma before "where"
        // whenever the last column of the table was not updated.
        if (!firstAssignment) {
            sql.append(",");
        }
        firstAssignment = false;
        // Emit NULL for null columns; otherwise quote the value and escape
        // backslashes and single quotes (MySQL default string-literal rules).
        String literal = column.getIsNull()
                ? "NULL"
                : "'" + column.getValue().replace("\\", "\\\\").replace("'", "''") + "'";
        sql.append(" ").append(column.getName()).append(" = ").append(literal);
    }
    sql.append(" where ");
    boolean firstKey = true;
    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
        if (!column.getIsKey()) {
            continue;
        }
        if (!firstKey) {
            sql.append(" and ");
        }
        firstKey = false;
        // Quote the key value so non-numeric keys produce valid SQL too.
        String keyLiteral = "'" + column.getValue().replace("\\", "\\\\").replace("'", "''") + "'";
        sql.append(column.getName()).append("=").append(keyLiteral);
    }
    return sql;
}
/**
 * Reconstructs a DELETE statement for one deleted row.
 * The WHERE clause matches every primary-key column of the before image
 * (joined with {@code and}); using only the first key column would make the
 * statement match more than the deleted row on composite-key tables. If the
 * row has no key column the WHERE clause is left empty, as before.
 *
 * @param rowData   row change holding the before column image
 * @param tableName name of the table the row belongs to
 * @return the generated statement text
 */
private static StringBuffer saveDeleteSql(CanalEntry.RowData rowData, String tableName) {
    StringBuffer sql = new StringBuffer("delete from " + tableName + " where ");
    boolean firstKey = true;
    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
        if (!column.getIsKey()) {
            continue;
        }
        if (!firstKey) {
            sql.append(" and ");
        }
        firstKey = false;
        // Quote and escape the key value (MySQL default string-literal rules)
        // so non-numeric keys produce valid SQL too.
        String keyLiteral = "'" + column.getValue().replace("\\", "\\\\").replace("'", "''") + "'";
        sql.append(column.getName()).append("=").append(keyLiteral);
    }
    return sql;
}
/**
 * Reconstructs an INSERT statement for one inserted row from its after image.
 * Null columns are written as {@code NULL}; all other values are quoted with
 * backslashes and single quotes escaped (MySQL default string-literal rules).
 *
 * @param rowData   row change holding the after column image
 * @param tableName name of the table the row belongs to
 * @return the generated statement text
 */
private static StringBuffer saveInsertSql(CanalEntry.RowData rowData, String tableName) {
    List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
    StringBuilder names = new StringBuilder();
    StringBuilder values = new StringBuilder();
    for (int i = 0; i < columnList.size(); i++) {
        CanalEntry.Column column = columnList.get(i);
        if (i > 0) {
            names.append(",");
            values.append(",");
        }
        names.append(column.getName());
        // A null column must become NULL, not the empty string literal ''.
        values.append(column.getIsNull()
                ? "NULL"
                : "'" + column.getValue().replace("\\", "\\\\").replace("'", "''") + "'");
    }
    StringBuffer sql = new StringBuffer("insert into " + tableName + " (");
    sql.append(names).append(") VALUES (").append(values).append(")");
    return sql;
}
4、接口使用注解
在需要监听binlog的接口方法上添加 @BinLogs 注解即可(该注解只能标注在方法上)
标签:canal,java,自定义,column,CanalEntry,rowData,sql,import,append
From: https://blog.csdn.net/qq_38189441/article/details/144344941