首页 > 其他分享 >rocketmq

rocketmq

时间:2022-09-22 10:25:43浏览次数:40  
标签:Topic String topic 消息 new message rocketmq

读取配置
alimq:
  accessKey: GKCZZBAjd81rufJsn
  secretKey: aaIIHyoeqLymh0hQNQjLiu9
  namesrvAddr: http://aliyuncs.com
  groupId: GID_WORKFLOW_CALLBACK
  topic: workflow_callback_topic
  instanceId: 040818_BcdlxE
@Value("${alimq.instanceId}")
private String instanceId;

@Value("${alimq.accessKey}")
private String accessKey;

@Value("${alimq.secretKey}")
private String secretKey;

@Value("${alimq.namesrvAddr}")
private String ONSAddr;
// 不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。
@Value("${alimq.topic}")
private String topic;

@Value("${alimq.groupId}")
private String groupId;
 

发送消息

private void sendProducer (MQMessage mqMessage){
        Gson gson = new Gson();
        // 发送同步消息
        String messageStr = gson.toJson(mqMessage);
        log.info("start to send ali mq message : " + messageStr);
        Message message = new Message(topic, mqMessage.getDomainType(), messageStr.getBytes());
        MQClient mqClient = new MQClient(
                // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
                ONSAddr,
                // AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
                accessKey,
                // AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。
                secretKey
        );
        // 消息所属的Topic,在消息队列RocketMQ版控制台创建。
        // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。

        // 获取Topic的生产者。
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getTransProducer(instanceId, topic,groupId);
        } else {
            producer = mqClient.getProducer(topic);
        }
        try {

                TopicMessage pubMsg;        // 普通消息。
                pubMsg = new TopicMessage(
                        messageStr.getBytes(),// 消息内容。
                        mqMessage.getDomainType()// 消息标签。
                );
                // 设置消息的自定义属性。
                pubMsg.getProperties().put("Message", message.toString());
                // 设置消息的Key。
                pubMsg.setMessageKey("MessageKey");
                // 同步发送消息,只要不抛异常就是成功。
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
                // 同步发送消息,只要不抛异常就是成功。
            log.info(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
        } catch (Throwable e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
            log.error(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

接收消息

    /**
     * 订阅消息,处理业务
     */
    public void normalSubscribe() {
        MQClient mqClient = new MQClient(
                // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
                "http://hangzhou.aliyuncs.com",
                // AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
                "LTI4GKCZZd81rufJsn",
                // AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。
                "aaIIyoeqLymh0D5NQjLiu9"
        );


        final MQConsumer consumer;
        if (instanceId != null && instanceId != "") {
            consumer = mqClient.getConsumer(instanceId, topic, groupId, "bid");
        } else {
            consumer = mqClient.getConsumer(topic, groupId);
        }

        // 在当前线程循环消费消息,建议多开个几个线程并发消费消息。
        do {
            List<com.aliyun.mq.http.model.Message> messages = null;

            try {
                // 长轮询消费消息。
                // 长轮询表示如果Topic没有消息,则请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回客户端。
                messages = consumer.consumeMessage(
                        3,// 一次最多消费3条消息(最多可设置为16条)。
                        3// 长轮询时间3秒(最多可设置为30秒)。
                );
            } catch (Throwable e) {
                e.printStackTrace();
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            // Topic中没有消息可消费。
            if (messages == null || messages.isEmpty()) {
                //System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
                continue;
            }

            // 处理业务逻辑。
            for (com.aliyun.mq.http.model.Message message : messages) {
                Gson gson = new Gson();
                try {
                    //接收到的消息内容
                    String msg = message.getMessageBodyString();
                    //log.info(" message received : " + msg);

                    MQMessage messagess = gson.fromJson(msg, MQMessage.class);
                    //log.info("New message received : " + msg);

                    if ("bid".equals(messagess.getDomainType()) && StringUtils.isNotBlank(messagess.getBusinessKey()) && StringUtils.isNotBlank(messagess.getStatus())) {
                        handler.processRequest(messagess.getBusinessKey(), messagess.getStatus());
                    }

                } catch (Exception e) {
                    //log.info("消费失败:messageID:" + message.getMessageId());
                    e.printStackTrace();
                }
            }

            // 消息重试时间到达前若不确认消息消费成功,则消息会被重复消费。
            // 消息句柄有时间戳,同一条消息每次消费的时间戳都不一样。
            {
                List<String> handles = new ArrayList<String>();
                for (com.aliyun.mq.http.model.Message message : messages) {
                    handles.add(message.getReceiptHandle());
                }

                try {
                    consumer.ackMessage(handles);
                } catch (Throwable e) {
                    // 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
                    if (e instanceof AckMessageException) {
                        AckMessageException errors = (AckMessageException) e;
                        System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                        if (errors.getErrorMessages() != null) {
                            for (String errorHandle :errors.getErrorMessages().keySet()) {
                                System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                        + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                            }
                        }
                        continue;
                    }
                    e.printStackTrace();
                }
            }
        } while (true);
    }

 

标签:Topic,String,topic,消息,new,message,rocketmq
From: https://www.cnblogs.com/dahei96/p/16718222.html

相关文章

  • RocketMQ
    1、安装1、Docker安装1、单机部署#镜像拉取dockerpullfoxiswho/rocketmq:server-4.3.2dockerpullfoxiswho/rocketmq:broker-4.3.2#挂载目录mkdir-p/opt/d......
  • 阿里云基于全新 RocketMQ 5.0 内核的落地实践
    简介: 本篇文章的核心就消息架构以及产品能力的云原生化,介绍了阿里云是如何基于全新的RocketMQ5.0内核做出自己的判断和演进,以及如何适配越来越多的企业客户在技术和能......
  • 【RocketMQ 课程笔记】7.RocketMQ高可用方案
    RocketMQ高可用消息生产消费流程​ Broker即MQ服务器;​ NameServer可理解为注册中心。Broker主挂了的情况Broker主从都挂了的情况Broker双主挂了的情......
  • 【RocketMQ 课程笔记】11.RocketMQ消息发送之普通消息
    RocketMQ消息发送之普通消息架构拓扑NameServer:192.168.31.103Master:192.168.31.105Slave:192.168.31.111执行流程Master与Slave启动向NameServer注册生产者Prod......
  • rocketMQ客户端和nameService、broker之间的信息交互
    客户端(包含生产者和消费者)定时任务里updateTopicRouteInfoFromNameServer方法,定时向nameService获取topic(当前客户端所包含的所有消费者者消费的和生产者要发送的)的信......
  • RocketMQ实战与原理解析-杨开元.pdf
    这是一本学习RocketMQ实战与实现原理的非常好的资料,内容言简意赅,非常适合初学者和对RocketMQ有一定使用经验的人,能够快速从全局层面掌握RocketMQ设计思想与核心实现。点击......
  • 读完 RocketMQ 源码,我学会了如何优雅的创建线程
    RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时、高可靠的消息发布与订阅服务。这篇文章,笔者整理了RocketMQ源码中创建线程的几点技巧,希望大......
  • RocketMq使用过程中问题场景和解决方案
    MQ使用过程中可能出现的问题以及解决方案一、MQ如何避免消息堆积的问题:1)产生背景:producer发送消息的速率远大于consumer消费消息的速率,从而导致消息堆积在mq服务端中;2)......
  • 十问 RocketMQ:十年再出发,到底有何不同?
    背景作为一种实时数据的处理平台,消息系统的发展跟业务架构的变迁一直息息相关,那么我们可以透过业务架构的变化来看消息系统的发展历程和未来趋势。经过十多年的发展,Rocket......
  • RocketMQ:RocketMQ常见面试题整理
    RocketMQ常见面试题整理MQ优缺点:优点:异步;解耦;削峰。RocketMQ默认端口号:9876。RocketMQ三大功能:缺点:系统可用性降低;系统复杂性提高;存在消息(数据)一致性问题。消息可靠......