首页 > 其他分享 >rabbitmq高并发RPC调用,你Get到了吗?

rabbitmq高并发RPC调用,你Get到了吗?

时间:2022-12-07 20:03:01浏览次数:49  
标签:return String Get rabbitmq RPC template new public


微信公众号:​​跟着老万学java

今天给大家介绍下rabbitmq中很重要的一个功能,RPC调用。

RPC,即Remote Procedure Call的简称,也就是远程过程调用,是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。比如两台服务器上的A和B两个应用,需要进行服务接口的相互调用,我们就可以使用RPC实现。比如常见的Java RMI、WebService、Dubbo都可以
实现RPC调用。

rabbitmq实现的RPC调用主要是简单,不用管各种复杂的网络协议,客户端发送消息,消费者消费消息,反馈消息到回复队列Reply中,然后客户端获取反馈的结果。

一、原理

rabbitmq高并发RPC调用,你Get到了吗?_replyto

 

流程说明:
1、对于一个RPC请求,客户端发送一条消息,该消息具有两个重要属性:replyTo(设置为仅为该请求创建的匿名互斥队列,答复队列)和correlationId(设置为每个请求的唯一值)。

2、该请求被发送到rpc_queue队列。

3、RPC工作程序(消息消费者)会监听该队列的消息。监听到有新的消息后,会根据消息执行响应的逻辑,然后将结果返回到消息中携带的replyTo指定的答复队列中。

4、客户端(消息生产者)等待答复队列中的数据,出现出现后,它会检查correlationId属性是否一致,如果匹配,则将响应结果返回给应用程序。

二、rpc的三种调用方式

之后官网就针对使用Spring AMQP实现RPC调用给出了一个简单的 Tut6Server.java示例,但真心太简单,只能作为入门的参考demo。
之后分析通过查看rabbitTemplate.sendAndReceive()方法的源码,Spring AMQP支持3中RPC调用实现。
分别是:

1、doSendAndReceiveWithDirect 直接反馈
2、doSendAndReceiveWithFixed 使用固定队列答复
3、doSendAndReceiveWithTemporary 使用临时队列答复

根据源码,对着三种方式的排序不难看出,对三者的推荐顺序为:
doSendAndReceiveWithDirect 》 doSendAndReceiveWithFixed》doSendAndReceiveWithTemporary
直接反馈无疑是最快最资源消耗最少的,固定队列会声明指定的的队列用来接收答复,
而使用临时队列来接收答复是最消耗资源,性能也是最差的,因为队列的声明,建立,销毁会消耗大。

@Nullable
protected Message doSendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) {
if (!this.evaluatedFastReplyTo) {
synchronized(this) {
if (!this.evaluatedFastReplyTo) {
this.evaluateFastReplyTo();
}
}
}

if (this.usingFastReplyTo && this.useDirectReplyToContainer) {
return this.doSendAndReceiveWithDirect(exchange, routingKey, message, correlationData);
} else {
return this.replyAddress != null && !this.usingFastReplyTo ? this.doSendAndReceiveWithFixed(exchange, routingKey, message, correlationData) : this.doSendAndReceiveWithTemporary(exchange, routingKey, message, correlationData);
}
}

三、代码实战

添加依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

生产者代码:

/**
* @program: rabbitmq
* @description: 交换器常量
* @author: laowan
* @create: 2019-06-13 17:36
**/
@Getter
public enum ExchangeEnum {

DIRECT_EXCHANGE("direct"),

FIXED_EXCHANGE("fixed"),

TMP_EXCHANGE("tmp");


private String value;

ExchangeEnum(String value) {
this.value = value;
}
}
/**
* @program: rabbitmq
* @description: 队列枚举
* @author: laowan
* @create: 2019-06-13 17:37
**/
@Getter
public enum QueueEnum {

//direct模式
DIRECT_REQUEST("direct.request", "direct"),

//固定队列应答模式
FIXED_REQUEST("fixed.request", "fixed"),
FIXED_RESPONSE("fixed.response", ""),

//临时模式 消息发送到的队列
TMP_REQUEST("tmp.request", "tmp")
;

/**
* 队列名称
*/
private String name;
/**
* 队列路由键
*/
private String routingKey;

QueueEnum(String name, String routingKey) {
this.name = name;
this.routingKey = routingKey;
}
}
/**
* @program: rpc-parent
* @description: direct rpc请求模式
* @author: laowan
* @create: 2020-04-09 18:05
**/
@Configuration
@Slf4j
public class DirectReplyConfig {
/**
* 注意bean的名称是由方法名决定的,所以不能重复
* @return
*/
@Bean
public Queue directRequest() {
return new Queue(QueueEnum.DIRECT_REQUEST.getName(), true);
}

@Bean
public DirectExchange directExchange() {
return new DirectExchange(ExchangeEnum.DIRECT_EXCHANGE.getValue());
}

@Bean
public Binding directBinding() {
return BindingBuilder.bind(directRequest()).to(directExchange()).with(QueueEnum.DIRECT_REQUEST.getRoutingKey());
}


/**
* 当进行多个主题队列消费时,最好对每个单独定义RabbitTemplate,以便将各自的参数分别控制
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate directRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());

//这一步非常关键
template.setUseTemporaryReplyQueues(false);
template.setReplyAddress("amq.rabbitmq.reply-to");
// template.expectedQueueNames();
template.setUserCorrelationId(true);

//设置请求超时时间为10s
template.setReplyTimeout(10000);
return template;
}

}

DirectProducer 生产者代码

@Component
@Slf4j
public class DirectProducer {
@Autowired
private RabbitTemplate directRabbitTemplate;

public String sendAndReceive(String request) throws TimeoutException {
log.info("请求报文:{}" , request);
//请求结果
String result = null;
//设置消息唯一id
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//直接发送message对象
MessageProperties messageProperties = new MessageProperties();
//过期时间10秒,也是为了减少消息挤压的可能
messageProperties.setExpiration("10000");
messageProperties.setCorrelationId(correlationId.getId());
Message message = new Message(request.getBytes(), messageProperties);

StopWatch stopWatch = new StopWatch();
stopWatch.start("direct模式下rpc请求耗时");
Message response = directRabbitTemplate.sendAndReceive(ExchangeEnum.DIRECT_EXCHANGE.getValue(), QueueEnum.DIRECT_REQUEST.getRoutingKey(), message, correlationId);
stopWatch.stop();
log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());

if (response != null) {
result = new String(response.getBody());
log.info("请求成功,返回的结果为:{}" , result);
}else{
log.error("请求超时");
//为了方便jmeter测试,这里抛出异常
throw new TimeoutException("请求超时");
}
return result;

}
}

四、Fixed reply-to模式

Fixed 配置类

/**
* @program: rpc-parent
* @description: Fixed rpc请求模式
* @author: wanli
* @create: 2020-04-09 18:05
**/
@Configuration
@Slf4j
public class FixedReplyConfig {
@Bean
public Queue fixedRequest() {
return new Queue(QueueEnum.FIXED_REQUEST.getName(), true);
}

@Bean
public DirectExchange fixedExchange() {
return new DirectExchange(ExchangeEnum.FIXED_EXCHANGE.getValue());
}

@Bean
public Binding fixedBinding() {
return BindingBuilder.bind(fixedRequest()).to(fixedExchange()).with(QueueEnum.FIXED_REQUEST.getRoutingKey());
}

/**
* 注意,固定模式指定的应答队列 exclusive排他属性设置为true,且能自动删除
* @return
*/
@Bean
public Queue fixedResponseQueue() {
return new Queue(QueueEnum.FIXED_RESPONSE.getName(),false,true,true,new HashMap<>());
}


@Bean
public RabbitTemplate fixedRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());

//设置固定的Reply 地址
template.setUseTemporaryReplyQueues(false);
template.setReplyAddress(QueueEnum.FIXED_RESPONSE.getName());
template.expectedQueueNames();
template.setUserCorrelationId(true);

//设置请求超时时间为10s
template.setReplyTimeout(10000);
return template;
}


@Bean
public SimpleMessageListenerContainer fixedListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//这一步非常重要,固定队列模式要,一定要主动设置 SimpleMessageListenerContainer监听容器,监听应答队列
container.setQueueNames(QueueEnum.FIXED_RESPONSE.getName());
container.setMessageListener(fixedRabbitTemplate(connectionFactory));
container.setConcurrentConsumers(100);
container.setConcurrentConsumers(100);
container.setPrefetchCount(250);
return container;
}

}

FixedProducer生产者

@Component
@Slf4j
public class FixedProducer {
@Autowired
private RabbitTemplate fixedRabbitTemplate;

public String sendAndReceive(String request) throws TimeoutException {
log.info("请求报文:{}" , request);
//请求结果
String result = null;
//设置消息唯一id
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//直接发送message对象
MessageProperties messageProperties = new MessageProperties();
//过期时间10秒
messageProperties.setExpiration("10000");
messageProperties.setCorrelationId(correlationId.getId());
Message message = new Message(request.getBytes(), messageProperties);

StopWatch stopWatch = new StopWatch();
stopWatch.start("fixed模式下rpc请求耗时");
Message response = fixedRabbitTemplate.sendAndReceive(ExchangeEnum.FIXED_EXCHANGE.getValue(), QueueEnum.FIXED_REQUEST.getRoutingKey(), message, correlationId);
stopWatch.stop();
log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());

if (response != null) {
result = new String(response.getBody());
log.info("请求成功,返回的结果为:{}" , result);
}else{
//为了方便jmeter测试,这里抛出异常
throw new TimeoutException("请求超时");
}

return result;
}
}

五、Temporary reply-to模式

/**
* @program: rpc-parent
* @description: Temporary应答模式
* @author: laowan
* @create: 2020-04-09 18:05
**/
@Configuration
@Slf4j
public class TmpReplyConfig {
@Bean
public Queue tmpRequest() {
return new Queue(QueueEnum.TMP_REQUEST.getName(), true);
}

@Bean
public DirectExchange tmpExchange() {
return new DirectExchange(ExchangeEnum.TMP_EXCHANGE.getValue());
}

@Bean
public Binding tmpBinding() {
return BindingBuilder.bind(tmpRequest()).to(tmpExchange()).with(QueueEnum.TMP_REQUEST.getRoutingKey());
}



@Bean
public RabbitTemplate tmpRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());

template.setUseTemporaryReplyQueues(true);
template.setUserCorrelationId(true);

//设置请求超时时间为10s
template.setReplyTimeout(10000);

return template;
}
}

TmpProducer生产者代码

@Component
@Slf4j
public class TmpProducer {
@Autowired
private RabbitTemplate tmpRabbitTemplate;

public String sendAndReceive(String request) throws TimeoutException {
log.info("请求报文:{}" , request);
//请求结果
String result = null;
//设置消息唯一id
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//直接发送message对象
MessageProperties messageProperties = new MessageProperties();
//过期时间10秒
messageProperties.setExpiration("10000");
messageProperties.setCorrelationId(correlationId.getId());
Message message = new Message(request.getBytes(), messageProperties);

StopWatch stopWatch = new StopWatch();
stopWatch.start("tmp模式下rpc请求耗时");
Message response = tmpRabbitTemplate.sendAndReceive(ExchangeEnum.TMP_EXCHANGE.getValue(), QueueEnum.TMP_REQUEST.getRoutingKey(), message, correlationId);

stopWatch.stop();
log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());

if (response != null) {
result = new String(response.getBody());
log.info("请求成功,返回的结果为:{}" , result);
}else{
log.error("请求超时");
//为了方便jmeter测试,这里抛出异常
throw new TimeoutException("请求超时");
}
return result;
}
}

生产者启动类:

@SpringBootApplication
@RestController
public class ProducerApplication {

@Autowired
DirectProducer directProducer;

@Autowired
FixedProducer fixedProducer;

@Autowired
TmpProducer tmpProducer;



public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}



@GetMapping("/direct")
public String direct(String message) throws Exception {
return directProducer.sendAndReceive(message);
}

@GetMapping("/fixed")
public String fixed(String message) throws Exception {
return fixedProducer.sendAndReceive(message);
}

@GetMapping("/tmp")
public String tmp(String message) throws Exception {
return tmpProducer.sendAndReceive(message);
}
}

消费者基本类似,就附上DirectConsumer类的代码:

/**
* @program: rabbitmq
* @description: direct消费者
* @author: wanli
* @create: 2019-06-13 18:01
**/
@Component
@RabbitListener(queues = "direct.request")
@Slf4j
public class DirectConsumer {

@RabbitHandler
public String onMessage(byte[] message,
@Headers Map<String, Object> headers,
Channel channel) {
StopWatch stopWatch = new StopWatch("调用计时");
stopWatch.start("rpc调用消费者耗时");
String request = new String(message);
String response = null;
log.info("接收到的消息为:" + request);

//模拟请求耗时3s
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
response= this.sayHello(request);
log.info("返回的消息为:" + response);
stopWatch.stop();
log.info(stopWatch.getLastTaskName()+stopWatch.getTotalTimeMillis()+"ms");
return response;
}

public String sayHello(String name){
return "hello " + name;
}


}

六、压测

通过对/direct,/fixed,/tmp三个接口使用JMeter压测,线程数1000,时间1s,
多次执行,比较发现:
direct和fixed的rpc方式调用的性能基本一致,差别不大,每分钟3500左右的并发
而tmp方式并发能力会弱会弱很多,大概3000并发左右。
并发请求时可以通过rabbitmq的管理界面明显看到tmp方式高并发时生成了非常多的临时队列。
性能:direct>=fixed>tmp,与之前根据源码和各自执行原理预期的执行性能基本一致

七、参数优化

生产者这边,在fix模式下,需要配置对应的SimpleMessageListenerContainer监听答复队列,可以适当增加消费者的并发数,并且提高每次抓取的消息数。
并且设置acknowledge-mode=auto自动ack。

@Bean
public SimpleMessageListenerContainer fixedListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//这一步非常重要,固定队列模式要,一定要主动设置 SimpleMessageListenerContainer监听容器,监听应答队列
container.setQueueNames(QueueEnum.FIXED_RESPONSE.getName());
container.setMessageListener(fixedRabbitTemplate(connectionFactory));
container.setConcurrentConsumers(100);
container.setConcurrentConsumers(100);
container.setPrefetchCount(250);
return container;
}

消费者这边,一定要注意设置消费者每次抓取的数量,如果每个消息消费比较耗时,一次抓取太多,就容易导致抓取的这一批消息被这个消费者串行消费的时候出现超时情况。这里我设置的是10,经过压测发现在高并发下,rpc响应出现延长,说明消费能力基本能满足。

#消费者的并发参数
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.concurrency=200
spring.rabbitmq.listener.simple.max-concurrency=500
#抓取参数非常关键,一次抓取的消息多了,消费速度一慢,就会造成响应延迟,抓取少了又会导致并发量低
spring.rabbitmq.listener.simple.prefetch=10
#可以不需要反馈
spring.rabbitmq.listener.simple.acknowledge-mode=none

七、问题

这里要吐槽一下,关于rabbitmq的RPC调用,网上的资料真到太少了,踩了不少坑。
坑一:

CORRECTION: The RabbitTemplate does not currently support Direct reply-to for sendAndReceive() operations; you can, however, specify a fixed reply queue (with a reply-listener). Or you can use rabbitTemplate.execute() with a ChannelCallback to consume the reply from that "queue" (and publish).
I have created a JIRA issue if you wish to track it.
1.4.1 and above now supports direct reply-to.

百度上找的资料太少,之后在google上找到上面的说明,大意是RabbitTemplate在sendAndReceive操作时不支持Direct reply-to调用
解决:
作为老鸟一枚,这里我就和他杠上了,偏偏不信这个邪,RabbitTemplate源码中明明可以搜索到'amq.rabbitmq.reply-to'相关判断以及doSendAndReceiveWithDirect的定义,怎么可能不支持?

坑二:

Broker does not support fast replies via 'amq.rabbitmq.reply-to'

Broker指的是我们的rabbitmq的服务节点,不支持通过'amq.rabbitmq.reply-to'进行快速返回。

解决:
当前版本rabbitmq的Broker不支持通过'amq.rabbitmq.reply-to'进行快速返回,那么就升级broker的版本。
3.3.5版本不支持创建amq.rabbitmq.reply-to虚拟队列,那就升级到3.7.8版本。

坑三:

Caused by: java.lang.IllegalStateException: A listener container must not be provided when using direct reply-to

解决:
指定名为“amq.rabbitmq.reply-to”的反馈地址后,不能再调用expectedQueueNames方法

template.setUseTemporaryReplyQueues(false);
template.setReplyAddress("amq.rabbitmq.reply-to");
// template.expectedQueueNames();
template.setUserCorrelationId(true);

坑四:
压测过程中,并发一高,就容易出现rpc调用超时的问题。

解决:
增加消费者的并发数,减小消费者每次抓取的消息数。

总结

有些东西,百度不会告诉你,要看官网;
有些东西,官网不会告诉你,要看源码;
有些东西,源码不会告诉你,只能根据原理实践推敲;
最后,推敲不出来,可以找老万。

git源码地址:
​​​https://github.com/StarlightWANLI/rabbitmq-rpc.git​

更多精彩,关注我吧。

rabbitmq高并发RPC调用,你Get到了吗?_rpc_02

标签:return,String,Get,rabbitmq,RPC,template,new,public
From: https://blog.51cto.com/u_15905482/5920003

相关文章

  • Postman(一): postman介绍和安装,发送带参数的GET请求
    Postman(1):postman的介绍和安装Postman的介绍Postman是一款谷歌开发的接口测试工具,使API的调试与测试更加便捷。它提供功能强大的WebAPI&HTTP请求调试。它能够发......
  • RabbitMQ
    1.消息队列的应用场景1.1任务异步处理1.2应用程序解耦合2.RabbitMQ优势2.1基于AMQP协议VSJMS(java消息中间件的api,类似jdbc)2.2springboot集成3.RabbitMQ组成和工作原......
  • docker 部署 rabbitmq(持久化) 和postgresql redis mysql
    rabbitmq:dockerrun-d--hostname=rabbitmq--restart=always-eRABBITMQ_DEFAULT_USER=admin-eRABBITMQ_DEFAULT_PASS=admin--name=rabbitmq-p5672:5672-p15672......
  • RabbitMQ 实现延时队列
    1. 消息的TTL(TimeToLive)消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,......
  • VS200报错 缺少C:\WINDOWS\Microsoft.NET\Framework\v3.5\Microsoft.CompactFram
    转载:https://www.cnblogs.com/mooncher/p/13839473.html Microsoft.CompactFramework.CSharp.targetsnotfound 最近因为要给C盘扩容,就把D盘格式化了一下,扩容完成......
  • .NET 6 使用 MagicOnion 实现 gRPC
    .NET6使用MagicOnionMagicOnion开源地址:https://github.com/Cysharp/MagicOnion什么是MagicOnion?MagicOnion是用于.NET平台的现代RPC框架,它提供双向实时通信(如S......
  • Ubuntu:Could not get lock /var/lib/dpkg/lock
     看意思是上一次使用apt-get时异常退出了,锁住了,google了下解决方案如下:1、先判断是否有apt-get进程在跑,同一时刻只能有一个apt-get进程在跑,查看命令:ps-aux|grepapt-ge......
  • 使用Spring Cloud Stream 驱动 RabbitMQ 代码示例
    1、SpringCloudStream官方文档官方配置文档参考:SpringCloudStreamReferenceDocumentationSpringCloudStreamRabbitMQBinderReferenceGuide说明:在网上查......
  • RabbitMQ 6种模式的练习,以及知识梳理
    常用的模式有Simple、Work、Fanout、Direct、Topic、Headers,可以通过设置交换机类型和配置参数来实现各个模式简单模式(Simple)工作模式(Work)工作模式是考虑到多个消......
  • Thrift RPC添加access log
    前言:当我们在部署web服务的时候,web容器通常都会记录来自客户端的访问日志。而当我们使用ThriftRPC服务的时候,Thrift服务则不会给我们自动记录客户端的访问日志。通过这......