首页 > 其他分享 >分布式日志追踪ID实战

分布式日志追踪ID实战

时间:2025-01-20 10:55:51浏览次数:1  
标签:return log MDC param String 日志 logTraceId ID 分布式

作者:京东物流 张小龙

本文通过介绍分布式应用下各个场景的全局日志ID透传思路,以及介绍分布式日志追踪ID简单实现原理和实战效果,从而达到通过提高日志查询排查问题的效率。

背景

开发排查系统问题用得最多的手段就是查看系统日志,相信不少人都值过班当过小秘吧:给下接口和出入参吧,麻烦看看日志里的有没有异常信息啊等等,但是在并发大时使用日志定位问题还是比较麻烦,由于大量的其他用户/其他线程的日志也一起输出穿行其中导致很难筛选出指定请求的全部相关日志,以及下游线程/服务对应的日志,甚至一些特殊场景的出入参只打印了一些诸如gis坐标、四级地址等没有单据信息的日志,使得日志定位起来非常不便

场景分析

自己所在组负责的系统主要是web应用,其中涉及到的请求方式主要有:springmvc的servlet的http场景、jsf场景、MQ场景、resteasy场景、clover场景、easyjob场景,每一种场景都需要不同的方式进行logTraceId的透传,接下来逐个探析上述各个场景的透传方案。

在这之前我们先要简单了解一下日志中透传和打印logTraceId的方式,一般我们使用MDC进行logTraceId的透传与打印,但是基于MDC内部使用的是ThreadLocal所以只有本线程才有效,子线程服务的MDC里的值会丢失,所以这里我们要么是在所有涉及到父子线程的地方以编码侵入式自行实现值的传递,要么就是通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题,而本文采用的是比较粗糙地以编码侵入式来解决此问题。

springmvc的servlet的http场景

这个场景相信大家都已经烂熟到骨子里了,主要思路是通过拦截器的方式进行logTraceId的透传,新建一个类实现HandlerInterceptor

preHandle:在业务处理器处理请求之前被调用,这里实现logTraceId的设置与透传

postHandle:在业务处理器处理请求执行完成后,生成视图之前执行,这里空实现就好

afterCompletion:在DispatcherServlet完全处理完请求后被调用,这里用于清除MDC的logTraceId


@Slf4j
public class TraceInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) throws Exception {
        try{
            String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, TraceUtils.getTraceId());
            }
        }catch (RuntimeException e){
            log.error("mvc自定义log跟踪拦截器执行异常",e);
        }
        return true;
    }

    @Override
    public void postHandle(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }

    @Override
    public void afterCompletion(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
        try{
            MDC.clear();
        }catch (RuntimeException ex){
            log.error("mvc自定义log跟踪拦截器执行异常",ex);
        }
    }
}

jsf场景

相信大家对于jsf并不陌生,而jsf也支持自定义filter,基于jsf过滤器的运行方式(如下图),可以通过配置全局过滤器(继承AbstractFilter)的方式进行logTraceId的透传,需要注意的是jsf是在线程池中执行的所以一定要信任消息体中的logTraceId

 

 

jsf消费者过滤器:主要从上下文环境中获取logTraceId并进行透传,实现代码如下


@Slf4j
public class TraceIdGlobalJsfFilter extends AbstractFilter {
    @Override
    public ResponseMessage invoke(RequestMessage requestMessage) {
        //设置traceId
        setAndGetTraceId(requestMessage);
        try{
            return this.getNext().invoke(requestMessage);
        }finally {
        }
    }

    /**
     * 设置并返回traceId
     * @param requestMessage
     * @return
     */
    private void setAndGetTraceId(RequestMessage requestMessage) {
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
                //如果filter和MDC都没有获取到则说明有遗漏,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消费者自定义log跟踪拦截器预警,filter和MDC都没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            } else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
                //如果MDC没有,filter有,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消费者自定义log跟踪拦截器预警,MDC没有filter有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            } else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
                //如果MDC有,filter没有,说明是源头已经有了,但是jsf是第一次调,透传
                requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, logTraceId);
            }else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null){
                //MDC和fitler都有,但是并不相等,则存在问题打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消费者自定义log跟踪拦截器预警,MDC和filter都有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            }
        }catch (RuntimeException e){
            log.error("jsf消费者自定义log跟踪拦截器执行异常",e);
        }
    }
}

jsf提供者过滤器:通过拿到消费者在消息体中透传的logTraceId来实现,实现代码如下


@Slf4j
public class TraceIdGlobalJsfProducerFilter extends AbstractFilter {
    @Override
    public ResponseMessage invoke(RequestMessage requestMessage) {
        //设置traceId
        boolean isNeedClearMdc = transferTraceId(requestMessage);
        try{
            return this.getNext().invoke(requestMessage);
        }finally {
            if(isNeedClearMdc){
                clear();
            }
        }
    }
    /**
     * 设置并返回traceId
     * @param requestMessage
     * @return
     */
    private boolean transferTraceId(RequestMessage requestMessage) {
        boolean isNeedClearMdc = false;
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
                //如果filter和MDC都没有获取到,说明存在遗漏场景或是提供给外部系统调用的接口,打印日志进行观察
                String traceId = TraceUtils.getTraceId();
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,traceId);
                requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, traceId);
                if(log.isDebugEnabled()){
                    log.debug("jsf生产者自定义log跟踪拦截器预警,filter和MDC都没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
                isNeedClearMdc = true;
            } else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
                //如果MDC没有,filter有,说明是被调用方,需要透传下去
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceIdObj.toString());
                isNeedClearMdc = true;
            } else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
                //如果MDC有,filter没有,存在问题,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf生产者自定义log跟踪拦截器预警,MDC有filter没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
                isNeedClearMdc = true;
            }else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null && !logTraceId.equals(logTraceIdObj.toString())){
                //MDC和fitler都有,但是并不相等,则信任filter透传结果
                TraceUtils.resetTraceId(logTraceIdObj.toString());
                if(log.isDebugEnabled()){
                    log.debug("jsf生产者自定义log跟踪拦截器预警,MDC和fitler都有traceId,但是并不相等,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            }
            return isNeedClearMdc;
        }catch (RuntimeException e){
            log.error("jsf生产者自定义log跟踪拦截器执行异常",e);
            return false;
        }
    }

    /**
     * 清除MDC
     */
    private void clear() {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("jsf生产者自定义log跟踪拦截器执行异常",e);
        }
    }
}

MQ场景

说到MQ相信大家对于此就更不陌生了,此种场景主要通过在提供者发送消息时拿到上下文中的logTraceId,将其以扩展信息的方式设置进消息体中进行透传,而消费者则从消息体中进行获取

生产者:新建一个抽象类继承MessageProducer,覆写父类中的两个send方法(批量发送、单条发送),send方法中主要调用抽象加工消息体的方法(logTraceId属性赋值)和日志打印,在子类中进行发送前对消息体的加工处理,具体代码如下


@Slf4j
public abstract class BaseTraceIdProducer extends MessageProducer {

    private static final String SEPARATOR_COMMA = ",";

    public BaseTraceIdProducer() {
    }

    public BaseTraceIdProducer(TransportManager transportManager) {
        super(transportManager);
    }

    /**
     * 获取消息体-单个
     * @param messageContext
     * @return
     */
    protected abstract Message getMessage(MessageContext messageContext);

    /** 获取消息体-批量
     *
     * @param messageContext
     * @return
     */
    protected abstract List<Message> getMessages(MessageContext messageContext);

    /**
     * 填充消息体上下文信息
     * @param message
     * @param messageContext
     */
    protected void fillContext(Message message,MessageContext messageContext) {
        if(message == null){
            return;
        }
        if(StringUtils.isBlank(messageContext.getLogTraceId())){
            String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
            messageContext.setLogTraceId(logTraceId);
        }
        if(StringUtils.isBlank(messageContext.getTopic())){
            String topic = message.getTopic();
            messageContext.setTopic(topic);
        }
        String businessId = message.getBusinessId();
        messageContext.getBusinessIdBuf().append(SEPARATOR_COMMA).append(businessId);
    }

    /**
     * traceId嵌入消息体中
     * @param message
     */
    protected void generateTraceIdIntoMessage(Message message){
        if(message == null){
            return;
        }
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId)){
                logTraceId = TraceUtils.getTraceId();
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
            }
            message.setAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY,logTraceId);
        }catch (RuntimeException e){
            log.error("jmq2自定义log跟踪拦截器执行异常",e);
        }
    }

    /**
     * 批量发送消息-无回调
     * @param messages
     * @param timeout
     * @throws JMQException
     */
    public void send(List<Message> messages, int timeout) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessages(messages);
        List<Message> messageList = this.getMessages(messageContext);
        //打印日志,方便排查问题
        printLog(messageContext);
        super.send(messageList, timeout);
    }

    /**
     * 单个发送消息
     * @param message
     * @param transaction
     * @param <T>
     * @return
     * @throws JMQException
     */
    public <T> T send(Message message, LocalTransaction<T> transaction) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessage(message);
        Message msg = this.getMessage(messageContext);
        //打印日志,方便排查问题
        printLog(messageContext);
        return super.send(msg, transaction);
    }

    /**
     * 批量发送消息-有回调
     * @param messages
     * @param timeout
     * @param callback
     * @throws JMQException
     */
    public void send(List<Message> messages, int timeout, AsyncSendCallback callback) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessages(messages);
        List<Message> messageList = this.getMessages(messageContext);
        //打印日志,方便排查问题
        printLog(messageContext);
        super.send(messageList, timeout, callback);
    }

    /**
     * 打印日志,方便排查问题
     * @param messageContext
     */
    private void printLog(MessageContext messageContext) {
        if(messageContext==null){
            return;
        }
        if(log.isInfoEnabled()){
            log.info("MQ发送:traceId:{},topic:{},businessIds:[{}]",messageContext.getLogTraceId(),messageContext.getTopic(),messageContext.getBusinessIdBuf()==null?"":messageContext.getBusinessIdBuf().toString());
        }
    }

}

@Slf4j
public class TraceIdEnvMessageProducer extends BaseTraceIdProducer {

    private static final String UAT_TRUE = String.valueOf(true);
    private boolean uat = false;

    public TraceIdEnvMessageProducer() {
    }

    public TraceIdEnvMessageProducer(TransportManager transportManager) {
        super(transportManager);
    }

    /**
     * 环境变量打标-单个消息体
     * @param message
     */
    private void convertUatMessage(Message message) {
        if (message != null) {
            message.setAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT, UAT_TRUE);
        }
    }


    /**
     * 消息转换-批量消息体
     * @param messageContext
     * @return
     */
    private List<Message> convertMessages(MessageContext messageContext) {
        List<Message> messages = messageContext.getMessages();
        if (!CollectionUtils.isEmpty(messages)) {
            Iterator messageIterator = messages.iterator();
            while(messageIterator.hasNext()) {
                Message message = (Message)messageIterator.next();
                if(this.isUat()){
                    this.convertUatMessage(message);
                }
                super.generateTraceIdIntoMessage(message);
                super.fillContext(message,messageContext);
            }
        }
        return messageContext.getMessages();
    }

    /**
     * 消息转换-单个消息体
     * @param messageContext
     * @return
     */
    private Message convertMessage(MessageContext messageContext){
        Message message = messageContext.getMessage();
        if(this.isUat()){
            this.convertUatMessage(message);
        }
        super.generateTraceIdIntoMessage(message);
        super.fillContext(message,messageContext);
        return message;
    }

    protected Message getMessage(MessageContext messageContext) {
        if(log.isDebugEnabled()){
            log.debug("current environment is UAT : {}", this.isUat());
        }
        return this.convertMessage(messageContext);
    }

    protected List<Message> getMessages(MessageContext messageContext) {
        if(log.isDebugEnabled()){
            log.debug("current environment is UAT : {}", this.isUat());
        }
        return this.convertMessages(messageContext);
    }

    public void setUat(boolean uat) {
        this.uat = uat;
    }

    boolean isUat() {
        return this.uat;
    }

}

消费者:新建一个抽象类继承MessageListener,覆写父类中的onMessage方法,主要进行设置日志traceId和消费完成后的traceId清理等,而在子类中进行一些自定义处理,具体代码如下


@Slf4j
public abstract class BaseTraceIdMessageListener implements MessageListener {

    public BaseTraceIdMessageListener() {
    }

    public abstract void onMessageList(List<Message> messages) throws Exception;

    @Override
    public final void onMessage(List<Message> messages) throws Exception {
        try{
            if(CollectionUtils.isEmpty(messages)){
                return;
            }
            //设置日志traceId
            setLogTraceId(messages);
            this.onMessageList(messages);
            //消费完后清除traceId
            clear();
        }catch (Exception e){
            throw e;
        }finally {
            MDC.clear();
        }
    }

    /**
     * 设置日志traceId
     * @param messages
     */
    private void setLogTraceId(List<Message> messages) {
        try{
            Message message = messages.get(0);
            String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId)){
                logTraceId = TraceUtils.getTraceId();
            }
            MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
        }catch (RuntimeException e){
            log.error("jmq2自定义log跟踪拦截器执行异常",e);
        }
    }

    /**
     * 清除traceId
     */
    private void clear() {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("jmq2自定义log跟踪拦截器执行异常",e);
        }
    }

}

@Slf4j
public abstract class TraceIdEnvMessageListener extends BaseTraceIdMessageListener{

    private String uat;

    public TraceIdEnvMessageListener() {
    }

    public abstract void onMessages(List<Message> var1) throws Exception;

    @Override
    public void onMessageList(List<Message> messages) throws Exception {
        Iterator iterator;
        Message message;
        if (this.getUat() != null && Boolean.valueOf(this.getUat())) {
            iterator = messages.iterator();

            while(true) {
                while(iterator.hasNext()) {
                    message = (Message)iterator.next();
                    if (message != null && Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
                        this.onMessages(Arrays.asList(message));
                    } else {
                        log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
                    }
                }

                return;
            }
        } else if (this.getUat() != null && !Boolean.valueOf(this.getUat())) {
            iterator = messages.iterator();

            while(true) {
                while(iterator.hasNext()) {
                    message = (Message)iterator.next();
                    if (message != null && !Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
                        this.onMessages(Arrays.asList(message));
                    } else {
                        log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
                    }
                }

                return;
            }
        } else {
            this.onMessages(messages);
        }
    }

    public void setUat(String uat) {
        if (!"true".equals(uat) && !"false".equals(uat)) {
            throw new IllegalArgumentException("uat 属性值只能为 true 或 false.");
        } else {
            this.uat = uat;
        }
    }

    public String getUat() {
        return this.uat;
    }
}

resteasy场景

此场景类似于spinrg-mvc场景,也是http请求,需要通过拦截器在消息头中进行logTraceId的透传,主要有客户端拦截器,服务端:预处理拦截器、后置拦截器,代码如下


@ClientInterceptor
@Provider
@Slf4j
public class ResteasyClientInterceptor implements ClientExecutionInterceptor {
    @Override
    public ClientResponse execute(ClientExecutionContext clientExecutionContext) throws Exception {
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            ClientRequest request = clientExecutionContext.getRequest();
            String headerTraceId = request.getHeaders().getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && StringUtils.isBlank(headerTraceId)){
                //如果filter和MDC都没有获取到则说明是调用源头
                String traceId = TraceUtils.getTraceId();
                TraceUtils.resetTraceId(traceId);
                request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,traceId);
            } else if(StringUtils.isBlank(headerTraceId)){
                //如果MDC有但是filter没有则需要传递
                request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,logTraceId);
            }
        }catch (RuntimeException e){
            log.error("resteasy客户端log跟踪拦截器执行异常",e);
        }
        return clientExecutionContext.proceed();
    }
}

@Slf4j
@Provider
@ServerInterceptor
public class RestEasyPreInterceptor implements PreProcessInterceptor {
    @Override
    public ServerResponse preProcess(HttpRequest request, ResourceMethod resourceMethod) throws Failure, WebApplicationException {
        try{
            MultivaluedMap<String, String> requestHeaders = request.getHttpHeaders().getRequestHeaders();
            String headerTraceId = requestHeaders.getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
            if(StringUtils.isNotBlank(headerTraceId)){
                //如果filter则透传
                TraceUtils.resetTraceId(headerTraceId);
            }
        }catch (RuntimeException e){
            log.error("resteasy服务端log跟踪前置拦截器执行异常",e);
        }
        return null;
    }
}

@Slf4j
@Provider
@ServerInterceptor
public class ResteasyPostInterceptor implements PostProcessInterceptor {
    @Override
    public void postProcess(ServerResponse serverResponse) {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("resteasy服务端log跟踪后置拦截器执行异常",e);
        }
    }
}

clover场景

clover的大体机制主要是在项目启动的时候扫描到带有注解@HessianWebService的类进行服务注册并维持心跳检测,而clover端则通过servlet请求方式进行任务的回调,同时继承AbstractScheduleTaskProcess方式的任务是以线程池的方式进行业务的处理

基于上述原理我们需要解决两个问题:1.新建一个类继承ServiceExporterServlet,并在web.xml配置中进行servlet配置,代码如下;


@Slf4j
public class ServiceExporterTraceIdServlet extends ServiceExporterServlet {

    @Override
    public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
        try {
            String traceId = MDC.get("traceId");
            if (StringUtils.isBlank(traceId)) {
                MDC.put("traceId", TraceUtils.getTraceId());
            }
        } catch (Exception e) {
            log.error("clover请求servlet执行异常", e);
        }
        try {
            super.service(req, res);
        } catch (Throwable e) {
            log.error("clover请求servlet执行异常", e);
            throw e;
        }finally {
            try{
                MDC.clear();
            }catch (RuntimeException ex){
                log.error("clover请求servlet执行异常",ex);
            }
        }
    }
}

2.新建一个抽象类继承AbstractScheduleTaskProcess,在类中以编码形式进行父子线程的透传(可优化:通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题),所有任务均改为继承此类,关键代码如下


try{
            traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                log.warn("clover自定义log跟踪拦截器预警,mdc没有traceId");
            }
        }catch (RuntimeException e){
            log.error("clover自定义log跟踪拦截器执行异常",e);
        }
        final String logTraceId = traceId;
        while(iterator.hasNext()) {
            final List<TcTask> list = (List<TcTask>)iterator.next();
            this.executor.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    try{
                        if (StringUtils.isNotBlank(logTraceId)) {
                            MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, logTraceId);
                        }
                    }catch (RuntimeException e){
                        log.error("clover自定义log跟踪拦截器执行异常",e);
                    }
                    Object var1;
                    try {
                        if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
                            BaseTcTaskProcessWorker.logger.info("正在执行任务[" + this.getClass().getName() + "],条数:" + list.size() + "...");
                        }


                        BaseTcTaskProcessWorker.this.executeTasks(list);

                        if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
                            BaseTcTaskProcessWorker.logger.info("执行任务[" + this.getClass().getName() + "],条数:" + list.size() + "成功!");
                        }

                        var1 = null;
                    } catch (Exception var5) {
                        BaseTcTaskProcessWorker.logger.error(var5.getMessage(), var5);
                        throw var5;
                    } finally {
                        try{
                            MDC.clear();
                        }catch (RuntimeException ex){
                            log.error("clover自定义log跟踪拦截器执行异常",ex);
                        }
                        latch.countDown();
                    }

                    return var1;
                }
            });
        }

easyjob场景

easyjob的大体机制是在项目启动的时候通过扫描实现接口Scheduler的类进行上报注册,同时启动一个acceptor(获取任务的线程池),而acceptor拉取到任务后会将父任务放进一个叫executor的线程池,子任务范进一个叫slowExecutor的线程池,我们可以新建一个抽奖类实现接口ScheduleFlowTask,复用clover场景硬编码方式进行父子线程logTraceId的透传处理(可优化:通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题),示例代码如下

 


@Slf4j
public abstract class AbstractEasyjobOnlyScheduleProcess<T> implements ScheduleFlowTask {

    /**
     * EASYJOB平台UMP监控key前缀
     */
    private static final String EASYJOB_UMP_KEY_RREFIX = "trans.easyjob.dotask.";

    /**
     * EASYJOB单个任务处理分布式锁前缀
     */
    private static final String EASYJOB_SINGLE_TASK_LOCK_PREFIX = "basic_easyjob_single_task_lock_prefix_";

    /**
     * 环境标识-开关配置进行环境隔离
     */
    @Value("${spring.profiles.active}")
    private String activeEnv;

    @Value("${task.scene.mark}")
    private String sceneMark = TaskSceneMarkEnum.PRODUCTION.getDesc();

    /**
     * easyJob维度线程池变量
     */
    private ThreadPoolExecutor easyJobExecutor;
    /**
     * easyJob维度服务器个数-分片个数
     */
    private volatile int easyJobLastThreadCount = 0;

    /**
     * easyjob多线程名称
     */
    private static final String EASYJOB_THREAD_NAME = "dts.easyJobs";

    /**
     * 子类的泛型参数类型
     */
    private Class<T> argumentType;

    /**
     * 无参构造
     */
    public AbstractEasyjobOnlyScheduleProcess() {
        //设置子类泛型参数类型
        argumentType = this.getArgumentType();
    }

    @Autowired
    private RedisHelper redisHelper;

    /**
     * 非task表扫描待处理的任务数据
     * @param taskServerParam
     * @param curServer
     * @return
     */
    protected abstract List<T> loadTasks(TaskServerParam taskServerParam, int curServer);

    /**
     * 业务处理抽象方法-单个
     * @param task
     */
    protected abstract void doSingleTask(T task);

    /**
     * 业务处理抽象方法-批量
     * @param tasks
     */
    protected abstract void doBatchTasks(List<T> tasks);

    /**
     * 拼装ump监控key
     * @param prefix
     * @param taskNameKey
     * @return
     */
    private String getUmpKey(String prefix,String taskNameKey) {
        StringBuffer umpKeyBuf = new StringBuffer();
        umpKeyBuf.append(prefix).append(taskNameKey);
        return umpKeyBuf.toString();
    }

    /**
     * easyjob平台异步任务回调方法
     * @param scheduleContext
     * @return
     * @throws Exception
     */
    @Override
    public TaskResult doTask(ScheduleContext scheduleContext) throws Exception {
        String requestNo = TraceUtils.getTraceId();
        try {
            String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
            }
        } catch (Exception e) {
            log.error("easyjob执行异常", e);
        }
        EasyJobTaskServerParam taskServerParam = null;

        CallerInfo callerinfo = null;
        try {
            //条件转换
            taskServerParam = EasyJobCoreUtil.transTaskServerParam(scheduleContext);
            String taskNameKey = getTaskNameKey();
            String umpKey = getUmpKey(EASYJOB_UMP_KEY_RREFIX,taskNameKey);
            callerinfo = Profiler.registerInfo(umpKey, Constants.TRANS_BASIC, false, true);
            //多服务器,并且非子任务,本次不执行,提交子任务
            if (taskServerParam.getServerCount() > 1 && !taskServerParam.isSubTask()) {
                submitSubTask(scheduleContext, taskServerParam,requestNo);
                return TaskResult.success();
            }

            if (log.isInfoEnabled()) {
                log.info("请求编号[{}],开始获取任务,任务ID[{}],任务名称[{}],执行参数[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), JSON.toJSONString(taskServerParam));
            }
            TaskServerParam cloverTaskServerParam = EasyJobCoreUtil.transferCloverTaskServerParam(taskServerParam);

            List<T> tasks = this.selectTasks(cloverTaskServerParam, taskServerParam.getCurServer());

            if (log.isInfoEnabled()) {
                log.info("请求编号[{}],获取任务ID[{}],任务名称[{}]共{}条", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks == null ? 0 : tasks.size());
            }

            if (CollectionUtils.isNotEmpty(tasks)) {
                if (log.isInfoEnabled()) {
                    log.info("请求编号[{}],开始执行任务,任务ID[{}],任务名称[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName());
                }

                this.easyJobExecuteTasksInner(taskServerParam, tasks,requestNo);
                if (log.isInfoEnabled()) {
                    log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],执行数量[{}]完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks.size());
                }

            }
            return TaskResult.success();
        } catch (Exception e) {
            Profiler.functionError(callerinfo);
            if (log.isInfoEnabled()) {
                log.error("请求编号[{}],任务执行失败,任务ID[{}],任务名称[{}]", requestNo, taskServerParam == null ? "" : taskServerParam.getTaskId(), taskServerParam == null ? "" :taskServerParam.getTaskName(), e);
            }
            return TaskResult.fail(e.getMessage());
        }finally {
            try{
                MDC.clear();
            }catch (RuntimeException ex){
                log.error("easyjob执行异常",ex);
            }
            Profiler.registerInfoEnd(callerinfo);
        }
    }

    /**
     * 多分片提交子任务
     * @param scheduleContext 调度任务上下文参数
     * @param taskServerParam 调度任务参数
     * @param requestNo 调度任务参数
     * @return void
     */
    private void submitSubTask(ScheduleContext scheduleContext, EasyJobTaskServerParam taskServerParam,String requestNo) throws IOException {

        log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],子任务个数[{}],开始提交子任务", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());

        String jobClass = scheduleContext.getTaskGetResponse().getJobClass();

        if (StringUtils.isBlank(jobClass)) {
            throw new RuntimeException("jobClass get error");
        }

        for (int i = 0; i < taskServerParam.getServerCount(); i++) {
            Map<String, String> dataMap = scheduleContext.getParameters();
            //提交子任务标识
            dataMap.put("isSubTask", "true");
            //给子任务进行编号
            dataMap.put("curServer", String.valueOf(i));
            //父任务名称传递子任务
            dataMap.put("taskName", taskServerParam.getTaskName());
            scheduleContext.commitSubTask(jobClass, dataMap, taskServerParam.getExpected(), taskServerParam.getTransactionalAccept());
        }
        // 父任务等待子任务执行完毕再更改状态,如果执行时间超过等待时间,抛异常
        //scheduleContext.waitForSubtaskCompleted((long) taskServerParam.getServerCount() * taskServerParam.getExpected());
        log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],子任务个数[{}],提交完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());
    }

    /**
     * 创建线程池,按配置参数执行task
     * @param param 执行参数
     * @param tasks 任务集合
     * @param requestNoStr
     * @return void
     */
    private void easyJobExecuteTasksInner(final EasyJobTaskServerParam param, List<T> tasks,String requestNoStr) {
        int threadCount = param.getThreadCount();
        synchronized (this) {
            if (this.easyJobExecutor == null) {
                this.easyJobExecutor = (ThreadPoolExecutor) EasyJobCoreUtil.createCustomeasyJobExecutorService(threadCount, EASYJOB_THREAD_NAME);
                this.easyJobLastThreadCount = threadCount;
            } else if (threadCount > this.easyJobLastThreadCount) {
                this.easyJobExecutor.setMaximumPoolSize(threadCount);
                this.easyJobExecutor.setCorePoolSize(threadCount);
                this.easyJobLastThreadCount = threadCount;
            } else if (threadCount < this.easyJobLastThreadCount) {
                this.easyJobExecutor.setCorePoolSize(threadCount);
                this.easyJobExecutor.setMaximumPoolSize(threadCount);
                this.easyJobLastThreadCount = threadCount;
            }
        }

        List<List<T>> lists = Lists.partition(tasks, param.getExecuteCount());
        final CountDownLatch latch = new CountDownLatch(lists.size());
        final String requestNo = requestNoStr;
        for (final List<T> list : lists) {
            this.easyJobExecutor.submit(
                    new Callable<Object>() {
                        public Object call() throws Exception {
                            try{
                                if (StringUtils.isNotBlank(requestNo)) {
                                    MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
                                }
                            }catch (RuntimeException e){
                                log.error("easyjob自定义log跟踪拦截器执行异常",e);
                            }
                            try {
                                if (log.isInfoEnabled()) {
                                    log.info("请求编号[{}],正在执行任务,任务ID[{}],任务名称[{}],[{}],条数:[{}]...", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
                                }
                                executeTasks(list);
                                if (log.isInfoEnabled()) {
                                    log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],[{}],条数:[{}]成功!", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
                                }
                            } catch (Exception e) {
                                log.error(e.getMessage(), e);
                                throw e;
                            } finally {
                                try{
                                    MDC.clear();
                                }catch (RuntimeException ex){
                                    log.error("easyjob自定义log跟踪拦截器执行异常",ex);
                                }
                                latch.countDown();
                            }
                            return null;
                        }

                    }
            );
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException("interrupted when processing data access request in concurrency", e);
        }
    }

    /**
     * 获取任务名称
     * @return
     */
    private String getTaskNameKey(){
        StringBuffer keyBuf = new StringBuffer();
        keyBuf.append(activeEnv)
                .append(Constants.SEPARATOR_UNDERLINE)
                .append(this.getClass().getSimpleName());
        return keyBuf.toString();
    }

    protected void executeTasks(List<T> taskList) {
        if(CollectionUtils.isEmpty(taskList)) {
            return;
        }
        this.doTasks(taskList);
    }

    /**
     * 业务处理抽象方法
     * @param list
     */
    protected void doTasks(List<T> list){
        if(isDoBatchTasks()){
            CallerInfo info = Profiler.registerInfo(getClass().getName()+"_batch", Constants.TRANS_BASIC,false, true);
            try {
                /** 开始执行各个子类真正业务逻辑 */
                this.doBatchTasks(list);
            } catch(CommonBusinessException ex){
                log.warn(ex.getMessage());
            } catch (Exception e) {
                Profiler.functionError(info);
                log.error("任务处理失败,方法:{},任务:{}",ClassHelper.getMethod(),JSON.toJSONString(list), e);
            } finally {
                Profiler.registerInfoEnd(info);
            }
        }else{
            for (T task : list) {
                CallerInfo info = Profiler.registerInfo(getClass().getName(), Constants.TRANS_BASIC,false, true);
                if(task == null) { continue; }
                String lockKey = "";
                try {
                    /** 开始执行各个子类真正业务逻辑 */
                    if (useConcurrentLock()) {
                        lockKey = getLockKey(task);
                        if (redisHelper.lock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey)) {
                            this.doSingleTask(task);
                        }else{
                            lockKey = "";
                            log.warn("lockKey:{},加载失败,正在被其他用户锁定,请重试!",lockKey);
                        }
                    } else {
                        this.doSingleTask(task);
                    }
                } catch(CommonBusinessException ex){
                    log.warn(ex.getMessage());
                } catch (Exception e) {
                    Profiler.functionError(info);
                    log.error("任务处理失败,方法:{},任务:{}",ClassHelper.getMethod(),JSON.toJSONString(task), e);
                } finally {
                    Profiler.registerInfoEnd(info);
                    if (StringUtils.isNotBlank(lockKey)) {
                        redisHelper.unlock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey);
                    }
                }
            }
        }
    }

    /**
     * 获取实体类的实际类型
     *
     * @return
     */
    private Class<T> getArgumentType() {
        return (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }

    /**
     * 是否使用防并发锁
     * 默认不使用,如需使用子类重写该方法
     * @return
     */
    protected boolean useConcurrentLock() {
        return false;
    }

    /**
     * 根所注解获取LockKey,可被子类重写,提高效率
     *
     * @param businessObj   业务对象
     * @return concurrent lock key
     */
    protected String getLockKey( T businessObj) {
        StringBuilder lockKey = new StringBuilder(EASYJOB_SINGLE_TASK_LOCK_PREFIX);
        //若存在注解指定的防重字段,则使用这些字段拼装防重Key,否则使用MQ业务主键防重
        List<ValueEntryInfo> valueEntries = getAnnotaionConcurrentKeys(businessObj);
        if (!CollectionUtils.isEmpty(valueEntries)) {
            for (ValueEntryInfo valueEntry : valueEntries) {
                lockKey.append(Constants.SEPARATOR_UNDERLINE);
                lockKey.append(valueEntry.getValue());
            }
        } else {
           throw new CommonBusinessException(String.format("此任务处理需要加分布式锁,但是未设置锁key,所以不做业务处理,请检查,任务信息:%s",JSON.toJSONString(businessObj)));
        }
        return lockKey.toString();
    }

    /**
     * 查找对象的ConccurentKey注解,获取防重字段,并排序返回
     *
     * @param businessObj 业务对象
     * @return 有序的业务字段值列表
     */
    private List<ValueEntryInfo> getAnnotaionConcurrentKeys(T businessObj) {
        List<ValueEntryInfo> valueEntries = new ArrayList<ValueEntryInfo>();
        Field[] fields = businessObj.getClass().getDeclaredFields();
        for (int i = 0; i < fields.length; i++) {
            ConcurrentKey concurrentKey = fields[i].getAnnotation(ConcurrentKey.class);
            if (concurrentKey != null) {
                fields[i].setAccessible(true);
                Object fieldVal = null;
                try {
                    ValueEntryInfo valueEntry = new ValueEntryInfo();
                    fieldVal = fields[i].get(businessObj);
                    if (fieldVal != null) {
                        valueEntry.setValue(String.format("%1$s", fieldVal));
                        valueEntry.setOrder(concurrentKey.order());
                        valueEntries.add(valueEntry);
                    }
                } catch (IllegalAccessException e) {
                    log.error("IllegalAccess-{}.{}", businessObj.getClass().getName(), fields[i].getName());
                }
            }
        }
        if (valueEntries.size() > 1) {
            //排序ConcurrentKey
            Collections.sort(valueEntries, new Comparator<ValueEntryInfo>() {
                @Override
                public int compare(ValueEntryInfo o1, ValueEntryInfo o2) {
                    if (o1.getOrder() > o2.getOrder()) {
                        return 1;
                    } else if (o1.getOrder() == o2.getOrder()) {
                        return 0;
                    } else {
                        return -1;
                    }
                }
            });
        }
        return valueEntries;
    }

    protected List<T> selectTasks(TaskServerParam taskServerParam, int curServer) {
        return this.loadTasks(taskServerParam, curServer);
    }

    /**
     * 获取select时的任务创建开始时间
     * @param serverArg
     * @return
     */
    protected Date getCreateTimeFrom(String serverArg){
        return null;
    }

    /**
     * 是否以批量方式处理任务
     * @return
     */
    protected boolean isDoBatchTasks(){
        return false;
    }

}

实战结果

上述所述均为透传ID场景的原理和示例代码,实战效果如下图:调用jsf超时,跨系统查看日志进行排查,得知为慢sql引起

 

 

 

 

上述大部分场景已经抽出一个通用jar包,详细使用教程见我的另一篇文章:分布式日志追踪ID使用教程

标签:return,log,MDC,param,String,日志,logTraceId,ID,分布式
From: https://www.cnblogs.com/Jcloud/p/18680910

相关文章

  • IDEA如何将代码进行注释
    前言大家好,我是小徐啊。我们在使用IDEA开发Java应用的时候,一般都是需要写注释的,这些注释帮助我们和别人更好的理解代码的含义,可以说是必不可少的。在使用IDEA开发时,其实是可以快捷的进行代码注释的,而不用手动去注释,那么IDEA该如何进行代码注释呢?如何进行代码注释首先,打开一个我......
  • 分布式系统架构8:分布式缓存
    这是小卷对分布式系统架构学习的第11篇文章,今天了解分布式缓存的理论知识以及Redis集群。分布式缓存也是面试常见的问题,通常面试官会问为什么要用缓存,以及用的Redis是哪种模式,用的过程中遇到哪些问题这些1.AP还是CPRedis集群就是典型的AP式,它具有高性能、高可用等特点,但......
  • NVIDIA Isaac Sim 入门教程(三)ROS2 联合仿真
    系列文章目录前言一、导入URDF模型:TurtlebotOmniverseIsaacSim拥有多种工具,可促进与ROS系统的集成。我们有ROS和ROS2桥接器、URDF导入器等等。本系列教程将举例说明如何使用这些工具。1.1学习目标在本示例中,我们将在IsaacSim中设置一个Turtlebot3......
  • AIGC视频生成明星——Emu Video模型
    大家好,这里是好评笔记,公主号:Goodnote,专栏文章私信限时Free。本文详细介绍Meta的视频生成模型EmuVideo,作为Meta发布的第二款视频生成模型,在视频生成领域发挥关键作用。......
  • IntelliJ IDEA 2024.3 Java开发工具
    IntelliJIDEA2024.3Java开发工具JetBrainsIntelliJIDEA2024mac,是一款Java开发工具,IntelliJIDEA凭借无与伦比的Java和Kotlin支持脱颖而出。从一开始就支持尖IDEA2024.3中文版开发工具端语言功能,保持领先地位。IntelliJIDEA对您的代码了如指掌,利用这些知识在每个......
  • 简单日志宏实现(C++)
    意义:快速定位程序运行逻辑出错的位置。背景:项目在运行中可能会遇到各种问题,而出问题是开发过程中不可避免的一部分。关键在于能够有效地找到,并解决问题。解决问题的方式:GDB调试:适用于:程序崩溃后的定位。局限性:逐步调试过程繁琐且耗时较长。系统运行日志分析:方法:在......
  • UE学习日志#3 GAS--ASC源码简要分析1
    嘿我跟您说这坑一点也不大,AbilitySystemComponent.h也就两千行,.cpp也就三千多,乐凡事要一点点来,我也就按每天的进度分p了。1类的继承关系和修饰符先看这两行,为了方便看我加了换行UCLASS(ClassGroup=AbilitySystem,hidecategories=(Object,LOD,Lighting,Transform,Sockets......
  • Android JecPack组件之LifeCycles 使用详解
    一、背景LifeCycle是一个可以感知宿主生命周期变化的组件。常见的宿主包括Activity/Fragment、Service和Application。LifeCycle会持有宿主的生命周期状态的信息,当宿主生命周期发生变化时,会通知监听宿主的观察者。LifeCycle的出现主要是为了解决:系统组件的生命周期与......
  • Unraid 安装 WindowsServer2019 及 NGINX、PHP、Python 环境
    一、安装虚拟机使用Unraid安装。项目值初始内存:4096MB最大值内存:6144MB机器:i440fx-7.2BIOS:OVMF启用USB启动引导:NoHyper-V:是USB控制器:2.0(EHCI)操作系统安装ISO:windows_server_2019.iso操作系统安装光盘总线:......
  • Linux中RAID级别有哪些?
    RAID(RedundantArrayofIndependentDisks,独立磁盘冗余阵列)是一种将多个磁盘驱动器组合成一个逻辑单元的数据存储虚拟化技术,用于提高数据的可靠性、性能和/或容量。RAID有多种级别,每种级别都有其特定的性能、可靠性和成本效益。以下是一些常见的RAID级别:1.RAID0(条带化)特点:将......