了解Work Queues
1.1官网中的图片:通过官网里的图片,我们可以看到Word Queues 与Hello World 的区别,这里的消费者增加,但是时多个消费者消费单个队列,在这里我们依然要注意,这里面使用的是默认的交换机,并不是直接连接的队列。
1.2直观的图片:
更好的理解每次的连接都是需要交换机的,对于后续的学习是有帮助的
二.消费者消费的过程。
2.1 :了解AMQP:
RabbitMQ是AMQP协议的一个实现者,它是一个开源的消息队列系统,基于AMQP协议实现。
AMQP协议将消息传递分为两个部分:交换(exchanges)和消息队列(message queues)。消息队列是基本的,专注于将消息传递给消费者。交换则根据一组规则将新消息路由到正确的消息队列。RabbitMQ支持多种类型的交换,如直接交换(Direct exchange)、扇出交换(Fanout exchange)和主题交换(Topic exchange)。
AMQP还引入了消息确认(message acknowledgements)的概念,以确保可靠的消息传递。消息确认有两种模式:自动确认和显式确认。自动确认模式下,消息一旦发送就被视为已确认;显式确认模式则允许消费者决定何时确认消息,例如在接收或处理消息后
2.1 :了解在work queue 下的消息的默认分配
2.1在资源的非配上采取的是轮询(round-robin) 的方式将消息平均分配给所有消费者。在这里,我们想一个问题:因为消费者处理的能力不同,会影响效率,我们该如何解决呢?
三.代码出发,演示如何使用Work Queues
3.1 提供者:这里的代码与hello world 并没有什么不同
这里面连接工厂的代码在我的文章:RabbitMQ通讯方式第一讲:Hello World-CSDN博客 在这里就不再赘述了。
注意这里面的交换机是默认的,与队列连接的方式式直接连接。
public class Provider {
public static final String WORK_QUEUE_NAME="work";
@Test
public void p() throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(WORK_QUEUE_NAME,false,false,false,null);
//workQueues 模式下还是默认的交换机;
for (int i = 0; i < 10; i++) {
String msg=("开始发送第"+i+"条消息");
channel.basicPublish("",WORK_QUEUE_NAME,null,msg.getBytes());
}
System.out.println("消息发送成功");
}
3.2消费者代码
@Test
public void consumer1() throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(WORK_QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
DefaultConsumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("得到数据"+new String(body,"utf-8"));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//将autoACk设置为非自动
channel.basicConsume(WORK_QUEUE_NAME,false,callback);
System.out.println("获取数据成功");
System.in.read();
}
@Test
public void consumer2() throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(WORK_QUEUE_NAME,false,false,false,null);
//设置消息的流控
channel.basicQos(3);
DefaultConsumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
System.out.println("得到数据"+new String(body,"utf-8"));
}
};
channel.basicConsume(WORK_QUEUE_NAME,false,callback);
System.out.println("获取数据成功");
System.in.read();
}
在这段代码里面,我们解决了处理效率不同的问题。在消费者一的代码里面,线程休眠了100ms
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
在消费者二的代码里面,线程休眠了1000ms 显然一的速度更快。在这里的运行时我们关闭了自动确认
channel.basicConsume(WORK_QUEUE_NAME,false,callback);
3.3介绍流控:
在这段代码里面,我们开启的流控起到的作用是:
channel.basicQos(3);
在 RabbitMQ 中,basicQos
方法用于设置消息的流控,即消费者预取(prefetch)设置。这是 AMQP 0-9-1 协议中的一个特性,允许消费者告诉 RabbitMQ 它能够处理多少条未被确认(unacknowledged)的消息。这种机制可以防止 RabbitMQ 向消费者发送过多消息,从而避免消费者过载。