首页 > 其他分享 >简单入门 Rabbit MQ

简单入门 Rabbit MQ

时间:2023-10-04 16:55:43浏览次数:30  
标签:入门 队列 MQ Rabbit new null public channel String

RabbitMQ

1 安装

1.1 mac

当然是使用 mac 的神器 homebrew 咯。

#  切记先更新 brew
brew install rabbitmq
#  如果出现找不到的情况,需要重置国内源
export HOMEBREW_BOTTLE_DOMAIN=''

1.2 docker

拉取镜像

docker pull rabbitmq:management

创建并运行容器

docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management

docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

启动图形化界面

docker exec -it myrabbit rabbitmq-plugins enable rabbitmq_management

查看日志

docker logs -f myrabbit

2 使用

2.1 添加用户并授权

新增用户

rabbitmqctl add_user admin admin

设置用户分配权限

rabbitmqctl set_user_tags admin administrator

用户级别分为:

  • 1、administrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
  • 2、monitoring 监控者 登录控制台,查看所有信息
  • 3、policymaker 策略制定者 登录控制台,指定策略
  • 4、managment 普通管理员 登录控制台

2.1.1角色分类:

1.none

不能访问management plugin

2.management

  • 查看自己相关节点信息

  • 列出自己可以通过AMQP登入的虚拟机

  • 查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息

  • 查看和关闭自己的channels和connections

  • 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。

3.Policymaker

  • 包含management所有权限
  • 查看和创建和删除自己的virtual hosts所属的policies和parameters信息。

4.Monitoring

  • 包含management所有权限
  • 罗列出所有的virtual hosts,包括不能登录的virtual hosts。
  • 查看其他用户的connections和channels信息
  • 查看节点级别的数据如clustering和memory使用情况

查看所有的virtual hosts的全局统计信息。
5.Administrator

  • 最高权限
  • 可以创建和删除virtual hosts
  • 可以查看,创建和删除users
  • 查看创建permisssions
  • 关闭所有用户的connections

添加用户资源权限

#  对超级管理员意义不大
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

⚠️注意:guest用户只能在本机访问

3 案例-Simple

3.1 依赖

3.1.1 Java原生依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

3.1.2 Spring依赖

<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-amqp -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

3.1.3 Springboot 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.0.RELEASE</version>
</dependency>

3.1 实操

3.1.1 生产者

public class Producer {

    public static void main(String[] args) {
        // 配置工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        // 创建连接
        try(Connection connection = factory.newConnection("测试队列")){
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列 参数含义:队列名称、是否持久化、排他性、自动删除(最后一个消费者消费之后)、附加参数
            String queueName = "Queue1";
            channel.queueDeclare(queueName,false,false,false,null);
            // 准备消息
            String message = "你好服务器,我是沈自在,我最帅了";
            // 发送消息
            channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
            // 关闭一切
        }catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

3.1.2 消费者

public class Consumer {

    public static void main(String[] args) {
        // 配置工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        // 创建连接
        try(Connection connection = factory.newConnection("消费者")){
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列 参数含义:队列名称、是否持久化、排他性、自动删除(最后一个消费者消费之后)、附加参数
            channel.basicConsume("testQueue", new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("s: "+s);
                    System.out.println("收到的消息是:"+ new String(delivery.getBody(),"UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("消息接受失败");
                }
            });
            System.out.println("开始接受消息");
            System.in.read();
        }catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

4 AMQP协议

4.1 简介

AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。

4.2 AMQP生产者流转过程

可以看出来啊,每次连接的建立都需要3次握手和4次挥手

4.3 AMQP消费者流转过程

4.4 为什么RabbitMQ是基于channel而不是连接的操作

我们知道无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接,也就是 Connection。

一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道(Channel),每个信道都会被指派一个唯一的 ID。

信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。

我们完全可以使用 Connection 就能完成信道的工作,为什么还要引入信道呢?

试想这样一个场景,一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是多个 TCP 连接。

然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。

RabbitMQ 采用类似 NIO(Non-blocking I/O)的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。

每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。但是信道本身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection,将这些信道均摊到这些 Connection 中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。

信道在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面展开的。

比如 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。

RabbitMQ 相关的 API 与 AMQP 紧密相连,比如 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。

5 RabbitMQ中的核心组成部分

5.1 核心概念

  • Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
  • Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
  • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
  • Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
  • Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange
  • Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
  • Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.交换机和队列之间的绑定。
  • Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。消费者可能有多个,队列中的消息如果想要指定推送呢?就要通过这个key来查找投递给谁。
  • Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

5.2 RabbitMQ的整体架构

5.3 RabbitMQ的运行流程

6 完整创建

6.1 生产者

package com.echo.all;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) {
        //所有的中间价技术都是基于tcp/ip协议的,并在此协议上构建的。rabbitmq遵循的是amqp协议。
        //所以既然如此,必然会有ip和port
        //1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.45.135");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try{
            //2.创建Connection,名字叫做Producer
            connection = connectionFactory.newConnection("Producer");
            //3.通过连接获取通道channel
            channel = connection.createChannel();
            //4.准备交换机名字
            String exchange = "direct-message-exchange";    //自己在代码中定义的
            //5.交换机类型
            String exchangeType = "direct";
            //6.在broker中声明交换机
            //第三个参数是交换机是否持久化,如果持久化 在broker关闭之后,该交换机也不会被移除
            channel.exchangeDeclare(exchange,exchangeType,true);
            //7.声明队列
            channel.queueDeclare("queue5",true,false,false,null);
            channel.queueDeclare("queue6",true,false,false,null);
            channel.queueDeclare("queue7",true,false,false,null);
            //8.绑定队列和交换机的关系,第三个参数是路由名字
            channel.queueBind("queue5",exchange,"order");
            channel.queueBind("queue6",exchange,"order");
            channel.queueBind("queue7",exchange,"course");

            //9.往队列里发送消息
            String messageOrder = "Hello direct All Order";
            String messageCourse = "Hello direct All Course";
            channel.basicPublish(exchange,"order",null,messageOrder.getBytes());
            channel.basicPublish(exchange,"course",null,messageCourse.getBytes());
            System.out.println("消息发送成功");
        }
        catch (Exception e){
            e.printStackTrace();
        }
        finally {
            //7.关闭通道
            if (channel != null && channel.isOpen()){
                try {
                    channel.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}


6.2 消费者

package com.echo.all;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    private static Runnable runnable = () -> {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.45.135");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        //获取队列的名称
        final String queueName = Thread.currentThread().getName();
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("Consumer");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            // 这里如果queue已经被创建过一次了,可以不需要定义
            //channel.queueDeclare("queue1", false, false, false, null);
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println(queueName + ":开始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    };
    public static void main(String[] args) {
        // 启动4个线程去执行,这4个queue是之前使用web界面操作时,定义好的。
        new Thread(runnable, "queue5").start();
        new Thread(runnable, "queue6").start();
        new Thread(runnable, "queue7").start();
    }
}


7 RabbirMQ-Springboot

7.1 简单的配置

@Configuration
public class RabbitConfig {

    @Bean
    public DirectExchange fanoutExchange(){
        return new DirectExchange("fanout-exchange-order",true,false);
    }

    @Bean
    public Queue emailQueue(){
        return new Queue("email-fanout-queue");
    }
    @Bean
    public Queue smsQueue(){
        return new Queue("sms-fanout-queue");
    }
    @Bean
    public Queue duanxinQueue(){
        return new Queue("duanxin-fanout-queue");
    }

    @Bean
    public Binding emailBinding(){
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange()).with("email");
    }

    @Bean
    public Binding smsBinding(){
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange()).with("sms");
    }

    @Bean
    public Binding daunxinBinding(){
        return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange()).with("duanxin");
    }


}

7.2 简单的用法

非常简单懒得粘贴了,直接用 rabbitTemplate 见名知意就好

8 TTL 过期时间

8.1 队列的TTL

    @Bean
    public Queue ttlQueue(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        return new Queue("ttl-direct-queue",true,false,false,args);
    }

8.2 消息的 TTL

public void makeOrderTTLMessage(String userId,String productId,Integer num){
        //1.根据商品ID查询库存是否充足
        //2.保存订单
        String orderId = UUID.randomUUID().toString().replace("-","");
        System.out.println("订单:" + orderId + "生成成功TTLMessage.....");
        //3.通过MQ来完成消息的分发
        String exchangeName = "ttl-direct-exchange";  //交换机名称
        String routeKey1 = "ttlMessage";
        /**
         * 队列与交换机的绑定等在消费方
         */
        //给消息设置过期时间
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置过期时间5秒
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };

        //参数说明 param1:交换机名称,param2:路由key/queue队列名称,param3:消息内容
        rabbitTemplate.convertAndSend(exchangeName,routeKey1,orderId,messagePostProcessor);
    }

9 死信队列

10 磁盘监控

11 集群

12 分布式事务

标签:入门,队列,MQ,Rabbit,new,null,public,channel,String
From: https://www.cnblogs.com/shenzizai/p/17742448.html

相关文章

  • TypeScript入门到精通——TypeScript类型系统基础——字面量类型
    字面量类型 TypeScript支持将字面量作为类型使用,我们称之为字面量类型。每一个字面量类型都只有一个可能的值,即字面量本身。1、boolean字面量类型 boolean字面量类型只有以下两种:true字面量类型false字面量类型 原始类型boolean等同于由true字面量类型......
  • 学习JVM---入门
    1.JVM体系结构JVM的位置JVM体系结构2.类加载器双亲委派机制packagejava.lang;/***测试自定义java.lang.String类能否运行成功*体会双亲委派机制**类加载器逐级向上检查:app->ext->boot*发现boot类加载器中也有String类,但是没有main方法,于是报错*app:应......
  • P6190 [NOI Online 1 入门组] 魔法
    P6190[NOIOnline1入门组]魔法该题中用到的矩阵加速Floyd可能存在负环,但是这个题是可以用的,所以不能每次跑完之后把各个节点到自己的距离更新为0!最外层循环才是中转站节点,不管什么时候都是这样的。特别是在矩阵乘法中,一般的矩阵相乘都是最内层循环遍历行和列,而矩阵加速......
  • 基本入门案例、视图类
    flask-restful中有两个基本的类,一个是Api、一个是ResourceApi(用于构建restful风格的主类,需要将flask实例app传递给其实例化。)Resource(视图类,类似django的View,理念上和django、drf都是类似的,以请求方式名作为每个视图函数入口),Resource视图类默认返回Content-type为application/j......
  • Python开发入门
    Lifeisshort,usePython. Life’spathetic,let’spythonic. 一、Python简介1.1Python语言起源Python的创始人是吉多·范罗苏姆(GuidovanRossum),1989年的圣诞节,吉多为了打发时间,决定开发一个新型的基于互联网社区的脚本解释程序,就这样Python就在键盘的敲击声中诞生了,Pyth......
  • RabbitMQ 消息发送和消费的可靠性保障
    在一些比较重要的场景中,我们必须要保障RabbitMQ消息的可靠性,也就是发送给rabbitmq的消息必须最终成功,消费者接收消息进行处理也必须最终成功。即使是中间失败了,也必须要有其它保障措施,哪怕最后进行人工进行干预处理。消息出现丢失的场景主要有:发送消息时丢失:比如消息发送到......
  • JAVA入门——方法引用
    把已经有的方法拿过来用,当作函数式接口中抽象方法的方法体引用出必须是函数式接口被引用的方法必须已经存在被引用方法的形参和返回值需要和抽象方法保持一致被引用方法的功能要满足当前要求::双冒号是方法引用符 方法引用的分类引用静态方法:格式类名::静态......
  • Python开发入门
    Lifeisshort,usePython. Life’spathetic,let’spythonic. 一、Python简介1.1Python语言起源Python的创始人是吉多·范罗苏姆(GuidovanRossum),1989年的圣诞节,吉多为了打发时间,决定开发一个新型的基于互联网社区的脚本解释程序,就这样Python就在键盘的敲击声中诞......
  • Python入门系列7-函数进阶
    一、函数参数和返回值的作用函数根据有没有参数以及有没有返回值,可以相互组合一共有4种组合方式:1.无参数,无返回值2.无参数,有返回值3.有参数,无返回值4.有参数,有返回值如果函数内部处理的数据不确定,就可以将外界的数据以参数传递到函数内部,如果希望一个函数执行完成后,向外界汇报执行......
  • Spring Boot 入门教程
    大家好,我是深码青年,作为一名迄今为止已经有四年码龄的人来说,springboot已经深入了自己的脑子里面,所以借此机会,我们来仔细说一说关于springboot2.0的那些事儿一、SpringBoot是什么以下截图自[SpringBoot官方文档](https://spring.io/projects/spring-boot"SpringBoot官方......