首页 > 其他分享 >TransmittableThreadLocal和@Async优雅的记录操作日志

TransmittableThreadLocal和@Async优雅的记录操作日志

时间:2022-09-07 20:58:56浏览次数:120  
标签:import executor id 线程 TransmittableThreadLocal TTL Async 日志 public

此文主要讲解:

  1. 如何实现操作记录
  2. 如何将TransmittableThreadLocal和@Async搭配使用

TransmittableThreadLocal阿里的一个开源组件,为了在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题

1. 背景

有一个实验管理平台,用于配置和查看实验,想加一个操作历史功能,便于追踪改动,回滚历史等

实现后是这个样子:
image.png

2. 分析

这个功能简单来讲,就是做一个埋点,记录某人(operator),什么时间(time),做了什么事--也就是操作(operate_type)和改的什么东西(data_id,old_value, new_value)

CREATE TABLE `record` (
  `id` bigint(20) NOT NULL,
  `data_id` bigint(20) DEFAULT NULL,
  `data_type` varchar(20) DEFAULT NULL,
  `operator` varchar(200) NOT NULL,
  `operate_type` int(11) NOT NULL,
  `time` timestamp(4) NOT NULL DEFAULT CURRENT_TIMESTAMP(4),
  `old_value` text,
  `new_value` text,
  `desc` varchar(1000) DEFAULT NULL,
  `parent_id` bigint(20) DEFAULT NULL,
  `namespace_id` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_parent_id` (`parent_id`),
  KEY `idx_namespace_id` (`namespace_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='操作记录表';

其中还有一些额外的字段,用来实现其他功能

  • parent_id实现父子记录,父子记录:例如修改实验是一条记录,里面具体改的是实验状态
  • namespace_id实现命名空间,因为实验平台有多个接入方,所以要区分

3. 埋点

建一个RecordService,一个方法表示一个埋点时机

相比这种侵入式的埋点,另外一种埋点方式是通过AOP的方式将Service包裹,无侵入性,但是不够灵活

public interface RecordService{

	void recordCreateExperiment( String operator, ExperimentVO experiment);

	void recordUpdateExperiment( String operator, ExperimentVO oldExperiment, ExperimentVO newExperiment);

	void recordDeleteExperiment( String operator, ExperimentVO experiment);
}

而且为了不影响主流程,埋点操作使用了异步@Async

@Async
@Slf4j
@Service
public class RecordServiceImpl implements RecordService{
...
}


4. TransmittableThreadLocal

现在看起来已经可以实现功能了,但为什么要引入TransmittableThreadLocal(下简称TTL)呢?

因为operator是Session级别的,需要一直从Controller记录传到Service
(当然可以直接在Service直接从Request里面取,但这样会导致Service对Web层的依赖,后期如果想把Serivce通过其他接口暴露出去,例如OpenApi的方式,就会很麻烦),这样传递参数一个是比较繁琐,另外会对主业务流程理解产生干扰

要解决这个问题就需要使用ThreadLocal, 简单讲就是利用了一个全局Map,以线程为Key去存取值

但这不能使用ThreadLocal,因为我们使用了@Async注解,通过线程池来执行的,也就是说不是一个线程,所以有了TTL,专门用于线程池的ThreadLocal

工作原理如下:对Runable进行包裹,使用ThreadLocal传递
ttl.png

4.1 使用TTL

  1. 引入pom
       <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>transmittable-thread-local</artifactId>
            <version>2.14.0</version>
        </dependency>
  1. 建一个工具类
public class TTLUtil {
    private static final TransmittableThreadLocal<ServiceContext> TTL_CONTEXT = new TransmittableThreadLocal();

    public TTLUtil() {
    }

    public static ServiceContext get() {
        return (ServiceContext)TTL_CONTEXT.get();
    }

    public static void set(ServiceContext serviceContext) {
        TTL_CONTEXT.set(serviceContext);
    }

    public static void remove() {
        TTL_CONTEXT.remove();
    }
}

  1. 使用过滤器将需要传递的参数收集到TTL中, 主要此过滤器要在Appication加@ServletComponentScan 注解才生效,finally中使用完之后将TTL释放
@Order( 999 )
@WebFilter( filterName = "nameSpaceFilter", urlPatterns = "/*" )
public class NameSpaceFilter implements Filter{

	private static final String NAMESPACE_HEADER = "exp-namespace";

	@Override
	public void doFilter( ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain )
			throws IOException, ServletException{

		try{
			HttpServletRequest request = ( HttpServletRequest )servletRequest;
			String namespace = request.getHeader( NAMESPACE_HEADER );
			String username = SSOClient.getLoginName( request );

			ServiceContext serviceContext = new ServiceContext();
			if( !StringUtils.isEmpty( namespace ) ){
				serviceContext.setNamespace( Long.parseLong( namespace ) );
			}
			serviceContext.setUsername( username );
			TTLUtil.set( serviceContext );

			filterChain.doFilter( servletRequest, servletResponse );
		}
		finally{
			TTLUtil.remove();
		}

	}

}
  1. 配置Spring的@Async默认线程池,注意其中的TtlExecutors.getTtlExecutor( executor )是使用TTL对线程池进行包裹
import com.alibaba.ttl.threadpool.TtlExecutors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer{

	private final ObjectMapper objectMapper = new ObjectMapper();

	@Bean( "defaultAsyncExecutor" )
	public Executor executor(){

		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

		int corePoolSize = 10;
		int queueCapacity = 10;
		int maxPoolSize = 50;

		executor.setCorePoolSize( corePoolSize );
		executor.setMaxPoolSize( maxPoolSize );
		executor.setQueueCapacity( queueCapacity );
		executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
		executor.setThreadNamePrefix( "defaultAsyncExecutor-" );
		executor.setWaitForTasksToCompleteOnShutdown( true );
		executor.initialize();

		return TtlExecutors.getTtlExecutor( executor );
	}

	@Override
	public Executor getAsyncExecutor(){

		return executor();
	}

	@Override
	public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){

		return ( ex, method, params ) -> {
			List<String> paramsStr = new ArrayList<>();
			for( Object param : params ){
				try{
					String s = objectMapper.writeValueAsString( param );
					paramsStr.add( s );
				}
				catch( JsonProcessingException e ){
					log.error( "执行异步任务解析参数错误", e );
				}
			}
			log.error( "执行异步任务出错 {},params: {}", method, paramsStr, ex );
		};
	}
}
  1. 在RecordService中使用,这样就中异步线程中拿到了需要的参数
record.setNamespaceId( TTLUtil.get().getNamespace() );

4.2 TTL的三种使用方式

  1. 修饰Runnable和Callable
  2. 修饰线程池,线程池内部还是第一种方式,这也是我使用的方式
  3. 使用Java Agent来修饰JDK线程池实现类,无侵入式的

4.3 TTL的场景

下面是几个典型场景例子。

  1. 分布式跟踪系统 或 全链路压测(即链路打标)
  2. 日志收集记录系统上下文
  3. Session级Cache
  4. 应用容器或上层框架跨应用代码给下层SDK传递信息

我们这的场景应该算第三种和第四种

5. 埋点的异常处理

上面的埋点逻辑中,因为记录是异步处理的,万一没记录怎么解决?

上面定义了AsyncUncaughtExceptionHandler,会在处理失败的时候把日志打出来

不过更为稳妥的方式,可以在失败时将未格式化的数据写进数据库记录(比如写到mongodb),通过报警,以便后续处理

标签:import,executor,id,线程,TransmittableThreadLocal,TTL,Async,日志,public
From: https://www.cnblogs.com/songjiyang/p/16667202.html

相关文章

  • JS: 模拟async/await语法糖
    不熟悉生成器对象的小伙伴,可查看:Generator、Generator.prototype.next模拟函数:/***模拟async关键字的函数*(不返回Promise对象也是可以的)*@paramgenerator*......
  • Logstash深入收集Nginx日志
    Logstash深入收集Nginx日志安装nginx[root@elkstack03~]#yuminstall-ynginx##主配置文件[root@elkstack03~]#cat/etc/nginx/nginx.confusernginx;worker......
  • Logstash深入收集Java日志
    Logstash深入收集Java日志没有修改Json格式在企业中,我们看到tomcat日志遇到异常(exception)一条日志可能是几行或者十几行甚至几十行,组成的,那么,我们需要将多行日志变成......
  • springboot的日志配置
    转载:https://blog.csdn.net/tz845195485/article/details/123361895#========================logging日志相关的配置=====================#日志级别trace<debug<inf......
  • js四种异步方法(回调函数、Promise、Generator、async/await)
    由于JS运行环境是单线程的,即一次只能完成一个任务,所以多任务时需要排队。异步可以理解为改变执行顺序的操作,异步任务必须在同步任务执行结束之后,从任务队列中依次取出执行......
  • async-await
    async函数介绍  1、async函数执行结果是:返回一个Promise对象(newPromise) fn返回普通值(只要不抛错返回的promise的状态就是fulfilled)asyncfunctionf......
  • LINUX系统中查询Oracle数据库的归档日志目录
    注:查询Linux系统中Oracle数据库的归档日志方法,不同的安装目录使用不同的方法,一下有两种方式,其中一种是我目前使用的数据库未查到归档才换的第二种方式。-----方式一:查询数......
  • 第 9 题:Async/Await 如何通过同步的方式实现异步
    首先想要更好的理解Async/Await,需要了解这两个知识点:同步异步背景首先,js是单线程的(重复三遍),所谓单线程,通俗的讲就是,一根筋(比喻有点过分,哈哈)执行代码是一行一行的往......
  • 第 8 题:setTimeout、Promise、Async/Await 的区别
    1.setTimeoutconsole.log('scriptstart')//1.打印scriptstartsetTimeout(function(){console.log('settimeout')//4.打印settimeout})//2.......
  • Python 运行日志 → 01.09.2022
    Python运行日志→01.09.20221-)Python简介在本文中,我想总结一下我们看到的第一堂课中的代码和基本信息。由于我对这种领域完全陌生,我突然将其视为课程重复。那么让......