首页 > 其他分享 >ListeningExecutorService的使用

ListeningExecutorService的使用

时间:2023-01-10 11:35:09浏览次数:55  
标签:list new util 线程 ListeningExecutorService 使用 import size


由于普通的线程池,返回的Future,功能比较单一;Guava 定义了 ListenableFuture接口并继承了JDK concurrent包下的Future 接口,ListenableFuture 允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用。

1.使用方法如下:

  • 1.创建线程池
  • 2.装饰线程池
  • 3.任务处理
  • 4.回调函数处理
  • 5.所有任务完成后处理

场景模拟:
导入一张1211条数据的Excel表格:

  • 1.每条数据处理较慢
  • 2.处理完后需要汇总数据
  • 3.处理汇总成功的数据

2.代码示例如下:

2.1接口和调用

package com.java4all.test11;

import org.python.google.common.collect.ImmutableList;
import org.python.google.common.util.concurrent.*;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
* Author: yunqing
* Date: 2018/9/19
* Description:
*/
@RestController
@RequestMapping(value = "testThread")
public class TestThread {


/**线程池*/
static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 10, 60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new ThreadPoolExecutor.CallerRunsPolicy()
);

/**
* 数据处理
* @return
* @throws Exception
*/
@RequestMapping(value = "parse",method = RequestMethod.GET)
public String parse() throws Exception{
List<String> result = new ArrayList<>();
List<String> list = new ArrayList<>();

//模拟原始数据
for(int i = 0; i < 1211;i ++){
list.add(i+"-");
System.out.println("添加原始数据:"+i);
}

int size = 50;//切分粒度,每size条数据,切分一块,交由一条线程处理
int countNum = 0;//当前处理到的位置
int count = list.size()/size;//切分块数
int threadNum = 0;//使用线程数
if(count*size != list.size()){
count ++;
}

final CountDownLatch countDownLatch = new CountDownLatch(count);

//使用Guava的ListeningExecutorService装饰线程池
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

while (countNum < count*size){
//切割不同的数据块,分段处理
threadNum ++;
countNum += size;
MyCallable myCallable = new MyCallable();
myCallable.setList(ImmutableList.copyOf(
list.subList(countNum-size,list.size() > countNum ? countNum : list.size())));

ListenableFuture listenableFuture = executorService.submit(myCallable);

//回调函数
Futures.addCallback(listenableFuture, new FutureCallback<List<String>>() {
//任务处理成功时执行
@Override
public void onSuccess(List<String> list) {
countDownLatch.countDown();
System.out.println("第h次处理完成");
result.addAll(list);
}

//任务处理失败时执行
@Override
public void onFailure(Throwable throwable) {
countDownLatch.countDown();
System.out.println("处理失败:"+throwable);
}
});

}

//设置时间,超时了直接向下执行,不再阻塞
countDownLatch.await(3,TimeUnit.SECONDS);

result.stream().forEach(s -> System.out.println(s));
System.out.println("------------结果处理完毕,返回完毕,使用线程数量:"+threadNum);

return "处理完了";
}
}

2.2任务处理

package com.java4all.test11;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
* Author: yunqing
* Date: 2018/9/19
* Description:任务处理逻辑
*/
public class MyCallable implements Callable{

private List<String> list ;
@Override
public Object call() throws Exception {
List<String> listReturn = new ArrayList<>();
//模拟对数据处理,然后返回
for(int i = 0;i < list.size();i++){
listReturn.add(list.get(i)+":处理时间:"+System.currentTimeMillis()+"---:处理线程:"+Thread.currentThread());
}

return listReturn;
}

public void setList(List<String> list) {
this.list = list;
}
}


标签:list,new,util,线程,ListeningExecutorService,使用,import,size
From: https://blog.51cto.com/u_15936016/5999927

相关文章

  • uniapp小程序使用高德地图api实现路线规划
    路线规划简介路线规划常用于出行路线的提前预览,我们提供4种类型的路线规划,分别为:驾车、步行、公交和骑行,满足各种的出行场景。高德开放平台本例是驾车路线规划功能和位......
  • unittest中使用ddt后生成的测试报告名称如何修改?(如test_api_0修改成test_api_0_titile
    修改前:Unittest使用ddt后生成的测试报告用例名称为:即就是,以“test_xx_数字”为格式的用例名称,感觉满足不了我们的测试需求,不够直观。那么怎么修改呢?查看ddt源码defmk_......
  • linux 命令使用6--free(内存)
    一:free命令 free命令显示系统内存的使用情况,包括物理内存、交换内存(swap)和内核缓冲区内存  二:free命令显示的各项参数 第一行Mem部分:total物理内存的......
  • Django中使用内连接(子查询)
    目录在Django中使用内连接(子查询)Subquery()Exists()在Django中使用内连接(子查询)Subquery()模型类可能如下所示:classCategory(models.Model):name=models.C......
  • 对象存储及MINIO使用
    常见对象存储技术选型。存储的方案分成两种:一种是可以自定对象名称的,另一种是系统自动生成对象名称。不能自定义名称的有领英的Ambry,MogileFS。TFS是淘宝开源的,但是目......
  • rpm-mock工具使用
    #rpmbuild多环境构建的工具#用户基础安装useradd-mshuttle-slave-s/bin/bash/etc/mock/build.cfg777权限ln-s/home/shuttle-slave/rpmbuild/root/rpmbuild/etc/......
  • MySQL必知必会第三章-使用MySQL
    使用MySQL链接为了链接MySQL需要以下信息:主机名(计算机名)——如果连接到本地MySQL服务器,为localhost;端口(如果使用默认端口3306之外的端口);一个合法的用户名;用户口令(如......
  • 使用@RequestBody注解接收的实体类中的某些参数为null
    原因postman调试接口为null的参数命名不符合“驼峰法”,类似实体类A的属性cEnterpriseId,这种命名是不规范的和lombook的@Data注解有关用postman传一个json到接口,json......
  • rsync使用技巧
    rsync-vzrtopg--progressdeepin@192.168.0.66:/tmp/xxx.tgz.参数说明v:显示详细信息z:传输过程中对数据进行压缩r:递归t:保留修改时间属性o:保留文件所有者属性p:保......
  • 使用 sudo 命令时,重定向标准输出的两种方法
    错误的写法:sudocat>/etc/sysctl.d/bbr.conf<<EOFnet.core.default_qdisc=fqnet.ipv4.tcp_congestion_control=bbrEOF此写法看似很合理,但执行的时候会报Permissio......