文章目录
-
一、Executor线程调度框架
-
1.1、什么是线程调度框架
* 1.2、Executors
* 1.3、Executor
* 1.4、ExecutorService
* 1.5、ScheduledThreadPoolExecutor1
21 * 二、各类线程池
2 -
2.1、newFixedThreadPool
* 2.2、newCachedThreadPool
* 2.3、newScheduledThreadPool
* 2.4、newSingleThreadExecutor1
21 * 三、Runnable和Callable< T >任务调度接口
2 -
3.1、Runnable接口
* 3.2、Callable< T >接口和Future接口1
21 * 四、使用线程调度框架构造一个并发程序
2 -
4.1、单线程模式
* 4.2、多线程调度模式-使用Runnable
* 4.3、多线程调度模式-使用Callable< T >1
2
31 * 五、ExecutorCompletionService
2 * 六、ExecutorService的生命周期管理
3 -
6.1、shutdown方法
* 6.2、shutdownNow方法
* 6.3、isShutdown方法
* 6.4、isTerminated方法
* 6.5、awaitTermination方法
Executor框架是在JDK5之后引入的一种线程调度类的集合,这些类的集合可以被统称为线程调度框架,本文将介绍Executor线程调度框架、各类线程池、Runnable和Callable< T >任务调度接口以及如何使用线程调度框架构造一个并发程序,最后介绍一下ExecutorCompletionService。
本文总结自《Java并发编程实践》第六章 任务执行 ,以及一些相关博客。
一、Executor线程调度框架
1.1、什么是线程调度框架
Executor框架是在JDK5之后引入的一种线程调度类的集合,这些类的集合可以被统称为线程调度框架,在这个框架体系中,任务的执行最小单元不是线程(Thread),而是任务(Task);而Executor可视为任务的执行器,它的作用是执行任务,进行任务的调度。常用的线程调度类和接口有:
Executors工厂类、Executor接口、ExecutorService接口、ExecutorCompletionService类、Runnable任务接口、Callable< T >响应式任务接口、Future< T >任务状态类、newFixedThreadPool定长线程池、newCachedThreadPool可缓存线程池、newSingleThreadExecutor单一线程池、newScheduledThreadPool可调度线程池。以上可简单地用结构图表示:
1.2、Executors
提供了一系列静态工厂方法用于创建各种线程池,可返回一个Executor对象或者ExecutorService对象,用以执行任务。
1.3、Executor
一个接口,其定义了一个接收Runnable对象的方法executor(Runnable command),用以执行Runabl任务。
1.4、ExecutorService
继承于Executor,是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法。
1.5、ScheduledThreadPoolExecutor
ScheduledExecutorService的实现,一个可定时调度任务的线程池,由Executors.newScheduledThreadPool返回,可用来代替Timer进行定时任务调度。
二、各类线程池
2.1、newFixedThreadPool
创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。原理是有界的阻塞队列。
1
2
3
4
5
6
7
8
9
10
11 1
2
3public static ExecutorService newFixedThreadPool(int nThreads) {
4 return new ThreadPoolExecutor(nThreads, nThreads,
5 0L, TimeUnit.MILLISECONDS,
6 //使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
7 new LinkedBlockingQueue<Runnable>());
8}
9
10
11
2.2、newCachedThreadPool
创建可缓存的线程池,如果线程池中的线程在60秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一条线程。
1
2
3
4
5
6
7
8
9
10
11 1
2
3public static ExecutorService newCachedThreadPool() {
4 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
5 60L, TimeUnit.SECONDS,
6 //使用同步队列,将任务直接提交给线程
7 new SynchronousQueue<Runnable>());
8}
9
10
11
2.3、newScheduledThreadPool
创建一个可延迟执行或定期执行的线程池。
1
2
3
4
5
6
7
8
9
10
11
12
13 1public class HeartBeat {
2 public static void main(String[] args) {
3 ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
4 Runnable task = new Runnable() {
5 public void run() {
6 System.out.println("HeartBeat.........................");
7 }
8 };
9 executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS); //5秒后第一次执行,之后每隔3秒执行一次
10 }
11}
12
13
2.4、newSingleThreadExecutor
创建一个单线程的Executor,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务。
1
2
3
4
5
6
7
8
9 1public static ExecutorService newSingleThreadExecutor() {
2 return new FinalizableDelegatedExecutorService
3 //corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
4 (new ThreadPoolExecutor(1, 1,
5 0L, TimeUnit.MILLISECONDS,
6 new LinkedBlockingQueue<Runnable>()));
7}
8
9
三、Runnable和Callable< T >任务调度接口
3.1、Runnable接口
每个任务调度接口都是另起一个线程作业,其中Runnable接口中的run方法不能返回参数,所以也称为无响应任务接口,结合上面的内容,我们来看以下简单的使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1@Test
2 public void demo1(){
3 //使用Executors工厂类创建含10个线程的Executor执行器
4 Executor executor = Executors.newFixedThreadPool(10);
5 //创建一个无响应式任务
6 Runnable task = new Runnable(){
7 public void run(){
8 //具体任务实现
9 System.out.println("任务已执行");
10 }
11 };
12 //执行任务
13 executor.execute(task);
14 }
15
16
结果:
3.2、Callable< T >接口和Future接口
Callable< T >接口用以实现有返回值的任务,需要注意的是,应该声明ExecutorService 接口类型来提交callable任务,并且获得了当前任务的状态声明Future对象,通过它获取到任务返回的值,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1 @Test
2 public void demo2() throws ExecutionException, InterruptedException {
3 //使用Executors工厂类创建含10个线程的ExecutorService执行器
4 ExecutorService executor = Executors.newFixedThreadPool(10);
5 //创建一个响应式任务
6 Callable<Integer> task = new Callable<Integer>(){
7 public Integer call(){
8 //执行有返回值的任务
9 int i = 0;
10 return ++i;
11 }
12 };
13 //提交任务,获得任务状态类Futrue
14 Future<Integer> future = executor.submit(task);
15 //获得Future中的任务返回的响应值
16 Integer response = future.get();
17 System.out.println("任务执行,并返回了:"+response);
18 }
19
20
四、使用线程调度框架构造一个并发程序
4.1、单线程模式
结合上述知识,我们可以构造一个简单的并发程序了。设想这样一个情景:小明早上起床了,他去煮水。。。
如果是单线程程序,比如只用main()方法执行,小明只能等水煮完了,才能去看书,听歌,如以下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1 public static void main(String[] args) {
2 //第一个循环执行10次,其中i表示小明正在煮水的进度
3 for (int i = 0; i < 10; i++) {
4 System.out.println("煮水中:"+i);
5 }
6
7 //第二个循环,表示看书
8 for (int i = 0; i < 5; i++) {
9 System.out.println("看书中:"+i);
10 }
11
12 //第三个循环,表示听音乐
13 for (int i = 0; i < 5; i++) {
14 System.out.println("听歌中: "+i);
15 }
16
17 //...
18 }
19
20
4.2、多线程调度模式-使用Runnable
上面的就是典型的单线程模式了,所有工作都要从头执行到尾,可是在现实中,小明可以变煮水边看书边听歌,这样就不会浪费时间,所以这里就要开启至少3个线程去做这3件事情:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 1@Test
2 public void demo3() throws ExecutionException, InterruptedException {
3 //使用Executors工厂类创建含3个线程的ExecutorService执行器
4 ExecutorService executor = Executors.newFixedThreadPool(3);
5
6 //创建一个任务1-煮水
7 Runnable task1 = new Runnable(){
8 public void run(){
9 for (int i = 0; i < 10; i++) {
10 System.out.println(Thread.currentThread().getName()+"煮水中: "+i);
11 }
12 }
13 };
14
15 //创建一个任务2-看书
16 Runnable task2 = new Runnable(){
17 public void run(){
18 for (int i = 0; i < 5; i++) {
19 System.out.println(Thread.currentThread().getName()+"看书中: "+i);
20 }
21 }
22 };
23
24 //创建一个任务3-听歌
25 Runnable task3 = new Runnable(){
26 public void run(){
27 for (int i = 0; i < 5; i++) {
28 System.out.println(Thread.currentThread().getName()+"听歌中: "+i);
29 }
30 }
31 };
32
33
34 //提交3个任务给线程池去分配线程执行
35 executor.execute(task1);
36 executor.execute(task2);
37 executor.execute(task3);
38
39 }
40
41
可以看出,Executor线程调度框架分配了3个线程去分别执行了这3个任务,使得程序变成了并发的,提高了工作的效率。
4.3、多线程调度模式-使用Callable< T >
当然,我们也可以使用Callable< T >任务接口来完成上述的多线程调度例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 1 @Test
2 public void fun14() throws ExecutionException, InterruptedException {
3 //使用Executors工厂类创建含3个线程的ExecutorService执行器
4 ExecutorService executor = Executors.newFixedThreadPool(3);
5
6 //创建一个任务1-煮水
7 Callable<Integer> task1 = new Callable<Integer>(){
8 public Integer call(){
9 for (int i = 0; i < 10; i++) {
10 System.out.println(Thread.currentThread().getName()+"煮水中 :"+i);
11 }
12 return null;
13 }
14 };
15
16
17 //创建一个任务2-看书
18 Callable<Integer> task2 = new Callable<Integer>(){
19
20 @Override
21 public Integer call(){
22 for (int i = 0; i < 5; i++) {
23 System.out.println(Thread.currentThread().getName()+"看书中 :"+i);
24 }
25 return null;
26 }
27
28 };
29
30 //创建一个任务3-听歌
31 Callable<Integer> task3 = new Callable<Integer>(){
32
33 @Override
34 public Integer call(){
35 for (int i = 0; i < 5; i++) {
36 System.out.println(Thread.currentThread().getName()+"听歌中 :"+i);
37 }
38 return null;
39 }
40
41 };
42
43 //提交3个任务给线程池去分配线程执行并获得线程状态对象
44 Future<Integer> future1 = executor.submit(task1);
45 Future<Integer> future2 = executor.submit(task2);
46 Future<Integer> future3 = executor.submit(task3);
47 }
48
49
五、ExecutorCompletionService
实现了CompletionService,将执行完成的任务放到阻塞队列中,通过take或poll方法来获得执行结果Future,然后调用Future的get方法即可获得响应的结果了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 1@Test
2 public void demo4() throws InterruptedException, ExecutionException {
3 CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();
4 //获得含有缓存线程池的Executor
5 ExecutorService executor = Executors.newCachedThreadPool();
6 //创建一个ExecutorCompletionService,将executor交给它管理
7 ExecutorCompletionService completionService = new ExecutorCompletionService(executor);
8 for (int i = 0; i < 10; i++) {
9 int result = i;
10 //创建一个响应式任务,循环执行10次
11 Callable<Object> task = new Callable<Object>(){
12 public Object call() throws InterruptedException {
13 return result;
14 }
15 };
16 //提交任务
17 completionService.submit(task);
18 //将结果放入集合中
19 Object o = completionService.take().get();
20 list.add((Integer)o);
21 }
22
23 //将集合遍历
24 for (Object o : list) {
25 System.out.print((Integer)o+"\t");
26 }
27
28
29 }
30
31
六、ExecutorService的生命周期管理
ExecutorService继承自Executor接口,定义了一些生命周期的方法:
1
2
3
4
5
6
7
8
9
10 1 public interface ExecutorService extends Executor {
2 void shutdown();
3 List<Runnable> shutdownNow();
4 boolean isShutdown();
5 boolean isTerminated();
6 boolean awaitTermination(long timeout, TimeUnit unit)
7 throws InterruptedException;
8 }
9
10
6.1、shutdown方法
这个方法用于会强制关闭ExecutorService,它将取消所有运行中的任务和在工作队列中等待的任务。
6.2、shutdownNow方法
这个方法会强制关闭ExecutorService,它将取消所有运行中的任务和在工作队列中等待的任务,这个方法返回一个List列表,列表中返回的是等待在工作队列中的任务。
6.3、isShutdown方法
这个方法在ExecutorService关闭后返回true,否则返回false。
6.4、isTerminated方法
这个方法会校验ExecutorService当前的状态是否为“TERMINATED”即关闭状态,当为关闭状态时时返回true否则返回false。
6.5、awaitTermination方法
这个方法有两个参数,一个是timeout即超时时间,另一个是unit即时间单位。这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。