分布式定时任务

释放双眼,带上耳机,听听看~!

分布式定时任务

1,什么是分布式定时任务;
2,为什么要采用分布式定时任务
3,怎么样设计实现一个分布式定时任务
4,当前比较流行的分布式定时任务框架

1,什么是分布式定时任务

  • 首先,我们要了解计划任务这个概念,计划任务是指由计划的定时运行或者周期性运行的程序。我们最常见的就是Linux的‘crontab’和Windows的‘计划任务’。

  • 那么什么是分布式定时任务,个人总结为:把分散的,可靠性差的计划任务纳入统一的平台,并实现集群管理调度和分布式部署的一种定时任务的管理方式。叫做分布式定时任务。

2,为什么要采用分布式定时任务

单点定时任务的缺点:

  • 功能相对简单,交互性差,任务部署效率低,开发和维护成本比较高,不能很好的满足各系统定时任务的管理和控制,尤其在多系统的环境下更加明显;
  • 许多任务都是单机部署,可用性差;
  • 任务跟踪和告警难以实现。

分布式定时任务的优势

  • 通过集群的方式进行管理调度,大大降低了开发和维护成本;
  • 分布式部署,保证了系统的高可用性,伸缩性,负载均衡,提高了容错;
  • 可以通过控制台部署和管理定时任务,方便灵活高效;
  • 任务都可以持久化到数据库,避免了宕机和数据丢失带来的隐患,同时有完善的任务失败重做机制和详细的任务跟踪及告警策略。

3,怎么样设计和实现一个分布式定时任务
3.1 分时方案

  • 严格划分时间片,交替运行计划任务,当主系统宕机后,备用系统仍然工作,但是处理初期被拉长了。
  • 缺点:周期延长了。

untitled.png

3.2 HA高可用方案:

  • 正常情况下主系统工作,备用系统守候,心跳检测发现主系统出现故障备用系统启动。
  • 缺点:单一系统,不能做负载均衡,只能垂直扩展,也就是硬件层面的升级,无法做水平扩展。

untitled1.png

3.3 多路心跳方案:

  • 采用多路心跳,做服务级,进程级的,IP和端口级别的心跳检测,正常情况是主系统工作,备用系统守候,心跳检测主系统出现故障,备用系统启动,当再次检测到主系统工作,则将执行权交回主系统。
  • 缺点:开发比较复杂,程序健壮性要求高。

untitled2.png

3.4 任务抢占方案:

  • A,B两台服务器同时工作,启动需要存在一前一后,谁先启动谁率先加锁,其他服务器只能等待,他们同时对互斥锁进行监控,一旦发现锁被释放,其他服务那个先抢到,那个运行,运行前加排他锁。
  • 优点:可以进一步实现多服务器横向扩展。
  • 缺点:开发复杂,程序健壮性要求高,有时候会出现不释放锁的问题。

untitled4.png

3.5 任务轮询或任务轮询+抢占排队方案

  • 每个服务器首次启动时加入队列;
  • 每次任务运行首先判断自己是否是当前可运行任务,如果是便运行;
  • 如果不是当前运行的任务,检查自己是否在队列中,如果在,便推出,如果不在队列中,便键入队列。

untitled5.png

通过以上这些方案,可以看出3.5的方案才是优先选择的,扩展性好,开发复杂度不是很高。那么这种方案需要的需要的技术原理是什么呢,那就是分布式互斥锁和队列。

3.6 原理

  • 分布式互斥锁:

互斥锁也叫排他锁,用于并发时管理多进程和多进程同一时刻只能有一个进程或者线程操作一个功能。我们将进程,线程中的锁延伸到互联网上,实现对一个节点运行的进程或线程加 锁,解锁操作。这样便能控制节点上的进程或线程的并发。如下图:

untitled7.png

有两台服务器运行定时任务,其中serverA的T2做了加锁操作,其他程序必须等它释放锁了才能运行。 那么如果serverA在加锁的过程中,出现宕机怎么办,是否会一直处于别锁状态。那么我们可以在每个锁都设置一个超时阈值,一旦超时便自动解锁。这样就不会因为宕机导致锁一直不被释 放。另外我们还要考虑命名空间的问题,主要是防止出现同名锁,导致被覆盖。

  • 队列:

在上面的基础上,排队运行任务。

untitled8.png

从上图中可以看出,TaskQueue中排队情况,运行是自上而下的,当然这个顺序可以自己设置规则,只需要先进先出的远程即可。另外,Task Queue我们需要做至少两个节点,他们遵循主 从结构的原则,主节点需要实时向从节点同步数据,保证在主节点不可用,从节点可以替代。当然,这里可以使用权重轮询的方式,加上数据异步同步,让所有节点都可以做主从的切换, 根据运行状况来分配,可能会更好,但是这样开发难度也有所提高,但是大大增加了高可用性。

3.7 总结:

  • 最后,我们要根据我们实际的情况,需要提供数据库和缓存方面的一些配套服务,这里就不做详解;

  • 这样我们整体的一个分布式定时任务平台就可以实现了,就可以保证计划任务的分布式运行。

4,当前比较流行的分布式定时任务框架:
4.1 Quartz:

  • Quartz是Java领域最著名的开源任务调度工具。Quartz提供了极为广泛的特性如持久化任务,集群和分布式任务

  • 特点:

  • 完全由Java写成,同时可以很方便的和java的另外一个框架spring集成;

    • 强大的调度功能:支持丰富多样的调度方法,可以满足各种常规及特殊需求;
    • 灵活的应用方式:支持任务和调度的多种组合方式,支持调度数据的多种存储方式;
    • 分布式和集群能力,负载均衡和高可用性;

4.2 Elastic-job:

  • Elastic-Job是ddframe中dd-job的作业模块中分离出来的分布式弹性作业框架。去掉了和dd-job中的监控和ddframe接入规范部分。该项目基于成熟的开源产品Quartz和 Zookeeper及其客户端Curator进行二次开发

  • 项目开源地址:https://github.com/dangdangdotcom/elastic-job

  • 特点:

  • 定时任务:基于成熟的定时任务作业框架Quartz cron表达式执行定时任务;

    • 作业注册中心:基于Zookeeper和其客户端Curator实现全局作业注册控制中心。用于注册,控制和协调分布式作业执行。
    • 作业分片:将要给任务分片成多个小任务项到多服务器上同时执行;
    • 弹性扩容缩容:运行中的作业服务器崩溃,或新增N台作业服务器,作业框架将在下次作业执行前重新分片,不影响当前作业执行;
  • 支持多种作业执行模式:支持OneOff,Perpetual和SequenecePerpetual三种作业模式;

  • 失效转移:运行中的作业服务器崩溃不会导致重新分片,只会在下次作业启动时分片。启用失效转移功能可以在本次作业执行过程中,监测其他作业服务器空闲,抓取未完成的孤儿分片项 执行;

  • 运行时状态收集:监控作业运行时状态,统计最近一段时间处理的数据成功和失败数量,记录作业上次运行开始时间,结束时间和下次运行时间;

  • 作业停止,恢复和禁用:用于操作作业启动和停止,并可以禁止某作业运行,一般在上线时常用;

  • 被错过执行的作业重触发:自动记录错过执行的作业,并在上次作业完成后自动触发。

  • 多线程快速处理数据:使用多线程处理抓取到的数据,提升吞吐量;

  • 幂等性:重复作业任务项判定,不重复执行已运行的作业任务项;

  • 容错处理:作业服务器和Zookeeper服务器通信失败后则立即停止作业运行,防止作业注册中心将失效的分片分项配给其他作业服务器,而当前作业服务器任在执行任务,导致重复执行。

  • Spring支持:支持Spring容器,自定义命名空间,支持占位符;

  • 运维平台:提供了运维平台,可以管理作业和注册中心。

从以上可以看出Elastic-job是在Quartz的基础上又做了一次全面的升级,做了配套的周边基础服务工作,完全成为了一个成熟的分布式定时任务框架。后面会分别介绍Quartz和 Elastic-job的详细原理和具体的使用方法。

///////////////////////

 

Quartz应用和集群原理分析:

使用的环境版本:spring4.x+quartz2.2.x

****1.1 如何在spring中集成quartz集群****

1.1.1 基于maven项目,需要在pom.xml引入的j依赖为:


1
2
3
4
5
6
1<dependency>
2            <groupId>org.quartz-scheduler</groupId>
3            <artifactId>quartz</artifactId>
4</dependency>
5
6

1.1.2 Quartz集群的基本配置信息:命名为quartz.properties


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1#调度标识名 集群中每一个实例都必须使用相同的名称
2org.quartz.scheduler.instanceName: DefaultQuartzScheduler
3#远程管理相关的配置,全部关闭
4org.quartz.scheduler.rmi.export: false
5org.quartz.scheduler.rmi.proxy: false
6org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
7ThreadPool 实现的类名
8org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
9#线程数量
10org.quartz.threadPool.threadCount: 10
11#线程优先级
12org.quartz.threadPool.threadPriority: 5
13#自创建父线程
14org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
15#容许的最大作业延
16org.quartz.jobStore.misfireThreshold: 60000
17#ID设置为自动获取 每一个必须不同
18org.quartz.scheduler.instanceId: AUTO
19#数据保存方式为持久化
20org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
21#加入集群
22org.quartz.jobStore.isClustered: true
23#调度实例失效的检查时间间隔
24org.quartz.jobStore.clusterCheckinInterval: 10000
25
26

1.1.3 在项目中加入Quartz的初始化信息: 命名spring-quartz.xml


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
1<?xml version="1.0" encoding="UTF-8"?>
2  <beans       xmlns="http://www.springframework.org/schema/beans"
3    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4    xmlns:mvc="http://www.springframework.org/schema/mvc"
5    xmlns:aop="http://www.springframework.org/schema/aop"
6xmlns:context="http://www.springframework.org/schema/context"    xsi:schemaLocation="http://www.springframework.org/schema/beans
7       http://www.springframework.org/schema/beans/spring-beans.xsd
8       http://www.springframework.org/schema/mvc
9       http://www.springframework.org/schema/mvc/spring-mvc.xsd
10       http://www.springframework.org/schema/aop
11       http://www.springframework.org/schema/aop/spring-aop.xsd
12       http://www.springframework.org/schema/context
13       http://www.springframework.org/schema/context/spring-context.xsd">
14       <bean id="quartzScheduler"        class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
15            <!-- 自定义的bean注入类,解决job里面无法注入spring的service的问题 -->
16            <property name="jobFactory">
17                <bean class="com.fc.sales.control.statistics.job.SpringBeanJobFactory" />
18            </property>
19            <!-- quartz的数据源 -->
20            <property name="dataSource" ref="quartz" />
21            <!-- quartz的基本配置信息引入 -->
22            <property name="configLocation" value="classpath:quartz.properties"/>
23<!-- 调度标识名 -->
24            <property name="schedulerName" value="DefaultQuartzScheduler" />
25            <!--必须的,QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动 -->
26            <property name="startupDelay" value="30" />
27            <!-- 通过applicationContextSchedulerContextKey属性配置spring上下文 -->
28            <property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
29            <!--可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 -->
30            <property name="overwriteExistingJobs" value="true" />
31            <!-- 设置自动启动 -->
32            <property name="autoStartup" value="true" />
33            <!-- 注册触发器 -->
34            <property  name="triggers">
35                <list>
36                    <ref bean="orderSyncScannerTrigger" />
37                </list>
38            </property>
39            <!-- 注册jobDetail -->
40            <property name="jobDetails">
41                <list>
42                    <ref bean="orderSyncDetail" />
43                </list>
44            </property>
45        </bean>  
46        <!--配置调度具体执行的方法-->  
47        <bean id="orderSyncDetail"  
48            class="org.springframework.scheduling.quartz.JobDetailFactoryBean">  
49           <property name="jobClass" value="com.fc.sales.control.statistics.job.OrderSyncJob"/>
50            <property name="durability" value="true" />    
51            <property name="requestsRecovery" value="true" />
52        </bean>
53        <!--配置调度执行的触发的时间-->  
54  <bean id="orderSyncScannerTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">  
55            <property name="jobDetail" ref="orderSyncDetail" />  
56            <property name="cronExpression">  
57                <!-- 每天上午00:30点执行任务调度 -->  
58                <value>0 30 00 * * ?</value>  
59            </property>  
60        </bean>
61
62
  • 1.1.4 在web.xml启动项中加入spring-quartz.xml文件*


1
2
3
4
5
6
7
8
9
10
11
1 <servlet>
2        <servlet-name>dispatcherServlet</servlet-name>
3        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
4        <init-param>
5            <param-name>contextConfigLocation</param-name>
6            <param-value>classpath:/spring/spring-quartz.xml</param-value>
7        </init-param>
8        <load-on-startup>1</load-on-startup>
9  </servlet>
10
11
  • 1.1.5 附上对应的解决无法注入的jobFactory的代码*:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1
2import org.quartz.spi.TriggerFiredBundle;
3import org.springframework.beans.factory.annotation.Autowired;
4import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
5import org.springframework.scheduling.quartz.AdaptableJobFactory;
6import org.springframework.stereotype.Component;
7
8/**
9 * Created by lyndon on 16/9/13.
10 */
11@Component
12public class jobFactory extends AdaptableJobFactory {
13
14    //这个对象Spring会帮我们自动注入进来,也属于Spring技术范畴.
15    @Autowired
16    private AutowireCapableBeanFactory capableBeanFactory;
17
18    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
19        //调用父类的方法
20        Object jobInstance = super.createJobInstance(bundle);
21        //进行注入,这属于Spring的技术,不清楚的可以查看Spring的API.
22        capableBeanFactory.autowireBean(jobInstance);
23        return jobInstance;
24    }
25}
26
27
28

****1.2 quartz框架实现分布式定时任务的原理****;
Quartz集群中每个节点都是一个单独的Quartz应用,它又管理着其他的节点。这个集群需要每个节点单独的启动或停止;和我们的应用服务器集群不同,独立的Quratz节点之间是不需要 通信的。不同节点之间是通过数据库表来感知另一个应用。只有使用持久的JobStore才能完成Quartz集群。

untitled21.jpg

  • 1.2.1 既然Quartz分布式集群是利用数据库锁机制来实现集群环境下的并发控制,我们就需要了解Quratz的数据库表:可以去官方现在对于版本的sql文件导入。

untitled22.png

  • 1.2.2 Quartz线程模型:

Quartz中有两类线程:Scheduler调度线程和任务执行线程。

  • 任务执行线程: Quartz不会在主线程(QuartzSchedulerThread)中处理用户job。Quratz是将线程管理的职责委托给ThreadPool,一般的设置使用SimpleThreadPool,SimpleThreadPool创建一定数量的工作线程(WorkerThread),当然这样就意味所有的线程都是异步操作的,所以我们在工作线程的job里面实现业务的时候是没必要重新去创建一个新的线程的,在Quartz创建工作线程的时候已经完成了异步任务的创建。

  • Scheduler调度线程:QuartzScheduler被创建的时候会创建一个QuratzSchedulerThread实例。

  • 1.2.3 Quartz源码分析:

  • QuartzSchedulerThreand包含有决定何时下一个Job将被触发的处理循环,主要的逻辑在其的run()方法中,如下图:

untitled23.png

由此可知,QuartzSchedulerThread不断的在获取trigger,触发trigger,释放trigger。
那么具体又是如何获取trigger的呢,可以从上面的源码中可以发现:qsRsrcs.getJobStore()返回对象是JobStore ,具体的集群配置参考1.1.2. org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
JobStoreTx继承自JobStoreSupport,而JobStoreSupport的acquireNextTrigger,triggerFired,releaseAcquiredTrigger方法负责具体trigger相关操作,都必须获得TRIGGER-ACCESS锁。核心逻辑在executeInNonManagedTxLock方法中。

untitled24.png

由上代码可知Quartz集群基于数据库锁的同步操作流程如下图所示:

untitled25.png

/////////////////////////////////

 

Quartz分布式定时任务的暂停和恢复等:

前两篇我们了解了quartz分布式定时任务的基本原理和实现方式,知道所有的定时任务都会被持久化到数据库。那么我们肯定可以通过操作数据库来做定时任务的暂停,恢复,立即启动,添加等操作。
事实上,quartz已经给我们提供来一些列的api接口来操作对应的定时任务,我们只需要在这个基础之上做进一步的扩展和封装就可以实现我们自己业务,下面,将围绕定时任务的控制,提供一个简单的实现方式。

使用的环境版本:spring4.x+quartz2.2.x

1,首先,我们需要创建一个我们自己job的实体类ScheduleJob:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1/**
2* Created by lyndon on 16/9/13. * job的实体类
3 */
4public class ScheduleJob {  
5
6        private String jobNo; //任务编号    
7        private String jobName; //任务名称    
8        private String jobGroup; //任务所属组    
9        private String desc; //任务描述          
10        private String jobStatus; //任务状态    
11        private String cronExpression; //任务对应的时间表达式
12        private String triggerName; //触发器名称
13
14         //此处省略get和set方法
15  }
16
17

2, 创建我们自己的QuartzImplService服务层:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
1
2
3import org.quartz.*;
4import org.quartz.impl.matchers.GroupMatcher;
5import org.slf4j.Logger;
6import org.slf4j.LoggerFactory;
7import org.springframework.stereotype.Service;
8
9import javax.annotation.Resource;
10import java.util.*;
11
12/**
13 * Created by lyndon on 16/9/13.
14 * quartz_job的工具类
15 */
16@Service
17public class QuartzUtils {
18
19    private final Logger logger = LoggerFactory.getLogger(QuartzUtils.class);
20
21    @Resource
22    private Scheduler scheduler;
23
24
25    /**
26     *
27     * 获取计划任务列表
28     * @return  List<ScheduleJob>
29     */
30    public List<ScheduleJob> getPlanJobList() throws SchedulerException{
31        List<ScheduleJob> jobList = new ArrayList<>();
32        GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
33        Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);;
34        jobKeys = scheduler.getJobKeys(matcher);
35        for (JobKey jobKey : jobKeys) {
36            List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
37            for (Trigger trigger : triggers) {
38                ScheduleJob job = new ScheduleJob();
39                job.setJobName(jobKey.getName());
40                job.setJobGroup(jobKey.getGroup());
41                // 此处是我自己业务需要,给每个定时任务配置类对应的编号和描述
42                String value = PropertiesUtils.getStringCN(jobKey.getName());
43                if(null != value && !"".equals(value)){
44                    job.setJobNo(value.split("/")[0]);
45                    job.setDesc(value.split("/")[1]);
46                }else{
47                    job.setJobNo("0000");
48                    job.setDesc("未监控任务");
49                }
50                job.setTriggerName("触发器:" + trigger.getKey());
51                Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
52                job.setJobStatus(triggerState.name());
53                if (trigger instanceof CronTrigger) {
54                    CronTrigger cronTrigger = (CronTrigger) trigger;
55                    String cronExpression = cronTrigger.getCronExpression();
56                    job.setCronExpression(cronExpression);
57                }
58                jobList.add(job);
59            }
60        }
61      // 对返回的定时任务安装编号做排序
62        Collections.sort(jobList,new Comparator<ScheduleJob>(){
63            public int compare(ScheduleJob arg0, ScheduleJob arg1) {
64                return arg0.getJobNo().compareTo(arg1.getJobNo());
65            }
66        });
67
68        return jobList;
69    }
70
71
72    /**
73     * 获取正在运行的任务列表
74     * @return List<ScheduleJob>
75     */
76    public List<ScheduleJob> getCurrentJobList() throws SchedulerException{
77        List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();;
78        List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size());;
79        for (JobExecutionContext executingJob : executingJobs) {
80            ScheduleJob job = new ScheduleJob();
81            JobDetail jobDetail = executingJob.getJobDetail();
82            JobKey jobKey = jobDetail.getKey();
83            Trigger trigger = executingJob.getTrigger();
84            job.setJobName(jobKey.getName());
85            job.setJobGroup(jobKey.getGroup());
86            String value = PropertiesUtils.getStringCN(jobKey.getName());
87            if(null != value && !"".equals(value)){
88                job.setJobNo(value.split("/")[0]);
89                job.setDesc(value.split("/")[1]);
90            }else{
91                job.setJobNo("0000");
92                job.setDesc("未监控任务");
93            }
94            job.setTriggerName("触发器:" + trigger.getKey());
95            Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
96            job.setJobStatus(triggerState.name());
97            if (trigger instanceof CronTrigger) {
98                CronTrigger cronTrigger = (CronTrigger) trigger;
99                String cronExpression = cronTrigger.getCronExpression();
100                job.setCronExpression(cronExpression);
101            }
102            jobList.add(job);
103        }
104        Collections.sort(jobList,new Comparator<ScheduleJob>(){
105            public int compare(ScheduleJob arg0, ScheduleJob arg1) {
106                return arg0.getJobNo().compareTo(arg1.getJobNo());
107            }
108        });
109        return  jobList;
110    }
111
112    /**
113     * 暂停当前任务
114     * @param scheduleJob
115     */
116    public void pauseJob(ScheduleJob scheduleJob) throws SchedulerException{
117        JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
118        if(scheduler.checkExists(jobKey)){
119            scheduler.pauseJob(jobKey);
120        }
121    }
122
123    /**
124     * 恢复当前任务
125     * @param scheduleJob
126     */
127    public void resumeJob(ScheduleJob scheduleJob) throws SchedulerException{
128        JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
129
130        if(scheduler.checkExists(jobKey)){
131            //并恢复
132            scheduler.resumeJob(jobKey);
133            //重置当前时间
134            this.rescheduleJob(scheduleJob);
135        }
136    }
137
138    /**
139     * 删除任务
140     * @param scheduleJob
141     * @return boolean
142     */
143    public boolean deleteJob(ScheduleJob scheduleJob) throws SchedulerException{
144        JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
145        if(scheduler.checkExists(jobKey)){
146            return scheduler.deleteJob(jobKey);
147        }
148        return false;
149
150    }
151
152    /**
153     * 立即触发当前任务
154     * @param scheduleJob
155     */
156    public void triggerJob(ScheduleJob scheduleJob) throws SchedulerException{
157        JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
158        if(scheduler.checkExists(jobKey)){
159            scheduler.triggerJob(jobKey);
160        }
161
162    }
163
164    /**
165     * 更新任务的时间表达式
166     * @param scheduleJob
167     * @return Date
168     */
169    public Date rescheduleJob(ScheduleJob scheduleJob) throws SchedulerException{
170        TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(),
171                scheduleJob.getJobGroup());
172        if(scheduler.checkExists(triggerKey)){
173            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
174            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob
175                    .getCronExpression());
176            //按新的cronExpression表达式重新构建trigger
177            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
178                    .withSchedule(scheduleBuilder).build();
179            //按新的trigger重新设置job执行
180            return scheduler.rescheduleJob(triggerKey, trigger);
181        }
182        return null;
183    }
184
185    /**
186     * 查询其中一个任务的状态
187     * @param scheduleJob
188     * @return
189     * @throws SchedulerException
190     */
191    public String scheduleJob(ScheduleJob scheduleJob) throws SchedulerException {
192        String status = null;
193        TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
194        CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
195        if (null != trigger) {
196            Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
197            status = triggerState.name();
198        }
199        return status;
200    }
201
202    /**
203     * 校验job是否已经加载
204     * @param scheduleJob  JOB基本信息参数
205     * @return          是否已经加载
206     */
207    public boolean checkJobExisted(ScheduleJob scheduleJob) throws SchedulerException {
208        return scheduler.checkExists(new JobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()));
209    }
210
211  
212    private String getStatuDesc(String status){
213        if(status.equalsIgnoreCase("NORMAL")){
214            return "正常";
215        }else if(status.equalsIgnoreCase("PAUSED")){
216            return "暂停";
217        }else{
218            return "异常";
219        }
220    }
221}
222
223
224

3,提供对应的Controller


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
1
2import com.innmall.hotelmanager.common.Result;
3import com.innmall.hotelmanager.service.quartz.QuartzUtils;
4import com.innmall.hotelmanager.service.quartz.ScheduleJob;
5import org.quartz.SchedulerException;
6import org.slf4j.Logger;
7import org.slf4j.LoggerFactory;
8import org.springframework.web.bind.annotation.RequestMapping;
9import org.springframework.web.bind.annotation.RestController;
10
11import javax.annotation.Resource;
12import java.util.List;
13
14/**
15 * Created by lyndon on 16/9/13.
16 */
17@RestController
18@RequestMapping(value = {"/v1/job"})
19public class QuartzController {
20
21    private Logger logger = LoggerFactory.getLogger(this.getClass());
22
23    @Resource
24    private QuartzUtils quartzUtils;
25
26    //获取定时任务的列表
27    @RequestMapping(value = {"/getJobList"})
28    public Result getPlanJobList(String openId){
29        //QuartzUtils quartzUtils = new QuartzUtils();
30        List<ScheduleJob> list = null;
31        try {
32            list = quartzUtils.getPlanJobList();
33        } catch (SchedulerException e) {
34            e.printStackTrace();
35        }
36        return Result.success(list);
37    }
38
39    //暂停任务
40    @RequestMapping(value = {"/pauseJob"})
41    public Result pauseJob(String openId){
42        //QuartzUtils quartzUtils = new QuartzUtils();
43        ScheduleJob job = new ScheduleJob();
44        job.setJobGroup("innmall_job");
45        job.setJobName("refreshWxToKenJobDetail");
46        try {
47            quartzUtils.pauseJob(job);
48        } catch (SchedulerException e) {
49            e.printStackTrace();
50        }
51        return Result.success("暂停成功");
52    }
53
54    //恢复任务
55    @RequestMapping(value = {"/resumeJob"})
56    public Result resumeJob(String openId){
57        //QuartzUtils quartzUtils = new QuartzUtils();
58        ScheduleJob job = new ScheduleJob();
59        job.setJobGroup("innmall_job");
60        job.setJobName("refreshWxToKenJobDetail");
61        try {
62            quartzUtils.resumeJob(job);
63        } catch (SchedulerException e) {
64            e.printStackTrace();
65        }
66        return Result.success("恢复成功");
67    }
68
69    //立即触发任务
70    @RequestMapping(value = {"/triggerJob"})
71    public Result triggerJob(String openId){
72        //QuartzUtils quartzUtils = new QuartzUtils();
73        ScheduleJob job = new ScheduleJob();
74        job.setJobGroup("innmall_job");
75        job.setJobName("refreshWxToKenJobDetail");
76        try {
77            quartzUtils.triggerJob(job);
78        } catch (SchedulerException e) {
79            e.printStackTrace();
80        }
81        return Result.success("触发成功");
82    }
83
84    //删除任务
85    @RequestMapping(value = {"/deleteJob"})
86    public Result deleteJob(String openId){
87        //QuartzUtils quartzUtils = new QuartzUtils();
88        ScheduleJob job = new ScheduleJob();
89        job.setJobGroup("innmall_job");
90        job.setJobName("refreshWxToKenJobDetail");
91        try {
92            quartzUtils.deleteJob(job);
93        } catch (SchedulerException e) {
94            e.printStackTrace();
95        }
96        return Result.success("触发成功");
97    }
98  
99}
100
101
102

4,接下来,我们就可以进行单元测试了。

5,需要注意的地方:

5.1 service层:


1
2
3
4
1@Resource
2private Scheduler scheduler;
3
4

这里是因为我们在xml里面已经配置对应的工厂bean,所以可以在这里可以直接注入:


1
2
3
4
1<bean id="quartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"    
2 destroy-method="destroy">
3
4

5.2 关于区分不同业务的触发器和任务,可以配置job和trigger的group属性,这样我们便以区分,如果不设置,quartz将使用default关键字:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1   <bean id="refreshWxToKenJobDetail"
2          class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
3        <property name="jobClass" value="com.innmall.hotelmanager.timer.RefreshWxToKen"/>
4        <property name="durability" value="true" />
5        <property name="requestsRecovery" value="true" />
6        <property name="group" value="innmall_job"/>
7    </bean>
8    <bean id="refreshWxToKenTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
9        <property name="jobDetail" ref="refreshWxToKenJobDetail"/>
10        <!-- 每10s钟运行一次 -->
11        <property name="cronExpression" value="0/10 * * * * ?"/>
12        <property name="misfireInstruction" value="2"/>
13        <property name="group" value="innmall_trigger"/>
14    </bean>
15
16
17

5.3 关于定时任务恢复后,我们如果不需要让之前错过的定时任务再执行一次,可以设置misfireInstruction的属性,其实就是


1
2
3
1CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING
2
3

进去可以看见对应的值为2.
并且需要在我们恢复任务的时候调用更新的方法,可以见上文的QuartzUtil中的方法。


1
2
3
4
1//重置当前时间
2this.rescheduleJob(scheduleJob);
3
4

5.4 如果需要定时任务恢复后,需要将之前错过的执行一次,那么只需要在xml里面去除misfireInstruction属性,其实就是使用默认配置,并且在恢复的时候不调用更新的方法。

关于quartz的使用方法,暂时就介绍到这里,如果有什么地方有问题,欢迎指正,后面将持续研究对应的异常处理机制,敬请关注~

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全资讯

彭博社:马云再成中国首富!财富上涨36亿美元

2016-12-29 7:40:29

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索