首页 > 其他分享 >notes-rabbitmq

notes-rabbitmq

时间:2024-09-07 20:04:01浏览次数:10  
标签:队列 notes rabbitmq Bean 交换机 消息 public

1 什么是消息队列

消息队列能够将发送方发送的信息放入队列中,当新的消息入队时,会通知接收方进行处理。一般消息发送方称为生产者,接收方称为消费者。

1.1 几种具体实现:

  • RabbitMQ - 性能很强,吞吐量很高,支持多种协议,集群化,消息的可靠执行特性等优势,很适合企业的开发。
  • Kafka - 提供了超高的吞吐量,ms级别的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。
  • RocketMQ - 阿里巴巴推出的消息队列,经历过双十一的考验,单机吞吐量高,消息的高可靠性,扩展性强,支持事务等,但是功能不够完整,语言支持性较差。

1.2 rabbitmq

官网 -> https://www.rabbitmq.com

特点:

  • RabbitMQ轻量级,易于在本地和云端部署,它支持多种消息协议。RabbitMQ可以部署在分布式和联合配置中,以满足大规模、高可用性要求。

  • RabbitMQ在许多操作系统和云环境中运行,并为大多数流行语言提供了广泛的开发者工具

1.3 start rabbitmq with Ubuntu

# 下载rabbitmq服务端应用 事实上需要先下载erlang环境 但是以下命令会自动一起下载了
sudo apt install rabbitmq-server

# 查看服务状态 注意此处如果出错 可能是因为服务未启动 可以通过rabbitmqctl start先启动再查看状态
sudo rabbitmqctl status

重点查看监听的端口:

连接服务使用amqp协议的那个端口5672来进行连接,25672是集群化端口

Listeners

Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0

将RabbitMQ的管理面板开启,这样话就可以在浏览器上进行实时访问和监控了:

sudo rabbitmq-plugins enable rabbitmq_management

再次查看状态,可以看到多了一个管理面板,使用的是HTTP协议:

Listeners

Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Interface: [::], port: 15672, protocol: http, purpose: HTTP API

浏览器访问ip:15672登录web控制面板查看

用户默认提供了一个username=guest&password=guest,也可以创建

# 添加用户
sudo rabbitmqctl add_user your_username your_password

# 设置用户管理员权限
sudo rabbitmqctl set_user_tags your_username administrator

2 rabbitmq的结构

rabbitmq的总体设计架构由生产者+server+消费者构成

  • 生产者与消费者都可以与server建立connection来进行消息的发布和收取
  • server由若干virtual host组成,每一个virtua host又是由exchange交换机和queue队列组成
  • 一个完整的流程是 生产者通过指定virtual host与server建立起connection,然后发布消息到指定的交换机,通过交换机来路由消息到对应的队列,最后消息再通过与其所在队列建立有连接的消费者弹出
  • 具体的消息路由方式取决于使用的交换机的类型

3 几种交换机

direct直连交换机

此类型的交换机会通过消息的routingKey决定将消息路由到对应的同名队列,持久化的(自动生成的其实都是Durable)

  • AMQP default
    • 自带默认交换机,不可删除
    • 不能显示的进行解绑或绑定
  • amq.direct
    • 需要进行绑定才能路由消息到队列

fanout扇出类型

  • amq.fanout
    • 广播类型,消息会被广播到所有与此交换机绑定的消息队列中

topic主题类型

  • amq.topic
    • 根据routingKey进行模糊匹配
      • * - 表示任意的一个单词
      • # - 表示0个或多个单词
  • amq.rabbitmq.trace
    • 帮助追踪记录生产者和消费者使用消息队列的交换机
    • sudo rabbitmqctl trace_on -p /test启动
    • 定义两个routingKey分别为‘publish.#’‘deliver.#’的队列用于和此交换机绑定

header标题类型

  • amq.headers

    • 发送的消息中是可以携带一些头部信息

    • 根据这些头部信息来决定路由到哪一个消息队列中

    •  	@Bean("binding")
          public Binding binding2(@Qualifier("headerExchange") HeadersExchange exchange, //这里和上面一样的类型
                                 @Qualifier("yydsQueue") Queue queue){
              return BindingBuilder
                      .bind(queue)
                      .to(exchange)   //使用HeadersExchange的to方法,可以进行进一步配置
                		//.whereAny("header1", "header2").exist();   这个是只要存在任意一个指定的头部Key就行
                      //.whereAll("header1", "header2").exist();   这个是必须存在所有指定的的头部Key
                      .where("test").matches("hello");   //比如我们现在需要消息的头部信息中包含test,并且值为hello才能转发给我们的消息队列
            			//.whereAny(Collections.singletonMap("test", "hello")).match();  传入Map也行,批量指定键值对
          }
      
      

4 应答模式

  • Nack message requeue true:拒绝消息,也就是说不会将消息从消息队列取出,并且重新排队,一次可以拒绝多个消息。
  • Ack message requeue false:确认应答,确认后消息会从消息队列中移除,一次可以确认多个消息。
  • Reject message requeue true/false:也是拒绝此消息,但是可以指定是否重新排队。

5 死信队列

通过直连交换机的性质可以定义死信交换机和死信队列

死信:消息队列中的数据,如果迟迟没有消费者来处理,那么就会一直占用消息队列的空间。

以下类型的消息都会被判定为死信:

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

定义队列的时候可以指定对应的死信交换机和死信routingKey

...

    @Bean("yydsQueue")
    public Queue queue(){
        return QueueBuilder
                .nonDurable("yyds")
                .deadLetterExchange("dlx.direct")   //指定死信交换机
                .deadLetterRoutingKey("dl-yyds")   //指定死信RoutingKey
                .build();
    }
  
...

6 Java SpringBoot中集成

5.1 引入客户端依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

5.2 配置

  1. 配置服务器信息
spring:
  rabbitmq:
    addresses: 192.168.0.4
    username: admin
    password: admin
    virtual-host: /test
  1. 配置相关对象的bean
@Configuration
public class RabbitConfiguration {
    @Bean("directExchange")  //定义交换机Bean,可以很多个 这里指定名字主要是方便后续通过Qualifier区分各交换机
    public Exchange exchange(){
        return ExchangeBuilder.directExchange("amq.direct").build();
    }

    @Bean("yydsQueue")     //定义消息队列
    public Queue queue(){
        return QueueBuilder
          				.nonDurable("yyds")   //非持久化类型
          				.build();
    }

    @Bean("binding")
    public Binding binding(@Qualifier("directExchange") Exchange exchange,
                           @Qualifier("yydsQueue") Queue queue){
      	//将我们刚刚定义的交换机和队列进行绑定
        return BindingBuilder
                .bind(queue)   //绑定队列
                .to(exchange)  //到交换机
                .with("my-yyds")   //使用自定义的routingKey
                .noargs();
    }
}

5.3 使用

  1. 生产者发送消息
@SpringBootTest
class SpringCloudMqApplicationTests {

  	//RabbitTemplate为我们封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
    @Resource
    RabbitTemplate template;

    @Test
    void publisher() {
      	//使用convertAndSend方法一步到位,参数基本和之前是一样的
      	//最后一个消息本体可以是Object类型,真是大大的方便
        String msg = template.convertAndSend("amq.direct", "my-yyds", "Hello World!");
    	System.out.pintln(msg);
    }

}
  1. 消费者接收消息
@Component  //注册为Bean
public class TestListener {

    @RabbitListener(queues = "yyds")   //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
    public Object test(Message message){
        System.out.println(new String(message.getBody()));
        return "消费者已收到";
    }
}
  1. 封装消息对象为json

实体类

@Data
public class User {
    private int id;
    private String name;
}

配置转换类bean

@Configuration
public class RabbitConfiguration {
  	...

    @Bean("jacksonConverter")   //直接创建一个用于JSON转换的Bean
    public Jackson2JsonMessageConverter converter(){
        return new Jackson2JsonMessageConverter();
    }
}

发送端

@Test
void publisher() {
    template.convertAndSend("amq.direct", "yyds", new User());
}

监听端

@Component
public class TestListener {

  	//指定messageConverter为我们刚刚创建的Bean名称
    @RabbitListener(queues = "yyds", messageConverter = "jacksonConverter")
    public void receiver(User user){  //直接接收User类型
        System.out.println(user);
    }
}

标签:队列,notes,rabbitmq,Bean,交换机,消息,public
From: https://www.cnblogs.com/yuqiu2004/p/18402087

相关文章

  • 第17篇 RabbitMQ安装详细步骤
    一.RabbitMQ是什么?RabbitMQ是一个由Erlang语言开发的AMQP的开源实现。​AMQP:AdvancedMessageQueue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。​RabbitMQ最......
  • RabbitMQ使用说明
    一、前言本文基于作者对RabbitMQ使用的经验积累进行阶段性总结,希望没有使用经验的开发人员,看完本文可以直接上手。1、RabbitMQ核心概念Server:又称Broker,rabbitmq-server,一般指服务器运行的服务。Connection:是客户端与RabbitMQ服务器之间的通信通道,用于发送和接收消息。C......
  • MCTS notes
    采样trajectory,从尾部到头考虑每个节点,重新计算探索它的奖励。如果是在一棵树上,我们可以在采样的时候考虑究竟是走谁。MCTS认为如果你对一个子树探索次数很多,就得给别人一些机会,即使这个子树的reward很高。我们用\(p_x\)表示\(x\)点的得分,具体式子感觉很奇怪,我不知道为什么......
  • 用 `paho-mqtt` 客户端连接 RabbitMQ 并发布和订阅消息
    在Python中,使用MQTT客户端连接RabbitMQ可以通过paho-mqtt库来实现。RabbitMQ必须启用MQTT插件,这样才能作为一个MQTTBroker工作。以下是一个完整的Python示例,用paho-mqtt客户端连接RabbitMQ并发布和订阅消息。步骤1:启用RabbitMQ的MQTT插件在R......
  • rabbitmq高可用集群搭建
    需求分析基本情况在进行RabbitMQ搭建时,我们基于现有的连接数据和业务需求进行了深入分析。目前的统计数据显示,连接数为631,队列数为80418。为了确保业务需求的顺利满足,我们需要在云产品和自建RabbitMQ消息队列服务之间做出选择。经过比较发现,即使选择腾讯云的最高规格配置,其Queue......
  • RabbitMQ 入门教程
    介绍RabbitMQ是一个消息中间件,它实现了AMQP(AdvancedMessageQueuingProtocol)协议。本教程将引导你通过几个简单的步骤来学习如何使用RabbitMQ发送和接收消息。环境准备1.安装RabbitMQ-在你的系统上安装RabbitMQ:https://www.rabbitmq.com/download.html-......
  • RabbitMQ 入门教程
    介绍RabbitMQ是一个开源的消息代理和队列服务器,实现了AMQP0-9-1标准。它能够接收、存储并转发消息数据。安装与配置1.安装RabbitMQForLinux(Ubuntu/Debian)```bashsudoaptupdatesudoaptinstallrabbitmq-server```FormacOS(Homebrew)```bashbrewins......
  • RabbitMQ 队列使用基础教程
    实践环境JDK1.8.0_121amqp-client5.16.0附:查看不同版本的amqp-client客户端支持的JavaJDK版本https://www.rabbitmq.com/client-libraries/java-versionsmavnsettings.xml<?xmlversion="1.0"encoding="UTF-8"?><settingsxsi:schemaLocation="h......
  • Java消息队列:RabbitMQ与Kafka的集成与应用
    Java消息队列:RabbitMQ与Kafka的集成与应用大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介......
  • 高效达人必备!Simple Sticky Notes让灵感与任务不再遗漏!
    前言阿尔伯特·爱因斯坦所言:“我们不能用制造问题时的同一水平思维来解决它。”这句话深刻地揭示了创新与突破的必要性。正是基于这样的理念,SimpleStickyNotes这款桌面便签软件以其独特的创新视角和实用性,在众多同类软件中脱颖而出。它源自于开发团队对于高效工作与便捷生......