工作队列模型
workQueue,多个消费者绑定到一个队列,当队列堆积消息时,可使用work模型。
多个消费者绑定一个队列,同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
发布订阅模型
角色类型如下:
publisher,生产者
exchange,交换机,负责转发消息,不负责存储消息的能力
交换机类型如下:
Fanout,广播,将消息交给所有绑定交换机的队列
Direct,定向,把消息交给符合制定routing key的队列
Topic,通配符,把消息交给符合routing pattern的队列
consumer,消费者
queue,消息队列
在广播模式下,消息发送流程是这样的: - 1) 可以有多个队列 - 2) 每个队列都要绑定到Exchange(交换机) - 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定 - 4) 交换机把消息发送给绑定过的所有队列 - 5) 订阅队列的消费者都能拿到消息
生产者代码
public static final String EXCHANGE_NAME = "fanout_exchange"; public static final String QUEUE_NAME = "fanout_queue1"; @Bean("fanout_exchange")//交换机 public Exchange exchange(){ return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build(); } @Bean("fanout_queue1")//消息队列 public Queue queue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean//绑定交换机和消息队列1 public Binding binding(@Qualifier("fanout_exchange")Exchange exchange,@Qualifier("fanout_queue1")Queue queue){ return BindingBuilder.bind(queue).to(exchange).with("").noargs(); } @Bean("fanout_queue2")//消息队列2 public Queue queue2(){ return QueueBuilder.durable("fanout_queue2").build(); } @Bean//绑定交换机和消息队列2 public Binding binding2(@Qualifier("fanout_exchange")Exchange exchange,@Qualifier("fanout_queue2")Queue queue){ return BindingBuilder.bind(queue).to(exchange).with("").noargs(); }
发送消息
@Test void fanout_exchangeTest() { rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"","fanout,这是在广播..."); }
消费者
@RabbitListener(queues = "fanout_queue1") public void listenFanoutQueue1(Message message) { System.out.println("消费者1收到:"+new String(message.getBody())); } @RabbitListener(queues = "fanout_queue2") public void listenFanoutQueue2(Message message) { System.out.println("消费者2收到:"+new String(message.getBody())); }
标签:fanout,exchange,队列,rabbitmq,交换机,消息,相关,public From: https://www.cnblogs.com/gstszbc/p/18137494