首页 > 其他分享 >Guava EventBus介绍

Guava EventBus介绍

时间:2023-03-10 23:13:19浏览次数:27  
标签:XXThreadPoolExecutor class 介绍 event XXEnvContext EventBus Guava threadPoolTaskEx

 介绍

Guava EventBus 是 Google Guava 提供的一种发布-订阅式的事件总线,基于观察者模式的思想,用于处理应用程序内部的消息通信。

导入依赖

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

标记任务处理类

通过 @EventBusListener 注解,将任务处理类注册到事件总线上。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface EventBusListener {

}

任务处理

任务处理类中使用 @Subscribe 注解标注的方法会被注册到事件总线上,根据传入的事件类型进行分发。

@Component
@EventBusListener
@Slf4j
public class XXModifyListener {

    @Data
    public static class ModifyEvent {
        Context context;

        public ModifyEvent(Context context) {
            this.context = context;
        }
    }

    @Subscribe
    public void modify(ModifyEvent event) {
        log.info("xxx");
    }
}

任务处理中心

异步任务处理需要构造一个线程池,使用 AsyncEventBus 进行处理。

@Component
@Slf4j
public class EventBusCenter {

    @Autowired
    ApplicationContext applicationContext;

		@Resource(name = "XXThreadPoolExecutor")
    private Executor XXThreadPoolExecutor;

    ThreadLocal<Exception> threadLocalSync = new ThreadLocal<>();

    /**
     * 管理同步事件
     */
    private final EventBus syncEventBus = new EventBus(
            (exception, context) -> threadLocalSync.set((Exception) exception)
    );

    /**
     * 管理异步事件
     */
    private final AsyncEventBus asyncEventBus = new AsyncEventBus(
						//构造一个线程池
            XXThreadPoolExecutor
    );

    public void postSync(Object event) throws Exception {
        syncEventBus.post(event);
        Exception ex = threadLocalSync.get();
        if (ex != null) {
            // 记得 remove
            threadLocalSync.remove();
            throw ex;
        }
    }

    public void postAsync(Object event) {
        try {
            asyncEventBus.post(event);
        } catch (Exception ex) {
            log.error("postAsync event error, {}", JSON.toJSONString(event));
        }
    }

    @PostConstruct
    public void init() {
        // 获取所有带有 @EventBusListener 的 bean,将他们注册为监听者
        Map<String, Object> listeners = applicationContext.getBeansWithAnnotation(EventBusListener.class);
        log.info("EventBusListener get detail:{}", JSON.toJSONString(listeners));
        for (Object listener : listeners.values()) {
            asyncEventBus.register(listener);
            syncEventBus.register(listener);
        }
    }
}

常用线程池定义

@Configuration
public class XXSyncTaskPool {

    @Bean(name = "XXThreadPoolExecutor")
    public Executor XXThreadPoolExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        threadPoolTaskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 20);
        threadPoolTaskExecutor.setKeepAliveSeconds(2 * 60 * 60);
        threadPoolTaskExecutor.setQueueCapacity(1000);
        threadPoolTaskExecutor.setThreadNamePrefix("XX-sync-");
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskExecutor.setAwaitTerminationSeconds(30);
        return threadPoolTaskExecutor;
    }

    @Bean
    public Executor taskExecutor(@Qualifier("XXThreadPoolExecutor") Executor XXThreadPoolExecutor) {
        XXSyncTaskExecutor.setExecutor(XXThreadPoolExecutor);
        return XXThreadPoolExecutor;
    }

}

线程池任务上下文处理

在异步任务处理时,需要将当前任务的上下文传递到线程池中进行处理。

public class XXSyncTaskExecutor {

    @Setter(AccessLevel.PACKAGE)
    private static Executor executor;

   
    public static void execute(Runnable r) {
        executor.execute(new CopyContextAsyncTask(WorkStationEnvUtil.getEnvContext(), r));
    }

    static class CopyContextAsyncTask implements Runnable {
        
        private final XXEnvContext XXEnvContext;
        private final Runnable r;

        CopyContextAsyncTask(XXEnvContext XXEnvContext, Runnable r) {
            this.r = r;
            this.XXEnvContext = XXEnvContext;
        }

        @Override
        public void run() {
            MockEnvUtils.setStrEmpIdAndRegion(XXEnvContext.getEmpId()
                    , XXEnvContext.getUpperNation());
            try {
                r.run();
            } finally {
                MockEnvUtils.clearStrEmpIdAndTenantId();
            }
        }
    }
}

标签:XXThreadPoolExecutor,class,介绍,event,XXEnvContext,EventBus,Guava,threadPoolTaskEx
From: https://www.cnblogs.com/lyInfo/p/17204928.html

相关文章

  • Jmeter接口测试 2=> Jmeter工具的介绍
    第一节接口测试流程 参数化:EXCEl文件参数化、数据库参数化、直接代码中配置、配置文件 预处理请求(前置处理):对请求的参数进行预处理、准备,如加密数据、组织测试数据 发起......
  • 前端之CSS介绍(层叠样式表)
    CascadingStyleSheets(CSS)1.CSS就是在HTML文档中,如何显示HTML标签,元素,以及他们的样式布局,前端页面的展示形式均由CSS来布局.2.如何使用CSS内部样式表head标签里<styl......
  • pugixml XML格式处理库的介绍和使用(面向业务编程-格式处理)
    pugixmlXML格式处理库的介绍和使用(面向业务编程-格式处理)介绍pugixml是一个轻量级的C++XML处理库。它的特点:类似dom的界面,具有丰富的遍历/修改功能非常快速的非......
  • 开源量子计算编程框架软件介绍
    1.编程框架简介​在编程领域,软件框架是指一种抽象形式,它提供了一个具有通用功能的软件,这些功能可以由使用者编写代码来有选择的进行更改,从而提供服务于特定应用的软件。可......
  • Node全局对象介绍
    在学习Javascript之初,会接触一个概念:JS由三部分组成,DOM+BOM+ECMAScript。其中前两者是宿主环境,也就是浏览器所提供的能力。后者才是JS语言本身的标准。在上篇文章......
  • redis之列表-redis之hash-redis其他操作-redis管道-django中使用redis-celery介绍和安
    目录redis之列表-redis之hash-redis其他操作-redis管道-django中使用redis-celery介绍和安装-celery快速使用-celery包结构今日内容详细1redis之列表2redis之hash3redis......
  • celery介绍安装以及基本使用步骤
    目录一、关于celery二、celery架构的构成1任务中间件Broker,2任务执行单元worker3结果存储backend三、celery的应用场景1.异步执行:解决耗时任务2.延迟执行:解决延......
  • 算法介绍
    冒泡排序冒泡排序是一种基本的排序算法,其基本思想是将相邻的元素进行比较和交换,从而逐步将最大的元素“冒泡”到最后面。其算法流程如下: 从数组的第一个元素开始,对相邻的......
  • 1 MySql基础介绍
    目录1mysql逻辑架构1.1连接管理与安全性1.2优化与执行2并发控制2.1锁粒度2.2表锁2.3行级锁3事务3.1数据库事务四特性3.2隔离级别3.3死锁3.4事务日志3.5MySql......
  • cpu、内存问题排查——gperftools 性能测试工具介绍
      在阅读reids源码时发现redis在自身内存管理malloc/frees的时候使用到tcmalloc,google后发现此组件竟然出自google开源的gperftools性能分析工具集,然后发现许多大虾云......