java高并发(十六)J.U.C之ForkJoin

释放双眼,带上耳机,听听看~!

ForkJoin框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

与MapReduce思想非常类似。从字面意思上看,Fork就是把一个大任务切割成若干个子任务并行执行,Join就是合并这些子任务的执行结果,最后得到大任务的结果。主要采用工作窃取算法。

    工作窃取算法是指某个线程从其他队列里窃取任务来执行。下面是工作窃取的流程图:

java高并发(十六)J.U.C之ForkJoin

为什么要使用工作窃取算法?

    加入我们需要做一个比较大的任务,我们可以把这个任务分割成若干个互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,为每个队列创建一个单独的线程来执行队列里面的任务,线程和队列一一对应。比如A线程负责处理A队列里面的任务,但是有些线程会先把自己的队列里面的任务干完,而其他线程还有对应的任务等待处理,干完活的线程就会帮其他线程干活,于是就会去其他线程的队列里窃取一个任务来执行,这时他们会访问同一个队列,所以为了减少窃取任务线程与被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务的线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。这种优点就是充分利用线程进行并行计算,减少了线程间的竞争,缺点是在某些情况下还是存在竞争(比如在双端队列只有一个任务时),同时也消耗了更多的系统资源(比如创建了多个线程和多个双端队列)。

    对于ForkJoin框架而言,当一个任务正在等待他使用ForkJoin操作创建的子任务结束时,执行这个任务的工作线程查找其他未被执行的任务,并开始他的执行,通过这种方式,线程充分利用他的运行时间来提高应用程序的性能,为了实现这个目标ForkJoin框架执行的任务有一些局限性。

局限性

  • 任务只能使用Fork和Join操作来作为同步机制,如果使用了其他同步机制,那他们在同步操作时工作线程就不能执行其他任务了。比如在forkjoin框架中使任务进入sleep,在睡眠期间内正在执行这个任务的工作线程将不会执行其他任务了。
  • 我们所拆分的任务不应该去执行IO操作。例如读写数据文件
  • 任务不能抛出检查异常。必须通过必要的代码来处理他们

ForkJoin框架的核心是两个类:ForkJoinPool和ForkJoinTask。ForkJoinPool负责做实现(包括工作窃取算法)他管理工作线程提供任务的状态以及他们的执行信息。而ForkJoinTask则主要提供在任务中执行Fork和Join操作的机制。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
1@Slf4j
2public class ForkJoinTaskExample extends RecursiveTask<Integer> {
3
4    public static final int threshold = 2;
5    private int start;
6    private int end;
7
8    public ForkJoinTaskExample(int start, int end) {
9        this.start = start;
10        this.end = end;
11    }
12
13    @Override
14    protected Integer compute() {
15        int sum = 0;
16        //如果任务足够小就计算任务
17        boolean canCompute = (end - start) <= threshold;
18        if (canCompute) {
19            for (int i = start; i <= end; i++) {
20                sum += i;
21            }
22        } else {
23            // 如果任务大于阈值,就分裂成两个子任务计算
24            int middle = (start + end) / 2;
25            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
26            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
27
28            // 执行子任务
29            leftTask.fork();
30            rightTask.fork();
31
32            //等待任务执行结束合并其结果
33            int leftResult = leftTask.join();
34            int rightResult = rightTask.join();
35
36            // 合并子任务
37            sum = leftResult + rightResult;
38        }
39        return sum;
40    }
41
42    public static void main(String[] args) {
43        ForkJoinPool forkJoinPool = new ForkJoinPool();
44
45        //生成一个计算任务,计算1+2+3+4...+100
46        ForkJoinTaskExample taskExample = new ForkJoinTaskExample(1, 100);
47        Future<Integer> result = forkJoinPool.submit(taskExample);
48        try{
49            log.info("result:{}", result.get());
50        } catch (InterruptedException e) {
51            log.error("exception", e);
52            e.printStackTrace();
53        } catch (ExecutionException e) {
54            e.printStackTrace();
55        }
56    }
57}
58

 

转载于:https://my.oschina.net/duanvincent/blog/3082024

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense(Google网站联盟)广告申请指南

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索