首页 > 其他分享 >cloud alibaba 学习 之 rocketmq

cloud alibaba 学习 之 rocketmq

时间:2023-05-23 17:25:14浏览次数:60  
标签:发送 void rocketMQTemplate alibaba topic rocketmq 消息 public cloud

rocketmq有三种消息发送模式:

1.同步发送 需要等待broker回应

    /**
     * 同步消息发送
     */
    @Test
    public void testSyncSend() {
        // param1: topic; 若添加tag: topic:tag
        // param2: 消息内容
        SendResult sendResult = rocketMQTemplate.syncSend("test-topic", "这是一条同步消息2");
        System.out.println(sendResult);
    }

 

2.异步发送 添加成功和失败的回调,不需要等待broker

/**
     * 异步消息发送
     *
     * @throws InterruptedException
     */
    @Test
    public void testAsyncSend() throws InterruptedException {
        //param1: topic;  若添加tag: topic:tag
        //param2: 消息内容
        //param3: 回调函数, 处理返回结果
        rocketMQTemplate.asyncSend("test-topic", "这是一条异步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable);
            }
        });

        Thread.sleep(100000);
    }

3.单向发送 不在乎消息的发送结果

/**
     * 单向发送
     */
    @Test
    public void testOneWay() {
        rocketMQTemplate.sendOneWay("test-topic", "这是一条单向消息");
    }

rocketmq可以严格保证消息的顺序,分为分区有序和全局有序

顺序消息:默认的消息会通过Round Robin轮询方式把消息发送到不同的queue,消费的时候通过多个queue下拉,无法保证消息的顺序。所以控制顺序消息只发送到一个queue,消费只从这个队列拉取,就可以保证消息的顺序。发送和消费只有一个queue就是全局有序;若是多个queue就是分区有序,对于每个queue消息是有序的。

/**
     * 顺序消息
     */
    @Test
    public void testSyncSendOrder() throws InterruptedException {
        // 同步顺序消息
        // param1: topic; 若添加tag: topic:tag
        // param2: 消息内容
        // param3: 用于队列的选择
        SendResult sendResult = rocketMQTemplate.syncSendOrderly("test-topic", "这是一条同步顺序消息", "order");
        System.out.println(sendResult);

        // 异步顺序消息
        // 比同步顺序多个回调
/*         rocketMQTemplate.asyncSendOrderly("test-topic", "这是一条异步顺序消息", "order", new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
                 System.out.println(sendResult);
             }

             @Override
             public void onException(Throwable throwable) {
                 System.out.println(throwable);
             }
         });
        Thread.sleep(10000);*/

        // 单向顺序消息
        //rocketMQTemplate.sendOneWayOrderly("test-topic","这是一条单向顺序消息","order");
    }

延时消息:rocketmq的延时时间并不能进行自定义而是官方设置好的18个等级,如果有其它需求可以自己组合或者进行修改

这是官方的延时等级图

/**
     * 延时消息
     * param1 主题
     * param2 消息
     * param3 超时时间
     * param4 延时等级
     */
    @Test
    public  void testDelay(){
        // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
        SendResult result=rocketMQTemplate.syncSend("myTopic", MessageBuilder.withPayload("hello delay").build(),1000,3);
        System.out.println(result);
    }

批量发送消息:

/**
     * 批量发送消息
     */
    @Test
    public  void testBatch(){
        List<Message> messages = new ArrayList<>();
        messages.add(MessageBuilder.withPayload("Hello 1").build());
        messages.add(MessageBuilder.withPayload("Hello 2").build());
        messages.add(MessageBuilder.withPayload("Hello 6").build());

        rocketMQTemplate.syncSend("myTopic",messages);
    }

发送过滤消息:主题后添加标签

    /**
     * 发送过滤消息
     */
    @Test
    public  void testTag(){
        rocketMQTemplate.syncSend("myTopic:tag1",MessageBuilder.withPayload("hello tag1").build());
        rocketMQTemplate.syncSend("myTopic:tag2",MessageBuilder.withPayload("hello tag2").build());
        rocketMQTemplate.syncSend("myTopic:tag3",MessageBuilder.withPayload("hello tag3").build());
    }

标签消费者示例: 接收包含tag1或tag2的消息,但是限制是一个消息只能有一个标签,对于复杂的场景可能不起作用

@Component
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "myTopic",selectorExpression = "tag1 || tag2",selectorType = SelectorType.TAG)
public class MyRocketMQListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println(msg);
    }
}

 

标签:发送,void,rocketMQTemplate,alibaba,topic,rocketmq,消息,public,cloud
From: https://www.cnblogs.com/xgphpstudy/p/17412399.html

相关文章

  • DTCloud中台共用模块开发规范
    目前中台仓库代码的运行依赖dtcloud,在贡献中台模块的时候,可能会存在大量重复且能共用的模块。这时候就希望能将这部分代码封装起来,能供其他模块方便使用。直接的方法是提供一个commons包来存放共用代码,包括以后代码的新增、修改都在这里进行。在中台仓库中添加直接from...import会......
  • RocketMQ 在小米的多场景灾备实践案例
    作者:邓志文、王帆01为什么要容灾?在小米内部,我们使用RocketMQ来为各种在线业务提供消息队列服务,比如商城订单、短信通知甚至用来收集IoT设备的上报数据,可以说RocketMQ的可用性就是这些在线服务的生命线。作为软件开发者,我们通常希望服务可以按照理想状态去运行:在没有Bug的......
  • 科技云报道:OneCloud A轮5000万融资,反映出怎样的投资逻辑?
    科技云报道原创。2020年,突如其来的疫情让各行业都面临着空前的经营压力,国内融资环境也步入寒冬。大部分创投机构投资更为谨慎,投资节奏明显放缓。然而,近日融合云服务商OneCloud(北京云联万维技术有限公司)宣布完成5000万元A轮融资,距上一轮Pre-A轮融资时隔仅8个月,成为疫情高压下首个成......
  • 从PaaS平台到技术中台,BoCloud博云直指“数字中国的架构师”
    在IaaS和SaaS如火如荼之后,PaaS也迎来了属于自己的春天。随着头部云服务商和技术创业公司纷纷入场,在业内正在打造“PaaS技术中台”的博云,能否为PaaS企业服务市场带来新的格局?BoCloud博云是一个难以下单一标签的公司,说它是单纯的容器,云管理,PaaS技术企业似乎都不够准确,但又都是博云的......
  • xxxxhttps://blog.csdn.net/holecloud/article/details/80139297
    #include"stdafx.h"#include<opencv2\imgproc\imgproc.hpp>#include<windows.h>#include<opencv2/opencv.hpp>#include<cmath>#include<iostream>//#include"DetectPackage.h"#include<cmath>usin......
  • 微服务与springcloud的介绍
    1.什么是微服务?随着互联网行业的发展,对服务的要求也越来越高,服务架构也从单体架构逐渐演变为现在流行的微服务架构。微服务是一种经过良好架构设计的分布式架构方案。微服务的上述特性其实是给分布式架构制定一个标准,进一步降低服务之间的耦合,提供服务的独立性和灵活性。做到高......
  • RocketMQ事务消息原理
    一、RocketMQ事务消息原理:        RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库,事务消息解决的是生产端的消息发送与本地事务执行......
  • 网站监测软件配置使用 - WGCLOUD
    WGCLOUD是一款优秀的开源运维监控工具,它可以监控网站、网页、服务接口的健康状态,并支持GET和POST接口,以及对返回的内容进行校验,支持告警通知,比如邮件、钉钉、微信等,这样可以有效防止我们的网站被篡改WGCLOUD很轻量,部署很方便,本文中我们不讲述如何部署安装了,可以去网站(www.wgstart.c......
  • 网关服务——Spring Cloud Gateway
    为什么要用网关?1.请求路由和负载均衡:一切请求都必须先经过gateway,但网关不处理业务,而是根据某种规则,把请求转发到某个微服务,这个过程叫做路由。当路由的目标服务有多个时,还需要做负载均衡。2.权限控制:网关作为微服务的入口,需要校验用户是否具有请求资格,如果没有资格就要进行拦截......
  • RocketMQ
    RocketMQ背景是阿里巴巴,经历双11考验,Java语言编写,非常好完整体系1、支持事务消息(实现解决分页式事务的问题)2、支持高并发顺序消息处理(采用内存队列+多线程处理)3、消费者支持tag过滤,减少我们带宽传输RocketMQ关键核心名称:NameServer:存放生产者、消费者、topic信息。去中心......