首页 > 其他分享 >分布式日志追踪ID实战 | 京东物流技术团队

分布式日志追踪ID实战 | 京东物流技术团队

时间:2024-01-04 15:32:09浏览次数:32  
标签:return log MDC message param String 日志 ID 分布式

本文通过介绍分布式应用下各个场景的全局日志ID透传思路,以及介绍分布式日志追踪ID简单实现原理和实战效果,从而达到通过提高日志查询排查问题的效率。

背景

开发排查系统问题用得最多的手段就是查看系统日志,相信不少人都值过班当过小秘吧:给下接口和出入参吧,麻烦看看日志里的有没有异常信息啊等等,但是在并发大时使用日志定位问题还是比较麻烦,由于大量的其他用户/其他线程的日志也一起输出穿行其中导致很难筛选出指定请求的全部相关日志,以及下游线程/服务对应的日志,甚至一些特殊场景的出入参只打印了一些诸如gis坐标、四级地址等没有单据信息的日志,使得日志定位起来非常不便

场景分析

自己所在组负责的系统主要是web应用,其中涉及到的请求方式主要有:springmvc的servlet的http场景、jsf场景、MQ场景、resteasy场景、clover场景、easyjob场景,每一种场景都需要不同的方式进行logTraceId的透传,接下来逐个探析上述各个场景的透传方案。

在这之前我们先要简单了解一下日志中透传和打印logTraceId的方式,一般我们使用MDC进行logTraceId的透传与打印,但是基于MDC内部使用的是ThreadLocal所以只有本线程才有效,子线程服务的MDC里的值会丢失,所以这里我们要么是在所有涉及到父子线程的地方以编码侵入式自行实现值的传递,要么就是通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题,而本文采用的是比较粗糙地以编码侵入式来解决此问题。

springmvc的servlet的http场景

这个场景相信大家都已经烂熟到骨子里了,主要思路是通过拦截器的方式进行logTraceId的透传,新建一个类实现HandlerInterceptor

preHandle:在业务处理器处理请求之前被调用,这里实现logTraceId的设置与透传

postHandle:在业务处理器处理请求执行完成后,生成视图之前执行,这里空实现就好

afterCompletion:在DispatcherServlet完全处理完请求后被调用,这里用于清除MDC的logTraceId

@Slf4j
public class TraceInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) throws Exception {
        try{
            String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, TraceUtils.getTraceId());
            }
        }catch (RuntimeException e){
            log.error("mvc自定义log跟踪拦截器执行异常",e);
        }
        return true;
    }

    @Override
    public void postHandle(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }

    @Override
    public void afterCompletion(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
        try{
            MDC.clear();
        }catch (RuntimeException ex){
            log.error("mvc自定义log跟踪拦截器执行异常",ex);
        }
    }
}

jsf场景

相信大家对于jsf并不陌生,而jsf也支持自定义filter,基于jsf过滤器的运行方式(如下图),可以通过配置全局过滤器(继承AbstractFilter)的方式进行logTraceId的透传,需要注意的是jsf是在线程池中执行的所以一定要信任消息体中的logTraceId

分布式日志追踪ID实战 | 京东物流技术团队_JSON

jsf消费者过滤器:主要从上下文环境中获取logTraceId并进行透传,实现代码如下

@Slf4j
public class TraceIdGlobalJsfFilter extends AbstractFilter {
    @Override
    public ResponseMessage invoke(RequestMessage requestMessage) {
        //设置traceId
        setAndGetTraceId(requestMessage);
        try{
            return this.getNext().invoke(requestMessage);
        }finally {
        }
    }

    /**
     * 设置并返回traceId
     * @param requestMessage
     * @return
     */
    private void setAndGetTraceId(RequestMessage requestMessage) {
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
                //如果filter和MDC都没有获取到则说明有遗漏,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消费者自定义log跟踪拦截器预警,filter和MDC都没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            } else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
                //如果MDC没有,filter有,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消费者自定义log跟踪拦截器预警,MDC没有filter有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            } else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
                //如果MDC有,filter没有,说明是源头已经有了,但是jsf是第一次调,透传
                requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, logTraceId);
            }else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null){
                //MDC和fitler都有,但是并不相等,则存在问题打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消费者自定义log跟踪拦截器预警,MDC和filter都有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            }
        }catch (RuntimeException e){
            log.error("jsf消费者自定义log跟踪拦截器执行异常",e);
        }
    }
}

jsf提供者过滤器:通过拿到消费者在消息体中透传的logTraceId来实现,实现代码如下

@Slf4j
public class TraceIdGlobalJsfProducerFilter extends AbstractFilter {
    @Override
    public ResponseMessage invoke(RequestMessage requestMessage) {
        //设置traceId
        boolean isNeedClearMdc = transferTraceId(requestMessage);
        try{
            return this.getNext().invoke(requestMessage);
        }finally {
            if(isNeedClearMdc){
                clear();
            }
        }
    }
    /**
     * 设置并返回traceId
     * @param requestMessage
     * @return
     */
    private boolean transferTraceId(RequestMessage requestMessage) {
        boolean isNeedClearMdc = false;
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
                //如果filter和MDC都没有获取到,说明存在遗漏场景或是提供给外部系统调用的接口,打印日志进行观察
                String traceId = TraceUtils.getTraceId();
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,traceId);
                requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, traceId);
                if(log.isDebugEnabled()){
                    log.debug("jsf生产者自定义log跟踪拦截器预警,filter和MDC都没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
                isNeedClearMdc = true;
            } else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
                //如果MDC没有,filter有,说明是被调用方,需要透传下去
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceIdObj.toString());
                isNeedClearMdc = true;
            } else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
                //如果MDC有,filter没有,存在问题,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf生产者自定义log跟踪拦截器预警,MDC有filter没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
                isNeedClearMdc = true;
            }else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null && !logTraceId.equals(logTraceIdObj.toString())){
                //MDC和fitler都有,但是并不相等,则信任filter透传结果
                TraceUtils.resetTraceId(logTraceIdObj.toString());
                if(log.isDebugEnabled()){
                    log.debug("jsf生产者自定义log跟踪拦截器预警,MDC和fitler都有traceId,但是并不相等,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            }
            return isNeedClearMdc;
        }catch (RuntimeException e){
            log.error("jsf生产者自定义log跟踪拦截器执行异常",e);
            return false;
        }
    }

    /**
     * 清除MDC
     */
    private void clear() {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("jsf生产者自定义log跟踪拦截器执行异常",e);
        }
    }
}

MQ场景

说到MQ相信大家对于此就更不陌生了,此种场景主要通过在提供者发送消息时拿到上下文中的logTraceId,将其以扩展信息的方式设置进消息体中进行透传,而消费者则从消息体中进行获取

生产者:新建一个抽象类继承MessageProducer,覆写父类中的两个send方法(批量发送、单条发送),send方法中主要调用抽象加工消息体的方法(logTraceId属性赋值)和日志打印,在子类中进行发送前对消息体的加工处理,具体代码如下

@Slf4j
public abstract class BaseTraceIdProducer extends MessageProducer {

    private static final String SEPARATOR_COMMA = ",";

    public BaseTraceIdProducer() {
    }

    public BaseTraceIdProducer(TransportManager transportManager) {
        super(transportManager);
    }

    /**
     * 获取消息体-单个
     * @param messageContext
     * @return
     */
    protected abstract Message getMessage(MessageContext messageContext);

    /** 获取消息体-批量
     *
     * @param messageContext
     * @return
     */
    protected abstract List<Message> getMessages(MessageContext messageContext);

    /**
     * 填充消息体上下文信息
     * @param message
     * @param messageContext
     */
    protected void fillContext(Message message,MessageContext messageContext) {
        if(message == null){
            return;
        }
        if(StringUtils.isBlank(messageContext.getLogTraceId())){
            String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
            messageContext.setLogTraceId(logTraceId);
        }
        if(StringUtils.isBlank(messageContext.getTopic())){
            String topic = message.getTopic();
            messageContext.setTopic(topic);
        }
        String businessId = message.getBusinessId();
        messageContext.getBusinessIdBuf().append(SEPARATOR_COMMA).append(businessId);
    }

    /**
     * traceId嵌入消息体中
     * @param message
     */
    protected void generateTraceIdIntoMessage(Message message){
        if(message == null){
            return;
        }
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId)){
                logTraceId = TraceUtils.getTraceId();
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
            }
            message.setAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY,logTraceId);
        }catch (RuntimeException e){
            log.error("jmq2自定义log跟踪拦截器执行异常",e);
        }
    }

    /**
     * 批量发送消息-无回调
     * @param messages
     * @param timeout
     * @throws JMQException
     */
    public void send(List<Message> messages, int timeout) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessages(messages);
        List<Message> messageList = this.getMessages(messageContext);
        //打印日志,方便排查问题
        printLog(messageContext);
        super.send(messageList, timeout);
    }

    /**
     * 单个发送消息
     * @param message
     * @param transaction
     * @param <T>
     * @return
     * @throws JMQException
     */
    public <T> T send(Message message, LocalTransaction<T> transaction) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessage(message);
        Message msg = this.getMessage(messageContext);
        //打印日志,方便排查问题
        printLog(messageContext);
        return super.send(msg, transaction);
    }

    /**
     * 批量发送消息-有回调
     * @param messages
     * @param timeout
     * @param callback
     * @throws JMQException
     */
    public void send(List<Message> messages, int timeout, AsyncSendCallback callback) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessages(messages);
        List<Message> messageList = this.getMessages(messageContext);
        //打印日志,方便排查问题
        printLog(messageContext);
        super.send(messageList, timeout, callback);
    }

    /**
     * 打印日志,方便排查问题
     * @param messageContext
     */
    private void printLog(MessageContext messageContext) {
        if(messageContext==null){
            return;
        }
        if(log.isInfoEnabled()){
            log.info("MQ发送:traceId:{},topic:{},businessIds:[{}]",messageContext.getLogTraceId(),messageContext.getTopic(),messageContext.getBusinessIdBuf()==null?"":messageContext.getBusinessIdBuf().toString());
        }
    }

}

@Slf4j
public class TraceIdEnvMessageProducer extends BaseTraceIdProducer {

    private static final String UAT_TRUE = String.valueOf(true);
    private boolean uat = false;

    public TraceIdEnvMessageProducer() {
    }

    public TraceIdEnvMessageProducer(TransportManager transportManager) {
        super(transportManager);
    }

    /**
     * 环境变量打标-单个消息体
     * @param message
     */
    private void convertUatMessage(Message message) {
        if (message != null) {
            message.setAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT, UAT_TRUE);
        }
    }


    /**
     * 消息转换-批量消息体
     * @param messageContext
     * @return
     */
    private List<Message> convertMessages(MessageContext messageContext) {
        List<Message> messages = messageContext.getMessages();
        if (!CollectionUtils.isEmpty(messages)) {
            Iterator messageIterator = messages.iterator();
            while(messageIterator.hasNext()) {
                Message message = (Message)messageIterator.next();
                if(this.isUat()){
                    this.convertUatMessage(message);
                }
                super.generateTraceIdIntoMessage(message);
                super.fillContext(message,messageContext);
            }
        }
        return messageContext.getMessages();
    }

    /**
     * 消息转换-单个消息体
     * @param messageContext
     * @return
     */
    private Message convertMessage(MessageContext messageContext){
        Message message = messageContext.getMessage();
        if(this.isUat()){
            this.convertUatMessage(message);
        }
        super.generateTraceIdIntoMessage(message);
        super.fillContext(message,messageContext);
        return message;
    }

    protected Message getMessage(MessageContext messageContext) {
        if(log.isDebugEnabled()){
            log.debug("current environment is UAT : {}", this.isUat());
        }
        return this.convertMessage(messageContext);
    }

    protected List<Message> getMessages(MessageContext messageContext) {
        if(log.isDebugEnabled()){
            log.debug("current environment is UAT : {}", this.isUat());
        }
        return this.convertMessages(messageContext);
    }

    public void setUat(boolean uat) {
        this.uat = uat;
    }

    boolean isUat() {
        return this.uat;
    }

}

消费者:新建一个抽象类继承MessageListener,覆写父类中的onMessage方法,主要进行设置日志traceId和消费完成后的traceId清理等,而在子类中进行一些自定义处理,具体代码如下

@Slf4j
public abstract class BaseTraceIdMessageListener implements MessageListener {

    public BaseTraceIdMessageListener() {
    }

    public abstract void onMessageList(List<Message> messages) throws Exception;

    @Override
    public final void onMessage(List<Message> messages) throws Exception {
        try{
            if(CollectionUtils.isEmpty(messages)){
                return;
            }
            //设置日志traceId
            setLogTraceId(messages);
            this.onMessageList(messages);
            //消费完后清除traceId
            clear();
        }catch (Exception e){
            throw e;
        }finally {
            MDC.clear();
        }
    }

    /**
     * 设置日志traceId
     * @param messages
     */
    private void setLogTraceId(List<Message> messages) {
        try{
            Message message = messages.get(0);
            String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId)){
                logTraceId = TraceUtils.getTraceId();
            }
            MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
        }catch (RuntimeException e){
            log.error("jmq2自定义log跟踪拦截器执行异常",e);
        }
    }

    /**
     * 清除traceId
     */
    private void clear() {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("jmq2自定义log跟踪拦截器执行异常",e);
        }
    }

}

@Slf4j
public abstract class TraceIdEnvMessageListener extends BaseTraceIdMessageListener{

    private String uat;

    public TraceIdEnvMessageListener() {
    }

    public abstract void onMessages(List<Message> var1) throws Exception;

    @Override
    public void onMessageList(List<Message> messages) throws Exception {
        Iterator iterator;
        Message message;
        if (this.getUat() != null && Boolean.valueOf(this.getUat())) {
            iterator = messages.iterator();

            while(true) {
                while(iterator.hasNext()) {
                    message = (Message)iterator.next();
                    if (message != null && Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
                        this.onMessages(Arrays.asList(message));
                    } else {
                        log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
                    }
                }

                return;
            }
        } else if (this.getUat() != null && !Boolean.valueOf(this.getUat())) {
            iterator = messages.iterator();

            while(true) {
                while(iterator.hasNext()) {
                    message = (Message)iterator.next();
                    if (message != null && !Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
                        this.onMessages(Arrays.asList(message));
                    } else {
                        log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
                    }
                }

                return;
            }
        } else {
            this.onMessages(messages);
        }
    }

    public void setUat(String uat) {
        if (!"true".equals(uat) && !"false".equals(uat)) {
            throw new IllegalArgumentException("uat 属性值只能为 true 或 false.");
        } else {
            this.uat = uat;
        }
    }

    public String getUat() {
        return this.uat;
    }
}

resteasy场景

此场景类似于spinrg-mvc场景,也是http请求,需要通过拦截器在消息头中进行logTraceId的透传,主要有客户端拦截器,服务端:预处理拦截器、后置拦截器,代码如下

@ClientInterceptor
@Provider
@Slf4j
public class ResteasyClientInterceptor implements ClientExecutionInterceptor {
    @Override
    public ClientResponse execute(ClientExecutionContext clientExecutionContext) throws Exception {
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            ClientRequest request = clientExecutionContext.getRequest();
            String headerTraceId = request.getHeaders().getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && StringUtils.isBlank(headerTraceId)){
                //如果filter和MDC都没有获取到则说明是调用源头
                String traceId = TraceUtils.getTraceId();
                TraceUtils.resetTraceId(traceId);
                request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,traceId);
            } else if(StringUtils.isBlank(headerTraceId)){
                //如果MDC有但是filter没有则需要传递
                request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,logTraceId);
            }
        }catch (RuntimeException e){
            log.error("resteasy客户端log跟踪拦截器执行异常",e);
        }
        return clientExecutionContext.proceed();
    }
}

@Slf4j
@Provider
@ServerInterceptor
public class RestEasyPreInterceptor implements PreProcessInterceptor {
    @Override
    public ServerResponse preProcess(HttpRequest request, ResourceMethod resourceMethod) throws Failure, WebApplicationException {
        try{
            MultivaluedMap<String, String> requestHeaders = request.getHttpHeaders().getRequestHeaders();
            String headerTraceId = requestHeaders.getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
            if(StringUtils.isNotBlank(headerTraceId)){
                //如果filter则透传
                TraceUtils.resetTraceId(headerTraceId);
            }
        }catch (RuntimeException e){
            log.error("resteasy服务端log跟踪前置拦截器执行异常",e);
        }
        return null;
    }
}

@Slf4j
@Provider
@ServerInterceptor
public class ResteasyPostInterceptor implements PostProcessInterceptor {
    @Override
    public void postProcess(ServerResponse serverResponse) {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("resteasy服务端log跟踪后置拦截器执行异常",e);
        }
    }
}

clover场景

clover的大体机制主要是在项目启动的时候扫描到带有注解@HessianWebService的类进行服务注册并维持心跳检测,而clover端则通过servlet请求方式进行任务的回调,同时继承AbstractScheduleTaskProcess方式的任务是以线程池的方式进行业务的处理

基于上述原理我们需要解决两个问题:1.新建一个类继承ServiceExporterServlet,并在web.xml配置中进行servlet配置,代码如下;

@Slf4j
public class ServiceExporterTraceIdServlet extends ServiceExporterServlet {

    @Override
    public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
        try {
            String traceId = MDC.get("traceId");
            if (StringUtils.isBlank(traceId)) {
                MDC.put("traceId", TraceUtils.getTraceId());
            }
        } catch (Exception e) {
            log.error("clover请求servlet执行异常", e);
        }
        try {
            super.service(req, res);
        } catch (Throwable e) {
            log.error("clover请求servlet执行异常", e);
            throw e;
        }finally {
            try{
                MDC.clear();
            }catch (RuntimeException ex){
                log.error("clover请求servlet执行异常",ex);
            }
        }
    }
}

2.新建一个抽象类继承AbstractScheduleTaskProcess,在类中以编码形式进行父子线程的透传(可优化:通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题),所有任务均改为继承此类,关键代码如下

try{
            traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                log.warn("clover自定义log跟踪拦截器预警,mdc没有traceId");
            }
        }catch (RuntimeException e){
            log.error("clover自定义log跟踪拦截器执行异常",e);
        }
        final String logTraceId = traceId;
        while(iterator.hasNext()) {
            final List<TcTask> list = (List<TcTask>)iterator.next();
            this.executor.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    try{
                        if (StringUtils.isNotBlank(logTraceId)) {
                            MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, logTraceId);
                        }
                    }catch (RuntimeException e){
                        log.error("clover自定义log跟踪拦截器执行异常",e);
                    }
                    Object var1;
                    try {
                        if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
                            BaseTcTaskProcessWorker.logger.info("正在执行任务[" + this.getClass().getName() + "],条数:" + list.size() + "...");
                        }


                        BaseTcTaskProcessWorker.this.executeTasks(list);

                        if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
                            BaseTcTaskProcessWorker.logger.info("执行任务[" + this.getClass().getName() + "],条数:" + list.size() + "成功!");
                        }

                        var1 = null;
                    } catch (Exception var5) {
                        BaseTcTaskProcessWorker.logger.error(var5.getMessage(), var5);
                        throw var5;
                    } finally {
                        try{
                            MDC.clear();
                        }catch (RuntimeException ex){
                            log.error("clover自定义log跟踪拦截器执行异常",ex);
                        }
                        latch.countDown();
                    }

                    return var1;
                }
            });
        }

easyjob场景

easyjob的大体机制是在项目启动的时候通过扫描实现接口Scheduler的类进行上报注册,同时启动一个acceptor(获取任务的线程池),而acceptor拉取到任务后会将父任务放进一个叫executor的线程池,子任务范进一个叫slowExecutor的线程池,我们可以新建一个抽奖类实现接口ScheduleFlowTask,复用clover场景硬编码方式进行父子线程logTraceId的透传处理(可优化:通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题),示例代码如下

@Slf4j
public abstract class AbstractEasyjobOnlyScheduleProcess<T> implements ScheduleFlowTask {

    /**
     * EASYJOB平台UMP监控key前缀
     */
    private static final String EASYJOB_UMP_KEY_RREFIX = "trans.easyjob.dotask.";

    /**
     * EASYJOB单个任务处理分布式锁前缀
     */
    private static final String EASYJOB_SINGLE_TASK_LOCK_PREFIX = "basic_easyjob_single_task_lock_prefix_";

    /**
     * 环境标识-开关配置进行环境隔离
     */
    @Value("${spring.profiles.active}")
    private String activeEnv;

    @Value("${task.scene.mark}")
    private String sceneMark = TaskSceneMarkEnum.PRODUCTION.getDesc();

    /**
     * easyJob维度线程池变量
     */
    private ThreadPoolExecutor easyJobExecutor;
    /**
     * easyJob维度服务器个数-分片个数
     */
    private volatile int easyJobLastThreadCount = 0;

    /**
     * easyjob多线程名称
     */
    private static final String EASYJOB_THREAD_NAME = "dts.easyJobs";

    /**
     * 子类的泛型参数类型
     */
    private Class<T> argumentType;

    /**
     * 无参构造
     */
    public AbstractEasyjobOnlyScheduleProcess() {
        //设置子类泛型参数类型
        argumentType = this.getArgumentType();
    }

    @Autowired
    private RedisHelper redisHelper;

    /**
     * 非task表扫描待处理的任务数据
     * @param taskServerParam
     * @param curServer
     * @return
     */
    protected abstract List<T> loadTasks(TaskServerParam taskServerParam, int curServer);

    /**
     * 业务处理抽象方法-单个
     * @param task
     */
    protected abstract void doSingleTask(T task);

    /**
     * 业务处理抽象方法-批量
     * @param tasks
     */
    protected abstract void doBatchTasks(List<T> tasks);

    /**
     * 拼装ump监控key
     * @param prefix
     * @param taskNameKey
     * @return
     */
    private String getUmpKey(String prefix,String taskNameKey) {
        StringBuffer umpKeyBuf = new StringBuffer();
        umpKeyBuf.append(prefix).append(taskNameKey);
        return umpKeyBuf.toString();
    }

    /**
     * easyjob平台异步任务回调方法
     * @param scheduleContext
     * @return
     * @throws Exception
     */
    @Override
    public TaskResult doTask(ScheduleContext scheduleContext) throws Exception {
        String requestNo = TraceUtils.getTraceId();
        try {
            String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
            }
        } catch (Exception e) {
            log.error("easyjob执行异常", e);
        }
        EasyJobTaskServerParam taskServerParam = null;

        CallerInfo callerinfo = null;
        try {
            //条件转换
            taskServerParam = EasyJobCoreUtil.transTaskServerParam(scheduleContext);
            String taskNameKey = getTaskNameKey();
            String umpKey = getUmpKey(EASYJOB_UMP_KEY_RREFIX,taskNameKey);
            callerinfo = Profiler.registerInfo(umpKey, Constants.TRANS_BASIC, false, true);
            //多服务器,并且非子任务,本次不执行,提交子任务
            if (taskServerParam.getServerCount() > 1 && !taskServerParam.isSubTask()) {
                submitSubTask(scheduleContext, taskServerParam,requestNo);
                return TaskResult.success();
            }

            if (log.isInfoEnabled()) {
                log.info("请求编号[{}],开始获取任务,任务ID[{}],任务名称[{}],执行参数[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), JSON.toJSONString(taskServerParam));
            }
            TaskServerParam cloverTaskServerParam = EasyJobCoreUtil.transferCloverTaskServerParam(taskServerParam);

            List<T> tasks = this.selectTasks(cloverTaskServerParam, taskServerParam.getCurServer());

            if (log.isInfoEnabled()) {
                log.info("请求编号[{}],获取任务ID[{}],任务名称[{}]共{}条", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks == null ? 0 : tasks.size());
            }

            if (CollectionUtils.isNotEmpty(tasks)) {
                if (log.isInfoEnabled()) {
                    log.info("请求编号[{}],开始执行任务,任务ID[{}],任务名称[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName());
                }

                this.easyJobExecuteTasksInner(taskServerParam, tasks,requestNo);
                if (log.isInfoEnabled()) {
                    log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],执行数量[{}]完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks.size());
                }

            }
            return TaskResult.success();
        } catch (Exception e) {
            Profiler.functionError(callerinfo);
            if (log.isInfoEnabled()) {
                log.error("请求编号[{}],任务执行失败,任务ID[{}],任务名称[{}]", requestNo, taskServerParam == null ? "" : taskServerParam.getTaskId(), taskServerParam == null ? "" :taskServerParam.getTaskName(), e);
            }
            return TaskResult.fail(e.getMessage());
        }finally {
            try{
                MDC.clear();
            }catch (RuntimeException ex){
                log.error("easyjob执行异常",ex);
            }
            Profiler.registerInfoEnd(callerinfo);
        }
    }

    /**
     * 多分片提交子任务
     * @param scheduleContext 调度任务上下文参数
     * @param taskServerParam 调度任务参数
     * @param requestNo 调度任务参数
     * @return void
     */
    private void submitSubTask(ScheduleContext scheduleContext, EasyJobTaskServerParam taskServerParam,String requestNo) throws IOException {

        log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],子任务个数[{}],开始提交子任务", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());

        String jobClass = scheduleContext.getTaskGetResponse().getJobClass();

        if (StringUtils.isBlank(jobClass)) {
            throw new RuntimeException("jobClass get error");
        }

        for (int i = 0; i < taskServerParam.getServerCount(); i++) {
            Map<String, String> dataMap = scheduleContext.getParameters();
            //提交子任务标识
            dataMap.put("isSubTask", "true");
            //给子任务进行编号
            dataMap.put("curServer", String.valueOf(i));
            //父任务名称传递子任务
            dataMap.put("taskName", taskServerParam.getTaskName());
            scheduleContext.commitSubTask(jobClass, dataMap, taskServerParam.getExpected(), taskServerParam.getTransactionalAccept());
        }
        // 父任务等待子任务执行完毕再更改状态,如果执行时间超过等待时间,抛异常
        //scheduleContext.waitForSubtaskCompleted((long) taskServerParam.getServerCount() * taskServerParam.getExpected());
        log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],子任务个数[{}],提交完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());
    }

    /**
     * 创建线程池,按配置参数执行task
     * @param param 执行参数
     * @param tasks 任务集合
     * @param requestNoStr
     * @return void
     */
    private void easyJobExecuteTasksInner(final EasyJobTaskServerParam param, List<T> tasks,String requestNoStr) {
        int threadCount = param.getThreadCount();
        synchronized (this) {
            if (this.easyJobExecutor == null) {
                this.easyJobExecutor = (ThreadPoolExecutor) EasyJobCoreUtil.createCustomeasyJobExecutorService(threadCount, EASYJOB_THREAD_NAME);
                this.easyJobLastThreadCount = threadCount;
            } else if (threadCount > this.easyJobLastThreadCount) {
                this.easyJobExecutor.setMaximumPoolSize(threadCount);
                this.easyJobExecutor.setCorePoolSize(threadCount);
                this.easyJobLastThreadCount = threadCount;
            } else if (threadCount < this.easyJobLastThreadCount) {
                this.easyJobExecutor.setCorePoolSize(threadCount);
                this.easyJobExecutor.setMaximumPoolSize(threadCount);
                this.easyJobLastThreadCount = threadCount;
            }
        }

        List<List<T>> lists = Lists.partition(tasks, param.getExecuteCount());
        final CountDownLatch latch = new CountDownLatch(lists.size());
        final String requestNo = requestNoStr;
        for (final List<T> list : lists) {
            this.easyJobExecutor.submit(
                    new Callable<Object>() {
                        public Object call() throws Exception {
                            try{
                                if (StringUtils.isNotBlank(requestNo)) {
                                    MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
                                }
                            }catch (RuntimeException e){
                                log.error("easyjob自定义log跟踪拦截器执行异常",e);
                            }
                            try {
                                if (log.isInfoEnabled()) {
                                    log.info("请求编号[{}],正在执行任务,任务ID[{}],任务名称[{}],[{}],条数:[{}]...", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
                                }
                                executeTasks(list);
                                if (log.isInfoEnabled()) {
                                    log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],[{}],条数:[{}]成功!", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
                                }
                            } catch (Exception e) {
                                log.error(e.getMessage(), e);
                                throw e;
                            } finally {
                                try{
                                    MDC.clear();
                                }catch (RuntimeException ex){
                                    log.error("easyjob自定义log跟踪拦截器执行异常",ex);
                                }
                                latch.countDown();
                            }
                            return null;
                        }

                    }
            );
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException("interrupted when processing data access request in concurrency", e);
        }
    }

    /**
     * 获取任务名称
     * @return
     */
    private String getTaskNameKey(){
        StringBuffer keyBuf = new StringBuffer();
        keyBuf.append(activeEnv)
                .append(Constants.SEPARATOR_UNDERLINE)
                .append(this.getClass().getSimpleName());
        return keyBuf.toString();
    }

    protected void executeTasks(List<T> taskList) {
        if(CollectionUtils.isEmpty(taskList)) {
            return;
        }
        this.doTasks(taskList);
    }

    /**
     * 业务处理抽象方法
     * @param list
     */
    protected void doTasks(List<T> list){
        if(isDoBatchTasks()){
            CallerInfo info = Profiler.registerInfo(getClass().getName()+"_batch", Constants.TRANS_BASIC,false, true);
            try {
                /** 开始执行各个子类真正业务逻辑 */
                this.doBatchTasks(list);
            } catch(CommonBusinessException ex){
                log.warn(ex.getMessage());
            } catch (Exception e) {
                Profiler.functionError(info);
                log.error("任务处理失败,方法:{},任务:{}",ClassHelper.getMethod(),JSON.toJSONString(list), e);
            } finally {
                Profiler.registerInfoEnd(info);
            }
        }else{
            for (T task : list) {
                CallerInfo info = Profiler.registerInfo(getClass().getName(), Constants.TRANS_BASIC,false, true);
                if(task == null) { continue; }
                String lockKey = "";
                try {
                    /** 开始执行各个子类真正业务逻辑 */
                    if (useConcurrentLock()) {
                        lockKey = getLockKey(task);
                        if (redisHelper.lock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey)) {
                            this.doSingleTask(task);
                        }else{
                            lockKey = "";
                            log.warn("lockKey:{},加载失败,正在被其他用户锁定,请重试!",lockKey);
                        }
                    } else {
                        this.doSingleTask(task);
                    }
                } catch(CommonBusinessException ex){
                    log.warn(ex.getMessage());
                } catch (Exception e) {
                    Profiler.functionError(info);
                    log.error("任务处理失败,方法:{},任务:{}",ClassHelper.getMethod(),JSON.toJSONString(task), e);
                } finally {
                    Profiler.registerInfoEnd(info);
                    if (StringUtils.isNotBlank(lockKey)) {
                        redisHelper.unlock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey);
                    }
                }
            }
        }
    }

    /**
     * 获取实体类的实际类型
     *
     * @return
     */
    private Class<T> getArgumentType() {
        return (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }

    /**
     * 是否使用防并发锁
     * 默认不使用,如需使用子类重写该方法
     * @return
     */
    protected boolean useConcurrentLock() {
        return false;
    }

    /**
     * 根所注解获取LockKey,可被子类重写,提高效率
     *
     * @param businessObj   业务对象
     * @return concurrent lock key
     */
    protected String getLockKey( T businessObj) {
        StringBuilder lockKey = new StringBuilder(EASYJOB_SINGLE_TASK_LOCK_PREFIX);
        //若存在注解指定的防重字段,则使用这些字段拼装防重Key,否则使用MQ业务主键防重
        List<ValueEntryInfo> valueEntries = getAnnotaionConcurrentKeys(businessObj);
        if (!CollectionUtils.isEmpty(valueEntries)) {
            for (ValueEntryInfo valueEntry : valueEntries) {
                lockKey.append(Constants.SEPARATOR_UNDERLINE);
                lockKey.append(valueEntry.getValue());
            }
        } else {
           throw new CommonBusinessException(String.format("此任务处理需要加分布式锁,但是未设置锁key,所以不做业务处理,请检查,任务信息:%s",JSON.toJSONString(businessObj)));
        }
        return lockKey.toString();
    }

    /**
     * 查找对象的ConccurentKey注解,获取防重字段,并排序返回
     *
     * @param businessObj 业务对象
     * @return 有序的业务字段值列表
     */
    private List<ValueEntryInfo> getAnnotaionConcurrentKeys(T businessObj) {
        List<ValueEntryInfo> valueEntries = new ArrayList<ValueEntryInfo>();
        Field[] fields = businessObj.getClass().getDeclaredFields();
        for (int i = 0; i < fields.length; i++) {
            ConcurrentKey concurrentKey = fields[i].getAnnotation(ConcurrentKey.class);
            if (concurrentKey != null) {
                fields[i].setAccessible(true);
                Object fieldVal = null;
                try {
                    ValueEntryInfo valueEntry = new ValueEntryInfo();
                    fieldVal = fields[i].get(businessObj);
                    if (fieldVal != null) {
                        valueEntry.setValue(String.format("%1$s", fieldVal));
                        valueEntry.setOrder(concurrentKey.order());
                        valueEntries.add(valueEntry);
                    }
                } catch (IllegalAccessException e) {
                    log.error("IllegalAccess-{}.{}", businessObj.getClass().getName(), fields[i].getName());
                }
            }
        }
        if (valueEntries.size() > 1) {
            //排序ConcurrentKey
            Collections.sort(valueEntries, new Comparator<ValueEntryInfo>() {
                @Override
                public int compare(ValueEntryInfo o1, ValueEntryInfo o2) {
                    if (o1.getOrder() > o2.getOrder()) {
                        return 1;
                    } else if (o1.getOrder() == o2.getOrder()) {
                        return 0;
                    } else {
                        return -1;
                    }
                }
            });
        }
        return valueEntries;
    }

    protected List<T> selectTasks(TaskServerParam taskServerParam, int curServer) {
        return this.loadTasks(taskServerParam, curServer);
    }

    /**
     * 获取select时的任务创建开始时间
     * @param serverArg
     * @return
     */
    protected Date getCreateTimeFrom(String serverArg){
        return null;
    }

    /**
     * 是否以批量方式处理任务
     * @return
     */
    protected boolean isDoBatchTasks(){
        return false;
    }

}

实战结果

上述所述均为透传ID场景的原理和示例代码,实战效果如下图:调用jsf超时,跨系统查看日志进行排查,得知为慢sql引起

分布式日志追踪ID实战 | 京东物流技术团队_JSON_02

分布式日志追踪ID实战 | 京东物流技术团队_JSON_03

上述大部分场景已经抽出一个通用jar包,详细使用教程见我的另一篇文章:分布式日志追踪ID使用教程

作者:京东物流 张小龙

来源:京东云开发者社区 自猿其说 Tech 转载请注明来源

标签:return,log,MDC,message,param,String,日志,ID,分布式
From: https://blog.51cto.com/u_15714439/9101802

相关文章

  • 寻路迷宫,Android休闲益智小游戏开发
    使用AndroidStudio开发了一款休闲益智小游戏——《寻路迷宫》。A.项目描述《寻路迷宫》是一款非常有趣的小游戏app,玩家需要寻找到迷宫的出口,挑战自己的智力和反应能力。在游戏中,玩家需要操作角色前进、转向等动作,避免被障碍物(迷宫墙)阻挡,玩家需要思考解决方案,如何才能在最短的时......
  • Amazon EventBridge 实战详解
    AmazonEventBridge是AWS提供的一项强大的服务器无关事件总线服务,它能够帮助您构建跨应用程序、服务和AWS账户的事件驱动型应用程序。在本文中,我们将深入介绍AmazonEventBridge的基本概念,并通过示例演示如何使用EventBridge进行事件的生成、处理和管理。1.理解EventBri......
  • whistle网络监控 fiddler的开源替代
    github源码:https://github.com/avwo/whistle官网说明:http://wproxy.org/whistle/windows/mac一键安装先安装nodejs然后运行命令npmi-gwhistle&&w2start--init启动w2start停止w2stop注意停止后要手动关闭代理服务器设置windows设置——代理——使用代理服务器(关)webUI界面......
  • tcp缓存引起的日志丢失
    背景logstash从数据源拉取日志,然后通过tcp插件发送到proxy进程中。在业务侧发现日志量明显少了,所以有了这一次的问题排查。问题排查定位首先从logstash侧开始检查。我们先看logstash的日志,没有明显的报错信息。然后再查看logstash管道的状态。可以很明显的看到,在output管道中,in远远......
  • JetBrains AppCode 2023.1 (macOS x64、aarch64) - 适用于 iOS/macOS 开发的智能 IDE
    Xcode14.3compatibility,Swiftrefactoringsandintentions,theIDE’sUI,andKotlinMultiplatformMobile.作者主页:sysin.orgJetBrainsAppCode-适用于iOS/macOS开发的智能IDEAppCode2023现已推出,立即了解最新变化为什么选择AppCode得益于对代码结构的深刻理解,Ap......
  • VMware Tanzu Kubernetes Grid (TKG) 1.4 下载 - VMware Kubernetes 分发版
    作者:gc,主页:www.sysin.orgTanzuKubernetes集群是由VMware构建、签名和支持的开源Kubernetes容器编排平台的完整分发版。可以通过使用TanzuKubernetesGrid服务在主管集群上置备和运行TanzuKubernetes集群。主管集群是启用了vSpherewithTanzu的vSphere集群。Tanzu......
  • Windows Subsystem for Android (WSA) 下载:在 Windows 11 上运行 Android 应用
    作者主页:www.sysin.org在Beta频道中为Windows预览体验成员宣布Windows11Preview上的Android™应用程序今天我们宣布的Android™应用程序适用于Windows11的第一个预览版现已在Beta频道在美国的WindowsInsiders可用,如果您还没有读过thisblogpostfromGiorgio......
  • VMware Tanzu Kubernetes Grid Integrated Edition (TKGI) 1.12 下载
    作者:gc,主页:www.sysin.orgVMwareTanzuKubernetesGridIntegratedEdition(TKGI)使运营商能够使用BOSH和OpsManager配置、运营和管理企业级Kubernetes集群。VMwareTanzuKubernetesGridIntegratedEdition(以前称为VMwareEnterprisePKS)是基于Kubernetes的容器解决......
  • VMware vRealize Log Insight 8.6 下载 - 智能日志记录和分析
    作者:gc,主页:www.sysin.org概述vRealizeLoglnsight提供了高度可扩展的异构日志管理功能,它具有多个可在其中执行操作的直观仪表盘、完善的分析功能和范围更广的第三方延展性。它还能够跨物理、虚拟和云计算环境提供深入的运维洞察信息并加快故障排除速度。VMwarevRealizeLogInsi......
  • Uncaught SyntaxError: Invalid or unexpected token
    UncaughtSyntaxError:InvalidorunexpectedtokenJS替换空格发现患者姓名里有空格,导致转JSON的时候,多了"号,在JavaScript中,你可以使用replace()函数来替换字符串中的特定字符或模式。如果你想替换字符串中的空格,你可以这样做:letstr="这是一个含有多个空格的字符串"......