下面介绍取消线程常用的4中方式:
一、通过设置“cancelled requested”标志来中断线程
java中的任务取消实现:
是通过一个协作机制完成的,使用一个线程能够要求另一个线程停止当前的工作。
这样做的原因是:
如果直接要求一个任务、线程或者服务立即停止,可能会导致共享的数据结构处于不一致的状态。
当要求他们停止时,他们首先会清除当前进程中的工作,然后再终止。提供了更好的灵活性。因为
任务代码本身比发出取消请求的代码更明确应该清除什么。
典型的协作机制:
设置“cancelled requested”标志,任务会定期查看;
如果发现标志被设置过,任务就会提前结束。
一般为了让这个标志变量的状态更可靠一点,cancelled最好是volatile类型的。
取消策略:
说明关于取消的“how”,"when","what"————代码如何请求取消任务,任务在什么
时候检查取消的请求是否到达,响应取消请求的任务中应有的行为。
public class CancelThread {
/**
* 生成素数任务
* @author hadoop
*
*/
class PrimeGenerator implements Runnable{
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
@Override
public void run() {
BigInteger p = BigInteger.ONE;
while(!cancelled){
//得到下一个素数
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel(){
cancelled = true;
}
public synchronized List<BigInteger> get(){
return new ArrayList<BigInteger>(primes);
}
}
/**
* 生成素数的程序运行1s钟后,取消任务
* @return
* @throws InterruptedException
*/
List<BigInteger> aSecondOfPrimes() throws InterruptedException{
PrimeGenerator generator = new PrimeGenerator();
new Thread(generator).start();
try {
TimeUnit.SECONDS.sleep(1);
}finally{
generator.cancel();
}
return generator.get();
}
public static void main(String[] args) throws InterruptedException {
CancelThread cancelThread = new CancelThread();
List<BigInteger> bList = cancelThread.aSecondOfPrimes();
for (BigInteger bigInteger : bList) {
System.out.println(bigInteger);
}
}
}
二、通过线程的interrupt()方法来中断线程
中断通常是实现取消最明智的选择
中断线程
中断:本身并不会真正中断一个正在运行的线程,仅仅发出中断请求,线程自己会在下一个方便的时刻中断
(这些时刻被称为取消点,cancellation point)
有些方法如wait,sleep和join方法,当接收一个中断请求时,会抛出一个异常,或者进入时中断状态就已经被设置了。
可阻塞的库函数,通过抛出InterruptedException作为中断的响应
常用的取消机制(设置“cancelled requested”标志)不能与可阻塞的库函数进行良好的互动。阻塞会导致中断无法立刻响应,或干脆无法响应。
如果任务代码响应中断,那么可以使用中断作为你的取消机制,而不是boolean标志来请求取消。
示例中进行了两个点的中断检查:
1.调用阻塞的put方法时
2.循环开始出显示地采集中断状态。
中断策略:
决定线程如何应对中断请求————当发现中断请求时,它会做什么,哪些工作单元对应中断来说是原子操作,
以及在多快的时间里响应中断。
线程应该只能被线程的所有者中断;所有者可以把线程的中断策略信息封装到一个合适的取消机制中,比如关闭(shutdown)方法
public class InterruptThread {
/**
* 素数生产者,向阻塞队列中放入内容
* @author hadoop
*
*/
class BrokenPrimePreducer extends Thread{
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
BrokenPrimePreducer(BlockingQueue<BigInteger> queue){
this.queue = queue;
}
public void run(){
try {
BigInteger p = BigInteger.ONE;
while(!Thread.currentThread().isInterrupted()){//第一出检查中断
//第二次检查中断,put操作中 notFull.await();在等待操作中会检查线程是否中断
queue.put(p = p.nextProbablePrime());
}
} catch (InterruptedException e) {
System.out.println("进行中断的逻辑处理");
}
}
public void cancel(){
this.interrupt();
}
}
void consumePrimes() throws InterruptedException{
//阻塞队列中只能存放5个对象
BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(5);
BrokenPrimePreducer producer = new BrokenPrimePreducer(primes);
long start = System.currentTimeMillis();
//开始执行线程,向队列中存放内容
producer.start();
long useTime = 0;
try {
while(needMorePrimes(useTime)){//是否需要更多的素数对象
Thread.sleep(40);
//这里取消消费者,让堵塞队列始终处于调用put的堵塞状态,这样就始终不会再对cancelled取消标志进行判断了,所以通过取消标志无法取消任务
//primes.take();
useTime = System.currentTimeMillis() - start;
}
}finally{
producer.cancel();
for (BigInteger bigInteger : primes) {
System.out.println(bigInteger);
}
}
}
/**
* 判断是否需要更多的素数
* 方法开始5s后,不再需要素数
* @return
*/
boolean needMorePrimes(long useTime){
return useTime < 5000 ?true :false;
}
public static void main(String[] args) throws InterruptedException {
InterruptThread it = new InterruptThread();
it.consumePrimes();
}
}
三、通过future取消任务
单个任务的取消还能通过future来实现
future.cancel(boolean mayInterruptIfRunning)
参数mayInterruptIfRunning:
@param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
如果为ture,就让执行中的任务立刻中断;如果为false,执行中任务会被执行完毕。
package com.thread;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 通过Future实现取消线程
* @author hadoop
*
*/
public class FutureCanceled {
//固定大小的线程池,同时只能接受5个任务
static ExecutorService mExecutor = Executors.newFixedThreadPool(5);
public static void timedRun(Runnable r,long timeout,TimeUnit unit) throws Throwable {
Future<?> task = mExecutor.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
System.out.println("处理超时异常");
} catch (ExecutionException e) {
System.out.println("处理执行异常");
throw new Throwable(e.getCause());
} finally{
task.cancel(true);
}
}
}
四、通过停止基于线程的服务取消任务
应用程序通常会创建拥有线程的服务,比如线程池。线程的拥有者就是创建线程的类。
所以线程池拥有它的工作者线程。如果需要中断这些线程,那么应该由线程池来负责。
应用程序拥有服务,服务拥有工作者线程,而应用程序并不拥有工作者线程。因此应用程序不应该视图直接停止工作者线程。而是通过服务来管理线程。
ExecutorService提供了shutdown和shutdownNow方法来关闭线程。
package com.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* ShutDown关闭线程池测试
* @author hadoop
*
*/
public class ShutDownThreadPool {
//固定大小的线程池,同时只能接受5个任务
static ExecutorService mExecutor = Executors.newFixedThreadPool(5);
/*
* 采用线程池开启多个子线程,主线程等待所有的子线程执行完毕
*/
public static void moreThread() {
try {
int threadNum = 0;
/**
* 尽管线程池只能同时开启5个线程处理任务,但是10个任务已经放入处理队列中,尽管调用了shutdown方法,已放入的10个任务仍然会执行完成。
* 但是调用了shutdown方法后,就不能再向线程池中添加新任务来了,否则会抛RejectedExecutionException异常
*/
for (int i = 0; i < 10; i++) {
threadNum++;
final int currentThreadNum = threadNum;
mExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("子线程[" + currentThreadNum + "]开启");
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("子线程[" + currentThreadNum + "]结束");
}
}
});
}
System.out.println("已经开启所有的子线程");
mExecutor.shutdown();
System.out.println("shutdown():启动一次顺序关闭,执行以前提交的任务,但不接受新任务。");
int threadNum2 = 0;
for (int i = 20; i < 30; i++) {
threadNum2++;
final int currentThreadNum = threadNum2;
mExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("尝试再次添加任务:子线程[" + currentThreadNum + "]开启");
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("子线程[" + currentThreadNum + "]结束");
}
}
});
}
/**
* 只有执行了shutdown方法,执行isTerminated才有效。否则isTerminated一直为ture
*/
while(true){
if(mExecutor.isTerminated()){
System.out.println("所有的子线程都结束了!");
break;
}
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("主线程结束");
}
}
public static void main(String[] args) {
moreThread();
}
}