目录
什么是 Future?
Future
是 Java 中一个用于表示异步计算结果的接口。它主要用于处理那些可能需要较长时间运行的任务,使得主线程可以继续执行其他工作,而无需等待任务完成。
Future 的定义
Future
接口位于 java.util.concurrent
包中,其定义如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
其中,V
表示 Future
持有的结果的类型。这个接口提供了五个方法:
cancel(boolean mayInterruptIfRunning)
: 尝试取消任务的执行。isCancelled()
: 判断任务是否被取消。isDone()
: 判断任务是否已经完成。get()
: 获取任务的结果,如果任务尚未完成则阻塞。get(long timeout, TimeUnit unit)
: 在指定时间内获取任务的结果,如果超时则抛出TimeoutException
。
Future 的工作机制
当我们提交一个任务到线程池时,线程池会返回一个 Future
对象,通过这个对象可以控制任务的执行状态,并在任务完成后获取结果。
举个例子,假设我们有一个计算任务,需要返回一个整数结果。我们可以使用 ExecutorService
来提交任务,并获取一个 Future
对象:
ExecutorService executor = Executors.newFixedThreadPool(1);
Callable<Integer> task = () -> {
// 模拟计算任务
Thread.sleep(1000);
return 123;
};
Future<Integer> future = executor.submit(task);
现在我们可以通过 Future
对象来检查任务状态或获取结果:
if (future.isDone()) {
try {
Integer result = future.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
} else {
System.out.println("Task is still running...");
}
Future 的基本用法
为了更好地理解 Future
,我们通过一些具体示例来演示其基本用法。
示例一:使用 Future 获取异步任务结果
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
Callable<String> task = () -> {
// 模拟长时间任务
Thread.sleep(2000);
return "Task completed!";
};
Future<String> future = executor.submit(task);
// 执行其他任务
System.out.println("Doing other work...");
try {
// 获取异步任务结果
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
在这个例子中,我们提交了一个模拟长时间运行的任务,并在任务执行期间输出一些其他信息。通过 Future
对象的 get
方法,我们可以在任务完成后获取结果。
示例二:使用 Future 取消任务
import java.util.concurrent.*;
public class FutureCancelExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
Callable<String> task = () -> {
try {
// 模拟长时间任务
Thread.sleep(5000);
} catch (InterruptedException e) {
return "Task was interrupted";
}
return "Task completed";
};
Future<String> future = executor.submit(task);
try {
// 模拟一些操作
Thread.sleep(2000);
// 取消任务
future.cancel(true);
if (future.isCancelled()) {
System.out.println("Task was cancelled");
} else {
System.out.println("Task completed with result: " + future.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
在这个例子中,我们提交了一个长时间运行的任务,但在两秒钟后取消了该任务。通过 Future
的 cancel
方法可以中断任务的执行,并使用 isCancelled
方法检查任务是否已被取消。
Future 的局限性及改进
尽管 Future
提供了异步计算的基本功能,但它存在一些局限性:
- 获取结果阻塞:调用
get
方法时,如果任务尚未完成,会导致调用线程阻塞,影响程序的响应性。 - 缺乏回调机制:
Future
不支持在任务完成时自动执行某些操作,必须通过轮询或阻塞获取结果。 - 取消任务的复杂性:取消任务可能会导致中间状态的不一致,需谨慎处理。
解决方案:CompletableFuture
为了解决上述问题,Java 8 引入了 CompletableFuture
,它实现了 Future
接口,并提供了更丰富的功能,如回调机制、组合异步任务等。
以下是 CompletableFuture
的一些示例:
示例三:使用 CompletableFuture 实现回调
import java.util.concurrent.*;
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
try {
// 模拟长时间任务
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task completed!";
}).thenAccept(result -> {
// 任务完成后的回调
System.out.println(result);
});
// 执行其他任务
System.out.println("Doing other work...");
// 保持主线程存活
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在这个例子中,我们使用 CompletableFuture.supplyAsync
方法异步执行任务,并通过 thenAccept
方法注册一个回调,在任务完成后输出结果。
示例四:组合多个 CompletableFuture
import java.util.concurrent.*;
public class CompletableFutureCombineExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from future2";
});
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " and " + result2);
combinedFuture.thenAccept(System.out::println);
// 保持主线程存活
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在这个例子中,我们创建了两个 CompletableFuture
,并通过 thenCombine
方法组合它们的结果,最终输出组合后的结果。
Future 的底层实现
理解 Future
的底层实现有助于我们更好地掌握其工作原理。Java 提供了多个实现 Future
接口的类,其中最常用的是 FutureTask
。
FutureTask 的实现原理
FutureTask
是 Runnable
和 Future
的结合体,它可以被线程池执行,并且提供了获取任务结果的方法。以下是 FutureTask
的部分代码实现:
public class FutureTask<V> implements RunnableFuture<V> {
// 任务的状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 任务的执行结果
private Callable<V> callable;
private Object outcome;
private volatile Thread runner;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
// 运行任务
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
// 中断检查
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// 完成任务,释放资源
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
}
FutureTask 的状态管理
FutureTask
使用一个 volatile
变量 state
来管理任务的状态。状态包括 NEW
、COMPLETING
、NORMAL
、EXCEPTIONAL
、CANCELLED
、INTERRUPTING
和 INTERRUPTED
。状态转换通过 compareAndSwapInt
方法实现,确保线程安全。
任务的执行
当调用 run
方法时,FutureTask
会检查当前状态是否为 NEW
,如果是,则执行任务并设置结果。如果任务抛出异常,则调用 setException
方法设置异常结果。
获取任务结果
FutureTask
的 get
方法用于获取任务结果,如果任务尚未完成,会将当前线程添加到等待队列,并阻塞直到任务完成。以下是 get
方法的实现简化版:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
} else if (s == COMPLETING) // 让出CPU
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
} else
LockSupport.park(this);
}
}
在这个方法中,FutureTask
使用 LockSupport.park
来阻塞当前线程,直到任务完成或被中断。
任务取消
FutureTask
的 cancel
方法尝试将任务状态设置为 CANCELLED
或 INTERRUPTING
,并中断任务执行线程。以下是 cancel
方法的实现简化版:
public boolean cancel(boolean mayInterruptIfRunning) {
if (state != NEW)
return false;
if (mayInterruptIfRunning) {
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
if (t != null)
t.interrupt();
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
} else {
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
}
finishCompletion();
return true;
}
在 cancel
方法中,如果 mayInterruptIfRunning
为 true
,则尝试中断任务执行线程,并将状态设置为 INTERRUPTED
。否则,直接将状态设置为 CANCELLED
。
小结
在本文中,我们深入探讨了 Java 中的 Future
接口及其实现原理。Future
提供了异步计算的基本功能,但也存在一些局限性。为了更好地支持异步编程,Java 8 引入了 CompletableFuture
,提供了更强大的功能和更灵活的操作方式。
通过理解 Future
和 FutureTask
的底层实现,我们可以更好地掌握并发编程的原理,编写出更高效、更健壮的并发程序。希望这篇文章能够帮助读者全面理解 Future
及其实现原理,提升异步编程的能力。