Hadoop Balancer源码解读
前言
最近在做一些Hadoop运维的相关工作,发现了一个有趣的问题,我们公司的Hadoop集群磁盘占比数值参差不齐,高的接近80%,低的接近40%,并没有充分利用好上面的资源,但是balance的操作跑的也是正常的啊,所以打算看一下Hadoop的balance的源代码,更深层次的去了解Hadoop Balance的机制。
Balancer和Dispatcher
上面2个类的设计就是与Hadoop Balance操作联系最紧密的类,Balancer类负责找出<source, target>这样的源节点、目标节点对,然后存入到Dispatcher中,然后通过Dispatcher进行分发,不过在分发之前,会进行block的验证,判断此block是否能被移动,这里会涉及到一些条件的判断,具体的可以通过后面的代码中的注释去发现。
在Balancer的最后阶段,会将source和Target加入到dispatcher中,详见下面的代码:
- /**
- *根据源节点和目标节点,构造任务对
- */
- privatevoidmatchSourceWithTargetToMove(Sourcesource,StorageGrouptarget){
- longsize=Math.min(source.availableSizeToMove(),target.availableSizeToMove());
- finalTasktask=newTask(target,size);
- source.addTask(task);
- target.incScheduledSize(task.getSize());
- //加入分发器中
- dispatcher.add(source,target);
- LOG.info("Decidedtomove"+StringUtils.byteDesc(size)+"bytesfrom"
- +source.getDisplayName()+"to"+target.getDisplayName());
- }
- /**
- *Foreachdatanode,choosematchingnodesfromthecandidates.Eitherthe
- *datanodesorthecandidatesaresourcenodeswith(utilization>Avg),and
- *theothersaretargetnodeswith(utilization<Avg).
- */
- private<GextendsStorageGroup,CextendsStorageGroup>
- voidchooseStorageGroups(Collection<G>groups,Collection<C>candidates,
- Matchermatcher){
- for(finalIterator<G>i=groups.iterator();i.hasNext();){
- finalGg=i.next();
- for(;choose4One(g,candidates,matcher););
- if(!g.hasSpaceForScheduling()){
- //如果候选节点没有空间调度,则直接移除掉
- i.remove();
- }
- }
- }
- /**
- *Decideall<source,target>pairsand
- *thenumberofbytestomovefromasourcetoatarget
- *Maximumbytestobemovedperstoragegroupis
- *min(1Bandworthofbytes,MAX_SIZE_TO_MOVE).
- *从源节点列表和目标节点列表中各自选择节点组成一个个对,选择顺序优先为同节点组,同机架,然后是针对所有
- *@returntotalnumberofbytestomoveinthisiteration
- */
- privatelongchooseStorageGroups(){
- //First,matchnodesonthesamenodegroupifclusterisnodegroupaware
- if(dispatcher.getCluster().isNodeGroupAware()){
- //首先匹配的条件是同节点组
- chooseStorageGroups(Matcher.SAME_NODE_GROUP);
- }
- //Then,matchnodesonthesamerack
- //然后是同机架
- chooseStorageGroups(Matcher.SAME_RACK);
- //Atlast,matchallremainingnodes
- //最后是匹配所有的节点
- chooseStorageGroups(Matcher.ANY_OTHER);
- returndispatcher.bytesToMove();
- }
最后核心的检验block块是否合适的代码为下面这个:
- /**
- *Decideiftheblockisagoodcandidatetobemovedfromsourcetotarget.
- *Ablockisagoodcandidateif
- *1.theblockisnotintheprocessofbeingmoved/hasnotbeenmoved;
- *移动的块不是正在被移动的块
- *2.theblockdoesnothaveareplicaonthetarget;
- *在目标节点上没有移动的block块
- *3.doingthemovedoesnotreducethenumberofracksthattheblockhas
- *移动之后,不同机架上的block块的数量应该是不变的.
- */
- privatebooleanisGoodBlockCandidate(Sourcesource,StorageGrouptarget,
- DBlockblock){
- if(source.storageType!=target.storageType){
- returnfalse;
- }
- //checkiftheblockismovedornot
- //如果所要移动的块是存在于正在被移动的块列表,则返回false
- if(movedBlocks.contains(block.getBlock())){
- returnfalse;
- }
- //如果移动的块已经存在于目标节点上,则返回false,将不会予以移动
- if(block.isLocatedOn(target)){
- returnfalse;
- }
- //如果开启了机架感知的配置,则目标节点不应该有相同的block
- if(cluster.isNodeGroupAware()
- &&isOnSameNodeGroupWithReplicas(target,block,source)){
- returnfalse;
- }
- //需要维持机架上的block块数量不变
- if(reduceNumOfRacks(source,target,block)){
- returnfalse;
- }
- returntrue;
- }
下面是Balancer.java和Dispatch.java类的完整代码解析:
Balancer.java:
- /**
- *LicensedtotheApacheSoftwareFoundation(ASF)underone
- *ormorecontributorlicenseagreements.SeetheNOTICEfile
- *distributedwiththisworkforadditionalinformation
- *regardingcopyrightownership.TheASFlicensesthisfile
- *toyouundertheApacheLicense,Version2.0(the
- *"License");youmaynotusethisfileexceptincompliance
- *withtheLicense.YoumayobtainacopyoftheLicenseat
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unlessrequiredbyapplicablelaworagreedtoinwriting,software
- *distributedundertheLicenseisdistributedonan"ASIS"BASIS,
- *WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied.
- *SeetheLicenseforthespecificlanguagegoverningpermissionsand
- *limitationsundertheLicense.
- */
- packageorg.apache.hadoop.hdfs.server.balancer;
- importstaticcom.google.common.base.Preconditions.checkArgument;
- importjava.io.IOException;
- importjava.io.PrintStream;
- importjava.net.URI;
- importjava.text.DateFormat;
- importjava.util.ArrayList;
- importjava.util.Arrays;
- importjava.util.Collection;
- importjava.util.Collections;
- importjava.util.Date;
- importjava.util.Formatter;
- importjava.util.Iterator;
- importjava.util.LinkedList;
- importjava.util.List;
- importjava.util.Set;
- importorg.apache.commons.logging.Log;
- importorg.apache.commons.logging.LogFactory;
- importorg.apache.hadoop.classification.InterfaceAudience;
- importorg.apache.hadoop.conf.Configuration;
- importorg.apache.hadoop.conf.Configured;
- importorg.apache.hadoop.fs.Path;
- importorg.apache.hadoop.hdfs.DFSConfigKeys;
- importorg.apache.hadoop.hdfs.DFSUtil;
- importorg.apache.hadoop.hdfs.HdfsConfiguration;
- importorg.apache.hadoop.hdfs.StorageType;
- importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
- importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
- importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
- importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
- importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
- importorg.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
- importorg.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
- importorg.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
- importorg.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
- importorg.apache.hadoop.hdfs.server.protocol.StorageReport;
- importorg.apache.hadoop.util.StringUtils;
- importorg.apache.hadoop.util.Time;
- importorg.apache.hadoop.util.Tool;
- importorg.apache.hadoop.util.ToolRunner;
- importcom.google.common.base.Preconditions;
- /**<p>ThebalancerisatoolthatbalancesdiskspaceusageonanHDFScluster
- *whensomedatanodesbecomefullorwhennewemptynodesjointhecluster.
- *Thetoolisdeployedasanapplicationprogramthatcanberunbythe
- *clusteradministratoronaliveHDFSclusterwhileapplications
- *addinganddeletingfiles.
- *
- *<p>SYNOPSIS
- *<pre>
- *Tostart:
- *bin/start-balancer.sh[-threshold<threshold>]
- *Example:bin/start-balancer.sh
- *startthebalancerwithadefaultthresholdof10%
- *bin/start-balancer.sh-threshold5
- *startthebalancerwithathresholdof5%
- *Tostop:
- *bin/stop-balancer.sh
- *</pre>
- *
- *<p>DESCRIPTION
- *<p>Thethresholdparameterisafractionintherangeof(1%,100%)witha
- *defaultvalueof10%.Thethresholdsetsatargetforwhetherthecluster
- *isbalanced.Aclusterisbalancedifforeachdatanode,theutilization
- *ofthenode(ratioofusedspaceatthenodetototalcapacityofthenode)
- *differsfromtheutilizationofthe(ratioofusedspaceinthecluster
- *tototalcapacityofthecluster)bynomorethanthethresholdvalue.
- *Thesmallerthethreshold,themorebalancedaclusterwillbecome.
- *Ittakesmoretimetorunthebalancerforsmallthresholdvalues.
- *Alsoforaverysmallthresholdtheclustermaynotbeabletoreachthe
- *balancedstatewhenapplicationswriteanddeletefilesconcurrently.
- *
- *<p>Thetoolmovesblocksfromhighlyutilizeddatanodestopoorly
- *utilizeddatanodesiteratively.Ineachiterationadatanodemovesor
- *receivesnomorethanthelesserof10Gbytesorthethresholdfraction
- *ofitscapacity.Eachiterationrunsnomorethan20minutes.
- *每次移动不超过10G大小,每次移动不超过20分钟。
- *Attheendofeachiteration,thebalancerobtainsupdateddatanodes
- *informationfromthenamenode.
- *
- *<p>Asystempropertythatlimitsthebalancer'suseofbandwidthis
- *definedinthedefaultconfigurationfile:
- *<pre>
- *<property>
- *<name>dfs.balance.bandwidthPerSec</name>
- *<value>1048576</value>
- *<description>Specifiesthemaximumbandwidththateachdatanode
- *canutilizeforthebalancingpurposeintermofthenumberofbytes
- *persecond.</description>
- *</property>
- *</pre>
- *
- *<p>Thispropertydeterminesthemaximumspeedatwhichablockwillbe
- *movedfromonedatanodetoanother.Thedefaultvalueis1MB/s.Thehigher
- *thebandwidth,thefasteraclustercanreachthebalancedstate,
- *butwithgreatercompetitionwithapplicationprocesses.Ifan
- *administratorchangesthevalueofthispropertyintheconfiguration
- *file,thechangeisobservedwhenHDFSisnextrestarted.
- *
- *<p>MONITORING BALANCER PROGRESS
- *<p>Afterthebalancerisstarted,anoutputfilenamewherethebalancer
- *progresswillberecordedisprintedonthescreen.Theadministrator
- *canmonitortherunningofthebalancerbyreadingtheoutputfile.
- *Theoutputshowsthebalancer'sstatusiterationbyiteration.Ineach
- *iterationitprintsthestartingtime,theiterationnumber,thetotal
- *numberofbytesthathavebeenmovedinthepreviousiterations,
- *thetotalnumberofbytesthatarelefttomoveinorderforthecluster
- *tobebalanced,andthenumberofbytesthatarebeingmovedinthis
- *iteration.Normally"BytesAlreadyMoved"isincreasingwhile"BytesLeft
- *ToMove"isdecreasing.
- *
- *<p>RunningmultipleinstancesofthebalancerinanHDFSclusteris
- *prohibitedbythetool.
- *
- *<p>Thebalancerautomaticallyexitswhenanyofthefollowingfive
- *conditionsissatisfied:
- *<ol>
- *<li>Theclusterisbalanced;
- *<li>Noblockcanbemoved;
- *<li>Noblockhasbeenmovedforfiveconsecutive(连续)iterations;
- *<li>AnIOExceptionoccurswhilecommunicatingwiththenamenode;
- *<li>Anotherbalancerisrunning.
- *</ol>
- *满足下面5种情况中的任意一种,Balance操作就会自动退出
- *1、整个集群已经达到平衡状态
- *2、经过计算发现没有可以被移动的block块
- *3、在连续5次的迭代中,没有block块被移动
- *4、当datanode节点与namenode节点通信的时候,发生IO异常
- *5、已经存在一个Balance操作
- *
- *<p>Uponexit,abalancerreturnsanexitcodeandprintsoneofthe
- *followingmessagestotheoutputfileincorrespondingtotheaboveexit
- *reasons:
- *<ol>
- *<li>Theclusterisbalanced.Exiting
- *<li>Noblockcanbemoved.Exiting...
- *<li>Noblockhasbeenmovedfor5iterations.Exiting...
- *<li>ReceivedanIOexception:failurereason.Exiting...
- *<li>Anotherbalancerisrunning.Exiting...
- *</ol>
- *在下面的5种情况下,balancer操作会自动退出
- *1、整个集群已经达到平衡的状态
- *2、经过计算发现没有可以被移动的block块
- *3、在连续5轮的迭代中没有block被移动
- *4、与namenode通信时发生了IO异常
- *5、已经存在一个balancer进程在工作
- *
- *<p>Theadministratorcaninterrupttheexecutionofthebalanceratany
- *timebyrunningthecommand"stop-balancer.sh"onthemachinewherethe
- *balancerisrunning.
- */
- @InterfaceAudience.Private
- publicclassBalancer{
- staticfinalLogLOG=LogFactory.getLog(Balancer.class);
- privatestaticfinalPathBALANCER_ID_PATH=newPath("/system/balancer.id");
- privatestaticfinallongGB=1L<<30;//1GB
- privatestaticfinallongMAX_SIZE_TO_MOVE=10*GB;
- privatestaticfinalStringUSAGE="Usage:java"
- +Balancer.class.getSimpleName()
- +"\n\t[-policy<policy>]\tthebalancingpolicy:"
- +BalancingPolicy.Node.INSTANCE.getName()+"or"
- +BalancingPolicy.Pool.INSTANCE.getName()
- +"\n\t[-threshold<threshold>]\tPercentageofdiskcapacity"
- +"\n\t[-exclude[-f<hosts-file>|comma-speratedlistofhosts]]"
- +"\tExcludesthespecifieddatanodes."
- +"\n\t[-include[-f<hosts-file>|comma-speratedlistofhosts]]"
- +"\tIncludesonlythespecifieddatanodes.";
- privatefinalDispatcherdispatcher;
- privatefinalBalancingPolicypolicy;
- privatefinaldoublethreshold;
- //alldatanodelists
- //四种datanode节点类型
- privatefinalCollection<Source>overUtilized=newLinkedList<Source>();
- privatefinalCollection<Source>aboveAvgUtilized=newLinkedList<Source>();
- privatefinalCollection<StorageGroup>belowAvgUtilized
- =newLinkedList<StorageGroup>();
- privatefinalCollection<StorageGroup>underUtilized
- =newLinkedList<StorageGroup>();
- /*CheckthatthisBalanceriscompatible(兼容)withtheBlockPlacementPolicy
- *usedbytheNamenode.
- *检测此balancer均衡工具是否于与目前的namenode节点所用的block存放策略相兼容
- */
- privatestaticvoidcheckReplicationPolicyCompatibility(Configurationconf
- )throwsUnsupportedActionException{
- if(!(BlockPlacementPolicy.getInstance(conf,null,null,null)instanceof
- BlockPlacementPolicyDefault)){
- thrownewUnsupportedActionException(
- //如果不兼容则抛异常
- "BalancerwithoutBlockPlacementPolicyDefault");
- }
- }
- /**
- *Constructabalancer.
- *Initializebalancer.Itsetsthevalueofthethreshold,and
- *buildsthecommunicationproxiesto
- *namenodeasaclientandasecondarynamenodeandretryproxies
- *whenconnectionfails.
- *
- *构造一个balancer均衡器,设置threshold值,读取配置中的分发线程数的值
- */
- Balancer(NameNodeConnectortheblockpool,Parametersp,Configurationconf){
- finallongmovedWinWidth=conf.getLong(
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
- //移动线程数
- finalintmoverThreads=conf.getInt(
- DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
- //分发线程数
- finalintdispatcherThreads=conf.getInt(
- DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
- DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
- finalintmaxConcurrentMovesPerNode=conf.getInt(
- DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
- DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
- this.dispatcher=newDispatcher(theblockpool,p.nodesToBeIncluded,
- p.nodesToBeExcluded,movedWinWidth,moverThreads,dispatcherThreads,
- maxConcurrentMovesPerNode,conf);
- this.threshold=p.threshold;
- this.policy=p.policy;
- }
- /**
- *获取节点总容量大小
- */
- privatestaticlonggetCapacity(DatanodeStorageReportreport,StorageTypet){
- longcapacity=0L;
- for(StorageReportr:report.getStorageReports()){
- if(r.getStorage().getStorageType()==t){
- capacity+=r.getCapacity();
- }
- }
- returncapacity;
- }
- /**
- *获取节点剩余可用容量大小
- */
- privatestaticlonggetRemaining(DatanodeStorageReportreport,StorageTypet){
- longremaining=0L;
- for(StorageReportr:report.getStorageReports()){
- if(r.getStorage().getStorageType()==t){
- remaining+=r.getRemaining();
- }
- }
- returnremaining;
- }
- /**
- *Givenadatanodestorageset,buildanetworktopologyanddecide
- *over-utilizedstorages,aboveaverageutilizedstorages,
- *belowaverageutilizedstorages,andunderutilizedstorages.
- *Theinputdatanodestoragesetisshuffledinordertorandomize
- *tothestoragematchinglateron.
- *给定一个datanode集合,创建一个网络拓扑逻辑并划分出过度使用,使用率超出平均值,
- *低于平均值的,以及未能充分利用资源的4种类型
- *
- *@returnthenumberofbytesneededtomoveinordertobalancethecluster.
- */
- privatelonginit(List<DatanodeStorageReport>reports){
- //计算平均使用率
- for(DatanodeStorageReportr:reports){
- //累加每个节点上的使用空间
- policy.accumulateSpaces(r);
- }
- //计算出平均值
- policy.initAvgUtilization();
- longoverLoadedBytes=0L,underLoadedBytes=0L;
- //进行使用率等级的划分,总共4种,over-utilized,above-average,below-averageandunder-utilized
- for(DatanodeStorageReportr:reports){
- finalDDatanodedn=dispatcher.newDatanode(r);
- for(StorageTypet:StorageType.asList()){
- finalDoubleutilization=policy.getUtilization(r,t);
- if(utilization==null){//datanodedoesnothavesuchstoragetype
- continue;
- }
- finallongcapacity=getCapacity(r,t);
- finaldoubleutilizationDiff=utilization-policy.getAvgUtilization(t);
- finaldoublethresholdDiff=Math.abs(utilizationDiff)-threshold;
- //计算理论上最大移动空间
- finallongmaxSize2Move=computeMaxSize2Move(capacity,
- getRemaining(r,t),utilizationDiff,threshold);
- finalStorageGroupg;
- if(utilizationDiff>0){
- //使用率超出平均值,加入发送节点列表中
- finalSources=dn.addSource(t,maxSize2Move,dispatcher);
- if(thresholdDiff<=0){//withinthreshold
- //如果在threshold范围之内,则加入aboveAvgUtilized
- aboveAvgUtilized.add(s);
- }else{
- //否则加入overUtilized,并计算超出空间
- overLoadedBytes+=precentage2bytes(thresholdDiff,capacity);
- overUtilized.add(s);
- }
- g=s;
- }else{
- //与上面的相反
- g=dn.addStorageGroup(t,maxSize2Move);
- if(thresholdDiff<=0){//withinthreshold
- belowAvgUtilized.add(g);
- }else{
- underLoadedBytes+=precentage2bytes(thresholdDiff,capacity);
- underUtilized.add(g);
- }
- }
- dispatcher.getStorageGroupMap().put(g);
- }
- }
- logUtilizationCollections();
- Preconditions.checkState(dispatcher.getStorageGroupMap().size()
- ==overUtilized.size()+underUtilized.size()+aboveAvgUtilized.size()
- +belowAvgUtilized.size(),
- "Mismatchednumberofstoragegroups");
- //returnnumberofbytestobemovedinordertomaketheclusterbalanced
- returnMath.max(overLoadedBytes,underLoadedBytes);
- }
- privatestaticlongcomputeMaxSize2Move(finallongcapacity,finallongremaining,
- finaldoubleutilizationDiff,finaldoublethreshold){
- finaldoublediff=Math.min(threshold,Math.abs(utilizationDiff));
- longmaxSizeToMove=precentage2bytes(diff,capacity);
- if(utilizationDiff<0){
- maxSizeToMove=Math.min(remaining,maxSizeToMove);
- }
- returnMath.min(MAX_SIZE_TO_MOVE,maxSizeToMove);
- }
- privatestaticlongprecentage2bytes(doubleprecentage,longcapacity){
- Preconditions.checkArgument(precentage>=0,
- "precentage="+precentage+"<0");
- return(long)(precentage*capacity/100.0);
- }
- /*logtheoverutilized&underutilizednodes*/
- privatevoidlogUtilizationCollections(){
- logUtilizationCollection("over-utilized",overUtilized);
- if(LOG.isTraceEnabled()){
- logUtilizationCollection("above-average",aboveAvgUtilized);
- logUtilizationCollection("below-average",belowAvgUtilized);
- }
- logUtilizationCollection("underutilized",underUtilized);
- }
- privatestatic<TextendsStorageGroup>
- voidlogUtilizationCollection(Stringname,Collection<T>items){
- LOG.info(items.size()+""+name+":"+items);
- }
- /**
- *Decideall<source,target>pairsand
- *thenumberofbytestomovefromasourcetoatarget
- *Maximumbytestobemovedperstoragegroupis
- *min(1Bandworthofbytes,MAX_SIZE_TO_MOVE).
- *从源节点列表和目标节点列表中各自选择节点组成一个个对,选择顺序优先为同节点组,同机架,然后是针对所有
- *@returntotalnumberofbytestomoveinthisiteration
- */
- privatelongchooseStorageGroups(){
- //First,matchnodesonthesamenodegroupifclusterisnodegroupaware
- if(dispatcher.getCluster().isNodeGroupAware()){
- //首先匹配的条件是同节点组
- chooseStorageGroups(Matcher.SAME_NODE_GROUP);
- }
- //Then,matchnodesonthesamerack
- //然后是同机架
- chooseStorageGroups(Matcher.SAME_RACK);
- //Atlast,matchallremainingnodes
- //最后是匹配所有的节点
- chooseStorageGroups(Matcher.ANY_OTHER);
- returndispatcher.bytesToMove();
- }
- /**Decideall<source,target>pairsaccordingtothematcher.*/
- privatevoidchooseStorageGroups(finalMatchermatcher){
- /*firststep:matcheachoverUtilizeddatanode(source)to
- *oneormoreunderUtilizeddatanodes(targets).
- *
- *把over组的数据移动under组中
- */
- chooseStorageGroups(overUtilized,underUtilized,matcher);
- /*matcheachremainingoverutilizeddatanode(source)to
- *belowaverageutilizeddatanodes(targets).
- *Noteonlyoverutilizeddatanodesthathaven'thadthatmaxbytestomove
- *satisfiedinstep1areselected
- *把over组的数据移动到below
- */
- chooseStorageGroups(overUtilized,belowAvgUtilized,matcher);
- /*matcheachremainingunderutilizeddatanode(target)to
- *aboveaverageutilizeddatanodes(source).
- *Noteonlyunderutilizeddatanodesthathavenothadthatmaxbytesto
- *movesatisfiedinstep1areselected.
- *
- *然后,再把under组的数据移动一部分到above组中
- */
- chooseStorageGroups(underUtilized,aboveAvgUtilized,matcher);
- }
- /**
- *Foreachdatanode,choosematchingnodesfromthecandidates.Eitherthe
- *datanodesorthecandidatesaresourcenodeswith(utilization>Avg),and
- *theothersaretargetnodeswith(utilization<Avg).
- */
- private<GextendsStorageGroup,CextendsStorageGroup>
- voidchooseStorageGroups(Collection<G>groups,Collection<C>candidates,
- Matchermatcher){
- for(finalIterator<G>i=groups.iterator();i.hasNext();){
- finalGg=i.next();
- for(;choose4One(g,candidates,matcher););
- if(!g.hasSpaceForScheduling()){
- //如果候选节点没有空间调度,则直接移除掉
- i.remove();
- }
- }
- }
- /**
- *Forthegivendatanode,chooseacandidateandthenscheduleit.
- *@returntrueifacandidateischosen;falseifnocandidatesischosen.
- */
- private<CextendsStorageGroup>booleanchoose4One(StorageGroupg,
- Collection<C>candidates,Matchermatcher){
- finalIterator<C>i=candidates.iterator();
- finalCchosen=chooseCandidate(g,i,matcher);
- if(chosen==null){
- returnfalse;
- }
- if(ginstanceofSource){
- matchSourceWithTargetToMove((Source)g,chosen);
- }else{
- matchSourceWithTargetToMove((Source)chosen,g);
- }
- if(!chosen.hasSpaceForScheduling()){
- i.remove();
- }
- returntrue;
- }
- /**
- *根据源节点和目标节点,构造任务对
- */
- privatevoidmatchSourceWithTargetToMove(Sourcesource,StorageGrouptarget){
- longsize=Math.min(source.availableSizeToMove(),target.availableSizeToMove());
- finalTasktask=newTask(target,size);
- source.addTask(task);
- target.incScheduledSize(task.getSize());
- //加入分发器中
- dispatcher.add(source,target);
- LOG.info("Decidedtomove"+StringUtils.byteDesc(size)+"bytesfrom"
- +source.getDisplayName()+"to"+target.getDisplayName());
- }
- /**Chooseacandidateforthegivendatanode.*/
- private<GextendsStorageGroup,CextendsStorageGroup>
- CchooseCandidate(Gg,Iterator<C>candidates,Matchermatcher){
- if(g.hasSpaceForScheduling()){
- for(;candidates.hasNext();){
- finalCc=candidates.next();
- if(!c.hasSpaceForScheduling()){
- candidates.remove();
- }elseif(matcher.match(dispatcher.getCluster(),
- g.getDatanodeInfo(),c.getDatanodeInfo())){
- //如果满足匹配的条件,则返回值
- returnc;
- }
- }
- }
- returnnull;
- }
- /*resetallfieldsinabalancerpreparingforthenextiteration*/
- privatevoidresetData(Configurationconf){
- this.overUtilized.clear();
- this.aboveAvgUtilized.clear();
- this.belowAvgUtilized.clear();
- this.underUtilized.clear();
- this.policy.reset();
- dispatcher.reset(conf);;
- }
- /**Runaniterationforalldatanodes.*/
- privateExitStatusrun(intiteration,Formatterformatter,
- Configurationconf){
- try{
- finalList<DatanodeStorageReport>reports=dispatcher.init();
- finallongbytesLeftToMove=init(reports);
- if(bytesLeftToMove==0){
- System.out.println("Theclusterisbalanced.Exiting...");
- returnExitStatus.SUCCESS;
- }else{
- LOG.info("Needtomove"+StringUtils.byteDesc(bytesLeftToMove)
- +"tomaketheclusterbalanced.");
- }
- /*Decideallthenodesthatwillparticipateintheblockmoveand
- *thenumberofbytesthatneedtobemovedfromonenodetoanother
- *inthisiteration.Maximumbytestobemovedpernodeis
- *Min(1Bandworthofbytes,MAX_SIZE_TO_MOVE).
- */
- finallongbytesToMove=chooseStorageGroups();
- if(bytesToMove==0){
- System.out.println("Noblockcanbemoved.Exiting...");
- returnExitStatus.NO_MOVE_BLOCK;
- }else{
- LOG.info("Willmove"+StringUtils.byteDesc(bytesToMove)+
- "inthisiteration");
- }
- formatter.format("%-24s%10d%19s%18s%17s%n",
- DateFormat.getDateTimeInstance().format(newDate()),
- iteration,
- StringUtils.byteDesc(dispatcher.getBytesMoved()),
- StringUtils.byteDesc(bytesLeftToMove),
- StringUtils.byteDesc(bytesToMove)
- );
- /*Foreachpairof<source,target>,startathreadthatrepeatedly
- *decideablocktobemovedanditsproxysource,
- *theninitiatesthemoveuntilallbytesaremovedornomoreblock
- *availabletomove.
- *Exitnobytehasbeenmovedfor5consecutiveiterations.
- *
- *如果发现在5次连续的迭代中还是没有字节被移动,则退出
- */
- if(!dispatcher.dispatchAndCheckContinue()){
- returnExitStatus.NO_MOVE_PROGRESS;
- }
- returnExitStatus.IN_PROGRESS;
- }catch(IllegalArgumentExceptione){
- System.out.println(e+".Exiting...");
- returnExitStatus.ILLEGAL_ARGUMENTS;
- }catch(IOExceptione){
- System.out.println(e+".Exiting...");
- returnExitStatus.IO_EXCEPTION;
- }catch(InterruptedExceptione){
- System.out.println(e+".Exiting...");
- returnExitStatus.INTERRUPTED;
- }finally{
- dispatcher.shutdownNow();
- }
- }
- /**
- *Balanceallnamenodes.
- *Foreachiteration,
- *foreachnamenode,
- *executea{@linkBalancer}toworkthroughalldatanodesonce.
- *
- *开放给外部调用的run方法
- */
- staticintrun(Collection<URI>namenodes,finalParametersp,
- Configurationconf)throwsIOException,InterruptedException{
- finallongsleeptime=2000*conf.getLong(
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
- LOG.info("namenodes="+namenodes);
- LOG.info("parameters="+p);
- finalFormatterformatter=newFormatter(System.out);
- System.out.println("TimeStampIteration#BytesAlreadyMovedBytesLeftToMoveBytesBeingMoved");
- finalList<NameNodeConnector>connectors
- =newArrayList<NameNodeConnector>(namenodes.size());
- try{
- for(URIuri:namenodes){
- finalNameNodeConnectornnc=newNameNodeConnector(
- Balancer.class.getSimpleName(),uri,BALANCER_ID_PATH,conf);
- nnc.getKeyManager().startBlockKeyUpdater();
- connectors.add(nnc);
- }
- booleandone=false;
- for(intiteration=0;!done;iteration++){
- done=true;
- Collections.shuffle(connectors);
- for(NameNodeConnectornnc:connectors){
- //初始化均衡器具
- finalBalancerb=newBalancer(nnc,p,conf);
- //均衡器执行balance操作
- finalExitStatusr=b.run(iteration,formatter,conf);
- //cleanalllists
- b.resetData(conf);
- if(r==ExitStatus.IN_PROGRESS){
- done=false;
- }elseif(r!=ExitStatus.SUCCESS){
- //mustbeanerrorstatue,return.
- returnr.getExitCode();
- }
- }
- if(!done){
- Thread.sleep(sleeptime);
- }
- }
- }finally{
- for(NameNodeConnectornnc:connectors){
- nnc.close();
- }
- }
- returnExitStatus.SUCCESS.getExitCode();
- }
- /*GivenelaspedTimeinms,returnaprintablestring*/
- privatestaticStringtime2Str(longelapsedTime){
- Stringunit;
- doubletime=elapsedTime;
- if(elapsedTime<1000){
- unit="milliseconds";
- }elseif(elapsedTime<60*1000){
- unit="seconds";
- time=time/1000;
- }elseif(elapsedTime<3600*1000){
- unit="minutes";
- time=time/(60*1000);
- }else{
- unit="hours";
- time=time/(3600*1000);
- }
- returntime+""+unit;
- }
- staticclassParameters{
- staticfinalParametersDEFAULT=newParameters(
- BalancingPolicy.Node.INSTANCE,10.0,
- Collections.<String>emptySet(),Collections.<String>emptySet());
- finalBalancingPolicypolicy;
- finaldoublethreshold;
- //excludethenodesinthissetfrombalancingoperations
- Set<String>nodesToBeExcluded;
- //includeonlythesenodesinbalancingoperations
- Set<String>nodesToBeIncluded;
- Parameters(BalancingPolicypolicy,doublethreshold,
- Set<String>nodesToBeExcluded,Set<String>nodesToBeIncluded){
- this.policy=policy;
- this.threshold=threshold;
- this.nodesToBeExcluded=nodesToBeExcluded;
- this.nodesToBeIncluded=nodesToBeIncluded;
- }
- @Override
- publicStringtoString(){
- returnBalancer.class.getSimpleName()+"."+getClass().getSimpleName()
- +"["+policy+",threshold="+threshold+
- ",numberofnodestobeexcluded="+nodesToBeExcluded.size()+
- ",numberofnodestobeincluded="+nodesToBeIncluded.size()+"]";
- }
- }
- staticclassCliextendsConfiguredimplementsTool{
- /**
- *ParseargumentsandthenrunBalancer.
- *
- *@paramargscommandspecificarguments.
- *@returnexitcode.0indicatessuccess,non-zeroindicatesfailure.
- */
- @Override
- publicintrun(String[]args){
- finallongstartTime=Time.now();
- finalConfigurationconf=getConf();
- try{
- checkReplicationPolicyCompatibility(conf);
- finalCollection<URI>namenodes=DFSUtil.getNsServiceRpcUris(conf);
- returnBalancer.run(namenodes,parse(args),conf);
- }catch(IOExceptione){
- System.out.println(e+".Exiting...");
- returnExitStatus.IO_EXCEPTION.getExitCode();
- }catch(InterruptedExceptione){
- System.out.println(e+".Exiting...");
- returnExitStatus.INTERRUPTED.getExitCode();
- }finally{
- System.out.format("%-24s",DateFormat.getDateTimeInstance().format(newDate()));
- System.out.println("Balancingtook"+time2Str(Time.now()-startTime));
- }
- }
- /**parsecommandlinearguments*/
- staticParametersparse(String[]args){
- BalancingPolicypolicy=Parameters.DEFAULT.policy;
- doublethreshold=Parameters.DEFAULT.threshold;
- Set<String>nodesTobeExcluded=Parameters.DEFAULT.nodesToBeExcluded;
- Set<String>nodesTobeIncluded=Parameters.DEFAULT.nodesToBeIncluded;
- if(args!=null){
- try{
- for(inti=0;i<args.length;i++){
- if("-threshold".equalsIgnoreCase(args[i])){
- checkArgument(++i<args.length,
- "Thresholdvalueismissing:args="+Arrays.toString(args));
- try{
- threshold=Double.parseDouble(args[i]);
- if(threshold<1||threshold>100){
- thrownewIllegalArgumentException(
- "Numberoutofrange:threshold="+threshold);
- }
- LOG.info("Usingathresholdof"+threshold);
- }catch(IllegalArgumentExceptione){
- System.err.println(
- "Expectinganumberintherangeof[1.0,100.0]:"
- +args[i]);
- throwe;
- }
- }elseif("-policy".equalsIgnoreCase(args[i])){
- checkArgument(++i<args.length,
- "Policyvalueismissing:args="+Arrays.toString(args));
- try{
- policy=BalancingPolicy.parse(args[i]);
- }catch(IllegalArgumentExceptione){
- System.err.println("Illegalpolicyname:"+args[i]);
- throwe;
- }
- }elseif("-exclude".equalsIgnoreCase(args[i])){
- checkArgument(++i<args.length,
- "Listofnodestoexclude|-f<filename>ismissing:args="
- +Arrays.toString(args));
- if("-f".equalsIgnoreCase(args[i])){
- checkArgument(++i<args.length,
- "Filecontainingnodestoexcludeisnotspecified:args="
- +Arrays.toString(args));
- nodesTobeExcluded=Util.getHostListFromFile(args[i],"exclude");
- }else{
- nodesTobeExcluded=Util.parseHostList(args[i]);
- }
- }elseif("-include".equalsIgnoreCase(args[i])){
- checkArgument(++i<args.length,
- "Listofnodestoinclude|-f<filename>ismissing:args="
- +Arrays.toString(args));
- if("-f".equalsIgnoreCase(args[i])){
- checkArgument(++i<args.length,
- "Filecontainingnodestoincludeisnotspecified:args="
- +Arrays.toString(args));
- nodesTobeIncluded=Util.getHostListFromFile(args[i],"include");
- }else{
- nodesTobeIncluded=Util.parseHostList(args[i]);
- }
- }else{
- thrownewIllegalArgumentException("args="
- +Arrays.toString(args));
- }
- }
- checkArgument(nodesTobeExcluded.isEmpty()||nodesTobeIncluded.isEmpty(),
- "-excludeand-includeoptionscannotbespecifiedtogether.");
- }catch(RuntimeExceptione){
- printUsage(System.err);
- throwe;
- }
- }
- returnnewParameters(policy,threshold,nodesTobeExcluded,nodesTobeIncluded);
- }
- privatestaticvoidprintUsage(PrintStreamout){
- out.println(USAGE+"\n");
- }
- }
- /**
- *Runabalancer
- *@paramargsCommandlinearguments
- */
- publicstaticvoidmain(String[]args){
- if(DFSUtil.parseHelpArgument(args,USAGE,System.out,true)){
- System.exit(0);
- }
- try{
- System.exit(ToolRunner.run(newHdfsConfiguration(),newCli(),args));
- }catch(Throwablee){
- LOG.error("Exitingbalancerdueanexception",e);
- System.exit(-1);
- }
- }
- }
- /**
- *LicensedtotheApacheSoftwareFoundation(ASF)underone
- *ormorecontributorlicenseagreements.SeetheNOTICEfile
- *distributedwiththisworkforadditionalinformation
- *regardingcopyrightownership.TheASFlicensesthisfile
- *toyouundertheApacheLicense,Version2.0(the
- *"License");youmaynotusethisfileexceptincompliance
- *withtheLicense.YoumayobtainacopyoftheLicenseat
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unlessrequiredbyapplicablelaworagreedtoinwriting,software
- *distributedundertheLicenseisdistributedonan"ASIS"BASIS,
- *WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied.
- *SeetheLicenseforthespecificlanguagegoverningpermissionsand
- *limitationsundertheLicense.
- */
- packageorg.apache.hadoop.hdfs.server.balancer;
- importstaticorg.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;

import com.google.common.base.Preconditions;
- /**Dispatchingblockreplicamovesbetweendatanodes.*/
- @InterfaceAudience.Private
- publicclassDispatcher{
- staticfinalLogLOG=LogFactory.getLog(Dispatcher.class);
- privatestaticfinallongGB=1L<<30;//1GB
- privatestaticfinallongMAX_BLOCKS_SIZE_TO_FETCH=2*GB;
- privatestaticfinalintMAX_NO_PENDING_MOVE_ITERATIONS=5;
- privatestaticfinallongDELAY_AFTER_ERROR=10*1000L;//10seconds
- privatefinalNameNodeConnectornnc;
- privatefinalSaslDataTransferClientsaslClient;
- /**Setofdatanodestobeexcluded.*/
- privatefinalSet<String>excludedNodes;
- /**Restricttothefollowingnodes.*/
- privatefinalSet<String>includedNodes;
- privatefinalCollection<Source>sources=newHashSet<Source>();
- privatefinalCollection<StorageGroup>targets=newHashSet<StorageGroup>();
- privatefinalGlobalBlockMapglobalBlocks=newGlobalBlockMap();
- privatefinalMovedBlocks<StorageGroup>movedBlocks;
- /**Map(datanodeUuid,storageType->StorageGroup)*/
- privatefinalStorageGroupMapstorageGroupMap=newStorageGroupMap();
- privateNetworkTopologycluster;
- privatefinalExecutorServicemoveExecutor;
- privatefinalExecutorServicedispatchExecutor;
- /**Themaximumnumberofconcurrentblocksmovesatadatanode*/
- privatefinalintmaxConcurrentMovesPerNode;
- privatefinalAtomicLongbytesMoved=newAtomicLong();
- privatestaticclassGlobalBlockMap{
- privatefinalMap<Block,DBlock>map=newHashMap<Block,DBlock>();
- /**
- *Gettheblockfromthemap;
- *iftheblockisnotfound,createanewblockandputitinthemap.
- */
- privateDBlockget(Blockb){
- DBlockblock=map.get(b);
- if(block==null){
- block=newDBlock(b);
- map.put(b,block);
- }
- returnblock;
- }
- /**Removeallblocksexceptforthemovedblocks.*/
- privatevoidremoveAllButRetain(MovedBlocks<StorageGroup>movedBlocks){
- for(Iterator<Block>i=map.keySet().iterator();i.hasNext();){
- if(!movedBlocks.contains(i.next())){
- i.remove();
- }
- }
- }
- }
- staticclassStorageGroupMap{
- privatestaticStringtoKey(StringdatanodeUuid,StorageTypestorageType){
- returndatanodeUuid+":"+storageType;
- }
- privatefinalMap<String,StorageGroup>map=newHashMap<String,StorageGroup>();
- StorageGroupget(StringdatanodeUuid,StorageTypestorageType){
- returnmap.get(toKey(datanodeUuid,storageType));
- }
- voidput(StorageGroupg){
- finalStringkey=toKey(g.getDatanodeInfo().getDatanodeUuid(),g.storageType);
- finalStorageGroupexisting=map.put(key,g);
- Preconditions.checkState(existing==null);
- }
- intsize(){
- returnmap.size();
- }
- voidclear(){
- map.clear();
- }
- }
- /**Thisclasskeepstrackofascheduledblockmove*/
- privateclassPendingMove{
- privateDBlockblock;
- privateSourcesource;
- privateDDatanodeproxySource;
- privateStorageGrouptarget;
- privatePendingMove(){
- }
- @Override
- publicStringtoString(){
- finalBlockb=block.getBlock();
- returnb+"withsize="+b.getNumBytes()+"from"
- +source.getDisplayName()+"to"+target.getDisplayName()
- +"through"+proxySource.datanode;
- }
- /**
- *Chooseablock&aproxysourceforthispendingMovewhosesource&
- *targethavealreadybeenchosen.
- *
- *@returntrueifablockanditsproxyarechosen;falseotherwise
- */
- privatebooleanchooseBlockAndProxy(){
- //iterateallsource'sblocksuntilfindagoodone
- for(Iterator<DBlock>i=source.getBlockIterator();i.hasNext();){
- if(markMovedIfGoodBlock(i.next())){
- i.remove();
- returntrue;
- }
- }
- returnfalse;
- }
- /**
- *@returntrueifthegivenblockisgoodforthetentativemove.
- */
- privatebooleanmarkMovedIfGoodBlock(DBlockblock){
- synchronized(block){
- synchronized(movedBlocks){
- if(isGoodBlockCandidate(source,target,block)){
- this.block=block;
- if(chooseProxySource()){
- movedBlocks.put(block);
- if(LOG.isDebugEnabled()){
- LOG.debug("Decidedtomove"+this);
- }
- returntrue;
- }
- }
- }
- }
- returnfalse;
- }
- /**
- *Chooseaproxysource.
- *
- *@returntrueifaproxyisfound;otherwisefalse
- */
- privatebooleanchooseProxySource(){
- finalDatanodeInfotargetDN=target.getDatanodeInfo();
- //ifnodegroupissupported,firsttryaddnodesinthesamenodegroup
- if(cluster.isNodeGroupAware()){
- for(StorageGrouploc:block.getLocations()){
- if(cluster.isOnSameNodeGroup(loc.getDatanodeInfo(),targetDN)
- &&addTo(loc)){
- returntrue;
- }
- }
- }
- //checkifthereisreplicawhichisonthesamerackwiththetarget
- for(StorageGrouploc:block.getLocations()){
- if(cluster.isOnSameRack(loc.getDatanodeInfo(),targetDN)&&addTo(loc)){
- returntrue;
- }
- }
- //findoutanon-busyreplica
- for(StorageGrouploc:block.getLocations()){
- if(addTo(loc)){
- returntrue;
- }
- }
- returnfalse;
- }
- /**addtoaproxysourceforspecificblockmovement*/
- privatebooleanaddTo(StorageGroupg){
- finalDDatanodedn=g.getDDatanode();
- if(dn.addPendingBlock(this)){
- proxySource=dn;
- returntrue;
- }
- returnfalse;
- }
- /**Dispatchthemovetotheproxysource&waitfortheresponse.*/
- privatevoiddispatch(){
- if(LOG.isDebugEnabled()){
- LOG.debug("Startmoving"+this);
- }
- Socketsock=newSocket();
- DataOutputStreamout=null;
- DataInputStreamin=null;
- try{
- sock.connect(
- NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
- HdfsServerConstants.READ_TIMEOUT);
- sock.setKeepAlive(true);
- OutputStreamunbufOut=sock.getOutputStream();
- InputStreamunbufIn=sock.getInputStream();
- ExtendedBlockeb=newExtendedBlock(nnc.getBlockpoolID(),
- block.getBlock());
- finalKeyManagerkm=nnc.getKeyManager();
- Token<BlockTokenIdentifier>accessToken=km.getAccessToken(eb);
- IOStreamPairsaslStreams=saslClient.socketSend(sock,unbufOut,
- unbufIn,km,accessToken,target.getDatanodeInfo());
- unbufOut=saslStreams.out;
- unbufIn=saslStreams.in;
- out=newDataOutputStream(newBufferedOutputStream(unbufOut,
- HdfsConstants.IO_FILE_BUFFER_SIZE));
- in=newDataInputStream(newBufferedInputStream(unbufIn,
- HdfsConstants.IO_FILE_BUFFER_SIZE));
- sendRequest(out,eb,accessToken);
- receiveResponse(in);
- bytesMoved.addAndGet(block.getNumBytes());
- LOG.info("Successfullymoved"+this);
- }catch(IOExceptione){
- LOG.warn("Failedtomove"+this+":"+e.getMessage());
- //Proxyortargetmayhavesomeissues,delaybeforeusingthesenodes
- //furtherinordertoavoidapotentialstormof"threadsquota
- //exceeded"warningswhenthedispatchergetsoutofsyncwithwork
- //goingonindatanodes.
- proxySource.activateDelay(DELAY_AFTER_ERROR);
- target.getDDatanode().activateDelay(DELAY_AFTER_ERROR);
- }finally{
- IOUtils.closeStream(out);
- IOUtils.closeStream(in);
- IOUtils.closeSocket(sock);
- proxySource.removePendingBlock(this);
- target.getDDatanode().removePendingBlock(this);
- synchronized(this){
- reset();
- }
- synchronized(Dispatcher.this){
- Dispatcher.this.notifyAll();
- }
- }
- }
- /**Sendablockreplacerequesttotheoutputstream*/
- privatevoidsendRequest(DataOutputStreamout,ExtendedBlockeb,
- Token<BlockTokenIdentifier>accessToken)throwsIOException{
- newSender(out).replaceBlock(eb,target.storageType,accessToken,
- source.getDatanodeInfo().getDatanodeUuid(),proxySource.datanode);
- }
- /**Receiveablockcopyresponsefromtheinputstream*/
- privatevoidreceiveResponse(DataInputStreamin)throwsIOException{
- BlockOpResponseProtoresponse=
- BlockOpResponseProto.parseFrom(vintPrefixed(in));
- while(response.getStatus()==Status.IN_PROGRESS){
- //readintermediateresponses
- response=BlockOpResponseProto.parseFrom(vintPrefixed(in));
- }
- if(response.getStatus()!=Status.SUCCESS){
- if(response.getStatus()==Status.ERROR_ACCESS_TOKEN){
- thrownewIOException("blockmovefailedduetoaccesstokenerror");
- }
- thrownewIOException("blockmoveisfailed:"+response.getMessage());
- }
- }
- /**resettheobject*/
- privatevoidreset(){
- block=null;
- source=null;
- proxySource=null;
- target=null;
- }
- }
- /**Aclassforkeepingtrackofblocklocationsinthedispatcher.*/
- privatestaticclassDBlockextendsMovedBlocks.Locations<StorageGroup>{
- DBlock(Blockblock){
- super(block);
- }
- }
- /**Theclassrepresentsadesiredmove.*/
- staticclassTask{
- privatefinalStorageGrouptarget;
- privatelongsize;//bytesscheduledtomove
- Task(StorageGrouptarget,longsize){
- this.target=target;
- this.size=size;
- }
- longgetSize(){
- returnsize;
- }
- }
- /**Aclassthatkeepstrackofadatanode.*/
- staticclassDDatanode{
- /**Agroupofstoragesinadatanodewiththesamestoragetype.*/
- classStorageGroup{
- finalStorageTypestorageType;
- finallongmaxSize2Move;
- privatelongscheduledSize=0L;
- privateStorageGroup(StorageTypestorageType,longmaxSize2Move){
- this.storageType=storageType;
- this.maxSize2Move=maxSize2Move;
- }
- privateDDatanodegetDDatanode(){
- returnDDatanode.this;
- }
- DatanodeInfogetDatanodeInfo(){
- returnDDatanode.this.datanode;
- }
- /**Decideifstillneedtomovemorebytes*/
- synchronizedbooleanhasSpaceForScheduling(){
- returnavailableSizeToMove()>0L;
- }
- /**@returnthetotalnumberofbytesthatneedtobemoved*/
- synchronizedlongavailableSizeToMove(){
- returnmaxSize2Move-scheduledSize;
- }
- /**incrementscheduledsize*/
- synchronizedvoidincScheduledSize(longsize){
- scheduledSize+=size;
- }
- /**@returnscheduledsize*/
- synchronizedlonggetScheduledSize(){
- returnscheduledSize;
- }
- /**Resetscheduledsizetozero.*/
- synchronizedvoidresetScheduledSize(){
- scheduledSize=0L;
- }
- /**@returnthenamefordisplay*/
- StringgetDisplayName(){
- returndatanode+":"+storageType;
- }
- @Override
- publicStringtoString(){
- returngetDisplayName();
- }
- }
- finalDatanodeInfodatanode;
- finalEnumMap<StorageType,StorageGroup>storageMap
- =newEnumMap<StorageType,StorageGroup>(StorageType.class);
- protectedlongdelayUntil=0L;
- /**blocksbeingmovedbutnotconfirmedyet*/
- privatefinalList<PendingMove>pendings;
- privatefinalintmaxConcurrentMoves;
- @Override
- publicStringtoString(){
- returngetClass().getSimpleName()+":"+datanode+":"+storageMap.values();
- }
- privateDDatanode(DatanodeStorageReportr,intmaxConcurrentMoves){
- this.datanode=r.getDatanodeInfo();
- this.maxConcurrentMoves=maxConcurrentMoves;
- this.pendings=newArrayList<PendingMove>(maxConcurrentMoves);
- }
- privatevoidput(StorageTypestorageType,StorageGroupg){
- finalStorageGroupexisting=storageMap.put(storageType,g);
- Preconditions.checkState(existing==null);
- }
- StorageGroupaddStorageGroup(StorageTypestorageType,longmaxSize2Move){
- finalStorageGroupg=newStorageGroup(storageType,maxSize2Move);
- put(storageType,g);
- returng;
- }
- SourceaddSource(StorageTypestorageType,longmaxSize2Move,Dispatcherd){
- finalSources=d.newSource(storageType,maxSize2Move,this);
- put(storageType,s);
- returns;
- }
- synchronizedprivatevoidactivateDelay(longdelta){
- delayUntil=Time.monotonicNow()+delta;
- }
- synchronizedprivatebooleanisDelayActive(){
- if(delayUntil==0||Time.monotonicNow()>delayUntil){
- delayUntil=0;
- returnfalse;
- }
- returntrue;
- }
- /**Checkifthenodecanschedulemoreblockstomove*/
- synchronizedbooleanisPendingQNotFull(){
- returnpendings.size()<maxConcurrentMoves;
- }
- /**Checkifallthedispatchedmovesaredone*/
- synchronizedbooleanisPendingQEmpty(){
- returnpendings.isEmpty();
- }
- /**Addascheduledblockmovetothenode*/
- synchronizedbooleanaddPendingBlock(PendingMovependingBlock){
- if(!isDelayActive()&&isPendingQNotFull()){
- returnpendings.add(pendingBlock);
- }
- returnfalse;
- }
- /**Removeascheduledblockmovefromthenode*/
- synchronizedbooleanremovePendingBlock(PendingMovependingBlock){
- returnpendings.remove(pendingBlock);
- }
- }
- /**Anodethatcanbethesourcesofablockmove*/
- classSourceextendsDDatanode.StorageGroup{
- privatefinalList<Task>tasks=newArrayList<Task>(2);
- privatelongblocksToReceive=0L;
- /**
- *Sourceblockspointtotheobjectsin{@linkDispatcher#globalBlocks}
- *becausewewanttokeeponecopyofablockandbeawarethatthe
- *locationsarechangingovertime.
- */
- privatefinalList<DBlock>srcBlocks=newArrayList<DBlock>();
- privateSource(StorageTypestorageType,longmaxSize2Move,DDatanodedn){
- dn.super(storageType,maxSize2Move);
- }
- /**Addatask*/
- voidaddTask(Tasktask){
- Preconditions.checkState(task.target!=this,
- "Sourceandtargetarethesamestoragegroup"+getDisplayName());
- incScheduledSize(task.size);
- tasks.add(task);
- }
- /**@returnaniteratortothissource'sblocks*/
- Iterator<DBlock>getBlockIterator(){
- returnsrcBlocks.iterator();
- }
- /**
- *Fetchnewblocksofthissourcefromnamenodeandupdatethissource's
- *blocklist&{@linkDispatcher#globalBlocks}.
- *
- *@returnthetotalsizeofthereceivedblocksinthenumberofbytes.
- */
- privatelonggetBlockList()throwsIOException{
- finallongsize=Math.min(MAX_BLOCKS_SIZE_TO_FETCH,blocksToReceive);
- finalBlocksWithLocationsnewBlocks=nnc.getBlocks(getDatanodeInfo(),size);
- longbytesReceived=0;
- for(BlockWithLocationsblk:newBlocks.getBlocks()){
- bytesReceived+=blk.getBlock().getNumBytes();
- synchronized(globalBlocks){
- finalDBlockblock=globalBlocks.get(blk.getBlock());
- synchronized(block){
- block.clearLocations();
- //updatelocations
- finalString[]datanodeUuids=blk.getDatanodeUuids();
- finalStorageType[]storageTypes=blk.getStorageTypes();
- for(inti=0;i<datanodeUuids.length;i++){
- finalStorageGroupg=storageGroupMap.get(
- datanodeUuids[i],storageTypes[i]);
- if(g!=null){//notunknown
- block.addLocation(g);
- }
- }
- }
- if(!srcBlocks.contains(block)&&isGoodBlockCandidate(block)){
- //filterbadcandidates
- srcBlocks.add(block);
- }
- }
- }
- returnbytesReceived;
- }
- /**Decideifthegivenblockisagoodcandidatetomoveornot*/
- privatebooleanisGoodBlockCandidate(DBlockblock){
- for(Taskt:tasks){
- if(Dispatcher.this.isGoodBlockCandidate(this,t.target,block)){
- returntrue;
- }
- }
- returnfalse;
- }
- /**
- *Chooseamoveforthesource.Theblock'ssource,target,andproxy
- *aredeterminedtoo.Whenchoosingproxyandtarget,source&
- *targetthrottlinghasbeenconsidered.Theyarechosenonlywhenthey
- *havethecapacitytosupportthisblockmove.Theblockshouldbe
- *dispatchedimmediatelyafterthismethodisreturned.
- *
- *@returnamovethat'sgoodforthesourcetodispatchimmediately.
- */
- privatePendingMovechooseNextMove(){
- for(Iterator<Task>i=tasks.iterator();i.hasNext();){
- finalTasktask=i.next();
- finalDDatanodetarget=task.target.getDDatanode();
- PendingMovependingBlock=newPendingMove();
- if(target.addPendingBlock(pendingBlock)){
- //targetisnotbusy,sodoatentativeblockallocation
- pendingBlock.source=this;
- pendingBlock.target=task.target;
- if(pendingBlock.chooseBlockAndProxy()){
- longblockSize=pendingBlock.block.getNumBytes();
- incScheduledSize(-blockSize);
- task.size-=blockSize;
- if(task.size==0){
- i.remove();
- }
- returnpendingBlock;
- }else{
- //cancelthetentativemove
- target.removePendingBlock(pendingBlock);
- }
- }
- }
- returnnull;
- }
- /**Iterateallsource'sblockstoremovemovedones*/
- privatevoidremoveMovedBlocks(){
- for(Iterator<DBlock>i=getBlockIterator();i.hasNext();){
- if(movedBlocks.contains(i.next().getBlock())){
- i.remove();
- }
- }
- }
- privatestaticfinalintSOURCE_BLOCKS_MIN_SIZE=5;
- /**@returnifshouldfetchmoreblocksfromnamenode*/
- privatebooleanshouldFetchMoreBlocks(){
- returnsrcBlocks.size()<SOURCE_BLOCKS_MIN_SIZE&&blocksToReceive>0;
- }
- privatestaticfinallongMAX_ITERATION_TIME=20*60*1000L;//20mins
- /**
- *Thismethoditerativelydoesthefollowing:itfirstselectsablockto
- *move,thensendsarequesttotheproxysourcetostarttheblockmove
- *whenthesource'sblocklistfallsbelowathreshold,itasksthe
- *namenodeformoreblocks.Itterminateswhenithasdispatchenoughblock
- *movetasksorithasreceivedenoughblocksfromthenamenode,orthe
- *elapsedtimeoftheiterationhasexceededthemaxtimelimit.
- */
- privatevoiddispatchBlocks(){
- finallongstartTime=Time.monotonicNow();
- this.blocksToReceive=2*getScheduledSize();
- booleanisTimeUp=false;
- intnoPendingMoveIteration=0;
- while(!isTimeUp&&getScheduledSize()>0
- &&(!srcBlocks.isEmpty()||blocksToReceive>0)){
- finalPendingMovep=chooseNextMove();
- if(p!=null){
- //movetheblock
- moveExecutor.execute(newRunnable(){
- @Override
- publicvoidrun(){
- p.dispatch();
- }
- });
- continue;
- }
- //Sincewecannotscheduleanyblocktomove,
- //removeanymovedblocksfromthesourceblocklistand
- removeMovedBlocks();//filteralreadymovedblocks
- //checkifweshouldfetchmoreblocksfromthenamenode
- if(shouldFetchMoreBlocks()){
- //fetchnewblocks
- try{
- blocksToReceive-=getBlockList();
- continue;
- }catch(IOExceptione){
- LOG.warn("Exceptionwhilegettingblocklist",e);
- return;
- }
- }else{
- //sourcenodecannotfindapendingblocktomove,iteration+1
- noPendingMoveIteration++;
- //incasenoblockscanbemovedforsourcenode'stask,
- //jumpoutofwhile-loopafter5iterations.
- if(noPendingMoveIteration>=MAX_NO_PENDING_MOVE_ITERATIONS){
- resetScheduledSize();
- }
- }
- //checkiftimeisupornot
- if(Time.monotonicNow()-startTime>MAX_ITERATION_TIME){
- isTimeUp=true;
- continue;
- }
- //Nowwecannotscheduleanyblocktomoveandthereare
- //nonewblocksaddedtothesourceblocklist,sowewait.
- try{
- synchronized(Dispatcher.this){
- Dispatcher.this.wait(1000);//waitfortargets/sourcestobeidle
- }
- }catch(InterruptedExceptionignored){
- }
- }
- }
- }
- publicDispatcher(NameNodeConnectornnc,Set<String>includedNodes,
- Set<String>excludedNodes,longmovedWinWidth,intmoverThreads,
- intdispatcherThreads,intmaxConcurrentMovesPerNode,Configurationconf){
- this.nnc=nnc;
- this.excludedNodes=excludedNodes;
- this.includedNodes=includedNodes;
- this.movedBlocks=newMovedBlocks<StorageGroup>(movedWinWidth);
- this.cluster=NetworkTopology.getInstance(conf);
- this.moveExecutor=Executors.newFixedThreadPool(moverThreads);
- this.dispatchExecutor=Executors.newFixedThreadPool(dispatcherThreads);
- this.maxConcurrentMovesPerNode=maxConcurrentMovesPerNode;
- finalbooleanfallbackToSimpleAuthAllowed=conf.getBoolean(
- CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
- CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
- this.saslClient=newSaslDataTransferClient(
- DataTransferSaslUtil.getSaslPropertiesResolver(conf),
- TrustedChannelResolver.getInstance(conf),fallbackToSimpleAuthAllowed);
- }
- StorageGroupMapgetStorageGroupMap(){
- returnstorageGroupMap;
- }
- NetworkTopologygetCluster(){
- returncluster;
- }
- longgetBytesMoved(){
- returnbytesMoved.get();
- }
- longbytesToMove(){
- Preconditions.checkState(
- storageGroupMap.size()>=sources.size()+targets.size(),
- "Mismatchednumberofstoragegroups("+storageGroupMap.size()
- +"<"+sources.size()+"sources+"+targets.size()
- +"targets)");
- longb=0L;
- for(Sourcesrc:sources){
- b+=src.getScheduledSize();
- }
- returnb;
- }
- voidadd(Sourcesource,StorageGrouptarget){
- sources.add(source);
- targets.add(target);
- }
- privatebooleanshouldIgnore(DatanodeInfodn){
- //ignoredecommissionednodes
- finalbooleandecommissioned=dn.isDecommissioned();
- //ignoredecommissioningnodes
- finalbooleandecommissioning=dn.isDecommissionInProgress();
- //ignorenodesinexcludelist
- finalbooleanexcluded=Util.isExcluded(excludedNodes,dn);
- //ignorenodesnotintheincludelist(ifincludelistisnotempty)
- finalbooleannotIncluded=!Util.isIncluded(includedNodes,dn);
- if(decommissioned||decommissioning||excluded||notIncluded){
- if(LOG.isTraceEnabled()){
- LOG.trace("Excludingdatanode"+dn+":"+decommissioned+","
- +decommissioning+","+excluded+","+notIncluded);
- }
- returntrue;
- }
- returnfalse;
- }
- /**Getlivedatanodestoragereportsandthenbuildthenetworktopology.*/
- List<DatanodeStorageReport>init()throwsIOException{
- finalDatanodeStorageReport[]reports=nnc.getLiveDatanodeStorageReport();
- finalList<DatanodeStorageReport>trimmed=newArrayList<DatanodeStorageReport>();
- //createnetworktopologyandclassifyutilizationcollections:
- //over-utilized,above-average,below-averageandunder-utilized.
- for(DatanodeStorageReportr:DFSUtil.shuffle(reports)){
- finalDatanodeInfodatanode=r.getDatanodeInfo();
- if(shouldIgnore(datanode)){
- continue;
- }
- trimmed.add(r);
- cluster.add(datanode);
- }
- returntrimmed;
- }
- publicDDatanodenewDatanode(DatanodeStorageReportr){
- returnnewDDatanode(r,maxConcurrentMovesPerNode);
- }
- publicbooleandispatchAndCheckContinue()throwsInterruptedException{
- returnnnc.shouldContinue(dispatchBlockMoves());
- }
- /**
- *Dispatchblockmovesforeachsource.Thethreadselectsblockstomove&
- *sendsrequesttoproxysourcetoinitiateblockmove.Theprocessisflow
- *controlled.Blockselectionisblockediftherearetoomanyun-confirmed
- *blockmoves.
- *
- *@returnthetotalnumberofbytessuccessfullymovedinthisiteration.
- */
- privatelongdispatchBlockMoves()throwsInterruptedException{
- finallongbytesLastMoved=bytesMoved.get();
- finalFuture<?>[]futures=newFuture<?>[sources.size()];
- finalIterator<Source>i=sources.iterator();
- for(intj=0;j<futures.length;j++){
- finalSources=i.next();
- futures[j]=dispatchExecutor.submit(newRunnable(){
- @Override
- publicvoidrun(){
- s.dispatchBlocks();
- }
- });
- }
- //waitforalldispatcherthreadstofinish
- for(Future<?>future:futures){
- try{
- future.get();
- }catch(ExecutionExceptione){
- LOG.warn("Dispatcherthreadfailed",e.getCause());
- }
- }
- //waitforallblockmovingtobedone
- waitForMoveCompletion();
- returnbytesMoved.get()-bytesLastMoved;
- }
- /**Thesleepingperiodbeforecheckingifblockmoveiscompletedagain*/
- staticprivatelongblockMoveWaitTime=30000L;
- /**setthesleepingperiodforblockmovecompletioncheck*/
- staticvoidsetBlockMoveWaitTime(longtime){
- blockMoveWaitTime=time;
- }
- /**Waitforallblockmoveconfirmations.*/
- privatevoidwaitForMoveCompletion(){
- for(;;){
- booleanempty=true;
- for(StorageGroupt:targets){
- if(!t.getDDatanode().isPendingQEmpty()){
- empty=false;
- break;
- }
- }
- if(empty){
- return;//allpendingqueuesareempty
- }
- try{
- Thread.sleep(blockMoveWaitTime);
- }catch(InterruptedExceptionignored){
- }
- }
- }
- /**
- *Decideiftheblockisagoodcandidatetobemovedfromsourcetotarget.
- *Ablockisagoodcandidateif
- *1.theblockisnotintheprocessofbeingmoved/hasnotbeenmoved;
- *移动的块不是正在被移动的块
- *2.theblockdoesnothaveareplicaonthetarget;
- *在目标节点上没有移动的block块
- *3.doingthemovedoesnotreducethenumberofracksthattheblockhas
- *移动之后,不同机架上的block块的数量应该是不变的.
- */
- privatebooleanisGoodBlockCandidate(Sourcesource,StorageGrouptarget,
- DBlockblock){
- if(source.storageType!=target.storageType){
- returnfalse;
- }
- //checkiftheblockismovedornot
- //如果所要移动的块是存在于正在被移动的块列表,则返回false
- if(movedBlocks.contains(block.getBlock())){
- returnfalse;
- }
- //如果移动的块已经存在于目标节点上,则返回false,将不会予以移动
- if(block.isLocatedOn(target)){
- returnfalse;
- }
- //如果开启了机架感知的配置,则目标节点不应该有相同的block
- if(cluster.isNodeGroupAware()
- &&isOnSameNodeGroupWithReplicas(target,block,source)){
- returnfalse;
- }
- //需要维持机架上的block块数量不变
- if(reduceNumOfRacks(source,target,block)){
- returnfalse;
- }
- returntrue;
- }
- /**
- *Determinewhethermovingthegivenblockreplicafromsourcetotarget
- *wouldreducethenumberofracksoftheblockreplicas.
- */
- privatebooleanreduceNumOfRacks(Sourcesource,StorageGrouptarget,
- DBlockblock){
- finalDatanodeInfosourceDn=source.getDatanodeInfo();
- if(cluster.isOnSameRack(sourceDn,target.getDatanodeInfo())){
- //sourceandtargetareonthesamerack
- returnfalse;
- }
- booleannotOnSameRack=true;
- synchronized(block){
- for(StorageGrouploc:block.getLocations()){
- if(cluster.isOnSameRack(loc.getDatanodeInfo(),target.getDatanodeInfo())){
- notOnSameRack=false;
- break;
- }
- }
- }
- if(notOnSameRack){
- //targetisnotonthesamerackasanyreplica
- returnfalse;
- }
- for(StorageGroupg:block.getLocations()){
- if(g!=source&&cluster.isOnSameRack(g.getDatanodeInfo(),sourceDn)){
- //sourceisonthesamerackofanotherreplica
- returnfalse;
- }
- }
- returntrue;
- }
- /**
- *Checkifthereareanyreplica(otherthansource)onthesamenodegroup
- *withtarget.Iftrue,thentargetisnotagoodcandidateforplacing
- *specificreplicaaswedon'twant2replicasunderthesamenodegroup.
- *
- *@returntrueifthereareanyreplica(otherthansource)onthesamenode
- *groupwithtarget
- */
- privatebooleanisOnSameNodeGroupWithReplicas(
- StorageGrouptarget,DBlockblock,Sourcesource){
- finalDatanodeInfotargetDn=target.getDatanodeInfo();
- for(StorageGroupg:block.getLocations()){
- if(g!=source&&cluster.isOnSameNodeGroup(g.getDatanodeInfo(),targetDn)){
- returntrue;
- }
- }
- returnfalse;
- }
- /**Resetallfieldsinordertoprepareforthenextiteration*/
- voidreset(Configurationconf){
- cluster=NetworkTopology.getInstance(conf);
- storageGroupMap.clear();
- sources.clear();
- targets.clear();
- globalBlocks.removeAllButRetain(movedBlocks);
- movedBlocks.cleanup();
- }
- /**shutdownthreadpools*/
- voidshutdownNow(){
- dispatchExecutor.shutdownNow();
- moveExecutor.shutdownNow();
- }
- staticclassUtil{
- /**@returntrueifdatanodeispartoftheexcludedNodes.*/
- staticbooleanisExcluded(Set<String>excludedNodes,DatanodeInfodn){
- returnisIn(excludedNodes,dn);
- }
- /**
- *@returntrueifincludedNodesisemptyordatanodeispartofthe
- *includedNodes.
- */
- staticbooleanisIncluded(Set<String>includedNodes,DatanodeInfodn){
- return(includedNodes.isEmpty()||isIn(includedNodes,dn));
- }
- /**
- *Matchischeckedusinghostname,ipaddresswithandwithoutport
- *number.
- *
- *@returntrueifthedatanode'stransferaddressmatchesthesetofnodes.
- */
- privatestaticbooleanisIn(Set<String>datanodes,DatanodeInfodn){
- returnisIn(datanodes,dn.getPeerHostName(),dn.getXferPort())
- ||isIn(datanodes,dn.getIpAddr(),dn.getXferPort())
- ||isIn(datanodes,dn.getHostName(),dn.getXferPort());
- }
- /**@returntrueifnodescontainshostorhost:port*/
- privatestaticbooleanisIn(Set<String>nodes,Stringhost,intport){
- if(host==null){
- returnfalse;
- }
- return(nodes.contains(host)||nodes.contains(host+":"+port));
- }
- /**
- *Parseacommaseparatedstringtoobtainsetofhostnames
- *
- *@returnsetofhostnames
- */
- staticSet<String>parseHostList(Stringstring){
- String[]addrs=StringUtils.getTrimmedStrings(string);
- returnnewHashSet<String>(Arrays.asList(addrs));
- }
- /**
- *Readsetofhostnamesfromafile
- *
- *@returnsetofhostnames
- */
- staticSet<String>getHostListFromFile(StringfileName,Stringtype){
- Set<String>nodes=newHashSet<String>();
- try{
- HostsFileReader.readFileToSet(type,fileName,nodes);
- returnStringUtils.getTrimmedStrings(nodes);
- }catch(IOExceptione){
- thrownewIllegalArgumentException(
- "Failedtoreadhostlistfromfile:"+fileName);
- }
- }
- }
- }