首页 > 编程语言 >canal基于自定义注解使用【java】

canal基于自定义注解使用【java】

时间:2024-12-09 12:56:37浏览次数:7  
标签:canal java 自定义 column CanalEntry rowData sql import append

1、引入pom文件

      <dependency>
          <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.1.4</version>
      </dependency>

2、自定义注解【BinLogs】

import java.lang.annotation.*;

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface BinLogs
{
}

3、注解实现类【BinLogsValidator】


import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.commons.lang3.ObjectUtils;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * 注解实现
 */
@Component
@Aspect
public class BinLogsValidator {
    private static final int BATCH_SIZE = 10;

    // 可使用常量,也可用yml配置
    // @Value("${common.canalurl}")
    private String canalUrl = "10.0.0.11";
    // @Value("${common.canalport}")
    private Integer canalport = 111111;

    @Resource
    @Qualifier("taskExecutor")
    private ThreadPoolTaskExecutor poolTaskExecutor;

    /**
     * 定义切入点
     */
    @Pointcut("@annotation(BinLogs)")
    public void noRepeat() {
    }

    /**
     * 后置通知:在连接点之后执行通知
     */
    @After("noRepeat()")
    public void after() {
        //可携带入参
        String url = canalUrl;
        Integer port = canalport;
        poolTaskExecutor.execute(() -> dealAop(url, port));
    }

    private static void dealAop(String canalUrl, Integer canalport) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalUrl, canalport),
                "example", "", "");
        try {
            // 打开连接
            connector.connect();
            // 订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            // 回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                // 获取批量ID
                long batchId = message.getId();
                // 获取批量的数量
                int size = message.getEntries().size();
                // 如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        // 线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    // 如果有数据,处理数据
                    printEntry(message.getEntries());
                }
                // 进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 打印canal server解析binlog获得的实体类信息
     */
    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                // 开启/关闭事务的实体类型,跳过
                continue;
            }
            // RowChange对象,包含了一行数据变化的所有特征
            // 比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            CanalEntry.RowChange rowChage;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
            }
            // 获取操作类型:insert/update/delete类型
            CanalEntry.EventType eventType = rowChage.getEventType();
            // 表名称
            String tableName = entry.getHeader().getTableName();
            // binlog文件名称
            String binlog = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset();
            // 打印Header信息
            System.out.println("操作的binlog:" + binlog);
            System.out.println("操作的表:" + entry.getHeader().getSchemaName() + "." + tableName);
            System.out.println("操作类型:" + eventType);
            if ("sys_binlogs".equals(tableName)) {
                return;
            }
            // 判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
            }
            // 获取RowChange对象里的每一行数据,打印出来
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                // 如果是删除语句
                if (eventType == CanalEntry.EventType.DELETE) {
                    JSONObject jsonObject = printColumn(rowData.getBeforeColumnsList());
                    System.out.println("删除语句:" + jsonObject);
                    // 获取删除的sql
                    StringBuffer stringBuffer = saveDeleteSql(rowData, tableName);
                    System.out.println("删除Sql:" + stringBuffer);
                    //todo 可处理业务逻辑
                } // 如果是新增语句
                else if (eventType == CanalEntry.EventType.INSERT) {
                    JSONObject jsonObject = printColumn(rowData.getAfterColumnsList());
                    System.out.println("新增语句:" + jsonObject);

                    // 获取插入的sql
                    StringBuffer stringBuffer = saveInsertSql(rowData, tableName);
                    System.out.println("新增Sql:" + stringBuffer);
                    //todo 可处理业务逻辑
                } // 如果是更新的语句
                else {
                    // 变更后的数据
                    System.out.println("------->; after");
                    JSONObject after = printColumn(rowData.getAfterColumnsList());
                    System.out.println("变更后的数据:" + after);
                    // 变更前的数据
                    System.out.println("------->; before");
                    JSONObject before = printBeforeColumn(rowData.getBeforeColumnsList(), after);
                    System.out.println("变更前的数据:" + before);
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("before", before);
                    jsonObject.put("after", after);

                    // 获取修改的sql
                    StringBuffer stringBuffer = saveUpdateSql(rowData, tableName);
                    System.out.println("更新Sql:" + stringBuffer);
                }
            }
        }
    }

    /**
     * 变更前的数据内容(根据变更后的字段进行取值操作)
     *
     * @param columns 变更前的数据每一行
     * @param after   变更后的字段key:value
     */
    private static JSONObject printBeforeColumn(List<CanalEntry.Column> columns, JSONObject after) {
        if (ObjectUtils.isEmpty(after)) {
            return new JSONObject();
        }
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns) {
            if (org.apache.commons.lang3.ObjectUtils.isNotEmpty(after.get(column.getName()))) {
                jsonObject.put(column.getName(), column.getValue());
            }
        }
        return jsonObject;
    }

    /**
     * 处理数据库表数据变更
     *
     * @param columns 数据库表数据变更的每一行
     */
    private static JSONObject printColumn(List<CanalEntry.Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns) {
            if (column.getUpdated()) {
                jsonObject.put(column.getName(), column.getValue());
            }
        }
        return jsonObject;
    }

    /**
     * 保存更新语句
     */
    private static StringBuffer saveUpdateSql(CanalEntry.RowData rowData, String tableName) {
        List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();
        StringBuffer sql = new StringBuffer("update " + tableName + " set ");
        for (int i = 0; i < newColumnList.size(); i++) {
            if (newColumnList.get(i).getUpdated()) {
                sql.append(" ").append(newColumnList.get(i).getName()).append(" = '")
                        .append(newColumnList.get(i).getValue()).append("'");
                if (i != newColumnList.size() - 1) {
                    sql.append(",");
                }
            }
        }
        sql.append(" where ");
        List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
        for (CanalEntry.Column column : oldColumnList) {
            if (column.getIsKey()) {
                // 暂时只支持单一主键
                sql.append(column.getName()).append("=").append(column.getValue());
                break;
            }
        }
        return sql;
    }

    /**
     * 保存删除语句
     */
    private static StringBuffer saveDeleteSql(CanalEntry.RowData rowData, String tableName) {
        List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
        StringBuffer sql = new StringBuffer("delete from " + tableName + " where ");
        for (CanalEntry.Column column : columnList) {
            if (column.getIsKey()) {
                // 暂时只支持单一主键
                sql.append(column.getName()).append("=").append(column.getValue());
                break;
            }
        }
        return sql;
    }

    /**
     * 保存插入语句
     */
    private static StringBuffer saveInsertSql(CanalEntry.RowData rowData, String tableName) {
        List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
        StringBuffer sql = new StringBuffer("insert into " + tableName + " (");
        for (int i = 0; i < columnList.size(); i++) {
            sql.append(columnList.get(i).getName());
            if (i != columnList.size() - 1) {
                sql.append(",");
            }
        }
        sql.append(") VALUES (");
        for (int i = 0; i < columnList.size(); i++) {
            sql.append("'").append(columnList.get(i).getValue()).append("'");
            if (i != columnList.size() - 1) {
                sql.append(",");
            }
        }
        sql.append(")");
        return sql;
    }

4、接口使用注解

将此注解加到对应的接口上就行 @BinLogs

标签:canal,java,自定义,column,CanalEntry,rowData,sql,import,append
From: https://blog.csdn.net/qq_38189441/article/details/144344941

相关文章

  • java中的注解使用
    说明java中的注解(Annotation)是用于为代码添加元数据的信息,编译器可以通过注解进行不同的处理。注解本身并不直接影响程序的运行。常见内置注解@Override标记重写父类方法@Deprecated标记类、方法、字段等不推荐使用,可能会在未来的版本中删除。@SuppressWarnings抑制编译器......
  • Java Playwright 浏览器最大化
    Playwright是一个用于自动化Web应用测试的现代工具,支持多种语言(包括Java)及多个浏览器(如Chromium、Firefox和WebKit)。它提供了一致的API来控制浏览器行为,其中包括窗口操作,如最大化。本文将详细介绍如何在JavaPlaywright中实现浏览器窗口的最大化,并提供详细的代码示例。......
  • java review
    一、多态向上转型FUfu=newZi();可以调用子类方法,不能调用子类特有方法(成员方法)成员变量,看等号左边的是谁,调用谁里面的成员变量二、内部类1.什么时候使用内部类:​ 当一个事务的内部,还有一个部分需要定义完整的结构去描述,而这个内部的完整结构又只为外部事物提供......
  • java基础--多线程
    进程与线程进程(Process):每个独立运行的程序都对应一个进程。进程是资源分配的最小单位,占用独立的内存空间和系统资源线程(Thread):CPU调度和分派的基本单位,程序执行过程中的最小单元例如:迅雷是个进程,里面的多个下载任务属于线程二者区别进程是资源分配的基本单位,线程......
  • 如何处理 JavaScript 中的事件委托?
    目录事件委托简介为什么要使用事件委托事件委托的原理事件委托的实际应用4.1示例1:动态生成的列表项点击事件4.2示例2:表单验证事件委托的优缺点常见问题及优化1.事件委托简介事件委托是指将一个事件处理程序绑定到父元素上,而不是直接绑定到每个子元素上。通过事件......
  • 基于java ssm篮球网上商城系统(源码+文档+运行视频+讲解视频)
     文章目录系列文章目录目的前言一、详细视频演示二、项目部分实现截图三、技术栈后端框架SSM前端框架vueSSM框架详细介绍系统测试四、代码参考源码获取目的摘要: 本文论述基于JavaSSM框架构建的篮球网上商城系统。该系统在满足篮球爱好者购物需求和推动篮球运动发......
  • 基于java ssm家用电器上门回收系统回收分配订单(源码+文档+运行视频+讲解视频)
     文章目录系列文章目录目的前言一、详细视频演示二、项目部分实现截图三、技术栈后端框架SSM前端框架vueSSM框架详细介绍系统测试四、代码参考源码获取目的摘要: 本文论述基于JavaSSM框架构建的家用电器上门回收系统。该系统在推动资源循环利用和环保事业中发挥着......
  • 基于java ssm教师教学数据统计分析系统论文课题科研成果工作量考核工作日志(源码+文档+
     文章目录系列文章目录目的前言一、详细视频演示二、项目部分实现截图三、技术栈后端框架SSM前端框架vueSSM框架详细介绍系统测试四、代码参考源码获取目的摘要: 本文论述基于JavaSSM框架构建的教师教学数据统计分析系统。该系统对提升教学管理水平和教师教学质量......
  • 基于java ssm考研租房网站系统房屋租赁租房合同(源码+文档+运行视频+讲解视频)
     文章目录系列文章目录目的前言一、详细视频演示二、项目部分实现截图三、技术栈后端框架SSM前端框架vueSSM框架详细介绍系统测试四、代码参考源码获取目的摘要: 本文介绍基于JavaSSM框架构建的考研租房网站系统。该系统在满足考研学生租房需求方面具有重要意义。......
  • java 打印整数的二进制数
    任何类型的数在计算机底层存储都是以二进制的形式,那么如何知道一个数的二进制数是多少呢?lpublicclassPrintBinary{publicstaticvoidprintBinary(Objectnum){if(num==null){return;}if(numinstanceofInteger){......