首页 > 其他分享 >RabbitMq

RabbitMq

时间:2023-05-30 17:25:52浏览次数:21  
标签:false 消费者 队列 RabbitMq 交换机 消息 模式

角色 

生产者

消息的创建者,负责创建和推送数据到消息服务器

消费者

消息的接收方,用于处理数据和确认消息

组件

交换机(exchange)  接受消息 ,分配消息   ,用于存储生产者的消息

路由键(RountingKey)  用于那生成这的数据分配到固定的交换机上面  BingKey把交换机的消息绑定到队列上

队列(Queue)与交换机相连,交换机传递消息的载体

绑定键(BingKey)交换机与队列的绑定Key

信道(Channel)推送消息使用的通道

队列模式

direct  单一   一对一

fanout 广播  一对多

topic 主体   可以一对一,也可以一对多

操作

接受消息的类上面添加注解 @RabbitListener(queues = "QUEUE_B")

rabbitTemplate.convertAndSend(exchange,routeKey,list');    指定交换机名称,绑定键,数据

@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "QUEUE_E", durable = "false", autoDelete = "true"),exchange = @Exchange(value = "EXCHANGE_E", type = ExchangeTypes.TOPIC),key = "EXCHANGE_E_TO_E"))                           指定交换机类型,队列名称,绑定键

  消息避免丢失

1、消息进行持久化存储

2、开启Ack确认模式

实施

 /**
*@description : 在 ackMode = "MANUAL" 后面添加 concurrency ="1" 表示这个消息固定为4个消费者
*@author : wangyuhua TODO 正常的业务逻辑里面的date是方法执行后的返回结果,data是消费者方法执行后的返回值,根据返回值判断
* todo 根据返回值判断是需要重新进入队列还是直接放弃
*@date : 2022/6/24 14:39
*@param : [data, deliveryTag, channel]
*@return : void
**/
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "QUEUE_F", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "EXCHANGE_F", type = ExchangeTypes.TOPIC), key = "EXCHANGE_TO_F"), ackMode = "MANUAL")
public void consumerDoAck(List data, @Header( AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
throws IOException {
System.out.println("消息传递到了这里,在这里模拟消费失败,进行重新消费,测试ack");
if (data.contains("999")) {
i+=1;
// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
// channel.basicAck(deliveryTag, false);
channel.basicAck(deliveryTag,false);
System.out.println(" 走到了第一个方法的选择里面 " );
} else if (i<3){
i+=1;
System.out.println("开始第"+i+"次消费" );
// 第三个参数true,表示这个消息会重新进入队列
channel.basicNack(deliveryTag, false, true);
}
else {
System.out.println("尝试次数超过三回,放弃该请求");
channel.basicNack(deliveryTag, false, false);
}
}

3、设置集群

4、消息补偿机制

优点

1、异步处理    并行方式,提高吞吐量

2、解耦

3、可以通过规定队列的长度,控制请求量,缓解高并发

4、日志

5、特殊的消息机制,点对点,广播模式等

解决消息消费顺序

创建一个一对一的队列即可

工作模式

三.RabbitMQ的六种工作模式
1.简单模式:只有一个队列一个消费者

简单模式下我们无需指定交换机,RabbitMQ会通过默认的default AMQP交换机将我们的消息投递到指定的队列,它是一种Direct类型的交换机,队列与它绑定时的binding key其实就是队列的名称

 

2.WorkQueues:工作队列模式,只有一个队列,有多个消费者

工作队列模式也是采用默认的default AMQP交换机,Queue中的消息会被平均分发给多个消费者处理。有两种消息分发方式:

轮询分发:一个消费者消费一条,按均分配,woek模式下默认是采用轮询分发方式。轮询分发就不写代码演示了,比较简单,比如生产费者发送了6条消息到队列中,如果有3个消费者同时监听着这一个队列,那么这3个消费者每人就会分得2条消息。下面主要介绍公平分发。

公平分发:根据消费者的消费能力进行公平分发,处理得快的分得多,处理的慢的分得少,能者多劳。

 

3.Publish/Subscribe:发布订阅模式,有N个队列,N个消费者,需要声明绑定交换机

 

RabbitMQ中交换机有三种:fanout、direct、topic

 

(1)fanout:扇出模式,消息广播到所有于交换机绑定的队列当中

 

(2)direct:路由模式,消息发布者指定一个key,消息路由到所有匹配key的队列当中

 

(3)topic:主题模式,与路由模式差不多,可以通过通配符进行模糊匹配

 

4.RPC模式:支持生产者和消费者不在同一个系统中,即允许远程调用的情况

RPC模式下通常消费者作为服务端,放置在远程的系统中,提供接口;生产者调用接口,并发送消息。
RPC模式是一种远程调用的模式,因为需要http请求,因此速度比系统内部调用慢。而且rpc模式下,通常不易区分哪些是来自外部的请求,哪些是内部的请求,导致整体速度较慢。因此,不能滥用rpc模式。
在MessageProperties中有两个属性:
reply_to:用于定义回调队列的名字

correlation_id:用于关联rpc的消息请求发送与消息响应接收。

要实现RPC模式,生产者需要发送回调队列,工作流程:
1、生产者(Client)开始生产消息后,创建了匿名的、独占的回调队列。

2、生产者(Client)发送请求时,包含两个属性:reply_to,即回调队列;correlation_id,即本次请求的ID

3、请求(request )被发送到rpc_queue队列。

4、消费者(The RPC worker)在rpc_queue上等待请求,收到时,它处理消息并使用replyTo字段中的队列将结果发回客户机

5、生产者在回调队列上等待消息,当消息出现时,校验correlation_id,如果匹配请求中的值则向程序返回该响应数据。

 

 



 

标签:false,消费者,队列,RabbitMq,交换机,消息,模式
From: https://www.cnblogs.com/wyhqmm/p/17443612.html

相关文章

  • docker rabbitMQ 安装延时队列插件
    1下载插件到容器内在这个网站上找到插件的下载链接容器内wget或使用dockercp复制到容器内dockercp/rabbitmq_delayed_message_exchange-3.8.0.ezrabbit:/plugins2启用插件#进入容器启用插件dockerexec-itrabbit/bin/bashrabbitmq-pluginsenablera......
  • jmeter压测rabbitMQ
    一、安装RabbitMQ测试插件这个插件需要编译1.安装ant环境,配置环境变量下载地址:https://dlcdn.apache.org//ant/binaries/apache-ant-1.9.16-bin.zip下载解压即可用,记得配置下环境变量 Pathcmd直接运行ant,如下表示配置ok2、AMQP源码下载并打包下载地址:https://github.com......
  • RabbitMQ 工作模式介绍
    RabbitMQ工作模式介绍1.HelloWorldRabbitMQ是一个消息代理:它接受并转发消息。您可以将其视为邮局:当您将要邮寄的邮件放入邮箱时,您可以确定信使最终会将邮件交付给您的收件人。在这个类比中,RabbitMQ是一个邮政信箱,一个邮局和一个信件载体。RabbitMQ和邮局之间的主要区别在于......
  • canal+rabbitmq: Could not convert incoming message with content-type [null]
    SpringBoot整合Canal+RabbitMQ实现监听MySQL数据库同步更新Redis缓存,编写RabbitMQ消费端监听同步缓存。接收消息是字符串返回的是字节数据,eg:-30,-128,-100,-25,-126,-71,-27,-81,-71,-25,-126,-71,-30,-128,-99使用了jackson的converter后,报了如下的异常:Causedby:......
  • RabbitMQ基础
    基本结构RabbitMQ中的一些角色:publisher:生产者consumer:消费者exchange:交换机,负责消息路由queue:队列,存储消息virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离 ......
  • Rabbitmq安装
    我们在Centos8虚拟机中使用Docker来安装。 下载镜像 方式一:在线拉取 方式二:从本地加载下载tar镜像包,上传到虚拟机到某个目录: dockerimages 使用命令加载镜像 dockerload-imq.tar 安装MQ执行下面的命令来运行MQ容器: 解释dockerrun:这是运行Docker......
  • RabbitMQ
    1、RabbitMQ有哪些重要的角色?客户端、RabbitMQ、服务端。2、有哪些重要的组件?(1)connectionFactory(连接管理器)应用程序与RabbitMQ之间建立连接的管理器。(2)Channel(信道)消息推送使用的信道。(3)RoutingKey(路由键)用于把生产者的数据分配到交换机上。(4)Exchange(交换机)用于接受和......
  • rabbitmq自动及手动ACK
      mq的ack  主要是确认消息被消费者消费完成后通知服务器将队列里面的消息清除。而如果不配置Ack的话呢,我测试过他会自动的忽略,也就是说此时的服务是no_ack=true的模式,就是说只要我发现你是消费了这个数据,至于异常不异常的,我不管了。通知Ack机制就是这么来的,更加灵活的,我们需要......
  • rabbitMQ windows环境重装后报错RabbitMQ service is already present - only updatin
    错误如下:C:\Users\Administrator>rabbitmq-serviceinstallRabbitMQserviceisalreadypresent-onlyupdatingserviceparametersC:\ProgramFiles\erl\erts\bin\erlsrv:Warning,couldnotsetcorrectinteractivemode.Error:句柄无效。---此行有时显示中文乱码C:\Progr......
  • Linux 安装 RabbitMQ
    一、概要1.环境(1)RockyLinux9.1(2)RabbitMQ3.11.162.安装方式针对RHEL系统,RabbitMQ官方介绍了两种安装方式:(1)通过Yumrepositories安装,需要配置Yumrepositories文件并设置RabbitMQ镜像地址。这是官方强烈推荐的安装方式,也是本文选择的安装方式;(2)下载RPM包......