【hadoop源码分析】——org.apache.hadoop.yarn.sls.SLSRunner
类内方法概览:
public static void main(String args[]) throws Exception
- 处理参数
bin/slsrun.sh --input-sls=$dir/my-input/sls-jobs.json --nodes=$dir/my-input/sls-nodes.json --output-dir=$dir/my-logs/ --track-jobs=job1_Siyi,job2_Siyi,job3_Siyi --print-simulation - 对于错误参数的处理
- 如果有–trackjobs参数的话,把这个参数的值存入
Set<String> trackedJobSet
- 创建SLSRunner类的对象并执行它的start()方法。
public void start() throws Exception
-
void startRM();
//新建rmConf对象,并为其设置scheduler类型和METRICS_OUTPUT_DIR
//new ResourceManager();将rmConf给他让他初始化,然后rm.start();- rm.start():进入started状态(该方法是AbstractService类里的方法)
-
void startNM();
//设置NM_HEARTBEAT_INTERVAL_MS、NM_MEMORY_MB、NM_VCORES。
//解析SLSTrace里的节点信息,然后放入nodeSet里。
//根据nodeSet创造一个个NMSimulator类的对象命名为nm,用上面的参数和rm初始化nm(初始化nm之后RM上就有了集群的资源情况了。比如多少节点、可用的内存和内核等)。并执行TaskRunner.schedule(nm),然后把这些节点所属的机架加入到rackSet。
//设置numNMs和numRacks。NMSimulator是继承于TaskRunner.Task
TaskRunner类:如下图:TaskRunner.schedule(nm)做的事情,就是按顺序把这些NMSimulator的对象放入一个DelayQueue(实质上是Queue<\Task>)中。
DelayQueue是一个BlockingQueue,其特化的参数是Delayed。
Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。DelayQueue = BlockingQueue + PriorityQueue + Delayed
DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。
转自:https://www.cnblogs.com/jobs/archive/2007/04/27/730255.html
-
startAM();
//设置AM_HEARTBEAT_INTERVAL_MS、CONTAINER_MEMORY_MB、CONTAINER_VCORES。
//解析SLSTrace里的节点信息,然后放入nodeSet里。
//判断是sls还是remen,执行各自的解析trace并启动应用AM的方法。如sls的话执行startAMFromSLSTraces(containerResource, heartbeatInterval);
//remainingApps = numAMs = amMap.size();- startAMFromSLSTraces(containerResource, heartbeatInterval);
//从sls trace 文件中解析工作负载的信息。
//对于每一个解析到的作业,创造AMSimulator类的对象amSim,并根据解析到的作业信息init。接着执行runner.schedule(amSim);然后执行amMap.put(oldAppId, amSim)建立起AMSimultor和appID的联系。
- startAMFromSLSTraces(containerResource, heartbeatInterval);
设置queue和tracked app 的信息
- printSimulationInfo()打印模拟的信息。像这样:
void waitForNodesRunning()
//阻塞,直到所有节点运行-
runner.start();
//runner是TaskRunner类的对象:private static TaskRunner runner = new TaskRunner();
//一旦一切准备就绪,就开始执行TaskRunner类的start();- TaskRunner. start()
- TaskRunner.schedule(Task task, long timeNow)
- TaskRunner. start()