首页 > 其他分享 >线程池的使用:批量导入、数据汇总、异步保存搜索记录

线程池的使用:批量导入、数据汇总、异步保存搜索记录

时间:2024-06-12 20:59:29浏览次数:9  
标签:异步 Map id 导入 线程 import org public

文章目录

1、场景一:MySQL批量导入数据到ES

场景: 需要将库里的1000万左右的数据量,导入到ES索引库中

实现思路: 分批处理,防止OOM,使用线程池 + CountDownLatch(控制线程协作,等所有线程都干完活儿后,才能继续往下走

在这里插入图片描述

1.1 CountDownLatch

将MySQL的数据按总量分批,一批批的提交到线程池去处理
在这里插入图片描述
提交完后,可以选择直接返回,不去等进入线程池阻塞队列的任务都执行完。也可以使用CountDownLatch,线程池的线程每处理一个提交的任务,就让CountDownLatch减一(CountDownLatch初始化等待计数等于批次数),阻塞请求过来的那个线程,直到CountDownLatch减为0,最后统计下总耗时,返回给前端

1.2 流程图

在这里插入图片描述

调用ES同步接口 ⇒ 查库分批 ⇒ 提交任务到线程池 ⇒ CountDownLatch阻塞当前http请求对应的线程 ⇒ CountDownLatch计数减为0后,http请求对应的线程继续执行,统计总耗时

1.3 代码实现

Service层:

public interface ApArticleService {

    /**
     * 批量导入
     */
    public void importAll();
}

Service层实现:

import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

@Service
@Transactional
@Slf4j
public class ApArticleServiceImpl implements ApArticleService {

	//数据层对象
    @Autowired
    private ApArticleMapper apArticleMapper;

	//ES客户端操作对象,自定义的Bean
    @Autowired
    private RestHighLevelClient client;

	//线程池对象,自定义的Bean
    @Autowired
    private ExecutorService executorService;

    private static final String ARTICLE_ES_INDEX = "app_info_article";

	//每批2000条
    private static final int PAGE_SIZE = 2000;

    /**
     * 批量导入
     */
    @SneakyThrows
    @Override
    public void importAll() {

        //总条数
        int count = apArticleMapper.selectCount();
        //总页数
        int totalPageSize = count % PAGE_SIZE == 0 ? count / PAGE_SIZE : count / PAGE_SIZE + 1;
        //开始执行时间
        long startTime = System.currentTimeMillis();
        //一共有多少页,就创建多少个CountDownLatch的计数
        CountDownLatch countDownLatch = new CountDownLatch(totalPageSize);

        int fromIndex;
        List<SearchArticleVo> articleList = null;

        for (int i = 0; i < totalPageSize; i++) {
            //起始分页条数
            fromIndex = i * PAGE_SIZE;
            //查询文章
            articleList = apArticleMapper.loadArticleList(fromIndex, PAGE_SIZE);
            //创建线程,做批量插入es数据操作,TaskThread实现Runnable,定义了写入ES的逻辑
            TaskThread taskThread = new TaskThread(articleList, countDownLatch);
            //提交任务给线程池中的线程执行
            executorService.execute(taskThread);
        }

        //调用await()方法,用来等待计数归零
        countDownLatch.await();

        long endTime = System.currentTimeMillis();
        log.info("es索引数据批量导入共:{}条,共消耗时间:{}秒", count, (endTime - startTime) / 1000);
    }
	
	//任务,遍历每一批里的一条条数据,用BulkRequest批量导入ES
    class TaskThread implements Runnable {

        List<SearchArticleVo> articleList;
        CountDownLatch cdl;

        public TaskThread(List<SearchArticleVo> articleList, CountDownLatch cdl) {
            this.articleList = articleList;
            this.cdl = cdl;
        }

        @SneakyThrows
        @Override
        public void run() {
            //批量导入
            BulkRequest bulkRequest = new BulkRequest(ARTICLE_ES_INDEX);

            for (SearchArticleVo searchArticleVo : articleList) {
                bulkRequest.add(new IndexRequest().id(searchArticleVo.getId().toString())
                        .source(JSON.toJSONString(searchArticleVo), XContentType.JSON));
            }
            //发送请求,批量添加数据到es索引库中
            client.bulk(bulkRequest, RequestOptions.DEFAULT);

            //让计数减一
            cdl.countDown();
        }
    }
}

ES客户端的Bean定义:

import lombok.Getter;
import lombok.Setter;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {
    private String host;
    private int port;

    @Bean
    public RestHighLevelClient client(){
        return new RestHighLevelClient(RestClient.builder(
                new HttpHost(
                        host,
                        port,
                        "http"
                )
        ));
    }
}

线程池的Bean定义:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
public class ThreadPoolConfig {

    /**
     * 核心线程池大小
     */
    private static final int CORE_POOL_SIZE = 17;

    /**
     * 最大可创建的线程数
     */
    private static final int MAX_POOL_SIZE = 50;

    /**
     * 队列最大长度
     */
    private static final int QUEUE_CAPACITY = 1000;

    /**
     * 线程池维护线程所允许的空闲时间
     */
    private static final int KEEP_ALIVE_SECONDS = 500;

    @Bean("taskExecutor")
    public ExecutorService executorService(){
        AtomicInteger c = new AtomicInteger(1);
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(QUEUE_CAPACITY);
        return new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_SECONDS,
                TimeUnit.MILLISECONDS,
                queue,
                r -> new Thread(r, "sync-es-pool-" + c.getAndIncrement()),
                new ThreadPoolExecutor.DiscardPolicy()
        );
    }
}

1.4 效果

@SpringBootTest(classes = CDLApplication.class)
@RunWith(SpringRunner.class)
public class ApArticleServiceImplTest {

    @Autowired
    private ApArticleService apArticleService;

    @Test
    public void importAll() {
        apArticleService.importAll();
    }
}

在这里插入图片描述

2、场景二:数据汇总

2.1 流程图

在一个电商网站中,用户下单之后,需要查询数据,数据包含了三部分:订单信息、包含的商品、物流信息。但现在这三个信息各自在其他三个微服务中,如果用一个线程分别去查,再汇总,则耗时为 Time a + Time b + Time c,如下,耗时1800ms

在这里插入图片描述

可优化为:改为三个查询并行处理,总耗时800ms

在这里插入图片描述

2.2 代码实现

模拟下订单信息、包含的商品、物流信息的接口:

// 商品信息,sleep 800秒
@RestController
@RequestMapping("/product")
public class ProductController {


    @SneakyThrows
    @GetMapping("/get/{id}")
    public Map<String, Object> product(@PathVariable int id)  {
        HashMap<String, Object> map = new HashMap<>();
        if (id == 1) {
            map.put("name", "小爱音箱");
            map.put("price", 300);
        } else if (id == 2) {
            map.put("name", "小米手机");
            map.put("price", 2000);
        }
        map.put("id", id);
        Thread.sleep(800);
        return map;
    }
}

//订单信息,sleep 500秒
@RestController
@RequestMapping("/order")
public class OrderController {


    @SneakyThrows
    @GetMapping("/get/{id}")
    public Map<String, Object> order(@PathVariable int id) {
        HashMap<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("total", "2300.00");
        Thread.sleep(500);
        return map;
    }
}
//快递信息,sleep 500秒
@RestController
@RequestMapping("/logistics")
public class LogisticsController {

    @SneakyThrows
    @GetMapping("/get/{id}")
    public Map<String, Object> logistics(@PathVariable int id) {
        HashMap<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("name", "中通快递");
        Thread.sleep(500);
        return map;
    }
}

性能低的实现:一个线程里分别做三次远程调用

@SneakyThrows
@GetMapping("/get/detail/{id}")
public Map<String, Object> getOrderDetail() {

    long startTime = System.currentTimeMillis();

    Map<String, Object> order = restTemplate.getForObject("http://localhost:9991/order/get/{id}", Map.class, 1);

    Map<String, Object> product = restTemplate.getForObject("http://localhost:9991/product/get/{id}", Map.class, 1);

    Map<String, Object> logistics = restTemplate.getForObject("http://localhost:9991/logistics/get/{id}", Map.class, 1);

    long endTime = System.currentTimeMillis();



    Map<String, Object> resultMap = new HashMap<>();
    resultMap.put("order", order);
    resultMap.put("product", product);
    resultMap.put("logistics", logistics);

    log.info("接口调用共耗时:{}毫秒",endTime-startTime);
    return resultMap;
}

耗时与计算的近似:

在这里插入图片描述

优化:提交三个远程调用的任务,并行去查,使用Future去get获取线程执行的结果

public class OrderDetailController {

    //@Autowired
   //private RestTemplate restTemplate;

    @Autowired
    private ExecutorService executorService;


    @SneakyThrows
    @GetMapping("/get/detail_new/{id}")
    public Map<String, Object> getOrderDetailNew() {

        long startTime = System.currentTimeMillis();

        Future<Map<String, Object>> f1 = executorService.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:9991/order/get/{id}", Map.class, 1);
            return r;
        });
        Future<Map<String, Object>> f2 = executorService.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:9991/product/get/{id}", Map.class, 1);
            return r;
        });

        Future<Map<String, Object>> f3 = executorService.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:9991/logistics/get/{id}", Map.class, 1);
            return r;
        });


        Map<String, Object> resultMap = new HashMap<>();
        resultMap.put("order", f1.get());
        resultMap.put("product", f2.get());
        resultMap.put("logistics", f3.get());

        long endTime = System.currentTimeMillis();

        log.info("接口调用共耗时:{}毫秒",endTime-startTime);
        return resultMap;
    }

耗时约800ms:

在这里插入图片描述

汇总数据时,如果所调用的接口之间没有依赖关系,则可使用线程池 + future 来优化性能,如报表汇总 近期刚好做了一个资产全景的接口

在这里插入图片描述

3、场景三:异步调用

3.1 需求

搜索时,保存历史搜索记录。用户搜索一个keyword时,存历史搜索记录,不能影响到查询的性能,用异步去处理,将存历史记录的任务提交到线程池

在这里插入图片描述

3.2 代码实现

Controller层:

@RestController
@RequestMapping("/api/v1/article")
public class ArticleSearchController {

    @Autowired
    private ArticleSearchService articleSearchService;

    @GetMapping("/search")
    public List<Map> search(String keyword){
        return articleSearchService.search(keyword);
    }
}

Service层:注意调用 insert(userId,keyword) 异步写入:

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Service
@Slf4j
public class ArticleSearchServiceImpl implements ArticleSearchService {

    @Autowired
    private RestHighLevelClient client;

    private static final String ARTICLE_ES_INDEX = "app_info_article";

    private int userId = 1102;

    @Autowired
    private ApUserSearchService apUserSearchService;

    /**
     * 文章搜索
     * @return
     */
    @Override
    public List<Map> search(String keyword) {

        try {
            SearchRequest request = new SearchRequest(ARTICLE_ES_INDEX);

            //设置查询条件
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            //第一个条件
            if(null == keyword || "".equals(keyword)){
                request.source().query(QueryBuilders.matchAllQuery());
            }else {
                request.source().query(QueryBuilders.queryStringQuery(keyword).field("title").defaultOperator(Operator.OR));
                //保存搜索历史,异步
                apUserSearchService.insert(userId,keyword);
            }
            //分页
            request.source().from(0);
            request.source().size(20);

            //按照时间倒序排序
            request.source().sort("publishTime", SortOrder.DESC);
            //搜索
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);

            //解析结果
            SearchHits searchHits = response.getHits();
            //获取具体文档数据
            SearchHit[] hits = searchHits.getHits();
            List<Map> resultList = new ArrayList<>();
            for (SearchHit hit : hits) {
                //文档数据
                Map map = JSON.parseObject(hit.getSourceAsString(), Map.class);
                resultList.add(map);
            }
            return resultList;

        } catch (IOException e) {
            throw new RuntimeException("搜索失败");
        }

    }
}

@Async异步写入用户的搜索记录(注意启动类加@EnableAsync)

@Service
@Slf4j
public class ApUserSearchServiceImpl implements ApUserSearchService {


    /**
     * 保存搜索历史记录
     * @param userId
     * @param keyword
     */
    @Async("taskExecutor")	//taskExecutor是前面场景一中自己定义的线程池的Bean
    @Override
    public void insert(Integer userId, String keyword) {

        //保存用户记录  mongodb或mysql
        //执行业务

        log.info("用户搜索记录保存成功,用户id:{},关键字:{}",userId,keyword);

    }
}

标签:异步,Map,id,导入,线程,import,org,public
From: https://blog.csdn.net/llg___/article/details/139626919

相关文章

  • GATK不能多线程的问题
    问题:申请了多线程但是只能单线程 17:13:48.941INFOIntelPairHmm-Availablethreads:117:13:48.941INFOIntelPairHmm-Requestedthreads:417:13:48.941WARNIntelPairHmm-Using1availablethreads,but4wererequested 解决方法:exportOMP_N......
  • 线程池
    从上图可以看到,线程被创建出来之后,都处于睡眠态,它们实际上是进入了条件量的等待队列中。而任务都被放入一个链表,被互斥锁保护起来。下面是线程池里面线程们的一生:\1.被创建\2.写遗书(准备好退出处理函数,防止在持有一把锁的状态中死去)\3.试图持有互斥锁(等待任务)\4.判断是......
  • 【C】线程池实现
    后续会移植为C++版文章目录一、线程池原理二、一些函数2.1pthread_cond_wait()2.2pthread_cond_signal()2.3pthread_create()2.4pthread_exit()三、任务队列定义四、线程池定义五、头文件内容threadpool.h六、.c文件实现6.1threadpool.c文件6.2TestMain测......
  • 【C++】多线程(基于Windows以及pthread库)
    文章目录一、前言1.1进程和线程二、创建线程2.1线程函数pthread_self(void)2.2创建线程三、线程退出3.1线程函数pthread_exit()四、线程回收4.1线程函数pthread_join()4.2线程数据回收五、线程分离5.1线程函数pthread_detach()六、C++线程类七、线程同......
  • 线程安全问题【snychornized 、死锁、线程通信】
    目录一、线程安全1.1线程安全问题?1.2如何解决线程安全问题方法具体如何实现?1.3同步方法1.4同步代码块1.5总结1.6售票例子1.8补充二、线程安全的集合三、死锁【了解】四、线程通信4.1同步方法4.2同步代码块4.3wait和sleep本篇的思维导图最后一、线程......
  • python指南之多线程与多进程编程大全
    Python作为一种高级编程语言,提供了多种并发编程的方式,其中多线程与多进程是最常见的两种方式之一。在本文中,我们将探讨Python中多线程与多进程的概念、区别以及如何使用线程池与进程池来提高并发执行效率。多线程与多进程的概念多线程多线程是指在同一进程内,多个线程并发执......
  • Java线程池以及Future和CompletableFuture的用法
    参考:https://blog.csdn.net/weixin_50330544/article/details/1316871501.线程池为什么使用线程池?频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。系统无法合理管理内部的资源分布,会降低系统的稳定性。......
  • 学习分享-Tomcat 的线程池在工作方式上与普通的 Java 线程池的区别
    前言最近在学习过程中遇到在某个场景下:修改某条数据时,给该线程上分布式写锁,然后引入延迟队列处理其他请求;这个方案有一定的缺点,因为在用到消息队列时,不存在占用过多线程从而导致OOM的问题,消费者组只会安排固定的几个线程去拉取消息,如果碰到上面那种拿不到锁的情况,阻塞等待......
  • 网易面试:SpringBoot如何开启虚拟线程?
    虚拟线程(VirtualThread)也称协程或纤程,是一种轻量级的线程实现,与传统的线程以及操作系统级别的线程(也称为平台线程)相比,它的创建开销更小、资源利用率更高,是Java并发编程领域的一项重要创新。PS:虚拟线程正式发布于Java长期支持版(LongTermSuort,LTS)Java21(也就是JDK21)。......
  • Interlocked 为多个线程共享的变量提供原子操作 多线程重入
    Interlocked可以为多个线程共享的变量提供原子操作主要使用的读写方法varrunningState=Interlocked.Read(refisRunning);Interlocked.Exchange(refisRunning,0);可以配合lock实现业务常用方法Add(Int32,Int32) 对两个32位整数进行求和并用和替换第一个整数,上述操......