首页 > 其他分享 >RabbitMQ笔记

RabbitMQ笔记

时间:2024-06-09 16:02:18浏览次数:25  
标签:队列 springframework 笔记 org import RabbitMQ public

 端午无聊,就来学学MQ吧


消息队列

消息指的是两个应用之间传递的数据

消息队列是在消息传递中保存消息的容器

生成者:只负责发送数据

消费者:只负责读取数据

(数据就是消息)

为什么要用消息队列

解耦

如果一个系统(系统a)的需求是共享自己系统的数据,而其他系统(系统BCD)是需求者。而这个时候系统a为了共享自己系统的数据,就需要分别写相关的代码(a->b)(a->c)(a->d)。假如某一时刻系统d的需求突然变了,他不在需要a系统的数据。这个时候,系统a的相关代码就要删除(a->d)。假如过了不久又有了一个新系统e需要系统a的数据,则系统a又需要增加新的代码。这样子系统的耦合性太高了,所以可以用MQ来降低耦合。系统a只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

异步

假如我们的操作为请求进入后又系统a响应调度系统bdc进行操作。这样子就是同步请求,响应的时间就是系统ABCD的总和。如果使用了MQ,系统a发送数据到MQ,然后就可以返回响应给客户端,不需要再等待其他调度系统的响应,可以大大地提高性能

削峰

对于高并发的场景下,如果关于sql的请求直接进入mysql进行执行,那么mysql很有可能崩溃,而导致系统的崩溃。而如果使用MQ,那么请求不再是直接发送sql到数据库,而是把数据发送到MQ,MQ短时间内积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,以防止在请求峰值时期大量的请求直接发送到MySQl导致系统崩溃。

(上面了解了这么多终于到正片了)


RabbitMQ的特点

使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。

可靠性:支持持久化,传输确认,发布确认等保证了MQ的可靠性。灵活的分发消息策略、支持集群、多种协议、支持多种语言、客户端可视化管理界面插件机制

初体验

永远的hello world

pom配置:
<!--消息队列相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
生产者

上来啥也不会先照着做

上代码

​
​

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
​
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
​
import javax.annotation.Resource;
​
@Configuration
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    /*
    * 这是创建交换机和队列用的rabbitAdmin对象
    * */
    @Resource
    private RabbitAdmin rabbitAdmin;
    //初始化rabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);
        //只有设置为true,spring 才会加载RabbitAdmin这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
    //实例化bean后,也就是Bean的后置处理器
    @Override
    public Object postProcessAfterInitialization(Object bean,String beanName)throws BeansException{
        //创建交换机
        rabbitAdmin.declareExchange(rabbitmqDemoDirectExchange());
        //创建队列
        rabbitAdmin.declareQueue(rabbitmqDemoDirectQuece());
        return null;
    }
    @Bean
    public Queue rabbitmqDemoDirectQuece(){
        /*
        * 1.name:队列名称
        * 2.durable:是否持久化
        * 3.exclusive:是否独享
        * 4.autoDelete:是否自动删除
        * */
        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC,true,false,false);
    }
​
    @Bean
    public DirectExchange rabbitmqDemoDirectExchange(){
        //Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE,true,false);
    }
​
    @Bean
    public Binding bindDirect(){
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDemoDirectQuece())
                //到交换机
                .to(rabbitmqDemoDirectExchange())
                //并设置匹配键
                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }
}
​
​
​
public class RabbitMQConfig {
    /*
    * RabbitMQ的队列主题名称
    * */
    public static final String RABBITMQ_DEMO_TOPIC="rabbitmqDemoTopic";
    /*
    * RabbitMQ的DIRECT交换机名称
    * */
    public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE="rabbitmqDemoDirectExchange";
    /*
    * RabbitMQ的DIRECT交换机和队列绑定的匹配键DirectRouting
    * */
    public static final String RABBITMQ_DEMO_DIRECT_ROUTING="rabbitmqDemoDirectRouting";
​
}
​
​
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
​
import javax.annotation.Resource;
​
@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQServiceImpl rabbitMQService;
    @PostMapping("/sendMsg")
    public String sendMsg(@RequestParam(name="msg") String msg) throws Exception{
        return rabbitMQService.sendMsg(msg);
    }
}
​
​
​
import org.example.productflashsalesystem.RabbitMQDemo.demo.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
​
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
​
@Service
public class RabbitMQServiceImpl {
    //日期格式化
    private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Resource
    private RabbitTemplate rabbitTemplate;
    public String sendMsg(String msg) throws Exception{
        try{
            String msgId= UUID.randomUUID().toString().replace("-","").substring(0,32);
            String sendTime=sdf.format(new Date());
            Map<String,Object> map=new HashMap<>();
            map.put("msgId",msgId);
            map.put("sendTime",sendTime);
            map.put("msg",msg);
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE,RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING,map);
            return "ok";
        }catch (Exception e){
            e.printStackTrace();
            return "error";
        }
    }
}

(这些放在一个包里)

启动

​
​

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQDemoApplication.class,args);
    }
}

消费者

下面是消费者登场

​
​
import org.example.productflashsalesystem.RabbitMQDemo.demo.RabbitMQConfig;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.util.Map;
​
@Component
//使用queuesToDeclare属性,如果不存在则会创建队列
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public class RabbitDemoConsumer {
    @RabbitHandler
    public void process(Map map){
        System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:"+map.toString());
    }
}
​
启动

​
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQDemocoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQDemocoApplication.class,args);
    }
}
​

(怎么感觉有点在上生物课了。。。。)

废话不多说接下去就是了解这些代码都写了什么对象!

RabbitMQ中的组成部分

Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。

Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列。RabbitMQ有4种交换器(在后面会介绍比较常用的三种)

Binding:绑定,RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个Key,告诉RabbitMQ如何正确地将消息路由到队列

Channel:多路复用连接中的一条独立的双向数据流通道,可读可写。Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作都是在Channel这个接口中完成的,包括定义Queue、Exchange、绑定Queue与Exchange、发布消息等等。在我们进行这些操作之前我们都需要与RabbitMQ建立一个Connection(Connection 表示到消息代理的真实 TCP 连接),但是如果每次访问RabbitMQ都需要建立Connection的话,在消息量大的时候建立TCP Connection的开销无疑也是巨大的,效率也比较低。这时候Channel起了作用。Channel是Connection中的虚拟连接(AMPQ连接),Channel可以复用Connection的TCP连接,当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,就需要开辟多个Connection,将这些信道均摊到这些Connection中。

Queue:消息队列,存储消息的队列

Producer:消息生产者。生产方客户端将消息同交换路由发送到队列中。

Consumer:消息消费者。消费队列中存储的消息。

流程图

img

消息生产者连接到RabbitMQ Broker,创建connection,开启channel

生产者声明交换机类型、名称、是否持久化等。

生产者发送消息,并指定消息是否持久化等属性和routing key。

exchange收到消息之后,根据routing key路由到跟当前交换机绑定的相匹配的队列里面。

消费者监听接收到消息之后开始业务处理。

(好吧,英语不好好难受。。。。)

Rabbit MQ消息模型

1650081010_1_.png

image _33_.png

Exchange

这个组件好像挺关键的,深入看看这么个事

消息发送到RabbitMQ后首先要经过Exchange路由才能对应的Queue

Direct Exchange

一对一的,点对点的发送。上面的代码就是例子

Fanout Exchange

发布订阅

Topic Exchange

通配符交换机

总结

比较常用的就是以上三种:直连(DirectExchange),发布订阅(FanoutExchange),通配符(TopicExchange)。熟练运用这三种交换机类型,基本上可以解决大部分的业务场景。

实际上稍微思考一下,可以发现通配符(TopicExchange)这种模式其实是可以达到直连(DirectExchange)和发布订阅(FanoutExchange)这两种的效果的。

FanoutExchange不需要绑定routingKey,所以性能相对TopicExchange会好一点。

标签:队列,springframework,笔记,org,import,RabbitMQ,public
From: https://blog.csdn.net/2301_79909038/article/details/139562694

相关文章

  • Objective-C 学习笔记 | 基础
    Objective-C学习笔记|基础参考书:《Objective-C编程(第2版)》第1部分入门Objective-C语言是以C语言为基础的,但增加了对面向对象编程的支持。Objective-C语言是用来开发在苹果iOS以及OSX操作系统上运行的应用的编程语言。第2部分如何编程该部分讲解了C语言编程的必......
  • CUDA编程学习笔记-02
    CUDA代码高效计算策略高效公式✒️Math代表数学计算量,Memory代表每个线程的内存......
  • FFmpeg开发笔记(二十八)Linux环境给FFmpeg集成libxvid
    ​XviD是个开源的视频编解码器,它与DivX一同被纳入MPEG-4规范第二部分的视频标准,但DivX并未开源。早期的MP4视频大多采用XviD或者DivX编码,当时的视频格式被称作MPEG-4。现在常见的H.264后来才增补到MPEG-4规范的第十部分,当然如今使用XviD压缩的视频已经不多了。在《FFmpeg开发实战......
  • OpenCompass大模型测评实战学习笔记
    一、OpenCompass介绍:评测相关:评测意义:研究评测对于我们全面了解大型语言模型的优势和限制至关重要;研究评测有助于指导和改进人类与大型语言模型之间的协同交互;研究评测可以帮助我们更好地规划大型语言模型未来的发展;评测能了解不同语言模型之间的性能、舒适性和安全性,能够帮......
  • 算法课程笔记——可撤销并查集
    算法课程笔记——可撤销并查集Gv......
  • 【RabbitMQ】SpringAMQP--消息转换器
    在SpringAMQP的发送方法中,接收消息的类型是 Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。测试发送Object类型消息1.声明队列@ConfigurationpublicclassFanoutConfig{@BeanpublicQueueobjectQueue(){return......
  • 读AI未来进行式笔记07量子计算
    1.      AI审讯技术1.1.        发明者最初的目的是发明一种能够替代精神药物,为人类带来终极快乐的技术1.1.1.          遗憾的是,他找到的只是通往反方向的大门1.2.        通过非侵入式的神经电磁干扰大脑边缘系统,诱发受审者最为恐惧及......
  • 数据结构学习笔记-堆排序
    堆排序算法的设计与分析问题描述:设计并分析堆排序【前置知识:什么是堆?】堆(Heap)是一种特殊的树形数据结构,它满足以下两个条件之一:最大堆(MaxHeap):每个节点的值都大于或等于其子节点的值。换句话说,根节点的值是整个堆中最大的。最小堆(MinHeap):每个节点的值都小于或等于其......
  • C语言学习笔记(八)————数组
    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档文章目录前言1一维数组1.2一维数组的引用1.3一维数组的初始化2二维数组2.1二维数组的定义2.2二维数组的存放顺序3多维数组总结前言一个学习C语言的小白,有问题评论或私信~本文主要记录C语言......
  • 计算机组成原理复习笔记
    前言就是按照考试的题型写的总结非常应试版题型一、进制转换只考十进制二进制十六进制之间的相互转换一个个看(1)十进制转其他转二进制:除以2从小到大取余数(0或1)转十六进制:除以16从小到大取余数(0到f)(2)二进制十六进制转十进制每位数字乘以相应的幂数再相......