首页 > 编程语言 >Java消息队列详解

Java消息队列详解

时间:2024-10-21 16:19:45浏览次数:3  
标签:Java 队列 详解 RocketMQ apache org consumer rocketmq 消息

消息队列的作用及原理

消息队列产生主要是为了解决系统间的异步解耦与确保数据最终一致性问题。通过将主流程与辅助流程分离,使得辅助任务可以并行处理,不仅提高了系统的响应速度,还增强了其可扩展性和稳定性。此外,消息队列机制保证了每条消息至少被消费一次,从而确保了业务逻辑的可靠执行。

何时应用消息队列提升系统性能

在电商网站中,当用户下单后,系统需要完成一系列操作,如库存扣减、订单创建、支付处理等。如果这些操作都采用同步调用的方式进行,那么整个流程的响应时间会变得很长,并且一旦某个环节出现问题,会导致整个流程失败。通过引入RocketMQ消息队列,可以将这些操作解耦为独立的服务,每个服务负责处理特定的任务。例如,订单服务负责生成订单并发送订单创建消息,库存服务订阅该消息并处理库存扣减,支付服务则处理支付请求。这样不仅提高了系统的响应速度(异步处理),还增强了各部分的灵活性和扩展性。

在面对促销活动或秒杀场景时,短时间内会有大量请求涌入系统,这很容易导致服务器过载甚至崩溃。利用RocketMQ的消息队列机制,可以先将这些请求暂存起来,然后以一个稳定的速率逐步处理,从而实现削峰填谷的效果。同时,由于RocketMQ提供了高可靠性的消息传递保证,即使在极端情况下出现网络中断或服务宕机,也能确保没有数据丢失,保证了业务连续性和用户体验的一致性。此外,随着业务的发展,可以通过增加更多的消费者实例来提高系统的吞吐量,进一步体现了其良好的可扩展性。

在线业务场景推荐:RocketMQ,确保数据一致性与高可用性,适用于电商、支付等实时需求高的场景。

在线业务场景适合:
RocketMQ 提供了事务消息功能,确保分布式环境下的数据一致性。它具有低延迟和高可用性,适用于对实时性和可靠性要求较高的在线业务场景,如电商交易、支付处理等。详情可以从 Apache RocketMQ 官方中文社区|快速使用|架构原理|官方答疑 了解。

大数据传输适合:
Kafka 通过单文件队列的方式存储和读取数据,顺序写入和读取使得其在处理大规模数据时效率极高。这使其非常适合用于日志收集、事件流处理等需要高效处理大量数据的场景。详情可以从 Apache Kafka 了解。

需要JMS标准实现适合:
ActiveMQ 完全支持JMS(Java Message Service)规范,提供了丰富的消息传递功能,包括持久化、多种传输协议的支持以及分布式的部署形式。对于依赖于JMS标准的企业级应用来说,ActiveMQ是一个很好的选择。详情可以从 ActiveMQ 了解。

AMQP等多协议小场景适合:
RabbitMQ 支持多种消息协议,如AMQP, STOMP, MQTT等,特别适合需要灵活配置消息路由规则的小规模应用场景或微服务架构中。虽然RocketMQ也在增强这些协议的支持,但在当前阶段RabbitMQ在这方面表现更为成熟。详情可以从 https://www.rabbitmq.com/ 了解。

使用RocketMQ进行消息收发的详细示例

详细的使用MQ收发消息的例子(以RocketMQ为例)

为了提供一个完整的示例,包括配置、依赖引入以及Java代码实现,我们将通过构建一个简单的生产者-消费者模型来演示如何使用RocketMQ进行消息的发送和接收。本示例将涵盖同步消息发送、异步消息接收等基本功能,并解释为何这种模式能够促进系统的异步解耦。

1. 加入依赖

首先,在您的pom.xml中添加对RocketMQ客户端库的依赖。这里假设您正在使用Maven作为项目构建工具。

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>

        <artifactId>rocketmq-client</artifactId>

        <version>4.9.1</version>

    </dependency>

</dependencies>

如果您更偏好使用Gradle,则相应的配置如下:

dependencies {
    compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
}

确保使用的版本与您的RocketMQ服务器兼容。

2. 生产者代码

创建一个Java类用于发送消息到指定的主题。这里展示的是同步消息发送方式。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例并设置组名
        DefaultMQProducer producer = new DefaultMQProducer("my-group");
        
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");

        // 启动生产者
        producer.start();

        for (int i = 0; i < 100; i++) {
            // 构建消息体
            Message msg = new Message("TestTopic", "TagA",
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            
            // 发送消息
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }
}
3. 消费者代码

接下来是消费者的实现,它会订阅特定主题的消息并在收到时处理它们。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class SimpleConsumer {

    public static void main(String[] args) throws Exception {
        // 创建消费者实例并设置组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");

        // 设置NameServer地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个或多个主题
        consumer.subscribe("TestTopic", "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("Consumer started.");
    }
}

为什么这样做可以实现异步解耦?

上述例子中的生产者和消费者模式展示了如何利用RocketMQ来实现应用程序间的异步通信。当生产者生成数据时,它并不直接调用消费该数据的服务,而是将其发布到RocketMQ集群中的某个主题上。这使得服务之间不再需要知道彼此的存在,只需关注于自己负责的数据处理部分。与此同时,消费者可以根据自身情况灵活地决定何时从队列中拉取数据进行处理,从而达到了解耦的目的。此外,RocketMQ还提供了诸如消息重试机制等功能来保证消息传递的可靠性,进一步增强了系统的鲁棒性。

标签:Java,队列,详解,RocketMQ,apache,org,consumer,rocketmq,消息
From: https://blog.csdn.net/ApacheRocketMQ/article/details/143114789

相关文章

  • Java基础---异常
    1.概述Java异常处理是Java语言的一个重要特性,它可以帮助我们更好地管理程序中的错误和异常情况。本文档将详细介绍Java中的异常处理机制,包括异常的概念、分类、捕获和处理方法。2.异常概念异常(Exception)是在程序执行过程中发生的不正常情况,它会打断程序的正常流程。Java语......
  • Java中的基础知识点---Object
    Object类的常见方法有哪些?Object类是一个特殊的类,是所有类的父类,主要提供了以下11个方法:/** *native方法,用于返回当前运行时对象的Class对象,使用了final关键字修饰,故不允许子类重写。 */publicfinalnativeClass<?>getClass()/** *native方法,用于返回......
  • 网络安全学习路线图(2024版详解)
       近期,大家在网上对于网络安全讨论比较多,想要学习的人也不少,但是需要学习哪些内容,按照什么顺序去学习呢?其实我们已经出国多版本的网络安全学习路线图,一直以来效果也比较不错,本次我们针对市场需求,整理了一套系统的网络安全学习路线图,供大家学习参考。希望大家按照路线图进......
  • Java列表list
    List列表创建列表//List的ArrayList实现List<String>list1=newArrayList<>();//List的LinkedList实现List<String>list2=newLinkedList<>();常用方法importjava.util.List;importjava.util.LinkedList;classMain{publicstatic......
  • 【Javaee】网络编程-UDP基础
     前言UDP是一个高效、快速、简单的传输协议,适合于需要低延迟和实时性的应用本篇将介绍UDP相关的api,并使用UDP构建回显服务器程序。一.UDP与TCP特点UDP:无连接,不可靠,面向数据报,全双工。TCP:有连接,可靠,面向字节流,全双工。何为连接?此处所说的连接是抽象的连接,并不是实际......
  • 【java】实现字节数组转int(采用IEEE 754标准)
    /***字节数组转int*采用IEEE754标准**@parambytes*@returnfloat*/publicintbytesToInt(byte[]bytes){//获取字节数组转化成的2进制字符串StringbinaryStr=bytesToBinaryStr(bytes);//......
  • 【最新Java必过毕设选题】基于微信小程序自助购药小程序全套(程序+万字(源码+万字LW+答
    作品简介 Hi,各位同学好呀!今天向大家分享一个最新完成的高质量毕业设计项目作品基于ssm+uniapp的XXX微信小程序项目评分(最低0分,满分5分)难度系数:3分工作量:5分创新点:3分界面美化:5分使用技术小程序框架:uniapp小程序开发软件:HBuilderX小程序运行软件:微信开发者......
  • Java中集合知识上
    集合知识集合的体系结构单列集合(Collection)Collection的方法各方法细节:Collection系列集合的三种通用遍历1.迭代器遍历2.增强for遍历3.lambda表达式遍历(含lambda介绍)List系列集合:List集合的特有方法:List集合的特有方法细节:注意点(重构方法调用优先级):List系......
  • java+vue计算机毕设二手交易平台【开题+程序+论文+源码】
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着经济的快速发展和人们生活水平的提高,商品更新换代的速度日益加快,大量闲置物品应运而生。这些物品对于持有者而言可能已失去使用价值,但对于其他人......
  • 常用负载均衡详解
    1、介绍在互联网场景下,负载均衡(LoadBalance)是分布式系统架构设计中必须考虑的一个环节,它通常是指将负载流量(工作任务、访问请求)平衡、分摊到多个操作单元(服务器、组件)上去执行的过程。目的在于提供负载配比,解决性能、单点故障(高可用)和扩展性(水平伸缩)等问题。  以上图为......