RabbitMQ是非常流行的消息中间件,大家都知道通过集群能够增大它的吞吐量,那么针对单个队列,集群能增大他的吞吐量吗?如果不能,我们要怎么做呢?
南山远眺
问题
RabbitMQ是非常流行的消息中间件,大家都知道通过集群能够增大它的吞吐量,那么针对单个队列,集群能增大他的吞吐量吗?如果不能,我们要怎么做呢?
答案是集群并不能的增加单个队列的吞吐量,这是因为RabbitMQ的普通集群只是共享元数据信息,达到将整个集群规模的队列扩大以增加吞吐量的目的。普通集群甚至不能保证消息数据的高可用,任意一个broker宕机,都会导致这个broker上的队列不可用。
而镜像队列也仅仅只是保证了实现镜像复制的队列的高可用。消费者并不能并发消费复制出来的队列。
那么RabbitMQ是否也能提高类似Kafka的topic分区的机制,来加大单个主题队列的吞吐量呢?
通过使用 RabbitMQ Sharding 插件、Consistent-hash Sharding Exchange 来更加灵活地动态均衡队列压力,可以更从容地达到百万并发的性能。
这里我重点介绍下,RabbitMQ Sharding 插件,有兴趣的伙伴可以自己研究下Consistent-hash Sharding Exchange,两者的基本思路一致,都是根据Routeing Key的hash值将消息分发到分片队列中。
原理介绍
官网:https://github.com/rabbitmq/rabbitmq-sharding
rabbitmq sharding插件为您自动对队列进行分区,也就是说,一旦您将一个exchange 定义为sharded,那么在每个集群节点上自动创建支持队列,并在它们之间共享消息。rabbitmq sharding向使用者显示了一个队列,但它可能是后台运行在它后面的多个队列。rabbitmq sharding插件为您提供了一个集中的位置,通过向集群中的其他节点添加队列,您可以将消息以及跨多个节点的负载平衡发送到该位置。
插件安装
查看当前插件
find / -name rabbitmq-plugins
cd /usr/sbin/
./rabbitmq-plugins list
如果有没有对应的插件,自己下载后复制插件到指定的目录
手动下载安装,https://www.rabbitmq.com/community-plugins/
RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:
插件安装完成后可以通过命令sudo rabbitmq-plugins list查看已有插件列表,eg:
指定安装插件命令:
./rabbitmq-plugins enable rabbitmq_sharding
说明安装成功,重启生效:
service rabbitmq-server restart
队列分片
1.配置策略
find / -name rabbitmqctl
cd /usr/sbin/
./rabbitmqctl set_policy history-shard "^history" \
'{"shards-per-node": 2, "routing-key": "1234"}' \
--apply-to exchanges
说明:
通过rabbitmqctl set_policy设置新增策略,策略名称为history-shard,
匹配规则为^history,shards-per-node表示每个节点上分片出2个分片队列,
routing-key为1234,应用到所有交换器exchanges上去匹配执行。
2、在管理界面,手动创建名称为history的交换器,交换器类型选择x-consistent-hash
如上图现实,说明我们的消息分片已经成功。
但是,我们怎么去消费history交换器下的消息呢?
代码实战
1、引入rabbitmq的依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、生产者
public class ProductTest {
private static final String EXCHANGE_NAME = "history";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("wanli");
factory.setPort(5672);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
for (int i = 0; i < 10000; i++) {
//第一个参数是交换器名称,第二个参数是routeing key ,注意这里的routeing key一定要是随机的,不然消息都会发送到同一个队列中
channel.basicPublish(EXCHANGE_NAME, String.valueOf(i), bldr.build(), "hello".getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(10000);
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
}
消费者向名称为history的交换器,发送10000条routeing key为0~10000,内容为hello的消息
3、消费者
public class ConsumerTest {
private static final String QUEUE_NAME = "history";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("wanli");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//每次抓取的消息数量
channel.basicQos(32);
for (int i = 0; i < 10; i++) {
Consumer consumer = new MyConsumer(channel);
channel.basicConsume(QUEUE_NAME,consumer);
}
TimeUnit.SECONDS.sleep(120);
channel.close();
connection.close();
}
private static class MyConsumer extends DefaultConsumer{
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息为:" + new String(body));
super.getChannel().basicAck(envelope.getDeliveryTag(),false);
}
}
}
10个消费者去消费名称为history的队列,消费者均匀分布在每个队列上(每个队列上绑定了5个),每次抓取32个消息去消费。
总结
通过rabbitmq-sharding插件,将原本单个队列history的分成了2个队列,但是对消费者来说,还是消费的原来的history队列,而不用管底层实际对应的物理队列。
极大的提高了单个队列在大并发下的吞吐量。
感谢每一次关注和点赞
标签:插件,队列,factory,rabbitmq,sharding,channel,history From: https://blog.51cto.com/u_15905482/5920002