读取配置
alimq: accessKey: GKCZZBAjd81rufJsn secretKey: aaIIHyoeqLymh0hQNQjLiu9 namesrvAddr: http://aliyuncs.com groupId: GID_WORKFLOW_CALLBACK topic: workflow_callback_topic instanceId: 040818_BcdlxE
@Value("${alimq.instanceId}")
private String instanceId;
@Value("${alimq.accessKey}")
private String accessKey;
@Value("${alimq.secretKey}")
private String secretKey;
@Value("${alimq.namesrvAddr}")
private String ONSAddr;
// 不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。
@Value("${alimq.topic}")
private String topic;
@Value("${alimq.groupId}")
private String groupId;
发送消息
private void sendProducer (MQMessage mqMessage){ Gson gson = new Gson(); // 发送同步消息 String messageStr = gson.toJson(mqMessage); log.info("start to send ali mq message : " + messageStr); Message message = new Message(topic, mqMessage.getDomainType(), messageStr.getBytes()); MQClient mqClient = new MQClient( // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 ONSAddr, // AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 accessKey, // AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。 secretKey ); // 消息所属的Topic,在消息队列RocketMQ版控制台创建。 // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。 // 获取Topic的生产者。 MQProducer producer; if (instanceId != null && instanceId != "") { producer = mqClient.getTransProducer(instanceId, topic,groupId); } else { producer = mqClient.getProducer(topic); } try { TopicMessage pubMsg; // 普通消息。 pubMsg = new TopicMessage( messageStr.getBytes(),// 消息内容。 mqMessage.getDomainType()// 消息标签。 ); // 设置消息的自定义属性。 pubMsg.getProperties().put("Message", message.toString()); // 设置消息的Key。 pubMsg.setMessageKey("MessageKey"); // 同步发送消息,只要不抛异常就是成功。 TopicMessage pubResultMsg = producer.publishMessage(pubMsg); // 同步发送消息,只要不抛异常就是成功。 log.info(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId() + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()); } catch (Throwable e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 log.error(new Date() + " Send mq message failed. Topic is:" + topic); e.printStackTrace(); } mqClient.close(); }
接收消息
/** * 订阅消息,处理业务 */ public void normalSubscribe() { MQClient mqClient = new MQClient( // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 "http://hangzhou.aliyuncs.com", // AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 "LTI4GKCZZd81rufJsn", // AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。 "aaIIyoeqLymh0D5NQjLiu9" ); final MQConsumer consumer; if (instanceId != null && instanceId != "") { consumer = mqClient.getConsumer(instanceId, topic, groupId, "bid"); } else { consumer = mqClient.getConsumer(topic, groupId); } // 在当前线程循环消费消息,建议多开个几个线程并发消费消息。 do { List<com.aliyun.mq.http.model.Message> messages = null; try { // 长轮询消费消息。 // 长轮询表示如果Topic没有消息,则请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回客户端。 messages = consumer.consumeMessage( 3,// 一次最多消费3条消息(最多可设置为16条)。 3// 长轮询时间3秒(最多可设置为30秒)。 ); } catch (Throwable e) { e.printStackTrace(); try { Thread.sleep(2000); } catch (InterruptedException e1) { e1.printStackTrace(); } } // Topic中没有消息可消费。 if (messages == null || messages.isEmpty()) { //System.out.println(Thread.currentThread().getName() + ": no new message, continue!"); continue; } // 处理业务逻辑。 for (com.aliyun.mq.http.model.Message message : messages) { Gson gson = new Gson(); try { //接收到的消息内容 String msg = message.getMessageBodyString(); //log.info(" message received : " + msg); MQMessage messagess = gson.fromJson(msg, MQMessage.class); //log.info("New message received : " + msg); if ("bid".equals(messagess.getDomainType()) && StringUtils.isNotBlank(messagess.getBusinessKey()) && StringUtils.isNotBlank(messagess.getStatus())) { handler.processRequest(messagess.getBusinessKey(), messagess.getStatus()); } } catch (Exception e) { //log.info("消费失败:messageID:" + message.getMessageId()); e.printStackTrace(); } } // 消息重试时间到达前若不确认消息消费成功,则消息会被重复消费。 // 消息句柄有时间戳,同一条消息每次消费的时间戳都不一样。 { List<String> handles = new ArrayList<String>(); for (com.aliyun.mq.http.model.Message message : messages) { handles.add(message.getReceiptHandle()); } try { consumer.ackMessage(handles); } catch (Throwable e) { // 某些消息的句柄可能超时,会导致消息消费状态确认不成功。 if (e instanceof AckMessageException) { AckMessageException errors = (AckMessageException) e; System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:"); if (errors.getErrorMessages() != null) { for (String errorHandle :errors.getErrorMessages().keySet()) { System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode() + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage()); } } continue; } e.printStackTrace(); } } } while (true); }
标签:Topic,String,topic,消息,new,message,rocketmq From: https://www.cnblogs.com/dahei96/p/16718222.html