/**
 * Feign client for the service registered under the name {@code bwie-elastic}.
 */
@FeignClient(name = "bwie-elastic")
public interface EsFeign {

    /**
     * Sends the given records to the remote {@code POST /add} endpoint as the request body.
     *
     * @param resourceInfo records to submit
     * @return the {@code Result} returned by the remote endpoint
     */
    @PostMapping("/add")
    Result add(@RequestBody ArrayList<ResourceInfo> resourceInfo);
}
@RestController public class UserController implements ApplicationRunner { @Autowired private UserMapper userService; @Autowired private EsFeign feign; private static final int THREAD_COUNT_SIZE = 5000; public void list() { //计算表总数 Integer integer = userService.UserSum(); //记录开始时间 long start = System.currentTimeMillis(); //new个和表总数一样长的ArrayList List<ResourceInfo> threadList = new ArrayList<>(integer); // 线程数,以5000条数据为一个线程,总数据大小除以5000,再加1 int round = integer / THREAD_COUNT_SIZE + 1; //new一个临时储存List的Map,以线程名为k,用做list排序 Map<Integer, List<ResourceInfo>> temporaryMap = new HashMap<>(round); // 程序计数器 final CountDownLatch count = new CountDownLatch(round); // 创建线程 ExecutorService executor = Executors.newFixedThreadPool(round); // 分配数据 try { for (int i = 0; i < round; i++) { //该线程的查询开始值 int startLen = i * THREAD_COUNT_SIZE; int k = i + 1; executor.execute(() -> { ArrayList<ResourceInfo> resourceInfos = userService.list(startLen); feign.add(resourceInfos); temporaryMap.put(k, resourceInfos); System.out.println("正在处理线程【" + k + "】的数据,数据大小为:" + resourceInfos.size()); count.countDown(); }); } count.await(); // 等待所有线程执行完毕 long end = System.currentTimeMillis(); System.out.println("数据查询耗时:" + (end - start) + "ms"); temporaryMap.values().forEach(threadList::addAll); System.out.println("list长度为:" + threadList.size()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); } finally { temporaryMap.clear(); executor.shutdown(); // 关闭线程池 } }
@Mapper public interface UserMapper { Integer UserSum(); ArrayList<ResourceInfo> list(@Param("startLen") int startLen); }
@Override public void run(ApplicationArguments args) { this.list(); } }
/**
 * HTTP entry point that accepts batches of records for indexing.
 */
@RestController
public class EsController {

    @Autowired
    private EsService esService;

    /**
     * Handles {@code POST /add}: hands the request body to {@code EsService.addAsync}
     * and returns a success result right away. The future returned by {@code addAsync}
     * is not observed here, so the response does not reflect the indexing outcome.
     *
     * @param resourceInfo records taken from the request body
     * @return {@code Result.success()}
     */
    @PostMapping("/add")
    public Result add(@RequestBody ArrayList<ResourceInfo> resourceInfo) {
        esService.addAsync(resourceInfo);
        return Result.success();
    }
}
@Service public class EsServiceImpl implements EsService { @Autowired private RestHighLevelClient restHighLevelClient; @Override public CompletableFuture<Void> addAsync(ArrayList<ResourceInfo> resourceInfoList) { return CompletableFuture.runAsync(() -> { try { BulkRequest bulkRequest = new BulkRequest(); for (ResourceInfo resourceInfo : resourceInfoList) { IndexRequest indexRequest = new IndexRequest("apply") .id(String.valueOf(resourceInfo.getId())) .source(JSONObject.toJSONString(resourceInfo), XContentType.JSON); bulkRequest.add(indexRequest); } BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); if (bulkResponse.hasFailures()) { for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); System.out.println("Failed to index document with id: " + bulkItemResponse.getId() + ", failure message: " + failure.getMessage()); } } } } catch (IOException e) { e.printStackTrace(); // 也可以选择记录日志或抛出自定义异常 } }); } }标签:FeignClient,java,ArrayList,list,add,线程,new,多线程,public From: https://blog.csdn.net/c8899y/article/details/140569374