Worker的实现
总体来说我们首先需要实现一个Worker线程类,让他的线程实例拿取任务然后执行任务。
//worker主要是从jobs中拿工作,拿不到等,拿到了执行。
class Worker implements Runnable{
private volatile boolean running = true;
@Override
public void run() {
while (running){
Job job = null;
synchronized (jobs){
while (jobs.isEmpty()){
try{
jobs.wait();
} catch (Exception e){
Thread.currentThread().interrupt();
return;
}
}
job = jobs.removeFirst();
}
if (job != null){
job.run();
}
}
}
public void shutdown(){
running = false;
}
}
由此我们引出了,需要一个集合来保存worker。
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
同时worker需要拿取任务,任务需要保存在一个队列中,所以引出来了jobs队列。
private final LinkedList<Job> jobs =new LinkedList<>();
线程池的初始化
前提工作准备好了,我们可以开始去实现一个简易的线程池。
初始化就免不了要提起那设定好初始化的数值。
private static final int DEFAULT_WORKER_NUMBERS = 5;
private int workerNum = DEFAULT_WORKER_NUMBERS;
private static final int MAX_WORKER_NUMBERS = 10;
private static final int MIN_WORKER_NUMBERS = 1;
初始工人为5个,对应的就是线程池中的核心线程数为5.
wokerNum用来即时记录工人数,也就是线程池中存活的线程数。
并且限制了最大工人数为10,最少工人数为1。对应的是最大线程数为10,最少线程数为1。
接下来就可以执行初始化了。
public ThreadExcutor(){
InitialWorkers(DEFAULT_WORKER_NUMBERS);
}
//将worker放入wokers集合中,然后开始让worker线程执行。
private void InitialWorkers(int num){
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker,"ThreadPool-Worker-"+ i);
thread.start();
}
}
//判断num是否在最大和最少工人范围之间,在的话将工人数量赋值num,不在的话num大于max就复制max,小于max就赋值min
public ThreadExcutor(int num){
this.workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : Math.max(num, MIN_WORKER_NUMBERS);
InitialWorkers(workerNum);
}
如果我们调用构造方法时,没有传入参数,他就直接默认生成5个工人,并将它依次加入工人集合中,并开启这个工人线程。
如果我们调用传入参数的方法,就需要判断传入的参数是否在max和min之间,如果在就赋值给workerNum进行记录,然后初始化相应的工人线程。如果不在就考虑大于max就初始化max的数量,如果小于min就初始化min数量。
线程池的五大基础方法实现
以下四种操作都会涉及到
执行方法
言简意赅就是将job放入jobs队列中,然后唤醒jobs队列。
//如果工作不为空,就将工作放入工作队列中,然后唤醒工作队列
public void execute(Job job){
if (job != null){
synchronized (jobs){
jobs.addLast(job);
jobs.notify();
}
}
}
添加工人(添加线程)
先判断是否超出最大工人数,超出了的话,强制初始化剩下的工人数。最后别忘了workerNum的即时记录。
//这里初始化时对jobs有操作,所以也对jobs进行加锁。添加工人时需要判断加上原来的工人是否超过了最大的工人数,
//如果超过了就将num赋值成不超过最大工人的最大数量,然后去用num数初始化,并且将工人数更新成最新。
public void addWorks(int num){
synchronized (jobs){
if (num + workerNum > MAX_WORKER_NUMBERS){
num = MAX_WORKER_NUMBERS - workerNum;
}
InitialWorkers(num);
workerNum += num;
}
}
移除工人(删除工作线程)
这个就是不断地从工人集合中取出工人,然后将他们shutdown掉。别忘了workerNum的即时记录。
//先检查删除数是否在工人数之内,不在就抛出超出范围的异常。然后在工人集合中不断地取工人,然后进行shutdown,最后workerNum更新成剩下的工人数。
public void removeWorkers(int num){
synchronized (jobs){
if(num >= workerNum){
throw new IllegalArgumentException("超出范围");
}
int count = 0;
while (count < num){
Worker worker = workers.get(count);
if (workers.remove(worker)){
worker.shutdown();
count++;
}
}
workerNum -= count;
}
}
获取工作数
//获取工作数
public int getJobSize(){
return jobs.size();
}
停止工人线程
对工人集合进行遍历,逐一调用shutdown方法。
//遍历工人集合,一个一个关闭
public void shutdown(){
for (Worker worker : workers) {
worker.shutdown();
}
}
总结
这样一个建议的线程池就实现了。
总的来说增加和构造函数都比较需要初始化方法,所以这个记忆要深刻一些。
最后奉上完整代码
public class ThreadExcutor<Job extends Runnable> {
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
private final LinkedList<Job> jobs =new LinkedList<>();
private static final int DEFAULT_WORKER_NUMBERS = 5;
private int workerNum = DEFAULT_WORKER_NUMBERS;
private static final int MAX_WORKER_NUMBERS = 10;
private static final int MIN_WORKER_NUMBERS = 1;
public ThreadExcutor(){
InitialWorkers(DEFAULT_WORKER_NUMBERS);
}
//将worker放入wokers集合中,然后开始让worker线程执行。
private void InitialWorkers(int num){
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker,"ThreadPool-Worker-"+ i);
thread.start();
}
}
//判断num是否在最大和最少工人范围之间,在的话将工人数量赋值num,不在的话num大于max就复制max,小于max就赋值min
public ThreadExcutor(int num){
this.workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : Math.max(num, MIN_WORKER_NUMBERS);
InitialWorkers(workerNum);
}
//如果工作不为空,就将工作放入工作队列中,然后唤醒工作队列
public void execute(Job job){
if (job != null){
synchronized (jobs){
jobs.addLast(job);
jobs.notify();
}
}
}
//遍历工人集合,一个一个关闭
public void shutdown(){
for (Worker worker : workers) {
worker.shutdown();
}
}
//这里初始化时对jobs有操作,所以也对jobs进行加锁。添加工人时需要判断加上原来的工人是否超过了最大的工人数,
//如果超过了就将num赋值成不超过最大工人的最大数量,然后去用num数初始化,并且将工人数更新成最新。
public void addWorks(int num){
synchronized (jobs){
if (num + workerNum > MAX_WORKER_NUMBERS){
num = MAX_WORKER_NUMBERS - workerNum;
}
InitialWorkers(num);
workerNum += num;
}
}
//先检查删除数是否在工人数之内,不在就抛出超出范围的异常。然后在工人集合中不断地取工人,然后进行shutdown,最后workerNum更新成剩下的工人数。
public void removeWorkers(int num){
synchronized (jobs){
if(num >= workerNum){
throw new IllegalArgumentException("超出范围");
}
int count = 0;
while (count < num){
Worker worker = workers.get(count);
if (workers.remove(worker)){
worker.shutdown();
count++;
}
}
workerNum -= count;
}
}
//获取工作数
public int getJobSize(){
return jobs.size();
}
//worker主要是从jobs中拿工作,拿不到等,拿到了执行。
class Worker implements Runnable{
private volatile boolean running = true;
@Override
public void run() {
while (running){
Job job = null;
synchronized (jobs){
while (jobs.isEmpty()){
try{
jobs.wait();
} catch (Exception e){
Thread.currentThread().interrupt();
return;
}
}
job = jobs.removeFirst();
}
if (job != null){
job.run();
}
}
}
public void shutdown(){
running = false;
}
}
}
标签:jobs,实现,workerNum,worker,简易,int,num,线程
From: https://blog.csdn.net/weixin_62773644/article/details/136638095