首页 > 其他分享 >RabbitMQ详解

RabbitMQ详解

时间:2023-10-13 16:25:17浏览次数:30  
标签:String 队列 路由 RabbitMQ 交换机 消息 详解

RabbitMQ是一个开源的消息中间件,采用AMQP协议来实现消息的生产、消费和路由。它主要由以下几个组件构成:

  1. Producer:消息生产者,即发送消息的应用程序。

  2. Exchange:消息交换机,用于接收生产者发送的消息,并将其路由到对应的队列中。

  3. Queue:消息队列,用于存储消息,等待消费者来消费。

  4. Binding:将交换机和队列进行绑定,以便将消息路由到正确的队列中。

  5. Consumer:消息消费者,即接收并处理消息的应用程序。

RabbitMQ的消息流程如下:

  1. 生产者将消息发送到交换机。

  2. 交换机接收到消息后,根据绑定规则将消息路由到对应的队列中。

  3. 消费者从队列中接收消息,并进行处理。

RabbitMQ支持多种不同的交换机类型,包括:

  1. Direct Exchange:直接交换机,将消息路由到指定的队列中。

  2. Fanout Exchange:广播交换机,将消息路由到所有绑定到该交换机的队列中。

  3. Topic Exchange:主题交换机,根据通配符匹配规则将消息路由到对应的队列中。

  4. Headers Exchange:头交换机,根据消息头中的键值对进行匹配,并将消息路由到对应的队列中。

在RabbitMQ中,队列和交换机都有持久化和非持久化之分。持久化的队列和交换机可以在RabbitMQ服务器重启后仍然存在,而非持久化的队列和交换机则会在服务器重启后被删除。

总之,RabbitMQ是一个功能强大的消息中间件,可以为分布式系统提供高效、可靠的消息传递服务。

 

AMQP(Advanced Message Queuing Protocol):

AMQP(Advanced Message Queuing Protocol)是一种网络协议,用于在应用程序之间传递消息。它是一个开放标准,由AMQP工作组开发和维护。AMQP协议支持跨平台、跨语言的消息传递,它可以在不同的操作系统、编程语言和网络环境中使用。

AMQP协议的主要特点包括:

  1. 可靠性:AMQP协议提供了可靠的消息传递机制,确保消息不会丢失或重复传递。

  2. 灵活性:AMQP协议支持多种消息传递模式,包括点对点、发布/订阅和路由模式。

  3. 互操作性:AMQP协议是一个开放标准,支持多种编程语言和操作系统,因此可以在不同的平台之间进行消息传递。

  4. 安全性:AMQP协议支持多种安全机制,包括身份验证、加密和访问控制。

总之,AMQP协议是一种可靠、灵活、互操作和安全的消息传递协议,被广泛应用于分布式系统、云计算和物联网等领域。

 

在 RabbitMQ 中,Exchange、Queue 和 routingKey 是消息传递的三个关键概念,它们之间的关系如下:

  1. Exchange:Exchange 是 RabbitMQ 中的消息交换机,它负责接收生产者发送的消息,并根据消息的 routingKey 进行路由分发。Exchange 有四种类型:direct、topic、headers 和 fanout。

  2. Queue:Queue 是消息队列,它负责存储 Exchange 路由过来的消息。消费者从 Queue 中获取消息进行消费。

  3. routingKey:routingKey 是生产者发送消息时指定的关键字,用于指定消息的路由规则。Exchange 根据 routingKey 进行路由分发,将消息发送到指定的 Queue 中。

Exchange、Queue 和 routingKey 的用法如下:

  1. 生产者发送消息时,需要指定 Exchange 和 routingKey。Exchange 根据 routingKey 进行路由分发,将消息发送到指定的 Queue 中。

  2. 消费者从 Queue 中获取消息进行消费。消费者可以订阅多个 Queue,从而消费多个 Queue 中的消息。

  3. Exchange 和 Queue 可以绑定多个关系。一个 Exchange 可以绑定多个 Queue,一个 Queue 可以绑定多个 Exchange,从而实现不同的消息路由规则。

总之,Exchange、Queue 和 routingKey 是 RabbitMQ 中消息传递的三个关键概念,它们之间的关系和用法需要根据具体的业务场景来设计和实现。

 

在 RabbitMQ 中,有四种类型的交换机:direct、topic、headers 和 fanout。其中,direct 和 topic 类型的交换机都会使用 routing key 进行路由。在 direct 类型的交换机中,routing key 必须与队列的绑定键完全匹配,才能将消息路由到该队列。而在 topic 类型的交换机中,routing key 可以使用通配符进行匹配,从而将消息路由到多个队列。

举个例子,假设有一个名为 "logs" 的交换机,其中有两个队列 "error_logs" 和 "info_logs"。当生产者发送一条日志消息时,可以将消息的 routing key 设置为 "error" 或 "info",交换机会将该消息路由到相应的队列中,从而实现日志的分类和处理。

 

RabbitMQ dmeo示例:

  1. 引入依赖

首先需要在项目中引入RabbitMQ的依赖,可以在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.11.0</version>
</dependency>
  1. 创建连接和通道

在使用RabbitMQ之前,需要先创建一个连接和一个通道。连接是一个TCP连接,通道是在连接内部的一个虚拟连接,可以使用通道进行消息的发送和接收。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
  1. 创建交换机

交换机是消息的路由中心,它接收从生产者发送过来的消息,并根据路由键将消息路由到一个或多个队列中。在RabbitMQ中,有四种类型的交换机:直连交换机(direct)、主题交换机(topic)、扇形交换机(fanout)和头交换机(headers)。

String exchangeName = "myExchange";
String exchangeType = "direct";
boolean durable = true;
boolean autoDelete = false;
boolean internal = false;
Map<String, Object> arguments = null;
channel.exchangeDeclare(exchangeName, exchangeType, durable, autoDelete, internal, arguments);

参数说明:

  • exchangeName:交换机名称

  • exchangeType:交换机类型,可选值有direct、fanout、topic、headers

  • durable:是否持久化

  • autoDelete:是否自动删除

  • arguments:额外的参数,可为null

  1. 创建队列

队列是消息的存储中心,它接收交换机路由过来的消息,并将消息存储在队列中,等待消费者进行消费。在创建队列时,需要指定队列的名称、是否持久化、是否自动删除等参数。

String queueName = "myQueue";
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
Map<String, Object> arguments = null;
channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments);

参数说明:

  • queueName:队列名称

  • durable:是否持久化

  • exclusive:是否排他性队列

  • autoDelete:是否自动删除

  • arguments:额外的参数,可为null

  1. 绑定队列和交换机

在将队列绑定到交换机时,需要指定交换机的名称、路由键和队列的名称。路由键是用来指定消息路由到哪个队列的,它可以是一个字符串,也可以是一个正则表达式。

String routingKey = "myRoutingKey";
channel.queueBind(queueName, exchangeName, routingKey);

参数说明:

  • queueName:队列名称

  • exchangeName:交换机名称

  • routingKey:路由键,用于将消息路由到指定的队列

  1. 发送消息

在发送消息时,需要指定交换机的名称、路由键和消息的内容。消息可以是一个字符串,也可以是一个字节数组。

String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
  1. 接收消息

在接收消息时,需要先创建一个消费者对象,并将其注册到队列中。然后在回调函数中处理接收到的消息。

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received message: " + message);
    }
};
channel.basicConsume(queueName, true, consumer);

以上是RabbitMQ交换机创建、队列创建、队列绑定的详细步骤以及每个参数的用法。下面是Java代码的示例:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class RabbitMQDemo {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建交换机
        String exchangeName = "myExchange";
        String exchangeType = "direct";
        boolean durable = true;
        boolean autoDelete = false;
        boolean internal = false;
        Map<String, Object> arguments = null;
        channel.exchangeDeclare(exchangeName, exchangeType, durable, autoDelete, internal, arguments);

        // 创建队列
        String queueName = "myQueue";
        boolean exclusive = false;
        boolean autoDelete = false;
        channel.queueDeclare(queueName, true, exclusive, autoDelete, null);

        // 绑定队列和交换机
        String routingKey = "myRoutingKey";
        channel.queueBind(queueName, exchangeName, routingKey);

        // 发送消息
        String message = "Hello, RabbitMQ!";
        channel.basicPublish(exchangeName, routingKey, null, message.getBytes());

        // 接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        };
        channel.basicConsume(queueName, true, consumer);

        // 关闭连接和通道
        channel.close();
        connection.close();
    }
}

 

RabbitMQ生产者:

在RabbitMQ中,生产者是向消息队列发送消息的应用程序。下面是RabbitMQ中生产者的各个参数的详解以及相应的Java代码示例:

  1. Hostname:RabbitMQ服务器的主机名或IP地址。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
  1. Port:RabbitMQ服务器的端口号。
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
  1. Username和Password:连接RabbitMQ服务器的用户名和密码。
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
  1. Virtual Host:RabbitMQ服务器中的虚拟主机。
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
  1. Exchange Name:消息发送到的交换机的名称。
String exchangeName = "myExchange";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
  1. Routing Key:消息发送到的队列的路由键。
String routingKey = "myRoutingKey";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
  1. Message Properties:消息的属性,如消息ID、消息类型、过期时间等。
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .contentType("text/plain")
    .deliveryMode(2)
    .priority(1)
    .build();
channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());

以上是RabbitMQ中生产者的各个参数的详解以及相应的Java代码示例。

 

RabbitMQ消费者:

在RabbitMQ中,消费者可以通过设置不同的参数来控制其行为。以下是RabbitMQ中消费者的各个参数的详细解释:

  1. queue:指定要消费的队列名称。

  2. autoAck:是否自动确认消息,默认为true,即自动确认消息。

  3. exclusive:是否为独占模式,默认为false,即非独占模式。

  4. consumerTag:消费者标签,用于在取消消费者时指定消费者。

  5. noLocal:是否禁止消费者使用与生产者相同的连接来接收消息,默认为false,即不禁止。

  6. noAck:是否需要手动确认消息,默认为false,即需要手动确认。

  7. arguments:消费者的其他参数,如优先级等。

以下是一个使用Java编写的消费者示例,其中设置了queue、autoAck、consumerTag和noAck参数:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");

        // 创建连接
        Connection connection = factory.newConnection();

        // 创建通道
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        };

        // 开始消费
        channel.basicConsume(QUEUE_NAME, true, "myConsumerTag", false, false, null, consumer);
    }
}

在上面的示例中,我们创建了一个名为hello的队列,并创建了一个消费者。消费者使用DefaultConsumer类实现,重写了handleDelivery方法来处理接收到的消息。在最后,我们调用了basicConsume方法来开始消费消息。其中,第二个参数autoAck设置为true,表示自动确认消息;第三个参数consumerTag设置为myConsumerTag,用于在取消消费者时指定消费者;第四个参数noAck设置为false,表示需要手动确认消息。

 

标签:String,队列,路由,RabbitMQ,交换机,消息,详解
From: https://www.cnblogs.com/huangdh/p/17762404.html

相关文章

  • IIS应用程序池配置详解及优化
    参数说明1.常规属性名称属性详解NETCLR版本配置应用程序池,以加载特定版本的.NETCLR。选定的CLR版本应与应用程序所使用的相应版本的.NETFramework对应。选择“无托管代码”将导致所有的ASP.NET请求失败。队列长度HTTP.sys将针对应用程序池排队的最大......
  • 为何50Ω占多数?同轴特性阻抗详解
    在选择同轴线缆或组件时,一般我们都会看到或者要求传输线阻抗需要保持在50欧姆,有时这也会成为传输线的特性阻抗,当然75欧姆偶尔也会被用到,为何这两者会如此固定?本期我们将从这个角度出发,探讨同轴中的另一个重要参数,特性阻抗。其实这个问题也很简单,我们快速做一个总结,可以直接看下......
  • 重载运算符详解
    重载运算符详解1.概念  运算符的重载,实际是一种特殊的函数重载,必须定义一个函数,并告诉C++编译器,当遇到该运算符时就调用此函数来行使运算符功能。这个函数叫做运算符重载函数(常为类的成员函数)。  用函数的方式实现了(+-*/[]数组&&||逻辑等)运算符的重载。根据需求决......
  • Node系列 — v8引擎堆内存详解
    参考:https://juejin.cn/post/6963170647207837710v8的堆内存限制Node程序中javascript的使用内存是有限制的,注意这个内存是指堆内存,在v8中,所有的js对象都存在堆中。在实际应用中不小心触碰到这个边界,进程就会退出。64位系统下为1.4GB,32位系统下为0.7GB。Node是基于v8......
  • Oracle数据库导入、导出详解
    Oracle11g数据库导入导出方式传统方式【exp(导出)和(imp)导入】数据泵方式【expdp导出和(impdp)导入】第三方工具【PL/sqlDevelpoer】一、什么是数据库导入导出?Oracle11g数据库的导入/导出,就是我们通常所说的Oracle数据的还原/备份。 数据库导入:把.dmp格式文件从本地导入到......
  • Python 列表详解:从基础到进阶
    Python是一种广泛使用的高级编程语言,它的设计强调代码的可读性和简洁的语法。Python支持多种编程范式,包括过程、面向对象和函数式编程。在Python中,列表是一种非常重要的数据类型,它可以包含各种类型的元素,如数字、字符串和其他列表。本文将详细介绍Python列表的基础和进阶用法。【基......
  • 第四节:Redis数据持久化机制(备份恢复)、缓存淘汰策略、主从同步原理、常见规范与优化
    一.数据持久化 1. 含义Redis提供了RDB和AOF两种持久化方式,默认开启的是RDB,如果需要AOF,需要手动修改配置文件进行开启。RDB:是一种对Redis存在内存中的数据周期性的持久化机制,将内存中的数据以快照的形式硬盘,实质上是fork了一个子进程在执行数据存储,采用的是二进制压......
  • RabbitMQ的名词
         Broker:接收和分发消息的应用,RabbitMQServer就是MessageBrokerVirtualhost:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQserver提供的服务时,可以划分出多个......
  • 软件测试|Linux三剑客之sed命令详解
    简介sed(StreamEditor)是一款流式文本编辑器,在Linux和类Unix系统中广泛使用。它的设计目的是用于对文本进行处理和转换,可以用于替换、删除、插入、打印等操作。sed命令通过逐行处理文本,允许您使用简单的命令来编辑大量文本数据。本文将详细介绍sed命令的基本用法和一些常......
  • 软件测试|Linux三剑客之grep命令详解
    简介grep是一款在Linux和类Unix系统中广泛使用的文本搜索工具。它的名字来源于GlobalRegularExpressionPrint(全局正则表达式打印),它的主要功能是根据指定的模式(正则表达式)在文本文件中搜索并打印匹配的行。grep非常强大且灵活,可以用于日志分析、文件过滤、代码搜索等多......