文章目录
-
一、ThreadPoolExecutor的饱和策略
-
1.1、什么是饱和策略
* 1.2、Abort策略
* 1.3、CallerRuns策略
* 1.4、Discard策略
* 1.5、DiscardOldest策略1
21 * 二、ThreadPoolExecutor的扩展方法
2
本文将介绍ThreadPoolExecutor的饱和策略以及ThreadPoolExecutor的扩展方法;当线程充满了ThreadPool的有界队列时,饱和策略开始起作用。饱和策略可以理解为队列饱和后,处理后续无法入队的任务的策略。在JDK中预先提供了几种饱和策略,ThreadPoolExecutor可以通过调用setRejectedExecutionHandler来修改饱和策略。ThreadPoolExecutor在设计的时候就支持扩展其方法,主要为前处理beforeExecute、后处理afterExecute
1 1`以及执行完毕处理terminated。
一、ThreadPoolExecutor的饱和策略
1.1、什么是饱和策略
当线程充满了ThreadPool的有界队列时,饱和策略开始起作用。**饱和策略可以理解为队列饱和后,处理后续无法入队的任务的策略。**在JDK中预先提供了几种饱和策略,ThreadPoolExecutor可以通过调用setRejectedExecutionHandler来修改饱和策略。
1.2、Abort策略
ThreadPoolExecutor的 默认策略,新任务提交时直接抛出未检查的异常RejectedExecutionException,该异常可由调用者捕获。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 1@Test
2 public void testPolicy() {
3 ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
4 5,
5 0,
6 TimeUnit.MICROSECONDS,
7 new ArrayBlockingQueue<Runnable>(5));
8 //设置饱和策略为AbortPolicy
9 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
10 //测试
11 for (int i = 1; i <= 11; i++) {
12
13 int count = i;
14 executor.execute(new Runnable() {
15 @Override
16 public void run() {
17 //模拟耗时操作
18 for (int j = 0; j < 100000; j++) {
19 //...
20 }
21 System.out.println("这是第" + count + "个线程在执行任务----" + Thread.currentThread().getName());
22 }
23 });
24
25
26 }
27 }
28
29
运行后,由于设置了核心池线程数为5,有界队列能存储的线程数为5,故一共只有10个任务能被执行,多余的那1个任务将被以异常的形式抛出。

1.3、CallerRuns策略
既不抛弃任务也不抛出异常,而是将某些任务回退到调用者,让调用者去执行它。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 1@Test
2 public void testPolicy() {
3 ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
4 5,
5 0,
6 TimeUnit.MICROSECONDS,
7 new ArrayBlockingQueue<Runnable>(5));
8 //设置饱和策略为AbortPolicy
9 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
10 //测试
11 for (int i = 1; i <= 11; i++) {
12
13 int count = i;
14 executor.execute(new Runnable() {
15 @Override
16 public void run() {
17 //模拟耗时操作
18 for (int j = 0; j < 100000; j++) {
19 //...
20 }
21 System.out.println("这是第" + count + "个线程在执行任务----" + Thread.currentThread().getName());
22 }
23 });
24
25
26 }
27 }
28
29
从执行结果中可以看到,多出来的那11个任务被executor的调用者main主线程执行。

1.4、Discard策略
多余的任务被抛弃,不执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 1@Test
2 public void testPolicy() {
3 ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
4 5,
5 0,
6 TimeUnit.MICROSECONDS,
7 new ArrayBlockingQueue<Runnable>(5));
8 //设置饱和策略为DiscardPolicy
9 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
10 //测试
11 for (int i = 1; i <= 11; i++) {
12
13 int count = i;
14 executor.execute(new Runnable() {
15 @Override
16 public void run() {
17 //模拟耗时操作
18 for (int j = 0; j < 100000; j++) {
19 //...
20 }
21 System.out.println("这是第" + count + "个线程在执行任务----" + Thread.currentThread().getName());
22 }
23 });
24
25
26 }
27 }
28
29

1.5、DiscardOldest策略
抛弃工作队列中的旧任务,然后尝试提交新任务执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 1@Test
2 public void testPolicy() {
3 ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
4 5,
5 0,
6 TimeUnit.MICROSECONDS,
7 new ArrayBlockingQueue<Runnable>(5));
8 //设置饱和策略为DiscardOldestPolicy
9 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
10 //测试
11 for (int i = 1; i <= 11; i++) {
12
13 int count = i;
14 executor.execute(new Runnable() {
15 @Override
16 public void run() {
17 //模拟耗时操作
18 for (int j = 0; j < 100000; j++) {
19 //...
20 }
21 System.out.println("这是第" + count + "个线程在执行任务----" + Thread.currentThread().getName());
22 }
23 });
24
25
26 }
27 }
28
29
从执行结果可以看出,第11个任务被执行了,说明工作队列中的最旧任务被抛弃,执行了这个新的任务。

二、ThreadPoolExecutor的扩展方法
ThreadPoolExecutor在设计的时候就支持扩展其方法,主要为前处理beforeExecute、后处理afterExecute以及执行完毕处理terminated。
自定义的ThreadPoolExecutor:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 1public class MyThreadPoolExecutor extends ThreadPoolExecutor {
2 public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
3 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
4 }
5
6 public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
7 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
8 }
9
10 public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
11 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
12 }
13
14 public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
15 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
16 }
17
18
19 @Override
20 protected void beforeExecute(Thread t, Runnable r) {
21 super.beforeExecute(t, r);
22 //before execute....
23 System.out.println("before execute....");
24 }
25
26 @Override
27 protected void afterExecute(Runnable r, Throwable t) {
28 super.afterExecute(r, t);
29 //after execute....
30 System.out.println("after execute....");
31 }
32
33 @Override
34 protected void terminated() {
35 super.terminated();
36 //terminated
37 System.out.println("terminated");
38 }
39}
40
41
