首页 > 其他分享 >RabbitMQ实战入门

RabbitMQ实战入门

时间:2023-07-30 12:32:49浏览次数:29  
标签:实战 入门 2023 springframework RabbitMQ org import message 消息

一、概念

RabbitMQ架构模型分为客户端和服务端两部分,客户端包括生产者和消费者,服务端包括虚拟主机、交换器和队列。两者通过连接和信道进行通信。

整体的流程是生产者将消息发送到服务器,消费者从服务器中获取对应的消息。具体流程是生产者在发送消息前小确定发送给哪个虚拟机的哪个交换器,再由交换器通过路由键将消息发送给与之绑定的队列。最后消费者到指定的队列中获取自己的消息进行消费。

二、客户端和服务端

生产者和消费者都属于客户端,生产者是消息的发送方,将要发送的消息封装成一定的格式,发送给服务端。消费者是消息的接收方,负责消费消息体。

服务端包含虚拟主机、交换器和队列。虚拟主机用来对交换器和队列进行逻辑隔离,在同一个虚拟主机下,交换器和队列的名称不能重复。交换器负责接收生产者的消息,并根据规则分配给对应的队列。队列负责存储消息,生产者发送的消息会被存储到这里,而消费者从这里获取消息。

三、工作模式

模式

介绍

简单模式

简单模式就是生产者将消息发送给队列,消费者从队列中获取消息即可

工作队列模式

存在多个消费者,生产者将消息放到队列,多个消费者会依次进行消费。只有在消息条数是消费者数量的整数倍才能做到公平分配

广播模式

广播模式下,即使只发送一条消息,它对应的所有消费者都可以全部收到

路由模式

路由模式下,交换器会根据不同的路由键将消息发送给指定的队列,从而被特定的消费者消费。

动态路由模式

路由模式需要指定明确的路由键,动态路由模式可以支持带通配符的路由键

四、管理平台

启动RabbitMQ服务,访问http://localhost:15672/

RabbitMQ实战入门_中间件

默认的账户和密码都是guest,我们用这个账户登录后,会进入管理页面。

RabbitMQ实战入门_生产者_02

我们在Admin的标签页面里面添加一个虚拟主机。

RabbitMQ实战入门_服务器_03

五、项目实战

1.引入依赖

<!--rabbitmq-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
	<version>2.6.11</version>
</dependency>
<!--lomobk-->
<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<version>1.18.28</version>
</dependency>

2.添加相关配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 虚拟主机名称
spring.rabbitmq.virtual-host=my-virtual

3.简单模式测试

生产者发送消息示例代码如下:

package com.example.rabbitmqtest.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qx
 * @date 2023/07/30
 * @desc 生产者
 */
@RestController
@RequestMapping("/rabbit")
@Slf4j
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 简单模式-生产者发送消息
     *
     * @param routingKey 路由键
     * @param message    消息
     */
    @GetMapping("/producer")
    public String send(String routingKey, String message) {
        rabbitTemplate.convertAndSend(routingKey, message);
        log.info("生产者发送消息:{}", message);
        return "生产者发送消息成功";
    }
}

消费者接收消息示例代码如下:

package com.example.rabbitmqtest.components;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2023/07/30
 * @desc 消费者
 */
@Slf4j
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Consumer {

    @RabbitHandler
    public void receive(String message) {
        log.info("接收到的消息:{}", message);
    }
}

我们启动项目,访问生产者发送消息的请求。

RabbitMQ实战入门_服务器_04

我们在控制台可以看到消费者接收到了消息。

2023-07-30 10:34:54.151  INFO 6716 --- [nio-8080-exec-1] c.e.r.controller.ProducerController      : 生产者发送消息:hello
2023-07-30 10:34:54.244  INFO 6716 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer     : 接收到的消息:hello

4.工作队列模式测试

生产者示例代码如下:

package com.example.rabbitmqtest.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qx
 * @date 2023/07/30
 * @desc 生产者
 */
@RestController
@RequestMapping("/rabbit")
@Slf4j
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 工作队列模式-生产者发送消息
     *
     * @param routingKey 路由键
     * @param message    消息
     */
    @GetMapping("/work")
    public String send(String routingKey, String message) {
        for (int i = 1; i <= 10; i++) {
            rabbitTemplate.convertAndSend(routingKey, "第" + i + "条消息:" + message);
        }
        return "生产者发送消息成功";
    }
}

消费者示例代码如下:

package com.example.rabbitmqtest.components;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2023/07/30
 * @desc 消费者
 */
@Slf4j
@Component
public class Consumer {

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveOne(String message) {
        log.info("receiveOne接收到的消息:{}", message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveTwo(String message) {
        log.info("receiveTwo接收到的消息:{}", message);
    }

}

启动项目,调用工作队列模式的测试请求

RabbitMQ实战入门_消费者_05

我们在控制台上看到消费者接收到的消息是均分获取的。

2023-07-30 10:49:44.805  INFO 8224 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer     : receiveOne接收到的消息:第1条消息:hello
2023-07-30 10:49:44.805  INFO 8224 --- [ntContainer#1-1] c.e.rabbitmqtest.components.Consumer     : receiveTwo接收到的消息:第2条消息:hello
2023-07-30 10:49:44.807  INFO 8224 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer     : receiveOne接收到的消息:第3条消息:hello
2023-07-30 10:49:44.808  INFO 8224 --- [ntContainer#1-1] c.e.rabbitmqtest.components.Consumer     : receiveTwo接收到的消息:第4条消息:hello
2023-07-30 10:49:44.808  INFO 8224 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer     : receiveOne接收到的消息:第5条消息:hello
2023-07-30 10:49:44.808  INFO 8224 --- [ntContainer#1-1] c.e.rabbitmqtest.components.Consumer     : receiveTwo接收到的消息:第6条消息:hello
2023-07-30 10:49:44.808  INFO 8224 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer     : receiveOne接收到的消息:第7条消息:hello
2023-07-30 10:49:44.808  INFO 8224 --- [ntContainer#1-1] c.e.rabbitmqtest.components.Consumer     : receiveTwo接收到的消息:第8条消息:hello
2023-07-30 10:49:44.808  INFO 8224 --- [ntContainer#0-1] c.e.rabbitmqtest.components.Consumer     : receiveOne接收到的消息:第9条消息:hello
2023-07-30 10:49:44.809  INFO 8224 --- [ntContainer#1-1] c.e.rabbitmqtest.components.Consumer     : receiveTwo接收到的消息:第10条消息:hello

5.广播模式测试

生产者示例代码如下:

    /**
     * 广播模式-生产者发送消息
     */
    @GetMapping("/fanout")
    public void sendFanout(String exchange, String message) {
        rabbitTemplate.convertAndSend(exchange, "", message);
    }

消费者示例代码如下:

package com.example.rabbitmqtest.components;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2023/07/30
 * @desc 消费者
 */
@Component
@Slf4j
public class FanoutConsumer {

    @RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "fanout", type = "fanout")))
    public void reciveMessage(String message) {
        log.info("receiveMessage:{}", message);
    }
}

启动项目,进行广播模式的测试

RabbitMQ实战入门_消息队列_06

控制台显示接收到的消息情况

2023-07-30 11:24:56.608  INFO 9036 --- [ntContainer#2-1] c.e.r.components.FanoutConsumer          : reciveOne:hello
2023-07-30 11:24:56.608  INFO 9036 --- [ntContainer#3-1] c.e.r.components.FanoutConsumer          : reciveTwo:hello

6.路由模式测试

    /**
     * 路由模式
     */
    @GetMapping("/direct")
    public String sendDirect(String exchange, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        return "路由模式发送消息成功";
    }
package com.example.rabbitmqtest.components;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2023/07/30
 * @desc 路由模式消费者
 */
@Slf4j
@Component
public class DirectConsumer {

    @RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa", "bb"}, exchange = @Exchange(name = "direct", type = "direct")))
    public void reciveOne(String message) {
        log.info("reciveOne:{}", message);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa"}, exchange = @Exchange(name = "direct", type = "direct")))
    public void reciveTwo(String message) {
        log.info("reciveTwo:{}", message);
    }
}

启动项目根据路由键来发送消息

RabbitMQ实战入门_消费者_07

路由键为aa,所以两个消费者都接收到了消息

2023-07-30 11:51:20.254  INFO 3776 --- [ntContainer#2-1] c.e.r.components.DirectConsumer          : reciveOne:hello
2023-07-30 11:51:20.254  INFO 3776 --- [ntContainer#3-1] c.e.r.components.DirectConsumer          : reciveTwo:hello

路由键为bb,那么只有第一个消费者接收到了消息

RabbitMQ实战入门_生产者_08

2023-07-30 11:52:14.484  INFO 3776 --- [ntContainer#2-1] c.e.r.components.DirectConsumer          : reciveOne:hello

7.动态路由模式测试

修改消费者代码

package com.example.rabbitmqtest.components;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2023/07/30
 * @desc 动态路由模式消费者
 */
@Slf4j
@Component
public class TopicConsumer {

    @RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa.bb.cc"}, exchange = @Exchange(name = "topic", type = "topic")))
    public void reciveOne(String message) {
        log.info("reciveOne:{}", message);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa.bb.*"}, exchange = @Exchange(name = "topic", type = "topic")))
    public void reciveTwo(String message) {
        log.info("reciveTwo:{}", message);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa.bb.#"}, exchange = @Exchange(name = "topic", type = "topic")))
    public void reciveThree(String message) {
        log.info("reciveThree:{}", message);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue, key = {"aa.#"}, exchange = @Exchange(name = "topic", type = "topic")))
    public void reciveFour(String message) {
        log.info("reciveFour:{}", message);
    }
    
}

启动项目,传入动态的键值数据

RabbitMQ实战入门_生产者_09

我们在控制台上显示的数据知道只有3和4符合我们的动态路由规则,所以只有3和4接收到了服务器的消息。

2023-07-30 11:57:54.554  INFO 1524 --- [ntContainer#9-1] c.e.r.components.TopicConsumer           : reciveFour:hello
2023-07-30 11:57:54.556  INFO 1524 --- [ntContainer#8-1] c.e.r.components.TopicConsumer           : reciveThree:hello


标签:实战,入门,2023,springframework,RabbitMQ,org,import,message,消息
From: https://blog.51cto.com/u_13312531/6899721

相关文章

  • python数据分析师入门-学习笔记(第三节)
    学习链接:python数据分析师入门爬虫到底是什么概括爬虫是批量化自动获取既有数据 批量化 自动 既有数据通常 获取既有数据特殊 批量注册一批账号 批量去领取优惠券 批量自动下单购物 自动做任务(签到)......
  • python数据分析师入门-学习笔记(第二节)
    爬虫(数据采集)序言1.爬虫到底是什么2.爬虫的应用场景3.爬虫的分类4.爬虫合法吗5.爬虫如何搞钱初级1.开始爬虫的准备工作2.爬虫的核心流程3.数据获取4.数据提取5.数据存储6.应对反爬虫中级1.提升性能2.令牌池(cookie......
  • 【SpringBoot】快速入门
    (知识目录)一、SpringBoot快速入门1.1idea创建(1)新建模块,选择SpringInitializer,一定要记得选择SpringBoot的版本为2.7.7注意上面的两个目录一定要是空目录,然后点击apply,OK即可,之后进入到主界面。(2)编写UserController类packagecom.itxiaoguo.controller;importor......
  • 微信小程序入门
    全局配置:配置所有页面路径:app.json,{pages:[配置所有页面]},将首页放置在第一个位置上设置tabbar导航:"tabBar":{"color":"#7A7E83",//字体颜色"selectedColor":"#3cc51f",//选中时候字体的颜色"borderStyle":"black",//tarbar边框......
  • python数据分析师入门-学习笔记
    第一节数据分析整体介绍应用领域数据分析爬虫开发数据存储数据可视化数据分析内容1.语言基础python基础2.数据获取爬虫课程3.数据存储MySQL数据库4.数据处理NumpyPandas5.数据可视化Matplot......
  • Quartz实战:基于Quartz实现定时任务的动态调度,实现定时任务的增删改查
    文章目录一、Quartz基础二、使用Quartz实现定时任务的动态调度1、使用Quartz-jobStore持久化2、前端页面实现效果图3、自定义job表4、增删改查Controller5、Quartz工具类6、测试任务类7、springboot启动初始化定时任务8、自定义JobFactory,使Task注册为Bean9、省略的内容10、总结......
  • C语言从入门到绝望
    Aclockinoneline:intmain(int_,char**__){_^448&&main(-~_,__);__builtin_putchar(--_%64?32|-~7[__TIME__-_/8%8][">'txiZ^(~z?"-48]>>";;;====~$::199"[_*2&8|_/64]/(_&2?1:8)%8&1:10);}Hello,World!int......
  • Dubbo(二)_入门
    什么是dubbodubbo最新版本为3.x,ApacheDubbo是一款易用、高性能的web和rpc框架,同时为构建企业级微服务提供服务发现、流量治理、可观测、认证鉴权等能力。Dubbo3替代了阿里运行多年的HSF框架,依托于Dubbo实现自己的微服务解决方案(DNS),即Dubbo、Nacos(服务注册与管理)......
  • MongoDB从入门到精通深入学习路线图?
    MongoDB从入门到精通深入学习路线图?学习MongoDB从入门到精通需要掌握以下内容,以下是一个深入学习路线图:阶段1:入门1.1学习数据库基础知识-数据库的概念和作用-关系数据库和非关系数据库的区别-NoSQL数据库的特点和优势1.2安装和配置MongoDB-下载并安装MongoDB-配置Mongo......
  • 18-Hive入门&安装
    1.Hive概述1.1什么是Hive?ApacheHive是一款建立在Hadoop之上的开源数据仓库工具,可以将存储在Hadoop文件中的结构化、半结构化数据文件映射为一张数据库表,基于表提供了一种类似SQL的查询模型,称为Hive查询语言(HQL),用于访问和分析存储在Hadoop文件中的大型数据集。H......