交换机和队列已经支持持久化。现在还需要设置消息的元数据:将 DeliveryMode 设置为 2,表示消息也支持持久化。
=======================================================================================
接上一篇博文。修改文件:
发送者:
package org.example.sender;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Message producer: wraps a text payload in a persistent AMQP message and
 * publishes it to the {@code amqp-topic} exchange with routing key
 * {@code huawei.a}.
 */
@Component
public class MessageSender {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code info} as a persistent message that expires after 20 seconds.
     *
     * <p>The message carries an application id and two custom headers so that
     * consumers can read them back from the message properties.
     *
     * @param info the text payload to send; encoded as UTF-8
     */
    public void send(String info) {
        System.out.println("发送消息>>>" + info);

        // Alternative kept from the original (disabled): send the plain payload
        // together with a CorrelationData whose id is a random UUID, i.e.
        // rabbitTemplate.convertAndSend("amqp-topic", "huawei.a", info, correlationData).

        MessageProperties messageProperties = new MessageProperties();

        // MessageDeliveryMode.toInt maps NON_PERSISTENT to 1 and PERSISTENT to 2,
        // so this is the "DeliveryMode = 2" setting that makes the message persistent.
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

        // Per-message expiration of 20 seconds, given in milliseconds as a string.
        messageProperties.setExpiration("20000");
        messageProperties.setAppId("abc123456");
        messageProperties.setHeader("国家1", "中国1");
        messageProperties.setHeader("国家2", "中国2");

        // Declare the body as UTF-8 text so that converters which honor the
        // content type (e.g. SimpleMessageConverter) decode it as a String
        // instead of handing the consumer raw bytes.
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        messageProperties.setContentEncoding(StandardCharsets.UTF_8.name());

        // Encode with an explicit charset: the no-arg getBytes() uses the platform
        // default, which can corrupt non-ASCII payloads on some hosts.
        Message message = new Message(info.getBytes(StandardCharsets.UTF_8), messageProperties);
        System.out.println(message);

        rabbitTemplate.convertAndSend("amqp-topic", "huawei.a", message);
    }
}
消费者:
package org.example.receiver;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Consumer with one listener method per queue ({@code xiaomi} and
 * {@code huawei}). Each handler prints what it received and then
 * acknowledges the delivery manually on the channel.
 */
@Component
public class TopicReceiver {

    /**
     * Handles deliveries from the {@code xiaomi} queue: prints the payload,
     * acknowledges the delivery, then prints its delivery tag.
     *
     * @param message the raw AMQP message, used for its delivery tag
     * @param msg     the payload as text
     * @param channel the channel on which the delivery is acknowledged
     * @throws IOException if the acknowledgement fails
     */
    @RabbitListener(queues = "xiaomi")
    public void handlerXM(Message message, String msg, Channel channel) throws IOException {
        System.out.println("小米:" + msg);

        ackSingle(channel, message);

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println(deliveryTag);
    }

    /**
     * Handles deliveries from the {@code huawei} queue: prints the payload,
     * all headers, one specific header, the whole message, its expiration and
     * its application id, and finally acknowledges the delivery.
     *
     * @param message the raw AMQP message, used for its properties and delivery tag
     * @param msg     the payload as text
     * @param channel the channel on which the delivery is acknowledged
     * @throws IOException if the acknowledgement fails
     */
    @RabbitListener(queues = "huawei")
    public void handlerHW(Message message, String msg, Channel channel) throws IOException {
        System.out.println("华为:" + msg);

        System.out.println(message.getMessageProperties().getHeaders());

        String secondCountry = message.getMessageProperties().getHeader("国家2");
        System.out.println(secondCountry);

        System.out.println(message);
        System.out.println(message.getMessageProperties().getExpiration());
        System.out.println(message.getMessageProperties().getAppId());

        ackSingle(channel, message);
    }

    /**
     * Manually acknowledges exactly one delivery: passes the message's delivery
     * tag to the broker with {@code multiple = false}, so no batch
     * acknowledgement takes place.
     */
    private static void ackSingle(Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
}
======================================================================================
参考:https://blog.csdn.net/qq_43623492/article/details/124259773
标签:java,spring,boot,System,org,println,import,message,out From: https://www.cnblogs.com/xiaobaibailongma/p/17157254.html