Elastic Job学习(二)启动分析
启动流程
作业的启动有如下2种方式:
public static void main(String[] args) throws IOException {
//启动方式1
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
//启动方式2
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"applicationContext.xml"});
context.start();
System.in.read();
}
/**
* 作业配置
* @return
*/
private static LiteJobConfiguration createJobConfiguration() {
略
}
/**
* 注册中心
* @return
*/
private static CoordinatorRegistryCenter createRegistryCenter() {
略
}
1、启动
- 直接 new JobScheduler类,调用init()方法获取
- 通过spring启动时,先会解析相应的配置信息,并最终生成SpringJobScheduler实例,并设置其初始化方法init()
在JobScheduler的构造方法中,只是保存了Job实例需要的必要信息,如注册中心信息,作业信息等,且每一个Job都会生成一个JobScheduler实例。
执行Init方法,主要完成:
- job配置信息的持久化,即在ZK的config节点下生成相应子节点
- 将Job封装为quartz
- 通过调度器的门面类,注册作业启动信息
- 开始调度作业
/**
* 初始化作业.
*/
public void init() {
//保存Job配置信息
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
//设置job分片
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
//作业调度控制器,封装quartz
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
//启动监听器、选举、服务器、实例持久化
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
//cron设置,开始调度作业
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
2、开启监听
启动监听器,即注册相应的监听器,实际是通过监听zk节点的变化,来做相应的处理,注册完成之后,发起主节点选举
/**
* 注册作业启动信息.
*
* @param enabled 作业是否启用
*/
public void registerStartUpInfo(final boolean enabled) {
//注册各种监听器
listenerManager.startAllListeners();
//发起选举
leaderService.electLeader();
serverService.persistOnline(enabled);
instanceService.persistOnline();
shardingService.setReshardingFlag();
monitorService.listen();
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
3、选举
选举是交给ZK来处理,选举的时候,会进行阻塞,直到选举完成;
选举完成之后,会调用回调LeaderElectionExecutionCallback,回调方法的作用,是会创建Leader/election/instance临时节点,该节点的值为作业服务器的IP+PID,代表该服务器为主节点
4、持久化服务器信息
在servers节点下,创建永久节点,名称为该作业服务器的IP,节点的值为空(如果该服务器被禁用,该节点的值为disabled,同时会触发相应的监听器,重新选举)
5、持久化实例信息
在instances节点下,创建临时节点,名称为该服务器的IP+PID
6、设置分片标识
创建临时节点leader/sharding/necessary,标识作业运行时,需要先分片
7、如果配置了监控端口,以及自诊断配置,则会启动相应监听器
8、设置任务的cron表达式,并启动job,job将按照设置的时间,等待调度。
启动过程中的几个对象:
JobRegistry: 单例模式,保存所有job的所有相关信息。
JobScheduleController:封装quartz的接口信息,Job实际处理的地方。
JobScheduler: 全部作业的入口
ShedulerFacade:调度服务的封装门店类
LitJobFacade:作业的门店类
类图: