首页 > 数据库 >(转载)【RocketMQ 课程笔记】18.消费者基于自定义属性实现SQL过滤

(转载)【RocketMQ 课程笔记】18.消费者基于自定义属性实现SQL过滤

时间:2022-09-30 14:01:08浏览次数:51  
标签:自定义 18 itlaoqi source com SQL data id rocketmq

消费者基于自定义属性实现SQL过滤

在发送消息时,发送方可以自定义消息的用户属性,消费者可以利用SQL92的WHERE子句语法实现消息过滤。
相比Tag过滤,消息过滤使用更加灵活,也更容易被程序猿接受,但相较Tag过滤执行效率较低。
下面咱们来看案例:

消息生产者

com.itlaoqi.rocketmq.sqlfilter.SfProducer
消息发送方和标准发送有两点变化:

  • 可以不设置消息的Tag与Key,转而使用用户自定义属性,这里实现了source与id两个自定义属性的赋值
  • 利用message.putUserProperty为用户赋予自定义属性
@Slf4j
public class SfProducer {
    public static void main(String[] args) {
        //DefaultMQProducer用于发送非事务消息
        DefaultMQProducer producer = new DefaultMQProducer("sf-producergroup");
        //注册NameServer地址
        producer.setNamesrvAddr("192.168.31.103:9876");
        /*//异步发送失败后Producer自动重试2次
        producer.setRetryTimesWhenSendAsyncFailed(2);*/
        try {
            //启动生产者实例
            producer.start();
            for(Integer i = 0 ; i < 10 ; i++) {
                Thread.sleep(1000);
                Integer rnd = new Random().nextInt(10);
                //用户自定义属性
                String source = "";
                switch (rnd % 3){
                    case 0:
                    source = "jd";
                    break;
                case 1:
                    source = "tmall";
                    break;
                case 2:
                    source = "taobao";
                    break;
        	}
            //消息数据
            String data = "第" + i + "条消息数据";
            //消息主题,使用用户自定义属性时可以不设置tag与key
            Message message = new Message("sf-sample-data", data.getBytes());
            message.putUserProperty("id" , i.toString());
            message.putUserProperty("source", source);
            //发送结果
            SendResult result = producer.send(message);
            log.info("id:{},source:{},data:{}" ,i.toString(), source,data);
        }
        }catch (Exception e){
        	e.printStackTrace();
        }finally {
            try {
                //关闭连接
                producer.shutdown();
                log.info("连接已关闭");
            }catch (Exception e){
            	e.printStackTrace();
            }
        }
    }
}

运行结果

09:34:52.771 [main] INFO com.itlaoqi.rocketmq.sqlfilter.SfProducer - id:0,source:jd,data:第0条消息数据
09:34:53.788 [main] INFO com.itlaoqi.rocketmq.sqlfilter.SfProducer - id:1,source:jd,data:第1条消息数据
09:34:54.801 [main] INFO com.itlaoqi.rocketmq.sqlfilter.SfProducer - id:2,source:tmall,data:第2条消息数据
09:34:55.818 [main] INFO com.itlaoqi.rocketmq.sqlfilter.SfProducer - id:3,source:tmall,data:第3条消息数据
09:34:56.836 [main] INFO com.itlaoqi.rocketmq.sqlfilter.SfProducer - id:4,source:jd,data:第4条消息数据
09:34:57.850 [main] INFO com.itlaoqi.rocketmq.sqlfilter.SfProducer - id:5,source:taobao,data:第5条消息数据
09:34:58.865 [main] INFO com.itlaoqi.rocketmq.sqlfilter.SfProducer - id:6,source:taobao,data:第6条消息数据
09:34:59.880 [main] INFO com.itlaoqi.rocketmq.sqlfilter.SfProducer - id:7,source:jd,data:第7条消息数据
09:35:00.896 [main] INFO com.itlaoqi.rocketmq.sqlfilter.SfProducer - id:8,source:tmall,data:第8条消息数据
09:35:01.911 [main] INFO com.itlaoqi.rocketmq.sqlfilter.SfProducer - id:9,source:taobao,data:第9条消息数据

消息消费者

默认RocketMQ并未开启自定义属性SQL过滤的选项,需要在配置文件中额外开启,如下所示:
master.conf:Master节点配置文件追加下面选型

#开启自定义属性SQL过滤
enablePropertyFilter=true

完整如下:

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
namesrvAddr=192.168.31.103:9876
autoCreateTopicEnable=true
#开启自定义属性SQL过滤
enablePropertyFilter=true

slave.conf:Slave节点也要追加该配置项,别忘记

京东消费者

com.itlaoqi.rocketmq.sqlfilter.SfJDConsumer
京东消费者负责消费source='jd'的数据,和标准消费者最大的不同便是在subscribe方法第二个参数不再是Tag,而改为MessageSelector.bySql方法,利用WHERE子句写法对自定义属性实现过滤,源码如下

@Slf4j
public class SfJDConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sf-jdconsumer-group");
        consumer.setNamesrvAddr("192.168.31.103:9876");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //利用SQL WHERE子句写法对自定义属性进行过滤
        consumer.subscribe("sf-sample-data", MessageSelector.bySql("source='jd'"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach(msg->{
                    log.info("id:{},source:{},data:{}", msg.getUserProperty("id"), msg.getUserProperty("source"), new String(msg.getBody()));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        log.info("集群消费者启动成功,正在监听新消息");
    }
}

运行结果

09:34:52.770 [ConsumeMessageThread_2] INFO com.itlaoqi.rocketmq.sqlfilter.SfJDConsumer - id:0,source:jd,data:第0条消息数据
09:34:53.788 [ConsumeMessageThread_3] INFO com.itlaoqi.rocketmq.sqlfilter.SfJDConsumer - id:1,source:jd,data:第1条消息数据
09:34:56.836 [ConsumeMessageThread_4] INFO com.itlaoqi.rocketmq.sqlfilter.SfJDConsumer - id:4,source:jd,data:第4条消息数据
09:34:59.880 [ConsumeMessageThread_5] INFO com.itlaoqi.rocketmq.sqlfilter.SfJDConsumer - id:7,source:jd,data:第7条消息数据

阿里消费者

com.itlaoqi.rocketmq.sqlfilter.SfAliConsumer
京东消费者负责消费天与猫淘宝的数据,与京东消费者最明显的区别是:

  • 因为业务范围不同,消费者组不一样;
  • bySQL的要获取多个数值,可用下面语法
    • source in ('tmall','taobao')
    • source = 'tmall' or source = 'taobao'
...
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sf-aliconsumer-group");
...
consumer.subscribe("sf-sample-data", MessageSelector.bySql("source in ('tmall','taobao')"));

运行结果

09:34:54.803 [ConsumeMessageThread_14] INFO com.itlaoqi.rocketmq.sqlfilter.SfAliConsumer - id:2,source:tmall,data:第2条消息数据
09:34:55.819 [ConsumeMessageThread_15] INFO com.itlaoqi.rocketmq.sqlfilter.SfAliConsumer - id:3,source:tmall,data:第3条消息数据
09:34:57.850 [ConsumeMessageThread_16] INFO com.itlaoqi.rocketmq.sqlfilter.SfAliConsumer - id:5,source:taobao,data:第5条消息数据
09:34:58.865 [ConsumeMessageThread_17] INFO com.itlaoqi.rocketmq.sqlfilter.SfAliConsumer - id:6,source:taobao,data:第6条消息数据
09:35:00.896 [ConsumeMessageThread_18] INFO com.itlaoqi.rocketmq.sqlfilter.SfAliConsumer - id:8,source:tmall,data:第8条消息数据
09:35:01.911 [ConsumeMessageThread_19] INFO com.itlaoqi.rocketmq.sqlfilter.SfAliConsumer - id:9,source:taobao,data:第9条消息数据

标签:自定义,18,itlaoqi,source,com,SQL,data,id,rocketmq
From: https://www.cnblogs.com/JamKing/p/16744684.html

相关文章

  • sql查询,报错,布尔盲注
    一、联合查询注入:当有.php?id=1,参数值为id(1)看有无报错?'(2)判断数字型还是字符型注入?1数字型:两次返回的页面不同1and1=11and1=2字符型:两次返回的页面不同1'......
  • 关系数据库标准语言SQL(2)数据定义
    数据定义分为四个定义:模式(SCHEMA)定义、表(TABLE)定义、视图定义、索引定义操作的方式:创建(CREATE)、删除(DROP)、修改(ALTER),在每种定义前前加上相应操作的关键字,而后加上定义关......
  • SQL之检索数据
    1、检索单个列1select2col_name3from4table_name;2、检索多个列1select2col_name,3col_name4from5table_name;3、检索所有列使用·*......
  • mysql主从复制常见问题(useing version:8)
    Fatalerror:TheslaveI/OthreadstopsbecausemasterandslavehaveequalMySQLserverids;theseidsmustbedifferentforreplicationtowork(orthe--rep......
  • 浅谈 MySQL 连表查询
    浅谈MySQL连表查询连表查询是一把双刃剑,优点是适应范式,减少数据冗余;缺点是连表查询特别是多张表的连表会增加数据库的负担,降低查询效率.简介连表查询就是2......
  • mysql常用命令
    CREATEUSER'用户名'@'可连接ip,%表示任意ip'IDENTIFIEDBY'密码';DROPUSER[IFEXISTS]存在才删除 user[,user]...用户列表DROP USER IF EXISTS 用户名GRA......
  • MYBatis-动态SQL
    MyBatis动态SQL什么是动态SQL?官方给出动态SQL的解释是一个基于OGNL的表达式,MyBatis3替换了之前的大部分元素,大大精简了元素种类,现在要学习的元素种类比原来的一半还要少......
  • 关于Mysql [ERR] 1118 - Row size too large (> 8126)解决方法
    Mysql版本:8.0系统:win10错误描述:[ERR]1118-Rowsizetoolarge(>8126).ChangingsomecolumnstoTEXTorBLOBorusingROW_FORMAT=DYNAMICorROW_FORMAT=COMP......
  • SQL之前述
    1、背景前不久,买了本SQL必知必会,看了下,然后就开始在某客网刷题,从此篇起,1、准备结合<<SQL必知必会>>和某客网的题目开始记录 2、后面还会去搜......
  • sql排序函数 rank() / dense_rank()
    Rank排名函数1、rank()按照某字段的排序结果添加排名,但是他是跳跃的、间断的排名partitionby子句按照对应字段将结果集分为多个分区,然后orderby子句按分数对结果集进......