首页 > 其他分享 >如何实现一个简单易用的 RocketMQ SDK

如何实现一个简单易用的 RocketMQ SDK

时间:2024-05-06 23:14:38浏览次数:12  
标签:订阅 消费 易用 开发者 msg RocketMQ SDK

2018 年,做为架构负责人,接到一个架构需求:实现一个简单易用的 RocketMQ SDK 。

因为各个团队 RocketMQ 原生客户端配置起来千奇百怪,有的配置存在风险,各团队负责人都需要一个简洁易用的 RocketMQ SDK 。

我立马调研相关开源的方案,当时 RocketMQ-Spring 项目并没有开源,而阿里云的 ONS SDK 是开源的,我只能讲目标转向 阿里云 ONS 。

通过学习 ONS 的设计方式,我对于 RocketMQ 的客户端原理有了进一步了解,同时参考 ONS 的设计,也实现了公司内部使用的 RocketMQ SDK 。

项目地址:https://github.com/makemyownlife/platform-rocketmq

之所以说简单,就是让用户(开发者)使用 SDK 时,减少心智负担

举三个例子:

1 发送顺序消息

使用原生代码发送消息时,会使用如下的代码:

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

我们可以将 SDK API 简化为:

SendResult send(final ProducerMessage message, final String shardingKey);

开发者不需要定义队列选择器,只需要传递分片键 orderId 即可。

2 单条消息消费

使用原来代码定义消费监听器时,使用如下的代码:

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

监听器内部,对于开发者操作的对象是消息列表 msgs ,很多开发同学想只操作一条消息。

于是,我们可以将 SDK API 简化为:

consumer.subscribe("mytest", new ConsumerListener() {
    @Override
    public ConsumerAction consumer(ConsumerMessage msg) {
        byte[] body = msg.getBody();
        System.out.println("msg:" + new String(body));
        return ConsumerAction.CommitMessage;
    }
});

开发者在消费时,可以一条一条操作,代码简洁了不少。

同时,很多开发者在使用普通消费、顺序消费时,需要返回延时消费的状态码时,两种消费模式定义的枚举也不相同。我们将枚举做了统一:

/**
 * 消费消息的返回结果
 */
public enum ConsumerAction {

    /**
     * 消费成功,继续消费下一条消息
     */
    CommitMessage,
    
    /**
     * 消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
     */
    ReconsumeLater;
}

3 订阅关系一致

实际场景里,订阅关系不一致是极容易发生的事情,就算是高级别的架构师也会翻车,每次翻车现场都是惨不忍睹。

正确的订阅关系见下图:

正确的订阅关系

代码逻辑角度来看,每个消费者实例内订阅方法的主题、 TAG、监听逻辑都需要保持一致

当订阅关系不一致时,在 Broker 端同一个消费组内的各个消费者客户端的订阅信息相互被覆盖,从而导致某个消费者客户端无法拉取到新的消息。

怎么解决呢 ?

我当时想起了阿里技术专家沈询的一句话:

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它 !

公司内部出现订阅关系一致99%的问题是:消费者组一致的前提下,主题相同,但 TAG 不相同。

基于此,我的设计思路就明确了:不开放订阅 TAG 的权限!

没想到吧,我就是这么粗暴。

按照这种设计思路,虽然开始有的程序员会有质疑,但你和他梳理好消费者组的定义,以及做好领域划分,对业务来讲,反而清晰了。

4 写到最后

我并不认为我们写得多么的好,只是想让同学们理解:

1、成长的第一步就是模仿

2、把开发者当用户,以用户的体验为先

我经常去阅读阿里云产品的 SDK , 去思考他们为什么这么设计, 为什么和开源的有所不同,有的时候开始没想明白,但模仿得多了,好像也慢慢懂了。

要紧的是果敢地迈出第一步,对与错先都不管,自古就没有把一切都设计好再开步的事。

别想把一切都弄清楚,再去走路。鲁莽者要学会思考,善思者要克服的是犹豫。

——史铁生


如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

标签:订阅,消费,易用,开发者,msg,RocketMQ,SDK
From: https://www.cnblogs.com/makemylife/p/18176157

相关文章

  • ENVI57扩展工具:FLAASH Easy-to-Use 大气校正易用版 [新]
    本扩展工具要求ENVI5.7及以上版本。低版本ENVI可以使用如下扩展工具:https://www.cnblogs.com/enviidl/p/16393415.html 自ENVI5.7版本开始,FLAASH大气校正功能提供了官方Task接口,详细信息可查看ENVI帮助内ENVI>Programming>ENVITasks>ListofTasks>FLAASH章节......
  • 快速入门一篇搞定RocketMq-实现微服务实战落地
    1、RocketMq介绍RocketMQ起源于阿里巴巴,最初是为了解决邮件系统的高可靠性和高性能而设计的。在2016年开源分布式消息中间件,并逐渐成为Apache顶级项目。现在是Apache的一个顶级项目,在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转,性能稳定、高效。官网地址:https://......
  • @aws-sdk 支持的库
    abort-controllerbody-checksum-nodechunked-stream-reader-nodeclient-accessanalyzerclient-accountclient-acmclient-acm-pcaclient-alexa-for-businessclient-ampclient-amplifyclient-amplifybackendclient-amplifyuibuilderclient-api-gatewayclient-apiga......
  • 国密SDK编译
    一、GMSSL-2.x国密SDK源码下载,对GMSSL库进行编译生成对应的静态库。执行如下命令:cd到SDK源码目录cd/Users/xxxx/Downloads/GMSSLV2-master查看SDK适用环境./config上图中错误解决方法使用文本编辑器打开SDK目录下Configure、test/build.info、test/run_tests.pl。......
  • RocketMQ生产者启动源码
    核心代码初始化Default生产者DefaultMQProducerproducer=newDefaultMQProducer(PRODUCER_GROUP);设置NameAddr地址producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();分析newDefaultMQProducer(PRODUCER_GROUP)publicDefaultMQProducer(finalStringp......
  • iOS 隐私清单和SDK签名
    隐私清单:<?xmlversion="1.0"encoding="UTF-8"?><!DOCTYPEplistPUBLIC"-//Apple//DTDPLIST1.0//EN""http://www.apple.com/DTDs/PropertyList-1.0.dtd"><plistversion="1.0"><dict><!--......
  • Docker安装RocketMQ
    https://blog.csdn.net/qq_43600166/article/details/136187969 前提条件需要安装dockerhttps://yeasy.gitbook.io/docker_practice/install/centos NameServer1.拉取容器dockerpullrocketmqinc/rocketmq2.创建NameServer容器创建一个新的容器并指定RocketMQ的镜像......
  • Apache RocketMQ ACL 2.0 全新升级
    作者:徒钟引言RocketMQ作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ现有的AC......
  • RocketMQLog:WARN No appenders could be found for logger (io.netty.channel.nio.Ni
    springBoot集成rocketMq启动的时候报RocketMQLog:WARNNoappenderscouldbefoundforlogger(io.netty.channel.nio.NioEventLoop). RocketMQLog:WARNPleaseinitializetheloggersystemproperly. 原因是pom中的rocket的依赖版本太高了。<dependency><groupI......
  • .net开源智能家居之小米米家的c#原生sdk【MiHome.Net】1.0.0发布
    背景介绍hi大家好,我是三合,作为一个非著名懒人,智能家居简直刚需,在上一篇文章他来了他来了,.net开源智能家居之苹果HomeKit的c#原生sdk【Homekit.Net】1.0.0发布,快来打造你的私人智能家居吧中有靓仔提到,没有苹果设备,有一说一,苹果手机很贵,并且原生支持苹果HomeKit的智能家居设备......