首页 > 编程语言 >Java 的 8 种异步实现方式

Java 的 8 种异步实现方式

时间:2023-06-08 22:22:21浏览次数:38  
标签:异步 Java String 方式 void 线程 executor public

异步的八种实现方式

  1. 线程Thread
  2. Future
  3. 异步框架CompletableFuture
  4. Spring注解@Async
  5. Spring ApplicationEvent事件
  6. 消息队列
  7. 第三方异步框架,比如Hutool的ThreadUtil
  8. Guava异步

1. 线程异步

public class AsyncThread extends Thread {

    @Override
    public void run() {
        System.out.println("Current thread name:" + Thread.currentThread().getName() + " Send email success!");
    }

    public static void main(String[] args) {
        AsyncThread asyncThread = new AsyncThread();
        asyncThread.run();
    }
}

当然如果每次都创建一个Thread线程,频繁的创建、销毁,浪费系统资源,我们可以采用线程池:

private ExecutorService executorService = Executors.newCachedThreadPool();

public void fun() {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            log.info("执行业务逻辑...");
        }
    });
}

可以将业务逻辑封装到RunnableCallable中,交由线程池来执行。

 

2. Future异步

@Slf4j
public class FutureManager {

    public String execute() throws Exception {

        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {

                System.out.println(" --- task start --- ");
                Thread.sleep(3000);
                System.out.println(" --- task finish ---");
                return "this is future execute final result!!!";
            }
        });

        //这里需要返回值时会阻塞主线程
        String result = future.get();
        log.info("Future get result: {}", result);
        return result;
    }

    @SneakyThrows
    public static void main(String[] args) {
        FutureManager manager = new FutureManager();
        manager.execute();
    }
}

输出结果:

 --- task start --- 
 --- task finish ---
 Future get result: this is future execute final result!!!


3. CompletableFuture实现异步

public class CompletableFutureCompose {

    /**
     * thenAccept子任务和父任务公用同一个线程
     */
    @SneakyThrows
    public static void thenRunAsync() {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something...");
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }

    public static void main(String[] args) {
        thenRunAsync();
    }
}

4. Spring的@Async异步

自定义异步线程池:

/**
 * 线程池参数配置,多个线程池实现线程池隔离,@Async注解,默认使用系统自定义线程池,可在项目中设置多个线程池,在异步调用的时候,指明需要调用的线程池名称,比如:@Async("taskName")
 **/
@EnableAsync
@Configuration
public class TaskPoolConfig {

    /**
     * 自定义线程池
     *
     **/
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        //返回可用处理器的Java虚拟机的数量 12
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("系统最大线程数  : " + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(16);
        //最大线程数
        executor.setMaxPoolSize(20);
        //配置队列容量,默认值为Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
        //活跃时间
        executor.setKeepAliveSeconds(60);
        //线程名字前缀
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
        executor.setAwaitTerminationSeconds(60);
        //等待所有的任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

AsyncService:

public interface AsyncService {

    MessageResult sendSms(String callPrefix, String mobile, String actionType, String content);

    MessageResult sendEmail(String email, String subject, String content);
}

@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {

    @Autowired
    private IMessageHandler mesageHandler;

    @Override
    @Async("taskExecutor")
    public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content) {
        try {

            Thread.sleep(1000);
            mesageHandler.sendSms(callPrefix, mobile, actionType, content);

        } catch (Exception e) {
            log.error("发送短信异常 -> ", e)
        }
    }
    
    
    @Override
    @Async("taskExecutor")
    public sendEmail(String email, String subject, String content) {
        try {

            Thread.sleep(1000);
            mesageHandler.sendsendEmail(email, subject, content);

        } catch (Exception e) {
            log.error("发送email异常 -> ", e)
        }
    }
}



5. Spring ApplicationEvent事件实现异步

定义事件:

public class AsyncSendEmailEvent extends ApplicationEvent {

    /**
     * 邮箱
     **/
    private String email;

   /**
     * 主题
     **/
    private String subject;

    /**
     * 内容
     **/
    private String content;
  
    /**
     * 接收者
     **/
    private String targetUserId;

}

定义事件处理器:

@Slf4j
@Component
public class AsyncSendEmailEventHandler implements ApplicationListener<AsyncSendEmailEvent> {

    @Autowired
    private IMessageHandler mesageHandler;
    
    @Async("taskExecutor")
    @Override
    public void onApplicationEvent(AsyncSendEmailEvent event) {
        if (event == null) {
            return;
        }

        String email = event.getEmail();
        String subject = event.getSubject();
        String content = event.getContent();
        String targetUserId = event.getTargetUserId();
        mesageHandler.sendsendEmailSms(email, subject, content, targerUserId);
      }
}


6. 消息队列

回调事件消息生产者:

@Slf4j
@Component
public class CallbackProducer {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendCallbackMessage(CallbackDTO allbackDTO, final long delayTimes) {

        log.info("生产者发送消息,callbackDTO,{}", callbackDTO);

        amqpTemplate.convertAndSend(CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getExchange(), CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getRoutingKey(), JsonMapper.getInstance().toJson(genseeCallbackDTO), new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间
                message.getMessageProperties().setHeader("x-delay", delayTimes);
                message.getMessageProperties().setCorrelationId(callbackDTO.getSdkId());
                return message;
            }
        });
    }
}

回调事件消息消费者:

@Slf4j
@Component
@RabbitListener(queues = "message.callback", containerFactory = "rabbitListenerContainerFactory")
public class CallbackConsumer {

    @Autowired
    private IGlobalUserService globalUserService;

    @RabbitHandler
    public void handle(String json, Channel channel, @Headers Map<String, Object> map) throws Exception {

        if (map.get("error") != null) {
            //否认消息
            channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), false, true);
            return;
        }

        try {
        
            CallbackDTO callbackDTO = JsonMapper.getInstance().fromJson(json, CallbackDTO.class);
            //执行业务逻辑
            globalUserService.execute(callbackDTO);
            //消息消息成功手动确认,对应消息确认模式acknowledge-mode: manual
            channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);

        } catch (Exception e) {
            log.error("回调失败 -> {}", e);
        }
    }

7. ThreadUtil异步工具类

@Slf4j
public class ThreadUtils {

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            ThreadUtil.execAsync(() -> {
                ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
                int number = threadLocalRandom.nextInt(20) + 1;
                System.out.println(number);
            });
            log.info("当前第:" + i + "个线程");
        }

        log.info("task finish!");
    }
}

8. Guava异步

GuavaListenableFuture顾名思义就是可以监听的Future,是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用Guava ListenableFuture可以帮我们检测Future是否完成了,不需要再通过get()方法苦苦等待异步的计算结果,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。

ListenableFuture是一个接口,它从jdkFuture接口继承,添加了void addListener(Runnable listener, Executor executor)方法。

我们看下如何使用ListenableFuture。首先需要定义ListenableFuture的实例:

 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("callable execute...")
                TimeUnit.SECONDS.sleep(1);
                return 1;
            }
        });

首先通过MoreExecutors类的静态方法listeningDecorator方法初始化一个ListeningExecutorService的方法,然后使用此实例的submit方法即可初始化ListenableFuture对象。

ListenableFuture要做的工作,在Callable接口的实现类中定义,这里只是休眠了1秒钟然后返回一个数字1,有了ListenableFuture实例,可以执行此Future并执行Future完成之后的回调函数。

 Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        //成功执行...
        System.out.println("Get listenable future's result with callback " + result);
    }

    @Override
    public void onFailure(Throwable t) {
        //异常情况处理...
        t.printStackTrace();
    }
});







}
 

标签:异步,Java,String,方式,void,线程,executor,public
From: https://www.cnblogs.com/KL2016/p/17467856.html

相关文章

  • Java反射机制详解上篇
    1反射机制是什么反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意一个方法和属性;这种动态获取的信息以及动态调用对象的方法的功能称为java语言的反射机制。在面向对象的世界里,万事万物皆对象.在java语言里,静态的成员,普......
  • Java多态综合案例(包含接口,接口实现类)
    首先定义一个接口名为USB其次定义两个实现类分别名为KeyBorad和Mouse此时就可以使用多态了,因为实现类和接口某种意义上来说是继承关系。USBu=newKeyborad();USBu2 =newMouse();因为键盘和鼠标都具有插拔功能,所以为了方便,把这两个功能写入接口,然后实现类重写。pac......
  • 一个Java对象到底占用多大内存?
    一个Java对象到底占用多大内存? 最近在读《深入理解Java虚拟机》,对Java对象的内存布局有了进一步的认识,于是脑子里自然而然就有一个很普通的问题,就是一个Java对象到底占用多大内存?在网上搜到了一篇博客讲的非常好:http://yueyemaitian.iteye.com/blog/2033046,里面提供的......
  • java分页代码
    现在开始编写 Service 层代码:在 com.game.products.services.iface 包中新建 ProductsService 接口,代码如下:packagecom.game.products.services.iface;importjava.util.List;importcom.game.products.model.Products;publicint......
  • javascript操作xml(增删改查)例子代码
    关键字:javascript操作xml(增删改查)自己做了一个小东西,不是很好,但是对初学来说是一个不错的例子!包括了stu.hta(是HTML应用程序);stu.xml注意下面的HTML代码必须保存为后缀名为hta否则当对XML文件进行操作(增删改)的时候就会提示没有权限!!文件stu.hta代码如......
  • javaScript通用数据类型校验_2
    /*要求:一、电话号码由数字、"("、")"和"-"构成二、电话号码为3到8位三、如果电话号码中包含有区号,那么区号为三位或四位四、区号用"("、")"或"-"和其他部分隔开用途:检查输入的电话号码格式是否正确输入:strPhone:字符串返回:如果通过验证返回true,否......
  • Java拓展-拆,装箱,线程,反射
    导言:在学习JavaSE的时候,我们会使用Java基础编程,并且了解了什么是面向对象的编程,会使用Java写一些基础算法程序,接下来,我们需要了解Java的自动拆箱和自动装箱,单线程和多线程,反射是什么,值得注意的是,讲的是Java中的特性,但是OOP语言其实都是用这些操作的,只是小部分不同罢了一.自动拆......
  • 简易java分页标签
    简易java分页标签1,标签的实现类NumenTag.javajava代码 1.importjava.util.ArrayList;2.importjava.util.HashMap;3.importjava.util.List;4.importjava.util.Map;5.6.importjavax.servlet.jsp.JspException;7.impor......
  • Javascript动态修改select选项
    1、向Select里添加OptionJs代码1.//IEonly,FF不支持Add方法2.functionfnAddItem(text,value){3.varselTarget=document.getElementById("selID");4.selTarget.Add(newOption("text","value"));5.}6.......
  • Javascript: setTimeout()使用及 setInterval()使用
    Javascript:setTimeout()使用及setInterval()使用2006-10-1203:36Evaluatesanexpressionafteraspecifiednumberofmillisecondshaselapsed.(在指定时间过后执行指定的表达式) Syntax:iTimerID=window.setTimeout(vCode,iMilliSeconds[,sLanguage])ParametersvCod......