导读: 线程在java里面很多,本文不会介绍线程使用,只会介绍大型项目线程组件的开发模式。
一个大型项目中少不了对多线程的使用,在使用多线程的时候,可以使用Java 的API提供的Thread或者Runnable。随着API的丰富和发展,对程序员来说使用和管理线程也变得越来越方便了。例如我们利用线程池来管理线程,很常见的做法就是:
1
2
3
4
5
6
7 1 public void testThread() {
2 ExecutorService executorService= Executors.newSingleThreadExecutor();
3 executorService.execute(
4 ()-> System.out.println("线程任务"));
5 }
6
7
但是在实战项目中,我们需要对项目中的线程进行管理、和控制是具有严格,通常我们将线程的产生,都集中到一个地方,通过严格的参数控制,对外提供线程对象。工厂模式就很适合这种情况,在jdk1.5以上,提供了threadfactory 这个API,所以项目中我们可能会有这样的写法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 1//
2// Source code recreated from a .class file by IntelliJ IDEA
3// (powered by Fernflower decompiler)
4//
5
6package com.twjitm.threads.thread;
7
8import java.util.concurrent.ThreadFactory;
9import java.util.concurrent.atomic.AtomicInteger;
10import org.slf4j.Logger;
11import org.slf4j.LoggerFactory;
12
13public class NettyThreadNameFactory implements ThreadFactory {
14 Logger logger;
15 private String threadName;
16 private ThreadGroup threadGroup;
17 private AtomicInteger threadNumber;
18 private boolean daemon;
19
20 public NettyThreadNameFactory(String threadName) {
21 this(threadName, false);
22 this.threadName = threadName;
23 }
24
25 public NettyThreadNameFactory(String threadName, boolean daemon) {
26 this.logger = LoggerFactory.getLogger(NettyThreadNameFactory.class);
27 this.threadNumber = new AtomicInteger(0);
28 this.threadName = threadName;
29 this.daemon = daemon;
30 SecurityManager s = System.getSecurityManager();
31 this.threadGroup = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
32 this.threadName = threadName + "-thread-";
33 this.daemon = daemon;
34 }
35
36 public Thread newThread(Runnable r) {
37 Thread thread = new Thread(this.threadGroup, r, this.threadName + this.threadNumber.getAndIncrement(), 0L);
38 if (this.daemon) {
39 thread.setDaemon(this.daemon);
40 } else {
41 if (thread.isDaemon()) {
42 thread.setDaemon(false);
43 }
44
45 if (thread.getPriority() != 5) {
46 thread.setPriority(5);
47 }
48 }
49
50 if (this.logger.isDebugEnabled()) {
51 this.logger.debug("创建线程:" + this.threadName);
52 }
53
54 return thread;
55 }
56
57 public String getThreadName() {
58 return this.threadName;
59 }
60}
61
62
63
可以控制线程的数量,线程名称,线程是否为守护线程。对外使用时如在启动netty服务器的时候,可以通过如下方式进行设置。
统一线程任务:
在大型项目系统中,为了统一代码风格,会将线程执行的任务封装到一个任务队列中去执行,,任务队列有很多方式,比如有序任务队列,无序任务队列等。这里我们主要介绍,通过扩展ThreadPoolExecutor 来实现有序队列和无序队列。
首先我们需要定义任务队列实体。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 1package com.twjitm.threads.thread.task;
2
3import java.util.concurrent.BlockingQueue;
4import java.util.concurrent.LinkedBlockingQueue;
5
6/**
7 * @author twjitm - [Created on 2018-08-23 17:24]
8 * @company https://github.com/twjitm/
9 * @jdk java version "1.8.0_77"
10 */
11public class NettyTaskQueue<V> {
12
13 private boolean comple = true;
14 /**
15 * 任务队列
16 */
17 BlockingQueue<V> tasksQueue = new LinkedBlockingQueue<V>();
18
19 /**
20 * 下一执行命令
21 *
22 * @return
23 */
24 public V poll() {
25 return tasksQueue.poll();
26 }
27
28 /**
29 * 增加执行指令
30 *
31 * @param
32 * @return
33 */
34 public boolean add(V value) {
35 return tasksQueue.add(value);
36
37 }
38
39 /**
40 * 清理
41 */
42 public void clear() {
43 tasksQueue.clear();
44 }
45
46 /**
47 * 获取指令数量
48 *
49 * @return
50 */
51 public int size() {
52 return tasksQueue.size();
53 }
54
55 public boolean isComple() {
56 return comple;
57 }
58
59 public void setComple(boolean comple) {
60 this.comple = comple;
61 }
62
63}
64
65
66
定义一个任务实体队列。NettyOrderTaskQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 1package com.twjitm.threads.thread.task;
2
3import java.util.concurrent.ConcurrentHashMap;
4
5/**
6 * @author twjitm - [Created on 2018-08-23 17:36]
7 * @company https://github.com/twjitm/
8 * @jdk java version "1.8.0_77"
9 * 有序队列
10 */
11public class NettyOrderTaskQueue<K, V> {
12
13 private final ConcurrentHashMap<K, NettyTaskQueue<V>> taskOrderQueue = new ConcurrentHashMap<K, NettyTaskQueue<V>>();
14
15 /**
16 * 获得任务队列
17 *
18 * @param key
19 * @return
20 */
21 public NettyTaskQueue<V> getTasksQueue(K key) {
22 NettyTaskQueue<V> queue = taskOrderQueue.get(key);
23
24 if (queue == null) {
25 NettyTaskQueue<V> newQueue = new NettyTaskQueue<V>();
26 queue = taskOrderQueue.putIfAbsent(key, newQueue);
27 if (queue == null) {
28 queue = newQueue;
29 }
30 }
31
32 return queue;
33 }
34
35 /**
36 * 获得全部任务队列
37 *
38 * @return
39 */
40 public ConcurrentHashMap<K, NettyTaskQueue<V>> getTasksQueues() {
41 return taskOrderQueue;
42 }
43
44 /**
45 * 移除任务队列
46 *
47 * @return
48 */
49 public void removeTasksQueue(K key) {
50 taskOrderQueue.remove(key);
51 }
52
53}
54
55
56
通过ConcurrentHashMap 的独有特性,将任务保存到map中。
为了使用我们定义的任务队列,上面也说到,通过扩展ThreadPoolExecutor 来实现任务调度。因此,编写NettyOrderThreadPoolExecutor类来处理这样的逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135 1package com.twjitm.threads.common.executor;
2
3import com.twjitm.threads.common.logs.LoggerFactory;
4import com.twjitm.threads.entity.AbstractNettyTask;
5import com.twjitm.threads.thread.NettyThreadNameFactory;
6import com.twjitm.threads.thread.task.NettyOrderTaskQueue;
7import com.twjitm.threads.thread.task.NettyTaskQueue;
8import org.slf4j.Logger;
9
10import java.util.concurrent.*;
11import java.util.concurrent.locks.ReentrantLock;
12
13/**
14 *
15 * 扩展ThreadPoolExecutor 实现有序队列执行
16 *
17 * @author twjitm - [Created on 2018-08-23 17:45]
18 * @company https://github.com/twjitm/
19 * @jdk java version "1.8.0_77"
20 * <p>
21 * https://blog.csdn.net/u013256816/article/details/50403962
22 *
23 */
24public class NettyOrderThreadPoolExecutor extends ThreadPoolExecutor {
25 private Logger logger = LoggerFactory.logger;
26 private ReentrantLock lock = new ReentrantLock();
27
28 private int maxTasExecutorSize;
29
30 private NettyThreadNameFactory threadNameFactory;
31
32 public NettyOrderThreadPoolExecutor(String threadName, int corePoolSize, int maxTasExecutorSize) {
33
34 super(corePoolSize, maxTasExecutorSize * 2, 30,
35 TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NettyThreadNameFactory(threadName));
36
37 this.maxTasExecutorSize = maxTasExecutorSize;
38 this.threadNameFactory = (NettyThreadNameFactory) getThreadFactory();
39
40 }
41
42 public NettyOrderThreadPoolExecutor(String threadName, int corePoolSize, int maxPollSize, int maxTasExecutorSize, RejectedExecutionHandler rejectedExecutionHandler) {
43
44 super(corePoolSize, maxPollSize, 30,
45 TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
46 new NettyThreadNameFactory(threadName), rejectedExecutionHandler);
47
48 this.maxTasExecutorSize = maxTasExecutorSize;
49 this.threadNameFactory = (NettyThreadNameFactory) getThreadFactory();
50
51
52 }
53
54 public NettyOrderThreadPoolExecutor(String threadName, int corePoolSize, int maxTasExecutorSize, RejectedExecutionHandler rejectedExecutionHandler) {
55
56 super(corePoolSize, maxTasExecutorSize * 2, 30,
57 TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
58 rejectedExecutionHandler);
59
60 this.maxTasExecutorSize = maxTasExecutorSize;
61 this.threadNameFactory = (NettyThreadNameFactory) getThreadFactory();
62 }
63
64
65 public NettyOrderThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
66 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
67 this.threadNameFactory = (NettyThreadNameFactory) getThreadFactory();
68 this.maxTasExecutorSize = maximumPoolSize;
69 }
70
71 NettyOrderTaskQueue<Long, AbstractNettyTask> poll = new NettyOrderTaskQueue<Long, AbstractNettyTask>();
72
73
74 /**
75 * 添加一个任务到队列里面
76 *
77 * @param taskId
78 * @param task
79 * @return
80 */
81 public boolean addTask(long taskId, AbstractNettyTask task) {
82 boolean run = false;
83 boolean result = false;
84 NettyTaskQueue<AbstractNettyTask> queue = poll.getTasksQueue(taskId);
85 lock.lock();
86 if (maxTasExecutorSize > 0) {
87 if (queue.size() > maxTasExecutorSize) {
88 if (logger.isWarnEnabled()) {
89 logger.warn("队列" + threadNameFactory.getThreadName() + "(" + taskId + ")" + "超过最大队列大小设置!");
90 }
91 }
92 result = queue.add(task);
93 if (result) {
94 task.setTaskBlockingQueue(queue);
95 if (queue.isComple()) {
96 queue.setComple(false);
97 run = true;
98 }
99 } else {
100 logger.info(" ADD TASK ERROR");
101 }
102 if (run) {
103 execute(task);
104 }
105 }
106 lock.unlock();
107 return result;
108 }
109
110
111 @Override
112 protected void afterExecute(Runnable r, Throwable t) {
113 super.afterExecute(r, t);
114
115 AbstractNettyTask work = (AbstractNettyTask) r;
116 NettyTaskQueue<AbstractNettyTask> queue = work.getTaskBlockingQueue();
117 if (queue != null) {
118 AbstractNettyTask afterWork = null;
119 synchronized (queue) {
120 afterWork = queue.poll();
121 if (afterWork == null) {
122 queue.setComple(true);
123 }
124 }
125 if (afterWork != null) {
126 execute(afterWork);
127 }
128 } else {
129 logger.error("执行队列为空");
130 }
131 }
132}
133
134
135
在上诉代码中,我们看到一个新的类AbstractTask,这是一个抽象的任务,需要自己去实现任务中所具体的业务逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1package com.twjitm.threads.entity;
2
3import com.twjitm.threads.thread.task.NettyTaskQueue;
4
5/**
6 * @author twjitm - [Created on 2018-08-23 17:33]
7 * @company https://github.com/twjitm/
8 * @jdk java version "1.8.0_77"
9 * 抽象任务实体
10 */
11public abstract class AbstractNettyTask implements Runnable {
12 private NettyTaskQueue<AbstractNettyTask> taskBlockingQueue;
13
14 public NettyTaskQueue<AbstractNettyTask> getTaskBlockingQueue() {
15 return taskBlockingQueue;
16 }
17
18 public void setTaskBlockingQueue(NettyTaskQueue<AbstractNettyTask> taskBlockingQueue) {
19 this.taskBlockingQueue = taskBlockingQueue;
20 }
21}
22
23
24
可能细心的同学看到,代码中还有一个重要的变量, private ReentrantLock lock = new ReentrantLock();;对,就是这个,关于他的使用方法和相关知识,不太清楚的同学可以去这看看reentrantlock .
在往队列中添加任务的时候我们需要采用同步锁机制。保证每个任务都是有序执行。
有了同步执行队列的基础,在实现无序队列其实相对也就简单很多。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 1package com.twjitm.threads.common.executor;
2
3import com.twjitm.threads.entity.AbstractNettyTask;
4import com.twjitm.threads.thread.NettyThreadNameFactory;
5
6import java.util.concurrent.*;
7
8/**
9 * @author twjitm - [Created on 2018-08-24 15:43]
10 * @company https://github.com/twjitm/
11 * @jdk java version "1.8.0_77"
12 * 无序队列执行器
13 */
14public class NettyUnorderThreadPollExecutor extends ThreadPoolExecutor {
15 public NettyUnorderThreadPollExecutor(int corePoolSize) {
16 super(corePoolSize, corePoolSize * 2, 30, TimeUnit.SECONDS,
17 new LinkedBlockingQueue<Runnable>());
18 }
19
20 public NettyUnorderThreadPollExecutor(String name, int corePoolSize) {
21 super(corePoolSize, corePoolSize * 2, 30, TimeUnit.SECONDS,
22 new LinkedBlockingQueue<Runnable>(), new NettyThreadNameFactory(name));
23 }
24
25 public NettyUnorderThreadPollExecutor(String name, int corePoolSize, int maxPoolSize) {
26 super(corePoolSize, maxPoolSize, 30, TimeUnit.SECONDS,
27 new LinkedBlockingQueue<Runnable>(), new NettyThreadNameFactory(name));
28 }
29
30 public NettyUnorderThreadPollExecutor(String name, int corePoolSize, int maxSize, RejectedExecutionHandler rejectedExecutionHandler) {
31 super(corePoolSize, maxSize, 30, TimeUnit.SECONDS,
32 new LinkedBlockingQueue<Runnable>(), new NettyThreadNameFactory(name), rejectedExecutionHandler);
33 }
34
35
36 public NettyUnorderThreadPollExecutor(String name, int corePoolSize, int maxSize, BlockingQueue blockingQueue, RejectedExecutionHandler rejectedExecutionHandler) {
37 super(corePoolSize, maxSize, 30, TimeUnit.SECONDS,
38 blockingQueue, new NettyThreadNameFactory(name), rejectedExecutionHandler);
39 }
40
41
42 public void executeTask(AbstractNettyTask task) {
43 super.execute(task);
44 }
45}
46
47
48
在上面的代码中,构造函数中采用的线程池异常处理方式利用默认的方式,也就是RejectedExecutionHandler 。当然,我们可以完全重写它来实现我们自定义拒绝策略。
例如在项目中肯能会看到这样的代码。
每一种处理方式有各自的区别。
- AbortPolicy /*丢弃/
- BlockingPolicy /阻塞/
- CallerRunsPolicy /直接运行/
- DiscardOldestPolicy /抛弃老的/
- DiscardPolicy /删除/
如博主这样编写的一个直接丢弃的方式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 1package com.twjitm.threads.thread.policy;
2
3
4import org.slf4j.Logger;
5import org.slf4j.LoggerFactory;
6
7import java.util.concurrent.ThreadPoolExecutor;
8
9/**
10 * Created by IntelliJ IDEA.
11 * User: 文江 Date: 2018/8/19 Time: 10:52
12 * https://blog.csdn.net/baidu_23086307
13 * 队列满抛出异常
14 */
15public class AbortPolicy extends ThreadPoolExecutor.AbortPolicy {
16 private String threadName;
17 private Logger logger = LoggerFactory.getLogger(AbortPolicy.class);
18
19 public AbortPolicy() {
20 this(null);
21 }
22
23 public AbortPolicy(String threadName) {
24 this.threadName = threadName;
25 }
26
27 @Override
28 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
29 if (threadName != null) {
30 logger.error("THREAD POOL " + threadName + " IS EXHAUSTED, EXECUTOR=" + executor.toString());
31 }
32 String msg = String.format("Server["
33 + " THREAD NAME: %s, POOL SIZE: %d (ACTIVE: %d, CORE: %d, MAX: %d, LARGEST: %d), TASK: %d (COMPLETED: %d),"
34 + " EXECUTOR STATUS:(IS SHUTDOWN:%s, IS TERMINATED:%s, IS TERMINATING:%s)]",
35 threadName, executor.getPoolSize(), executor.getActiveCount(), executor.getCorePoolSize(), executor.getMaximumPoolSize(), executor.getLargestPoolSize(),
36 executor.getTaskCount(), executor.getCompletedTaskCount(), executor.isShutdown(), executor.isTerminated(), executor.isTerminating());
37 logger.info(msg);
38 super.rejectedExecution(runnable, executor);
39 }
40}
41
42
43
总结:
线程使用规则在一个项目里面是一个重要的版块,如何实现线程的合理调度和使用,关系到系统性能和系统的稳定。我们只能不断的优化性能,提高cpu的使用效率,提高线程在系统中的效率,保证系统稳定高可用是一个长期的探索。