开源分布式调度DST
分布式调度的演化
互联网日新月异,业务也是越发的复杂,不容置疑调度(定时任务)成为了互联网业务系统中不可或缺的一部分。业务是催生技术变革的强大动力,高并发、大吞吐、高可用等也都在催动技术不停变革,对于调度也追求更高,分布式调度应用而生。那究竟什么是分布式调度?如下随着我的“栗子”,一起来演进。
业务背景
对用户数据进行异步实名认证,用户每天都有新的注册量,随着业务的增长,每天的注册量都在增长。用户表为user,认证初始状态(0)认证成功(1),认证失败(2),为了快速响应业务,程序员小黄用java编写了一个定时任务完美交差了,可是随着业务量的增长发现前一天的数据到了第二天都没有处理完,程序员小黄的老大找到他要求优化,小黄说修改一下调度频率就解决了,确实调度频率修改完之后也确实满足了当前的需求,就这样又跑了三个月,这个问题又复现了。小黄发现应用服务器和数据库的性能并没有达到瓶颈,就考虑采用线程池,一个线程池负责load数据,一组线程在处理数据,就这样优化后系统又在线上跑了一段时间。突然有一天公司领导找到小黄说系统目前没有提供服务,数据积压严重,而且公司马上要上市,数据量还会暴增。小王很快排查到是因为服务宕机了,小黄很快解决了问题,但是值得小黄深思的问题可真不少,小黄尝试引入分布式调度框架,满足未来的业务暴增、服务横向扩展、高可用等。
应用
单机版
部署了一个应用实例,里面有一段程序定时从user表中读取数据,对user表中的每个用户数据调用公安部接口进行实名认证,并且将认证之后的信息保存,最后将user中的状态字段从初始状态(0)更新为认证成功(1),如果失败更新为(2)
- 多实例版
部署了两台应用实例(A实例,B实例),分别定时从user表中读取数据,后续业务不变。其中读取数据的时候要保证数据不被重复读取,我们这里做了一个取模的方式,比如id除以2,如果余数为0的数据由A实例获取并执行,如果余数为1的数据由B实例获取并执行。
问题1、
如果要三个实例或者四个五个实例处理数据,应该怎么办?其实也很简单修改数据的取模方式,然后每个应用在上线。这样也就导致了应用的配置不同
问题2、
如果其中一个实例挂了,那部分数据谁来处理,如果上面的单机版基本上这个调度对应的业务也就废了?除非重启,或重新部署
- 分布式调度要做什么?
针对上面的场景,分布式调度框架要做到数据被分摊到多个实例上处理、数据不被重复处理,系统宕机后任务能够重新分配、数据暴增时应用可以横向扩展。有了这些功能上面的场景也就可以得到很好满足了。
DST概述
DST(Distributed scheduling task)是一个基于数据库为注册中心的轻量级的分布式调度框架,由java编写,几乎无依赖第三方jar,核心jar不超过100KB。框架设计旨在为提供一套集功能性、可靠性、易用性、效率性、可维护性、可移植性于一体的高质量分布式调度框架。现已开放全部源代码,并且提供了springboot、spring+mybatis+springmvc、java-application等多种集成方式,后续还会实现python等多种client核心和报警机制。
DST分为dst-client和dst-console两个部分,两部分无直接关系。dst-client是调度的核心,应用程序集成dst-client并实现加单配置就可以调度托管于dst-console。dst-console提供了应用和调度任务的管理,统计报表等功能,dst-console宕机不影响所有调度的正常运行。
DST特性
- 应用实例自动注册
DST应用实例自行注册,并且会保持心跳,自动探测实例存活。注册和管理中心单一,容易维护。
- 应用实例自动选主(HA)
DST应用实例会在启动过程中实现自动选主(Master),Master负责任务的分片,保障实例拿到任务的概率一致性。
- DST实例的自动分片
DST根据应用实例有Master实例负责对所有已经**的调度任务进行分片,分片规则可自行配置。
- 调度实现简单
只需要实现定义的接口就可以完成自定义方法的调度和高效调度。
- 调度中心化管理
管理平台可以实现所有任务的管理,可以对调度任务实现工台添加和启停,根据需要自动绑定服务器,动态修改调度时间配置不停歇,动态配置处理线程,动态修改调度数据量。
- 调度规则优化配置
调度规则支持标准的Cron表达式,也可以制定简单的时间间隔(比如:500s运行一次,只需要配置500)。
- 动态扩容
可根据业务容量不停服添加机器,自行获取调度分片,任意加入调度。
- 动态缩容
在部分服务宕机或者缩容的情况下,应用会重新进行选主,重新计算分片,对数据处理无阻碍。
- 调度统计
在线查看调度趋势图和调度明细,查看每个线程额处理结果和耗时。
- 架构优越
系统架构优越,更能节省服务器资源,每个DST实例统一的任务刷新、统计、清理线程,无额外资源损耗。应用基于spring-boot+maven构建。
架构设计
整体设计
控制台

快速入门
1、环境准备
JDK:1.8+
Mysql:5+
Maven:3+
git;2+
2、下载
git clone https://gitee.com/xiaoleiziemail/xlz-dst.git
源码下载后有两部分dst和dst-demo。dst里面是dst的核心代码,dst-demo是各种接入方式
对dst进行maven编译
3、应用集成dst-client
当前以spring+mybatis等工程为例,采用maven进行构建,jdk要求1.7+,当前以dst-demo中dst-ssm为例。
3.1添加依赖,当前稳定版本为1.0.1
<dependency>
<groupId>com.xlz</groupId>
<artifactId>dst-client</artifactId>
<version>1.0.1</version>
</dependency>
框架在设计之初就考虑了依赖性问题,所以尽量的没有使用到第三方的jar,不过还是用到了一些jar,如果出现jar冲突请自行排除,最好是如下配置:
<dependency>
<groupId>com.xlz</groupId>
<artifactId>dst-client</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
3.2、应用程序配置方面需要配置调度中心的数据库数据源:
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource">
<property name="driverClassName" value="${jdbc.driverClassName.db2}" />
<property name="url" value="${jdbc.url.db2}" />
<property name="username" value="${jdbc.username.db2}" />
<property name="password" value="${jdbc.password.db2}" />
<property name="maxActive" value="${jdbc.maxActive.db2}" />
<property name="initialSize" value="${jdbc.initialSize.db2}" />
<property name="maxWait" value="${jdbc.maxWait.db2}" />
<property name="maxIdle" value="${jdbc.maxIdle.db2}" />
<property name="testOnBorrow" value="${jdbc.testOnBorrow.db2}" />
<property name="validationQuery" value="select 1 from dual" />
<property name="defaultAutoCommit" value="false"></property>
</bean>
3.3、引入分布式调度框架
<bean id="dstManagerBeanFactory" class="com.xlz.manager.DstManagerBeanFactory">
<property name="dataSource" ref="dataSource"></property>
<property name="appNo" value="ssm"></property> <!--应用编号-->
</bean>
3.4、开发调度任务,需要继承AbstractWorker抽象类,里面主要由select方法和execute
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.test.bo.MyTestDO;
import com.test.service.MyTestService;
import com.xlz.domain.ShardItem;
import com.xlz.worker.AbstractWorker;
public class TestScheduleBean1 extends AbstractWorker<MyTestDO>{
protected final Logger LOG = LoggerFactory.getLogger(getClass());
@Autowired
private MyTestService myTestService;
@Override
public List<MyTestDO> select() {
ShardItem[] items = getShardItems();
StringBuffer item = new StringBuffer();
for(int i =0 ;i < items.length;i++){
if(i > 0)
item.append(",");
item.append(items[i].getItemId());
}
System.out.println("当前任务:"+getAppTask().getId()+"==========当前分片:"+item.toString());
//获取数据加入到待处理单据队列
String tableName = getAppTask().getTaskParam();
List<MyTestDO> list = myTestService.
getMyTestList(getShardItemCount(),item.toString(),
getAppTask().getFetchDataNumber(),tableName);
/*if("my_test_1".equals(tableName)){
LOG.error("本次加载到的记录条数为:{},当前任务总分片数为:{},当前调度获取到的分片为拼接后:{}---原始为{}",list.size(),getShardItemCount(),item.toString(),getShardItems());
}*/
return list;
}
@Override
protected void execute(MyTestDO obj) throws Exception{
String tableName = getAppTask().getTaskParam();
myTestService.update(obj,tableName,getRegisterWorker());
}
public void test(){
LOG.info("============我是自定义方法=============="+Arrays.toString(getShardItems()));
}
}
到此应用配置结束,但此时未完成控制台的配置,调度不能执行。
4、dst-console安装使用
4.1、部署
在mysql中初始化dst和dst_console两个数据库,脚本在dst/sql目录中
当前dst-console采用springboot开发,jdk1.8+,编译完后可以直接命令行:
java -jar dst-console.jar
linux后台启动可以执行命令:
nohup java -jar dst-console.jar &
也可以打包成war丢进tomact中部署,要求tomcat1.8+
4.2、应用配置
初始化数据中我们看到有配置的应用数据,当前我们使用的ssm这个测试应用
4.3、任务配置
初始化数据中我们看到有配置的任务数据,当前我们使用的是
至此系统已经可以按照指定的调度时间进行调度。
4.4dst-console中还提供了丰富的报表功能
应用任务分布图:
实例分布图:
调度明细:
处理明细: