FutureTask详解
# FutureTask详解
# 1、FutureTask简介
FutureTask
主要用于异步任务的执行和结果获取。其最重要的特性就是可以被提交到线程池中执行,同时也可以用来获取执行结果或检查任务的状态。
# 2、FutureTask内部结构
# 继承结构
public class FutureTask<V> implements RunnableFuture<V> {}
复制成功!

FutureTask
实现了 RunnableFuture
接口,而 RunnableFuture
又继承 Runnable
和 Future
。
所以FutureTask
可以被提交到线程池中执行,同时也可以用来获取执行结果或检查任务的状态。
# 类属性
// FutureTask 的状态字段,用于标记任务的不同状态 private volatile int state; // volatile 确保线程间对该字段的可见性 private static final int NEW = 0; // 任务刚创建,尚未开始执行 private static final int COMPLETING = 1; // 任务正在完成(即运行中或正在设置结果) private static final int NORMAL = 2; // 任务正常完成 private static final int EXCEPTIONAL = 3; // 任务完成时抛出了异常 private static final int CANCELLED = 4; // 任务已被取消 private static final int INTERRUPTING = 5; // 任务中断中 private static final int INTERRUPTED = 6; // 任务已中断 /** 包含要执行的 callable 对象;在运行后会被置为 null */ private Callable<V> callable; /** 任务的结果或者在执行时抛出的异常,使用 Object 类型来存储结果或异常 */ private Object outcome; // non-volatile, 由 state 字段的读写保护 /** 正在运行任务的线程;在 run() 方法中使用 CAS 操作 */ private volatile Thread runner; /** 用于存储等待任务完成的线程的 Treiber 栈 */ // 可以理解成是一个 无锁且线程安全的栈 private volatile WaitNode waiters;
复制成功!
# 构造方法
- ①、
FutureTask(Callable<V> callable)
public FutureTask(Callable<V> callable) { if (callable == null) // 检查 callable 是否为空 throw new NullPointerException(); // 如果为空,抛出空指针异常 this.callable = callable; // 保存 callable 对象 this.state = NEW; // 初始化状态为 NEW,表示任务刚创建 }
复制成功!
- ②、
FutureTask(Runnable runnable, V result)
public FutureTask(Runnable runnable, V result) { // 使用 Executors.callable() 方法将 Runnable 转换为 Callable,并指定结果 this.callable = Executors.callable(runnable, result); this.state = NEW; // 初始化状态为 NEW,表示任务刚创建 }
复制成功!
Executors.callable方法
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } // 利用RunnableAdapter 适配器类 把Runnable 转换成 Callable static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
复制成功!
# 内部类WaitNode
WaitNode 是 FutureTask 类中的一个静态内部类,用于支持等待任务完成的线程管理。
WaitNode 主要作为一个节点,在 waiters 栈中构建链表,以便能够高效地管理等待线程。
static final class WaitNode { // 用于存储等待的线程 volatile Thread thread; // 用于指向下一个等待节点,形成链表结构 volatile WaitNode next; // 构造函数,初始化当前节点的线程为当前线程 WaitNode() { thread = Thread.currentThread(); } }
复制成功!
# 3、Runnable
、Callable
、Future
、RunnableFuture
接口
# ①、Runnable
接口
用于实现线程要执行的任务。可以将 Runnable
对象传递给 Thread
构造函数或线程池的 execute()
方法。
@FunctionalInterface public interface Runnable { void run(); }
复制成功!
Runnable
是一个函数式接口,定义了一个 run()
方法。
run()
方法没有返回值,也没有抛出检查型异常(checked exceptions)。
主要用于线程执行的任务,无需处理任务结果。
# ②、Callable
接口
用于执行有返回值的任务。通常与 Future
配合使用,可以在任务完成后获取结果。
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
复制成功!
Callable
是一个泛型接口,定义了一个 call()
方法。
call()
方法可以返回一个结果,并且可以抛出检查型异常。
泛型参数 <V>
表示任务执行后的结果类型。
# ③、Future
接口
用于管理异步计算任务,检查任务状态,获取计算结果或处理异常。
public interface Future<V> { // 尝试取消任务 boolean cancel(boolean mayInterruptIfRunning); // 检查任务是否被取消 boolean isCancelled(); // 检查任务是否完成 boolean isDone(); // 获取任务的结果,若任务尚未完成则阻塞调用get方法的线程 V get() throws InterruptedException, ExecutionException; // 带超时的获取结果方法 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
复制成功!
# ④、RunnableFuture
接口
RunnableFuture
主要用于 FutureTask
类的实现。它允许任务以 Runnable
方式提交到线程池中执行,并通过 Future 接口管理结果和状态。
public interface RunnableFuture<V> extends Runnable, Future<V> { }
复制成功!
RunnableFuture
接口继承了 Runnable
和 Future
接口。
结合了这两个接口的功能,即可以像 Runnable
一样执行任务,也可以像 Future
一样获取任务的结果或取消任务。
# 总结对比
接口 | 功能描述 | 主要方法/特点 | 适用场景 |
---|---|---|---|
Runnable | 定义要执行的任务,无返回值 | run() | 用于执行不需要结果的任务,如线程的任务 |
Callable | 定义要执行的任务,有返回值,并可能抛出异常 | call() | 用于执行有返回值的任务 |
Future | 管理异步计算的结果,检查状态和处理异常 | get() , cancel() , isDone() , isCancelled() | 用于管理异步任务的结果和状态 |
RunnableFuture | 结合 Runnable 和 Future 的功能,可以执行任务并管理结果 | 继承 Runnable 和 Future | 用于需要同时支持 Runnable 和 Future 功能的场景,如 FutureTask |
# 4、FutureTask的使用示例
# 普通Thread使用
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class TestA { public static void main(String[] args) { // 创建一个 Callable 对象 Callable<Integer> callable = () -> { Thread.sleep(2000); // 模拟长时间运行的任务 return 123; }; // 用 Callable 对象创建 FutureTask FutureTask<Integer> futureTask = new FutureTask<>(callable); // 用普通 Thread 执行 FutureTask Thread thread = new Thread(futureTask); thread.start(); // 执行其他任务 System.out.println("主线程继续执行其他任务..."); try { // 获取 FutureTask 的结果 Integer result = futureTask.get(); // 这个方法会阻塞直到结果可用 System.out.println("获取 FutureTask 的结果: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
复制成功!
执行结果:
主线程继续执行其他任务... 获取 FutureTask 的结果: 123
复制成功!
# 线程池使用
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class TestA { private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 4, // 核心线程数 8, // 最大线程数 0, // 非核心线程的存活时间 TimeUnit.SECONDS, // 存活时间单位 new LinkedBlockingDeque<>(), // 存放任务的 阻塞队列 new MyThreadFactory("DogEatBones"), // 创建线程的 工厂 new ThreadPoolExecutor.AbortPolicy() // 线程池的拒绝策略 ); public static void main(String[] args) throws Exception { String dog1 = "秀逗"; // 使用线程池执行 Future<String> future1 = threadPoolExecutor.submit(() -> eat(dog1)); String result = future1.get(); System.out.println("FutureTask执行结果:" + result); // 关闭线程池 threadPoolExecutor.shutdown(); } public static String eat(String name) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return name + "说骨头真好吃!"; } } class MyThreadFactory implements ThreadFactory { private final String name; private final AtomicInteger threadNum = new AtomicInteger(); // 设置线程名称 public MyThreadFactory(String name) { this.name = name; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); // 设置线程名称和编号 thread.setName(name + " [#" + threadNum.incrementAndGet() + "]"); return thread; } }
复制成功!
执行结果:
FutureTask执行结果:秀逗说骨头真好吃!
复制成功!
# 5、FutureTask的核心方法
下面源码基于JDK8
# run()
方法
public void run() { // 检查任务状态是否为 NEW,且当前线程是否能成功设置为 runner if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 如果 callable 不为 null 且状态仍为 NEW,则执行 callable if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); // 调用 callable,并存储结果 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // 如果发生异常,设置异常 } if (ran) set(result); // 如果 callable 成功执行,设置结果 } } finally { // 确保 runner 在 run() 完成后设为 null,以防止多次调用 runner = null; // 在将 runner 设为 null 后重新读取状态,以处理可能的中断 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
复制成功!
protected void set(V v) { // 使用 CAS 原子性地将状态从 NEW 更新为 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // 存储结果 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置为最终状态 finishCompletion(); // 完成任务 } }
复制成功!
private void finishCompletion() { // 遍历所有等待线程 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); // 唤醒等待的线程 } WaitNode next = q.next; if (next == null) break; q.next = null; // 解除链表节点连接,帮助垃圾回收 q = next; } break; } } done(); // 任务完成后执行的操作 callable = null; // 释放 callable 对象以减少内存占用 }
复制成功!
总结:
run() 方法: 该方法用于执行任务。
如果任务的状态是 NEW,runner 为空,则将当前线程设置为 runner 并执行 callable。
如果 callable.call() 执行成功,则将结果设置到 FutureTask 中;
如果执行失败,则捕获异常并设置异常。
无论任务成功还是失败,最后都要将 runner 设为 null,并根据状态处理可能的中断。
set(V v) 方法: 该方法用于设置任务的结果。
如果任务的状态是 NEW,则将状态更新为 COMPLETING 并存储结果。然后将状态设置为 NORMAL 表示任务完成,并调用 finishCompletion() 方法完成任务的后续处理。
finishCompletion() 方法: 该方法用于处理任务完成后的操作。首先唤醒所有等待的线程,然后执行 done() 方法以进行额外的完成操作。最后将 callable 设为 null 以减少内存占用。
# get()
方法
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果状态小于等于 COMPLETING,等待任务完成 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); // 根据状态报告结果 }
复制成功!
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 如果当前线程被中断,移除等待者并抛出 InterruptedException if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果任务已经完成,返回状态 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 如果任务正在完成中,当前线程主动让出 CPU else if (s == COMPLETING) Thread.yield(); else if (q == null) q = new WaitNode(); // 创建新的等待节点 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 将当前等待节点加入Treiber栈 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); // 超时,移除等待节点并返回状态 return state; } LockSupport.parkNanos(this, nanos); // 线程等待指定纳秒时间 } else LockSupport.park(this); // 无超时,线程等待 } }
复制成功!
private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; // 清除等待节点的线程引用 retry: for (;;) { // 当存在竞争条件时,重新尝试移除等待节点 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; // 获取当前节点的下一个节点 if (q.thread != null) pred = q; // 记录当前节点作为前驱节点 else if (pred != null) { pred.next = s; // 如果前驱节点存在且当前节点没有线程,则将前驱节点的 next 指向当前节点的下一个节点 if (pred.thread == null) // 检查前驱节点是否也没有线程,可能存在竞争条件 continue retry; // 如果存在竞争条件,重新尝试 } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) // 如果没有前驱节点,则通过 CAS 将等待队列的头节点更新为下一个节点 continue retry; // 如果 CAS 失败,重新尝试 } break; // 成功移除等待节点后跳出循环 } } }
复制成功!
static final class WaitNode { volatile Thread thread; // 线程引用 volatile WaitNode next; // 下一个等待节点 WaitNode() { thread = Thread.currentThread(); } // 构造函数中将当前线程设置为该节点的线程 }
复制成功!
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; // 如果状态为 NORMAL,返回结果 if (s >= CANCELLED) throw new CancellationException(); // 如果状态为 CANCELLED,抛出取消异常 throw new ExecutionException((Throwable)x); // 否则,抛出执行异常 }
复制成功!
总结:
get() 方法: 用于获取任务的结果。如果任务的状态小于等于 COMPLETING,则调用 awaitDone() 方法等待任务完成。最后,调用 report()
方法根据任务的最终状态报告结果或抛出异常。
awaitDone() 方法: 用于等待任务完成。通过检查任务状态并在必要时将当前线程挂起来等待。如果设置了超时,会在超时之前进行等待;如果没有超时,则无限等待。处理竞争条件时,确保等待节点正确地从等待队列中移除。
removeWaiter() 方法: 用于从等待队列中移除指定的等待节点。通过遍历等待队列,调整节点的连接,并处理竞争条件,确保等待节点被正确移除。
WaitNode 类: 用于表示一个等待的节点。包含对线程的引用和指向下一个等待节点的引用。
report() 方法: 根据任务的最终状态,返回结果或抛出异常。如果任务状态为 NORMAL,返回结果;如果状态为 CANCELLED,抛出 CancellationException;否则,抛出 ExecutionException。
# cancel()
方法
public boolean cancel(boolean mayInterruptIfRunning) { // 如果当前状态是 NEW,且使用 CAS 原子性地将状态设置为 INTERRUPTING 或 CANCELLED,成功则返回 true if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // 如果中断当前线程的调用抛出异常,将在 finally 中处理 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); // 如果任务正在运行,尝试中断线程 } finally { // 设置最终状态 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); // 完成任务的清理工作 } return true; // 成功取消任务 }
复制成功!
总结:
使用 UNSAFE.compareAndSwapInt 原子性地将任务状态从 NEW 更新为 INTERRUPTING 或 CANCELLED。如果状态更新成功,返回 true;否则,返回 false,表示取消失败(例如任务已经开始执行或已完成)。
如果 mayInterruptIfRunning 为 true,尝试中断正在运行的线程。如果 runner 不为空,则中断该线程。
在 finally 块中,将任务状态设置为 INTERRUPTED,表示任务被中断。
调用 finishCompletion() 方法,完成任务的清理工作,移除等待线程,释放资源等。
如果成功取消任务,返回 true;否则,返回 false。
# 6、补充知识点Treiber
栈
Treiber 栈是一种并发栈的实现方式,旨在解决多线程环境下的栈操作问题。它由计算机科学家 Robert Treiber 提出,主要用于实现无锁数据结构。
# Treiber 栈的核心思想
无锁数据结构: Treiber 栈使用原子操作而不是传统的锁机制来保证线程安全,允许多个线程并发访问而不会阻塞。
基于链表: Treiber 栈通常使用链表实现,每个节点包含数据和指向下一个节点的引用。
原子操作: 使用 compare-and-swap (CAS) 操作来更新栈的头部指针,确保操作的原子性。
# Treiber 栈的特点
并发友好: Treiber 栈允许多个线程同时执行 push
和 pop
操作,而不需要传统的锁机制,因此提高了并发性能。
简洁性: 其实现相对简单,主要依赖于 CAS 操作来保证线程安全。
非阻塞: 由于使用了原子操作,Treiber 栈是非阻塞的,避免了锁带来的上下文切换和线程阻塞。
# Treiber
栈的Java简易实现
import java.util.concurrent.atomic.AtomicReference; public class TestA<T> { public static void main(String[] args) { SimpleTreiberStack<String> stack = new SimpleTreiberStack<>(); // 测试 push 和 pop Thread t1 = new Thread(() -> { stack.push("秀逗"); }, "t1"); Thread t2 = new Thread(() -> { stack.push("四眼"); }, "t2"); Thread t3 = new Thread(() -> { stack.push("大黄"); }, "t3"); t1.start(); t2.start(); t3.start(); try { t1.join(); t2.join(); t3.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(stack.pop()); System.out.println(stack.pop()); System.out.println(stack.pop()); } } class SimpleTreiberStack<T> { private final AtomicReference<Node<T>> top = new AtomicReference<>(null); private static class Node<T> { T value; Node<T> next; Node(T value) { this.value = value; this.next = null; } } public void push(T value) { Node<T> newNode = new Node<>(value); Node<T> currentTop; do { currentTop = top.get(); newNode.next = currentTop; // 将新节点的 next 指向当前栈顶 } while (!top.compareAndSet(currentTop, newNode)); } public T pop() { Node<T> oldTop, newTop; do { oldTop = top.get(); if (oldTop == null) return null; // 栈为空 newTop = oldTop.next; } while (!top.compareAndSet(oldTop, newTop)); return oldTop.value; } }
复制成功!
# Treiber
栈在FutureTask
中的应用
FutureTask在其内部定义了内部类WaitNode
static final class WaitNode { // 用于存储等待的线程 volatile Thread thread; // 用于指向下一个等待节点,形成链表结构 volatile WaitNode next; // 构造函数,初始化当前节点的线程为当前线程 WaitNode() { thread = Thread.currentThread(); } }
复制成功!
并定义了全局变量 private volatile WaitNode waiters;
waiters指向一个 Treiber
栈,该栈保存着所有等待任务执行结果的线程。
当调用FutureTask的get方法时,如果任务没有完成,则调用线程会被阻塞,本质上就是将要阻塞的线程包装成WaitNode结点保存到waiters指向的 Treiber
栈中。
在上面第5节 get()
方法的源码分析中:
下面这段代码就相当于Treiber栈的push
操作
else if (q == null) q = new WaitNode(); // 创建新的等待节点 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 将当前等待节点加入Treiber栈
复制成功!
下面这段代码就相当于Treiber栈的pop
操作
// 该方法在上面 在上面第5节 `get()`方法的源码分析中已经详细说明 这里不再赘述 removeWaiter(WaitNode node) {...}
复制成功!
最后推荐看下 Java线程池原理剖析和应用指南 (opens new window)中对于Future 模式的讲解。
# 7、补充FutureTask子类ScheduledFutureTask
ScheduledFutureTask
是 ScheduledThreadPoolExecutor
中的一个内部类,它继承自 FutureTask
并实现了 RunnableScheduledFuture
接口。这个类的主要作用是将任务包装成一个可以定时调度的任务 。
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>{}
复制成功!

总结:
继承 FutureTask: ScheduledFutureTask
继承自 FutureTask
,因此它具有 FutureTask
的所有功能,如任务的提交、执行、获取结果等。
实现 RunnableScheduledFuture:
RunnableScheduledFuture
是 Runnable
、ScheduledFuture
和 Delayed
的一个组合接口。它结合了这些接口的功能,使得 ScheduledFutureTask
不仅可以作为一个 Runnable
任务执行,还能支持调度和延迟操作。
ScheduledFutureTask
在 ScheduledThreadPoolExecutor
中的应用
在 ScheduledThreadPoolExecutor
中,调度相关的方法(如 schedule
和 scheduleAtFixedRate
)会将任务包装成 ScheduledFutureTask
对象。
ScheduledFutureTask的构造方法:
// 构造一个一次性任务,指定任务的执行时间(以纳秒为单位)。 ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); // 调用父类 FutureTask 的构造函数,传入 Runnable 和结果 this.time = ns; // 设置任务的触发时间(纳秒) this.period = 0; // 单次任务,所以周期为0 this.sequenceNumber = sequencer.getAndIncrement(); // 设置任务的序列号,用于排序 } /** * 创建一个周期性任务,指定任务的执行时间和周期时间(以纳秒为单位)。 */ ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); // 调用父类 FutureTask 的构造函数,传入 Runnable 和结果 this.time = ns; // 设置任务的初始触发时间(纳秒) this.period = period; // 设置任务的周期时间(纳秒),用于周期性任务 this.sequenceNumber = sequencer.getAndIncrement(); // 设置任务的序列号,用于排序 } /** * 创建一个一次性任务,指定任务的触发时间(以纳秒为单位),任务是 Callable 类型。 */ ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); // 调用父类 FutureTask 的构造函数,传入 Callable this.time = ns; // 设置任务的触发时间(纳秒) this.period = 0; // 单次任务,所以周期为0 this.sequenceNumber = sequencer.getAndIncrement(); // 设置任务的序列号,用于排序 }
复制成功!
ScheduledFutureTask关键特性
延迟和调度:
通过实现 Delayed 接口,ScheduledFutureTask 可以管理任务的延迟和调度逻辑。这使得任务可以在指定的时间后执行或周期性执行。
与 ScheduledThreadPoolExecutor 的集成:
在 ScheduledThreadPoolExecutor 中,任务被封装成 ScheduledFutureTask 对象,并根据调度策略插入到队列中。调度器会按照指定的时间或周期触发任务执行。
# 总结
FutureTask: 主要用于执行异步任务,提供任务的提交、执行、结果获取和取消功能。
ScheduledFutureTask: 在 FutureTask 的基础上扩展,支持定时和周期性任务调度,适用于需要在特定时间或周期内执行的任务。