Java并发编程(5)-Executor线程调度框架的使用

释放双眼,带上耳机,听听看~!

文章目录

  • 一、Executor线程调度框架

  • 1.1、什么是线程调度框架
    * 1.2、Executors
    * 1.3、Executor
    * 1.4、ExecutorService
    * 1.5、ScheduledThreadPoolExecutor

    
    
    1
    2
    1  * 二、各类线程池
    2
  • 2.1、newFixedThreadPool
    * 2.2、newCachedThreadPool
    * 2.3、newScheduledThreadPool
    * 2.4、newSingleThreadExecutor

    
    
    1
    2
    1  * 三、Runnable和Callable< T >任务调度接口
    2
  • 3.1、Runnable接口
    * 3.2、Callable< T >接口和Future接口

    
    
    1
    2
    1  * 四、使用线程调度框架构造一个并发程序
    2
  • 4.1、单线程模式
    * 4.2、多线程调度模式-使用Runnable
    * 4.3、多线程调度模式-使用Callable< T >

    
    
    1
    2
    3
    1  * 五、ExecutorCompletionService
    2  * 六、ExecutorService的生命周期管理
    3
  • 6.1、shutdown方法
    * 6.2、shutdownNow方法
    * 6.3、isShutdown方法
    * 6.4、isTerminated方法
    * 6.5、awaitTermination方法



Executor框架是在JDK5之后引入的一种线程调度类的集合,这些类的集合可以被统称为线程调度框架,本文将介绍Executor线程调度框架、各类线程池、Runnable和Callable< T >任务调度接口以及如何使用线程调度框架构造一个并发程序,最后介绍一下ExecutorCompletionService。
本文总结自《Java并发编程实践》第六章 任务执行 ,以及一些相关博客。

一、Executor线程调度框架

1.1、什么是线程调度框架

Executor框架是在JDK5之后引入的一种线程调度类的集合,这些类的集合可以被统称为线程调度框架,在这个框架体系中,任务的执行最小单元不是线程(Thread),而是任务(Task);而Executor可视为任务的执行器,它的作用是执行任务,进行任务的调度。常用的线程调度类和接口有:
Executors工厂类Executor接口ExecutorService接口ExecutorCompletionService类Runnable任务接口Callable< T >响应式任务接口Future< T >任务状态类newFixedThreadPool定长线程池newCachedThreadPool可缓存线程池newSingleThreadExecutor单一线程池newScheduledThreadPool可调度线程池。以上可简单地用结构图表示:
Java并发编程(5)-Executor线程调度框架的使用

1.2、Executors

提供了一系列静态工厂方法用于创建各种线程池,可返回一个Executor对象或者ExecutorService对象,用以执行任务。

1.3、Executor

一个接口,其定义了一个接收Runnable对象的方法executor(Runnable command),用以执行Runabl任务。

1.4、ExecutorService

继承于Executor,是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法。

1.5、ScheduledThreadPoolExecutor

ScheduledExecutorService的实现,一个可定时调度任务的线程池,由Executors.newScheduledThreadPool返回,可用来代替Timer进行定时任务调度。

二、各类线程池

2.1、newFixedThreadPool

创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。原理是有界的阻塞队列。


1
2
3
4
5
6
7
8
9
10
11
1
2
3public static ExecutorService newFixedThreadPool(int nThreads) {
4    return new ThreadPoolExecutor(nThreads, nThreads,
5                                  0L, TimeUnit.MILLISECONDS,
6                                  //使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
7                                  new LinkedBlockingQueue&lt;Runnable&gt;());
8}
9
10
11

2.2、newCachedThreadPool

创建可缓存的线程池,如果线程池中的线程在60秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一条线程。


1
2
3
4
5
6
7
8
9
10
11
1
2
3public static ExecutorService newCachedThreadPool() {
4    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
5                                  60L, TimeUnit.SECONDS,
6                                  //使用同步队列,将任务直接提交给线程
7                                  new SynchronousQueue&lt;Runnable&gt;());
8}
9
10
11

2.3、newScheduledThreadPool

创建一个可延迟执行或定期执行的线程池。


1
2
3
4
5
6
7
8
9
10
11
12
13
1public class HeartBeat {
2    public static void main(String[] args) {
3        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
4        Runnable task = new Runnable() {
5            public void run() {
6                System.out.println(&quot;HeartBeat.........................&quot;);
7            }
8        };
9        executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS);   //5秒后第一次执行,之后每隔3秒执行一次
10    }
11}
12
13

2.4、newSingleThreadExecutor

创建一个单线程的Executor,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务。


1
2
3
4
5
6
7
8
9
1public static ExecutorService newSingleThreadExecutor() {
2   return new FinalizableDelegatedExecutorService
3                     //corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
4                        (new ThreadPoolExecutor(1, 1,
5                                                0L, TimeUnit.MILLISECONDS,
6                                                new LinkedBlockingQueue&lt;Runnable&gt;()));
7}
8
9

三、Runnable和Callable< T >任务调度接口

3.1、Runnable接口

每个任务调度接口都是另起一个线程作业,其中Runnable接口中的run方法不能返回参数,所以也称为无响应任务接口,结合上面的内容,我们来看以下简单的使用:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1@Test
2        public void demo1(){
3            //使用Executors工厂类创建含10个线程的Executor执行器
4            Executor executor = Executors.newFixedThreadPool(10);
5            //创建一个无响应式任务
6            Runnable task = new Runnable(){
7                public void run(){
8                    //具体任务实现
9                    System.out.println(&quot;任务已执行&quot;);
10                }
11            };
12            //执行任务
13            executor.execute(task);
14        }
15
16

结果:
Java并发编程(5)-Executor线程调度框架的使用

3.2、Callable< T >接口和Future接口

Callable< T >接口用以实现有返回值的任务,需要注意的是,应该声明ExecutorService 接口类型来提交callable任务,并且获得了当前任务的状态声明Future对象,通过它获取到任务返回的值,如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1 @Test
2    public void demo2() throws ExecutionException, InterruptedException {
3        //使用Executors工厂类创建含10个线程的ExecutorService执行器
4        ExecutorService executor = Executors.newFixedThreadPool(10);
5        //创建一个响应式任务
6        Callable&lt;Integer&gt; task = new Callable&lt;Integer&gt;(){
7            public Integer call(){
8                //执行有返回值的任务
9                int i = 0;
10                return ++i;
11            }
12        };
13        //提交任务,获得任务状态类Futrue
14        Future&lt;Integer&gt; future = executor.submit(task);
15        //获得Future中的任务返回的响应值
16        Integer response = future.get();
17        System.out.println(&quot;任务执行,并返回了:&quot;+response);
18    }
19
20

Java并发编程(5)-Executor线程调度框架的使用

四、使用线程调度框架构造一个并发程序

4.1、单线程模式

结合上述知识,我们可以构造一个简单的并发程序了。设想这样一个情景:小明早上起床了,他去煮水。。。
如果是单线程程序,比如只用main()方法执行,小明只能等水煮完了,才能去看书,听歌,如以下代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1  public static void main(String[] args) {
2            //第一个循环执行10次,其中i表示小明正在煮水的进度
3        for (int i = 0; i &lt; 10; i++) {
4            System.out.println(&quot;煮水中:&quot;+i);
5        }
6
7        //第二个循环,表示看书
8        for (int i = 0; i &lt; 5; i++) {
9            System.out.println(&quot;看书中:&quot;+i);
10        }
11
12        //第三个循环,表示听音乐
13        for (int i = 0; i &lt; 5; i++) {
14            System.out.println(&quot;听歌中: &quot;+i);
15        }
16
17        //...
18    }
19
20

Java并发编程(5)-Executor线程调度框架的使用

4.2、多线程调度模式-使用Runnable

上面的就是典型的单线程模式了,所有工作都要从头执行到尾,可是在现实中,小明可以变煮水边看书边听歌,这样就不会浪费时间,所以这里就要开启至少3个线程去做这3件事情:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1@Test
2   public void demo3() throws ExecutionException, InterruptedException {
3       //使用Executors工厂类创建含3个线程的ExecutorService执行器
4       ExecutorService executor = Executors.newFixedThreadPool(3);
5
6       //创建一个任务1-煮水
7       Runnable task1 = new Runnable(){
8           public void run(){
9               for (int i = 0; i &lt; 10; i++) {
10                   System.out.println(Thread.currentThread().getName()+&quot;煮水中: &quot;+i);
11               }
12           }
13       };
14
15       //创建一个任务2-看书
16       Runnable task2 = new Runnable(){
17           public void run(){
18               for (int i = 0; i &lt; 5; i++) {
19                   System.out.println(Thread.currentThread().getName()+&quot;看书中: &quot;+i);
20               }
21           }
22       };
23
24       //创建一个任务3-听歌
25       Runnable task3 = new Runnable(){
26           public void run(){
27               for (int i = 0; i &lt; 5; i++) {
28                   System.out.println(Thread.currentThread().getName()+&quot;听歌中: &quot;+i);
29               }
30           }
31       };
32
33
34       //提交3个任务给线程池去分配线程执行
35       executor.execute(task1);
36       executor.execute(task2);
37       executor.execute(task3);
38
39   }
40
41

Java并发编程(5)-Executor线程调度框架的使用

可以看出,Executor线程调度框架分配了3个线程去分别执行了这3个任务,使得程序变成了并发的,提高了工作的效率。

4.3、多线程调度模式-使用Callable< T >

当然,我们也可以使用Callable< T >任务接口来完成上述的多线程调度例子:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1 @Test
2    public void fun14() throws ExecutionException, InterruptedException {
3        //使用Executors工厂类创建含3个线程的ExecutorService执行器
4        ExecutorService executor = Executors.newFixedThreadPool(3);
5
6        //创建一个任务1-煮水
7        Callable&lt;Integer&gt; task1 = new Callable&lt;Integer&gt;(){
8            public Integer call(){
9                for (int i = 0; i &lt; 10; i++) {
10                    System.out.println(Thread.currentThread().getName()+&quot;煮水中 :&quot;+i);
11                }
12                return null;
13            }
14        };
15
16
17        //创建一个任务2-看书
18        Callable&lt;Integer&gt; task2 = new Callable&lt;Integer&gt;(){
19
20            @Override
21            public Integer call(){
22                for (int i = 0; i &lt; 5; i++) {
23                    System.out.println(Thread.currentThread().getName()+&quot;看书中 :&quot;+i);
24                }
25                    return null;
26            }
27
28        };
29
30        //创建一个任务3-听歌
31        Callable&lt;Integer&gt; task3 = new Callable&lt;Integer&gt;(){
32
33            @Override
34            public Integer call(){
35                for (int i = 0; i &lt; 5; i++) {
36                    System.out.println(Thread.currentThread().getName()+&quot;听歌中 :&quot;+i);
37                }
38                return null;
39            }
40
41        };
42
43        //提交3个任务给线程池去分配线程执行并获得线程状态对象
44        Future&lt;Integer&gt; future1 = executor.submit(task1);
45        Future&lt;Integer&gt; future2 = executor.submit(task2);
46        Future&lt;Integer&gt; future3 = executor.submit(task3);
47    }
48
49

五、ExecutorCompletionService

实现了CompletionService,将执行完成的任务放到阻塞队列中,通过take或poll方法来获得执行结果Future,然后调用Future的get方法即可获得响应的结果了。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1@Test
2    public void demo4() throws InterruptedException, ExecutionException {
3        CopyOnWriteArrayList&lt;Object&gt; list = new CopyOnWriteArrayList&lt;&gt;();
4            //获得含有缓存线程池的Executor
5        ExecutorService executor = Executors.newCachedThreadPool();
6        //创建一个ExecutorCompletionService,将executor交给它管理
7        ExecutorCompletionService completionService = new ExecutorCompletionService(executor);
8        for (int i = 0; i &lt; 10; i++) {
9            int result = i;
10            //创建一个响应式任务,循环执行10次
11            Callable&lt;Object&gt; task = new Callable&lt;Object&gt;(){
12                public Object call() throws InterruptedException {
13                    return result;
14                }
15            };
16            //提交任务
17            completionService.submit(task);
18            //将结果放入集合中
19            Object o = completionService.take().get();
20            list.add((Integer)o);
21        }
22
23        //将集合遍历
24        for (Object o : list) {
25            System.out.print((Integer)o+&quot;\t&quot;);
26        }
27
28
29    }
30
31

Java并发编程(5)-Executor线程调度框架的使用

六、ExecutorService的生命周期管理

ExecutorService继承自Executor接口,定义了一些生命周期的方法:


1
2
3
4
5
6
7
8
9
10
1  public interface ExecutorService extends Executor {  
2    void shutdown();  
3    List&lt;Runnable&gt; shutdownNow();  
4    boolean isShutdown();  
5    boolean isTerminated();  
6    boolean awaitTermination(long timeout, TimeUnit unit)  
7            throws InterruptedException;  
8    }  
9
10

6.1、shutdown方法

这个方法用于会强制关闭ExecutorService,它将取消所有运行中的任务和在工作队列中等待的任务。

6.2、shutdownNow方法

这个方法会强制关闭ExecutorService,它将取消所有运行中的任务和在工作队列中等待的任务,这个方法返回一个List列表,列表中返回的是等待在工作队列中的任务。

6.3、isShutdown方法

这个方法在ExecutorService关闭后返回true,否则返回false。

6.4、isTerminated方法

这个方法会校验ExecutorService当前的状态是否为“TERMINATED”即关闭状态,当为关闭状态时时返回true否则返回false。

6.5、awaitTermination方法

这个方法有两个参数,一个是timeout即超时时间,另一个是unit即时间单位。这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。

给TA打赏
共{{data.count}}人
人已打赏
安全技术

基于Node.js的自动化构建工具Grunt.js

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索