首页 > 其他分享 >五、普通消息

五、普通消息

时间:2023-07-09 10:11:05浏览次数:29  
标签:RocketMQ topic 普通 消息 Apache rocketmq proxy

生产者能发送的消息类型有:

  • Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。

  • FIFO:顺序消息,Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。

  • Delay:定时/延时消息,通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。

  • Transaction:事务消息,Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。

先来看下最简单的普通消息,什么是普通消息呢?普通消息为 Apache RocketMQ 中最基础的消息,区别于有特性的顺序消息、定时/延时消息和事务消息。

应用场景

普通消息一般应用于微服务解耦事件驱动数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求

典型场景一:微服务异步解耦

如上图所示,以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至Apache RocketMQ服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联。

典型场景二:数据集成传输

如上图所示,以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到 Apache RocketMQ 。每条消息都是一段日志数据,Apache RocketMQ 不做任何处理,只需要将日志数据可靠投递到下游的存储系统和分析系统即可,后续功能由后端应用完成。

功能原理

普通消息是Apache RocketMQ基本消息功能,支持生产者和消费者的异步解耦通信。

普通消息生命周期
  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

发送普通消息

新建名为Mq-Producer的SpringBoot项目,添加依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.4</version>
</dependency> 
   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

配置application.properties:

server.port=8001

rocketmq.proxy = 127.0.0.1:8081

rocketmq.proxy是RocketMQ5.0新增的mqproxy地址。RocketMQ还要启动mqproxy。

增加MyController:

@RestController
public class MyController implements InitializingBean, DisposableBean {

    private static final Logger logger = LoggerFactory.getLogger(MyController.class);

    @Value("${rocketmq.proxy}")
    private String proxy;


    private Producer producer;

    private ClientServiceProvider provider;

    // 消息发送的目标Topic名称,需要提前创建。
    String topic = "MyTopic";

    private DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE;

    @RequestMapping("/send")
    public  List<SendReceipt> send() throws ClientException {


        List<SendReceipt> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            // 普通消息发送。
            Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    // 设置消息索引键,可根据关键字精确查找某条消息。
                    .setKeys("messageKey")
                    // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                    .setTag("messageTag")
                    // 消息体。
                    .setBody(("messageBody" + LocalDate.now().format(formatter)).getBytes())
                    .build();
            try {
                // 发送消息,需要关注发送结果,并捕获失败等异常。
                SendReceipt sendReceipt = producer.send(message);
                list.add(sendReceipt);
                logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
            } catch (ClientException e) {
                logger.error("Failed to send message", e);
            }
        }

        return list;
    }

    @Override
    public void afterPropertiesSet() throws Exception {

         this.provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(proxy);
        ClientConfiguration configuration = builder.build();
        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        this.producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
    }

    @Override
    public void destroy() throws Exception {
        if (producer != null) {
            producer.close();
        }
    }
}

InitializingBean,DisposableBean是Spring的生命周期接口。在RocketMQ-console里面新增名为MyTopic的Topic。启动程序。调用http://localhost:8001/send发送消息到RocketMQ中:
 

 

在RocketMQ-console中查看MyTopic的消息。随便点击右边的一条消息的详情:
 

 

消费消息

新建名为Mq-Consumer的SpringBoot项目,添加依赖:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
     <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.4</version>
    </dependency>

配置application.properties:

server.port=8002

rocketmq.proxy = 127.0.0.1:8081

新建消费者:

@Slf4j
@Component
public class RocketMq5Consumer {

    @Value("${rocketmq.proxy}")
    private String proxy;

    String topic = "MyTopic";

    @Bean(name = "mqConsumer")
    public void mqConsumer() {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(proxy);
        ClientConfiguration configuration = builder.build();
        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        try {
            // 订阅消息的过滤规则,表示订阅所有Tag的消息。
            String tag = "*";
            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
            provider.newPushConsumerBuilder()
                    .setClientConfiguration(configuration)
                    // 设置消费者分组。
                    .setConsumerGroup("my-consumer")
                    // 设置预绑定的订阅关系。
                    .setSubscriptionExpressions(Collections.singletonMap("MyTopic", filterExpression))
                    // 设置消费监听器。
                    .setMessageListener(messageView -> {
                        log.info("消费消息:{}", messageView);
                        log.info("消息内容为:{}",  StandardCharsets.UTF_8.decode(messageView.getBody()).toString());
                        return ConsumeResult.SUCCESS;
                    }).build();
            log.info("构建mq5.0消费者成功:proxy:{}, topic:{}", proxy, topic);
        } catch (ClientException e) {
            log.error("构建mq5.0消费者异常:proxy:{}, topic:{}", proxy, topic, e);
        }
    }
}

@Slf4j是lombok注解。

 

调用http://localhost:8001/send往MyTopic中发送消息,查看Mq-Consumer的控制台的消费记录:
 

标签:RocketMQ,topic,普通,消息,Apache,rocketmq,proxy
From: https://www.cnblogs.com/shigongp/p/17537734.html

相关文章

  • springcloud - 通过消息总线bus进行刷新
    修改3344服务pom文件 <!--添加消息总线RabbitMQ支持--> <dependency>   <groupId>org.springframework.cloud</groupId>   <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency>yaml文件 #rabbitmq相关配置 spring: ......
  • 2023-07-08:RabbitMQ如何做到消息不丢失?
    2023-07-08:RabbitMQ如何做到消息不丢失?答案2023-07-08:1.持久化发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。2.确认机制消费者通过basic.ack命令向......
  • windows配置RocketMQ并测试发送消息
    https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ下载rocketmq-all-4.9.5-bin-release 必须配置一个RocketMQ路径的环境变量(参考博客的第二个) 配置内容如下,目录在bin的上层   解压,进入解压目录conf,修改broker.conf在该文件中加入两行(建议直接复......
  • css 文字横向滚动通知消息
    要在CSS中实现横向滚动的通知消息,你可以使用CSS的动画属性、溢出属性和滚动属性相结合。首先,创建一个包含通知消息的容器,并设置其溢出属性为隐藏。这将确保溢出容器的内容不可见。然后,在容器内部创建一个用于显示通知消息的子元素,并设置其显示为内联块元素。然后,为这个子元素设......
  • 消息队列-八股文
    消息队列选型-√kafka:优点:吞吐量高,性能高缺点:功能单一,有丢失消息的风险rocketMQ:优点:功能完善,性能好缺点:客户端仅支持JavaRocketMQ事务消息实现-※RocketMQ底层实现原理-※消息队列如何保证可靠传输可靠传输:不能多不能少1.消费者实现幂等性,哪怕多收消息,......
  • 普通人,如何把握AI时代
    最近AI的热点似乎逐渐消退,很多人会觉得,好像一切都是概念,大家只是跟个风,一切都恢复了原样。 经历了媒体的爆炒,很多人在尝试使用AI工具,但大多数人只是浅尝辄止,当作谈资,或者炫耀自己如何发现了AI不够聪明的问题。 真正用于实战,并且产生实际价值的,依然是少数人,而这恰恰是少数人......
  • 连锁门店监控与普通监控有哪些不同?
    视频监控如今已经成为连锁门店的标配,但是随着门店数量的增多,普通监控已经远远无法满足连锁门店的管理需求。那么,连锁门店在选择监控系统时该重点关注哪些方面?它与普通监控有什么不同呢?安装运维规模更大需要全国同步实施与交付能力连锁门店数量多且分散,因此摄像头的采购数量往往较大......
  • RunnerGo集成三方消息通知,可配置到性能测试定时任务
    测试人员在配置自动化测试、性能测试脚本时需要配置测试报告通知人,近期更新中RunnerGo消息通知已接入第三方,在配置测试报告通知人时可以选择飞书、企业微信、钉钉、邮箱。本篇文章会介绍如何配置RunnerGo的三方消息通知。RunnerGo目前支持创建飞书群机器人、飞书企业应用,企业微......
  • 微信模板消息推送封装方法
    /***@explain*发送消息通知*@returnarray|mixed*@remark*获取到用户的openid之后可以判断用户是否有数据,可以直接跳过获取access_token,也可以继续获取access_token*access_token每日获取次数是有限制的,access_token有时间限制,可以存储到数据库7200s.7200s后access......
  • 如何实现Java mqtt 客户端,收到自己发出消息的具体操作步骤
    实现JavaMQTT客户端收到自己发出的消息介绍在本文中,我将教会你如何使用Java实现一个MQTT(MessageQueuingTelemetryTransport)客户端,以接收自己发出的消息。MQTT是一个轻量级的消息协议,广泛应用于物联网和实时通信领域。我们将使用EclipsePaho库来实现这个功能。整体流程下面......