首页 > 其他分享 >rabbitmq-2

rabbitmq-2

时间:2024-12-31 14:31:57浏览次数:1  
标签:队列 springframework rabbitmq org import com public

springboot/springcloud整合rabbitmq实战
简单例子
生产者
创建项目,引入jar

 <!--springboot整合rabbitmq包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.15</version>
        </dependency>

配置Rabbitmq数据源

spring:
  #spring整合rabbtitmq数据库配置
  rabbitmq:
    username: root
    virtual-host: /root/
    password: tiger
    host: 192.168.170.30
    port: 5672

编写配置交换-队列代码

package com.aaa.sbm.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @FileName: RabbitmqConfig
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:23
 * @Version: 1.0.0
 */
@Configuration //相当于rabbitmq-config.xml  <beans>
public class RabbitmqConfig {

    //定义常量,交换机的名称
    public   static  final String EXCHANGE_NAME="exchange_topic_1";
    //队列名称
    private  static  final String QUEUE_NAME="queue12";

    /**
     * 配置交换机
     * @return
     */
    @Bean
    public Exchange   exchangeTopic(){
         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }


    /**
     *配置队列
     * @return
     */
    @Bean
    public Queue  queue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 配置绑定
     * @return
     */
    @Bean
    public Binding  bindExchangeToQueue(){
        //把交换机和队列绑定时一定写绑定key  = info.#
        return  BindingBuilder.bind(queue()).to( exchangeTopic()).with("info.#").noargs();
    }
}

编写发送消息代码

package com.aaa.sbm.test;

import com.aaa.sbm.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @FileName: TestRabbitmqSendMsg
 * @Description: 测试rabbitmq发送消息的类
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:34
 * @Version: 1.0.0
 */
@SpringBootTest
public class TestRabbitmqSendMsg {

    //使用spring支持的设计模式-模版模式
    // spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
    @Resource
    private RabbitTemplate  rabbitTemplate;
    @Test
    public void testSendMsg(){
        //定义发送字符串
        String  sendMsg = "hello.qy178!!!!";
        // 参数1,交换机名称   参数2  routingKey  参数3  发送字符串对象
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.test.msg",sendMsg);
    }
}

测试

先打开rabbit服务器,启动rabbitmq服务
 systemctl start rabbitmq-server

消息者
创建项目,引入jar

 <!--springboot整合rabbitmq包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.15</version>
        </dependency>

配置Rabbitmq数据源

spring:
  #spring整合rabbtitmq数据库配置
  rabbitmq:
    username: root
    virtual-host: /root/
    password: tiger
    host: 192.168.170.30
    port: 5672

编写接受消息监听器
复杂例子
需求
生产者有批量定时处理的任务,需要大量部门信息批量向中间件中存储,然后消费要进行消费处理。
生产者

package com.aaa.sbm.task;

import com.aaa.sbm.config.RabbitmqConfig;
import com.aaa.sbm.entity.Dept;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @FileName: BatchScheduledHandlerDeptInfo
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/11 10:30
 * @Version: 1.0.0
 */
@Component
@EnableScheduling  //开启定时功能
public class BatchScheduledHandlerDeptInfo {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //原子Long类型  线程安全的
    private AtomicInteger atomicInteger  = new AtomicInteger();

    /**
     * 每个5秒钟产生20条部门信息,并发送到消息中间中,使用多线程提高发送速度
     */
    // * 秒   *  分   * 时  * 日 * 月 * 周
    //  /  每隔   - 范围   , 枚举
    // */5 * * * * *  每隔5秒执行一次
    // 5-15 * * * * *   任意一分钟的第5秒,第6秒。。。第15分别执行一次
    // 5,10,15 * * * * *  任意一分钟的第5秒,第10秒,第15分别执行一次
    // 0 30  6  01  * *  每月一周6.30执行一次
    @Scheduled(cron = "*/5 * * * * *")
    public  void   handlerDept(){
        //使用线程池的固定线程池大小类型
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        //启动线程
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Integer i = atomicInteger.getAndIncrement();
                    Dept dept = new Dept(i, "开发" + i + "部", "郑州" + i);
                    //使用fastjson把对象转为json发送,方便消费者读取
                    String deptJson = JSON.toJSONString(dept);
                    rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.dept",deptJson);
                }
            });
        }
        System.out.println("每隔5秒,发送20个对象到rabbitmq中!!!");

    }
}

消息者

package com.aaa.sbm.util;

import com.aaa.sbm.entity.Dept;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @FileName: RabbitmqConsumerUtil
 * @Description: Rabbitmq消费者工具类,用来监听并处理消息
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:57
 * @Version: 1.0.0
 */
@Component
public class RabbitmqConsumerUtil {


    /**
     * 监听方法
     * @param message
     */
    @RabbitListener(queues = "${queue.name}")    //获取配置文件中配置的队列名称
    public  void    handlerMsg(Message message){
        //底层使用序列化过的字节对象进行传输
        byte[] messageBodyByteArray = message.getBody();
        String deptJson = new String(messageBodyByteArray);
        System.out.println("消费的消息为:"+deptJson);
        Dept dept = JSON.parseObject(deptJson, Dept.class);
        System.out.println("dept对象:"+dept);
    }

}

如何保证消息不丢失
示意图

生产者保证消息发送到交换机(confirm机制)
配置
#发布确定类型 none 不开启 不会确定回调 simple 同步 必须等交换机给我发送确认后,再处理业务 correlated 异步 速度快
publisher-confirm-type: correlated
代码实现

package com.aaa.sbm.test;

import com.aaa.sbm.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @FileName: TestRabbitmqSendMsg
 * @Description: 测试rabbitmq发送消息的类
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:34
 * @Version: 1.0.0
 */
@SpringBootTest
public class TestRabbitmqSendMsgComfirm {

    //使用spring支持的设计模式-模版模式
    // spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
    @Resource
    private RabbitTemplate  rabbitTemplate;

   /* @Resource
    private DeptDao deptDao;*/
    @Test
    public void testSendMsg(){
        //deptDao.deleteById(1);
        //定义发送字符串
        String  sendMsg = "hello.qy178!!!!";
        //实例化确认回到接口
        RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback(){
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // CorrelationData correlationData 相关数据,有一个唯一识别的ID和其他信息, boolean ack, @Nullable String cause
                System.out.println("识别ID:"+(correlationData==null?"":correlationData.getId()));
                //ack应答   如果为true说明消息到交换中  false出错没到
                if(ack){
                    System.out.println("消息已经发送到交换机中!");
                }else {
                    System.out.println("出现错误,错误原因为:"+cause);
                }
            }
        };
        //调用确定回调
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // 参数1,交换机名称   参数2  routingKey  参数3  发送字符串对象    exchange_top_1111
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.test.msg",sendMsg);
        // 打印提示
        System.out.println("消息发送完毕!");
    }
}

生产者保证消息发送到队列(returnsCallback机制)
配置
#开启返回回调
publisher-returns: true
代码实现

package com.aaa.sbm.test;

import com.aaa.sbm.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @FileName: TestRabbitmqSendMsg
 * @Description: 测试rabbitmq发送消息的类
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:34
 * @Version: 1.0.0
 */
@SpringBootTest
public class TestRabbitmqSendMsgReturnsCallback {

    //使用spring支持的设计模式-模版模式
    // spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
    @Resource
    private RabbitTemplate  rabbitTemplate;

   /* @Resource
    private DeptDao deptDao;*/
    @Test
    public void testSendMsg(){
        //deptDao.deleteById(1);
        //定义发送字符串
        String  sendMsg = "hello.qy178!!!!";
        RabbitTemplate.ReturnsCallback returnsCallback = new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println("确定到队列的信息是:"+new String(returnedMessage.getMessage().getBody()));
                System.out.println("交换机:"+returnedMessage.getExchange());
                System.out.println("路由键:"+returnedMessage.getRoutingKey());
                System.out.println("返回码:"+returnedMessage.getReplyCode());
                System.out.println("返回字符串:"+returnedMessage.getReplyText());
            }
        };
        //发送前,设置消息确定到队列的回调
        rabbitTemplate.setReturnsCallback(returnsCallback);
        // 参数1,交换机名称   参数2  routingKey  参数3  发送字符串对象
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.test.msg",sendMsg);
        // 打印提示
        System.out.println("消息发送完毕!");
    }
}

消费者保证消息成功消费后再删除

概述
默认情况下,消息自动确认,消费者一旦拿到消息,确定消费,消费就可以删除了。但是消费者拿到消息后在处理业务过程中,可能出现错误,使用消息没有正确的处理完业务,也属于消息不可靠。这种情况下,开启手动确认消息,写代码完成业务处理成功后再确认消息成功消费,再删除,如果出现错误,不确定消息成功消费,并把消息返回队列。
配置
listener:
simple: #支持1个消费者
# 确认类型
acknowledge-mode: manual #none 禁止使用消息确认 auto 自动确认 manual手动确认
#direct: #支持多个消费者
#acknowledge-mode: manual
代码实现

package com.aaa.sbm.util;

import com.aaa.sbm.entity.Dept;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @FileName: RabbitmqConsumerUtil
 * @Description: Rabbitmq消费者工具类,用来监听并处理消息
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:57
 * @Version: 1.0.0
 */
@Component
public class RabbitmqConsumerUtil {


    /**
     * 监听方法
     * @param message
     */
    /*@RabbitListener(queues = "${queue.name}")    //获取配置文件中配置的队列名称
    public  void    handlerMsg(Message message){
        //底层使用序列化过的字节对象进行传输
        byte[] messageBodyByteArray = message.getBody();
        String deptJson = new String(messageBodyByteArray);
        System.out.println("消费的消息为:"+deptJson);
        Dept dept = JSON.parseObject(deptJson, Dept.class);
        System.out.println("dept对象:"+dept);
    }*/

    /**
     * 接受消息的方法,手动保证数据可靠
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "${queue.name}")    //获取配置文件中配置的队列名称
    public  void    handlerMsg(Message message, Channel channel){
        //获取消息唯一数字标识
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //底层使用序列化过的字节对象进行传输
            byte[] messageBodyByteArray = message.getBody();
            System.out.println("消费的消息为:"+messageBodyByteArray);
            //执行业务  //1 ,2,3,4....100
            System.out.println(1/0);
            //...
            //...
            //basicAck手动确认收到信息的方法
            //long deliveryTag 消息唯一数字标识,
            // boolean multiple   许多的 小于当前deliveryTag所有消息都确认,

            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            e.printStackTrace();
            ////basicNack  Nack=Not ack  手动不确认收到信息的方法
            //long deliveryTag 消息唯一数字标识,
            // boolean multiple   许多的 小于当前deliveryTag所有消息都确认
            // boolean requeue  重回队列
            try {
                channel.basicNack(deliveryTag,false,true);
                //channel.basicNack(deliveryTag,false,false);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }

    }

}

死信队列
概念
死信(Dead Letter),是rabbitmq中的一种机制,因为某种原因造成消息丢失,丢失的消息就称为死信。把死信放入一个队列,不让它丢失,这个队列称为死信队列。通常死信队列会和一个交换机进行绑定,这个交换机称为死信交换机(Dead Letter Exchange),又叫DLX。
原因

示意图

代码实现

package com.aaa.sbm.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @FileName: RabbitmqDeadLetterQueueConfig
 * @Description: 死信队列配置
 * @Author: zhz
 * @CreateTime: 2024/12/12 9:37
 * @Version: 1.0.0
 */
@Configuration
public class RabbitmqDeadLetterQueueConfig {

    public  static  final  String BUSINESS_EXCHANGE_NAME="businessExchangeA";
    private   static  final  String DEAD_LETTER_EXCHANGE_NAME="deadLetterExchangeA";
    private   static  final  String DEAD_LETTER_QUEUE_NAME="deadLetterQueueA";
    private   static  final  String BUSINESS_QUEUE_NAME="businessQueueA";
    private   static  final  String ROUTING_KEY="deadLetter.info.#";




    /**
     * 配置业务交换机
     * @return
     */
    @Bean
    public Exchange   businessExchange(){
         return ExchangeBuilder.topicExchange(BUSINESS_EXCHANGE_NAME).build();
     }

    /**
     * 配置业务队列
     * @return
     */
    @Bean
    public Queue businessQueue(){
        //实例化参数集合
        Map<String, Object> arguments = new HashMap<>();
        //让当前队列绑定死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
        arguments.put("x-dead-letter-routing-key",ROUTING_KEY);
        //实例化队列,传递参数
        return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(arguments).build();
    }

    /**
     * 把业务交换机和业务队列绑定
     * @return
     */
    @Bean
    public Binding  businessExchangeToQueueBind(){

        return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY).noargs();
    }

    /**
     * 配置死信交换机
     * @return
     */
    @Bean
    public Exchange   deadLetterExchange(){
        return ExchangeBuilder.topicExchange(DEAD_LETTER_EXCHANGE_NAME).build();
    }

    /**
     * 配置死信队列
     * @return
     */
    @Bean
    public Queue  deadLetterQueue(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE_NAME).build();
    }

    /**
     * 把死信交换机和死信队列绑定
     * @return
     */
    @Bean
    public Binding  deadLetterExchangeToQueueBind(){
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY).noargs();
    }

}

测试
测试:消费者不确认消息也不重回队列
生产者

package com.aaa.sbm.test;

import com.aaa.sbm.config.RabbitmqConfig;
import com.aaa.sbm.config.RabbitmqDeadLetterQueueConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @FileName: TestRabbitmqSendMsg
 * @Description: 测试rabbitmq发送消息的类
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:34
 * @Version: 1.0.0
 */
@SpringBootTest
public class TestRabbitmqDeadLetterSendMsg {

    //使用spring支持的设计模式-模版模式
    // spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
    @Resource
    private RabbitTemplate  rabbitTemplate;

   /* @Resource
    private DeptDao deptDao;*/
    @Test
    public void testSendMsg(){
        //deptDao.deleteById(1);
        //定义发送字符串
        String  sendMsg = "hello.qy178!!!!";
        // 参数1,交换机名称   参数2  routingKey  参数3  发送字符串对象
        rabbitTemplate.convertAndSend(RabbitmqDeadLetterQueueConfig.BUSINESS_EXCHANGE_NAME,"deadLetter.info.test",sendMsg);
        // 打印提示
        System.out.println("消息发送完毕!");
    }
}

消费者

  //basicNack 不确认     boolean requeue=false  不重回队列
                channel.basicNack(deliveryTag,false,false);

测试:消息超时丢失。
生产者

 //实例化参数集合
        Map<String, Object> arguments = new HashMap<>();
         //设置业务队列存放消息后,消息的过期时间   x-message-ttl =time to live   单位是:毫秒
        arguments.put("x-message-ttl",20000);

消费者
消费者20秒之内一定不要运行

测试:消息超出了队列的存放长度丢失。
生成者
设置长度

  //实例化参数集合
        Map<String, Object> arguments = new HashMap<>();
         //设置业务队列存放消息后,消息的过期时间   x-message-ttl =time to live   单位是:毫秒
       // arguments.put("x-message-ttl",20000);
        //设置业务队列存放消息的最大长度10
        arguments.put("x-max-length",10);

循环发送信息,发送数量大于长度

package com.aaa.sbm.test;

import com.aaa.sbm.config.RabbitmqConfig;
import com.aaa.sbm.config.RabbitmqDeadLetterQueueConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @FileName: TestRabbitmqSendMsg
 * @Description: 测试rabbitmq发送消息的类
 * @Author: zhz
 * @CreateTime: 2024/12/11 9:34
 * @Version: 1.0.0
 */
@SpringBootTest
public class TestRabbitmqDeadLetterSendMsg {

    //使用spring支持的设计模式-模版模式
    // spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
    @Resource
    private RabbitTemplate  rabbitTemplate;

   /* @Resource
    private DeptDao deptDao;*/
    @Test
    public void testSendMsg(){
        for (int i = 0; i < 15; i++) {
            //deptDao.deleteById(1);
            //定义发送字符串
            String  sendMsg = "hello.qy178!!!!"+i;
            // 参数1,交换机名称   参数2  routingKey  参数3  发送字符串对象
            rabbitTemplate.convertAndSend(RabbitmqDeadLetterQueueConfig.BUSINESS_EXCHANGE_NAME,"deadLetter.info.test",sendMsg);
        }
         // 打印提示
        System.out.println("消息发送完毕!");
    }
}

消费者
消费者不要运行
延迟队列
概念
延迟队列(Delay Queue),生成者生产消息后,不让消费者立马消费,而是过一段时间再让消费者消费,这种机制就是延迟队列。实现方式可以借助死信队列实现(本节就是这么做的),也可以通过插件实现(自己找帖子看下)。
应用场景
电商项目中最常见的订单超时功能

会议提醒功能
网站注册后,提醒登录
等等
示意图

代码实现
生产者

把死信队列代码中的businessExchange和businessQueue改为delayExchage和DelayQueue并设置DelayQueue的存储消息的过期时间

消费者
监控死信队列即可

集群

标签:队列,springframework,rabbitmq,org,import,com,public
From: https://www.cnblogs.com/xiaomubupi/p/18643862

相关文章

  • 7、RabbitMQ队列之远程调用(RPC)【RabbitMQ官方教程】
    在第二个教程中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。但是,如果我们需要在远程计算机上运行一个函数并等待结果呢?好吧,那是另一回事。这种模式通常被称为远程过程调用或RPC。在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务......
  • 6、RabbitMQ队列之主题【RabbitMQ官方教程】
    在前面的教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的扇出交换机,而是使用了直接交换机,从而有可能选择性地接收日志。虽然使用直接交换改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。在我们的日志系统中,我们可能不仅要根据严重性订阅日志,还要......
  • 5、RabbitMQ队列之路由【RabbitMQ官方教程】
    在前面的教程中,我们构建了一个简单的日志系统。我们能够向许多接收器广播日志消息。在本教程中,我们将为其添加一个功能——我们将使仅订阅消息的一个子集成为可能。例如,我们将能够仅将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。 绑定......
  • 4、RabbitMQ队列之发布/订阅模式【RabbitMQ官方教程】
    在前面的教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只传递给一个工作者。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递一个信息。这种模式被称为“发布/订阅”。为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成——......
  • rabbitmq学习笔记
    RabbitMQ简介简介2006年,AMQP规范发布。2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ1.0发布AMQP​AMQP,即AdvancedMessageQueuingProtocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议......
  • rabbitmq-1
    1.消息队列点到点模式发布订阅模式2.rabbitMQ简介3.AMQP协议4.工作原理图及核心概念原理图核心概念Broker:接收和分发消息的应用,RabbitMQServer就是MessageBrokerVirtualhost:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中......
  • 3、RabbitMQ队列之工作队列【RabbitMQ官方教程】
    工作队列使用 php-amqplib 在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并必须等待其完成。相反,我们把任务安排在以后......
  • 2、RabbitMQ队列之HelloWorld【RabbitMQ官方教程】
    简介1.本教程假设RabbitMQ已安装并在本地主机的标准端口(5672)上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。2.如果你在学习本教程时遇到困难,可以通过 GitHubDiscussions或者RabbitMQcommunityDiscord与我们联系RabbitMQ是一个消息代理:它接受和转发消息。......
  • 1、【RabbitMQ官方教程】简介
    简介RabbitMQ是一个开源的消息代理软件(也被称为消息队列),它实现了高级消息队列协议(AMQP)。本教程旨在帮助开发者通过RabbitMQ创建消息应用的基础知识。教程分为两部分:RabbitMQ队列和RabbitMQ流 RabbitMQ队列这部分教程涵盖了默认的RabbitMQ协议AMQP0-9-1。包括以下......
  • 消息中间件——rabbitmq,kafka,rocketmq
    目录mqmq解决什么问题rabbitmq工作原理消息路由如何保证消息不丢失实现高可用kafka能支持这么大吞吐量的原因如何保证消息不丢失避免重复消费如何保证消息顺序消费数据存储原理IRSleader选举rocketmq为什么不使用zookeeper分布式事务mqmessageQueue,消息......