首页 > 数据库 >Canal+Kafka实现MySQL与Redis数据同步(二)

Canal+Kafka实现MySQL与Redis数据同步(二)

时间:2023-11-19 13:33:05浏览次数:41  
标签:Canal String commodity Redis private Kafka tbCommodityInfo consumer id

Canal+Kafka实现MySQL与Redis数据同步(二)

创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:

spring:
  kafka:
      # Kafka服务地址
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: consumer-group1
      #序列化反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288

根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:

public class CanalBean {
    //数据
    private List<TbCommodityInfo> data;
    //数据库名称
    private String database;
    private long es;
    //递增,从1开始
    private int id;
    //是否是DDL语句
    private boolean isDdl;
    //表结构的字段类型
    private MysqlType mysqlType;
    //UPDATE语句,旧数据
    private String old;
    //主键名称
    private List<String> pkNames;
    //sql语句
    private String sql;
    private SqlType sqlType;
    //表名
    private String table;
    private long ts;
    //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
    private String type;
    //getter、setter方法
}
public class MysqlType {
    private String id;
    private String commodity_name;
    private String commodity_price;
    private String number;
    private String description;
    //getter、setter方法
}
public class SqlType {
    private int id;
    private int commodity_name;
    private int commodity_price;
    private int number;
    private int description;
}

最后就可以创建一个消费者CanalConsumer进行消费:

@Component
public class CanalConsumer {
    //日志记录
    private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);
    //redis操作工具类
    @Resource
    private RedisClient redisClient;
    //监听的队列名称为:canaltopic
    @KafkaListener(topics = "canaltopic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);
        //转换为javaBean
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
        //获取是否是DDL语句
        boolean isDdl = canalBean.getIsDdl();
        //获取类型
        String type = canalBean.getType();
        //不是DDL语句
        if (!isDdl) {
            List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();
            //过期时间
            long TIME_OUT = 600L;
            if ("INSERT".equals(type)) {
                //新增语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //新增到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else if ("UPDATE".equals(type)) {
                //更新语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //更新到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else {
                //删除语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //从redis中删除
                    redisClient.deleteKey(id);
                }
            }
        }
    }
}

测试MySQL与Redis同步

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (
  `id` varchar(32) NOT NULL,
  `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
  `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
  `number` int(10) DEFAULT '0' COMMENT '商品数量',
  `description` varchar(2048) DEFAULT '' COMMENT '商品描述',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

首先在MySQL创建表。然后启动项目,接着新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉包', '3.99', '3', '大叉包,老喜欢');

tb_commodity_info表查到新增的数据:

img

Redis也查到了对应的数据,证明同步成功!

img

如果更新呢?试一下Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='便宜的青菜包' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';

img

img

没有问题!

总结

canal的缺点:

  1. canal只能同步增量数据。
  2. 不是实时同步,是准实时同步。
  3. 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
  4. MQ顺序性问题。 网的回答,大家参考一下 img

尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。 本文由博客群发一文多发等运营工具平台 OpenWrite 发布

标签:Canal,String,commodity,Redis,private,Kafka,tbCommodityInfo,consumer,id
From: https://blog.51cto.com/u_14725510/8472336

相关文章

  • Redis缓存雪崩、击穿、穿透解释及解决方法,缓存预热,布隆过滤器 ,互斥锁
    Redis缓存雪崩、击穿、穿透解释及解决方法,缓存预热,布隆过滤器,互斥锁......
  • 在Linux环境安装redis步骤,且设置开机自动启动redis
    原创/朱季谦最近一直在学习redis相关知识,看了很多理论知识后,觉得还是要多动手操作,就如王阳明说的知行合一那样,因此,便决定在linux环境安装了redis,过程捣鼓了一番,也遇到了一些波折,但最后还是成功安装完成,顺便把步骤流程记录了下来,分享给有需要的小伙伴。1.首先,我在linux的/usr/local/......
  • kafka入门(一):kafka消息消费
    安装kafka,创建topic:Windows安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851Linux安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353添加依赖包:<dependency><groupId>org.springfr......
  • 8、Redis发布订阅(了解即可)
    一、是什么二、能干嘛三、常用命令(1)(2)(3)(4)(5)(6)四、案例演示1、当堂演示2、小总结pub/sub缺点......
  • 7、Redis管道
    问题由来一、是什么二、案例演示三、小总结1、pipeline与原生批量命令对比2、pipeline与事务对比3、使用pipeline注意事项......
  • Redis7 RDB-AOF混合持久化
    1、官方建议2、rdbvsaof3、怎么选RDb持久化方式能够在指定的时间间隔对你的数据进行快照存储AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据AOF命令以redis协议追加保存每次写的操作到文件末尾4、同时开启两种持久化方式在这种情......
  • redis lua循环
    Redis的Lua脚本支持循环,可以使用for循环和while循环等方式实现。下面是一个使用for循环的示例:--获取列表中所有元素的值并相加localsum=0locallist=redis.call('LRANGE','mylist',0,-1)fori=1,#listdosum=sum+tonumber(list[i])endreturnsum在这个示例......
  • 6、Redis事务
    一、是什么二、能干嘛三、Redis事务VS数据库事务四、怎么玩1、常用命令2、正常执行3、放弃事务4、全体连坐5、冤头债主6、watch监控五、小总结......
  • 5、Redis持久化
    一、持久化:RDB+AOF1、RDB(RedisDataBase)1.1官网介绍1.2是什么1.3能干嘛1.4案例演示1.4.1需求说明1.4.2配置文件(redis6vsredis7)(1)redis6.0.16版本以下配置文件(2)redis6.2~redis7.0.0版本配置文件1.4.3操作步骤1.4.3.1自动触发(1)5秒2次修改(2)修......
  • kafka安装教程
    检查java8没有就安装java-version安装jdk1.8yum-yinstalljava-1.8.0-openjdk下载kafka(网速很慢)wgethttps://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz解压缩tar-xzfkafka_2.13-3.5.0.tgzcdkafka_2.13-3.5.0后台启动ZooKeeper服务(这里使用kafka里......