介绍
Guava EventBus 是 Google Guava 提供的一种发布-订阅式的事件总线,基于观察者模式的思想,用于处理应用程序内部的消息通信。
导入依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
标记任务处理类
通过 @EventBusListener
注解,将任务处理类注册到事件总线上。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface EventBusListener {
}
任务处理
任务处理类中使用 @Subscribe
注解标注的方法会被注册到事件总线上,根据传入的事件类型进行分发。
@Component
@EventBusListener
@Slf4j
public class XXModifyListener {
@Data
public static class ModifyEvent {
Context context;
public ModifyEvent(Context context) {
this.context = context;
}
}
@Subscribe
public void modify(ModifyEvent event) {
log.info("xxx");
}
}
任务处理中心
异步任务处理需要构造一个线程池,使用 AsyncEventBus
进行处理。
@Component
@Slf4j
public class EventBusCenter {
@Autowired
ApplicationContext applicationContext;
@Resource(name = "XXThreadPoolExecutor")
private Executor XXThreadPoolExecutor;
ThreadLocal<Exception> threadLocalSync = new ThreadLocal<>();
/**
* 管理同步事件
*/
private final EventBus syncEventBus = new EventBus(
(exception, context) -> threadLocalSync.set((Exception) exception)
);
/**
* 管理异步事件
*/
private final AsyncEventBus asyncEventBus = new AsyncEventBus(
//构造一个线程池
XXThreadPoolExecutor
);
public void postSync(Object event) throws Exception {
syncEventBus.post(event);
Exception ex = threadLocalSync.get();
if (ex != null) {
// 记得 remove
threadLocalSync.remove();
throw ex;
}
}
public void postAsync(Object event) {
try {
asyncEventBus.post(event);
} catch (Exception ex) {
log.error("postAsync event error, {}", JSON.toJSONString(event));
}
}
@PostConstruct
public void init() {
// 获取所有带有 @EventBusListener 的 bean,将他们注册为监听者
Map<String, Object> listeners = applicationContext.getBeansWithAnnotation(EventBusListener.class);
log.info("EventBusListener get detail:{}", JSON.toJSONString(listeners));
for (Object listener : listeners.values()) {
asyncEventBus.register(listener);
syncEventBus.register(listener);
}
}
}
常用线程池定义
@Configuration
public class XXSyncTaskPool {
@Bean(name = "XXThreadPoolExecutor")
public Executor XXThreadPoolExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
threadPoolTaskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 20);
threadPoolTaskExecutor.setKeepAliveSeconds(2 * 60 * 60);
threadPoolTaskExecutor.setQueueCapacity(1000);
threadPoolTaskExecutor.setThreadNamePrefix("XX-sync-");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.setAwaitTerminationSeconds(30);
return threadPoolTaskExecutor;
}
@Bean
public Executor taskExecutor(@Qualifier("XXThreadPoolExecutor") Executor XXThreadPoolExecutor) {
XXSyncTaskExecutor.setExecutor(XXThreadPoolExecutor);
return XXThreadPoolExecutor;
}
}
线程池任务上下文处理
在异步任务处理时,需要将当前任务的上下文传递到线程池中进行处理。
public class XXSyncTaskExecutor {
@Setter(AccessLevel.PACKAGE)
private static Executor executor;
public static void execute(Runnable r) {
executor.execute(new CopyContextAsyncTask(WorkStationEnvUtil.getEnvContext(), r));
}
static class CopyContextAsyncTask implements Runnable {
private final XXEnvContext XXEnvContext;
private final Runnable r;
CopyContextAsyncTask(XXEnvContext XXEnvContext, Runnable r) {
this.r = r;
this.XXEnvContext = XXEnvContext;
}
@Override
public void run() {
MockEnvUtils.setStrEmpIdAndRegion(XXEnvContext.getEmpId()
, XXEnvContext.getUpperNation());
try {
r.run();
} finally {
MockEnvUtils.clearStrEmpIdAndTenantId();
}
}
}
}
标签:XXThreadPoolExecutor,class,介绍,event,XXEnvContext,EventBus,Guava,threadPoolTaskEx
From: https://www.cnblogs.com/lyInfo/p/17204928.html