消费端限流
什么是消费端限流
- 假设一个场景, 首先, 我们RabbitMQ服务器有上万条消息未处理的消息, 我们随机打开一个消费者客户端, 会出现下面情况
- 巨量的消息瞬间全部推送过来, 但是我们单个客户端无法同时处理这么多数据
- RabbitMQ提供了一种Qos(服务质量保证)功能, 即在非自动确认消息的前提下, 如果一定数目的消息(通过基于consumer或者channel设置Qos的值)未被确认前, 不进行消费新的消息
- void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
- 参数解释
- prefetchSize: 0
- prefetchCount: 会告诉RabbitMQ不要同时给一个消费者推送多余N个消息, 即一旦有N个消息还没有ACK, 则该consumer将block掉, 直到有消息ACK
- global: true\false 是否将上面设置应用于channel, 简单点说, 就是上面限制是channel级别还是consumer级别
- 注意
- prefetchSize和global这两项, rabbitmq没有实现, 暂且不研究
- prefetch_count和no_ack=false的情况下生效, 即在自动应答的情况下这两个值是不生效的
消费端限流代码实现
帮助类新增函数
public static AMQP.Queue.DeclareOk queueDeclare(Channel channel, String queueName, boolean durable) throws IOException { return channel.queueDeclare(queueName, durable, false, false, null); }
消费者
package com.dance.redis.mq.rabbit.qos; import com.dance.redis.mq.rabbit.RabbitMQHelper; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeUnit; public class Receiver { public static void main(String[] args) throws Exception { Channel channel = RabbitMQHelper.getChannel(); String queueName = "test001"; // durable 是否持久化消息 RabbitMQHelper.queueDeclare(channel,queueName,true); channel.basicQos(0, 1, false); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; // 参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, false, consumer); // 等待回调函数执行完毕之后,关闭资源。 TimeUnit.SECONDS.sleep(50); channel.close(); RabbitMQHelper.closeConnection(); } }
生产者
package com.dance.redis.mq.rabbit.qos; import com.dance.redis.mq.rabbit.RabbitMQHelper; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; public class Sender { public static void main(String[] args) throws Exception { Channel channel = RabbitMQHelper.getChannel(); String queueName = "test001"; RabbitMQHelper.queueDeclare(channel, queueName, true); Map<String, Object> headers = new HashMap<>(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers).build(); for (int i = 0; i < 5; i++) { String msg = "Hello World RabbitMQ " + i; channel.basicPublish("", queueName, props, msg.getBytes()); } } }
测试
启动消费者
启动生产者
查看消费者
标签:15,String,RabbitMQHelper,RabbitMQ,限流,import,com,queueName,channel From: https://www.cnblogs.com/flower-dance/p/16754817.html