FutureTask 深入浅出,通过源码讲解彻底吃透

时间:2021-1-18 作者:admin

本文章欢迎转载,但是转载请标明出处。程序锋子https://blog.csdn.net/l13591302862/article/details/112757053

一 前言

平常看到很多的源码中都使用到了 FutureTask 对象,例如 ThreadPoolExecutorSpring MVC 以及 Dubbo。但是对 FutureTask 的学习还只是停留在了表面,今天进行了深入学习,对源码进行了解析,希望对大家有所帮助。

FutureTask 深入浅出,通过源码讲解彻底吃透

二 简介

2.1 什么是 FutureTask

a 官方描述

A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation.

The result can only be retrieved when the computation has completed; the get methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using runAndReset).

A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.

进行下简单的翻译:

  • 可取消的异步计算。此类提供 Future 的基本实现,其中包含开始和取消计算,查询以查看计算是否完成以及检索计算结果的方法。

  • 只有在计算完成后才能检索结果,如果计算尚未完成,则 get 方法将阻塞。一旦计算完成,就不能重新开始或取消计算(除非使用 runAndReset 调用计算)。

  • FutureTask 可以用于包装 CallableRunnable 对象。由于 FutureTask 实现了 Runnable, 因此可以将 FutureTask 提交给 Executor 以便执行。

  • 除了用作独立类之外,此类还提供 protected 方法,这些功能在创建自定义任务类时可能很有用。

b 个人理解

  • 我觉得 FutureTask 是一个将任务执行和结果返回相分离的任务类,可以延迟计算,或者延迟获取,并且保证并发安全。既可以单独执行,也可以放入线程池中执行,方便进行并发异步执行,提高运行效率。

c FutureTask 类图

同时看下 FutureTask 的类图,我们可以发现,FutureTask 实现了 RunableFuture 接口,而 RunableFuture 又继承了 RunableFuture 接口,即 FutureTask 兼具 FutureRunnable 的能力。
FutureTask 深入浅出,通过源码讲解彻底吃透

d 核心方法

public class FutureTask<V> implements RunnableFuture<V> {
    
    // 获取结果,计算未完成,当前线程会被阻塞
    public V get() throws InterruptedException, ExecutionException {...}

    // 获取结果,计算未完成,当前线程会被阻塞,但是有时间限制
    // 如果超时还未完成,抛出 TimeoutException
    public V get(long timeout, TimeUnit unit){...}

    // 进行计算,运行 Callable 的 call 方法,然后对结果进行赋值
    public void run(){...}
    
    // 进行计算,运行 Callable 的 call 方法,然后将 state 重置为 NEW
    // 不对结果进行赋值,可以重复多次运行,只有子类可以调用,用来进行扩展,例如 ScheduledFutureTask
    protected void runAndReset(){...}

    // 取消任务,参数 mayInterruptIfRunning 表示取消过程中是否中断当前线程
    public boolean cancel(boolean mayInterruptIfRunning){...}

    // 查看任务是否被取消
    public boolean isCancelled(){...}

    // 查看任务是否完成
    public boolean isDone(){...}
    
}   

2.2 FutureTask 的使用案例

a 单独使用

/**
 * @author CodderFengzi
 * @date 2021/1/16
 **/
public class FutureTaskDemo {

    private Response response = new Response("无响应");

    /**
     * 获得响应
     *
     * @param delay    延迟
     * @param timeUnit 时间单位
     * @return {@link Response}
     */
    private Response getResponse(long delay, TimeUnit timeUnit) {
        FutureTask<Response> ft = new FutureTask<>(new Task());
        try {
            response = ft.get(delay, timeUnit);
        } catch (InterruptedException e) {
            response = new Response("中断");
        } catch (ExecutionException e) {
            response = new Response("错误");
        } catch (CancellationException e) {
            response = new Response("被取消");
        } catch (TimeoutException e) {
            response = new Response("请求超时");
            ft.cancel(true);
        }
        return response;
    }

    private static class Response {

        private final String content;

        private Response(String content) {
            this.content = content;
        }

        @Override
        public String toString() {
            return "Response{" + "content='" + content + '\'' +
                    '}';
        }

    }

    private static class Task implements Callable<Response> {

        @Override
        public Response call() throws Exception {
            // 模拟网络传输时间消耗
            Thread.sleep(2000L);
            return new Response("正确的响应");
        }

    }

    public static void main(String[] args) {
        FutureTaskDemo0 taskDemo = new FutureTaskDemo0();
        Response timeoutResponse = taskDemo.getResponse(1000L, TimeUnit.MILLISECONDS);
        Response normalResponse = taskDemo.getResponse(3000L, TimeUnit.MILLISECONDS);
        // 打印结果:Response{content='请求超时'}
        System.out.println(timeoutResponse);
        // 打印结果:Response{content='正确的响应'}
        System.out.println(normalResponse);
    }

}

b 在线程池中使用

以下提供一个简单的例子,创建 FutureTask 对象,并放入 ThreadPoolExecutor 线程池中执行。可以设置超时时间,如果超过超时时间任务未执行完,则抛出 TimeoutException

/**
 * @author CodderFengzi
 * @date 2021/1/16
 **/
public class FutureTaskDemo1 {

    private Response response = new Response("无响应");

    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4,
            4,
            0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CoderFengzi"));

    /**
     * 获得响应
     *
     * @param delay    延迟
     * @param timeUnit 时间单位
     * @return {@link Response}
     */
    private Response getResponse(long delay, TimeUnit timeUnit) {
        FutureTask<Response> ft = new FutureTask<>(new Task());
        executor.execute(ft);
        try {
            response = ft.get(delay, timeUnit);
        } catch (InterruptedException e) {
            response = new Response("中断");
        } catch (ExecutionException e) {
            response = new Response("错误");
        } catch (CancellationException e) {
            response = new Response("被取消");
        } catch (TimeoutException e) {
            response = new Response("请求超时");
            ft.cancel(true);
        }
        return response;
    }

    private static class Response {

        private final String content;

        private Response(String content) {
            this.content = content;
        }

        @Override
        public String toString() {
            return "Response{" + "content='" + content + '\'' +
                    '}';
        }

    }

    private static class Task implements Callable<Response> {

        @Override
        public Response call() throws Exception {
            // 模拟网络传输时间消耗
            Thread.sleep(2000L);
            return new Response("正确的响应");
        }

    }

    private static class NamedThreadFactory implements ThreadFactory {

        private final static AtomicInteger COUNT = new AtomicInteger(1);

        private final String name;

        private NamedThreadFactory(String name) {
            this.name = name;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, name + "-" + COUNT.getAndIncrement());
        }

    }

    public static void main(String[] args) {
        FutureTaskDemo taskDemo = new FutureTaskDemo();
        Response timeoutResponse = taskDemo.getResponse(1000L, TimeUnit.MILLISECONDS);
        Response normalResponse = taskDemo.getResponse(3000L, TimeUnit.MILLISECONDS);
        // 打印结果:Response{content='请求超时'}
        System.out.println(timeoutResponse);
        // 打印结果:Response{content='正确的响应'}
        System.out.println(normalResponse);
    }

}

c 用作缓存的值

同时也有一个将 FutureTask 用作缓存的例子,但是这个只是简单的使用,没有书写缓存淘汰的逻辑。

/**
 * @author CodderFengzi
 * @date 2021/1/16
 **/
public class FutureTaskDemo2 {


    private final ConcurrentHashMap<Integer, Future<String>> map = new ConcurrentHashMap<>();

    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            4,
            4,
            0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CoderFengzi"));

    /**
     * 获得结果
     *
     * @param key 键
     * @return {@link String}
     */
    private String getResult(int key) throws InterruptedException, ExecutionException {
        Future<String> f;
        if ((f = map.get(key)) == null) {
            // 创建任务
            FutureTask<String> fu = new FutureTask<>(new Task(key));
            // 如果 put 成功,返回 null,失败返回之前的插入的结果,并发条件下,仅有一个能成功
            f = map.putIfAbsent(key, fu);
            if (f == null) {
                f = fu;
                fu.run();
            }
        }
        try {
            return f.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
            map.remove(key);
            throw e;
        } catch (ExecutionException e) {
            // 移除任务
            map.remove(key);
            throw e;
        }
    }

    private static class Task implements Callable<String> {

        private final int number;

        public Task(int number) {
            this.number = number;
        }

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " 进行计算");
            return "CoderFengzi" + number;
        }

    }

    private static class NamedThreadFactory implements ThreadFactory {

        private final static AtomicInteger COUNT = new AtomicInteger(1);

        private final String name;

        private NamedThreadFactory(String name) {
            this.name = name;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, name + "-" + COUNT.getAndIncrement());
        }

    }

    public static void main(String[] args) throws InterruptedException {
        final FutureTaskDemo2 taskDemo = new FutureTaskDemo2();
        final CountDownLatch cdl = new CountDownLatch(1);
        final int count = 1000;
        for (int i = 0; i < count; i++) {
            EXECUTOR.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 模拟多个线程同一时刻并发执行
                        cdl.await();
                        String result = taskDemo.getResult(666);
                        System.out.println(Thread.currentThread().getName() + " result = " + result);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        System.out.println("执行错误");
                    }
                }
            });
        }
        Thread.sleep(5000L);
        cdl.countDown();
        // 运行结果:只进行了一次计算,其他的都直接从缓存获取
        // CoderFengzi-1 进行计算
        // CoderFengzi-3 result = CoderFengzi666
        // CoderFengzi-4 result = CoderFengzi666
        // CoderFengzi-3 result = CoderFengzi666
        // CoderFengzi-2 result = CoderFengzi666
        // CoderFengzi-1 result = CoderFengzi666
        // CoderFengzi-2 result = CoderFengzi666
    }

}

d 自定义子类进行扩展

如果我们想要对 FutureTask 进行扩展,那么可以实现 FutureTask 创建自定义的子类,以下实现了一个自定义的子类,可以进行重复多次运行

/**
 * @author CoderFengzi
 * @date 2021/1/17
 **/
public class RepeatableFutureTask<V> extends FutureTask<V> {

    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            4,
            4,
            0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CoderFengzi"));

    /**
     * 运行次数
     */
    private final int count;

    public RepeatableFutureTask(Callable<V> callable, int count) {
        super(callable);
        this.count = count;
    }
    
    @Override
    protected void done() {
        super.done();
        System.out.println("完成了赋值,并且释放了所有线程");
    }

    @Override
    public void run() {
        runAndReset(count);
    }

    /**
     * 重复运行
     *
     * @param count 运行次数
     * @return boolean
     */
    public synchronized boolean runAndReset(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        for (int i = 0; i < count; i++) {
            if (!super.runAndReset()) {
                break;
            }
        }
        return true;
    }

    private static class Task implements Callable<Void> {

        public static final String CODER_FENGZI = " CoderFengzi 666";

        @Override
        public Void call() throws Exception {
            System.out.println(Thread.currentThread().getName() + CODER_FENGZI);
            return null;
        }

    }

    private static class NamedThreadFactory implements ThreadFactory {

        private final static AtomicInteger COUNT = new AtomicInteger(1);

        private final String name;

        private NamedThreadFactory(String name) {
            this.name = name;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, name + "-" + COUNT.getAndIncrement());
        }

    }


    public static void main(String[] args) throws InterruptedException {
        final RepeatableFutureTask<Void> rfu = new RepeatableFutureTask<>(new Task(), 3);
        final int num = 4;
        for (int i = 0; i < num; i++) {
            EXECUTOR.execute(rfu);
        }
        // 运行结果:
        // CoderFengzi-1 CoderFengzi 666
        // CoderFengzi-1 CoderFengzi 666
        // CoderFengzi-1 CoderFengzi 666
        // CoderFengzi-3 CoderFengzi 666
        // CoderFengzi-3 CoderFengzi 666
        // CoderFengzi-3 CoderFengzi 666
        // CoderFengzi-2 CoderFengzi 666
        // CoderFengzi-2 CoderFengzi 666
        // CoderFengzi-2 CoderFengzi 666
        // CoderFengzi-4 CoderFengzi 666
        // CoderFengzi-4 CoderFengzi 666
        // CoderFengzi-4 CoderFengzi 666
    }
}

三 源码解析

3.1 核心变量

在讲解源码之前,我们先来了解下 FutureTask 的一些重要的变量。

a 状态讲解

stateFutureTask 用来表示状态的变量,一共有 NEW(新建)、COMPLETING(计算中)、NORMAL(成功)、EXCEPTIONAL(异常)、CANCELLED(取消)、INTERRUPTING(中断中)、INTERRUPTED(已中断)这 7 种状态,分别用 0 ~ 6 这 7 个数字表示。初始状态是 NEW,而COMPLETINGINTERRUPTING 都是中间状态,NORMALEXCEPTIONALINTERRUPTED 都是最终状态,无法回退。

    /**
     * 此任务的运行状态,最初为 NEW。运行状态仅在 set,setException 和 cancel 方法中转换为终端状态。
     * 在完成期间,状态可能会采用 COMPLETING(正在设置结果时)或 INTERRUPTING(仅在中断跑步者满足cancel(true)时)的瞬间值。
     * 从这些中间状态到最终状态的转换使用更高效的有序写入,值是递增的,因为值是唯一的,无法进一步修改。
     * 为了保内存证可见性,为 state 加 volatile 修饰符
     *
     * 这里应该是用了状态模式,可能的状态转换:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    
    // 新建状态,FutureTask 被创建时的初始状态
    private static final int NEW          = 0;
    // 执行完 Callable 的 call 方法之后的状态,但是还未进行赋值
    private static final int COMPLETING   = 1;
    // 正常返回结果的状态,即获取结果,并且赋值成功
    private static final int NORMAL       = 2;
    // 执行异常状态,即调用 Callable 的 call 方法时出现异常,会将异常赋值给结果
    private static final int EXCEPTIONAL  = 3;
    // 取消状态,NEW 状态下调用 cancel(false) 方法后的状态
    private static final int CANCELLED    = 4;
    // 正在中断的状态,NEW 状态下调用 cancel(true) 方法后,但是还未终止当前线程时的状态
    private static final int INTERRUPTING = 5;
    // 已中断的状态,INTERRUPTING 状态下调用线程的 interrupt() 方法后的状态
    private static final int INTERRUPTED  = 6;

FutureTask 其实用到了状态模式,我们用一张图来表示这几种状态的转换:

FutureTask 深入浅出,通过源码讲解彻底吃透

b 其他变量

	/**
     * The underlying callable; nulled out after running
     * 需要执行的 Callable 对象
     */
    private Callable<V> callable;

    /**
     * The result to return or exception to throw from get()
     * FurtureTask 的结果,可以通过 get() 方法获得。因为 outcome 是在 state 变量被操作后才进行赋值操作,
     * 而 state 有 volatile 修饰,因此 outcome 不加 volatile 修改也可保证可见性。
     * 这里可以参考知乎的 Forest Wang 的解答来理解上面的话,https://www.zhihu.com/question/41016480/answer/551056899
     */
    private Object outcome; // non-volatile, protected by state reads/writes

    /**
     * The thread running the callable; CASed during run()
     * 用来执行 callable 对象的线程,该线程是成功进行 CAS 操作,将 runner 引用指向自己的线程。
     * 同时为了保证内存可见性,加上了 volatile 修饰符
     */
    private volatile Thread runner;

    /**
     * Treiber stack of waiting threads
     * 用于存储等待线程的 Treiber stack,一种基于 CAS 来保证并发安全的无锁栈结构,可以 https://segmentfault.com/a/1190000012463330
     */
    private volatile WaitNode waiters;

c 核心数据结构 – Treiber stack

FutureTask 的并发安全是基于 Treiber stack 的,主要是用来存储调用 get 方法后被阻塞住的线程,后面的 get 方法将会进行解析,这里我们先简单了解下什么是 Treiber stack,并且小伙伴们看完后也可以思考下FutureTask 是如何使用 Treiber stack?下面开始讲解 Treiber stack

  • Treiber stack 是一个支持并发操作的无锁栈,基于 CAS 实现并发,即每次入栈和出栈都需要进行 CAS 操作。

  • 入栈时,获取栈顶,然后将新创建的节点的 next 指向栈顶,然后再次获取栈顶和原栈顶进行比较,如果相等则将栈顶修改为新创建的节点,否则重新进行 CAS 操作,直到成功为止。

  • 出栈时,获取栈顶,如果栈顶为 null,直接返回。否则获取栈顶的 next,然后再次获取栈顶和原栈顶进行比较,如果相等则将把栈顶修改为 next,并返回原栈顶,否则重新进行 CAS 操作,直到成功为止。

下面是 Treiber stack 的简单实现,摘自《JAVA并发编程实践(Doug Lea)》 一书。

public class ConcurrentStack<E> {
    private AtomicReference<Node<E>> top = new AtomicReference<>();

    public void push(E item) {
        Node<E> newHead = new Node<>(item);
        Node<E> oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead));
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = top.get();
            if (oldHead == null)
                return null;
            newHead = oldHead.next;
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }

    private static class Node<E> {
        public final E item;
        public Node<E> next;
        public Node(E item) {
            this.item = item;
        }
    }
}

以上的代码如果觉得不太好理解,那么可以结合下面的图去理解:
FutureTask 深入浅出,通过源码讲解彻底吃透

3.2 get 方法

java.util.concurrent.FutureTask#get

该方法被线程执行时,如果计算完成,outcome 参数不为 null,那么获取结果,出现执行异常时,该结果为异常。有两个版本,一个有时间限制,一个没有时间限制,有时间限制的超时会抛出 TimeoutException

// 获取结果,计算未完成,当前线程会被阻塞
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果是 COMPLETING(含)以下的状态
    // 即 NEW 或者 COMPLETING 状态
    if (s <= COMPLETING)
        // 进行等待,我们下面会进行讲解
        s = awaitDone(false, 0L);
    // 此时的状态可能是 NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED
    // 获取结果
    return report(s);
}

// 获取结果,计算未完成,当前线程会被阻塞,但是有时间限制
// 如果超时还未完成,抛出 TimeoutException
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    // 如果是 COMPLETING(含)以下的状态
    // 即 NEW 或者 COMPLETING 状态,那么安装时间参数进行等待
    // 如果时间到了,还是 COMPLETING 以下的状态,表示超时了
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        // 超时抛出超时异常
        throw new TimeoutException();
    // 此时的状态可能是 NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED
    // 获取结果
    return report(s);
}

java.util.concurrent.FutureTask#awaitDone

该方法用来让线程进行阻塞,以等待结果的返回,并且会将线程包装成 WaitNode 对象,同时尝试通过 CAS 操作置换 waiters

// 当前线程进行阻塞,等待 run() 方法运行完成,或者在中断或超时时中止。
// timed 表示是否有时间限制,nanos 表示等待时间
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 循环
    for (;;) {
        // Thread.interrupted 静态方法会判定当前线程是否中断,并且重置中断状态
        // 如果当前线程中断
        if (Thread.interrupted()) {
            // 进入这里表明线程被中断了
            // 移除中断或超时的节点,下面进行讲解,此方法正是使用了 Treiber stack
            removeWaiter(q);
            // 抛出中断异常
            throw new InterruptedException();
        }

        int s = state;
        // 如果 state 为 COMPLETING(3) 以上的状态,表示已经结束
        if (s > COMPLETING) {
            if (q != null)
                // 将节点的线程设 null,原因如下:
                // 如果不为空,后序的 finishCompletion 方法会对该线程调用 LockSupport.unpark 方法
                // 此处为了减少不必要的 unpark
                q.thread = null;
            // 直接返回 state
            return s;
        }
        // 如果正在完成
        else if (s == COMPLETING) // cannot time out yet
            // 那么当前线程先稍等下其他线程
            // 如果其他线程能完成的话,当前线程后面就没必要进入等待栈和被阻塞
            Thread.yield();
        else if (q == null)
            // 创建等待节点,节点的 thread 变量为当前线程
            q = new WaitNode();
        // 如果排队失败,继续自旋
        else if (!queued)
            // 此处也使用了 Treiber stack,进行 CAS 自旋操作,尝试入栈,即尝试将节点放入栈顶
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 以上如果都不是,即当前还处于 NEW 状态,那么下面要阻塞该线程
        // 如果有时间限制
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                // 移除中断或超时的节点,此处是表明执行超时
                removeWaiter(q);
                return state;
            }
            // 在 nanos 时间内,进行阻塞,后面自行恢复
            LockSupport.parkNanos(this, nanos);
        }
        else
            // 进行阻塞
            // 之后如果其他线程调用 run 方法对 outcome 赋值成功
       	 	// 那么又会调用 finishCompletion 方法完成对所有线程的 unpark 操作
            LockSupport.park(this);
    }
}

// 简单的链表,用来记录等待的线程。
static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    // 将 thread 设置为当前线程
    WaitNode() { thread = Thread.currentThread(); }
}

java.util.concurrent.FutureTask#removeWaiter

该方法用来删除超时和线程中断的等待节点,此处正是用到了 Treiber stack,利用 CAS 进行出栈,删除等待节点。

/** 
* 尝试删除超时或中断的等待节点,以避免积累垃圾。
* 内部节点在没有 CAS 的情况下,其数据不会发生,因此无论如何遍历它们,都是无害的。
* 为了避免在删除等待节点的时候,其他线程修改了头节点,在出现明显竞争的情况下将重新遍历该列表。
*/
private void removeWaiter(WaitNode node) {
    // 只有在超时和中断的情况下,才进入该方法,用来删除超时和中断的等待节点
    // 如果等待节点不为空
    if (node != null) {
        // 将等待节点线程设置为 null
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            // 进行遍历
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                // s 用于记录下个节点
                s = q.next;
                // 如果等待节点的 thread 变量不为空,表明该线程是处于阻塞状态
                if (q.thread != null)
                    // pred 用于记录前个节点
                    pred = q;
                // 如果 pred 不为空
                else if (pred != null) {
                    // 如果 q.thread 为 null,此操作为删除 q 节点,否则没有任何变化
                    // 即从 pred -> q -> s 变成 pred -> s
                    // 因为此处是内部节点很安全,因此可以进行非并发地删除
                    pred.next = s;
                    // 如果 pred 的 thread 为 null,即节点的数据被其他线程修改了
                    if (pred.thread == null) // check for race
                        // 防止出现错误,重新进行外层循环,即重复获取 waiters,重新遍历
                        continue retry;
                }
                // 此处表明等待节点的线程为空,因此使用 CAS 操作进行节点删除
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    // 删除失败,重新进行外层循环,即重复获取 waiters,重新遍历
                    continue retry;
            }
            // 如果前面的操作在删除节点的过程中,节点没有被其他线程修改
            // 即正常删除,那么就会走到这里,然后返回
            break;
        }
    }
}

3.3 run 方法

java.util.concurrent.FutureTask#run

该方法用来调用 Callable 对象的 call 方法,并将返回结果赋值给 outcome 变量,如果出现执行异常,就将异常赋值给 outcome

// 除非已取消,否则为此 Future 的 outcome 设置为其运行结果。
public void run() {
    // 如果 state 不是 NEW,或者 state 为 NEW 但是 CAS 失败时,直接返回
    // 意味着只能有一个线程能,其他线程直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // 如果 callable 不为空,且 state 为 NEW
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 调用 call 方法获取 result
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // outcome 设置为 ex
                setException(ex);
            }
            // 如果 ran 为 true,即执行成功,没有异常
            if (ran)
                // outcome 设置为 result
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // 如果处于 INTERRUPTING 或者 INTERRUPTED 状态
        if (s >= INTERRUPTING)
            // 使用 Thread.yield() 方法,尝试让出线程执行权,直到结果不为 INTERRUPTING
            handlePossibleCancellationInterrupt(s);
    }
}

// 设置返回结果
protected void set(V v) {
    // CAS 将 NEW 状态转换成 COMPLETING 状态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将返回结果赋值给 outcome
        outcome = v;
        // 结果赋值成功,将状态变成 NORMAL 状态
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

// 设置异常
protected void setException(Throwable t) {
    // CAS 将 NEW 状态转换成 COMPLETING 状态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将异常赋值给 outcome
        outcome = t;
        // 结果赋值成功,将状态变成 EXCEPTIONAL 状态
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        // 释放所有的阻塞线程,运行 done 方法
        finishCompletion();
    }
}

java.util.concurrent.FutureTask#finishCompletion

该方法用来释放所有等待返回结果的线程,并且清除所有 waiters 上的线程。

// 删除 waiters 的所有的节点上的等待线程,并且释放所有等待的线程
// 同时调用 done(),并使 callable 无效(置空)。
private void finishCompletion() {
    // assert state > COMPLETING;
    // 如果 waiters 不为空就一直循环,防止阻塞链表上的线程没有被释放
    // 因为是多线程运行,即在此期间,仍然可能有线程进入 waiters 链表
    for (WaitNode q; (q = waiters) != null;) {
        // 将 waiters 通过 CAS 赋值为 null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 从头节点开始遍历
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // 释放阻塞线程
                    LockSupport.unpark(t);
                }
                // 继续向下遍历链表
                WaitNode next = q.next;
                if (next == null)
                    // 遍历结束,跳出循环
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    // 空方法,扩展点,由子类实现
    done();
    // 任务置空,帮助 gc
    callable = null;        // to reduce footprint
}

四 注意事项

  • FutureTask 的状态是不断向前的,不会回退,只能运行一次计算,即一旦进入终端的状态即 NORMALEXCEPTIONALINTERRUPTED 状态,那么就无法发生改变。
  • 如果我们想要能够重复运行,可以实现 FutureTask,调用 runAndReset 方法,该方法可以将状态重新变为 NEW,但是注意此方法不会对 outcome 赋值,详见 FutureTask 的子类 ScheduledFutureTask
  • 调用完 get() 方法,一定要记得调用 run() 方法,否则线程将会一直被阻塞住,因此也建议最好使用 get(long, TimeUnit) 方法。
  • 如果我们想要进行扩展,可以实现 FutureTask ,其留有扩展点 done 方法,以及有许多的 protected 方法。

五 参考文章

如果有兴趣可以微信搜一搜程序锋子,关注本人的微信公众号
FutureTask 深入浅出,通过源码讲解彻底吃透

声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎进行举报,并提供相关证据,工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。