首页 > 其他分享 >SpringAMQP的简要实现

SpringAMQP的简要实现

时间:2024-08-07 21:23:16浏览次数:18  
标签:简要 SpringAMQP String 实现 队列 消息 msg org public

1.Basic Queue 简单队列模型

1.1导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2 yaml

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码

1.3 消息发送

package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

 1.4消息接收

package cn.itcast.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

2.WorkQueue

2.1消息发送

/**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

2.2消息接收

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

 2.3yaml 添加属性

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.发布/订阅

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

3.1Fanout 

 3.1.1 声明交换机和队列

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

3.1.2消息发送

@Test
public void testFanoutExchange() {
    // 队列名称
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

3.1.3消息接收

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

 3.2Direct

3.2.1基于注解声明队列和交换机,消息接收

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

3.2.2消息发送

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "itcast.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

3.3Topic

3.3.1 特点

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!  

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert  

通配符规则:  #:匹配一个或多个词  *:匹配不多不少恰好1个词  

举例:  item.#:能够匹配item.spu.insert 或者 item.spu  item.*:只能匹配

3.3.2消息发送

/**
     * topicExchange
     */
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "itcast.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

3.3.3消息接收

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

标签:简要,SpringAMQP,String,实现,队列,消息,msg,org,public
From: https://blog.csdn.net/m0_52223915/article/details/140987835

相关文章

  • uniapp-实现轮播图效果深度总结【建议收藏】
       ......
  • Exceljs 实现html table转Excel
    在Vue3项目中将包含图片的HTML表格转换为Excel文件,你可以使用exceljs库,并结合前端技术来处理图片和表格数据。下面是一个在Vue3项目中实现的示例:安装依赖首先,确保你已经安装了exceljs库。你可以通过npm安装它:npminstallexceljs示例代码以下示例展示了如何......
  • 最小二乘法原理推导+代码实现[Python]
    0.前言本文主要介绍了最小二乘法公式推导,并且使用Python语言实现线性拟合。读者需要具备高等数学、线性代数、Python编程知识。请读者按照文章顺序阅读。绘图软件为:geogebra5。1.原理推导1.1应用最小二乘法在购房中的应用通常涉及房价预测和房屋定价方面。这种统计方法通......
  • 用C语言实现输入一个奇数n,输出一个由*构成的n阶实心菱形
    样图示例:一.基本思路该问题的主要难点时是如何使用循环通过人为输入的指定长度来确定空格和星号的输出,我的想法是将图形以中间最长的一条线分为上下部分,然后分别用不同的变量来表示空格和星号的输出,最后通过c语言来实现对图形颜色和闪烁的控制。二.具体实现1.上半部分......
  • C语言实现猜数字小游戏
    游戏要求:1.电脑自动生成1-100的随机数2.玩家猜数字,猜数字的过程中,根据猜测数据的大小给出大了还是小了的反馈,直到猜对游戏结束1.随机数的生产C语言提供了一个函数叫rand,这个函数可以生产随机数,函数的原型如下所示:rand函数会返回一个伪随机数,这个随机数的大小是在0-32767(......
  • 第五代英特尔® 至强® 可扩展处理器: PK8072205560、PK8072205560x00 Gold 处理器可实
    至强®可扩展处理器:第五代英特尔®至强®可扩展处理器采用内置英特尔®AIEngines,并具有与上一代相同的功率范围、软件和平台兼容性,可实现无与伦比的CPUAI性能。介绍英特尔®至强®Gold处理器英特尔®至强®Gold处理器针对要求严苛的AI、主流数据中心、多云计算......
  • QStyledItemDelegate 和QTreeView实现鼠标hover消息
    1.QTreeView设置属性mousetracking和tablettracing 重写QStyledItemDelegate类,重写函数booleditorEvent(QEvent*event,QAbstractItemModel*model,constQStyleOptionViewItem&option,constQModelIndex&index);这个函数里可以处理鼠标hover和点击事件;boolTreeTas......
  • 使用QNetworkAccessManager实现FTP上传下载功能
    自己写了一份FTP的代码,可以上传下载单文件,上传下载多文件,上传目录所有文件,但是下载目录的功能有问题,接口里代码规范也没做(如果有大佬提供修改方案就更好了),代码直接复制可用,留给有需要的人。#pragmaonce#include<QObject>#include<QNetworkReply>#include<QNetworkAcce......
  • 线程池实现范例
    1初始化线程池   确定线程数量,并做好互斥访问2启动所有线程  std::vector<std::thread*>threads_;unique_lock<mutex>lock(mutex_);for(inti=0;i<thread_num_;i++){autoth=newthread(&XThreadPool::Run,this);threads_.push_back(th);}3准备......