路由模式: 1、每个消费者监听自己的队列,并且设置routingkey。 2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。 Routing模式是可以完成订阅模式的工作的,下面的代码在RoutingKey为“inform”的消息中有所体现
示例代码:
生产者:
public class Producer03_routing { //private static final String QUEUE = "HEllO"; //队列名 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; private static final String ROUTINGKEY_EMAIL= "inform_email"; private static final String ROUTINGKEY_SMS = "inform_sms"; public static void main(String[] args) throws IOException, TimeoutException { //生产者于MQ建立连接 //通过建立连接工厂建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //配置连接信息 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); //端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机 实现多个虚拟MQ 每个虚拟机相当于一个独立的MQ connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); //创建会话通道 生产者和mq服务所有通信都在通道中完成 channel = connection.createChannel(); /** 声明队列 如果队列在MQ中没有,则创建 queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) param1:队列名称 * param2:是否持久化 (如果重启后,队列还在) * param3:队列是否独占此连接-------队列只允许在该连接中访问,连接关闭队列删除 * param4:队列不再使用时是否自动删除此队列 * param5:队列参数--------队列拓展参数 * */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); /** * 声明交换机 * param1:交换机名称 * param2:交换机类型 * fanout:对应的工作模式是:发布订阅publish/scribe * direct:对应Routing工作模式 * topic:对应通配符工作模式 * headers:对应headers模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //交换机和队列绑定String queue, String exchange, String routingKey /** * 参数明细 * 1、队列名称 * 2、交换机名称 * 3、路由key 交换机根据路由KEY的值将消息转发到指定队列 在发布订阅工作模式时设置为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS); for (int i = 0; i < 5; i++) { //定义一个消息的内容 String message = "Hello,Send EMAIL message to User"; //发送消息 /** * void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) * param1:Exchange的名称,如果没有指定,则使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列,(如果是默认路由,routingKEY使用队列名) * param3:消息包含的属性 * param4:消息体 * */ //指定RoutingKey channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes()); System.out.println("Send to mq :'" + message + "'"); } for (int i = 0; i < 5; i++) { //定义一个消息的内容 String message = "Hello,Send INFORM message to User"; //发送消息 /** * void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) * param1:Exchange的名称,如果没有指定,则使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列,(如果是默认路由,routingKEY使用队列名) * param3:消息包含的属性 * param4:消息体 * */ //指定RoutingKey channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes()); System.out.println("Send to mq :'" + message + "'"); } } catch (Exception e) { e.printStackTrace(); } finally { if(channel != null) { channel.close(); } if(connection != null) { connection.close(); } } }
}
消费者:
public class Consumer03_routing_email { //队列名 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; private static final String ROUTINGKEY_EMAIL= "inform_email"; public static void main(String[] args) throws IOException, TimeoutException { //消费者于MQ建立连接 //通过建立连接工厂建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //配置连接信息 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); //端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机 实现多个虚拟MQ 每个虚拟机相当于一个独立的MQ connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 Channel channel = connection.createChannel(); //声明队列 如果队列在MQ中没有,则创建 /** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) param1:队列名称 * param2:是否持久化 (如果重启后,队列还在) * param3:队列是否独占此连接-------队列只允许在该连接中访问,连接关闭队列删除 * param4:队列不再使用时是否自动删除此队列 * param5:队列参数--------队列拓展参数 * */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); /** * 声明交换机 * param1:交换机名称 * param2:交换机类型 * fanout:对应的工作模式是:发布订阅publish/scribe * direct:对应Routing工作模式 * topic:对应通配符工作模式 * headers:对应headers模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //交换机和队列绑定String queue, String exchange, String routingKey /** * 参数明细 * 1、队列名称 * 2、交换机名称 * 3、路由key 交换机根据路由KEY的值将消息转发到指定队列 在发布订阅工作模式时设置为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform"); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { /** 当接收到消息后,此方法被调用 * @param consumerTag 消费者标签:用来标识消费者 * @param envelope 信封:可以拿到信息 * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //super.handleDelivery(consumerTag, envelope, properties, body); //交换机 String exchange = envelope.getExchange(); //消息ID:在channel中标识消息ID,可用于消息被确认接受 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message = new String(body,"UTF-8"); System.out.println("receive a message: " + message); } }; //监听队列 /** * 监听队列String queue, boolean autoAck,Consumer callback * 参数明细 * 1、队列名称 * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复 * 3、消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer); } }
消费者2:
public class Consumer03_routing_sms { //队列名 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; private static final String ROUTINGKEY_SMS= "inform_sms"; public static void main(String[] args) throws IOException, TimeoutException { //消费者于MQ建立连接 //通过建立连接工厂建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //配置连接信息 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); //端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机 实现多个虚拟MQ 每个虚拟机相当于一个独立的MQ connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 Channel channel = connection.createChannel(); //声明队列 如果队列在MQ中没有,则创建 /** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) param1:队列名称 * param2:是否持久化 (如果重启后,队列还在) * param3:队列是否独占此连接-------队列只允许在该连接中访问,连接关闭队列删除 * param4:队列不再使用时是否自动删除此队列 * param5:队列参数--------队列拓展参数 * */ channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** * 声明交换机 * param1:交换机名称 * param2:交换机类型 * fanout:对应的工作模式是:发布订阅publish/scribe * direct:对应Routing工作模式 * topic:对应通配符工作模式 * headers:对应headers模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //交换机和队列绑定String queue, String exchange, String routingKey /** * 参数明细 * 1、队列名称 * 2、交换机名称 * 3、路由key 交换机根据路由KEY的值将消息转发到指定队列 在发布订阅工作模式时设置为空字符串 */ channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform"); //对于一个通道绑定多个队列 //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { /** 当接收到消息后,此方法被调用 * @param consumerTag 消费者标签:用来标识消费者 * @param envelope 信封:可以拿到信息 * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //super.handleDelivery(consumerTag, envelope, properties, body); //交换机 String exchange = envelope.getExchange(); //消息ID:在channel中标识消息ID,可用于消息被确认接受 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message = new String(body,"UTF-8"); System.out.println("receive a message: " + message); } }; //监听队列 /** * 监听队列String queue, boolean autoAck,Consumer callback * 参数明细 * 1、队列名称 * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复 * 3、消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_SMS, true, defaultConsumer); } }
标签:connectionFactory,String,队列,RabbitMQ,INFORM,交换机,模式,channel,Routing From: https://www.cnblogs.com/lksses/p/17416919.html