Elastic Job学习(二)启动分析

启动流程
Elastic Job学习(二)启动分析
作业的启动有如下2种方式:

public static void main(String[] args) throws IOException {
        //启动方式1
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();

		//启动方式2
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"applicationContext.xml"});
        context.start();
        System.in.read();
    }

    /**
     * 作业配置
     * @return
     */
    private static LiteJobConfiguration createJobConfiguration() {
        略
    }

    /**
     * 注册中心
     * @return
     */
    private static CoordinatorRegistryCenter createRegistryCenter() {
        略
    }

1、启动

  • 直接 new JobScheduler类,调用init()方法获取
  • 通过spring启动时,先会解析相应的配置信息,并最终生成SpringJobScheduler实例,并设置其初始化方法init()

在JobScheduler的构造方法中,只是保存了Job实例需要的必要信息,如注册中心信息,作业信息等,且每一个Job都会生成一个JobScheduler实例。
执行Init方法,主要完成:

  • job配置信息的持久化,即在ZK的config节点下生成相应子节点
  • 将Job封装为quartz
  • 通过调度器的门面类,注册作业启动信息
  • 开始调度作业
/**
     * 初始化作业.
     */
    public void init() {
        //保存Job配置信息
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        //设置job分片
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        
        //作业调度控制器,封装quartz
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());

        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        //启动监听器、选举、服务器、实例持久化
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        //cron设置,开始调度作业
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

2、开启监听
启动监听器,即注册相应的监听器,实际是通过监听zk节点的变化,来做相应的处理,注册完成之后,发起主节点选举

/**
     * 注册作业启动信息.
     * 
     * @param enabled 作业是否启用
     */
    public void registerStartUpInfo(final boolean enabled) {
    	//注册各种监听器
        listenerManager.startAllListeners();
        //发起选举
        leaderService.electLeader();
        serverService.persistOnline(enabled);
        instanceService.persistOnline();
        shardingService.setReshardingFlag();
        monitorService.listen();
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }

3、选举
选举是交给ZK来处理,选举的时候,会进行阻塞,直到选举完成;
选举完成之后,会调用回调LeaderElectionExecutionCallback,回调方法的作用,是会创建Leader/election/instance临时节点,该节点的值为作业服务器的IP+PID,代表该服务器为主节点

4、持久化服务器信息
在servers节点下,创建永久节点,名称为该作业服务器的IP,节点的值为空(如果该服务器被禁用,该节点的值为disabled,同时会触发相应的监听器,重新选举)

5、持久化实例信息
在instances节点下,创建临时节点,名称为该服务器的IP+PID

6、设置分片标识
创建临时节点leader/sharding/necessary,标识作业运行时,需要先分片

7、如果配置了监控端口,以及自诊断配置,则会启动相应监听器

8、设置任务的cron表达式,并启动job,job将按照设置的时间,等待调度。

启动过程中的几个对象:
JobRegistry: 单例模式,保存所有job的所有相关信息。
JobScheduleController:封装quartz的接口信息,Job实际处理的地方。
JobScheduler: 全部作业的入口
ShedulerFacade:调度服务的封装门店类
LitJobFacade:作业的门店类
类图:
Elastic Job学习(二)启动分析