Java并发编程 | 第六篇:Fork/Join框架

释放双眼,带上耳机,听听看~!

Fork/Join框架的介绍

Fork/Join框架是Java7提供了的一个用于并发执行任务的框架,是一个实现了ExecutorService接口的多线程处理器。它可以把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架,充分利用可用的资源,进而提高应用的执行效率

Fork/Join执行逻辑

使用Fork/Join框架

第一步首先分割任务,需要一个fork类来把大任务分割成小任务,如果子任务还是很大,那继续分割直到足够小
第二步然后合并任务结果,分割子任务分别放在双端队列,然后几个启动线程分别从双端队列中获取任务执行,子任务执行完的结果统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

常用方法

  1. compute(); 计算方法(分拆的子任务)

  2. fork(); // 执行子任务

  3. join(); // 子任务结束后返回对应结果


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
1
2import java.util.ArrayList;
3import java.util.concurrent.Callable;
4import java.util.concurrent.ForkJoinPool;
5import java.util.concurrent.ForkJoinTask;
6import java.util.concurrent.RecursiveTask;
7
8public class CountTask extends RecursiveTask<Long> {
9   private static final int THRESHOLD = 10000;//任务分解的规模
10
11  private long start;
12  private long end;
13
14  public CountTask(long start, long end) {
15      this.start = start;
16      this.end = end;
17  }
18
19  public Long compute() {
20      long sum = 0;
21      //如果需要求和的总数大于THRESHOLD,那么任务继续分解,否则直接可以执行了
22      boolean canCompute = (end - start) < THRESHOLD;
23      if (canCompute) {
24          for (long i = start; i < end; i++) {
25              sum += i;
26          }
27      } else {
28          // 分成100任务
29          long step = (start + end) / 100;
30          ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
31          long pos = start;
32          for (int i = 0; i < 100; i++) {
33              long lastOne = pos + step;
34              if (lastOne > end)
35                  lastOne = end;
36              CountTask subTask = new CountTask(pos, lastOne);
37              pos += step + 1;
38              subTasks.add(subTask);
39              subTask.fork();
40          }
41          for (CountTask t : subTasks) {
42              sum += t.join();
43          }
44      }
45      return sum;
46  }
47
48  public static void main(String[] args) {
49  //创建一个ForkJoinPool线程池
50      ForkJoinPool forkJoinPool = new ForkJoinPool();
51      //构造一个任务
52      CountTask task = new CountTask(0, 200000L);
53      //提交线程池后拿到结果的任务
54      ForkJoinTask<Long> result = forkJoinPool.submit(task);
55      try {
56      //从任务获取结果
57          long res = result.get();
58          System.out.println("sum=" + res);
59      } catch (Exception e) {
60
61      }
62  }
63
64}
65
66
67

使用ForkJoin框架经常使用到两个任务模型RecursiveTask和RecursiveAction,RecursiveTask用于定义有返回值的任务,RecursiveAction用于定义没有返回值的任务,首先建立ForkJoinPool线程池,再构造一个计算1到200000求和的任务。将任务提交给线程池,线程池会返回一个携带结果的任务,通过get()获取最终结果,如果调用get()时候任务还没完成就等待。

如果任务的划分层次过深会出现两种情况:
1、系统内的线程数量越积越多,导致性能下降
2、函数调用层次很深,导致栈溢出

工作窃取算法

工作窃取算法是指某个线程从其他队列中窃取任务来执行
在一般情况下,一个物理线程实际需要处理多个逻辑任务,因此每个线程必然需要拥有一个任务队列。比如,线程A已经把自己所以任务都执行完了,而线程B还有一堆任务没有完成,那么线程A就会帮助线程B,从线程B中拿一些任务过来处理,尽可能达到平衡,
总是从任务队列的底部拿数据,而线程执行自己的任务是从相反的顶部开始拿。这样就有利于避免数据竞争

我们在实现 分治编程时,主要就是调用 ForkJoinTask 的 fork() 和 join() 方法。fork() 方法用于提交子任务,而 join() 方法则用于等待子任务的完成。而这个过程中,将涉及到 “工作窃取算法”。

1、 fork( ) 方法提交任务


1
2
3
4
5
6
7
8
9
10
11
12
1public final ForkJoinTask<V> fork() {
2        Thread t;
3        //判断是否是一个 工作线程
4        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
5            //加入到内部队列中
6            ((ForkJoinWorkerThread)t).workQueue.push(this);
7        else//由 common 线程池来执行任务
8            ForkJoinPool.common.externalPush(this);
9        return this;
10    }
11
12

fork()方法先判断当前线程(调用fork()来提交任务的线程)是不是一个 ForkJoinWorkerThread 的工作线程,如果是,则将任务加入到内部队列中,否则,由 ForkJoinPool 提供的内部公用的线程池 common 线程池 来执行这个任务。

根据上面的说法,意味着我们可以在普通线程池中直接调用 fork() 方法来提交任务到一个默认提供的线程池中。这将非常方便。假如,你要在程序中处理大任务,需要分治编程,但你仅仅只处理一次,以后就不会用到,而且任务不算太大,不需要设置特定的参数,那么你肯定不想为此创建一个线程池,这时默认的提供的线程池将会很有用。

2、join( ) 等待任务的完成


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1public final V join() {
2        int s;
3        if ((s = doJoin() & DONE_MASK) != NORMAL)
4            reportException(s);
5        return getRawResult();//直接返回结果
6    }
7 private int doJoin() {
8        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
9        return
10            //如果完成,直接返回s
11            (s = status) < 0 ? s :
12            //没有完成,判断是不是池中的 ForkJoinWorkerThread 工作线程
13            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
14            //如果是池中线程,执行这里
15            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
16            tryUnpush(this) && (s = doExec()) < 0 ? s :
17            wt.pool.awaitJoin(w, this, 0L) :
18            //如果不是池中的线程池,则执行这里
19            externalAwaitDone();
20    }
21
22

仔细看上面的注释。当 dojoin( )方法发现任务没有完成且当前线程是池中线程时,执行了 tryUnpush( )方法。tryUnpush()方法尝试去执行此任务:如果要join的任务正好在当前任务队列的顶端,那么pop出这个任务,然后调用 doExec() 让当前线程去执行这个任务。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1final boolean tryUnpush(ForkJoinTask<?> t) {
2            ForkJoinTask<?>[] a; int s;
3            if ((a = array) != null && (s = top) != base &&
4                U.compareAndSwapObject
5                (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
6                U.putOrderedInt(this, QTOP, s);
7                return true;
8            }
9            return false;
10        }
11 final int doExec() {
12        int s; boolean completed;
13        if ((s = status) >= 0) {
14            try {
15                completed = exec();
16            } catch (Throwable rex) {
17                return setExceptionalCompletion(rex);
18            }
19            if (completed)
20                s = setCompletion(NORMAL);
21        }
22        return s;
23    }
24
25

如果任务不是处于队列的顶端,那么就会执行 awaitJoin( ) 方法。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1   final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
2        int s = 0;
3        if (task != null && w != null) {
4            ForkJoinTask<?> prevJoin = w.currentJoin;
5            U.putOrderedObject(w, QCURRENTJOIN, task);
6            CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
7                (CountedCompleter<?>)task : null;
8            for (;;) {
9                if ((s = task.status) < 0)//如果任务完成了,跳出死循环
10                    break;
11                if (cc != null)//当前任务是CountedCompleter类型,则尝试从任务队列中获取当前任务的派生子任务来执行;
12                    helpComplete(w, cc, 0);
13                else if (w.base == w.top || w.tryRemoveAndExec(task))//如果当前线程的内部队列为空,或者成功完成了任务,帮助某个线程完成任务。
14                    helpStealer(w, task);
15                if ((s = task.status) < 0)//任务完成,跳出死循环
16                    break;
17                long ms, ns;
18                if (deadline == 0L)
19                    ms = 0L;
20                else if ((ns = deadline - System.nanoTime()) <= 0L)
21                    break;
22                else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
23                    ms = 1L;
24                if (tryCompensate(w)) {
25                    task.internalWait(ms);
26                    U.getAndAddLong(this, CTL, AC_UNIT);
27                }
28            }
29            U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
30        }
31        return s;
32    }
33
34

重点说一下helpStealer。helpStealer的原则是你帮助我执行任务,我也帮你执行任务。

  1. 遍历奇数下标,如果发现队列对象currentSteal放置的刚好是自己要找的任务,则说明自己的任务被该队列A的owner线程偷来执行
  2. 如果队列A队列中有任务,则从队尾(base)取出执行;
  3. 如果发现队列A队列为空,则根据它正在join的任务,在拓扑找到相关的队列B去偷取任务执行。在执行的过程中要注意,我们应该完整的把任务完成

它们可能会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务线程永远从双端队列的尾部拿任务执行。

  • 优点:充分利用线程进行并行计算,减少线程间的竞争。
  • 缺点:在某些情况下还是会存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源, 比如创建多个线程和多个双端队列。

参考:《Java高并发程序设计》

给TA打赏
共{{data.count}}人
人已打赏
安全技术

bootstrap栅格系统自定义列

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索