首页 > 其他分享 >RabbitMQ 学习笔记

RabbitMQ 学习笔记

时间:2024-02-22 19:33:07浏览次数:28  
标签:System 笔记 学习 RabbitMQ println new message public channel

AMQP协议模型

image

  • Server:又称为Broker,接受客户端的链接,实现AMQP实体服务

  • Connection:连接,应用程序与Broker的网络连接

  • channel:网络信道,几乎所有的操作都在channel中进行,是消息读写的通道,可建立多个channel,每个channel代表一个会话任务

  • Message:消息本体,由Properties和Body组成,Proerties对消息进行装饰(优先级、延迟等特征),body是消息体内容

  • Virtual host:虚拟地址,进行逻辑各级,一个VirtualHost里面可以有若干个Exchange和Queue,同一个VirtualHost不能有相同的Exchange或Queue

  • Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列

  • Binding:Exchange和Queue之间的虚拟链接 binding中可以报刊rounting key

  • Rounting key :消息发送的路由规则

  • Queue: 消息队列,保存并转发给消费者

    AMQP 消息流转

image

image

命令行与管理台

rabbitmqctl stop_app   	关闭应用
rabbitmqctl start_app  	启动应用
rabbitmqctl status		节点状态
rabbitmqctl add_user username password	添加用户
rabbitmqctl list_users					打印用户列表
rabbitmqctl delete_user username 删除用户
rabbitmqctl clear_permissions -p vhostpath username  用户权限清除
rabbitmqctl add_vhost vhostpath 创建虚拟主机
rabbitmqctl list_vhosts 打印虚拟主机列表
rabbitmqctl list_queues 打印队列信息
rabbitmqctl -p vhostpath purge_queue blue 清除队列里的消息

Exchange 交换机

Exchange:接受消息,并根据路由键转发消息所绑定的队列

image

Exchange 交换机属性

  • Name:交换机名称
  • Type:类型
    • direct 直连,发送到direct Exchange的消息被转发到RouteKey指定的Queue exchange=doutingKey
    • Topic 发布\订阅,发送到Topic Exchange的消息会被转发到所有关心RouteKey中制定的Topic的Queue上
    • fanout、不处理路由键,简单的将队列板绑定到交换机上,发送到交换机上的消息都会被转发到该交换机绑定的队列上、fount是最快的
    • headers
  • Durability:是否需要持久化
  • Auto Delete:当最后一个绑定在Exchange上的队列删除后,自动删除Exchange
  • Interal:当前exchange是否用于Rabbitmq内部使用,默认为FALSE
  • Arguments:扩展参数,用户扩展AMQP协议参数

Binding-绑定

  • Exchange和Exchange、queue之间绑定

消息可靠性投递 100%

生产端-可靠性投递

解决方案

  • 消息落库、对消息状态达标
  • 消息延迟投递、做二次确认,回掉检查

消费端-幂等性保证

解决方案:

  • 数据库ID+指纹码
  • 利用Redis原子性实现

Confirm确认消息

消费者收到消息会给生产者一个应答

channel.addConfirmListener

Return 消息机制

RabbitMQ代码整合

Spring-AMQP

  • RabbitAdmin -- Exchange\Queue\Binding 之间关系声明

  • SpringAMQP 声明

  • RabbitTemplate

  • SimpleMessageListenerContainer

  • MessageListenerAdapter

  • MessageConverter

    RabbitAdmin

    package com.study.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RabbitMQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses("192.168.223.128:5672");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            // setAutoStartup  必须设置为true 否则Spring不会加载RabbitAdmin
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    
    
        /**
         * 针对消费者配置
         * 1. 设置交换机类型
         * 2. 将队列绑定到交换机
         FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
         HeadersExchange :通过添加属性key-value匹配
         DirectExchange:按照routingkey分发到指定队列
         TopicExchange:多关键字匹配
         */
        @Bean
        public TopicExchange exchange001() {
            return new TopicExchange("topic001", true, false);
        }
    
        @Bean
        public Queue queue001() {
            return new Queue("queue001", true); //队列持久
        }
    
        @Bean
        public Binding binding001() {
            return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
        }
    }
    
    

    RabbitTemplate

image

   @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
    
        @Test
    public void testSendMessage() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "信息描述..");
        messageProperties.getHeaders().put("type", "自定义消息类型..");
        Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);

        rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.err.println("------添加额外的设置---------");
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return message;
            }
        });
    }

SimpleMessageListenerContainer

  • SimpleMessageListenerContainer可以进行动态监听消息,设置事务、事务管理、事务属性、设置消费者数量,进行批量消费,设置消息确认模式、自动确认,是否重回队列、异常捕获header函数,设置消费者标签生成策略,是否独占模式,消费者属性,设置具体的监听器、消息转换器
@Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // 容器添加监听队列
        container.setQueues(queue001());
        // 设置监听数据量
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        // 是否重回队列 - FALSE
        container.setDefaultRequeueRejected(false);
        // 签收 - auto 自动签收
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //
        container.setExposeListenerChannel(true);
        // 设置客户端tag
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });
        // 消息监听处理
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                System.out.println("---- 处理消费者"+new String(message.getBody()));
            }
        });
        return container;
    }

消息监听MessageListenerAdapter 以及 消息封装处理 MessageConverter

// 消息坚定
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage"); // 设置监听处理方法名
adapter.setMessageConverter(new TextMessageConverter()); // 设置消息封装类
container.setMessageListener(adapter);

// TextMessageConverter 
public class TextMessageConverter implements MessageConverter {
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if(null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
}
// MessageDelegate 
public class MessageDelegate {
    public void handleMessage(byte[] messageBody) {
        System.err.println("默认方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(byte[] messageBody) {
        System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息内容:" + messageBody);
    }

    public void method1(String messageBody) {
        System.err.println("method1 收到消息内容:" + new String(messageBody));
    }

    public void method2(String messageBody) {
        System.err.println("method2 收到消息内容:" + new String(messageBody));
    }


    public void consumeMessage(Map messageBody) {
        System.err.println("map方法, 消息内容:" + messageBody);
    }


    public void consumeMessage(Order order) {
        System.err.println("order对象, 消息内容, id: " + order.getId() +
                ", name: " + order.getName() +
                ", content: "+ order.getContent());
    }

    public void consumeMessage(File file) {
        System.err.println("文件对象 方法, 消息内容:" + file.getName());
    }
}



SpringBoot整合RabbitMQ

// 生产者
@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    //回调函数: confirm确认
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                System.err.println("异常处理....");
            }
        }
    };

    //回调函数: return返回
    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                                    String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };


    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一
        CorrelationData correlationData = new CorrelationData("1234567890");
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
    }

    //发送消息方法调用: 构建自定义对象消息
    public void sendOrder(Order order) throws Exception {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一
        CorrelationData correlationData = new CorrelationData("0987654321");
        rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
    }

}
// 消费者
@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1",
                    durable="true"),
            exchange = @Exchange(value = "exchange-1",
                    durable="true",
                    type= "topic",
                    ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }


    /**
     *
     * @param order
     * @param channel
     * @param headers
     * @throws Exception
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                    durable="${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                    durable="${spring.rabbitmq.listener.order.exchange.durable}",
                    type= "${spring.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
    )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order,
                               Channel channel,
                               @Headers Map<String, Object> headers) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端order: " + order.getId());
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}
# 配置文件
spring:
  rabbitmq:
    addresses: 192.168.11.76:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000

    publisher-confirms: true # 生产者exchange确认
    publisher-returns: true # 生产者queue确认
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 10
      order: # 自定义消费队列
        queue:
          name: queue-2
          durable: true
        exchange:
          name: exchange-2
          durable: true
          type: topic
          ignoreDeclarationExceptions: true
        key: springboot.*

image-20240221125440948

SpringCloudStream 整合Rabbitmq

/**
 * 生产者
 * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
 * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
 */
public interface Barista {
    String OUTPUT_CHANNEL = "output_channel";

    //注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。
    @Output(Barista.OUTPUT_CHANNEL)
    MessageChannel logoutput();

}


@EnableBinding(Barista.class)
@Service
public class RabbitmqSender {
    @Autowired
    private Barista barista;

    // 发送消息
    public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
        try{
            MessageHeaders mhs = new MessageHeaders(properties);
            Message msg = MessageBuilder.createMessage(message, mhs);
            boolean sendStatus = barista.logoutput().send(msg);
            System.err.println("--------------sending -------------------");
            System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
        }catch (Exception e){
            System.err.println("-------------error-------------");
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());

        }
        return null;
    }
}
/**
 * 消费者
 * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
 * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
 */
public interface ConsumerBarista {
    String INPUT_CHANNEL = "input_channel";

    //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题
    @Input(ConsumerBarista.INPUT_CHANNEL)
    SubscribableChannel loginput();

}

@EnableBinding(ConsumerBarista.class)
@Service
public class RabbitmqReceiver {

    @StreamListener(ConsumerBarista.INPUT_CHANNEL)
    public void receiver(Message message) throws Exception {
        Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        System.out.println("Input Stream 1 接受数据:" + message);
        System.out.println("消费完毕------------");
        channel.basicAck(deliveryTag, false);
    }
}
# 服务配置
server:
  port: 8888
  context-path: /rabbitmq
# spring应用配置
spring:
  application:
    name: rabbitmq-cloud-stream
  # SpringCloud 服务配置
  cloud:
    stream:
      binders:
        output_channel:
          destination: exchange-3
          group: queue-3
          binder: rabbit_cluster
        input_channel:
          destination: exchange-3
          group: queue-3
          binder: rabbit_cluster
          consumer:
            concurrency: 1
      bindings:
        rabbit_cluster:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: 192.168.11.76:5672
                username: guest
                password: guest
                virtual-host: /
        input_channel:
          consumer:
            requeue-rejected: false
            acknowledge-mode: MANUAL
            recovery-interval: 3000
            durable-subscription: true
            max-concurrency: 5

标签:System,笔记,学习,RabbitMQ,println,new,message,public,channel
From: https://www.cnblogs.com/Tonyzczc/p/18028004

相关文章

  • 安卓家庭记账本开发笔记7(补2月3日)
    完成收支记录界面的逻辑编写代码如下:packagecom.example.myapplication1;importandroid.os.Bundle;importandroid.view.View;importandroidx.appcompat.app.AppCompatActivity;importandroidx.fragment.app.Fragment;importandroidx.viewpager2.widget.ViewPager2;import......
  • 安卓家庭记账本开发笔记6(补2月2日)
    完成自定义软键盘的绘制和逻辑编写在res文件夹中创建一个文件包命名为xml。在里面创建一个名为key的xml文件,在其中完成自定义软键盘的绘制代码如下:<?xmlversion="1.0"encoding="utf-8"?><Keyboardxmlns:android="http://schemas.android.com/apk/res/android"......
  • 安卓家庭记账本开发笔记5(补2月1日)
    完成自定义软键盘的编写以及软键盘上面的备注和时间在记录页面的代码底下加上下面的代码<android.inputmethodservice.KeyboardViewandroid:id="@+id/frag_record_keyboard"android:layout_width="match_parent"android:layout_height="wrap_content"......
  • 【图论】基环树 学习笔记
    基环树下面几个条件互相等价:一个图(连通块)是基环树联通块有n个点n条边图上存在且仅存在一个环,且环上每个节点是一颗子树的根。通常情况下树指的都是无向图,但是有向图也可以构成基环树。内向基环树:每个点都有一条出边。容易发现沿着这条边一定会走到环上“向内走”。外......
  • 学习如何在C#中轻松实现串口数据接收:清晰步骤与实例代码
     概述:以上C#示例演示了如何使用SerialPort类实现串口数据接收。通过设置串口属性、定义数据接收事件处理程序,你可以轻松地打开串口、监听数据,并在事件处理程序中对接收到的数据进行处理。这提供了一个基本框架,可根据实际需求进行定制。在C#中实现串口数据接收通常需要使用Sys......
  • Vue学习笔记11--事件
    示例一:<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width,initial-scale=1.0"><title>Vue事件的基本使用</tit......
  • 概率与期望学习笔记(copy)
    概率&期望样本空间、随机事件定义一个随机现象中可能发生的不能再细分的结果被称为样本点。所有样本点的集合称为样本空间,通常用\(\Omega\)来表示。一个随机事件是样本空间\(\Omega\)的子集,它由若干样本点构成,用大写字母\(A,B,C,\cdots\)表示。对于一个随机现......
  • Python笔记11——函数
    十一、函数函数的作用:提高模块化程度,提高代码重复利用率。11.1定义一个函数一般格式:def函数名(参数列表):函数体以def关键字开头,后接函数标识符名称和圆括号()。所需参数必须都在圆括号中声明。(默认参数值和参数名称是按函数声明中定义的顺序匹配起来的。)函数内容以......
  • Solana 开发学习之通过RPC与Solana交互
    Solana开发学习之通过RPC与Solana交互相关链接https://solana.com/docs/rpc/httphttps://www.jsonrpc.org/specificationhttps://www.json.org/json-en.htmlJSON-RPC2.0规范JSON-RPC是一种无状态、轻量级远程过程调用(RPC)协议。该规范主要定义了几种数据结构及其处......
  • 《程序是怎么跑起来的》第2章读书笔记
    在看完第1章计算机的构造和其原理之后,就来到了第2章与其数据有关的理解。那么电脑里的数据是怎样来处理的呢?从第2章的学习中我理解到计算机它只有零和一两种数字而其他的都是由这两种数字来组成的。也就是说外界通过01的组成多种多样的类型来表现出不同的东西,从而展示给观看者。它......