Quartz集群实战及原理解析

选Quartz的团队基本上是冲着Quartz本身实现的集群去的, 不然JDK自带Timer就可以实现相同的功能, 而Timer存在的单点故障是生产环境上所不能容忍的。 在自己造个有负载均衡和支持集群(高可用、伸缩性)的调度框架又影响项目的进度, 所以大多数团队都直接使用了Quartz来作为调度框架。

一、 Quartz集群的架构图:

Quartz集群实战及原理解析

二、 Quartz集群配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<!-- 调度任务 --> 
<beanid="jobDetail" 
        class="org.springframework.scheduling.quartz.JobDetailFactoryBean"
        <propertyname="jobClass"value="全类名"/> 
        <propertyname="durability"value="true"/> 
        <propertyname="targetMethod"value="execute"/> 
        <propertyname="concurrent"value="true"/> --> 
        <!-- <property name="shouldRecover" value="true" /> --> 
</bean
 
<!-- 调度工厂 --> 
<beanid="scheduler"lazy-init="false"autowire="no" 
    class="org.springframework.scheduling.quartz.SchedulerFactoryBean"
 
    <!-- 注册JobDetails --> 
    <propertyname="jobDetails"
        <list
            <refbean="jobDetail"/> 
        </list
    </property
 
    <!--可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 --> 
    <propertyname="overwriteExistingJobs"value="true"/> 
 
    <!-- 属性 --> 
    <propertyname="quartzProperties"
        <props
            <!-- 集群要求必须使用持久化存储 --> 
            <propkey="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreCMT</prop
 
            <propkey="org.quartz.scheduler.instanceName">EventScheduler</prop
            <!-- 每个集群节点要有独立的instanceId --> 
            <propkey="org.quartz.scheduler.instanceId">AUTO</prop
 
            <!-- Configure ThreadPool --> 
            <propkey="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop
            <propkey="org.quartz.threadPool.threadCount">50</prop
            <propkey="org.quartz.threadPool.threadPriority">5</prop
            <propkey="org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread">true</prop
            <!-- Configure JobStore --> 
            <propkey="org.quartz.jobStore.misfireThreshold">60000</prop
            <propkey="org.quartz.jobStore.driverDelegateClass">org.quartz.impl.jdbcjobstore.StdJDBCDelegate</prop
            <propkey="org.quartz.jobStore.tablePrefix">SCHEDULER_</prop
            <propkey="org.quartz.jobStore.maxMisfiresToHandleAtATime">10</prop
            <!-- 开启集群 --> 
            <propkey="org.quartz.jobStore.isClustered">true</prop
            <propkey="org.quartz.jobStore.clusterCheckinInterval">20000</prop
            <propkey="org.quartz.jobStore.dontSetAutoCommitFalse">true</prop
            <propkey="org.quartz.jobStore.txIsolationLevelSerializable">false</prop
            <propkey="org.quartz.jobStore.dataSource">myDS</prop
            <propkey="org.quartz.jobStore.nonManagedTXDataSource">myDS</prop
            <propkey="org.quartz.jobStore.useProperties">false</prop
            <!-- Configure Datasources  --> 
            <propkey="org.quartz.dataSource.myDS.driver">com.mysql.jdbc.Driver</prop
            <propkey="org.quartz.dataSource.myDS.URL">${db.url}</prop
            <propkey="org.quartz.dataSource.myDS.user">${db.username}</prop
            <propkey="org.quartz.dataSource.myDS.password">${db.password}</prop
            <propkey="org.quartz.dataSource.myDS.maxConnections">10</prop
            <propkey="org.quartz.dataSource.myDS.validationQuery">select 0 from dual</prop
        </props
    </property
    <propertyname="applicationContextSchedulerContextKey"value="applicationContext"/> 
</bean>

三、 集群源码分析

Quartz如何保证多个节点的应用只进行一次调度(即某一时刻的调度任务只由其中一台服务器执行)?

正如上面架构图所示, Quartz的集群是在同一个数据库下, 由数据库的数据来确定调度任务是否正在执行, 正在执行则其他服务器就不能去执行该行调度数据。 这个跟很多项目是用Zookeeper做集群不一样, 这些项目是靠Zookeeper选举出来的的服务器去执行, 可以理解为Quartz靠数据库选举一个服务器来执行。

如果之前看过这篇Quartz按时启动原理就应该了解到Quartz最主要的一个类QuartzSchedulerThread职责是触发任务, 是一个不断运行的Quartz主线程, 还是从这里入手了解集群原理。

集群配置里面有一个配置项:

1
<propkey="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreCMT</prop>

源码可以看到JobStoreCMT extends JobStoreSupport, 在QuartzSchedulerThread的run方法里面调用的acquireNextTriggers、 triggersFired、 releaseAcquiredTrigger方法都进行了加锁处理。

以acquireNextTriggers为例:

Quartz集群实战及原理解析

而LOCK_TRIGGER_ACCESS其实就是一个Java常量

1
protectedstaticfinal  String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";

这个常量传入加锁的核心方法executeInNonManagedTXLock: 处理逻辑前获取锁, 处理完成后在finally里面释放锁(一种典型的同步处理方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
protected<T> T executeInNonManagedTXLock( 
           String lockName,  
           TransactionCallback<T> txCallback, finalTransactionValidator<T> txValidator) throwsJobPersistenceException { 
       booleantransOwner = false
       Connection conn = null
       try
           if(lockName != null) { 
               // If we aren't using db locks, then delay getting DB connection  
               // until after acquiring the lock since it isn't needed. 
               if(getLockHandler().requiresConnection()) { 
                   conn = getNonManagedTXConnection(); 
               
               // 获取锁 
               transOwner = getLockHandler().obtainLock(conn, lockName); 
           
 
           if(conn == null) { 
               conn = getNonManagedTXConnection(); 
           
 
           finalT result = txCallback.execute(conn); 
           try
               commitConnection(conn); 
           }catch(JobPersistenceException e) { 
               rollbackConnection(conn); 
               if(txValidator == null|| !retryExecuteInNonManagedTXLock(lockName, newTransactionCallback<Boolean>() { 
                   @Override 
                   publicBoolean execute(Connection conn) throwsJobPersistenceException { 
                       returntxValidator.validate(conn, result); 
                   
               })) { 
                   throwe; 
               
           
 
           Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion(); 
           if(sigTime != null&& sigTime >= 0) { 
               signalSchedulingChangeImmediately(sigTime); 
           
 
           returnresult; 
       }catch(JobPersistenceException e) { 
           rollbackConnection(conn); 
           throwe; 
       }catch(RuntimeException e) { 
           rollbackConnection(conn); 
           thrownewJobPersistenceException("Unexpected runtime exception: " 
                   + e.getMessage(), e); 
       }finally
           try
            // 释放锁 
               releaseLock(lockName, transOwner); 
           }finally
               cleanupConnection(conn); 
           
       
   }

getLockHandler那么可以思考下这个LockHandler怎么来的?

最后发现在JobStoreSupport的initail方法赋值了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
publicvoidinitialize(ClassLoadHelper loadHelper, 
            SchedulerSignaler signaler) throwsSchedulerConfigException { 
 
        ... 
 
        // If the user hasn't specified an explicit lock handler, then  
        // choose one based on CMT/Clustered/UseDBLocks. 
        if(getLockHandler() == null) { 
 
            // If the user hasn't specified an explicit lock handler,  
            // then we *must* use DB locks with clustering 
            if(isClustered()) { 
                setUseDBLocks(true); 
            
 
            if(getUseDBLocks()) { 
                ... 
                // 在初始化方法里面赋值了 
                setLockHandler(newStdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL())); 
            }else
                getLog().info( 
                    "Using thread monitor-based data access locking (synchronization)."); 
                setLockHandler(newSimpleSemaphore()); 
            
        
 
    }

可以在StdRowLockSemaphore里面看到:

1
2
3
4
5
6
7
publicstaticfinal  String SELECT_FOR_LOCK = "SELECT * FROM " 
            + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE "  + COL_SCHEDULER_NAME + " = "  + SCHED_NAME_SUBST  
            +" AND "  + COL_LOCK_NAME + " = ? FOR UPDATE"
 
    publicstaticfinal  String INSERT_LOCK = "INSERT INTO " 
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + "("+ COL_SCHEDULER_NAME + ", "  + COL_LOCK_NAME + ") VALUES ("  
        + SCHED_NAME_SUBST + ", ?)";

可以看出采用了悲观锁的方式对triggers表进行行加锁, 以保证任务同步的正确性。

当线程使用上述的SQL对表中的数据执行操作时,数据库对该行进行行加锁; 于此同时, 另一个线程对该行数据执行操作前需要获取锁, 而此时已被占用, 那么这个线程就只能等待, 直到该行锁被释放。quratz在获取数据库资源之前,先要以for update方式访问LOCKS表中相应LOCK_NAME数据将改行锁定.如果在此前该行已经被锁定,那么等待,如果没有被锁定,那么读取满足要求的trigger,并把它们的status置为STATE_ACQUIRED,如果有tirgger已被置为STATE_ACQUIRED,那么说明该trigger已被别的调度器实例认领,无需再次认领,调度器会忽略此trigger.调度器实例之间的间接通信就体现在这里.
JobStoreSupport.acquireNextTrigger()方法中:
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
最后释放锁,这时如果下一个调度器在排队获取trigger的话,则仍会执行相同的步骤.这种机制保证了trigger不会被重复获取.

Quartz的锁存放在:

1
2
3
4
5
CREATETABLE`scheduler_locks` ( 
  `SCHED_NAME`varchar(120)NOTNULLCOMMENT '调度名'
  `LOCK_NAME`varchar(40)NOTNULLCOMMENT '锁名'
  PRIMARYKEY(`SCHED_NAME`,`LOCK_NAME`) 
) ENGINE=InnoDB DEFAULTCHARSET=utf8

锁名和上述常量一一对应:

Quartz集群实战及原理解析

有可能你的任务不能支持并发执行(因为有可能任务还没执行完, 下一轮就trigger了, 如果没做同步处理可能造成严重的数据问题), 那么在任务类加上注解:

@DisallowConcurrentExecution

设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行

组件间的通讯图如*****:主要的sql语句附在文章最后)

Quartz集群实战及原理解析

quartz运行时由QuartzSchedulerThread类作为主体,循环执行调度流程。JobStore作为中间层,按照quartz的并发策略执行数据库操作,完成主要的调度逻辑。JobRunShellFactory负责实例化JobDetail对象,将其放入线程池运行。LockHandler负责获取LOCKS表中的数据库锁。

整个quartz对任务调度的时序大致如下:

Quartz集群实战及原理解析

梳理一下其中的流程,可以表示为:

0.调度器线程run()

1.获取待触发trigger

    1.1数据库LOCKS表TRIGGER_ACCESS行加锁

    1.2读取JobDetail信息

    1.3读取trigger表中触发器信息并标记为"已获取"

    1.4commit事务,释放锁

2.触发trigger

    2.1数据库LOCKS表STATE_ACCESS行加锁

    2.2确认trigger的状态

    2.3读取trigger的JobDetail信息

    2.4读取trigger的Calendar信息

    2.3更新trigger信息

    2.3commit事务,释放锁

3实例化并执行Job

    3.1从线程池获取线程执行JobRunShell的run方法

可以看到,这个过程中有两个相似的过程:同样是对数据表的更新操作,同样是在执行操作前获取锁 操作完成后释放锁.这一规则可以看做是quartz解决集群问题的核心思想.

[html] view plain copy
  1. 3.  
  2.   
  3. select TRIGGER_ACCESS from QRTZ2_LOCKS for update  
  4.   
  5. 4.  
  6.   
  7. SELECT TRIGGER_NAME,TRIGGER_GROUP,NEXT_FIRE_TIME,PRIORITY FROM QRTZ2_TRIGGERS  
  8.   
  9. WHERE SCHEDULER_NAME = 'CRMscheduler'  
  10.   
  11. AND TRIGGER_STATE = 'ACQUIRED'  
  12.   
  13. AND NEXT_FIRE_TIME <= '{timekey 30s latter}'  
  14.   
  15. AND ( MISFIRE_INSTR = -1  
  16.   
  17. OR ( MISFIRE_INSTR != -1  
  18.   
  19. AND NEXT_FIRE_TIME >= '{timekey now}' ) )  
  20.   
  21. ORDER BY NEXT_FIRE_TIME ASC,  
  22.   
  23. PRIORITY DESC;  
  24.   
  25. 5.  
  26.   
  27. SELECT *  
  28.   
  29. FROM QRTZ2_JOB_DETAILS  
  30.   
  31. WHERE SCHEDULER_NAME = CRMscheduler  
  32.   
  33. AND JOB_NAME = ?  
  34.   
  35. AND JOB_GROUP = ?;  
  36.   
  37. 6.  
  38.   
  39. UPDATE TQRTZ2_TRIGGERS  
  40.   
  41. SET TRIGGER_STATE = 'ACQUIRED'  
  42.   
  43. WHERE SCHED_NAME = 'CRMscheduler'  
  44.   
  45. AND TRIGGER_NAME = '{triggerName}'  
  46.   
  47. AND TRIGGER_GROUP = '{triggerGroup}'  
  48.   
  49. AND TRIGGER_STATE = 'waiting';  
  50.   
  51. 7.  
  52.   
  53. INSERT INTO QRTZ2_FIRED_TRIGGERS  
  54.   
  55. (SCHEDULER_NAME,  
  56.   
  57. ENTRY_ID,  
  58.   
  59. TRIGGER_NAME,  
  60.   
  61. TRIGGER_GROUP,  
  62.   
  63. INSTANCE_NAME,  
  64.   
  65. FIRED_TIME,  
  66.   
  67. SCHED_TIME,  
  68.   
  69. STATE,  
  70.   
  71. JOB_NAME,  
  72.   
  73. JOB_GROUP,  
  74.   
  75. IS_NONCONCURRENT,  
  76.   
  77. REQUESTS_RECOVERY,  
  78.   
  79. PRIORITY)  
  80.   
  81. VALUES( 'CRMscheduler', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);  
  82.   
  83. 8.  
  84.   
  85. commit;  
  86.   
  87. 12.  
  88.   
  89. select STAT_ACCESS from QRTZ2_LOCKS for update  
  90.   
  91. 13.  
  92.   
  93. SELECT TRIGGER_STATE FROM QRTZ2_TRIGGERS WHERE SCHEDULER_NAME = 'CRMscheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?;  
  94.   
  95. 14.  
  96.   
  97. SELECT TRIGGER_STATE  
  98.   
  99. FROM QRTZ2_TRIGGERS  
  100.   
  101. WHERE SCHEDULER_NAME = 'CRMscheduler'  
  102.   
  103. AND TRIGGER_NAME = ?  
  104.   
  105. AND TRIGGER_GROUP = ?;  
  106.   
  107. 14.  
  108.   
  109. SELECT *  
  110.   
  111. FROM QRTZ2_JOB_DETAILS  
  112.   
  113. WHERE SCHEDULER_NAME = CRMscheduler  
  114.   
  115. AND JOB_NAME = ?  
  116.   
  117. AND JOB_GROUP = ?;  
  118.   
  119. 15.  
  120.   
  121. SELECT *  
  122.   
  123. FROM QRTZ2_CALENDARS  
  124.   
  125. WHERE SCHEDULER_NAME = 'CRMscheduler'  
  126.   
  127. AND CALENDAR_NAME = ?;  
  128.   
  129. 16.  
  130.   
  131. UPDATE QRTZ2_FIRED_TRIGGERS  
  132.   
  133. SET INSTANCE_NAME = ?,  
  134.   
  135. FIRED_TIME = ?,  
  136.   
  137. SCHED_TIME = ?,  
  138.   
  139. ENTRY_STATE = ?,  
  140.   
  141. JOB_NAME = ?,  
  142.   
  143. JOB_GROUP = ?,  
  144.   
  145. IS_NONCONCURRENT = ?,  
  146.   
  147. REQUESTS_RECOVERY = ?  
  148.   
  149. WHERE SCHEDULER_NAME = 'CRMscheduler'  
  150.   
  151. AND ENTRY_ID = ?;  
  152.   
  153. 17.  
  154.   
  155. UPDATE TQRTZ2_TRIGGERS  
  156.   
  157. SET TRIGGER_STATE = ?  
  158.   
  159. WHERE SCHED_NAME = 'CRMscheduler'  
  160.   
  161. AND TRIGGER_NAME = '{triggerName}'  
  162.   
  163. AND TRIGGER_GROUP = '{triggerGroup}'  
  164.   
  165. AND TRIGGER_STATE = ?;  
  166.   
  167. 18.  
  168.   
  169. UPDATE QRTZ2_TRIGGERS  
  170.   
  171. SET JOB_NAME = ?,  
  172.   
  173. JOB_GROUP = ?,  
  174.   
  175. DESCRIPTION = ?,  
  176.   
  177. NEXT_FIRE_TIME = ?,  
  178.   
  179. PREV_FIRE_TIME = ?,  
  180.   
  181. TRIGGER_STATE = ?,  
  182.   
  183. TRIGGER_TYPE = ?,  
  184.   
  185. START_TIME = ?,  
  186.   
  187. END_TIME = ?,  
  188.   
  189. CALENDAR_NAME = ?,  
  190.   
  191. MISFIRE_INSTRUCTION = ?,  
  192.   
  193. PRIORITY = ?,  
  194.   
  195. JOB_DATAMAP = ?  
  196.   
  197. WHERE SCHEDULER_NAME = SCHED_NAME_SUBST  
  198.   
  199. AND TRIGGER_NAME = ?  
  200.   
  201. AND TRIGGER_GROUP = ?;  
  202.   
  203. 19.  
  204.   
  205. commit;  


四、 参考资料

Quartz官网: http://quartz-scheduler.org/documentation/quartz-2.x/tutorials/tutorial-lesson-11