首页 > 其他分享 >异步分组查询

异步分组查询

时间:2022-09-05 14:33:07浏览次数:153  
标签:异步 index System 查询 分组 println Dog out

依赖

    <!--集合工具类-->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.4</version>
    </dependency>
    <!--junit单元测试-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

代码

package com.example.demo;


import org.apache.commons.collections4.ListUtils;
import org.assertj.core.util.Lists;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AsyncTest {


    /**
     * CPU核数
     */
    // private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
    private static final int AVAILABLE_PROCESSORS = 10;

    /**
     * 设置线程池
     */
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            AVAILABLE_PROCESSORS,  //核心线程数
            3 * AVAILABLE_PROCESSORS,  //最大线程数
            3, TimeUnit.SECONDS,  // keepAliveTime
            new LinkedBlockingDeque<>(10000));  //阻塞队列

    // 分批大小 可以通过配置处理
    private static final Integer pageSize = 2;
    // 休眠时间
    private static final Long sleepMillis = 1000L;
    // 模拟数据
    private static final Map<Integer, Dog> map = new HashMap<>();

    static {
        map.put(1, new Dog(1, "xxx1"));
        map.put(2, new Dog(2, "xxx2"));
        map.put(3, new Dog(3, "xxx3"));
        map.put(4, new Dog(4, "xxx4"));
        map.put(5, new Dog(5, "xxx5"));
        map.put(6, new Dog(6, "xxx6"));
        map.put(7, new Dog(7, "xxx7"));
        map.put(8, new Dog(8, "xxx8"));
        map.put(9, new Dog(9, "xxx9"));
    }

    // 数据对象
    static class Dog {
        private Integer index;
        private String name;

        public Dog() {
        }

        public Dog(Integer index, String name) {
            this.index = index;
            this.name = name;
        }

        public Integer getIndex() {
            return index;
        }

        public void setIndex(Integer index) {
            this.index = index;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Dog{" + "index=" + index + ", name='" + name + '\'' + '}';
        }
    }

    // 模拟查询数据库 每次查询休眠1s 为了更好对比时间差
    private Dog selectDataBaseGetDog(Integer index) {
        System.out.println("select DataBase index :{ " + index + " }");
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return map.get(index);
    }


    @Test
    public void testInitMonitor() {
        // 请求的入参
        List<Integer> list = Lists.newArrayList(1, 2, 3, 4, 5);

        // 单线程查询
        select(list);

        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println();

        System.out.println("分批大小 " + pageSize + " , 休眠时间 " + sleepMillis + " , CPU核数 " + AVAILABLE_PROCESSORS);
        // 分组异步查询
        asyncSelect(list);

    }

    // 普通查询
    private void select(List<Integer> list) {
        System.out.println("=====================================");
        System.out.println("==============单线程查询===============");
        System.out.println("查询数据,param:" + list);
        System.out.println("=====================================");

        final long a = System.currentTimeMillis();
        System.out.println("方法开始:" + a);
        List<Dog> dogList = new ArrayList<>();
        list.forEach(index -> dogList.add(selectDataBaseGetDog(index)));
        final long a1 = System.currentTimeMillis();
        System.out.println("方法结束:" + a1);
        System.out.println("时间差:" + (a1 - a));
        System.out.println("=====================================");
        System.out.println("查询结束,return:" + dogList);
        System.out.println("==============单线程查询===============");
        System.out.println("=====================================");
    }

    // 分组异步查询
    private void asyncSelect(List<Integer> list) {

        System.out.println("=====================================");
        System.out.println("=============异步分组查询==============");
        System.out.println("查询数据,param:" + list);
        System.out.println("=====================================");

        final long l = System.currentTimeMillis();
        System.out.println("方法开始" + l);

        List<List<Integer>> partition = ListUtils.partition(list, pageSize);
        List<CompletableFuture<List<Dog>>> completableFutures = getCompletableFutures(partition);

        final long l1 = System.currentTimeMillis();
        System.out.println("方法结束" + l1);
        System.out.println("时间差" + (l1 - l));
        List<Dog> dogList = new ArrayList<>();
        completableFutures.forEach(future -> {
            try {
                dogList.addAll(future.get());
            } catch (Exception e) {
                System.out.println("分页查询取消订单列表 -> 多线程处理取消列表异常");
            }
        });

        System.out.println("=====================================");
        System.out.println("查询结束,return:" + dogList);
        System.out.println("==============异步分组查询===============");
        System.out.println("=====================================");
    }

    // 异步查询
    private List<CompletableFuture<List<Dog>>> getCompletableFutures(List<List<Integer>> partition) {

        int partitionSize = partition.size();

        List<CompletableFuture<List<Dog>>> completableFutureList = IntStream.range(0, partitionSize).mapToObj(
                index -> CompletableFuture.supplyAsync(() ->
                        {
                            System.out.println("共 { " + partitionSize + " } 批,当前第 { " + (index + 1) + " } 批。分批查询数据 :{ " + partition.get(index) + " }");
                            return getSkuBasicInfoListResultTO(partition.get(index));
                        },
                        threadPoolExecutor)
        ).collect(Collectors.toList());
        // 等待执行结束
        CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[partitionSize])).join();
        return completableFutureList;
    }

    // 分批查询数据
    private List<Dog> getSkuBasicInfoListResultTO(List<Integer> request) {
        System.out.println("查询数据param :{ " + request + " }");
        List<Dog> dogList = new ArrayList<>();
        request.forEach(i -> dogList.add(selectDataBaseGetDog(i)));
        return dogList;
    }
}

测试结果

    /**
     =====================================
     ==============单线程查询===============
     查询数据,param:[1, 2, 3, 4, 5]
     =====================================
     方法开始:1662348134998
     select DataBase index :{ 1 }
     select DataBase index :{ 2 }
     select DataBase index :{ 3 }
     select DataBase index :{ 4 }
     select DataBase index :{ 5 }
     方法结束:1662348140040
     时间差:5042
     =====================================
     查询结束,return:[Dog{index=1, name='xxx1'}, Dog{index=2, name='xxx2'}, Dog{index=3, name='xxx3'}, Dog{index=4, name='xxx4'}, Dog{index=5, name='xxx5'}]
     ==============单线程查询===============
     =====================================




     分批大小 2 , 休眠时间 1000 , CPU核数 10
     =====================================
     ==============异步分组查询===============
     查询数据,param:[1, 2, 3, 4, 5]
     =====================================
     方法开始1662348140040
     共 { 3 } 批,当前第 { 1 } 批。分批查询数据 :{ [1, 2] }
     查询数据param :{ [1, 2] }
     共 { 3 } 批,当前第 { 2 } 批。分批查询数据 :{ [3, 4] }
     查询数据param :{ [3, 4] }
     共 { 3 } 批,当前第 { 3 } 批。分批查询数据 :{ [5] }
     查询数据param :{ [5] }
     select DataBase index :{ 1 }
     select DataBase index :{ 3 }
     select DataBase index :{ 5 }
     select DataBase index :{ 2 }
     select DataBase index :{ 4 }
     方法结束1662348142062
     时间差2022
     =====================================
     查询结束,return:[Dog{index=1, name='xxx1'}, Dog{index=2, name='xxx2'}, Dog{index=3, name='xxx3'}, Dog{index=4, name='xxx4'}, Dog{index=5, name='xxx5'}]
     ==============异步分组查询===============
     =====================================
     */

标签:异步,index,System,查询,分组,println,Dog,out
From: https://www.cnblogs.com/huiqing/p/16658025.html

相关文章

  • JS实现异步的方法:回调函数callback、事件监听、setTimeout、Promise、生成器Generato
    所有异步任务都是在同步任务执行结束之后,从任务队列中依次取出执行。回调函数是异步操作最基本的方法,比如AJAX回调,回调函数的优点是简单、容易理解和实现,缺点是不利于代码......
  • Python列表、元祖、字典查询速度对比
    先比较列表和字典的查询速度:字典查询速度快于列表,原因是:列表是有序的数组,每个值都有自己的索引位置,查询的时候会逐步偏移查找,很浪费时间,但是内存消耗小。而字典是key,value......
  • [HTML+CSS] 20.媒体查询,响应式布局
    笔记来源:尚硅谷Web前端HTML5&CSS3初学者零基础入门全套完整版目录媒体查询响应式布局媒体查询媒体查询语法媒体类型媒体特性断点媒体查询响应式布局网页可以根据不......
  • 分组报表
    操作步骤:1、新建一个普通报表2、新建一个数据源,查询语句如下select*from销量3、拖动地区、销售员、销量列到设计器4、销量那一列设置数据设置为分组求和。5......
  • [MySQL]查询所有表数据量
    1.所有表mysql>SELECTTABLE_NAME,TABLE_ROWSFROMinformation_schema.`TABLES`->WHERETABLE_SCHEMA=(SELECTdatabase())->ORDERBYTABLE_ROWSDESC;......
  • C# 手动终止async/await异步方法的几种实现
     终止异步方法的实现主要依靠 CancellationToken类 usingSystem;usingSystem.Net.Http;usingSystem.Threading;usingSystem.Threading.Tasks;namespaceC......
  • 使用媒体查询的响应式菜单 - 教程
    使用媒体查询的响应式菜单-教程HTML在HTML中,我们有标题和菜单。在菜单项中,我们有桌面和移动元素。屏幕大于500px时显示桌面,小于500px时显示手机。在移动类中,我们将......
  • MyBatis 五 ---查看详情&&条件查询
    查看详情1、编写接口方法:Mapper接口参数:id返回结果:Brand   2、编写SQL语句:SQL映射文件;参数占位......
  • 查询数字的最邻近
        这道题目要用二分+桶排的方式解决函数:l~r找vc:靠左/右(‘l’/‘r’)靠左和靠右用STL函数二分就行,这里讲一下思路,二分出最靠左/右的v值(but二维,在but[v][0~len......
  • Oracle根据用户名和表名查询表的字段和字段类型等信息
    1 该用户下所有表的字段筛选方法selecta.column_nameasuploadcolumnfromuser_tab_columnsawherea.DATA_TYPE='VARCHAR2'anda.TABLE_NAME='DCS_MED......