Runnable任务类在提交的时候我们并不能检测到运行结果,也不能抛出异常供上层代码捕捉,这个时候就需要有一些标准的阻塞库,能让我们得到结果前阻塞,并且能捕捉异常。FutureTask就是这样一个阻塞库,内部采用的是FILO的非公平链表实现。
测试代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1FutureTask<String> task=new FutureTask<String>(new Callable<String>() {
2 @Override
3 public String call() throws Exception {
4 return "success!";
5 }
6 });
7 for (int i = 0; i <2 ; i++) {
8 new Thread(new Runnable() {
9 @Override
10 public void run() {
11 try{
12 String result=task.get();
13 System.out.println(Thread.currentThread().getName()+" has complete " );
14 }catch(Exception e){
15 e.printStackTrace();
16 }
17 }
18 }).start();
19 }
20 new Thread(task).start();
21
运行中通过debugger可以得到链表的结构:
测试结果
可以看到测试结果与链表的顺序是一致的。
接下来分析源码:
->get()
1
2
3
4
5
6
7 1 public V get() throws InterruptedException, ExecutionException {
2 int s = state;
3 if (s <= COMPLETING)
4 s = awaitDone(false, 0L);
5 return report(s);
6 }
7
在state变成完成状态及之前,进入awaitDone();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 1 private int awaitDone(boolean timed, long nanos)
2 throws InterruptedException {
3 final long deadline = timed ? System.nanoTime() + nanos : 0L;
4 WaitNode q = null;
5 boolean queued = false;
6 for (;;) {
7 if (Thread.interrupted()) {
8 removeWaiter(q);
9 throw new InterruptedException();
10 }
11
12 int s = state;
13 if (s > COMPLETING) {
14 if (q != null)
15 q.thread = null;
16 return s;
17 }
18 else if (s == COMPLETING) // cannot time out yet
19 Thread.yield();
20 else if (q == null)
21 q = new WaitNode();
22 else if (!queued)
23 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
24 q.next = waiters, q);
25 else if (timed) {
26 nanos = deadline - System.nanoTime();
27 if (nanos <= 0L) {
28 removeWaiter(q);
29 return state;
30 }
31 LockSupport.parkNanos(this, nanos);
32 }
33 else
34 LockSupport.park(this);
35 }
36 }
37
第一步:检测是否被中断,中断则从链表中移除节点
第二步:s > COMPLETING,如果已经完成,清空运行线程;
s == COMPLETING,任务完成,但未设置最终状态,可以请求退出线程的运行权,等待工作线程完成;
q == null,新的线程调用get;
!queued,比较并交换对象,将新的节点的下一个节点指向之前创建的节点,并将waiters指向新节点,这是原子操作,如果成功
下一步会阻塞线程,如果失败继续执行该方法直到成功为止,并阻塞。
分析到这里完成了阻塞的过程。
->唤醒过程 run
线程启动回调run方法,run方法执行任务类的run,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1public void run() {
2 if (state != NEW ||
3 !UNSAFE.compareAndSwapObject(this, runnerOffset,
4 null, Thread.currentThread()))
5 return;
6 try {
7 Callable<V> c = callable;
8 if (c != null && state == NEW) {
9 V result;
10 boolean ran;
11 try {
12 result = c.call();
13 ran = true;
14 } catch (Throwable ex) {
15 result = null;
16 ran = false;
17 setException(ex);
18 }
19 if (ran)
20 set(result);
21 }
22 } finally {
23 // runner must be non-null until state is settled to
24 // prevent concurrent calls to run()
25 runner = null;
26 // state must be re-read after nulling runner to prevent
27 // leaked interrupts
28 int s = state;
29 if (s >= INTERRUPTING)
30 handlePossibleCancellationInterrupt(s);
31 }
32 }
33
1
2
3
4
5
6 1第一步:
2state != NEW ||
3 !UNSAFE.compareAndSwapObject(this, runnerOffset,
4 null, Thread.currentThread())
5
6
当任务的状态不为新建或者运行线程由null变为当前线程失败时退出。这个很好分析,当state!=new说明此任务被其它的线程占用,必须退出。即使没有其他的线程执行该任务,也可能出现多个线程抢夺该任务执行权的情况,这个时候通过compareAndSwapObject操作的才代表获得了运行权,其他的线程直接退出,一个任务类在通常情况下是不需要被执行两次的。显然FutrueTask也是这么设计的。
得到了任务执行权的线程继续执行,接着调用callable的call方法,线程顺利执行,接着调用 set(result)方法设置线程的状态和唤醒阻塞线程。
1
2
3
4
5
6
7
8 1 protected void set(V v) {
2 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
3 outcome = v;
4 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
5 finishCompletion();
6 }
7 }
8
线程首先将状态设置为完成状态,这里要使用原子操作,因为任务的可能被中断和取消,任务的最终状态为normal,接着调用finishCompletion()方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 1private void finishCompletion() {
2 // assert state > COMPLETING;
3 for (WaitNode q; (q = waiters) != null;) {
4 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
5 for (;;) {
6 Thread t = q.thread;
7 if (t != null) {
8 q.thread = null;
9 LockSupport.unpark(t);
10 }
11 WaitNode next = q.next;
12 if (next == null)
13 break;
14 q.next = null; // unlink to help gc
15 q = next;
16 }
17 break;
18 }
19 }
20
21 done();
22
23 callable = null; // to reduce footprint
24 }
25
循环唤醒阻塞线程,首先将等待队列的第一个节点赋值给q,并使用原子操作将等待节点设置为null,这么做可以保证循环节点时只会有一个线程执行该方法,剩下的就不用详细解释了。
FutureTask可以简单的理解为只执行一次任务线程,并且可以阻塞多个线程直到获取结果的非公平的阻塞库。
补充1:这里有一个removeWaiter方法,里面有很多操作性。
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1 private void removeWaiter(WaitNode node) {
2 if (node != null) {
3 node.thread = null;
4 retry:
5 for (;;) { // restart on removeWaiter race
6 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
7 s = q.next;
8 if (q.thread != null)
9 pred = q;
10 else if (pred != null) {
11 pred.next = s;
12 if (pred.thread == null) // check for race
13 continue retry;
14 }
15 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
16 q, s))
17 continue retry;
18 }
19 break;
20 }
21 }
22 }
23
这段代码的主要目的是将当前节点的执行线程置为null,并将前一个节点指向删除节点的下一个节点,相关代码,
q = waiters;
s = q.next;
pred = q;
pred.next = s;
这几段代码在多层循环中会出现上述描述的结果。
因为是并发编程,需要分类讨论多种情况:
- 有新的节点加入,影响是被删除节点靠后。
- 被删除节点为头节点。
- 被删除节点为已丢失,这种情况可能是由于之前已经被中断或者中断过迟,任务已经执行成功,并且队列被刷新。
情况1:
当有新的线程调用get,再分
1.1任务线程未完成,会不断的从头节点循环,直到当前节点的线程thread为null,但并不是找到thread为null的就可以了,从多线程的角度来看,其他的线程也可能发生中断,这里只讨论在被删除节点之前的节点,之后可以类推。
1.1.1如果被删除节点的前一个节点也被删除,会执行这段代码
1
2
3
4
5
6
7 1else if (pred != null) {
2 pred.next = s;
3 if (pred.thread == null) // check for race
4 continue retry;
5}
6
7
循环会继续,这里采用的是goto语法,细想一下,这里只有采用goto语法是比较方便的。
1.1.2如果被删除节点的前一个节点正常,会执行这段代码
1
2
3
4
5
6 1else if (pred != null) {
2 pred.next = s;
3 if (pred.thread == null) // check for race
4 continue retry;
5}
6
不会进入goto语句。
1.1.3如果被删除节点之前的节点都被删除了,也就是被删除节点是头节点,参考情况2。
情况2:被删除节点是头节点,显然q.thread == null;pred == null;会执行这段代码
1
2
3
4 1else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
2 q, s))
3 continue retry;
4
这里执行失败的会进入goto,直到所有节点的操作全部执行完,该删的删。
情况3:如果中断过迟,任务线程执行完,这里又可以分为几种情况.
3.1这时候新增的线程直接返回并调用report返回结果
3.2如果之前的线程堵塞在下面这里,这也是有可能的,有可能之前cpu负荷,这时候才分配得到执行权。
1
2
3
4 1else if (!queued)
2 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
3 q.next = waiters, q);
4
3.2.1如果执行了finishCompletion,waiters为null,这时候waiters被重新赋值给q,下次循环时还是直接退出。
3.2.2如果没有执行finishCompletion,队列会继续增加,这时候显然remove是没有作用的,但是不影响结果。
讨论的情况还有很多,就不一一列举了。
补充2:futureTask的取消策略
一个设计优秀的代码必须带有优秀的取消策略,不管这里的取消是定义为取消还是中断,但是中断通常是取消策略的首选。
Java的中断并不是直接中断线程,而是采用的一种线程协调机制,依赖于用户设置安全的取消点,Java并没有规定取消的时间长短,但通常而言速度是非常快的。
这里看下futureTask的取消策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1 public boolean cancel(boolean mayInterruptIfRunning) {
2 if (!(state == NEW &&
3 UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
4 mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
5 return false;
6 try { // in case call to interrupt throws exception
7 if (mayInterruptIfRunning) {
8 try {
9 Thread t = runner;
10 if (t != null)
11 t.interrupt();
12 } finally { // final state
13 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
14 }
15 }
16 } finally {
17 finishCompletion();
18 }
19 return true;
20 }
21
- cancel(true)中断策略
当任务状态不为NEW或者任务状态由NEW转化为中断失败时,直接返回;然后线程执行中断方法,最后调用finishCompletion()唤醒阻塞线程。
2.cancel(false)取消策略
当任务状态不为NEW或者任务状态由NEW转化为中断失败时,直接返回;取消并没有做什么操作,这个需要扩展。