大多数情况下,定时任务我们一般使用quartz开源框架就能满足应用场景。但如果考虑到健壮性等其它一些因素,就需要自己下点工夫,比如:要避免单点故障,至少得部署2个节点吧,但是部署多个节点,又有其它问题,有些数据在某一个时刻只能处理一次,比如 i = i+1 这些无法保证幂等的操作,run多次跟run一次,完全是不同的效果。
对于上面的问题,我曾经自行设计过一个基于zk分布式锁的解决方案:
1、每类定时job,可以分配一个独立的标识(比如:xxx_job)
2、这类job的实例,部署在多个节点上时,每个节点启动前,向zk申请一个分布式锁(在xxx_job节点下)
3、拿到锁的实例,才允许启动定时任务(通过代码控制quartz的schedule),没拿到锁的,处于standby状态,一直监听锁的变化
4、如果某个节点挂了,分布式锁自动释放,其它节点这时会抢到锁,按上面的逻辑,就会从standby状态,转为**状态,小三正式上位,继续执行定时job。
这个方案,基本上解决了HA和业务正确性的问题,但是美中不足的地方有2点:
1、无法充分利用机器性能,处于standby的节点,实际上只是一个备胎,平时啥也不干
2、性能不方便扩展,比如:某个job一次要处理上千万的数据,仅1个**节点,要处理很久
好了,前戏铺垫了这么久,该请主角登场了,elastic-job相当于quartz+zk的加强版,它允许对定时任务分片,可以集群部署(每个job的"分片"会分散到各个节点上),如果某个节点挂了,该节点上的分片,会调度到其它节点上。官网上有比较详细的教程,一般情况下,使用SimpleJob这种就可以了。
使用步骤:
前提:要先添加下面二个jar的依赖
1
2
|
compile "com.dangdang:elastic-job-lite-core:2.1.5"
compile "com.dangdang:elastic-job-lite-spring:2.1.5"
|
1、自己的job要继承自SimpleJob,然后实现void execute(ShardingContext shardingContext)。
1
2
3
4
5
6
7
8
9
|
public interface SimpleJob extends ElasticJob {
/**
* 执行作业.
*
* @param shardingContext 分片上下文
*/
void execute(ShardingContext shardingContext);
}
|
注意这里面有一个shardingContext参数,看下源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
/**
* 分片上下文.
*
* @author zhangliang
*/
@Getter
@ToString
public final class ShardingContext {
/**
* 作业名称.
*/
private final String jobName;
/**
* 作业任务ID.
*/
private final String taskId;
/**
* 分片总数.
*/
private final int shardingTotalCount;
/**
* 作业自定义参数.
* 可以配置多个相同的作业, 但是用不同的参数作为不同的调度实例.
*/
private final String jobParameter;
/**
* 分配于本作业实例的分片项.
*/
private final int shardingItem;
/**
* 分配于本作业实例的分片参数.
*/
private final String shardingParameter;
public ShardingContext( final ShardingContexts shardingContexts, final int shardingItem) {
jobName = shardingContexts.getJobName();
taskId = shardingContexts.getTaskId();
shardingTotalCount = shardingContexts.getShardingTotalCount();
jobParameter = shardingContexts.getJobParameter();
this .shardingItem = shardingItem;
shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
}
}
|
这里面有2个很重要的属性:shardingTotalCount 分片总数(比如:2)、shardingItem 当前分片索引(比如:1),前面提到的性能扩容,就可以根据2个参数进行简单的处理,假设在电商系统中,每天晚上有个定时任务,要统计每家店的销量。商家id一般在表设计上是一个自增数字,如果总共2个分片(注:通常也就是部署2个节点),可以把 id为奇数的放到分片0,id为偶数的放到分片1,这样2个机器各跑一半,相对只有1台机器而言,就快多了。
伪代码如下:
1
2
3
4
5
6
7
8
9
10
11
|
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int shardIndx = shardingContext.getShardingItem();
if (shardIndx == 0 ) {
//处理id为奇数的商家
} else {
//处理id为偶数的商家
}
}
}
|
这个还可以进一步简化,如果使用mysql查询商家列表,mysql中有一个mod函数,直接可以对商家id进行取模运算
1
|
select * from shop where mod(shop_id,2)=0
|
如果把上面的2、0换成参数,mybatis中类似这样:
1
|
select * from shop where mod(shop_id,#{shardTotal})=#{shardIndex}
|
这样逻辑就转换到sql中处理了,java代码中把参数传进来就行,连if都可以省掉。
2、接下来看看如何配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
<?xml version= "1.0" encoding= "UTF-8" ?>
<beans xmlns= "http://www.springframework.org/schema/beans"
xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg= "http://www.dangdang.com/schema/ddframe/reg"
xmlns:job= "http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation= "http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd" >
<reg:zookeeper id= "regCenter" server-lists= "${zk_address}" namespace= "my-xxx-job"
base-sleep- time -milliseconds= "1000"
max -sleep- time -milliseconds= "3000" max -retries= "3" />
<job:simple id= "xxxJob" class= "com.cnblogs.yjmyzz.XXXJob" registry-center-ref= "regCenter"
cron= "${xxxJob_cornExpress}"
sharding-total- count = "2" sharding-item-parameters= "0=A,1=B" />
...
</beans>
|
与常规的spring配置几乎没啥区别,几个要点如下:
a) 因为分片调度是基于zk的,所以要先配置zk注册中心,其中${zk_address}大家可以改成实际的zk地址列表,比如:10.x.x.1:2181,10.x.x.2:2181,10.x.x.3:2181
b) 每个job中的corn属性,就是quartz中的cornExpress表达式,然后sharding-total-count即总分片数,而sharding-item-parameters则是指定每个分片中的具体参数
(注:刚才的电商每天晚上算销量,这个case其实只用到了分片索引、分片数,并不需要参数,所以这里随便配置一个类似0=A, 1=B就可以了,如果有些业务场景,希望在知道分片索引的同时,还希望额外传一些参数进来,就可以在这里配置你希望的参数,然后在execute中,也能读到相应的参数)
3、控制台
elastic-job还提供了一个不错的UI控制台,项目源代码git clone到本地,mvn install就能得到一个elastic-job-lite-console-${version}.tar.gz的包,解压,然后运行里面的bin/start.sh 就能跑起来,界面类似如下:

通过这个控制台,可以动态调整每个定时任务的触发时间(即:cornExpress)。详情可参考官网文档-运维平台部分。
4、与spring-cloud/spring-boot的整合
如果是传统的spring项目,按上面的步骤就可以无缝整合了,如果是spring-cloud/spring-boot,则稍微要复杂点。
由于spring-boot倡导零xml配置,所以大部分配置就用代码替代了,先定义一个elasticJob的配置类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
@Data
@Configuration
public class ElasticJobConfig {
@Value ( "${rxQuartz.app.zkAddress}" )
private String zkNodes;
@Value ( "${rxQuartz.app.namespace}" )
private String namespace;
@Bean
public ZookeeperConfiguration zkConfig() {
return new ZookeeperConfiguration(zkNodes, namespace);
}
@Bean (initMethod = "init" )
public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
return new ZookeeperRegistryCenter(config);
}
}
|
上面这段代码,主要是解决zk注册中心的注入问题,然后各种xxxJob,由于要让spring自动注入,需要打上component注解
1
2
3
4
5
|
@Component ( "xxxJob" )
public class XXXJob extends AbstractJob {
...
}
|
然后在真正要用的地方,把他们组装起来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: yangjunming
*/
@Configuration
public class ElasticJobs {
@Autowired
@Qualifier ( "xxxJob" )
public SimpleJob xxxJob;
@Autowired
private ZookeeperRegistryCenter regCenter;
@Bean (initMethod = "init" )
public JobScheduler settlementJobScheduler( @Autowired @Qualifier ( "xxxJob" ) SimpleJob simpleJob,
@Value ( "${xxxJob.billCronExpress}" ) final String cron,
@Value ( "${xxxJob.shardingCount}" ) int shardingCount,
@Value ( "${xxxJob.shardingItemParameters}" ) String shardingItemParameters) {
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingCount, shardingItemParameters));
}
private LiteJobConfiguration getLiteJobConfiguration( final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder( new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite( true ).build();
}
}
|
大功告成,祝大家周末撸码愉快!
作者:菩提树下的杨过
出处:http://yjmyzz.cnblogs.com
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。