Fork/Join框架的介绍
Fork/Join框架是Java7提供了的一个用于并发执行任务的框架,是一个实现了ExecutorService接口的多线程处理器。它可以把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架,充分利用可用的资源,进而提高应用的执行效率
Fork/Join执行逻辑
使用Fork/Join框架
第一步首先分割任务,需要一个fork类来把大任务分割成小任务,如果子任务还是很大,那继续分割直到足够小
第二步然后合并任务结果,分割子任务分别放在双端队列,然后几个启动线程分别从双端队列中获取任务执行,子任务执行完的结果统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
常用方法
-
compute(); 计算方法(分拆的子任务)
-
fork(); // 执行子任务
-
join(); // 子任务结束后返回对应结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 1
2import java.util.ArrayList;
3import java.util.concurrent.Callable;
4import java.util.concurrent.ForkJoinPool;
5import java.util.concurrent.ForkJoinTask;
6import java.util.concurrent.RecursiveTask;
7
8public class CountTask extends RecursiveTask<Long> {
9 private static final int THRESHOLD = 10000;//任务分解的规模
10
11 private long start;
12 private long end;
13
14 public CountTask(long start, long end) {
15 this.start = start;
16 this.end = end;
17 }
18
19 public Long compute() {
20 long sum = 0;
21 //如果需要求和的总数大于THRESHOLD,那么任务继续分解,否则直接可以执行了
22 boolean canCompute = (end - start) < THRESHOLD;
23 if (canCompute) {
24 for (long i = start; i < end; i++) {
25 sum += i;
26 }
27 } else {
28 // 分成100任务
29 long step = (start + end) / 100;
30 ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
31 long pos = start;
32 for (int i = 0; i < 100; i++) {
33 long lastOne = pos + step;
34 if (lastOne > end)
35 lastOne = end;
36 CountTask subTask = new CountTask(pos, lastOne);
37 pos += step + 1;
38 subTasks.add(subTask);
39 subTask.fork();
40 }
41 for (CountTask t : subTasks) {
42 sum += t.join();
43 }
44 }
45 return sum;
46 }
47
48 public static void main(String[] args) {
49 //创建一个ForkJoinPool线程池
50 ForkJoinPool forkJoinPool = new ForkJoinPool();
51 //构造一个任务
52 CountTask task = new CountTask(0, 200000L);
53 //提交线程池后拿到结果的任务
54 ForkJoinTask<Long> result = forkJoinPool.submit(task);
55 try {
56 //从任务获取结果
57 long res = result.get();
58 System.out.println("sum=" + res);
59 } catch (Exception e) {
60
61 }
62 }
63
64}
65
66
67
使用ForkJoin框架经常使用到两个任务模型RecursiveTask和RecursiveAction,RecursiveTask用于定义有返回值的任务,RecursiveAction用于定义没有返回值的任务,首先建立ForkJoinPool线程池,再构造一个计算1到200000求和的任务。将任务提交给线程池,线程池会返回一个携带结果的任务,通过get()获取最终结果,如果调用get()时候任务还没完成就等待。
如果任务的划分层次过深会出现两种情况:
1、系统内的线程数量越积越多,导致性能下降
2、函数调用层次很深,导致栈溢出
工作窃取算法
工作窃取算法是指某个线程从其他队列中窃取任务来执行
在一般情况下,一个物理线程实际需要处理多个逻辑任务,因此每个线程必然需要拥有一个任务队列。比如,线程A已经把自己所以任务都执行完了,而线程B还有一堆任务没有完成,那么线程A就会帮助线程B,从线程B中拿一些任务过来处理,尽可能达到平衡,
总是从任务队列的底部拿数据,而线程执行自己的任务是从相反的顶部开始拿。这样就有利于避免数据竞争
我们在实现 分治编程时,主要就是调用 ForkJoinTask 的 fork() 和 join() 方法。fork() 方法用于提交子任务,而 join() 方法则用于等待子任务的完成。而这个过程中,将涉及到 “工作窃取算法”。
1、 fork( ) 方法提交任务
1
2
3
4
5
6
7
8
9
10
11
12 1public final ForkJoinTask<V> fork() {
2 Thread t;
3 //判断是否是一个 工作线程
4 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
5 //加入到内部队列中
6 ((ForkJoinWorkerThread)t).workQueue.push(this);
7 else//由 common 线程池来执行任务
8 ForkJoinPool.common.externalPush(this);
9 return this;
10 }
11
12
fork()方法先判断当前线程(调用fork()来提交任务的线程)是不是一个 ForkJoinWorkerThread 的工作线程,如果是,则将任务加入到内部队列中,否则,由 ForkJoinPool 提供的内部公用的线程池 common 线程池 来执行这个任务。
根据上面的说法,意味着我们可以在普通线程池中直接调用 fork() 方法来提交任务到一个默认提供的线程池中。这将非常方便。假如,你要在程序中处理大任务,需要分治编程,但你仅仅只处理一次,以后就不会用到,而且任务不算太大,不需要设置特定的参数,那么你肯定不想为此创建一个线程池,这时默认的提供的线程池将会很有用。
2、join( ) 等待任务的完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1public final V join() {
2 int s;
3 if ((s = doJoin() & DONE_MASK) != NORMAL)
4 reportException(s);
5 return getRawResult();//直接返回结果
6 }
7 private int doJoin() {
8 int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
9 return
10 //如果完成,直接返回s
11 (s = status) < 0 ? s :
12 //没有完成,判断是不是池中的 ForkJoinWorkerThread 工作线程
13 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
14 //如果是池中线程,执行这里
15 (w = (wt = (ForkJoinWorkerThread)t).workQueue).
16 tryUnpush(this) && (s = doExec()) < 0 ? s :
17 wt.pool.awaitJoin(w, this, 0L) :
18 //如果不是池中的线程池,则执行这里
19 externalAwaitDone();
20 }
21
22
仔细看上面的注释。当 dojoin( )方法发现任务没有完成且当前线程是池中线程时,执行了 tryUnpush( )方法。tryUnpush()方法尝试去执行此任务:如果要join的任务正好在当前任务队列的顶端,那么pop出这个任务,然后调用 doExec() 让当前线程去执行这个任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 1final boolean tryUnpush(ForkJoinTask<?> t) {
2 ForkJoinTask<?>[] a; int s;
3 if ((a = array) != null && (s = top) != base &&
4 U.compareAndSwapObject
5 (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
6 U.putOrderedInt(this, QTOP, s);
7 return true;
8 }
9 return false;
10 }
11 final int doExec() {
12 int s; boolean completed;
13 if ((s = status) >= 0) {
14 try {
15 completed = exec();
16 } catch (Throwable rex) {
17 return setExceptionalCompletion(rex);
18 }
19 if (completed)
20 s = setCompletion(NORMAL);
21 }
22 return s;
23 }
24
25
如果任务不是处于队列的顶端,那么就会执行 awaitJoin( ) 方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 1 final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
2 int s = 0;
3 if (task != null && w != null) {
4 ForkJoinTask<?> prevJoin = w.currentJoin;
5 U.putOrderedObject(w, QCURRENTJOIN, task);
6 CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
7 (CountedCompleter<?>)task : null;
8 for (;;) {
9 if ((s = task.status) < 0)//如果任务完成了,跳出死循环
10 break;
11 if (cc != null)//当前任务是CountedCompleter类型,则尝试从任务队列中获取当前任务的派生子任务来执行;
12 helpComplete(w, cc, 0);
13 else if (w.base == w.top || w.tryRemoveAndExec(task))//如果当前线程的内部队列为空,或者成功完成了任务,帮助某个线程完成任务。
14 helpStealer(w, task);
15 if ((s = task.status) < 0)//任务完成,跳出死循环
16 break;
17 long ms, ns;
18 if (deadline == 0L)
19 ms = 0L;
20 else if ((ns = deadline - System.nanoTime()) <= 0L)
21 break;
22 else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
23 ms = 1L;
24 if (tryCompensate(w)) {
25 task.internalWait(ms);
26 U.getAndAddLong(this, CTL, AC_UNIT);
27 }
28 }
29 U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
30 }
31 return s;
32 }
33
34
重点说一下helpStealer。helpStealer的原则是你帮助我执行任务,我也帮你执行任务。
- 遍历奇数下标,如果发现队列对象currentSteal放置的刚好是自己要找的任务,则说明自己的任务被该队列A的owner线程偷来执行
- 如果队列A队列中有任务,则从队尾(base)取出执行;
- 如果发现队列A队列为空,则根据它正在join的任务,在拓扑找到相关的队列B去偷取任务执行。在执行的过程中要注意,我们应该完整的把任务完成
它们可能会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务线程永远从双端队列的尾部拿任务执行。
- 优点:充分利用线程进行并行计算,减少线程间的竞争。
- 缺点:在某些情况下还是会存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源, 比如创建多个线程和多个双端队列。
参考:《Java高并发程序设计》