FutureTask源码分析

释放双眼,带上耳机,听听看~!

Runnable任务类在提交的时候我们并不能检测到运行结果,也不能抛出异常供上层代码捕捉,这个时候就需要有一些标准的阻塞库,能让我们得到结果前阻塞,并且能捕捉异常。FutureTask就是这样一个阻塞库,内部采用的是FILO的非公平链表实现。

测试代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1FutureTask<String> task=new FutureTask<String>(new Callable<String>() {
2                @Override
3                public String call() throws Exception {
4                    return "success!";
5                }
6            });
7            for (int i = 0; i <2 ; i++) {
8                new Thread(new Runnable() {
9                    @Override
10                    public void run() {
11                        try{
12                            String result=task.get();
13                            System.out.println(Thread.currentThread().getName()+" has complete  " );
14                        }catch(Exception e){
15                            e.printStackTrace();
16                        }
17                    }
18                }).start();
19            }
20            new Thread(task).start();
21

运行中通过debugger可以得到链表的结构:

FutureTask源码分析

测试结果

FutureTask源码分析

可以看到测试结果与链表的顺序是一致的。

接下来分析源码:

->get()


1
2
3
4
5
6
7
1   public V get() throws InterruptedException, ExecutionException {
2        int s = state;
3        if (s <= COMPLETING)
4            s = awaitDone(false, 0L);
5        return report(s);
6    }
7

在state变成完成状态及之前,进入awaitDone();


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1    private int awaitDone(boolean timed, long nanos)
2        throws InterruptedException {
3        final long deadline = timed ? System.nanoTime() + nanos : 0L;
4        WaitNode q = null;
5        boolean queued = false;
6        for (;;) {
7            if (Thread.interrupted()) {
8                removeWaiter(q);
9                throw new InterruptedException();
10            }
11
12            int s = state;
13            if (s > COMPLETING) {
14                if (q != null)
15                    q.thread = null;
16                return s;
17            }
18            else if (s == COMPLETING) // cannot time out yet
19                Thread.yield();
20            else if (q == null)
21                q = new WaitNode();
22            else if (!queued)
23                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
24                                                     q.next = waiters, q);
25            else if (timed) {
26                nanos = deadline - System.nanoTime();
27                if (nanos <= 0L) {
28                    removeWaiter(q);
29                    return state;
30                }
31                LockSupport.parkNanos(this, nanos);
32            }
33            else
34                LockSupport.park(this);
35        }
36    }
37

第一步:检测是否被中断,中断则从链表中移除节点

第二步:s > COMPLETING,如果已经完成,清空运行线程;

s == COMPLETING,任务完成,但未设置最终状态,可以请求退出线程的运行权,等待工作线程完成;

q == null,新的线程调用get;

!queued,比较并交换对象,将新的节点的下一个节点指向之前创建的节点,并将waiters指向新节点,这是原子操作,如果成功

下一步会阻塞线程,如果失败继续执行该方法直到成功为止,并阻塞。

分析到这里完成了阻塞的过程。

->唤醒过程 run

线程启动回调run方法,run方法执行任务类的run,代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1public void run() {
2        if (state != NEW ||
3            !UNSAFE.compareAndSwapObject(this, runnerOffset,
4                                         null, Thread.currentThread()))
5            return;
6        try {
7            Callable<V> c = callable;
8            if (c != null && state == NEW) {
9                V result;
10                boolean ran;
11                try {
12                    result = c.call();
13                    ran = true;
14                } catch (Throwable ex) {
15                    result = null;
16                    ran = false;
17                    setException(ex);
18                }
19                if (ran)
20                    set(result);
21            }
22        } finally {
23            // runner must be non-null until state is settled to
24            // prevent concurrent calls to run()
25            runner = null;
26            // state must be re-read after nulling runner to prevent
27            // leaked interrupts
28            int s = state;
29            if (s >= INTERRUPTING)
30                handlePossibleCancellationInterrupt(s);
31        }
32    }
33

1
2
3
4
5
6
1第一步:
2​​​​state != NEW ||
3    !UNSAFE.compareAndSwapObject(this, runnerOffset,
4                                 null, Thread.currentThread())
5
6

当任务的状态不为新建或者运行线程由null变为当前线程失败时退出。这个很好分析,当state!=new说明此任务被其它的线程占用,必须退出。即使没有其他的线程执行该任务,也可能出现多个线程抢夺该任务执行权的情况,这个时候通过compareAndSwapObject操作的才代表获得了运行权,其他的线程直接退出,一个任务类在通常情况下是不需要被执行两次的。显然FutrueTask也是这么设计的。

得到了任务执行权的线程继续执行,接着调用callable的call方法,线程顺利执行,接着调用 set(result)方法设置线程的状态和唤醒阻塞线程。


1
2
3
4
5
6
7
8
1 protected void set(V v) {
2        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
3            outcome = v;
4            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
5            finishCompletion();
6        }
7    }
8

线程首先将状态设置为完成状态,这里要使用原子操作,因为任务的可能被中断和取消,任务的最终状态为normal,接着调用finishCompletion()方法。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1private void finishCompletion() {
2        // assert state > COMPLETING;
3        for (WaitNode q; (q = waiters) != null;) {
4            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
5                for (;;) {
6                    Thread t = q.thread;
7                    if (t != null) {
8                        q.thread = null;
9                        LockSupport.unpark(t);
10                    }
11                    WaitNode next = q.next;
12                    if (next == null)
13                        break;
14                    q.next = null; // unlink to help gc
15                    q = next;
16                }
17                break;
18            }
19        }
20
21        done();
22
23        callable = null;        // to reduce footprint
24    }
25

循环唤醒阻塞线程,首先将等待队列的第一个节点赋值给q,并使用原子操作将等待节点设置为null,这么做可以保证循环节点时只会有一个线程执行该方法,剩下的就不用详细解释了。

FutureTask可以简单的理解为只执行一次任务线程,并且可以阻塞多个线程直到获取结果的非公平的阻塞库。

 

补充1:这里有一个removeWaiter方法,里面有很多操作性。

代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1    private void removeWaiter(WaitNode node) {
2        if (node != null) {
3            node.thread = null;
4            retry:
5            for (;;) {          // restart on removeWaiter race
6                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
7                    s = q.next;
8                    if (q.thread != null)
9                        pred = q;
10                    else if (pred != null) {
11                        pred.next = s;
12                        if (pred.thread == null) // check for race
13                            continue retry;
14                    }
15                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
16                                                          q, s))
17                        continue retry;
18                }
19                break;
20            }
21        }
22    }
23

这段代码的主要目的是将当前节点的执行线程置为null,并将前一个节点指向删除节点的下一个节点,相关代码,

q = waiters;
s = q.next;
pred = q;
pred.next = s;

这几段代码在多层循环中会出现上述描述的结果。

因为是并发编程,需要分类讨论多种情况:

  1. 有新的节点加入,影响是被删除节点靠后。
  2. 被删除节点为头节点。
  3. 被删除节点为已丢失,这种情况可能是由于之前已经被中断或者中断过迟,任务已经执行成功,并且队列被刷新。

情况1:

当有新的线程调用get,再分

1.1任务线程未完成,会不断的从头节点循环,直到当前节点的线程thread为null,但并不是找到thread为null的就可以了,从多线程的角度来看,其他的线程也可能发生中断,这里只讨论在被删除节点之前的节点,之后可以类推。

1.1.1如果被删除节点的前一个节点也被删除,会执行这段代码


1
2
3
4
5
6
7
1else if (pred != null) {
2    pred.next = s;
3    if (pred.thread == null) // check for race
4        continue retry;
5}
6
7

循环会继续,这里采用的是goto语法,细想一下,这里只有采用goto语法是比较方便的。

1.1.2如果被删除节点的前一个节点正常,会执行这段代码


1
2
3
4
5
6
1else if (pred != null) {
2    pred.next = s;
3    if (pred.thread == null) // check for race
4        continue retry;
5}
6

不会进入goto语句。

1.1.3如果被删除节点之前的节点都被删除了,也就是被删除节点是头节点,参考情况2。

情况2:被删除节点是头节点,显然q.thread == null;pred == null;会执行这段代码


1
2
3
4
1else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
2                                      q, s))
3    continue retry;
4

这里执行失败的会进入goto,直到所有节点的操作全部执行完,该删的删。

情况3:如果中断过迟,任务线程执行完,这里又可以分为几种情况.

3.1这时候新增的线程直接返回并调用report返回结果

3.2如果之前的线程堵塞在下面这里,这也是有可能的,有可能之前cpu负荷,这时候才分配得到执行权。

 


1
2
3
4
1else if (!queued)
2    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
3                                         q.next = waiters, q);
4

3.2.1如果执行了finishCompletion,waiters为null,这时候waiters被重新赋值给q,下次循环时还是直接退出。

3.2.2如果没有执行finishCompletion,队列会继续增加,这时候显然remove是没有作用的,但是不影响结果。

讨论的情况还有很多,就不一一列举了。

 补充2:futureTask的取消策略

一个设计优秀的代码必须带有优秀的取消策略,不管这里的取消是定义为取消还是中断,但是中断通常是取消策略的首选。

Java的中断并不是直接中断线程,而是采用的一种线程协调机制,依赖于用户设置安全的取消点,Java并没有规定取消的时间长短,但通常而言速度是非常快的。

这里看下futureTask的取消策略


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1    public boolean cancel(boolean mayInterruptIfRunning) {
2        if (!(state == NEW &&
3              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
4                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
5            return false;
6        try {    // in case call to interrupt throws exception
7            if (mayInterruptIfRunning) {
8                try {
9                    Thread t = runner;
10                    if (t != null)
11                        t.interrupt();
12                } finally { // final state
13                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
14                }
15            }
16        } finally {
17            finishCompletion();
18        }
19        return true;
20    }
21
  1. cancel(true)中断策略

当任务状态不为NEW或者任务状态由NEW转化为中断失败时,直接返回;然后线程执行中断方法,最后调用finishCompletion()唤醒阻塞线程。

2.cancel(false)取消策略

当任务状态不为NEW或者任务状态由NEW转化为中断失败时,直接返回;取消并没有做什么操作,这个需要扩展。

给TA打赏
共{{data.count}}人
人已打赏
安全经验

英文站如何做Google Adsense

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索