首页 > 其他分享 >自定义注解,mq消费

自定义注解,mq消费

时间:2023-06-08 17:33:08浏览次数:27  
标签:jmqClient 自定义 Object bean mq import 注解 consumer method

1.解析注解方式的mq消费者

2.注解的定义

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface JmqListener {

    String id() default "";

    String[] topics() default  {};
}

3.注解的使用

@Component
public class MqConsumer {

@JmqListener(id = "alarmRecordsConsumer", topics = {"${alarmRecordsConsumer.receive.alarm.topic}"})
public void onMessage(List<Message> messages)  {}
}

4.注解如何生效的

4.1 定义了一个实现了BeanPostProcessor接口的实现类JmqListenerRegister

package com.xx.jmq.client.springboot.annotation;

import com.xx.jmq.client.consumer.Consumer;
import com.xx.jmq.client.consumer.MessageListener;
import com.xx.jmq.client.springboot.configuration.JmqClient;
import com.xx.jmq.client.springboot.configuration.JmqConfigurationProperties;
import com.xx.jmq.common.message.Message;
import com.xx.jmq.client.springboot.adaptor.MessageListenerReflectAdapter;
import com.xx.jmq.client.springboot.configuration.policy.JmqConsumerPolicy;
import com.xx.jmq.client.springboot.support.ConsumerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.EmbeddedValueResolverAware;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.StringValueResolver;

import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Objects;

public class JmqListenerRegister implements BeanPostProcessor, EmbeddedValueResolverAware, ApplicationContextAware {

    private static final Logger logger = LoggerFactory.getLogger(JmqListenerRegister.class);

    private final JmqConfigurationProperties jmqConfigurationProperties;

    private GenericApplicationContext applicationContext;

    private StringValueResolver stringValueResolver;

    public JmqListenerRegister(JmqConfigurationProperties jmqConfigurationProperties) {
        this.jmqConfigurationProperties = jmqConfigurationProperties;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Class<?> beanClass = AopUtils.getTargetClass(bean);
        for (Method method : beanClass.getDeclaredMethods()) {
            JmqListener jmqListenerAnnotation = AnnotationUtils.findAnnotation(method, JmqListener.class);
            if (jmqListenerAnnotation == null) {
                continue;
            }
            MessageListener messageListener = getMethodMessageListener(bean, method);
            JmqClient<JmqConsumerPolicy> jmqClient = jmqConfigurationProperties.getConsumers().get(jmqListenerAnnotation.id());
            if (jmqClient == null || !jmqClient.getEnabled()) {
                continue;
            }
            if (jmqClient.getPolicy() == null) {
                if (jmqConfigurationProperties.getGlobalConsumerPolicy() != null) {
                    jmqClient.setPolicy(jmqConfigurationProperties.getGlobalConsumerPolicy());
                } else {
                    jmqClient.setPolicy(new JmqConsumerPolicy());
                }
            }
            registerListener(jmqListenerAnnotation, jmqClient, messageListener);
        }
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException {
        return bean;
    }

    protected MessageListener getMethodMessageListener(Object bean, Method method) {
        Class<?>[] parameterTypes = method.getParameterTypes();
        Type[] genericParameterTypes = method.getGenericParameterTypes();

        boolean isListener = (parameterTypes.length == 1 &&
                genericParameterTypes[0].getTypeName().equals(String.format("java.util.List<%s>", Message.class.getName())));

        if (!isListener) {
            throw new IllegalArgumentException("listener parameters error, need MessageListener.onMessage");
        }
        return new MessageListenerReflectAdapter(bean, method);
    }

    protected void registerListener(JmqListener jmqListener, JmqClient<JmqConsumerPolicy> jmqClient, MessageListener messageListener) {
        ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
        if (!applicationContext.containsBean(jmqListener.id())) {
            ConsumerContainer consumerContainer = new ConsumerContainer(jmqClient);
            Object wrapperBean = beanFactory.initializeBean(consumerContainer, "consumerContainer." + jmqListener.id());
            beanFactory.registerSingleton(jmqListener.id(), Objects.requireNonNull(wrapperBean));
            logger.info("Register jmq consumer: {}", jmqClient.getId());
            Consumer consumer = consumerContainer.getObject();
            Assert.notNull(consumer, "consumer can not be null");
            for (String topic : jmqListener.topics()) {
                consumer.subscribe(stringValueResolver.resolveStringValue(topic), messageListener);
            }
        } else {
            Consumer consumer = applicationContext.getBean(jmqListener.id(), Consumer.class);
            for (String topic : jmqListener.topics()) {
                consumer.subscribe(stringValueResolver.resolveStringValue(topic), messageListener);
            }
        }
    }

    @Override
    public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (GenericApplicationContext) applicationContext;
    }

    @Override
    public void setEmbeddedValueResolver(@NonNull StringValueResolver stringValueResolver) {
        this.stringValueResolver = stringValueResolver;
    }
}

4.2 BeanPostProcessor接口有什么作用,为什么要定义类来实现它

BeanPostProcessor是Spring IOC容器给我们提供的一个扩展接口。,他的作用主要是如果我们需要在Spring 容器完成 Bean 的实例化、配置和其他的初始化前后添加一些自己的逻辑处理,我们就可以定义一个或者多个 BeanPostProcessor 接口的实现,然后注册到容器中。
这样MqConsumer类在被spring加载并实例化的时候,会自动执行了JmqListenerRegister中的postProcessBeforeInitialization方法

4.3 Sring加载bean的时候会自动找到实现了BeanPostProcessor接口的实现类,并执行接口里面定义的方法

@Override
   public Object applyBeanPostProcessorsBeforeInitialization(Object existingBean, String beanName)
   		throws BeansException {

   	Object result = existingBean;
   	for (BeanPostProcessor processor : getBeanPostProcessors()) {
   		Object current = processor.postProcessBeforeInitialization(result, beanName);
   		if (current == null) {
   			return result;
   		}
   		result = current;
   	}
   	return result;
   }

4.4 JmqListenerRegister的postProcessBeforeInitialization方法,实例初始化之前执行

@Override
   public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
       Class<?> beanClass = AopUtils.getTargetClass(bean); // 获取当时实例化的bean对象的类
       for (Method method : beanClass.getDeclaredMethods()) {
   	    .// 判断当前方法是否带JmqListener的注解
           JmqListener jmqListenerAnnotation = AnnotationUtils.findAnnotation(method, JmqListener.class); 
           if (jmqListenerAnnotation == null) {
               continue;
           }
   		// 创建listener
           MessageListener messageListener = getMethodMessageListener(bean, method);
   		// 创建jmqClient
           JmqClient<JmqConsumerPolicy> jmqClient = jmqConfigurationProperties.getConsumers().get(jmqListenerAnnotation.id());
           if (jmqClient == null || !jmqClient.getEnabled()) {
               continue;
           }
           if (jmqClient.getPolicy() == null) {
               if (jmqConfigurationProperties.getGlobalConsumerPolicy() != null) {
                   jmqClient.setPolicy(jmqConfigurationProperties.getGlobalConsumerPolicy());
               } else {
                   jmqClient.setPolicy(new JmqConsumerPolicy());
               }
           }
           registerListener(jmqListenerAnnotation, jmqClient, messageListener);
       }
       return bean;
   }
   
   // 注册listener
protected void registerListener(JmqListener jmqListener, JmqClient<JmqConsumerPolicy> jmqClient, MessageListener messageListener) {
       ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
       if (!applicationContext.containsBean(jmqListener.id())) {
           ConsumerContainer consumerContainer = new ConsumerContainer(jmqClient);
           Object wrapperBean = beanFactory.initializeBean(consumerContainer, "consumerContainer." + jmqListener.id());
           beanFactory.registerSingleton(jmqListener.id(), Objects.requireNonNull(wrapperBean));
           logger.info("Register jmq consumer: {}", jmqClient.getId());
           Consumer consumer = consumerContainer.getObject();
           Assert.notNull(consumer, "consumer can not be null");
           for (String topic : jmqListener.topics()) {
               consumer.subscribe(stringValueResolver.resolveStringValue(topic), messageListener);
           }
       } else {
           Consumer consumer = applicationContext.getBean(jmqListener.id(), Consumer.class);
           for (String topic : jmqListener.topics()) {
   		   // consumer消费,绑定listener,listener里又绑定了method,method反射invoke去执行
               consumer.subscribe(stringValueResolver.resolveStringValue(topic), messageListener);
           }
       }
   }
   
protected MessageListener getMethodMessageListener(Object bean, Method method) {
       Class<?>[] parameterTypes = method.getParameterTypes();
       Type[] genericParameterTypes = method.getGenericParameterTypes();

       boolean isListener = (parameterTypes.length == 1 &&
               genericParameterTypes[0].getTypeName().equals(String.format("java.util.List<%s>", Message.class.getName())));

       if (!isListener) {
           throw new IllegalArgumentException("listener parameters error, need MessageListener.onMessage");
       }
       return new MessageListenerReflectAdapter(bean, method);
   }


MessageListenerReflectAdapter的定义,还用到了适配器模式

package com.jd.jmq.client.springboot.adaptor;

import com.jd.jmq.client.consumer.MessageListener;
import com.jd.jmq.common.message.Message;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;

public class MessageListenerReflectAdapter implements MessageListener {

    private final Method method;
    private final Object instance;

    public MessageListenerReflectAdapter(Object instance, Method method) {
        this.instance = instance;
        this.method = method;
        this.method.setAccessible(true);
    }

    @Override
    public void onMessage(List<Message> messages) {
        try {
            method.invoke(instance, messages);
        } catch (InvocationTargetException e) {
            if (e.getMessage() == null) {
                throw new RuntimeException(e.getCause());
            } else {
                throw new RuntimeException(e);
            }
        } catch (Exception e) {
            throw new RuntimeException("consume error", e);
        }
    }
}

4.5 JmqListenerRegister的postProcessAfterInitialization方法,实例初始化之前执行

 @Override
   public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException {
       return bean;
   }

标签:jmqClient,自定义,Object,bean,mq,import,注解,consumer,method
From: https://www.cnblogs.com/PythonOrg/p/17467098.html

相关文章

  • Docker(八):容器互联之自定义网络
    在Docker中每一个容器都是一个独立的个体,相互之间不能进行信息的传输。这里学习一种让容器可以相互联系的方法——自定义网络。一、创建自定义网络命令:dockernetworkcreate--driver网络模式--subnet子网地址--gateway网关网络名称测试:--driverbridge 这里是以创建的......
  • JQ插件:nicescroll自定义滚动条
    参考:http://www.areaaperta.com/nicescroll/该插件支持移动设备;可上下、左右拖拉滚动;看图,上面的分类菜单就是。导入:<scripttype="text/javascript"src="js/jquery.nicescroll.min.js"></script>定义需要滚动的区域,#boxscroll是vieport,#categorys是c......
  • RocketMQ消费暂停问题分析
    一、背景客经使用rocketMq批量推送数据到pcr执行次贷策略引擎和互斥决策引擎,pcr将决策结果推送到前置路由。二、问题现象描述在客经推数据时,pcr-updateBorrowState消息积压越来越多,从日志上看,pcr不拉取消息,重启服务器后可以消费消息,过一会又消费变慢,不断重启才让所有消息消费......
  • 小治同学的JAVAWEB学习笔记-Junit&反射&注解
    Junit单元测试Junit使用:白盒测试 步骤 1.定义一个测试类(测试用类) 建议: 测试类名:北侧是的类+Test 包名:XXX.XXX.XX.Test 2.定义测试方法:可以独立运行 建议: 方法名:test测试的方法名 返回值void 参数列表空参 3.给方法加@Test 判定结果 1.红......
  • 注解
    一.异步注解  @Async注意点1.启用该注解时需要在配置类或者启动类中添加 @EnableAsync2.@Async不能在同一个类中,应该新建立一个组件类3.注解打在方法上作用域为该方法、打在类上时作用域为该类下所有的方法@Component@AsyncpublicclassAsyncDemo{publicvo......
  • Qt之MQTT编译(一)
    一、MQTT简介MQTT(MessageQueuingTelemetryTransport)是一种轻量级的、发布-订阅模式的消息传输协议。它最初是为低带宽和不稳定网络环境设计的,以支持物联网(IoT)设备之间的高效通信。MQTT的工作方式基于发布-订阅模型,其中包含两个角色:发布者(Publisher)和订阅者(Subscriber)。发......
  • 直播小程序源码,自定义支持360度旋转的View
    直播小程序源码,自定义支持360度旋转的View自定义Touch360ImageView的代码如下: packagecom.example.myapplication;importandroid.content.Context;importandroid.content.res.TypedArray;importandroid.graphics.drawable.LevelListDrawable;importandroid.util.Attribut......
  • 视频直播网站源码,自定义气泡效果(BubbleView)
    视频直播网站源码,自定义气泡效果(BubbleView)代码如下: packagecom.example.myapplication;importandroid.content.Context;importandroid.graphics.BlurMaskFilter;importandroid.graphics.Canvas;importandroid.graphics.Color;importandroid.graphics.Paint;importandr......
  • jackson annotations注解详解(转)
    官方WIKI:https://github.com/FasterXML/jackson-databind/wikijackson1.x和2.x版本的注解是放置在不同的包下的1.x是在jacksoncorejar包org.codehaus.jackson.annotate下2.x是在jackson-databind包com.fasterxml.jackson.annotation下jackson的自动检测机制jackson允许使用任意......
  • 使用ImportBeanDefinitionRegistrar处理自定义注解将类注册到容器中
    START两个自定义注解:@Documented@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Import(LogRegistry.class)public@interfaceEnableLog{StringbasePackage()default"";}该注解的作用是扫描指定的basePackage目录中使用了@Log注解的类,并将这......