The examples below show how to declare and use each of the four RabbitMQ exchange types (Direct, Topic, Fanout, and Headers) with Spring AMQP.
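1. Direct Exchange
A Direct Exchange delivers a message to the queues whose binding key exactly equals the message's routing key.
Configuration
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectExchangeConfig {

    public static final String DIRECT_EXCHANGE = "direct_exchange";
    public static final String DIRECT_QUEUE = "direct_queue";
    public static final String DIRECT_ROUTING_KEY = "direct_key";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    @Bean
    public Queue directQueue() {
        return new Queue(DIRECT_QUEUE);
    }

    // Bind the queue to the exchange under the exact key "direct_key".
    @Bean
    public Binding directBinding(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
    }
}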
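Sending messages
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DirectMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String message) {
        // The exchange name and routing key match the values in DirectExchangeConfig.
        rabbitTemplate.convertAndSend("direct_exchange", "direct_key", message);
        System.out.println("Direct Message Sent: " + message);
    }
}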
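Receiving messages
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectMessageConsumer {

    // Invoked for each message that arrives on direct_queue.
    @RabbitListener(queues = "direct_queue")
    public void receive(String message) {
        System.out.println("Direct Message Received: " + message);
    }
}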
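To make the exact-match rule concrete, here is a minimal in-memory sketch of how a direct exchange picks its target queues. It is plain Java, not Spring AMQP or broker code; the class `DirectRoutingSketch` and its methods `bind` and `route` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of direct-exchange routing (hypothetical, not a RabbitMQ API). */
final class DirectRoutingSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Records a binding between a queue and a binding key. */
    public void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    public List<String> route(String routingKey) {
        // Plain string equality: no wildcards and no partial matches.
        return new ArrayList<>(bindings.getOrDefault(routingKey, Collections.emptyList()));
    }
}
```

With `direct_queue` bound under `direct_key`, `route("direct_key")` yields that queue, while `route("direct_key.extra")` yields an empty list: in this model a message whose key matches no binding reaches no queue.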
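2. Topic Exchange
A Topic Exchange routes a message by matching its routing key against wildcard binding patterns. A routing key is a list of words separated by dots; in a pattern, * stands for exactly one word and # for zero or more words.
Configuration
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicExchangeConfig {

    public static final String TOPIC_EXCHANGE = "topic_exchange";
    public static final String TOPIC_QUEUE = "topic_queue";
    // Matches "topic" followed by any number of further words.
    public static final String ROUTING_KEY_PATTERN = "topic.#";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue() {
        return new Queue(TOPIC_QUEUE);
    }

    @Bean
    public Binding topicBinding(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with(ROUTING_KEY_PATTERN);
    }
}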
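Sending messages
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // The caller chooses the routing key, for example "topic.order.created".
    public void send(String message, String routingKey) {
        rabbitTemplate.convertAndSend("topic_exchange", routingKey, message);
        System.out.println("Topic Message Sent: " + message + " with Routing Key: " + routingKey);
    }
}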
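Receiving messages
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicMessageConsumer {

    // Invoked for each message that arrives on topic_queue.
    @RabbitListener(queues = "topic_queue")
    public void receive(String message) {
        System.out.println("Topic Message Received: " + message);
    }
}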
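The wildcard rules are easier to reason about with a small matcher. The following is a rough plain-Java sketch of the matching idea, where * consumes exactly one dot-separated word and # consumes zero or more. It is not the broker's implementation, the class name `TopicPatternSketch` is hypothetical, and edge cases such as empty routing keys are not modeled.

```java
/** Illustrative topic-pattern matcher (hypothetical, not a RabbitMQ or Spring API). */
final class TopicPatternSketch {

    /** Returns true if the routing key matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.", -1);
        String[] keyWords = routingKey.split("\\.", -1);
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' either absorbs no word (move past it) or one more word (stay on it).
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            // A literal word or '*' needs a key word, but none is left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

Under this sketch, the pattern `topic.#` from the configuration accepts both `topic` and `topic.order.created`, whereas a pattern of `topic.*` would accept `topic.order` but reject `topic.order.created`.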
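3. Fanout Exchange
A Fanout Exchange broadcasts every message to all queues bound to it and ignores the routing key.
Configuration
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutExchangeConfig {

    public static final String FANOUT_EXCHANGE = "fanout_exchange";
    public static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    public static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    @Bean
    public Queue fanoutQueue1() {
        return new Queue(FANOUT_QUEUE_1);
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE_2);
    }

    // Parameter names select the matching Queue bean; fanout bindings take no key.
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}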
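Sending messages
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String message) {
        // The routing key is ignored by a fanout exchange, so an empty string is passed.
        rabbitTemplate.convertAndSend("fanout_exchange", "", message);
        System.out.println("Fanout Message Sent: " + message);
    }
}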
Receiving messages
// Each public class below belongs in its own source file.
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutMessageConsumer1 {

    @RabbitListener(queues = "fanout_queue_1")
    public void receive(String message) {
        System.out.println("Fanout Queue 1 Received: " + message);
    }
}

@Component
public class FanoutMessageConsumer2 {

    @RabbitListener(queues = "fanout_queue_2")
    public void receive(String message) {
        System.out.println("Fanout Queue 2 Received: " + message);
    }
}
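As a rough illustration of the broadcast behaviour, the sketch below models a fanout exchange in memory: every bound queue gets its own copy of each message, whatever routing key is supplied. It is plain Java rather than broker code, and `FanoutSketch` with its methods `bind`, `publish`, and `contents` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative in-memory model of a fanout exchange (hypothetical, not a RabbitMQ API). */
final class FanoutSketch {

    // queue name -> messages delivered to that queue, in arrival order
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; no binding key is involved. */
    public void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message to every bound queue; the routing key is deliberately unused. */
    public void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Returns a snapshot of the messages held by the given queue. */
    public List<String> contents(String queueName) {
        return new ArrayList<>(queues.getOrDefault(queueName, Collections.emptyList()));
    }
}
```

In this model, publishing once with both `fanout_queue_1` and `fanout_queue_2` bound leaves one copy in each queue, which mirrors why both consumers above print the same message.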
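4. Headers Exchange
A Headers Exchange routes on key-value pairs in the message headers rather than on the routing key. The binding below accepts messages whose header_key header equals header_value.
Configuration
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HeadersExchangeConfig {

    public static final String HEADERS_EXCHANGE = "headers_exchange";
    public static final String HEADERS_QUEUE = "headers_queue";

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERS_EXCHANGE);
    }

    @Bean
    public Queue headersQueue() {
        return new Queue(HEADERS_QUEUE);
    }

    // Route only messages that carry the header header_key=header_value.
    @Bean
    public Binding headersBinding(Queue headersQueue, HeadersExchange headersExchange) {
        return BindingBuilder.bind(headersQueue).to(headersExchange)
                .where("header_key").matches("header_value");
    }
}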
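Sending messages
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HeadersMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String message) {
        MessageProperties messageProperties = new MessageProperties();
        // This header must match the binding for the message to be routed.
        messageProperties.setHeader("header_key", "header_value");
        // Mark the body as UTF-8 text so a String listener receives the decoded payload.
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        messageProperties.setContentEncoding(StandardCharsets.UTF_8.name());
        Message amqpMessage = new Message(message.getBytes(StandardCharsets.UTF_8), messageProperties);
        // The payload is already a Message, so send() is used; the routing key is ignored.
        rabbitTemplate.send("headers_exchange", "", amqpMessage);
        System.out.println("Headers Message Sent: " + message);
    }
}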
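Receiving messages
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class HeadersMessageConsumer {

    // Invoked for each message that arrives on headers_queue.
    @RabbitListener(queues = "headers_queue")
    public void receive(String message) {
        System.out.println("Headers Message Received: " + message);
    }
}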
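The example above binds on a single header. When a binding lists several headers, RabbitMQ's x-match binding argument decides whether all of them or any one of them must match. The sketch below models those two modes in plain Java; it is an illustration only, and `HeadersMatchSketch`, `matchesAll`, and `matchesAny` are hypothetical names, not broker or Spring AMQP APIs.

```java
import java.util.Map;
import java.util.Objects;

/** Illustrative model of headers-exchange matching (hypothetical, not a RabbitMQ API). */
final class HeadersMatchSketch {

    /** "all" mode: every header named in the binding must be present with an equal value. */
    public static boolean matchesAll(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> required : binding.entrySet()) {
            if (!headers.containsKey(required.getKey())
                    || !Objects.equals(required.getValue(), headers.get(required.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** "any" mode: one header named in the binding with an equal value is enough. */
    public static boolean matchesAny(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> candidate : binding.entrySet()) {
            if (headers.containsKey(candidate.getKey())
                    && Objects.equals(candidate.getValue(), headers.get(candidate.getKey()))) {
                return true;
            }
        }
        return false;
    }
}
```

Extra headers on the message do not prevent a match in either mode; only the headers named in the binding are inspected. In Spring AMQP the two modes correspond to the `whereAll(...)` and `whereAny(...)` variants of the binding builder.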
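Summary
Exchange type | Matching mechanism | Example use |
---|---|---|
Direct | Exact match on the routing key. | Point-to-point messaging |
Topic | Wildcard match on the routing key (* matches exactly one word, # matches zero or more words). | Topic-based subscriptions |
Fanout | Broadcasts to every bound queue; the routing key is ignored. | Broadcast notifications |
Headers | Matches on key-value pairs in the message headers. | Advanced filtering logic |
Choose the exchange type that fits your scenario and configure it accordingly.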
From: https://www.cnblogs.com/luorongxin/p/18583718