首页 > 其他分享 >Apache Rocket MQ 组件 (泛型应用)

Apache Rocket MQ 组件 (泛型应用)

时间:2023-08-03 19:58:28浏览次数:49  
标签:SAYHELLO String Rocket busi MQ env 泛型 import public

 一、实现

 

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.alibaba.fastjson.JSONObject;

import lombok.CustomLog;

/**
 * @author witas
 *
 */
@CustomLog
public class ApacheRocketMqManager {
    
    //思路:class定义业务对象,实现特定接口
    // 组名利用对象class名
    // 消费时仅需实现listener
    // 生产直接发送类对象 
    // 消费对象中通用的消费次数等信息 

    private static final Map<String, DefaultMQPushConsumer> CONSUMER_MAP = new ConcurrentHashMap<>();
    private static final Map<String, DefaultMQProducer> PRODUCER_MAP = new ConcurrentHashMap<>();

    private static void bindingConsumerBusi(String busi, String namesrvAddr,
            MessageListenerConcurrently listener) {
        if(busi != null) {
            busi = busi.toUpperCase();
        }
        if(CONSUMER_MAP.containsKey(busi)) {
            return;
        }
        if (busi == null || busi.trim().length() < busi.length()) {
            throw new IllegalArgumentException("业务标识[" + busi + "]格式错误");
        }
        if (StringUtils.isBlank(namesrvAddr) || namesrvAddr.split(":").length != 2 || namesrvAddr.contains("/")) {
            throw new IllegalArgumentException("namesrvAddr[" + busi + "]格式错误,必须是<域名/IP>:<端口>,例如 localhost:9876");
        }
        String env = System.getProperty("env");

        if (StringUtils.isBlank(env)) {
            env = "DEFAULT";
        } else {
            env = env.toUpperCase();
        }
        String consumerGroup = env + "_CONSUMER_GROUP_" + busi;
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setMaxReconsumeTimes(3);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.registerMessageListener(listener);
        String topic = env + "_TOPIC_" + busi;
        try {
            consumer.subscribe(topic, env + "_TAG_" + busi);
            consumer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("",e);
        }
        log.info("主题[{}]消费者[{}]已注册", topic, consumerGroup);
        CONSUMER_MAP.put(busi, consumer);
    }

    private static void bindingProducerBusi(String busi, String namesrvAddr) {
        if(busi != null) {
            busi = busi.toUpperCase();
        }
        if(PRODUCER_MAP.containsKey(busi)) {
            return;
        }
        if (busi == null || busi.trim().length() < busi.length()) {
            throw new IllegalArgumentException("业务标识[" + busi + "]格式错误");
        }
        if (StringUtils.isBlank(namesrvAddr) || namesrvAddr.split(":").length != 2 || namesrvAddr.contains("/")) {
            throw new IllegalArgumentException("namesrvAddr[" + busi + "]格式错误,必须是<域名/IP>:<端口>,例如 localhost:9876");
        }
        String env = System.getProperty("env");

        if (StringUtils.isBlank(env)) {
            env = "DEFAULT";
        } else {
            env = env.toUpperCase();
        }
        String producerGroup = env + "_PRODUCER_GROUP_" + busi;
        String topic = env + "_TOPIC_" + busi;
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("",e);
        }
        log.info("主题[{}]生产者[{}]已启动", topic, producerGroup);
        PRODUCER_MAP.put(busi, producer);
    }

    private static SendResult send(String busi, Object obj) {
        DefaultMQProducer defaultMQProducer = PRODUCER_MAP.get(busi);
        if(defaultMQProducer == null) {
            log.info("发送失败,无生产者[{}]", busi);
            return null;
        }
        String env = System.getProperty("env");

        if (StringUtils.isBlank(env)) {
            env = "DEFAULT";
        } else {
            env = env.toUpperCase();
        }

        SendResult sendResult = null;
        try {
            sendResult = defaultMQProducer.send(new Message(env + "_TOPIC_" + busi, env + "_TAG_" + busi, JSONObject.toJSONBytes(obj)));
        } catch (Exception e) {
            e.printStackTrace();
        }

        return sendResult;
    }

    
    public static void clean() {
        for (DefaultMQPushConsumer defaultMQPushConsumer : CONSUMER_MAP.values()) {
            defaultMQPushConsumer.shutdown();
        }
        for (DefaultMQProducer defaultMQProducer : PRODUCER_MAP.values()) {
            defaultMQProducer.shutdown();
        }
    }
    
    
    public static <P extends BusiProtocol> void bindingConsumerBusi(Class<P> protocolClazz, String namesrvAddr, BusiHandler<P> handler) {
        
        if(protocolClazz == null || handler == null || namesrvAddr == null) {
            log.info("Binding consumer failed.Illegal arguments[{}],[{}],[{}]",protocolClazz, handler, namesrvAddr);
            return;
        }
        bindingConsumerBusi(protocolClazz.getSimpleName(), namesrvAddr, new BusiListener<P>(protocolClazz, handler) {
            @Override
            public boolean doBusi(P p, BusiHandler<P> handler, MessageExt msg) {
                log.info("<<<< 来消息了 {}", msg.getMsgId());
                boolean result = handler.handle(p, msg);
                log.info("消息处理" + (result?"成功":"失败"));
                return result;
            }
        });
    }
    
    public static void bindingProducerBusi(Class<? extends BusiProtocol> clazz, String namesrvAddr){
        bindingProducerBusi(clazz.getSimpleName(), namesrvAddr);
    }
    
    public static <T> SendResult send(T t) {
        return send(t.getClass().getSimpleName().toUpperCase(),t);
    }
}

 

 

import java.nio.charset.Charset;
import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import com.alibaba.fastjson.JSONObject;

public abstract class BusiListener<P extends BusiProtocol> implements MessageListenerConcurrently {

    private Class<P> protocolClazz;
    private BusiHandler<P> handler;

    /**
     * @param protocolClazz 协议类
     * @param handlerClazz 消息处理类 
     */
    public BusiListener(Class<P> protocolClazz, BusiHandler<P> handler) {
        this.protocolClazz = protocolClazz;
        this.handler = handler;
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        MessageExt msg = msgs.get(0);
        P p = JSONObject.parseObject(new String(msg.getBody(), Charset.forName("utf-8")), protocolClazz);
        p.setRetry(msg.getReconsumeTimes());
        p.setMsgId(msg.getMsgId());
        p.setPercent(msg.getQueueOffset() + "/" + msg.getStoreSize());
        return doBusi(p, handler, msg) ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                : ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    /**
     * @param p
     * @param handlerClazz 可通过类型寻找spring托管的bean 
     * @param msg
     * @return
     */
    public abstract boolean doBusi(P p, BusiHandler<P> handler, MessageExt msg);

    public Class<? extends BusiProtocol> getProtocolClazz() {
        return protocolClazz;
    }

    public BusiHandler<P> getHandler() {
        return handler;
    }

}

 

import org.apache.rocketmq.common.message.MessageExt;

public interface BusiHandler<P extends BusiProtocol> {

    boolean handle(P p, MessageExt messageExt);
}

 

public class BusiProtocol {

    private int retry;
    private String msgId;
    private String percent;

    public int getRetry() {
        return retry;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }

    public String getMsgId() {
        return msgId;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public String getPercent() {
        return percent;
    }

    public void setPercent(String percent) {
        this.percent = percent;
    }
    
}

 

使用:

第一步:定义协议对象

public class SAYHELLO extends BusiProtocol{

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
    
}

 

第二步:

    @Test
    public void test() {

        ApacheRocketMqManager.bindingConsumerBusi(SAYHELLO.class, "192.168.200.214:9876", (p, msg) -> {
            System.out.println(JSONObject.toJSONString(p));
            return true;
        });
        ApacheRocketMqManager.bindingProducerBusi(SAYHELLO.class, "192.168.200.214:9876");

        SAYHELLO t = new SAYHELLO();
        t.setName("张三1");
        ApacheRocketMqManager.send(t);

        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

 

备注:需要自建rocket mq,可以自动创建 topic ,一个协议对象代表一个业务。

具体使用时只需在项目中配置好生产和消费,然后直接发送对象即可

环境区分需要项目启动时传入env参数 

 

 

 

二、该组件初衷

  • 项目中重复而繁琐的初始化消费者和生产者
  • Apollo重复而繁杂的topic维护,命名无规则且容易出错
  • Apollo 各个环境topic需要单独维护
  以上现状令人头疼,为了扭转当前局面,封装了该组件  

三、设计思路

  • 保持业务独立。一个业务就是一组特定的业务数据触发特定的动作,业务绑定生产者组和消费者组,业务数据需要有业务发起者(生产者)发起,有业务处理者处理(消费者)。不同的业务是需要隔离。
  • 生产者到消费者的消息通过协议约束。协议是为了约束生产者和消费者,保证各个业务的处理数据不会错乱,并绑定到特定的topic
  • 自动维护Topic。受限制于RocketMQ生产和消费一致性问题,引入协议后不再需要手动维护topic,包括各个环境
 

四、使用说明

第一步:定义协议对象

一个普通的pojo,需要继承 cn.xs.ambi.mgt.apachermq.BusiProtocol ,因为协议对象可能跨项目使用,建议统一放在 qishi-rocketmq 项目 举例:
public class SAYHELLO extends BusiProtocol{

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
    
}

 

  这个协议中只有一个业务数据name  

第二步:配置生产者和消费者

@Configuration
public class MqConfig implements InitializingBean {

    @Value("${rocketmq.namesrv-addr}")
    private String MQ_ADDRESS;
    @Autowired
    ApplicationContext ac;

    @Override
    public void afterPropertiesSet() throws Exception {
        //工单完结发送短信
        ApacheRocketMqManager.bindingProducerBusi(WoFinishSmsDto.class, this.MQ_ADDRESS);
        ApacheRocketMqManager.bindingConsumerBusi(WoFinishSmsDto.class, this.MQ_ADDRESS, ac.getBean(FinishWoSmsHandler.class));
        
        // say hello
        ApacheRocketMqManager.bindingProducerBusi(SAYHELLO.class, this.MQ_ADDRESS);
        ApacheRocketMqManager.bindingConsumerBusi(SAYHELLO.class, this.MQ_ADDRESS, (p,ex)->{
            System.out.println(JSONObject.toJSONString(p));
            return true;
        });
    }
}

 

上面示例配置了两个协议,最后一个是SAYHELLO  

第三步:实现消费者handler

需要实现 cn.xs.ambi.mgt.apachermq.BusiHandler<P> 接口,实现handle 方法,返回true代表消费成功,返回false代表需要重试  

第四步:发消息

SAYHELLO t = new SAYHELLO();
t.setName("张三1");
SendResult result = ApacheRocketMqManager.send(t);
System.out.println(result);

 

发消息非常简单,只需要封装业务数据,然后调用 ApacheRocketMqManager.send(T) 可以通过返回的 org.apache.rocketmq.client.producer.SendResult 判断是否发送成功,该返回可能是null  

五、测试

@Before
    public void init() {
        ApacheRocketMqManager.bindingConsumerBusi(SAYHELLO.class, "192.168.200.214:9876", (p, ex) -> {
            System.out.println(JSONObject.toJSONString(p));
            return true;
        });
        ApacheRocketMqManager.bindingProducerBusi(SAYHELLO.class, "192.168.200.214:9876");
    }

    @Test
    public void test() {
        SAYHELLO t = new SAYHELLO();
        t.setName("张三1");
        SendResult result = ApacheRocketMqManager.send(t);
        System.out.println(result);
        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

 

结果:

15:28:35.708 [main] INFO ApacheRocketMqManager - 主题[DEFAULT_TOPIC_SAYHELLO]消费者[DEFAULT_CONSUMER_GROUP_SAYHELLO]已注册
15:28:36.542 [main] INFO ApacheRocketMqManager - 主题[DEFAULT_TOPIC_SAYHELLO]生产者[DEFAULT_PRODUCER_GROUP_SAYHELLO]已启动

SendResult [sendStatus=SEND_OK, msgId=7F00000146E836BAF30C7EEACC4A0000, offsetMsgId=C0A8C8D600002A9F00000000EADAEE73, messageQueue=MessageQueue [topic=DEFAULT_TOPIC_SAYHELLO, brokerName=xx-pubsrv-rocketmq, queueId=0], queueOffset=7]

15:28:37.093 [ConsumeMessageThread_DEFAULT_CONSUMER_GROUP_SAYHELLO_1] INFO ApacheRocketMqManager - <<<< 来消息了 7F00000146E836BAF30C7EEACC4A0000
{"msgId":"7F00000146E836BAF30C7EEACC4A0000","name":"张三1","percent":"7/231","retry":0}
15:28:37.093 [ConsumeMessageThread_DEFAULT_CONSUMER_GROUP_SAYHELLO_1] INFO ApacheRocketMqManager - 消息处理成功

 

       

标签:SAYHELLO,String,Rocket,busi,MQ,env,泛型,import,public
From: https://www.cnblogs.com/zno2/p/16924948.html

相关文章

  • 64位 CentOS 6.2 安装erlang及rabbitmq Server
    主题 RabbitMQErlangCentosCentOS6.264bit安装erlang及RabbitMQServer1、操作系统环境(CentOS6.264bit) [root@leekwen~]#cat/etc/issueCentOSrelease6.2(Final)Kernel\ronan\m[root@leekwen~]#cat/proc/cpuinfo|grep"clflushsize"c......
  • rabbitmq安装及异常处理
        官网:     https://www.rabbitmq.com/install-standalone-mac.html     安装erlang语言环境安装依赖文件yuminstallncurses-develyum-yinstallopensslyum-yinstallsslyum-yinstallxmltoyum-yinstallpython-simplejsonyum-yinstallpyth......
  • MQTT:轻量级消息传输协议在物联网中的应用
    随着物联网技术的发展,越来越多的设备需要进行实时通信和数据交换。在这样的背景下,MQTT(MessageQueuingTelemetryTransport)作为一种轻量级的消息传输协议,逐渐成为物联网领域的热门选择。本文将介绍MQTT协议的基本概念、特点以及在物联网中的应用,同时通过代码实例演示如何使用MQTT......
  • python使用mqtt
    一、安装mqtt服务器安装对应的软件:https://www.emqx.io/zh/downloads推荐使用docker安装默认账号和密码:admin、public 二、编写代码消息发布程序importtimeimportjsonimportpsutilimportrandomfrompaho.mqttimportclientasmqtt_clientbroker='127.0.0.1......
  • linux mqtt 安装配置
    安装sudoaptinstallmosquitto配置密码用户sudomkdir-p/etc/mosquitto/configsudotouch/etc/mosquitto/config/pwfile.confsudomosquitto_passwd-b/etc/mosquitto/config/pwfile.confqq123456配置文件qtimes@AIBox-01-01-m:~$cat/etc/mosquitto/mosquitto.......
  • Kafka - Kafka v.s. NATS v.s. RabbitMQ
     Kafkav.s.RabbitMQ 优先选择Kafka的条件·严格的消息顺序·延长消息留存时间,包括过去消息重放的可能·传统解决方案无法满足的高伸缩能力 优先选择RabbitMQ的条件·高级灵活的路由规则·消息时序控制(控制消息过期或消息延迟)·高级的容错处理能力,在消费者更......
  • 语义检索系统之排序模块:基于ERNIE-Gram的Pair-wise和基于RocketQA的CrossEncoder训练
    语义检索系统之排序模块:基于ERNIE-Gram的Pair-wise和基于RocketQA的CrossEncoder训练的单塔模型文本匹配任务数据每一个样本通常由两个文本组成(query,title)。类别形式为0或1,0表示query与title不匹配;1表示匹配。基于单塔Point-wise范式的语义匹配模型ernie_matchi......
  • rabbitmq安装
     按照官网步骤安装1.执行成功sudoapt-getinstallcurlgnupgapt-transport-https-y2.下面三个都执行失败##TeamRabbitMQ'smainsigningkeycurl-1sLf"https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA"|sudogpg-......
  • MQTT 订阅者(异步)代码解读
    一、问题引入官方给出了MQTTClient的异步订阅的例子,对于消息的订阅就无需讲究什么同步了。二、解决过程2-1MQTT订阅者程序流程第一步:创建客户端LIBMQTT_APIintMQTTAsync_create(MQTTAsync*handle,constchar*serverURI,constchar*clientId, intpersistence_t......
  • 安卓项目编译RocketX集成流程
    依赖gradle插件//appmodule的build.gradle加入applyplugin:'com.rocketx'//注:保证写入依赖关系在最后一行 //在根目录的build.gradle加入buildscript{dependencies{classpath'io.github.trycatchx:rocketx:1.1.......