首页 > 编程语言 >Java 线程池的原理与实现

Java 线程池的原理与实现

时间:2023-06-27 18:04:33浏览次数:35  
标签:Java private 任务 线程 Date 原理 public wait


[分享]Java 线程池的原理与实现

 

这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧。

线程池就是其中之一,一提到线程,我们会想到以前《操作系统》的生产者与消费者,信号量,同步控制等等。

一提到池,我们会想到数据库连接池,但是线程池又如何呢?

 

建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用法。

关于我对同步的认识,要缘于大三年的一本书,书名好像是 Java 实战,这本书写得实在太妙了,真正的从理论到实践,从截图分析到.class字节码分析。哇,我想市场上很难买到这么精致的书了。作为一个Java爱好者,我觉得绝对值得一读。

我对此书印象最深之一的就是:equal()方法,由浅入深,经典!

还有就是同步了,其中提到了我的几个编程误区,以前如何使用同步提高性能等等,通过学习,使我对同步的认识进一步加深。

--------------------------------------------------------------------------------------------------

简单介绍

 

    创建线程有两种方式:继承Thread或实现Runnable。Thread实现了Runnable接口,提供了一个空的run()方法,所以不论是继承Thread还是实现Runnable,都要有自己的run()方法。

    一个线程创建后就存在,调用start()方法就开始运行(执行run()方法),调用wait进入等待或调用sleep进入休眠期,顺利运行完毕或休眠被中断或运行过程中出现异常而退出。

 

wait和sleep比较:

    sleep方法有:sleep(long millis),sleep(long millis, long nanos),调用sleep方法后,当前线程进入休眠期,暂停执行,但该线程继续拥有监视资源的所有权。到达休眠时间后线程将继续执行,直到完成。若在休眠期另一线程中断该线程,则该线程退出。

    wait方法有:wait(),wait(long timeout),wait(long timeout, long nanos),调用wait方法后,该线程放弃监视资源的所有权进入等待状态;

      wait():等待有其它的线程调用notify()或notifyAll()进入调度状态,与其它线程共同争夺监视。wait()相当于wait(0),wait(0, 0)。

      wait(long timeout):当其它线程调用notify()或notifyAll(),或时间到达timeout亳秒,或有其它某线程中断该线程,则该线程进入调度状态。

      wait(long timeout, long nanos):相当于wait(1000000*timeout + nanos),只不过时间单位为纳秒。

===================================================================================

线程池:

    多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

    

 假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

    

    如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。

                一个线程池包括以下四个基本组成部分:

      1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;

      2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;

      3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;

     4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

                

    线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

 

    线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

 

    假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

------------------------------------------------------------------------------

好了,废话就到这里了,下面就是程序了,我也不讲解了,注释已经很清晰了:

/** 线程池类,工作线程作为其内部类 **/
package org.ymcn.util;
 
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
 
import org.apache.log4j.Logger;
/**
* 线程池
* 创建线程池,销毁线程池,添加新任务
* 
* @author obullxl
*/
public final class ThreadPool {
    private static Logger logger = Logger.getLogger(ThreadPool.class);
    private static Logger taskLogger = Logger.getLogger("TaskLogger");
 
    private static boolean debug = taskLogger.isDebugEnabled();
    // private static boolean debug = taskLogger.isInfoEnabled();
    /* 单例 */
    private static ThreadPool instance = ThreadPool.getInstance();
 
    public static final int SYSTEM_BUSY_TASK_COUNT = 150;
    /* 默认池中线程数 */
    public static int worker_num = 5;
    /* 已经处理的任务数 */
    private static int taskCounter = 0;
    public static boolean systemIsBusy = false;
    private static List<Task> taskQueue = Collections
            .synchronizedList(new LinkedList<Task>());
    /* 池中的所有线程 */
    public PoolWorker[] workers;
 
    private ThreadPool() {
        workers = new PoolWorker[5];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new PoolWorker(i);
        }
    }
    private ThreadPool(int pool_worker_num) {
        worker_num = pool_worker_num;
        workers = new PoolWorker[worker_num];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new PoolWorker(i);
        }
    }
    public static synchronized ThreadPool getInstance() {
        if (instance == null)
            return new ThreadPool();
        return instance;
    }
    /**
    * 增加新的任务
    * 每增加一个新任务,都要唤醒任务队列
    * @param newTask
    */
    public void addTask(Task newTask) {
        synchronized (taskQueue) {
            newTask.setTaskId(++taskCounter);
            newTask.setSubmitTime(new Date());
            taskQueue.add(newTask);
            /* 唤醒队列, 开始执行 */
            taskQueue.notifyAll();
        }
        logger.info("Submit Task<" + newTask.getTaskId() + ">: "
                + newTask.info());
    }
    /**
    * 批量增加新任务
    * @param taskes
    */
    public void batchAddTask(Task[] taskes) {
        if (taskes == null || taskes.length == 0) {
            return;
        }
        synchronized (taskQueue) {
            for (int i = 0; i < taskes.length; i++) {
                if (taskes[i] == null) {
                    continue;
                }
                taskes[i].setTaskId(++taskCounter);
                taskes[i].setSubmitTime(new Date());
                taskQueue.add(taskes[i]);
            }
            /* 唤醒队列, 开始执行 */
            taskQueue.notifyAll();
        }
        for (int i = 0; i < taskes.length; i++) {
            if (taskes[i] == null) {
                continue;
            }
            logger.info("Submit Task<" + taskes[i].getTaskId() + ">: "
                    + taskes[i].info());
        }
    }
    /**
    * 线程池信息
    * @return
    */
    public String getInfo() {
        StringBuffer sb = new StringBuffer();
        sb.append("/nTask Queue Size:" + taskQueue.size());
        for (int i = 0; i < workers.length; i++) {
            sb.append("/nWorker " + i + " is "
                    + ((workers[i].isWaiting()) ? "Waiting." : "Running."));
        }
        return sb.toString();
    }
    /**
    * 销毁线程池
    */
    public synchronized void destroy() {
        for (int i = 0; i < worker_num; i++) {
            workers[i].stopWorker();
            workers[i] = null;
        }
        taskQueue.clear();
    }
    /**
    * 池中工作线程
    * 
    * @author obullxl
    */
    private class PoolWorker extends Thread {
        private int index = -1;
        /* 该工作线程是否有效 */
        private boolean isRunning = true;
        /* 该工作线程是否可以执行新任务 */
        private boolean isWaiting = true;
        public PoolWorker(int index) {
            this.index = index;
            start();
        }
        public void stopWorker() {
            this.isRunning = false;
        }
        public boolean isWaiting() {
            return this.isWaiting;
        }
        /**
        * 循环执行任务
        * 这也许是线程池的关键所在
        */
        public void run() {
            while (isRunning) {
                Task r = null;
                synchronized (taskQueue) {
                    while (taskQueue.isEmpty()) {
                        try {
                            /* 任务队列为空,则等待有新任务加入从而被唤醒 */
                            taskQueue.wait(20);
                        } catch (InterruptedException ie) {
                            logger.error(ie);
                        }
                    }
                    /* 取出任务执行 */
                    r = (Task) taskQueue.remove(0);
                }
                if (r != null) {
                    isWaiting = false;
                    try {
                        if (debug) {
                            r.setBeginExceuteTime(new Date());
                            taskLogger.debug("Worker<" + index
                                    + "> start execute Task<" + r.getTaskId() + ">");
                            if (r.getBeginExceuteTime().getTime()
                                    - r.getSubmitTime().getTime() > 1000)
                                taskLogger.debug("longer waiting time. "
                                        + r.info() + ",<" + index + ">,time:"
                                        + (r.getFinishTime().getTime() - r
                                                .getBeginExceuteTime().getTime()));
                        }
                        /* 该任务是否需要立即执行 */
                        if (r.needExecuteImmediate()) {
                            new Thread(r).start();
                        } else {
                            r.run();
                        }
                        if (debug) {
                            r.setFinishTime(new Date());
                            taskLogger.debug("Worker<" + index
                                    + "> finish task<" + r.getTaskId() + ">");
                            if (r.getFinishTime().getTime()
                                    - r.getBeginExceuteTime().getTime() > 1000)
                                taskLogger.debug("longer execution time. "
                                        + r.info() + ",<" + index + ">,time:"
                                        + (r.getFinishTime().getTime() - r
                                                .getBeginExceuteTime().getTime()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        logger.error(e);
                    }
                    isWaiting = true;
                    r = null;
                }
            }
        }
    }
}
/** 任务接口类 **/
package org.ymcn.util;
import java.util.Date;
/**
* 所有任务接口
* 其他任务必须继承访类
* @author obullxl
*/
public abstract class Task implements Runnable {
    // private static Logger logger = Logger.getLogger(Task.class);
    /* 产生时间 */
    private Date generateTime = null;
    /* 提交执行时间 */
    private Date submitTime = null;
    /* 开始执行时间 */
    private Date beginExceuteTime = null;
    /* 执行完成时间 */
    private Date finishTime = null;
    private long taskId;
 
    public Task() {
        this.generateTime = new Date();
    }
    /**
    * 任务执行入口
    */
    public void run() {
        /**
        * 相关执行代码
        * beginTransaction();
        * 执行过程中可能产生新的任务 subtask = taskCore();
        * commitTransaction();
        * 增加新产生的任务 ThreadPool.getInstance().batchAddTask(taskCore());
        */
    }
 
    /**
    * 所有任务的核心 所以特别的业务逻辑执行之处
    * @throws Exception
    */
    public abstract Task[] taskCore() throws Exception;
    /**
    * 是否用到数据库
    * @return
    */
    protected abstract boolean useDb();
    /**
    * 是否需要立即执行
    * @return
    */
    protected abstract boolean needExecuteImmediate();
    /**
    * 任务信息
    * @return String
    */
    public abstract String info();
 
    public Date getGenerateTime() {
        return generateTime;
    }
 
    public Date getBeginExceuteTime() {
        return beginExceuteTime;
    }
 
    public void setBeginExceuteTime(Date beginExceuteTime) {
        this.beginExceuteTime = beginExceuteTime;
    }
 
    public Date getFinishTime() {
        return finishTime;
    }
 
    public void setFinishTime(Date finishTime) {
        this.finishTime = finishTime;
    }
 
    public Date getSubmitTime() {
        return submitTime;
    }
 
    public void setSubmitTime(Date submitTime) {
        this.submitTime = submitTime;
    }
 
    public long getTaskId() {
        return taskId;
    }
 
    public void setTaskId(long taskId) {
        this.taskId = taskId;
    }
}

标签:Java,private,任务,线程,Date,原理,public,wait
From: https://blog.51cto.com/nethub/6564748

相关文章

  • java动态代理技术
    主要用来做方法的增强,让你可以在不修改源码的情况下,增强一些方法,在方法执行前后做任何你想做的事情(甚至根本不去执行这个方法),因为在InvocationHandler的invoke方法中,你可以直接获取正在调用方法对应的Method对象,具体应用的话,比如可以添加调用日志,做事务控制等。还......
  • Java NIO
    NIO主要有三大核心部分:Channel(通道)、Buffer(缓冲区)、Selector。传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开、数据达到)。因此,单个线程可......
  • java JAXB 学习
    JAXB(JavaArchitectureforXMLBinding)是JDK的一部分,用于Object<->XML的转换(有点类似于.NET中的XML序列化)。1、创建XSD可以使用任何工具生成XSD工具,比如XMLSPY。eclipse也提供了相关的jaxb插件,File->New->XMLSchemaFile文件命名为order.xsd,eclipse中也提供了xsd可视化编......
  • 基于vue +Java+springboot+element-ui开发的智慧班牌系统源码
    电子班牌系统又称之为智慧班牌,是当前校园数字化信息化建设、文化建设的主流,是校园日常工作安排、校园信息发布、班级文化风采展示、课堂交流、家校互通的重要应用载体。在每个班级门口安装一台电子班牌终端,实现学校日常管理、校园信息化建设数据对接,为学生提供一个德智教育文化环境......
  • django框架原理
    一、MTV设计 T模板(Template)接收用户输入后交由V视图(View)去处理,V视图(View)负责连接M模型(Model)进行数据操作、并将操作返回的结果再传送给T模板(Template)进行展示。以上就是Django框架的MTV模式的基本工作原理 二、Django框架的View视图展示机制 在Django服务器......
  • python实现多进程和多线程
    https://blog.csdn.net/weixin_44917390/article/details/119610760创建多进程方法:importmultiprocessingimporttimedefsing():foriinrange(3):print("iamsingooo~")time.sleep(0.5)defdance():foriinrange(3):......
  • Java-基本语法回顾总结[13-24]
    (13)copyonwriteArrayList线程安全的arrayList,底层也是用数组实现的,主要集中在读与写操作上读:由于读写分别在老新数组上,因此,互相不干扰,也因此,读的性能不会受写的性能影响[适用于读多写少]写:写操作会生成新数组,在完成之前,其他线程无法进行写操作[上了锁,线程安全];在完成之前,读的......
  • c# Thread.Sleep 与 Task.Delay 在多线程中的影响
    一般在函数执行的时候,如果需要让一个任务等待一会儿在执行,大部分都是采用的Thread.Sleep()语句。但如果该函数要复用,同时要给函数一个参数,并让该函数被线程调用后并发执行。当采用如下调用方式的时候,就会出现什么情况呢?就会出现线程阻塞,你会发现只有task1执行,也即只有一个线程......
  • CentOS7+java8+hadoop3.3.5环境搭建
    需要的配置文件centos7的镜像centos-7.9.2009-isos-x86_64安装包下载_开源镜像站-阿里云(aliyun.com)java8JavaDownloads|Oraclehadoop3.3.5Indexof/dist/hadoop/common/hadoop-3.3.5(apache.org)步骤首先第一步在本地下载好vmware和centos7的镜像 之后的......
  • Java基础 -Day04
    Java基础-Day04For循环循环结构的4个要素:①初始化条件②循环条件----->只能是Boolean类型③循环体④迭代条件循环结构for(①;②;④){③}执行过程:①->②->③->④->②->③->④->...->②/*输入两个正整数(m,n),求其最大公约数和最小公倍数*/impor......