首页 > 其他分享 >面试官:使用 RocketMQ 怎么进行灰度发布?

面试官:使用 RocketMQ 怎么进行灰度发布?

时间:2023-07-10 23:06:54浏览次数:37  
标签:gray 面试官 private 过滤 灰度 msg Consumer RocketMQ

今天来聊一聊 RocketMQ 的灰度方案。

灰度发布是指在黑与白之间,平滑过渡的一种发布方式。在大流量的系统中,如果一次升级改造范围比较大,或者影响内容不太确定,一般会采用切量的方式进行升级,这样可以减少生产变更带来的影响。

面试官:使用 RocketMQ 怎么进行灰度发布?_Group

如上图,对 ServiceA 这个服务进行升级,采用灰度发布,先升级 Server5,一周后如果没有问题,升级 Server4 和 Server 3,再运行一周没有问题,把剩下两个节点都升级。

上面的案例是一个 RPC 的调用。但如果使用消息队列该怎么做呢?使用消息队列,并不能使用网关来进行流量转发。这里需要分不同场景进行分析。

1 只升级消费者

这是最简单的情况,比如只有消费者修改了消费逻辑,就是 RPC 调用的情况类似,我们只要把消费者进行灰度发布就可以。如下图:

面试官:使用 RocketMQ 怎么进行灰度发布?_灰度_02

2 生产者也升级

下面是一个订单的实体类,我们新加了一个属性,订单生成时间

public class Order {
    private Long id;
    private Long userId;
    private Long productId;
    private Integer count;
    private BigDecimal payAmount;
    /**订单状态:0:创建中;1:已完结*/
    private Integer status;
    /**新加属性,订单生成时间*/
    private String createTime;
}

消费端的改造是需要对 createTime 这个属性进行处理。

2.1 消费端过滤

在生产者的 Order 类中增加 createTime 属性,如果我们直接使用 createTime 属性来过滤,消费者并不能实现灰度,因为所有的消费者都可能会拉取到带有 createTime 属性的消息。

RocketMQ 中 Message 的定义如下:

public class Message implements Serializable {
    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;
}

可以在 properties 属性中增加一个灰度标识,比如生产者发送消息的时候封装如下:

Message msg = buildMessage(topic);
msg.putUserProperty("gray", "true");

注意:也可以在 SendMessageHook 这个钩子函数中定义。通过这种方式可以在消费端新增加一个灰度 Consumer Group,用来对灰度消息则进行消费。如下图:

面试官:使用 RocketMQ 怎么进行灰度发布?_灰度_03

对于灰度 Consumer Group 判断到 gray 属性是 true 时进行消费,而对于普通 Consumer Group,判断到 gray 属性不等于 true 时再进行消费。这里可以借助 RocketMQ 客户端的 FilterMessageHook,代码如下:

defaultMQPushConsumerImpl.registerFilterMessageHook(new FilterMessageHook() {
 @Override
 public String hookName() {
  return "filterHook";
 }

 @Override
 public void filterMessage(FilterMessageContext context) {
  List<MessageExt> messages = context.getMsgList();
                context.setMsgList(messages.stream().filter(m -> StringUtils.equals(m.getProperty("gray"),"true"))
                        .collect(Collectors.toList()));
 }
});

不过这样会有两个问题,灰度和正常的两个 Consumer Group 相当于是广播组

  1. 两个组都要对所有的消息进行拉取,比如本来使用灰度发布计划切 10% 的流量,但实际上全部流量都切过去了,只是根据属性做了判断。这让消费端整体承担了两倍的压力;
  2. 因为两个消费者组都要去 Broker 拉取消息,Broker 的压力也增加了一倍。

2.2 Broker 过滤

2.2.1 使用 tag 过滤

如果一个 Consumer 不订阅一个 Topic 中的全部消息,可以通过 Tag 来过滤。比如一个 Consumer 订阅了 TopicA 这个 Topic 中的 Tag1 和 Tag2 这两个 tag,那这个 Consumer 的订阅关系如下图:

SubscriptionData 这个对象封装了 Topic、tag 以及所订阅 tag 的 hashcode 集合。

Consumer 发送拉取消息请求时,会把订阅关系传给 Broker(Broker 解析成 SubscriptionData 对象),Broker 使用 consumequeue 获取消息时,首先判断判断最后 8 个字节的 tag hashcode 是否在 SubscriptionData 的 codeSet 中,如果不在就跳过,如果存在把消息返回给 Consumer。如下图:

面试官:使用 RocketMQ 怎么进行灰度发布?_发送消息_04

这样可以在灰度 Producer 发送消息时加上 Tag,如下代码:

Message msg = new Message();
msg.setBody("Test");
msg.setTopic("Topic");
msg.setTags("Gray");

而在灰度消费者订阅 Gray 这个 tag。这样就避免了 2.1 节中消息全量拉取的问题。

2.2.2 使用 SQL92 过滤

使用 SQL92 过滤,可以应对更加复杂的场景,不仅可以过滤 Tag,还可以过滤 UserProperty。

比如下面是一个生产者的代码:

Message msg = new Message();
msg.setTopic("testTopic");
msg.setTags("tag1");
msg.putUserProperty("gray","true");

这样消费者初始化的时候,可以定义使用 SQL92 过滤,代码如下:

consumer.subscribe("testTopic",
            MessageSelector.bySql("(TAGS is not null and TAGS in TAGS='''''tag1''''')" +
                "and (gray is not null gray='true')"));

下面是 bySql 的源代码:

public static MessageSelector bySql(String sql) {
 return new MessageSelector(ExpressionType.SQL92, sql);
}

3 总结

本文介绍了 RocketMQ 灰度消息的使用方法,场景比较简单。对于全链路的复杂灰度场景,可以参考使用阿里的微服引擎 MSE。

标签:gray,面试官,private,过滤,灰度,msg,Consumer,RocketMQ
From: https://blog.51cto.com/u_16173760/6681077

相关文章

  • 基于云原生网关的全链路灰度实践
    作者:倪海峰(海迩)前言随着企业规模的不断扩大,传统单体应用已很难进一步支持业务的发展,业务的迭代速度已经难以满足业务的增长,此时企业会对应用系统做微服务化的改造,降低业务的耦合度,提升开发迭代的效率,让开发更加敏捷。系统架构微服务化的,原本的愿景是希望通过将系统的颗粒度变......
  • 京东面试官问:LEFT JOIN关联表中用ON还是WHERE跟条件有什么区别
    之前有码友去京东面试,被问到LEFTJOIN关联表中用ON还是WHERE跟条件有什么区别,很快就答出来了,可是追问什么原因造成这一情况的,一时没回答上来。下面说说,想通过AleftBjoinonand后面的条件来使查出的两条记录变成一条,奈何发现还是有两条。后来发现joinonand不会过......
  • 三、 安装RocketMQ
    RocketMQ的安装包分为两种,二进制包和源码包。点击这里下载ApacheRocketMQ5.1.3的源码包。你也可以从这里下载到二进制包。二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。 这里使用Windows系统安装RocketMQ5.1.3。JAVA版本选择jdk1.8。下载页面是:ht......
  • windows配置RocketMQ并测试发送消息
    https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ下载rocketmq-all-4.9.5-bin-release 必须配置一个RocketMQ路径的环境变量(参考博客的第二个) 配置内容如下,目录在bin的上层   解压,进入解压目录conf,修改broker.conf在该文件中加入两行(建议直接复......
  • 一、 RocketMQ介绍
    一、为什么选择RocketMQ在阿里孕育RocketMQ的雏形时期,将其用于异步通信、搜索、社交网络活动流、数据管道,贸易流程中。随着阿里的贸易业务吞吐量的上升,源自阿里的消息传递集群的压力也变得紧迫。根据阿里的研究,随着队列和虚拟主题使用的增加,ActiveMQIO模块达到了一个瓶颈。阿......
  • 阿里面试官:谈谈对Redis哈希表的理解
    不少朋友问我,能不能搞个八股文精讲,把面试问题讲讲透,于是系列就这样诞生了。咱们第一期先聊聊Redis。相信哈希表大家并不陌生,今天顺便聊聊Redis的哈希表。Hash表回顾哈希表是一种存储数据的结构,它有很多名字(键值对、字典、符号表、映射、关联数组)。在哈希表中,键和值是一一对应的......
  • RocketMq5.0 任意延迟时间 TimerMessageStore 源码解析
    TimerMessageStore简略介绍延迟队列rmq_sys_wheel_timer指定时间的延迟消息。会先投递到rmq_sys_wheel_timer队列中然后由TimerMessageStore消费队列数据,将数据消费到timerWheel使用时间轮算法,实现秒级任务TimerMessageStore操作的文件store\consumequeue\rmq_sy......
  • 阿里Java二面:说说Spring MVC执行流程及原理?这样聊能吊打面试官
    面试找虐博主之前每次去面试必问的问题:“讲一下springmvc的执行流程以及常用组件的作用”;记得第一次和面试官说了大概的流程是这样的:“服务器收到一个请求后会先去HandlerMapping中匹配url,找到url之后用HandlerAdapter适配器去执行这个控制器(controller层),执行完之后返回一个mo......
  • 阿里Java二面:说说Spring MVC执行流程及原理?这样聊能吊打面试官
    面试找虐博主之前每次去面试必问的问题:“讲一下springmvc的执行流程以及常用组件的作用”;记得第一次和面试官说了大概的流程是这样的:“服务器收到一个请求后会先去HandlerMapping中匹配url,找到url之后用HandlerAdapter适配器去执行这个控制器(controller层),执行完之后返回一个mode......
  • 使用Nginx Ingress实现灰度发布和蓝绿发布
    应用场景      使用NginxIngress实现灰度发布适用场景主要取决于业务流量切分的策略,目前NginxIngress支持基于Header、Cookie和服务权重三种流量切分的策略,基于这三种策略可实现以下两种发布场景:场景一:切分部分用户流量到新版本     假设线上已运行了一套对外提......