首页 > 其他分享 >RabbitMQ使用说明

RabbitMQ使用说明

时间:2024-09-04 22:26:02浏览次数:16  
标签:队列 RabbitMQ 说明 消息 使用 import message 路由

一、前言

本文基于作者对RabbitMQ使用的经验积累进行阶段性总结,希望没有使用经验的开发人员,看完本文可以直接上手。

1、RabbitMQ核心概念

Server:又称Broker,rabbitmq-server,一般指服务器运行的服务。

Connection:是客户端与 RabbitMQ 服务器之间的通信通道,用于发送和接收消息。

Channel:是一个虚拟连接,用于在单一的 RabbitMQ Connection 上管理消息的发布和消费。

Message:服务与应用程序之间传递的消息体,包含配置和消息主体,配置可以对消息进行修饰,比如消息的优先级,延迟等高级特性,主体就是消息体的内容,一般是json格式。

Virtual Host:是一个用于逻辑分组和资源分离的概念,‌它允许在单个RabbitMQ服务器上创建多个逻辑上相互隔离的消息代理。‌每个Virtual Host本质上是一个mini版的RabbitMQ服务器,‌拥有自己的Queue、‌Exchange、‌绑定和权限控制。‌通过Virtual Host,‌不同的应用或团队可以独立地管理自己的消息队列,‌避免了不同应用之间的冲突和干扰。‌用户连接到RabbitMQ时,‌需要指定要连接的Virtual Host名称,‌并且只能在指定的Virtual Host中进行操作。‌

Exchange:是消息的中转站,‌用于接收生产者发送的消息,‌并根据特定的路由规则将消息路由到一个或多个队列。‌RabbitMQ提供了多种类型的Exchange,‌包括‌Direct、‌Fanout、‌Topic和Headers‌等。‌

Bindings:是交换机和队列之间的连接规则‌。‌Bindings由交换机名称、‌队列名称和绑定键组成,‌它决定了消息如何从交换机路由到队列。‌当消息的Routing Key与绑定键匹配时,‌交换机会将消息发送到与之绑定的队列中。‌

Routing key:生产者发送消息时指定Routing Key,‌Exchange根据此Key将消息路由到一个或多个队列,‌队列与Exchange的绑定关系通过Binding Key确定。‌

Queue:队列也称为Message Queue,消息队列,保存消息并将它们转发给消费者。

2、RabbitMQ交换机类型简述

直连交换机(‌Direct Exchange)‌‌:‌根据消息的路由键(‌Routing Key)‌完全匹配的方式,‌将消息路由到指定的队列。‌适用于一对一的消息传递场景。‌

扇形交换机(‌Fanout Exchange)‌‌:‌将消息广播到所有绑定到该交换机的队列,‌忽略路由键。‌适用于需要广播消息的场景。‌

主题交换机(‌Topic Exchange)‌‌:‌根据消息的路由键和通配符模式进行匹配,‌将消息路由到一个或多个队列。‌符号“#”匹配一个或多个词,符号“*”只能匹配一个词。适用于主题订阅模型,‌如邮件分类、‌日志级别过滤等。‌

头部交换机(‌Headers Exchange)‌‌:‌不依赖路由键,‌而是根据消息的头部信息(‌Headers)‌进行匹配,‌在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到 RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。适用于需要根据消息的特定属性进行路由的场景。‌

二、SpringCloud stream 整合RabbitMQ

mq通过binder与应用联系,应用通过SpringCloud stream 的output(相当于生产者,生产消息)和input(相当于消费者,从队列中接收消息进行消费)通道与外界联系,消费者通@StreamListener 接听生产者的消息

配置文件 bootstrap.yml

server:
  port : 9015

spring:
  application:
    name: clan-communication-service #注册到eureka的服务名称
  profiles:
    active: dev #指定配置文件-后缀
  cloud:
    config:
      discovery:
        enabled: true
        service-id: clan-config

    #rabbitmq配置
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment: #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: 127.0.0.1
                username: guest
                password: guest
                virtual-host: /
bindings:
  message-center-out:
    destination: message-center   #exchange名称,交换模式默认是topic
    content-type: application/json #数据类型
  message-center-input:
    destination: message-center
    content-type: application/json

eureka:
  client:
    register-with-eureka: true   # 指定当前主机是否向Eureka服务器进行注册
    fetch-registry: true    # 指定当前主机是否要从Eurka服务器下载服务注册列表
    service-url:
      defaultZone: http://127.0.0.1:9000/eureka/

#日志设置
logging:
  level:
    com:
      clan:
        clan_base: debug

#关闭默认的feign熔断
feign:
  hystrix:
    enabled: false

hystrix:
  command:
    default:
      execution:
        isolation:
          thread:
            #设置熔断触发的超时时间(毫秒)
            timeoutInMilliseconds: 5000

#rabbon设置
#ribbon:
#  ReadTimeout: 60000
#  ConnectTimeout: 60000
#  maxAutoRetries: 0

消息生产者:OutputMessageBinding.java

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;


public interface OutputMessageBinding {
    /** Topic 名称*/
    String OUTPUT = "message-center-out";

    @Output(OUTPUT)
    MessageChannel output();
}

OutputMessage .java

import com.clan.clan_communication.req.AccountReq;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;


@Component
@EnableBinding(OutputMessageBinding.class)
public class OutputMessage {

    @Resource
    private OutputMessageBinding outputMessageBinding;

    public void sendMessage(AccountReq accountReq){
        System.out.println("推送消息:" + accountReq.toString());
        outputMessageBinding.output().send(MessageBuilder.withPayload(accountReq).build());
    }
}

消息消费者InputMessageBinding.java

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;


public interface InputMessageBinding {

    String INPUT = "message-center-input";

    @Input(INPUT)
    SubscribableChannel input();
}

CollectionReceiver.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;


@Slf4j
@EnableBinding(InputMessageBinding.class)
public class CollectionReceiver {

    @StreamListener(InputMessageBinding.INPUT)
    public void handle(String value){
        log.info("[消息] 接收到发送消息MQ: {}", value);
    }
}

注意点:

spring.cloud.stream.bindings.message-center-input.destination 中的 message-center-input 和 InputMessageBinding.INPUT 一致

  • spring.cloud.stream.bindings.message-center-input.destination=message-center
  • spring.cloud.stream.bindings.message-center-out.destination=message-center输入流和输出流的destination值要一致

三、使用场景及问题

1、RabbitMQ的作用

解耦:A系统把数据发送到BCD系统,如果通过接口调用,那么得在A系统去写调用各个系统的代码,如果再加一个E系统,又得改A系统代码,造成耦合,此时可以用mq,A系统产生一条消息,发送到mq,其他系统需要到消息就自己去mq里面消费

异步:A系统接收请求,需要在本地写库,还要在BCD三个系统写库,如果一个一个系统写耗时比较久才能得到回应,如果使用mq,A系统连续发送3条消息到mq,然后异步写入BCD库,此时A从接收请求到返回响应给用户,用时比较短。

削峰:减轻高并发下服务器的压力

2、如何保证消息的可靠性传输

消息丢失,如何处理?

丢失情况一:生产者将消息发送到rabbitmq,半路丢失

解决方案:

(1)开启rabbitmq事务,生产者收到异常报错,进行回滚重试,(同步的)

(2)开启confirm模式,mq接收到消息会反馈一个ok的消息,mq没能处理这个消息,就反馈一个失败的标志消息,(异步的)

丢失情况二:mq中丢失,mq弄丢了数据

解决方案:

开启RabbitMQ的持久化,消息写入之后持久化到磁盘

3、如何保证消息不被重复消费

(1)插入数据库的数据加个唯一键,这样可以保证不会出现脏数据

(2)在redis中存个id,每次操作数据之前查下id是否存在,存在的话就不做处理,不存在就操作数据库

4、消息队列有什么缺点

(1)系统可用性降低,引用外部依赖越多,容易挂掉,RabbitMQ挂,整个系统崩溃

(2)系统复杂度提高,消息有可能被重复消费,也有可丢失

(3)数据一致性问题,A处理成功了返回,BC成功,D失败,数据不一致

四、RabbitMQ监控

使用监控工具进行监控(如Prometheus或Zabbix)对服务器资源进行监控

  1. CPU状态(user、system、iowait&idle percentages
  2. 内存使用率(used、buffered、cached & free percentages)
  3. 虚拟内存统计信息(dirty page flushes, writeback volume)
  4. 磁盘I/O
  5. 装载上用于节点数据目录的可用磁盘空间
  6. beam.smp使用的文件描述符与最大系统限制
  7. 按状态列出的TCP连接(ESTABLISHED,CLOSE_WAIT,TIME_WATT)
  8. 网络吞吐量(bytes received,bytes sent) & 最大网络吞吐量
  9. 网络延迟(集群中所有RabbitMQ节点之间以及客户端之间)

标签:队列,RabbitMQ,说明,消息,使用,import,message,路由
From: https://blog.csdn.net/ruanxinyan12345/article/details/141902757

相关文章

  • ansible使用docker模块构建镜像
    目录Docker的Ansible自动化应用解决国内无法下载镜像问题Dockerfile文件构建镜像运行容器使用Ansible创建和管理容器Docker的Ansible自动化应用解决国内无法下载镜像问题国内的服务器无法正常下载仓库的镜像了,只能到其他地区下载,再推送到阿里的镜像,再下载原来阿里的镜像构......
  • pandas数据处理库使用
    1、pandas简介Pandas(PanelData的缩写)是一个开源的Python数据处理库,它提供了高性能、易用的数据结构和数据分析工具,用于处理和分析结构化数据。Pandas的核心数据结构是DataFrame和Series,它们使数据的清理、转换、分析和可视化变得非常便捷。2、Series使用2.1、Series是一种类似一......
  • 使用AI写WebSocket知识是一种怎么样的体验?
    一、WebSocket基础知识1.WebSocket概念1.1为什么会出现WebSocket一般的Http请求我们只有主动去请求接口,才能获取到服务器的数据。例如前后端分离的开发场景,自嘲为切图仔的前端大佬找你要一个配置信息的接口,我们后端开发三下两下开发出一个RESTful架构风格的API接口,只有当......
  • Windows 10/11下使用tar进行打包/解压
    目录前言tar的原理tar基本用法使用示例前言tar是一个在Unix、Linux、macOS等操作系统上常用的文件压缩和归档工具,它可以将多个文件或目录打包成一个文件,并进行压缩,以便于传输和存储。但Windows10从17063版本开始,默认自带tar工具。tar的原理tar的原理:将多个文件或目......
  • 服务器运维-sudo权限控制的sudoers配置文件详细说明以及利用sudo对用户账号分组权限控
    一、服务器运维-sudo权限控制的sudoers配置文件详细说明1.sudo权限控制的sudoers配置文件详细说明:[root@test~]#cat/etc/sudoers##Sudoersallowsparticularuserstorunvariouscommandsastherootuser,withoutneedingtherootpassword.##该文件允许特定......
  • Android 使用拦截器结合协程实现无感知的 Token 预刷新方案
    背景在应用中,我们通常使用Token作为用户认证的凭证。为了安全起见,Token一般设置较短的有效期,并通过refreshToken进行续期。传统的做法是当服务端返回Token过期的响应(如401)时,再进行刷新,但这种方式可能导致用户体验不佳(如突然的登录状态丢失、请求失败等)。网上关于A......
  • 【python因果推断库6】使用 pymc 模型的工具变量建模 (IV)1
    目录使用pymc模型的工具变量建模(IV)使用pymc模型的工具变量建模(IV)这份笔记展示了一个使用工具变量模型(InstrumentalVariable,IV)的例子。我们将会遵循Acemoglu,Johnson和Robinson(2001)的一个案例研究,该研究尝试解开强大的政治机构对于以国内生产总值(GDP)......
  • 【Azure Policy】使用deployIfNotExists 把 Azure Activity logs 导出保存在Storage A
    问题描述使用AzurePolicy,对订阅下的全部ActivityLog配置DiagnosticSetting,要求:在Subscription或ManagementGroup级别,针对未启用ActivityLog功能的订阅,启用ActivityLog功能;对已经启用了Activitylog功能的订阅,使用该Policy纠正并统一其参数配置;所收集到的AzureActivityLog存......
  • 这些免费开源字体你知道吗?(附说明链接及字体展示)
    汉字字体制作是一个庞大的工程,不同于西文字库,汉字常用字库表就有6763个汉字,GBK标准中共有20902个汉字,而新出版的GB_18064,共有六万多个字符。而且汉字的字形相对较为复杂,一套中文字体的完成需要耗费大量专业人士的精力和时间,倡导大家使用正版字体,为中文字体的制作创造一个良......