配置信息
bbitMQ: host: rabbitmq.com port: 0000 username: 0000 password: 0000 topic: amq.fanout queueName: icost-beta virtualHost: icost-beta #开启发送确认机制,将来消息到达交换机以后有一个回调 publisher-confirm-type: correlated #消息到达消息队列回调(如果消息没有成功到达队列,会触发回调方法) publisher-returns: true template: retry: enabled: true # 开启重发机制 initial-interval: 1000ms #间隔 1秒 max-attempts: 6 #最多发6次 multiplier: 1.2 #每次间隔 时间*1.2 max-interval: 10000ms #每次最大间隔时间 listener: simple: acknowledge-mode: manual
生产者
@Value("${rabbitMQ.username}") private String username; @Value("${rabbitMQ.password}") private String password; @Value("${rabbitMQ.host}") private String host; @Value("${rabbitMQ.port}") private int port; @Value("${rabbitMQ.topic}") private String topic; //声明队列 @Value("${rabbitMQ.queueName}") private String queueName; @Value("${rabbitMQ.virtualHost}") private String virtualHost;
//调用方法
private void notifyMQProcessComplete(String businessKey, String domain, String project, String instanceId , int result) {
MQMessage message = new MQMessage();
message.setBusinessKey(businessKey);
message.setDomainId(project);
message.setDomainType(domain);
message.setInstanceId(instanceId);
message.setStatus(result);
try {
sendProducer(message);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
/** * 发布信息 */ private void sendProducer (MQMessage mqMessage)throws IOException, TimeoutException{ Gson gson = new Gson(); // 发送同步消息 String messageStr = gson.toJson(mqMessage); //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); //设置 RabbitMQ 地址 factory.setHost(host); factory.setPort(port); factory.setVirtualHost(virtualHost); //建立到代理服务器到连接 Connection conn = factory.newConnection(); //获得信道 Channel channel = conn.createChannel(); //声明交换机(参数为:交换机名称; 交换机类型,广播模式) channel.exchangeDeclare(topic, BuiltinExchangeType.FANOUT,true); //消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型即可) channel.basicPublish(topic, "", true,null,messageStr.getBytes(StandardCharsets.UTF_8)); channel.close(); conn.close(); }
消费者
/** * 监听 */ @Component public class RocketConsumerListener implements CommandLineRunner { @Autowired private RocketMQConsumer rocketMQConsumer; protected final Logger logger = LoggerFactory.getLogger(RocketConsumerListener.class); @Override public void run(String... args) throws IOException, TimeoutException { logger.info("========消费者启动=========="); rocketMQConsumer.normalSubscribe(); } }
@Component @Slf4j public class RocketMQConsumer { @Autowired private ObjectMapper objectMapper; @Autowired BidProcessMessageHandler handler; @Value("${rabbitMQ.username}") private String username; @Value("${rabbitMQ.password}") private String password; @Value("${rabbitMQ.host}") private String host; @Value("${rabbitMQ.port}") private int port; @Value("${rabbitMQ.topic}") private String topic; @Value("${rabbitMQ.queueName}") private String queueName; @Value("${rabbitMQ.virtualHost}") private String virtualHost; /** * 订阅消息,处理业务 */ public void normalSubscribe() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); //设置 RabbitMQ 地址 factory.setHost(host); factory.setPort(port); factory.setVirtualHost(virtualHost); //建立到代理服务器到连接 Connection conn = factory.newConnection(); //获得信道 final Channel channel = conn.createChannel(); //声明交换机 channel.exchangeDeclare(topic, ExchangeTypes.FANOUT,true); //绑定临时队列与交换机并设置获取交换机中动态路由 String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, topic, ""); //消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //接收到的消息内容 String msg = new String(body); MQWorkflowMessageBody messageBody = objectMapper.readValue(msg, MQWorkflowMessageBody.class); if ("bid".equals(messageBody.getDomainType()) && StringUtils.isNotBlank(messageBody.getBusinessKey()) && StringUtils.isNotBlank(String.valueOf(messageBody.getStatus()))) { log.info("bid收到消息messageID:" + " msg:" + msg); handler.processRequest(messageBody.getBusinessKey(), String.valueOf(messageBody.getStatus())); } } catch (Exception e) { e.printStackTrace(); } } }); }
标签:String,rabbitMQ,Value,factory,private,广播,fanout,rabbitmq,channel From: https://www.cnblogs.com/dahei96/p/16828565.html