Java并发编程(7)-ThreadPoolExecutor的饱和策略

释放双眼,带上耳机,听听看~!

文章目录

  • 一、ThreadPoolExecutor的饱和策略

  • 1.1、什么是饱和策略
    * 1.2、Abort策略
    * 1.3、CallerRuns策略
    * 1.4、Discard策略
    * 1.5、DiscardOldest策略

    
    
    1
    2
    1  * 二、ThreadPoolExecutor的扩展方法
    2


本文将介绍ThreadPoolExecutor的饱和策略以及ThreadPoolExecutor的扩展方法;当线程充满了ThreadPool的有界队列时,饱和策略开始起作用。饱和策略可以理解为队列饱和后,处理后续无法入队的任务的策略。在JDK中预先提供了几种饱和策略,ThreadPoolExecutor可以通过调用setRejectedExecutionHandler来修改饱和策略。ThreadPoolExecutor在设计的时候就支持扩展其方法,主要为前处理beforeExecute、后处理afterExecute

1
1`

以及执行完毕处理terminated。

一、ThreadPoolExecutor的饱和策略

1.1、什么是饱和策略

当线程充满了ThreadPool的有界队列时,饱和策略开始起作用。**饱和策略可以理解为队列饱和后,处理后续无法入队的任务的策略。**在JDK中预先提供了几种饱和策略,ThreadPoolExecutor可以通过调用setRejectedExecutionHandler来修改饱和策略。

1.2、Abort策略

ThreadPoolExecutor的 默认策略,新任务提交时直接抛出未检查的异常RejectedExecutionException,该异常可由调用者捕获。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1@Test
2    public void testPolicy() {
3        ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
4                5,
5                0,
6                TimeUnit.MICROSECONDS,
7                new ArrayBlockingQueue<Runnable>(5));
8        //设置饱和策略为AbortPolicy
9        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
10        //测试
11        for (int i = 1; i <= 11; i++) {
12
13            int count = i;
14            executor.execute(new Runnable() {
15                @Override
16                public void run() {
17                    //模拟耗时操作
18                    for (int j = 0; j < 100000; j++) {
19                        //...
20                    }
21                    System.out.println("这是第" + count + "个线程在执行任务----" + Thread.currentThread().getName());
22                }
23            });
24
25
26        }
27    }
28
29

运行后,由于设置了核心池线程数为5,有界队列能存储的线程数为5,故一共只有10个任务能被执行,多余的那1个任务将被以异常的形式抛出。
Java并发编程(7)-ThreadPoolExecutor的饱和策略

1.3、CallerRuns策略

既不抛弃任务也不抛出异常,而是将某些任务回退到调用者,让调用者去执行它。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1@Test
2   public void testPolicy() {
3       ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
4               5,
5               0,
6               TimeUnit.MICROSECONDS,
7               new ArrayBlockingQueue<Runnable>(5));
8       //设置饱和策略为AbortPolicy
9       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
10       //测试
11       for (int i = 1; i <= 11; i++) {
12
13           int count = i;
14           executor.execute(new Runnable() {
15               @Override
16               public void run() {
17                   //模拟耗时操作
18                   for (int j = 0; j < 100000; j++) {
19                       //...
20                   }
21                   System.out.println("这是第" + count + "个线程在执行任务----" + Thread.currentThread().getName());
22               }
23           });
24
25
26       }
27   }
28
29

从执行结果中可以看到,多出来的那11个任务被executor的调用者main主线程执行。
Java并发编程(7)-ThreadPoolExecutor的饱和策略

1.4、Discard策略

多余的任务被抛弃,不执行。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1@Test
2   public void testPolicy() {
3       ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
4               5,
5               0,
6               TimeUnit.MICROSECONDS,
7               new ArrayBlockingQueue<Runnable>(5));
8       //设置饱和策略为DiscardPolicy
9       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
10       //测试
11       for (int i = 1; i <= 11; i++) {
12
13           int count = i;
14           executor.execute(new Runnable() {
15               @Override
16               public void run() {
17                   //模拟耗时操作
18                   for (int j = 0; j < 100000; j++) {
19                       //...
20                   }
21                   System.out.println("这是第" + count + "个线程在执行任务----" + Thread.currentThread().getName());
22               }
23           });
24
25
26       }
27   }
28
29

Java并发编程(7)-ThreadPoolExecutor的饱和策略

1.5、DiscardOldest策略

抛弃工作队列中的旧任务,然后尝试提交新任务执行。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1@Test
2   public void testPolicy() {
3       ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
4               5,
5               0,
6               TimeUnit.MICROSECONDS,
7               new ArrayBlockingQueue<Runnable>(5));
8       //设置饱和策略为DiscardOldestPolicy
9       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
10       //测试
11       for (int i = 1; i <= 11; i++) {
12
13           int count = i;
14           executor.execute(new Runnable() {
15               @Override
16               public void run() {
17                   //模拟耗时操作
18                   for (int j = 0; j < 100000; j++) {
19                       //...
20                   }
21                   System.out.println("这是第" + count + "个线程在执行任务----" + Thread.currentThread().getName());
22               }
23           });
24
25
26       }
27   }
28
29

从执行结果可以看出,第11个任务被执行了,说明工作队列中的最旧任务被抛弃,执行了这个新的任务。
Java并发编程(7)-ThreadPoolExecutor的饱和策略

二、ThreadPoolExecutor的扩展方法

ThreadPoolExecutor在设计的时候就支持扩展其方法,主要为前处理beforeExecute、后处理afterExecute以及执行完毕处理terminated。

自定义的ThreadPoolExecutor:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1public class MyThreadPoolExecutor extends ThreadPoolExecutor {
2    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
3        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
4    }
5
6    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
7        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
8    }
9
10    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
11        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
12    }
13
14    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
15        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
16    }
17
18
19    @Override
20    protected void beforeExecute(Thread t, Runnable r) {
21        super.beforeExecute(t, r);
22        //before execute....
23        System.out.println("before execute....");
24    }
25
26    @Override
27    protected void afterExecute(Runnable r, Throwable t) {
28        super.afterExecute(r, t);
29        //after execute....
30        System.out.println("after execute....");
31    }
32
33    @Override
34    protected void terminated() {
35        super.terminated();
36        //terminated
37        System.out.println("terminated");
38    }
39}
40
41

Java并发编程(7)-ThreadPoolExecutor的饱和策略

给TA打赏
共{{data.count}}人
人已打赏
安全技术

15 个有趣的 JavaScript 与 CSS 库

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索