Hadoop3.2.0 源码分析: NodeManager
概念
NodeManager(NM)是YARN中每个节点上的代理,负责管理Hadoop集群中的单个计算节点,包括与ResourceManager保持通信、管理Container的生命周期、监控每个Container的资源使用(内存、CPU等)情况、追踪节点健康状况,以及管理日志和不同应用程序用到的附属服务。
构造图:
类图:
代码分解:
启动入口:
org.apache.hadoop.yarn.server.nodemanager.NodeManager#main(main 方法会调用下面的 initAndStartNodeManager 完成初始化与启动)
/**
 * Installs the shutdown hook, then initializes and starts this NodeManager.
 *
 * <p>Any {@link Throwable} raised along the way (including the bash check
 * failure below) is logged and the JVM exits with status -1.
 *
 * @param conf configuration handed to {@code init(conf)}
 * @param hasToReboot when true, a previously registered shutdown hook (if any)
 *     is removed before the new one is registered
 */
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
  try {
    // Bash is necessary to launch containers under Unix-based systems, so
    // refuse to start on a non-Windows host that does not provide it.
    final boolean bashMissing = !Shell.WINDOWS && !Shell.checkIsBashSupported();
    if (bashMissing) {
      final String reason =
          "Failing NodeManager start since we're on a "
              + "Unix-based system but bash doesn't seem to be available.";
      LOG.error(reason);
      throw new YarnRuntimeException(reason);
    }

    // When rebooting, drop the hook left over from the previous start.
    if (hasToReboot && nodeManagerShutdownHook != null) {
      ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
    }

    // Register a fresh hook wrapping this service so that it is shut down
    // when the NodeManager process stops or restarts.
    nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
    ShutdownHookManager.get()
        .addShutdownHook(nodeManagerShutdownHook, SHUTDOWN_HOOK_PRIORITY);

    // System exit should be called only when the NodeManager is instantiated
    // from main().
    this.shouldExitOnShutdownEvent = true;

    // init() runs the overridden serviceInit(); start() runs the overridden
    // serviceStart() to bring up the individual services.
    this.init(conf);
    this.start();
  } catch (Throwable startupFailure) {
    LOG.error("Error starting NodeManager", startupFailure);
    System.exit(-1);
  }
}
接下来的核心有两处:一是初始化各项组件服务,二是启动各项组件服务。
初始化
先说初始化各项组件服务, 这项最为关键, 里面会初始化各种服务.
代码如下:
/**
 * Initializes the NodeManager: starts the recovery store, builds the NM
 * context, creates the component services and hands them to
 * addService/addIfService, then delegates to super.serviceInit(conf).
 *
 * <p>Statement order matters here: services are added in the sequence below,
 * and nodeStatusUpdater is deliberately added last (see the comment near the
 * end of the method).
 *
 * @param conf configuration used to read feature flags and build the components
 * @throws Exception as declared by the overridden method; IOExceptions from the
 *     recovery store, the container executor and the secure login are rethrown
 *     wrapped in YarnRuntimeException
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
// Read whether RM work-preserving recovery is enabled.
rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration
.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
// The recovery store must be up first: nmStore is passed to the secret
// managers and the context created below.
try {
initAndStartRecoveryStore(conf);
} catch (IOException e) {
String recoveryDirName = conf.get(YarnConfiguration.NM_RECOVERY_DIR);
throw new
YarnRuntimeException("Unable to initialize recovery directory at "
+ recoveryDirName, e);
}
// Token secret managers, both backed by nmStore; recoverTokens is then called
// on them.
NMContainerTokenSecretManager containerTokenSecretManager =
new NMContainerTokenSecretManager(conf, nmStore);
NMTokenSecretManagerInNM nmTokenSecretManager =
new NMTokenSecretManagerInNM(nmStore);
recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
this.aclsManager = new ApplicationACLsManager(conf);
this.dirsHandler = new LocalDirsHandlerService(metrics);
boolean isDistSchedulingEnabled =
conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);
// Build the shared NM context; the components created below are attached to
// it through NMContext setters.
this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf);
ResourcePluginManager pluginManager = createResourcePluginManager();
pluginManager.initialize(context);
((NMContext)context).setResourcePluginManager(pluginManager);
// Initialize the ContainerExecutor.
// NOTE(review): per the original notes it wraps the NodeManager's container
// operations (launching a container, checking whether a container with a given
// id is alive, ...), and the implementation is selected by
// yarn.nodemanager.container-executor.class, defaulting to
// DefaultContainerExecutor - confirm against createContainerExecutor.
ContainerExecutor exec = createContainerExecutor(conf);
try {
exec.init(context);
} catch (IOException e) {
throw new YarnRuntimeException("Failed to initialize container executor", e);
}
DeletionService del = createDeletionService(exec);
addService(del);
// NodeManager level dispatcher
// (described in the original notes as an asynchronous event dispatcher).
this.dispatcher = createNMDispatcher();
// Node health checker, built from the health script runner and dirsHandler.
// NOTE(review): per the original notes, node health combines
// nodeHealthScriptRunner.isHealthy and dirsHandler.areDisksHealthy - verify.
nodeHealthChecker =
new NodeHealthCheckerService(
getNodeHealthScriptRunner(conf), dirsHandler);
addService(nodeHealthChecker);
((NMContext)context).setContainerExecutor(exec);
((NMContext)context).setDeletionService(del);
// Create the NodeStatusUpdater (it is only added as a service at the end).
// NOTE(review): per the original notes it registers with the RM and sends
// heartbeats (status updates) over the ResourceTracker protocol, which runs on
// YarnRPC and offers a register call and a heartbeat call - not visible here.
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
// Optional providers: wired into the status updater only when one was created.
nodeLabelsProvider = createNodeLabelsProvider(conf);
if (nodeLabelsProvider != null) {
addIfService(nodeLabelsProvider);
nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider);
}
nodeAttributesProvider = createNodeAttributesProvider(conf);
if (nodeAttributesProvider != null) {
addIfService(nodeAttributesProvider);
nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider);
}
// Node resource monitor.
// NOTE(review): the original note lists four states (stopped, inited,
// notinited, started); those look like generic service lifecycle states rather
// than resource states - confirm.
nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);
// Create the container manager service.
// NOTE(review): per the original notes this is ContainerManagerImpl, serving
// the ContainerManager protocol that applications use to talk to the
// NodeManager - confirm against createContainerManager.
containerManager =
createContainerManager(context, exec, del, nodeStatusUpdater,
this.aclsManager, dirsHandler);
addService(containerManager);
((NMContext) context).setContainerManager(containerManager);
this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
context);
addService(nmLogAggregationStatusTracker);
((NMContext)context).setNMLogAggregationStatusTracker(
this.nmLogAggregationStatusTracker);
// Create the NodeManager web server.
// NOTE(review): per the original notes the address comes from
// yarn.nodemanager.webapp.address (the notes misspell it without the dot
// before "webapp") and the default port is 8042 - confirm in yarn-default.xml.
WebServer webServer = createWebServer(context, containerManager
.getContainersMonitor(), this.aclsManager, dirsHandler);
addService(webServer);
((NMContext) context).setWebServer(webServer);
((NMContext) context).setQueueableContainerAllocator(
new OpportunisticContainerAllocator(
context.getContainerTokenSecretManager()));
// Register the event handlers: container-manager events go to
// containerManager, NodeManager events go to this object.
dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this);
addService(dispatcher);
pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
// Initialize the metrics system under the name "NodeManager".
DefaultMetricsSystem.initialize("NodeManager");
// The collector service is created only when timeline service v2 is enabled.
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
this.nmCollectorService = createNMCollectorService(context);
addService(nmCollectorService);
}
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater);
((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
nmStore.setNodeStatusUpdater(nodeStatusUpdater);
// Do secure login before calling init for added services.
try {
doSecureLogin();
} catch (IOException e) {
throw new YarnRuntimeException("Failed NodeManager login", e);
}
registerMXBean();
// Delegate to the parent implementation now that every service has been added.
super.serviceInit(conf);
// TODO add local dirs to del
}
启动各项服务:
/**
 * Starts this service by delegating the actual work to {@code serviceStart()}.
 *
 * <p>Returns immediately when the service is already in the STARTED state.
 * If {@code serviceStart()} throws, the failure is recorded, the service is
 * stopped quietly, and the exception is rethrown after conversion through
 * {@code ServiceStateException.convert}.
 *
 * <p>NOTE(review): the original notes state that a ServiceStateException is
 * thrown when the current state does not permit starting - presumably raised
 * by {@code stateModel.enterState}; confirm.
 */
@Override
public void start() {
  if (isInState(STATE.STARTED)) {
    // Already started: nothing to do.
    return;
  }
  synchronized (stateChangeLock) {
    // Enter the started state. Skip the start-up work when enterState reports
    // STARTED (presumably the state held before the transition - confirm).
    if (stateModel.enterState(STATE.STARTED) == STATE.STARTED) {
      return;
    }
    try {
      startTime = System.currentTimeMillis();
      // Bring the service up.
      serviceStart();
      // Notify only if the service started and is not now in a later state.
      if (isInState(STATE.STARTED)) {
        LOG.debug("Service {} is started", getName());
        notifyListeners();
      }
    } catch (Exception startFailure) {
      noteFailure(startFailure);
      ServiceOperations.stopQuietly(LOG, this);
      throw ServiceStateException.convert(startFailure);
    }
  }
}
显示任务节点: