首页 > 其他分享 >rabbitmq-sharding插件使用

rabbitmq-sharding插件使用

时间:2022-12-07 20:03:18浏览次数:88  
标签:插件 队列 factory rabbitmq sharding channel history


RabbitMQ是非常流行的消息中间件,大家都知道通过集群能够增大它的吞吐量,那么针对单个队列,集群能增大他的吞吐量吗?如果不能,我们要怎么做呢?

rabbitmq-sharding插件使用_插件安装

南山远眺

问题

RabbitMQ是非常流行的消息中间件,大家都知道通过集群能够增大它的吞吐量,那么针对单个队列,集群能增大他的吞吐量吗?如果不能,我们要怎么做呢?

答案是集群并不能的增加单个队列的吞吐量,这是因为RabbitMQ的普通集群只是共享元数据信息,达到将整个集群规模的队列扩大以增加吞吐量的目的。普通集群甚至不能保证消息数据的高可用,任意一个broker宕机,都会导致这个broker上的队列不可用。

而镜像队列也仅仅只是保证了实现镜像复制的队列的高可用。消费者并不能并发消费复制出来的队列。

那么RabbitMQ是否也能提高类似Kafka的topic分区的机制,来加大单个主题队列的吞吐量呢?

通过使用 RabbitMQ Sharding 插件、Consistent-hash Sharding Exchange 来更加灵活地动态均衡队列压力,可以更从容地达到百万并发的性能。

这里我重点介绍下,RabbitMQ Sharding 插件,有兴趣的伙伴可以自己研究下Consistent-hash Sharding Exchange,两者的基本思路一致,都是根据Routeing Key的hash值将消息分发到分片队列中。

原理介绍

​官网:https://github.com/rabbitmq/rabbitmq-sharding​

rabbitmq sharding插件为您自动对队列进行分区,也就是说,一旦您将一个exchange 定义为sharded,那么在每个集群节点上自动创建支持队列,并在它们之间共享消息。rabbitmq sharding向使用者显示了一个队列,但它可能是后台运行在它后面的多个队列。rabbitmq sharding插件为您提供了一个集中的位置,通过向集群中的其他节点添加队列,您可以将消息以及跨多个节点的负载平衡发送到该位置。

rabbitmq-sharding插件使用_队列分片_02

 

插件安装

查看当前插件
find / -name rabbitmq-plugins
cd /usr/sbin/
./rabbitmq-plugins list

rabbitmq-sharding插件使用_sharding_03

 

如果有没有对应的插件,自己下载后复制插件到指定的目录
手动下载安装,​​​https://www.rabbitmq.com/community-plugins/​

rabbitmq-sharding插件使用_rabbitmq_04

 

RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:

rabbitmq-sharding插件使用_rabbitmq_05

 

插件安装完成后可以通过命令sudo rabbitmq-plugins list查看已有插件列表,eg:

rabbitmq-sharding插件使用_rabbitmq_06

 

指定安装插件命令:
./rabbitmq-plugins enable rabbitmq_sharding

 

说明安装成功,重启生效:
service rabbitmq-server restart

队列分片

1.配置策略

find / -name rabbitmqctl

cd /usr/sbin/

./rabbitmqctl set_policy history-shard "^history" \
'{"shards-per-node": 2, "routing-key": "1234"}' \
--apply-to exchanges

说明:
通过rabbitmqctl set_policy设置新增策略,策略名称为history-shard,
匹配规则为^history,shards-per-node表示每个节点上分片出2个分片队列,
routing-key为1234,应用到所有交换器exchanges上去匹配执行。

2、在管理界面,手动创建名称为history的交换器,交换器类型选择x-consistent-hash

rabbitmq-sharding插件使用_队列分片_07

 

rabbitmq-sharding插件使用_队列分片_08

 

如上图现实,说明我们的消息分片已经成功。

但是,我们怎么去消费history交换器下的消息呢?

代码实战

1、引入rabbitmq的依赖包

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、生产者

public class ProductTest {
private static final String EXCHANGE_NAME = "history";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("wanli");
factory.setPort(5672);

Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
for (int i = 0; i < 10000; i++) {
//第一个参数是交换器名称,第二个参数是routeing key ,注意这里的routeing key一定要是随机的,不然消息都会发送到同一个队列中
channel.basicPublish(EXCHANGE_NAME, String.valueOf(i), bldr.build(), "hello".getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(10000);

TimeUnit.SECONDS.sleep(5);

channel.close();
connection.close();
}
}

消费者向名称为history的交换器,发送10000条routeing key为0~10000,内容为hello的消息

rabbitmq-sharding插件使用_插件安装_09

 

3、消费者

public class ConsumerTest {
private static final String QUEUE_NAME = "history";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("wanli");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//每次抓取的消息数量
channel.basicQos(32);
for (int i = 0; i < 10; i++) {
Consumer consumer = new MyConsumer(channel);
channel.basicConsume(QUEUE_NAME,consumer);
}
TimeUnit.SECONDS.sleep(120);

channel.close();
connection.close();

}

private static class MyConsumer extends DefaultConsumer{
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息为:" + new String(body));
super.getChannel().basicAck(envelope.getDeliveryTag(),false);
}
}
}

10个消费者去消费名称为history的队列,消费者均匀分布在每个队列上(每个队列上绑定了5个),每次抓取32个消息去消费。

rabbitmq-sharding插件使用_rabbitmq_10

 

总结

通过rabbitmq-sharding插件,将原本单个队列history的分成了2个队列,但是对消费者来说,还是消费的原来的history队列,而不用管底层实际对应的物理队列。
极大的提高了单个队列在大并发下的吞吐量。

感谢每一次关注和点赞

rabbitmq-sharding插件使用_sharding_11

 

标签:插件,队列,factory,rabbitmq,sharding,channel,history
From: https://blog.51cto.com/u_15905482/5920002

相关文章

  • rabbitmq高并发RPC调用,你Get到了吗?
    微信公众号:​​跟着老万学java​今天给大家介绍下rabbitmq中很重要的一个功能,RPC调用。RPC,即RemoteProcedureCall的简称,也就是远程过程调用,是一种通过网络从远程计算机上......
  • RabbitMQ
    1.消息队列的应用场景1.1任务异步处理1.2应用程序解耦合2.RabbitMQ优势2.1基于AMQP协议VSJMS(java消息中间件的api,类似jdbc)2.2springboot集成3.RabbitMQ组成和工作原......
  • VIP06-ShardingSphere5.x新版本特性
    一、整体理解新版本二、5.X部分新特性1、DistSQL2、可插拔内核3、数据迁移三、全部内容总结 一、整体理解新版本​ShardingSphere在2021年十月份推出了5.0......
  • docker 部署 rabbitmq(持久化) 和postgresql redis mysql
    rabbitmq:dockerrun-d--hostname=rabbitmq--restart=always-eRABBITMQ_DEFAULT_USER=admin-eRABBITMQ_DEFAULT_PASS=admin--name=rabbitmq-p5672:5672-p15672......
  • RabbitMQ 实现延时队列
    1. 消息的TTL(TimeToLive)消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,......
  • FinClip产品更新:FIDE 插件开发功能优化;开发者文档英文版上线
    不知不觉22年进入尾声,通过一年的不断打磨,FinClip也在不断成长,现在,让我们看看过去的11月,FinClip又有了哪些新的变化。产品方面的相关动向......
  • Webpack插件核心原理
    引言围绕Webpack打包流程中最核心的机制就是所谓的Plugin机制。所谓插件即是webpack生态中最关键的部分,它为社区用户提供了一种强有力的方式来直接触及webpack......
  • 使用Spring Cloud Stream 驱动 RabbitMQ 代码示例
    1、SpringCloudStream官方文档官方配置文档参考:SpringCloudStreamReferenceDocumentationSpringCloudStreamRabbitMQBinderReferenceGuide说明:在网上查......
  • 强大的VS插件DevExpress CodeRush v22.1 - 让代码编程更智能
    DevExpressCodeRush是一个强大的VisualStudio.NET插件,它利用整合技术,通过促进开发者和团队效率来提升开发者体验。为VisualStudioIDE增压、消除重复的代码并提高代码......
  • RabbitMQ 6种模式的练习,以及知识梳理
    常用的模式有Simple、Work、Fanout、Direct、Topic、Headers,可以通过设置交换机类型和配置参数来实现各个模式简单模式(Simple)工作模式(Work)工作模式是考虑到多个消......