ForkJoin框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
与MapReduce思想非常类似。从字面意思上看,Fork就是把一个大任务切割成若干个子任务并行执行,Join就是合并这些子任务的执行结果,最后得到大任务的结果。主要采用工作窃取算法。
工作窃取算法是指某个线程从其他队列里窃取任务来执行。下面是工作窃取的流程图:
为什么要使用工作窃取算法?
加入我们需要做一个比较大的任务,我们可以把这个任务分割成若干个互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,为每个队列创建一个单独的线程来执行队列里面的任务,线程和队列一一对应。比如A线程负责处理A队列里面的任务,但是有些线程会先把自己的队列里面的任务干完,而其他线程还有对应的任务等待处理,干完活的线程就会帮其他线程干活,于是就会去其他线程的队列里窃取一个任务来执行,这时他们会访问同一个队列,所以为了减少窃取任务线程与被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务的线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。这种优点就是充分利用线程进行并行计算,减少了线程间的竞争,缺点是在某些情况下还是存在竞争(比如在双端队列只有一个任务时),同时也消耗了更多的系统资源(比如创建了多个线程和多个双端队列)。
对于ForkJoin框架而言,当一个任务正在等待他使用ForkJoin操作创建的子任务结束时,执行这个任务的工作线程查找其他未被执行的任务,并开始他的执行,通过这种方式,线程充分利用他的运行时间来提高应用程序的性能,为了实现这个目标ForkJoin框架执行的任务有一些局限性。
局限性
- 任务只能使用Fork和Join操作来作为同步机制,如果使用了其他同步机制,那他们在同步操作时工作线程就不能执行其他任务了。比如在forkjoin框架中使任务进入sleep,在睡眠期间内正在执行这个任务的工作线程将不会执行其他任务了。
- 我们所拆分的任务不应该去执行IO操作。例如读写数据文件
- 任务不能抛出检查异常。必须通过必要的代码来处理他们
ForkJoin框架的核心是两个类:ForkJoinPool和ForkJoinTask。ForkJoinPool负责做实现(包括工作窃取算法)他管理工作线程提供任务的状态以及他们的执行信息。而ForkJoinTask则主要提供在任务中执行Fork和Join操作的机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 1@Slf4j
2public class ForkJoinTaskExample extends RecursiveTask<Integer> {
3
4 public static final int threshold = 2;
5 private int start;
6 private int end;
7
8 public ForkJoinTaskExample(int start, int end) {
9 this.start = start;
10 this.end = end;
11 }
12
13 @Override
14 protected Integer compute() {
15 int sum = 0;
16 //如果任务足够小就计算任务
17 boolean canCompute = (end - start) <= threshold;
18 if (canCompute) {
19 for (int i = start; i <= end; i++) {
20 sum += i;
21 }
22 } else {
23 // 如果任务大于阈值,就分裂成两个子任务计算
24 int middle = (start + end) / 2;
25 ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
26 ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
27
28 // 执行子任务
29 leftTask.fork();
30 rightTask.fork();
31
32 //等待任务执行结束合并其结果
33 int leftResult = leftTask.join();
34 int rightResult = rightTask.join();
35
36 // 合并子任务
37 sum = leftResult + rightResult;
38 }
39 return sum;
40 }
41
42 public static void main(String[] args) {
43 ForkJoinPool forkJoinPool = new ForkJoinPool();
44
45 //生成一个计算任务,计算1+2+3+4...+100
46 ForkJoinTaskExample taskExample = new ForkJoinTaskExample(1, 100);
47 Future<Integer> result = forkJoinPool.submit(taskExample);
48 try{
49 log.info("result:{}", result.get());
50 } catch (InterruptedException e) {
51 log.error("exception", e);
52 e.printStackTrace();
53 } catch (ExecutionException e) {
54 e.printStackTrace();
55 }
56 }
57}
58