首页 > 其他分享 >第七章 RabbitMQ之交换机

第七章 RabbitMQ之交换机

时间:2024-10-10 16:22:13浏览次数:8  
标签:队列 boot springframework 交换机 RabbitMQ 第七章 import org 2.1

目录

一、介绍

二、Fanout Exchange

2.1. 案例演示 

2.1.1. 创建SpringBoot工程

2.1.2. 父工程pom依赖

2.1.3. 生产者pom依赖 

 2.1.4. 生产者配置文件

2.1.5. 生产者核心代码

 2.1.6. 消费者RabbitMQConfig

 2.1.7. 消费者pom依赖

2.1.8. 消费者配置文件

2.1.9.  消费者核心代码

2.1.10. 运行效果

三、Direct Exchange

3.1. 案例演示

3.1.1. 生产者核心代码

3.1.2. 消费者RabbitMQConfig

 3.1.3. 消费者核心代码

3.1.4. 运行效果 

四、Topic Exchange

4.1. 案例演示

4.1.1. 生产者核心代码

 4.1.2. 消费者RabbitMQConfig

4.1.3. 消费者核心代码 

4.1.4. 运行结果


一、介绍

在RabbitMQ中,交换机(Exchange)是用来接收生产者发送的消息并将这些消息路由到一个或多个队列的组件。不同类型的交换机决定了它们如何路由这些消息。

以下是RabbitMQ中几种常用交换机类型:

直接交换机(direct):如果路由键完全匹配,消息就被投递到相应的队列。

广播交换机(fanout):消息被广播到所有绑定的队列。

主题交换机(topic):如果路由键匹配模式(通配符#代表匹配多个单词和*代表匹配一个单词),消息被投递到相应的队列。

头交换机(headers):通过匹配消息头进行路由。

二、Fanout Exchange

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式

2.1. 案例演示 

利用SpringAMQP演示FanoutExchange的使用

实现思路如下:

1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2

2. 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定

3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

4. 在publisher中编写测试方法,向hmall.fanout发送消息

2.1.1. 创建SpringBoot工程

完整的工程目录结构及代码文件如下:

2.1.2. 父工程pom依赖

引入核心依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>mq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mq-demo</name>
    <description>mq-demo</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <!-- spring amqp依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.1.3. 生产者pom依赖 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>publisher</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>publisher</name>
    <description>publisher</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 2.1.4. 生产者配置文件

spring:
  rabbitmq:
    # 主机
    host: 127.0.0.1
    # 端口
    port: 5672
    # 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容
    username: Wangzhexiao
    password: Wangzhexiao
    # 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容
    virtual-host: /hangzhou



2.1.5. 生产者核心代码

SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:

package com.example.publisher;

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void sendMessage() throws InterruptedException {
        // 交换机名称
        String exchangeName = "wzx.fanout";
        // 消息内容
        String message = "人生苦短,持续不断地努力拼搏,迎难而上!";
        rabbitTemplate.convertAndSend(exchangeName,null, message);
    }
}

 2.1.6. 消费者RabbitMQConfig

package com.example.consumer;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    Queue fanoutQueue1() {
        // 使用 QueueBuilder 创建一个持久化队列
        return QueueBuilder.durable("fanout.queue1").build();
    }

    @Bean
    Queue fanoutQueue2() {
        // 使用 QueueBuilder 创建一个持久化队列
        return QueueBuilder.durable("fanout.queue2").build();
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("wzx.fanout");
    }

    @Bean
    Binding binding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); // 绑定队列和交换机
    }

    @Bean
    Binding binding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); // 绑定队列和交换机
    }
}

 2.1.7. 消费者pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>consumer</name>
    <description>consumer</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.34</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.1.8. 消费者配置文件

spring:
  rabbitmq:
    # 主机
    host: 127.0.0.1
    # 端口
    port: 5672
    # 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容
    username: Wangzhexiao
    password: Wangzhexiao
    # 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容
    virtual-host: /hangzhou



2.1.9.  消费者核心代码

SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:

package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SimpleListener {

    @RabbitListener(queues = "fanout.queue1")
    public void listener1(String message) throws InterruptedException {
        System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listener2(String message) throws InterruptedException {
        System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");
    }
}

2.1.10. 运行效果

我们可以看到,生产者的消息,同时发给了绑定fanout类型交换机的两个队列,两个消费者均收到了同一条消息。

三、Direct Exchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey

Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

3.1. 案例演示

我们在2.1示例代码的基础上稍做调整:

3.1.1. 生产者核心代码

package com.example.publisher;

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void sendMessage() throws InterruptedException {
        String exchangeName = "wzx.direct";
        String message = "人生苦短,持续不断地努力拼搏,迎难而上!";
        rabbitTemplate.convertAndSend(exchangeName,"jianchi", message);
    }
}

3.1.2. 消费者RabbitMQConfig

package com.example.consumer;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    Queue directQueue1() {
        // 使用 QueueBuilder 创建一个持久化队列
        return QueueBuilder.durable("direct.queue1").build();
    }

    @Bean
    Queue directQueue2() {
        // 使用 QueueBuilder 创建一个持久化队列
        return QueueBuilder.durable("direct.queue2").build();
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("wzx.direct");
    }

    @Bean
    Binding binding1(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("nuli"); // 绑定队列和交换机
    }

    @Bean
    Binding binding2(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with("jianchi"); // 绑定队列和交换机
    }
}

 3.1.3. 消费者核心代码

package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SimpleListener {

    @RabbitListener(queues = "direct.queue1")
    public void listener1(String message) throws InterruptedException {
        System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");
    }

    @RabbitListener(queues = "direct.queue2")
    public void listener2(String message) throws InterruptedException {
        System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");
    }
}

3.1.4. 运行效果 

我们看到消息最终通过路由key,被匹配的消费者2所接收。

四、Topic Exchange

TopicExchange与DirectExchange类似,后者能做的事情,前者也能做。区别在于Topic的routingKey可以是多个单词的列表,并且以 . 分割。 Queue与Exchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词

*:代指一个单词

china.news 代表有中国的新闻消息;

china.weather 代表中国的天气消息;

japan.news 则代表日本新闻

japan.weather 代表日本的天气消息

4.1. 案例演示

案例需求

1. 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2

2. 在RabbitMQ控制台中,声明交换机wzx. topic ,将两个队列与其绑定

3. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

4. 在publisher中编写测试方法,利用不同的RoutingKey向wzx. topic发送消息

案例代码

我们在上述代码的基础上做调整:

4.1.1. 生产者核心代码

package com.example.publisher;

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void sendMessage() throws InterruptedException {
        String exchangeName = "wzx.topic";
        String message = "人生苦短,持续不断地努力拼搏,迎难而上!";
        rabbitTemplate.convertAndSend(exchangeName,"wzx.news", message);
    }
}

 4.1.2. 消费者RabbitMQConfig

package com.example.consumer;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    Queue topicQueue1() {
        // 使用 QueueBuilder 创建一个持久化队列
        return QueueBuilder.durable("topic.queue1").build();
    }

    @Bean
    Queue topicQueue2() {
        // 使用 QueueBuilder 创建一个持久化队列
        return QueueBuilder.durable("topic.queue2").build();
    }

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("wzx.topic");
    }

    @Bean
    Binding binding1(Queue topicQueue1, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with("china.#"); // 绑定队列和交换机
    }

    @Bean
    Binding binding2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.news"); // 绑定队列和交换机
    }
}

4.1.3. 消费者核心代码 

package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SimpleListener {

    @RabbitListener(queues = "topic.queue1")
    public void listener1(String message) throws InterruptedException {
        System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");
    }

    @RabbitListener(queues = "topic.queue2")
    public void listener2(String message) throws InterruptedException {
        System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");
    }
}

4.1.4. 运行结果

我们可以看到,消费者2接收到了消息

当我们将发送消息的路由key改为china.news,我们会发现两个消费者都能收到消息,这就是topic的通配符效果:

标签:队列,boot,springframework,交换机,RabbitMQ,第七章,import,org,2.1
From: https://blog.csdn.net/qushaming/article/details/142818566

相关文章

  • Spring Boot 集成 RabbitMQ 消息事务(生产者)
    1.SpringBoot集成RabbitMQ消息事务(生产者)1.1.版本说明1.2.概览1.2.1.最大努力单阶段提交模式1.2.2.成功的业务流程1.2.3.失败的业务流程1.3.新建数据库表1.4.Spring配置1.5.定义常量1.6.配置交换机和队列1.7.定义RabbitMQ消息事务管理器1.8.定......
  • 交换机相关最终_续2
    节后修改版2:importtelnetlibimporttimeimportpandasaspdimportredeftelnet_login(host_ip,username,password):tn=telnetlib.Telnet()try:tn.open(host_ip,port=23,timeout=5)tn.read_until(b'Username:',timeout=5)......
  • RabbitMQ目录发送方确认confirm:确认模式2.重复调用接口时,会提示错误所以改进,自己搞一
    目录发送方确认confirm:确认模式2.重复调用接口时,会提示错误所以改进,自己搞一个template,return退回模式:RabbitMQ的可靠性/保证消息不丢失..消息在交换机中无法路由到制定队列:return模式重试机制发送方确认当消息的生产者发送消息以后,怎么知道是否到达服务器呢?......
  • springboot 整合 rabbitMQ(2)
    springboot整合rabbitMQ(1)-CSDN博客上期说了rabbitMQ的基础用法(普通队列模式)这期学习一下如何防止消息重复消费和进阶用法(订阅者模式)目录重复消费问题导致RabbitMQ重复消费问题的原因:解决思路代码实现(这里在上一期的代码上进行修改)生产者:消费者:订阅者模式:是什么:......
  • 交换机相关最终_续
    1.包含比较检查和修改步骤importparamikoimportosimporttimeimportrandomimportdatetimeimportpandasaspdimportreimportnumpyasnpfromsqlalchemyimporttext,create_engineimportpsycopg2frompsycopg2importsqlfromsqlalchemy.ormimportsessi......
  • 第七章
    第七章:类类的声明类的类名定义了唯一的类名。类可以声明与定义分离,仅声明时称为前向声明,这种声明之后定义之前产生的是不完全类型,这可以用来帮助定义指向这种类型的引用或指针。直到类被定义后数据成员才能被声明为这种类型,在创建类的对象之前必须完成类的定义,否则编译器不知......
  • 使用python对交换机进行排障自动化运维(锐捷)
    importglobimporttelnetlibimportrefromdatetimeimportdatetimefromtimeimportsleepimportpandasaspdimportosimporttimefrommatplotlibimportpyplotasplt#Telnet连接函数defconnect_telnet(hostname,username,password):  try:  ......
  • 交换机相关最终
    用判断语句实现所有可能场景`importtelnetlibimporttimeimportpandasaspdimportredeftelnet_login(host_ip,username,password):tn=telnetlib.Telnet()try:tn.open(host_ip,port=23,timeout=5)tn.read_until(b'Username:',timeout=5)tn.write(username.encode......
  • RabbitMQ
    一、概述1.关键特性高可用性:RabbitMQ支持消息的持久化和高可用性配置,可以在多个节点间复制消息,确保在单点故障时仍能正常工作。灵活的路由:RabbitMQ提供了多种路由方式(如直连、分发、主题、扇出),可以根据不同的需求将消息发送到相应的队列。支持多种协议:虽然RabbitMQ是......
  • RabbitMQ学习心得体会之Exchange
    参考:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet在工作队列中,一个消息只会投递一个消费者,但是发布订阅模式,同一个消息可以发送个多个消费者。(交换)ExchangesrabbitMQ的核心是把生产者把消息发送给exchange,这个x一边是从p接收消息,一边是把这些消息推送给队列。生......