此文主要讲解:
- 如何实现操作记录
- 如何将TransmittableThreadLocal和@Async搭配使用
TransmittableThreadLocal阿里的一个开源组件,为了在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题
1. 背景
有一个实验管理平台,用于配置和查看实验,想加一个操作历史功能,便于追踪改动,回滚历史等
实现后是这个样子:
2. 分析
这个功能简单来讲,就是做一个埋点,记录某人(operator),什么时间(time),做了什么事--也就是操作(operate_type)和改的什么东西(data_id,old_value, new_value)
CREATE TABLE `record` (
`id` bigint(20) NOT NULL,
`data_id` bigint(20) DEFAULT NULL,
`data_type` varchar(20) DEFAULT NULL,
`operator` varchar(200) NOT NULL,
`operate_type` int(11) NOT NULL,
`time` timestamp(4) NOT NULL DEFAULT CURRENT_TIMESTAMP(4),
`old_value` text,
`new_value` text,
`desc` varchar(1000) DEFAULT NULL,
`parent_id` bigint(20) DEFAULT NULL,
`namespace_id` bigint(20) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_parent_id` (`parent_id`),
KEY `idx_namespace_id` (`namespace_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='操作记录表';
其中还有一些额外的字段,用来实现其他功能
- parent_id实现父子记录,父子记录:例如修改实验是一条记录,里面具体改的是实验状态
- namespace_id实现命名空间,因为实验平台有多个接入方,所以要区分
3. 埋点
建一个RecordService,一个方法表示一个埋点时机
相比这种侵入式的埋点,另外一种埋点方式是通过AOP的方式将Service包裹,无侵入性,但是不够灵活
public interface RecordService{
void recordCreateExperiment( String operator, ExperimentVO experiment);
void recordUpdateExperiment( String operator, ExperimentVO oldExperiment, ExperimentVO newExperiment);
void recordDeleteExperiment( String operator, ExperimentVO experiment);
}
而且为了不影响主流程,埋点操作使用了异步@Async
@Async
@Slf4j
@Service
public class RecordServiceImpl implements RecordService{
...
}
4. TransmittableThreadLocal
现在看起来已经可以实现功能了,但为什么要引入TransmittableThreadLocal(下简称TTL)呢?
因为operator是Session级别的,需要一直从Controller记录传到Service
(当然可以直接在Service直接从Request里面取,但这样会导致Service对Web层的依赖,后期如果想把Serivce通过其他接口暴露出去,例如OpenApi的方式,就会很麻烦),这样传递参数一个是比较繁琐,另外会对主业务流程理解产生干扰
要解决这个问题就需要使用ThreadLocal, 简单讲就是利用了一个全局Map,以线程为Key去存取值
但这不能使用ThreadLocal,因为我们使用了@Async注解,通过线程池来执行的,也就是说不是一个线程,所以有了TTL,专门用于线程池的ThreadLocal
工作原理如下:对Runable进行包裹,使用ThreadLocal传递
4.1 使用TTL
- 引入pom
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.14.0</version>
</dependency>
- 建一个工具类
public class TTLUtil {
private static final TransmittableThreadLocal<ServiceContext> TTL_CONTEXT = new TransmittableThreadLocal();
public TTLUtil() {
}
public static ServiceContext get() {
return (ServiceContext)TTL_CONTEXT.get();
}
public static void set(ServiceContext serviceContext) {
TTL_CONTEXT.set(serviceContext);
}
public static void remove() {
TTL_CONTEXT.remove();
}
}
- 使用过滤器将需要传递的参数收集到TTL中, 主要此过滤器要在Appication加
@ServletComponentScan
注解才生效,finally中使用完之后将TTL释放
@Order( 999 )
@WebFilter( filterName = "nameSpaceFilter", urlPatterns = "/*" )
public class NameSpaceFilter implements Filter{
private static final String NAMESPACE_HEADER = "exp-namespace";
@Override
public void doFilter( ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain )
throws IOException, ServletException{
try{
HttpServletRequest request = ( HttpServletRequest )servletRequest;
String namespace = request.getHeader( NAMESPACE_HEADER );
String username = SSOClient.getLoginName( request );
ServiceContext serviceContext = new ServiceContext();
if( !StringUtils.isEmpty( namespace ) ){
serviceContext.setNamespace( Long.parseLong( namespace ) );
}
serviceContext.setUsername( username );
TTLUtil.set( serviceContext );
filterChain.doFilter( servletRequest, servletResponse );
}
finally{
TTLUtil.remove();
}
}
}
- 配置Spring的@Async默认线程池,注意其中的
TtlExecutors.getTtlExecutor( executor )
是使用TTL对线程池进行包裹
import com.alibaba.ttl.threadpool.TtlExecutors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer{
private final ObjectMapper objectMapper = new ObjectMapper();
@Bean( "defaultAsyncExecutor" )
public Executor executor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int corePoolSize = 10;
int queueCapacity = 10;
int maxPoolSize = 50;
executor.setCorePoolSize( corePoolSize );
executor.setMaxPoolSize( maxPoolSize );
executor.setQueueCapacity( queueCapacity );
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
executor.setThreadNamePrefix( "defaultAsyncExecutor-" );
executor.setWaitForTasksToCompleteOnShutdown( true );
executor.initialize();
return TtlExecutors.getTtlExecutor( executor );
}
@Override
public Executor getAsyncExecutor(){
return executor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
return ( ex, method, params ) -> {
List<String> paramsStr = new ArrayList<>();
for( Object param : params ){
try{
String s = objectMapper.writeValueAsString( param );
paramsStr.add( s );
}
catch( JsonProcessingException e ){
log.error( "执行异步任务解析参数错误", e );
}
}
log.error( "执行异步任务出错 {},params: {}", method, paramsStr, ex );
};
}
}
- 在RecordService中使用,这样就中异步线程中拿到了需要的参数
record.setNamespaceId( TTLUtil.get().getNamespace() );
4.2 TTL的三种使用方式
- 修饰Runnable和Callable
- 修饰线程池,线程池内部还是第一种方式,这也是我使用的方式
- 使用Java Agent来修饰JDK线程池实现类,无侵入式的
4.3 TTL的场景
下面是几个典型场景例子。
- 分布式跟踪系统 或 全链路压测(即链路打标)
- 日志收集记录系统上下文
- Session级Cache
- 应用容器或上层框架跨应用代码给下层SDK传递信息
我们这的场景应该算第三种和第四种
5. 埋点的异常处理
上面的埋点逻辑中,因为记录是异步处理的,万一没记录怎么解决?
上面定义了AsyncUncaughtExceptionHandler
,会在处理失败的时候把日志打出来
不过更为稳妥的方式,可以在失败时将未格式化的数据写进数据库记录(比如写到mongodb),通过报警,以便后续处理
标签:import,executor,id,线程,TransmittableThreadLocal,TTL,Async,日志,public From: https://www.cnblogs.com/songjiyang/p/16667202.html