直连交换机(Direct)
指定交换机模式为直连
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
public class DirectExchange {
public static final String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] args) {
try {
Channel channel = RabbitMqUtils.getChannel();
//BuiltinExchangeType.DIRECT 指定为直连交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
List<Map<String,String>> mapList = new ArrayList<>();
for(int i = 0; i < 11; i++){
//key = routingkey value=消息
Map<String,String> mapMessage = new HashMap<>();
if(i <= 5){
mapMessage.put("order","我是订单消息"+i);
mapList.add(mapMessage);
}else {
mapMessage.put("warehouse","我是仓库消息"+i);
mapList.add(mapMessage);
}
}
for(Map<String,String> map : mapList){
for(Map.Entry<String,String> message : map.entrySet()){
//1.交换机名称 2.routingKey 3.其他属性 例如持久化,4.消息
channel.basicPublish(EXCHANGE_NAME,message.getKey(),null,message.getValue().getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息:"+message.getValue());
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消费者1
public class DirectExchangeConsumer1 {
//交换机名称
public static final String EXCHANGE_NAME = "direct_exchange_test";
public static final String QUEUE_NAME = "order_queue";
public static final String ROUTING_KEY = "order";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
/**
* 1.交换机名称
* 2.交换机类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
//绑定队列
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
System.out.println(ROUTING_KEY+":消费者等待接收消息!");
//接收消息回调
DeliverCallback callback = (consumerTag, message) -> {
String receiveMessage = new String(message.getBody(), Charsets.UTF_8.name());
System.out.println(ROUTING_KEY+"_接收到消息:"+receiveMessage+" 标签:"+consumerTag);
};
//取消消息回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println(ROUTING_KEY+"_消费者取消消费接口回调逻辑:"+consumerTag);
};
channel.basicConsume(QUEUE_NAME,false,callback,cancelCallback);
}
}
消费者2
public class DirectExchangeConsumer2 {
//交换机名称
public static final String EXCHANGE_NAME = "direct_exchange_test";
public static final String QUEUE_NAME = "warehouse_queue";
public static final String ROUTING_KEY = "warehouse";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
/**
* 1.交换机名称
* 2.交换机类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
System.out.println(ROUTING_KEY+":消费者等待接收消息!");
//接收消息回调
DeliverCallback callback = (consumerTag, message) -> {
String receiveMessage = new String(message.getBody(), Charsets.UTF_8.name());
System.out.println(ROUTING_KEY+"_接收到消息:"+receiveMessage+" 标签:"+consumerTag);
};
//取消消息回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println(ROUTING_KEY+"_消费者取消消费接口回调逻辑:"+consumerTag);
};
channel.basicConsume(QUEUE_NAME,false,callback,cancelCallback);
}
}
测试
生产者分别给不同的routingkey发送消息
消费者1接收6条消息
消费者这2接收到5条消息
实现的方式主要是靠routingkey 在生产者中发送指定的消息,同时声明routingkey,消费者绑定队列的时候声明队列、交换机名称、routingkey,就可以获取到指定消息。
标签:直连,String,Direct,RabbitMQ,交换机,ROUTING,public,channel,NAME From: https://blog.csdn.net/weixin_42942786/article/details/139716911