首页 > 其他分享 >记一次并发工具类countdownlantch 在线程池中的使用

记一次并发工具类countdownlantch 在线程池中的使用

时间:2023-01-02 22:24:27浏览次数:45  
标签:程池 java Thread countdownlantch util 并发 线程 import public

首先 新建一个 ThreadFactory:

package cn.likui.common.thread.pool;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author lidongge
 * @Description:
 * @date 2018/12/25 上午10:23
 */
public class NamedThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    NamedThreadFactory(String name) {

        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        if (null == name || name.isEmpty()) {
            name = "pool";
        }

        namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon()) {
            t.setDaemon(false);


        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {

            t.setPriority(Thread.NORM_PRIORITY);

        }
        return t;
    }

}

其次 需要定义一个Configuration   bean   ThreadPoolConfig  :

package cn.likui.common.thread.pool;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author lidongge
 * @Description: 自定义线程池
 * @date 2018/11/05 上午9:59
 */
@Slf4j
@Configuration
public class ThreadPoolConfig {

    @Bean
    public ExecutorService getThreadPool() {
        ExecutorService es = new ThreadPoolExecutor(3,
                Integer.MAX_VALUE,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new NamedThreadFactory("zhangpeng_study")) {

            @Override
            public void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                log.info("===》工作线程:{} ,任务线程:{}准备执行",((Thread) r).getName(),t.getName());
            }

            @Override
            public void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                log.info("===》工作线程{}执行完成", ((Thread) r).getName(),t);
            }

            @Override
            public void terminated() {
                super.terminated();
                log.info("===》线程池退出");
            }

            @Override
            public void execute(Runnable job) {
                super.execute(wrap(job, clientTrace(), Thread.currentThread().getName()));
            }
            //捕获异常
            private Runnable wrap(Runnable job, Exception clientTrace, String name) {
                return new Thread(){
                    @Override
                    public void run(){

                        try {
                            job.run();
                        } catch (Exception e) {
                            log.error("===》线程池中线程执行出现异常:{} ,e:{}", clientTrace,e);
                        }
                    }

                };
            }

            private Exception clientTrace() {
                return new Exception("client statck trace");
            }
        };
        return es;
    }
}

然后编写自己的 县城处理核心代码:

package cn.likui.common.thread.handle;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Description:
 * @Author: lidongge
 * @Date: 2019/10/29 10:47
 */
@Slf4j
public class DataHandleService implements Runnable{

    /** 接收实际方法调用送来的参数,多个参数支持 */
    private CountDownLatch count;
    private List<Integer> paramList ;
    public DataHandleService(CountDownLatch count, List paramList){
        this.count = count;
        this.paramList = paramList;
    }

    @SneakyThrows
    @Override
    public void run() {
        int j = 0;
        for (int i =0; i < paramList.size(); i++) {
            j = j + paramList.get(i);
        }
        log.info("本次循环当前线程名称:{},和等于【{}】",Thread.currentThread().getName(),j);
        count.countDown();
        log.info("剩余任务数量:{}", count.getCount());
    }
}

 

 

最后是对该线程池的调用,期间不计算线程执行完后的返回情况

package cn.likui.study.controller;

import cn.likui.common.thread.handle.DataHandleService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @Description:  Demo
 * @Author: ldg
 * @Date: 2020/10/10
 */
@Slf4j
@RestController
@RequestMapping("/pool")
public class ThreadPoolController {

    /** 最大线程数*/
    private static final int MAX_THREAD_COUNT = 2;

    @Resource
    private ExecutorService ex;

    @RequestMapping("/testThread")
    @ResponseBody
    public Object testRestTemplate() throws InterruptedException {

        //
        List<Integer> paramList = new ArrayList<>();
        for (int i=0; i< 10000; i++) {
            paramList.add(i);
        }

        //等待4个线程
        CountDownLatch count = new CountDownLatch(MAX_THREAD_COUNT);
        //多线程请求执行开始
        long startTime = System.currentTimeMillis();

        for (int i = 0 ; i < MAX_THREAD_COUNT; i++) {
            try {
                //参数一 count,参数二 其他
                List<Integer> subList = new ArrayList<>();
                if (i == 0) {
                    subList = paramList.subList(0,5000);
                } else {
                    subList = paramList.subList(5001,9999);
                }

                ex.execute(new DataHandleService(count,subList));
            } catch (Exception e) {
                log.error("线程执行中出现异常提前结束", e);

                count.countDown();
                log.error("剩余任务数量:{}", count.getCount());
                e.printStackTrace();
            } finally {
//                ex.shutdown();
            }
        }

        //10秒超时 :超时之前 还未走完线程 会直接走主线
        count.await(20, TimeUnit.SECONDS);
//        ex.shutdown();
        long endTime = System.currentTimeMillis();
        log.info("多线程打印 耗时:"+ (endTime - startTime) + " ms");
        return null;
    }

}

 

标签:程池,java,Thread,countdownlantch,util,并发,线程,import,public
From: https://www.cnblogs.com/beixiaoyi/p/17020702.html

相关文章

  • 浅谈Java并发
    Java并发是比较难的知识点,难于对并发的理解。并发要从操作系统和硬件层面去理解,才会比较深入,而不单单是从编程语言的逻辑去理解。首先对于并发要清楚的几点:线程可能在任......
  • 并发学习笔记
    并发三大特性可见性当一个线程修改了共享变量的值,其他线程能够看到修改的值。保证可见性的方式:volatile修饰变量内存屏障:Unsafe.getUnsafe().storeFence();synchr......
  • 狂神说Go语言—并发编程
    聊聊进程、线程、协程多线程上方左图所示:在主线程中为main方法左图的右边为test方法,在main方法中调用test方法,mian方法执行就会先去执行test方法,执行完后再回到main......
  • C#开发的线程池和管理器 - 开源研究系列文章
          上次编写了一个小软件,用于练手及自己的一个小工具集合。今天把其中的线程池和管理器的代码抽取出来,写成一个博文,让需要的朋友能够进行学习和应用。    ......
  • 一次多线程并发查询导致结果混乱的问题的排查和记录·JAVA·2022
    业务背景该业务是报表查询类业务:要求从销售出库数据(存储于ElasticSearch索引中)中,按照管理者分组聚合查询各个管理者一定时间段内的动销汇总数据,如出库金额、毛利率等。但......
  • 【Storm篇】--Storm并发机制
    =========================================================声明:由于不同平台阅读格式不一致(尤其源码部分),所以获取更多阅读体验!!个人网站地址:​​http://www.lhworldblog.......
  • Linux下c++开发的轻量级、高性能、高并发的web服务器
     代码:WebServer.zip项目描述:通过HTTP协议与客户端(通常是浏览器(Browser))进行通信,来接收、存储、处理来自客户端的HTTP请求,并对其请求做出HTTP响应,返回给客户端其请......
  • 高并发应用设计
    一、高并发的说明和背景高并发解决的核心问题是在同一时间上有大量的请求过来,然后我们的系统要怎么抗住这些请求带来的压力。比如在线直播服务,同时有上百万甚至上千万......
  • 不背锅运维:上篇:Go并发编程
    基本使用package mainimport ( "fmt" "sync")var wg sync.WaitGroupfunc hello() { fmt.Println("hello func...") wg.Done() // 通知计数器减......
  • Go - 高并发抢到红包实现
    //utils.gopackagemainimport("fmt""math/rand""sync""time")//抢红包任务结构体typetaskstruct{iduint32//表示红包idcallbackcha......