首页 > 其他分享 >复习消息队列之RabbitMQ

复习消息队列之RabbitMQ

时间:2023-08-09 17:45:46浏览次数:37  
标签:复习 队列 springframework RabbitMQ rabbit import org annotation amqp

概念:

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
对比:

RabbitMQ对比Kafka  mq来说更安全可靠 但大数据处理来说kafka更适合IO高吞吐的处理

安装部署(基于linux):

https://blog.csdn.net/kuaixiao0217/article/details/128593503

 

基于springboot实现MQ

引入依赖pom.xml 配置文件application.yaml

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  application:
    name: springboot_rabbitmq
  rabbitmq:
    host: 10.15.0.9
    port: 5672
    username: ems
    password: 123
    virtual-host: /ems

第一种模式 直连

  生产者

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemoApplicationTests {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("hello","hello world"); 
// 生产端没有指定交换机只有routingKey和Object。
//消费方产生hello队列,放在默认的交换机(AMQP default)上。
//而默认的交换机有一个特点,只要你的routerKey的名字与这个
//交换机的队列有相同的名字,他就会自动路由上。 
//生产端routingKey 叫hello ,消费端生产hello队列。
//他们就路由上了
    }

}

  消费者

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
// 生产端没有指定交换机只有routingKey和Object。
//消费方产生hello队列,放在默认的交换机(AMQP default)上。
//而默认的交换机有一个特点,只要你的routerKey的名字与这个
//交换机的队列有相同的名字,他就会自动路由上。 
//生产端routingKey 叫hello ,消费端生产hello队列。
//他们就路由上了
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloCustomer {

    @RabbitHandler
    public void receive1(String message){
        System.out.println("message = " + message);
    }
}

第二种模式 work模型

  生产者

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemoApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work","hello work!"); 
            // 生产端没有指定交换机只有routingKey和Object。
            //消费方产生work队列,放在默认的交换机(AMQP default)上。
            //而默认的交换机有一个特点,只要你的routerKey的名字与这个
            //交换机的队列有相同的名字,他就会自动路由上。 
            //生产端routingKey 叫work ,消费端生产work队列。
            //他们就路由上了                 
        }
    }
}

  消费者

package com.example;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkCustomer {
    // 生产端没有指定交换机只有routingKey和Object。
    //消费方产生work队列,放在默认的交换机(AMQP default)上。
    //而默认的交换机有一个特点,只要你的routerKey的名字与这个
    //交换机的队列有相同的名字,他就会自动路由上。 
    //生产端routingKey 叫work ,消费端生产work队列。
    //他们就路由上了                 
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message){
        System.out.println("work message1 = " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message){
        System.out.println("work message2 = " + message);
    }
}

 Fanout 广播模型

生产者

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("logs","","这是日志广播"); // 参数1为交换机,参数2为路由key,“”表示为任意路由,参数3为消息内容
    }
}

消费者

package com.example;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkCustomer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // 创建临时队列
            exchange = @Exchange(name = "logs", type = "fanout")
    ))
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, //创建临时队列
            exchange = @Exchange(name = "logs", type = "fanout")  //绑定交换机类型
    ))
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

Route 路由模型

生产者

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemoApplicationTests {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("directs","error","error 的日志信息");
    }

}

消费者

package com.example;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // 创建临时队列
                    key = {"info", "error"}, // 路由key
                    exchange = @Exchange(type = "direct", name = "directs")
            )})
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    key = {"error"},
                    exchange = @Exchange(type = "direct", name = "directs")
            )})
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

Topic 订阅模型(动态路由模型)

生产者

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemoApplicationTests {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll 的消息");
    }
}

消费者

package com.example;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    key = {"user.*"},
                    exchange = @Exchange(type = "topic",name = "topics")
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    key = {"user.#"},
                    exchange = @Exchange(type = "topic",name = "topics")
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}

 

标签:复习,队列,springframework,RabbitMQ,rabbit,import,org,annotation,amqp
From: https://www.cnblogs.com/ziwang520/p/17617522.html

相关文章

  • 记rabbitmq崩溃随笔
    之前一直以为MQ是个很简单的东西,部署好后,知道怎么样用就好了。不就是生产者-消费者之间的消息传递嘛,还有几种消息模式,延时队列插件啥的,都会用就好。直到在这家公司,自建的RabbitMQ稳定性一直很差,消费者老是起不来,偶尔又遇到幽灵队列的情况。终于,上周这个MQ终于顶不住了,又一开始排......
  • RabbitMQ总结
    1、为什么要使用MQ1、流量消峰举个例子:如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以......
  • 每日汇报 第七周第二天 JAVA复习&PTA
    今日学习:继续从网站中复习,PTA刷题时又复习了一下C++的vector用法和unordered_mapunordered_map是一个将key和value关联起来的容器,它可以高效的根据单个key值查找对应的value。key值应该是唯一的,key和value的数据类型可以不相同。unordered_map存储元素时是没有顺序的,......
  • 复习笔记|《计算机组成原理》
    参考教材:《计算机组成原理》蒋本珊➢前2类题看书中和课件中的有关概念。➢第3、4、5类题请注意平时的作业。如:❑扩展操作码设计❑有效地址的计算❑定点数乘、除运算❑存储器设计❑Cache计算❑微指令操作控制字段的设计第一章➢存储程序概念计算机硬件的组成,存储器控......
  • 微服务项目使用RabbitMQ
    微服务项目使用RabbitMQ很久未用RabbitMQ了,根据网上的Demo,大多数是一个服务包含所有的生产者和消费者和配置,当自己去搭建服务的时候,还需要一些思考各种包的划分.无法无脑CV大法,所以,下文,我根据实际项目抽离出一个比较完整的小Demo演示微服务项目使用RabbitMQ.注意:这个小D......
  • 大一下第二学期期中知识复习梳理 之 c++动态内存分配
    一、动态内存分配基本概念1、数组实现顺序表的缺陷:静态内存管理——程序在编译时,根据数组元素类型和个数分配所需内存大小,在程序运行时无法改变。2、内存空间分布 3、动态内存管理1) 2)动态内存分配(1)操作符new动态分配变量数组(对象数组): 指针变量=new变量类型[变量表......
  • [代码随想录]Day11-栈与队列part03
    题目:239.滑动窗口最大值思路:说实话这题真不能说是困难题,麻烦是麻烦点但是比较容易实现。维护一个单调队列,队列内是由大到小排序(数组内的顺序是由小到大的),每次移动都会进行两次判断:如果前面去掉的数就是队列的首部,那么就要把首部移除如果后面添加的数比队尾的元素要大就......
  • RabbitMQ从入门到精通零基础进阶学习路线?
    RabbitMQ从入门到精通零基础进阶学习路线?学习RabbitMQ可以遵循以下路线,从入门到精通:步骤1:理解消息队列和RabbitMQ基础知识-学习什么是消息队列和为什么要使用它们。-了解RabbitMQ的概念和术语,如生产者、消费者、队列、交换器和绑定等。步骤2:安装和设置RabbitMQ-下载和安装Ra......
  • rabbitMQ
    MQ是一个消息队列,是存储信息的中间件 分布式系统通信的两种方式:1.直接远程调用 2.通过第三方中间件消息传递。......
  • 【Rabbitmq】报错:ERROR CachingConnectionFactory Channel shutdown: channel error
    目录01报错原因02解决方案2.1全局配置2.2注解方式原文链接:【Rabbitmq】报错:ERRORCachingConnectionFactoryChannelshutdown:channelerror异常信息Channelshutdown:channelerror;protocolmethod:#method<channel.close>(reply-code=406,reply-text=PRECONDITI......