首页 > 其他分享 >RabbitMQ

RabbitMQ

时间:2023-12-13 11:24:16浏览次数:40  
标签:String 队列 RabbitMQ 消息 message public channel

简介

作用

  1. 流量消峰:相当于等待队列。
  2. 应用解耦:当子系统出现故障,该系统的要处理的信息被缓存在消息队列中,待修复完成后即可恢复。
  3. 异步处理。

四大核心概念

  1. 生产者:产生数据发送消息的程序。
  2. 交换机:一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。
  3. 队列:队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。
  4. 消费者:消费者大多时候是一个等待接收消息的程序。

常用命令

命令 作用
/sbin/service rabbitmq-server start status stop 启动、查看状态、停止
chkconfig rabbitmq-server on 添加开机自启动
systemctl stop firewalld 暂时关闭防火墙
systemctl status firewalld 查看防火墙状态

实战

RabbitMQ在启动的时候可能会出现ativating (auto-restart)的状态,此时不要着急,等待一段时间后会成功启动。经常出现问题可以把开机自启动关掉,然后手动开启。

RabbitMQ的后台默认端口是15672,如果访问不了大概率是防火墙的问题。

IDEA中写代码连接RabbitMQ的后台的端口是5762。

在信道首次使用时,需要先启动生产者新建信道。

需要关闭防火墙,idle中的代码才能发送消息。

Hello World

生产者

public class Producer {
    //队列名称
    public static final String QUEUE_NAME = "hello";

    //发消息
    public static void main(String[] args) throws  Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP,连接RabbitMQ队列
        factory.setHost("xxx.xxx.xxx.xxx");
        //设置RabbitMQ的用户名,密码
        factory.setUsername("xxx");
        factory.setPassword("xxx");

        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息,需要将消息转换成二进制发送
        String message = "hello world";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");

    }
}

消费者

public class Consumer {
    //队列名称
    public static final String QUEUE_NAME = "hello";
    //接收消息
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxx.xxx.xxx.xxx");
        factory.setUsername("xxx");
        factory.setPassword("xxx");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //声明如何处理消息
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag ->{
            System.out.println("消息消费被中断");
        };

        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

Work Queues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

消息应答

消息应答:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它己经处理了,rabbitmq可以把该消息删除了。

自动应答在接收时则进行消息应答,仅适用于消费者可以高效并以某种速率能够处理这些消息的时候使用。

手动应答的好处是可以批量应答并且减少网络拥堵,但一般不使用批量应答。

消息手动应答

DeliverCallback deliverCallback = (consumerTag,message) -> {
    SleepUtils.sleep(1);
    System.out.println("接收到的消息" + new String(message.getBody(),"UTF-8"));
    //手动应答
    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};

持久化

队列实现持久化只需要在声明队列的时候把durable参数设置为持久化。但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个特久化的队列,不然就会出现错误。

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。

不公平分发采取能者多劳的方式分发,只需在消费者端设置channel.basicQos(1);

生产者

public class Task2 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        boolean durable = true;//设置队列持久化
        channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            //设置生产者发送消息为持久化消息,要求保存在磁盘上
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生成者发出消息" + message);
        }
    }
}

发布确认

发布确认是队列将信息保存在磁盘后,发信息告诉生产者信息已经持久化。

发布确认:单个确认,批量确认和异步确认,常用的是异步确认。

处理异步未确认信息的最好解决方案是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

异步确认

public static void publishMessageAsync() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName,true,false,false,null);
    channel.confirmSelect();//开启发布确认
    //记录信息是否确认的线程安全有序的哈希表
    ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();

    //准备消息的监听器,监听信息成功与否
    ConfirmCallback ackCallback = (deliveryTag,multiple) ->{
        if(multiple){
            //删除已经确认的消息
            //返回的是小于等于当前序列号的未确认消息 是一个 map
            ConcurrentNavigableMap<Long, String> confirmed =
                    outstandingConfirms.headMap(deliveryTag);
            //清除该部分未确认消息
            confirmed.clear();
        }else {
            outstandingConfirms.remove(deliveryTag);
        }

    };
    ConfirmCallback nackCallback = (deliveryTag,multiple) ->{
        String message = outstandingConfirms.get(deliveryTag);
        System.out.println("未确认的消息" + deliveryTag + message);
    };
    channel.addConfirmListener(ackCallback,nackCallback);
    long begin = System.currentTimeMillis();
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = "消息" + i;
        channel.basicPublish("", queueName, null, message.getBytes());
        //记录所有要发送的消息
        outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
    }

    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end-begin) + "ms");
}

交换机

交换机可以将消息传递给多个消费者,这种模式称为“发布/订阅”模式。

RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。生产者只能将消息发送到交换机。

交换机有:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)四种模式。

生产者关注交换机和路由键、消费者关心队列名、交换机、路由键的绑定。

发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd,nyse","nyse.vmw”。

扇出

生产者

public class EmitLog {

    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner scanner = new Scanner(System.in);

        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.println("生产消息" + message);
        }
    }
}

消费者

public class ReceiveLogs01 {

    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明一个交换机
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机,使用默认的key值
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("等待接收消息");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("01控制台打印接收到的消息"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

死信队列

死信的产生:消息被拒绝、信息TTL过期、队列达到最大长度。

修改队列信息后,需要将原本的队列删除后才能正常运行。

消费者

public class Consumer01 {

    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        Map<String,Object> arguments = new HashMap<>();
        //过期时间设置,正常队列设置死信交换机
//        arguments.put("x-message-ttl",10000);
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","lisi");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收消息");

        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println("consumer01接收的消息" + new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag ->{});
    }
}

生产者

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for (int i = 0; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
            System.out.println("dayin" + i);
        }
    }
}

延迟队列

延迟队列用于存放需要在指定时间被处理的元素的队列。

延迟队列本质上就是死信队列中的消息TTL过期。

Spring报错'BootFailed to start bean ‘documentationPluginsBootstrapper'

基于死信队列存在的问题:RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。解决方法是使用RabbitMQ插件rabbitmq_delayed_message_exchange来实现延迟队列。安装成功后会新增一个x-delayed-message类型的交换机,此时在交换机进行延迟,等TTL过期后再给队列。

原始方式

配置类代码

@Configuration
public class TtlQueueConfig {

    //普通交换机的名称
    public static final String X_EXCHANGE = "X";
    //死信交换机的名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列的名称
    public static final String QUEUE_A = "QA";

    //声明xExchange
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    //声明队列
    @Bean("queueA")
    public Queue queueA(){
        Map<String,Object> arguments = new HashMap<>(3);
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","YD");
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    
    //绑定关系
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange")DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
}

生产者

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //开始发消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给TTL队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列"+message);
    }
}

消费者

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
    }
}

基于插件方式

配置类

@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        //自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
                args);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者

@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
    log.info("当前时间:{},发送一条时长为{}毫秒的信息给延迟队列:{}",new Date().toString(),delayTime,message);
    rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
            DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg ->{
        msg.getMessageProperties().setDelay(delayTime);
        return msg;
    });
}

消费者

@Slf4j
@Component
public class DelayQueueConsumer {
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
    }
}

高级篇

发布确认

发布确认高级篇要解决的问题是当RabbitMQ出现问题时,交换机和队列都不存在的消息发送问题,我们可以使用缓存机制来解决改为问题。

生产者

回调接口中的参数CorrelationData是由生产者发送的时候写入的。

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,"key1",message,correlationData);
        log.info("发送消息内容为:{}",message);
    }
}

回调接口和消息回退接口

实现RabbitTemplate内部的ConfirmCallback接口,由于是内容接口,因此需要将该接口注入到RabbitTemplate。

开启回调接口需要在配置文件中设定spring.rabbitmq.publisher-confirm-type=correlated

回退消息发生在生产者已经成功将信息交给交换机,而交换机没有找到对应的routingkey的队列,默认情况下交换机会直接将消息丢弃。

开启回退消息,需要在配置文件中增加spring.rabbitmq.publisher-returns=true

#@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{

    //将实现的接口注入
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
    //交换机确认回调方法
    @Override
    public void confirm(CorrelationData correlationData,boolean ack,String cause){
        String id = correlationData !=null ? correlationData.getId():"";
        if(ack){
            log.info("交换机已经收到 id 为:{}的消息",id);
        }else{
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
        }
    }


    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息已被回退");
    }
}

其他内容

MQ消费者的幂等性(重复提交问题)的解决一般使用全局ID或者唯一标识符来解决。另一种思路是使用Redis的原子性进行解决。

RabbitMQ可以开启优先级队列。

惰性队列会尽可能的将消息存储在磁盘中,一般用于消费者长时间不能消费的场景下。

标签:String,队列,RabbitMQ,消息,message,public,channel
From: https://www.cnblogs.com/xiqin-huang/p/17898662.html

相关文章

  • RabbitMQ-3.12:安装教程详解
    安装环境: centOS7操作系统1.1什么是MQMQ即消息队列(MessageQueue),是一种用于进行异步通信的技术。它允许应用程序异步地向队列中发送消息,而不需要立即等待接收方处理完毕。MQ将消息缓存在队列中,等待消费者进行处理1.2什么是RabbitMQRabbitMQ是一个开源的消息队列中间件,它实......
  • RabbitMQ学习笔记(一)
    安装1.下载erlang并安装,地址:http://erlang.org2.下载mq并安装,地址:http://www.rabbitmq.com/download.html3.安装完成后,管理后台地址:http://localhost:15672,初始账号和密码:guest/guest优缺点优点:解耦、削峰、数据分发缺点:系统可用性降低;系统引入的外部依赖越多,系统稳定性越......
  • AMQP协议中的,消息队列RabbitMQ,ActiveMQ,Apache Kafka区别是什么?
    都是基于AMQP协议来的一种实现方式。参考chatGPT4回答请使用Markdown表格来展示RabbitMQ、ActiveMQ和ApacheKafka之间的区别:维度RabbitMQActiveMQApacheKafka语言ErlangJavaScala/Java协议AMQP、STOMP、MQTTAMQP、STOMP、OpenWire自定义协议......
  • Erlang&Rabbitmq安装
    一.安装erlang1wgethttp://www.erlang.org/download/otp_src_19.3.tar.gz解压1tar-xvfotp_src_19.3.tar.gz进入文件夹1cdotp_src_19.3配置1./configure--prefix=/home/erlang--without-javac如果报错:1configure:error:Nocurseslibraryfunct......
  • Rabbitmq队列
    rabbitmq消息中间件-消息队列异步开发语言erlang爱立信公司1.安装pythonrabbitMQmodule 1pip3installpika关闭防火墙1serviceiptablesstop关闭防火墙2.实现最简单的队列通信send端:1#send端2importpika34credentials=pika.PlainCredent......
  • 十一、RabbitMQ集群
    一、clustering1、使用集群的原因2、搭建步骤2.1搭建架构图2.2操作步骤2.3实战部分操作演示二、镜像队列1、使用镜像的原因2、搭建步骤2.1操作步骤2.2实战步骤三、Haproxy+Keepalive实现高可用负载均衡1、整体架构图2、Haproxy实现负载均......
  • 十、RabbitMQ其他知识点
    一、幂等性1、概念2、消息重复消费3、解决思路4、消费端的幂等性保障5、唯一ID+指纹码机制Redis原子性(推荐)二、优先级队列1、使用场景2、如何添加3、实战4、测试结果三、惰性队列1、使用场景2、两种模式3、内存开销对比......
  • rabbitmq
    简介:RabbitMQ是一种流行的开源消息队列系统,使用Erlang语言编写,支持多种消息协议,例如AMQP、MQTT等。RabbitMQ提供了可靠的消息传递机制,可以将消息从一个应用程序传递到另一个应用程序。RabbitMQ的主要组件包括:Producer:生产者,用于生成消息并将其发送到RabbitMQ服务器上的Exchan......
  • RabbitMQ 延迟消息的实现——延迟消息插件
     步骤:1.把资料中的rabbitmq_delayed_message_exchange-3.9.0.ez 复制到docker的mq容器的插件目录2.执行命令 dockerexec-itmqrabbitmq-pluginsenablerabbitmq_delayed_message_exchange 在Java代码中配置延迟交换机:(图的左边是注解方式,右下角是@Bean的方式) 比......
  • RabbitMQ高可用集群的搭建部署(Centos7)
    高可用集群架构节点域名操作系统RabbitMQ版本Erlang版本iamdemo.tp-link.comCentos7.93.8.2823.3-2iamdemo2.tp-link.comCentos7.93.8.2823.3-2iamdemo3.tp-link.comCentos7.93.8.2823.3-2目前Centos7.9通过直接RPM包部署安装的版本最高支持到3.8.......