Hadoop Balancer Source Code Walkthrough

Preface

I have been doing some Hadoop operations work recently and ran into an interesting problem: disk usage across our company's Hadoop cluster is very uneven, with some nodes close to 80% full and others close to 40%, so the cluster's capacity is not being used evenly, even though the balance job runs without complaint. That prompted me to read the Hadoop Balancer source code and understand the balancing mechanism at a deeper level.

Balancer and Dispatcher

These two classes are the ones most closely tied to the balance operation. The Balancer class is responsible for finding <source, target> pairs and storing them in the Dispatcher, which then dispatches the actual block moves. Before a block is dispatched it is validated to decide whether it can be moved at all; several conditions are involved, and they are explained in the code comments later in this article.
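Before any pairs are formed, Balancer.init(...) (visible in the full listing at the end of this article) classifies every storage by comparing its utilization against the cluster average using the configured threshold. The following standalone sketch models that four-way classification; the class and method names here are made up for illustration and are not the real Hadoop types.

    // Hypothetical stand-in for the four utilization buckets used by Balancer.init():
    // over-utilized, above-average, below-average and under-utilized.
    public class UtilizationBuckets {
        enum Bucket { OVER_UTILIZED, ABOVE_AVG, BELOW_AVG, UNDER_UTILIZED }

        // Classify one node given its utilization (%), the cluster average (%) and
        // the threshold (%), mirroring the sign/threshold checks in Balancer.init().
        static Bucket classify(double utilization, double avg, double threshold) {
            double diff = utilization - avg;
            if (diff > 0) {
                return Math.abs(diff) <= threshold ? Bucket.ABOVE_AVG : Bucket.OVER_UTILIZED;
            } else {
                return Math.abs(diff) <= threshold ? Bucket.BELOW_AVG : Bucket.UNDER_UTILIZED;
            }
        }

        public static void main(String[] args) {
            double avg = 60.0, threshold = 10.0;       // e.g. cluster average 60%, -threshold 10
            double[] nodes = {80.0, 65.0, 55.0, 40.0}; // utilization of four datanodes
            for (double u : nodes) {
                System.out.println(u + "% -> " + classify(u, avg, threshold));
            }
            // prints OVER_UTILIZED, ABOVE_AVG, BELOW_AVG, UNDER_UTILIZED respectively
        }
    }

Only nodes in the over- and under-utilized buckets force data movement; the two "average" buckets merely absorb or donate bytes when needed.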

In the final stage of the Balancer's planning, each source and target is added to the dispatcher, as the following code shows:

    /**
     * Build a move task from the chosen source and target pair.
     */
    private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
      long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
      final Task task = new Task(target, size);
      source.addTask(task);
      target.incScheduledSize(task.getSize());
      // register the pair with the dispatcher
      dispatcher.add(source, target);
      LOG.info("Decided to move " + StringUtils.byteDesc(size) + " bytes from "
          + source.getDisplayName() + " to " + target.getDisplayName());
    }
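The key line is the Math.min of the two availableSizeToMove() values: a pair can never schedule more bytes than either side still has room for in this iteration. A small self-contained illustration, using plain longs instead of the real Source/StorageGroup types:

    // Illustration only: how the scheduled size of one <source, target> pair is capped.
    public class PairSizeDemo {
        public static void main(String[] args) {
            long maxSizeToMove = 10L * 1024 * 1024 * 1024;   // MAX_SIZE_TO_MOVE = 10 GB per group
            long sourceScheduled = 4L * 1024 * 1024 * 1024;  // bytes already scheduled on the source
            long targetScheduled = 7L * 1024 * 1024 * 1024;  // bytes already scheduled on the target

            long sourceAvailable = maxSizeToMove - sourceScheduled;  // 6 GB left
            long targetAvailable = maxSizeToMove - targetScheduled;  // 3 GB left

            // Same rule as matchSourceWithTargetToMove(): the task size is the smaller of the two.
            long size = Math.min(sourceAvailable, targetAvailable);
            System.out.println("task size = " + size + " bytes (3 GB)");

            // Both sides then account for the scheduled bytes, shrinking what later pairs may use.
            sourceScheduled += size;
            targetScheduled += size;
            System.out.println("source available now = " + (maxSizeToMove - sourceScheduled));
            System.out.println("target available now = " + (maxSizeToMove - targetScheduled));
        }
    }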
matchSourceWithTargetToMove is invoked from the pairing loop below, which is still part of Balancer; only once the pairs are formed does the Dispatcher carry out the actual block moves:

    /**
     * For each datanode, choose matching nodes from the candidates. Either the
     * datanodes or the candidates are source nodes with (utilization > Avg), and
     * the others are target nodes with (utilization < Avg).
     */
    private <G extends StorageGroup, C extends StorageGroup>
        void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
            Matcher matcher) {
      for (final Iterator<G> i = groups.iterator(); i.hasNext();) {
        final G g = i.next();
        for (; choose4One(g, candidates, matcher); );
        if (!g.hasSpaceForScheduling()) {
          // this group has no more bytes to schedule, so drop it from the list
          i.remove();
        }
      }
    }
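The loop `for (; choose4One(g, candidates, matcher); );` is simply a compact way of saying "keep choosing candidates for this group until nothing more can be chosen"; the group itself is only dropped once it has no bytes left to schedule. A minimal stand-alone model of that pattern (hypothetical types, not the Hadoop ones):

    import java.util.*;

    // Models the "match one group against candidates until nothing more fits" loop.
    public class MatchLoopDemo {
        static final class Group {
            final String name;
            long bytesLeft;
            Group(String name, long bytesLeft) { this.name = name; this.bytesLeft = bytesLeft; }
        }

        // Analogue of choose4One(): pick the first candidate with space, schedule the
        // overlap, and report whether anything was scheduled.
        static boolean chooseOne(Group g, List<Group> candidates) {
            for (Iterator<Group> it = candidates.iterator(); it.hasNext();) {
                Group c = it.next();
                if (c.bytesLeft <= 0) { it.remove(); continue; }
                long moved = Math.min(g.bytesLeft, c.bytesLeft);
                if (moved <= 0) return false;
                g.bytesLeft -= moved;
                c.bytesLeft -= moved;
                System.out.println("pair " + g.name + " -> " + c.name + " for " + moved + " bytes");
                return true;
            }
            return false;
        }

        public static void main(String[] args) {
            List<Group> groups = new LinkedList<>(List.of(new Group("srcA", 5), new Group("srcB", 3)));
            List<Group> candidates = new LinkedList<>(List.of(new Group("tgt1", 2), new Group("tgt2", 9)));
            for (Iterator<Group> i = groups.iterator(); i.hasNext();) {
                Group g = i.next();
                for (; chooseOne(g, candidates); );   // same idiom as in chooseStorageGroups()
                if (g.bytesLeft <= 0) {
                    i.remove();                       // nothing left to schedule for this group
                }
            }
        }
    }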
That loop, in turn, is driven by the following entry point, which tries the matchers from the most local to the least local:

    /**
     * Decide all <source, target> pairs and
     * the number of bytes to move from a source to a target.
     * Maximum bytes to be moved per storage group is
     * min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
     * Pairs are picked from the source and target lists in locality order:
     * same node group first, then same rack, and finally any remaining nodes.
     * @return total number of bytes to move in this iteration
     */
    private long chooseStorageGroups() {
      // First, match nodes on the same node group if cluster is node group aware
      if (dispatcher.getCluster().isNodeGroupAware()) {
        chooseStorageGroups(Matcher.SAME_NODE_GROUP);
      }
      // Then, match nodes on the same rack
      chooseStorageGroups(Matcher.SAME_RACK);
      // At last, match all remaining nodes
      chooseStorageGroups(Matcher.ANY_OTHER);
      return dispatcher.bytesToMove();
    }
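The three calls encode a locality preference: first pair storages inside the same node group (when the topology supports node groups), then inside the same rack, and only then across the whole cluster. Below is a hedged, self-contained sketch of such a Matcher-style predicate; it is deliberately simplified (the real Matcher consults the NetworkTopology) and the node fields are invented for the example.

    import java.util.*;

    // Simplified model of the Matcher ordering used by chooseStorageGroups().
    public class MatcherOrderDemo {
        record Node(String name, String rack, String nodeGroup) {}

        enum Matcher {
            SAME_NODE_GROUP { boolean match(Node a, Node b) { return a.nodeGroup().equals(b.nodeGroup()); } },
            SAME_RACK       { boolean match(Node a, Node b) { return a.rack().equals(b.rack()); } },
            ANY_OTHER       { boolean match(Node a, Node b) { return true; } };
            abstract boolean match(Node a, Node b);
        }

        public static void main(String[] args) {
            Node source = new Node("dn1", "/rack1", "/rack1/ng1");
            List<Node> targets = List.of(
                new Node("dn2", "/rack1", "/rack1/ng1"),   // same node group
                new Node("dn3", "/rack1", "/rack1/ng2"),   // same rack only
                new Node("dn4", "/rack2", "/rack2/ng1"));  // anywhere else

            // Each matcher accepts a progressively wider set of targets,
            // which is why the balancer tries them in this order.
            for (Matcher m : Matcher.values()) {
                List<String> accepted = new ArrayList<>();
                for (Node t : targets) {
                    if (m.match(source, t)) {
                        accepted.add(t.name());
                    }
                }
                System.out.println(m + " accepts " + accepted);
            }
        }
    }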
From there the move passes through a chain of Dispatcher methods; a simplified diagram of the call chain is shown below:

[Figure: simplified call chain from the Balancer into the Dispatcher]
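Since the original diagram is not reproduced here, the chain can be summarized as dispatchAndCheckContinue() -> dispatchBlockMoves() -> Source.dispatchBlocks() -> chooseNextMove() -> PendingMove.dispatch(), all of which appear in the Dispatcher listing at the end of this article. The runnable skeleton below only mirrors that chain; every method body is reduced to a trace line, so it is a sketch of the control flow, not the real implementation.

    // Skeleton of the Dispatcher call chain; method names follow the real Dispatcher,
    // but every body is reduced to a trace line for illustration.
    public class DispatchChainSkeleton {
        boolean dispatchAndCheckContinue() {          // called once per balancer iteration
            long moved = dispatchBlockMoves();
            System.out.println("moved " + moved + " bytes, ask the NameNodeConnector whether to continue");
            return moved > 0;
        }

        long dispatchBlockMoves() {                   // one task per Source, run on the dispatcher pool
            System.out.println("submit dispatchBlocks() for every source");
            dispatchBlocks();
            return 1;                                 // pretend one byte moved
        }

        void dispatchBlocks() {                       // Source: loop until time is up or nothing is left
            System.out.println("source loop: chooseNextMove(), fetch more blocks when running low");
            chooseNextMove();
        }

        void chooseNextMove() {                       // pick a block and a proxy for a non-busy target
            System.out.println("pick block and proxy, then hand the PendingMove to the move pool");
            dispatch();
        }

        void dispatch() {                             // PendingMove: socket to target, replaceBlock(), wait
            System.out.println("send the replaceBlock request to the target datanode and wait for the reply");
        }

        public static void main(String[] args) {
            new DispatchChainSkeleton().dispatchAndCheckContinue();
        }
    }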

Finally, the core check that decides whether a block is a suitable candidate to move is the following:

    /**
     * Decide if the block is a good candidate to be moved from source to target.
     * A block is a good candidate if
     * 1. the block is not in the process of being moved / has not been moved;
     * 2. the block does not have a replica on the target;
     * 3. doing the move does not reduce the number of racks that the block has.
     */
    private boolean isGoodBlockCandidate(Source source, StorageGroup target,
        DBlock block) {
      if (source.storageType != target.storageType) {
        return false;
      }
      // check if the block is moved or not
      // if the block is already in the moved-blocks window, do not move it again
      if (movedBlocks.contains(block.getBlock())) {
        return false;
      }
      // if the target already holds a replica of this block, skip it
      if (block.isLocatedOn(target)) {
        return false;
      }
      // if the cluster is node group aware, the target must not share a node group
      // with another replica of this block
      if (cluster.isNodeGroupAware()
          && isOnSameNodeGroupWithReplicas(target, block, source)) {
        return false;
      }
      // the move must not reduce the number of racks the block is stored on
      if (reduceNumOfRacks(source, target, block)) {
        return false;
      }
      return true;
    }
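The three documented conditions can be exercised in isolation. The sketch below models them with plain sets and strings (hypothetical types, not the real DBlock/StorageGroup, and it leaves out the storage-type precheck), which makes the order of the checks easy to follow:

    import java.util.*;

    // Toy version of isGoodBlockCandidate(): the same three checks over plain data structures.
    public class BlockCandidateDemo {
        static boolean isGoodCandidate(String block,
                                       Set<String> movedBlocks,        // recently moved window
                                       Set<String> replicasOnTarget,   // blocks already on the target
                                       int racksBefore, int racksAfter) {
            if (movedBlocks.contains(block)) {
                return false;                    // 1. already moved / being moved
            }
            if (replicasOnTarget.contains(block)) {
                return false;                    // 2. target already has a replica
            }
            if (racksAfter < racksBefore) {
                return false;                    // 3. the move would reduce the rack count
            }
            return true;
        }

        public static void main(String[] args) {
            Set<String> moved = Set.of("blk_1");
            Set<String> onTarget = Set.of("blk_2");
            System.out.println(isGoodCandidate("blk_1", moved, onTarget, 2, 2));  // false: rule 1
            System.out.println(isGoodCandidate("blk_2", moved, onTarget, 2, 2));  // false: rule 2
            System.out.println(isGoodCandidate("blk_3", moved, onTarget, 2, 1));  // false: rule 3
            System.out.println(isGoodCandidate("blk_3", moved, onTarget, 2, 2));  // true
        }
    }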

Below is the fully annotated source of the Balancer.java and Dispatcher.java classes:

Balancer.java:

  1. /**
  2. *LicensedtotheApacheSoftwareFoundation(ASF)underone
  3. *ormorecontributorlicenseagreements.SeetheNOTICEfile
  4. *distributedwiththisworkforadditionalinformation
  5. *regardingcopyrightownership.TheASFlicensesthisfile
  6. *toyouundertheApacheLicense,Version2.0(the
  7. *"License");youmaynotusethisfileexceptincompliance
  8. *withtheLicense.YoumayobtainacopyoftheLicenseat
  9. *
  10. *http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. *Unlessrequiredbyapplicablelaworagreedtoinwriting,software
  13. *distributedundertheLicenseisdistributedonan"ASIS"BASIS,
  14. *WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied.
  15. *SeetheLicenseforthespecificlanguagegoverningpermissionsand
  16. *limitationsundertheLicense.
  17. */
  18. packageorg.apache.hadoop.hdfs.server.balancer;
  19. importstaticcom.google.common.base.Preconditions.checkArgument;
  20. importjava.io.IOException;
  21. importjava.io.PrintStream;
  22. importjava.net.URI;
  23. importjava.text.DateFormat;
  24. importjava.util.ArrayList;
  25. importjava.util.Arrays;
  26. importjava.util.Collection;
  27. importjava.util.Collections;
  28. importjava.util.Date;
  29. importjava.util.Formatter;
  30. importjava.util.Iterator;
  31. importjava.util.LinkedList;
  32. importjava.util.List;
  33. importjava.util.Set;
  34. importorg.apache.commons.logging.Log;
  35. importorg.apache.commons.logging.LogFactory;
  36. importorg.apache.hadoop.classification.InterfaceAudience;
  37. importorg.apache.hadoop.conf.Configuration;
  38. importorg.apache.hadoop.conf.Configured;
  39. importorg.apache.hadoop.fs.Path;
  40. importorg.apache.hadoop.hdfs.DFSConfigKeys;
  41. importorg.apache.hadoop.hdfs.DFSUtil;
  42. importorg.apache.hadoop.hdfs.HdfsConfiguration;
  43. importorg.apache.hadoop.hdfs.StorageType;
  44. importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
  45. importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
  46. importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
  47. importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
  48. importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
  49. importorg.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
  50. importorg.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
  51. importorg.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
  52. importorg.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
  53. importorg.apache.hadoop.hdfs.server.protocol.StorageReport;
  54. importorg.apache.hadoop.util.StringUtils;
  55. importorg.apache.hadoop.util.Time;
  56. importorg.apache.hadoop.util.Tool;
  57. importorg.apache.hadoop.util.ToolRunner;
  58. importcom.google.common.base.Preconditions;
  59. /**<p>ThebalancerisatoolthatbalancesdiskspaceusageonanHDFScluster
  60. *whensomedatanodesbecomefullorwhennewemptynodesjointhecluster.
  61. *Thetoolisdeployedasanapplicationprogramthatcanberunbythe
  62. *clusteradministratoronaliveHDFSclusterwhileapplications
  63. *addinganddeletingfiles.
  64. *
  65. *<p>SYNOPSIS
  66. *<pre>
  67. *Tostart:
  68. *bin/start-balancer.sh[-threshold<threshold>]
  69. *Example:bin/start-balancer.sh
  70. *startthebalancerwithadefaultthresholdof10%
  71. *bin/start-balancer.sh-threshold5
  72. *startthebalancerwithathresholdof5%
  73. *Tostop:
  74. *bin/stop-balancer.sh
  75. *</pre>
  76. *
  77. *<p>DESCRIPTION
  78. *<p>Thethresholdparameterisafractionintherangeof(1%,100%)witha
  79. *defaultvalueof10%.Thethresholdsetsatargetforwhetherthecluster
  80. *isbalanced.Aclusterisbalancedifforeachdatanode,theutilization
  81. *ofthenode(ratioofusedspaceatthenodetototalcapacityofthenode)
  82. *differsfromtheutilizationofthe(ratioofusedspaceinthecluster
  83. *tototalcapacityofthecluster)bynomorethanthethresholdvalue.
  84. *Thesmallerthethreshold,themorebalancedaclusterwillbecome.
  85. *Ittakesmoretimetorunthebalancerforsmallthresholdvalues.
  86. *Alsoforaverysmallthresholdtheclustermaynotbeabletoreachthe
  87. *balancedstatewhenapplicationswriteanddeletefilesconcurrently.
  88. *
  89. *<p>Thetoolmovesblocksfromhighlyutilizeddatanodestopoorly
  90. *utilizeddatanodesiteratively.Ineachiterationadatanodemovesor
  91. *receivesnomorethanthelesserof10Gbytesorthethresholdfraction
  92. *ofitscapacity.Eachiterationrunsnomorethan20minutes.
  93. *每次移动不超过10G大小,每次移动不超过20分钟。
  94. *Attheendofeachiteration,thebalancerobtainsupdateddatanodes
  95. *informationfromthenamenode.
  96. *
  97. *<p>Asystempropertythatlimitsthebalancer'suseofbandwidthis
  98. *definedinthedefaultconfigurationfile:
  99. *<pre>
  100. *<property>
  101. *<name>dfs.balance.bandwidthPerSec</name>
  102. *<value>1048576</value>
  103. *<description>Specifiesthemaximumbandwidththateachdatanode
  104. *canutilizeforthebalancingpurposeintermofthenumberofbytes
  105. *persecond.</description>
  106. *</property>
  107. *</pre>
  108. *
  109. *<p>Thispropertydeterminesthemaximumspeedatwhichablockwillbe
  110. *movedfromonedatanodetoanother.Thedefaultvalueis1MB/s.Thehigher
  111. *thebandwidth,thefasteraclustercanreachthebalancedstate,
  112. *butwithgreatercompetitionwithapplicationprocesses.Ifan
  113. *administratorchangesthevalueofthispropertyintheconfiguration
  114. *file,thechangeisobservedwhenHDFSisnextrestarted.
  115. *
  116. *<p>MONITERINGBALANCERPROGRESS
  117. *<p>Afterthebalancerisstarted,anoutputfilenamewherethebalancer
  118. *progresswillberecordedisprintedonthescreen.Theadministrator
  119. *canmonitortherunningofthebalancerbyreadingtheoutputfile.
  120. *Theoutputshowsthebalancer'sstatusiterationbyiteration.Ineach
  121. *iterationitprintsthestartingtime,theiterationnumber,thetotal
  122. *numberofbytesthathavebeenmovedinthepreviousiterations,
  123. *thetotalnumberofbytesthatarelefttomoveinorderforthecluster
  124. *tobebalanced,andthenumberofbytesthatarebeingmovedinthis
  125. *iteration.Normally"BytesAlreadyMoved"isincreasingwhile"BytesLeft
  126. *ToMove"isdecreasing.
  127. *
  128. *<p>RunningmultipleinstancesofthebalancerinanHDFSclusteris
  129. *prohibitedbythetool.
  130. *
  131. *<p>Thebalancerautomaticallyexitswhenanyofthefollowingfive
  132. *conditionsissatisfied:
  133. *<ol>
  134. *<li>Theclusterisbalanced;
  135. *<li>Noblockcanbemoved;
  136. *<li>Noblockhasbeenmovedforfiveconsecutive(连续)iterations;
  137. *<li>AnIOExceptionoccurswhilecommunicatingwiththenamenode;
  138. *<li>Anotherbalancerisrunning.
  139. *</ol>
  140. *下面5种情况会导致Balance操作的失败
  141. *1、整个集群已经达到平衡状态
  142. *2、经过计算发现没有可以被移动的block块
  143. *3、在连续5次的迭代中,没有block块被移动
  144. *4、当datanode节点与namenode节点通信的时候,发生IO异常
  145. *5、已经存在一个Balance操作
  146. *
  147. *<p>Uponexit,abalancerreturnsanexitcodeandprintsoneofthe
  148. *followingmessagestotheoutputfileincorrespondingtotheaboveexit
  149. *reasons:
  150. *<ol>
  151. *<li>Theclusterisbalanced.Exiting
  152. *<li>Noblockcanbemoved.Exiting...
  153. *<li>Noblockhasbeenmovedfor5iterations.Exiting...
  154. *<li>ReceivedanIOexception:failurereason.Exiting...
  155. *<li>Anotherbalancerisrunning.Exiting...
  156. *</ol>
  157. *在下面的5种情况下,balancer操作会自动退出
  158. *1、整个集群已经达到平衡的状态
  159. *2、经过计算发现没有可以被移动block块
  160. *3、在5论的迭代没有block被移动
  161. *4、接收端发生了I异常
  162. *5、已经存在一个balanr进程在工作
  163. *
  164. *<p>Theadministratorcaninterrupttheexecutionofthebalanceratany
  165. *timebyrunningthecommand"stop-balancer.sh"onthemachinewherethe
  166. *balancerisrunning.
  167. */
  168. @InterfaceAudience.Private
  169. publicclassBalancer{
  170. staticfinalLogLOG=LogFactory.getLog(Balancer.class);
  171. privatestaticfinalPathBALANCER_ID_PATH=newPath("/system/balancer.id");
  172. privatestaticfinallongGB=1L<<30;//1GB
  173. privatestaticfinallongMAX_SIZE_TO_MOVE=10*GB;
  174. privatestaticfinalStringUSAGE="Usage:java"
  175. +Balancer.class.getSimpleName()
  176. +"\n\t[-policy<policy>]\tthebalancingpolicy:"
  177. +BalancingPolicy.Node.INSTANCE.getName()+"or"
  178. +BalancingPolicy.Pool.INSTANCE.getName()
  179. +"\n\t[-threshold<threshold>]\tPercentageofdiskcapacity"
  180. +"\n\t[-exclude[-f<hosts-file>|comma-speratedlistofhosts]]"
  181. +"\tExcludesthespecifieddatanodes."
  182. +"\n\t[-include[-f<hosts-file>|comma-speratedlistofhosts]]"
  183. +"\tIncludesonlythespecifieddatanodes.";
  184. privatefinalDispatcherdispatcher;
  185. privatefinalBalancingPolicypolicy;
  186. privatefinaldoublethreshold;
  187. //alldatanodelists
  188. //四种datanode节点类型
  189. privatefinalCollection<Source>overUtilized=newLinkedList<Source>();
  190. privatefinalCollection<Source>aboveAvgUtilized=newLinkedList<Source>();
  191. privatefinalCollection<StorageGroup>belowAvgUtilized
  192. =newLinkedList<StorageGroup>();
  193. privatefinalCollection<StorageGroup>underUtilized
  194. =newLinkedList<StorageGroup>();
  195. /*CheckthatthisBalanceriscompatible(兼容)withtheBlockPlacementPolicy
  196. *usedbytheNamenode.
  197. *检测此balancer均衡工具是否于与目前的namenode节点所用的block存放策略相兼容
  198. */
  199. privatestaticvoidcheckReplicationPolicyCompatibility(Configurationconf
  200. )throwsUnsupportedActionException{
  201. if(!(BlockPlacementPolicy.getInstance(conf,null,null,null)instanceof
  202. BlockPlacementPolicyDefault)){
  203. thrownewUnsupportedActionException(
  204. //如果不兼容则抛异常
  205. "BalancerwithoutBlockPlacementPolicyDefault");
  206. }
  207. }
  208. /**
  209. *Constructabalancer.
  210. *Initializebalancer.Itsetsthevalueofthethreshold,and
  211. *buildsthecommunicationproxiesto
  212. *namenodeasaclientandasecondarynamenodeandretryproxies
  213. *whenconnectionfails.
  214. *
  215. *构造一个balancer均衡器,设置threshold值,读取配置中的分发线程数的值
  216. */
  217. Balancer(NameNodeConnectortheblockpool,Parametersp,Configurationconf){
  218. finallongmovedWinWidth=conf.getLong(
  219. DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
  220. DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
  221. //移动线程数
  222. finalintmoverThreads=conf.getInt(
  223. DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
  224. DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
  225. //分发线程数
  226. finalintdispatcherThreads=conf.getInt(
  227. DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
  228. DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
  229. finalintmaxConcurrentMovesPerNode=conf.getInt(
  230. DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
  231. DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
  232. this.dispatcher=newDispatcher(theblockpool,p.nodesToBeIncluded,
  233. p.nodesToBeExcluded,movedWinWidth,moverThreads,dispatcherThreads,
  234. maxConcurrentMovesPerNode,conf);
  235. this.threshold=p.threshold;
  236. this.policy=p.policy;
  237. }
  238. /**
  239. *获取节点总容量大小
  240. */
  241. privatestaticlonggetCapacity(DatanodeStorageReportreport,StorageTypet){
  242. longcapacity=0L;
  243. for(StorageReportr:report.getStorageReports()){
  244. if(r.getStorage().getStorageType()==t){
  245. capacity+=r.getCapacity();
  246. }
  247. }
  248. returncapacity;
  249. }
  250. /**
  251. *获取节点剩余可用容量大小
  252. */
  253. privatestaticlonggetRemaining(DatanodeStorageReportreport,StorageTypet){
  254. longremaining=0L;
  255. for(StorageReportr:report.getStorageReports()){
  256. if(r.getStorage().getStorageType()==t){
  257. remaining+=r.getRemaining();
  258. }
  259. }
  260. returnremaining;
  261. }
  262. /**
  263. *Givenadatanodestorageset,buildanetworktopologyanddecide
  264. *over-utilizedstorages,aboveaverageutilizedstorages,
  265. *belowaverageutilizedstorages,andunderutilizedstorages.
  266. *Theinputdatanodestoragesetisshuffledinordertorandomize
  267. *tothestoragematchinglateron.
  268. *给定一个datanode集合,创建一个网络拓扑逻辑并划分出过度使用,使用率超出平均值,
  269. *低于平均值的,以及未能充分利用资源的4种类型
  270. *
  271. *@returnthenumberofbytesneededtomoveinordertobalancethecluster.
  272. */
  273. privatelonginit(List<DatanodeStorageReport>reports){
  274. //计算平均使用率
  275. for(DatanodeStorageReportr:reports){
  276. //累加每个节点上的使用空间
  277. policy.accumulateSpaces(r);
  278. }
  279. //计算出平均值
  280. policy.initAvgUtilization();
  281. longoverLoadedBytes=0L,underLoadedBytes=0L;
  282. //进行使用率等级的划分,总共4种,over-utilized,above-average,below-averageandunder-utilized
  283. for(DatanodeStorageReportr:reports){
  284. finalDDatanodedn=dispatcher.newDatanode(r);
  285. for(StorageTypet:StorageType.asList()){
  286. finalDoubleutilization=policy.getUtilization(r,t);
  287. if(utilization==null){//datanodedoesnothavesuchstoragetype
  288. continue;
  289. }
  290. finallongcapacity=getCapacity(r,t);
  291. finaldoubleutilizationDiff=utilization-policy.getAvgUtilization(t);
  292. finaldoublethresholdDiff=Math.abs(utilizationDiff)-threshold;
  293. //计算理论上最大移动空间
  294. finallongmaxSize2Move=computeMaxSize2Move(capacity,
  295. getRemaining(r,t),utilizationDiff,threshold);
  296. finalStorageGroupg;
  297. if(utilizationDiff>0){
  298. //使用率超出平均值,加入发送节点列表中
  299. finalSources=dn.addSource(t,maxSize2Move,dispatcher);
  300. if(thresholdDiff<=0){//withinthreshold
  301. //如果在threshold范围之内,则加入aboveAvgUtilized
  302. aboveAvgUtilized.add(s);
  303. }else{
  304. //否则加入overUtilized,并计算超出空间
  305. overLoadedBytes+=precentage2bytes(thresholdDiff,capacity);
  306. overUtilized.add(s);
  307. }
  308. g=s;
  309. }else{
  310. //与上面的相反
  311. g=dn.addStorageGroup(t,maxSize2Move);
  312. if(thresholdDiff<=0){//withinthreshold
  313. belowAvgUtilized.add(g);
  314. }else{
  315. underLoadedBytes+=precentage2bytes(thresholdDiff,capacity);
  316. underUtilized.add(g);
  317. }
  318. }
  319. dispatcher.getStorageGroupMap().put(g);
  320. }
  321. }
  322. logUtilizationCollections();
  323. Preconditions.checkState(dispatcher.getStorageGroupMap().size()
  324. ==overUtilized.size()+underUtilized.size()+aboveAvgUtilized.size()
  325. +belowAvgUtilized.size(),
  326. "Mismatchednumberofstoragegroups");
  327. //returnnumberofbytestobemovedinordertomaketheclusterbalanced
  328. returnMath.max(overLoadedBytes,underLoadedBytes);
  329. }
  330. privatestaticlongcomputeMaxSize2Move(finallongcapacity,finallongremaining,
  331. finaldoubleutilizationDiff,finaldoublethreshold){
  332. finaldoublediff=Math.min(threshold,Math.abs(utilizationDiff));
  333. longmaxSizeToMove=precentage2bytes(diff,capacity);
  334. if(utilizationDiff<0){
  335. maxSizeToMove=Math.min(remaining,maxSizeToMove);
  336. }
  337. returnMath.min(MAX_SIZE_TO_MOVE,maxSizeToMove);
  338. }
  339. privatestaticlongprecentage2bytes(doubleprecentage,longcapacity){
  340. Preconditions.checkArgument(precentage>=0,
  341. "precentage="+precentage+"<0");
  342. return(long)(precentage*capacity/100.0);
  343. }
  344. /*logtheoverutilized&underutilizednodes*/
  345. privatevoidlogUtilizationCollections(){
  346. logUtilizationCollection("over-utilized",overUtilized);
  347. if(LOG.isTraceEnabled()){
  348. logUtilizationCollection("above-average",aboveAvgUtilized);
  349. logUtilizationCollection("below-average",belowAvgUtilized);
  350. }
  351. logUtilizationCollection("underutilized",underUtilized);
  352. }
  353. privatestatic<TextendsStorageGroup>
  354. voidlogUtilizationCollection(Stringname,Collection<T>items){
  355. LOG.info(items.size()+""+name+":"+items);
  356. }
  357. /**
  358. *Decideall<source,target>pairsand
  359. *thenumberofbytestomovefromasourcetoatarget
  360. *Maximumbytestobemovedperstoragegroupis
  361. *min(1Bandworthofbytes,MAX_SIZE_TO_MOVE).
  362. *从源节点列表和目标节点列表中各自选择节点组成一个个对,选择顺序优先为同节点组,同机架,然后是针对所有
  363. *@returntotalnumberofbytestomoveinthisiteration
  364. */
  365. privatelongchooseStorageGroups(){
  366. //First,matchnodesonthesamenodegroupifclusterisnodegroupaware
  367. if(dispatcher.getCluster().isNodeGroupAware()){
  368. //首先匹配的条件是同节点组
  369. chooseStorageGroups(Matcher.SAME_NODE_GROUP);
  370. }
  371. //Then,matchnodesonthesamerack
  372. //然后是同机架
  373. chooseStorageGroups(Matcher.SAME_RACK);
  374. //Atlast,matchallremainingnodes
  375. //最后是匹配所有的节点
  376. chooseStorageGroups(Matcher.ANY_OTHER);
  377. returndispatcher.bytesToMove();
  378. }
  379. /**Decideall<source,target>pairsaccordingtothematcher.*/
  380. privatevoidchooseStorageGroups(finalMatchermatcher){
  381. /*firststep:matcheachoverUtilizeddatanode(source)to
  382. *oneormoreunderUtilizeddatanodes(targets).
  383. *
  384. *把over组的数据移动under组中
  385. */
  386. chooseStorageGroups(overUtilized,underUtilized,matcher);
  387. /*matcheachremainingoverutilizeddatanode(source)to
  388. *belowaverageutilizeddatanodes(targets).
  389. *Noteonlyoverutilizeddatanodesthathaven'thadthatmaxbytestomove
  390. *satisfiedinstep1areselected
  391. *把over组的数据移动到below
  392. */
  393. chooseStorageGroups(overUtilized,belowAvgUtilized,matcher);
  394. /*matcheachremainingunderutilizeddatanode(target)to
  395. *aboveaverageutilizeddatanodes(source).
  396. *Noteonlyunderutilizeddatanodesthathavenothadthatmaxbytesto
  397. *movesatisfiedinstep1areselected.
  398. *
  399. *然后,再把under组的数据移动一部分到above组中
  400. */
  401. chooseStorageGroups(underUtilized,aboveAvgUtilized,matcher);
  402. }
  403. /**
  404. *Foreachdatanode,choosematchingnodesfromthecandidates.Eitherthe
  405. *datanodesorthecandidatesaresourcenodeswith(utilization>Avg),and
  406. *theothersaretargetnodeswith(utilization<Avg).
  407. */
  408. private<GextendsStorageGroup,CextendsStorageGroup>
  409. voidchooseStorageGroups(Collection<G>groups,Collection<C>candidates,
  410. Matchermatcher){
  411. for(finalIterator<G>i=groups.iterator();i.hasNext();){
  412. finalGg=i.next();
  413. for(;choose4One(g,candidates,matcher););
  414. if(!g.hasSpaceForScheduling()){
  415. //如果候选节点没有空间调度,则直接移除掉
  416. i.remove();
  417. }
  418. }
  419. }
  420. /**
  421. *Forthegivendatanode,chooseacandidateandthenscheduleit.
  422. *@returntrueifacandidateischosen;falseifnocandidatesischosen.
  423. */
  424. private<CextendsStorageGroup>booleanchoose4One(StorageGroupg,
  425. Collection<C>candidates,Matchermatcher){
  426. finalIterator<C>i=candidates.iterator();
  427. finalCchosen=chooseCandidate(g,i,matcher);
  428. if(chosen==null){
  429. returnfalse;
  430. }
  431. if(ginstanceofSource){
  432. matchSourceWithTargetToMove((Source)g,chosen);
  433. }else{
  434. matchSourceWithTargetToMove((Source)chosen,g);
  435. }
  436. if(!chosen.hasSpaceForScheduling()){
  437. i.remove();
  438. }
  439. returntrue;
  440. }
  441. /**
  442. *根据源节点和目标节点,构造任务对
  443. */
  444. privatevoidmatchSourceWithTargetToMove(Sourcesource,StorageGrouptarget){
  445. longsize=Math.min(source.availableSizeToMove(),target.availableSizeToMove());
  446. finalTasktask=newTask(target,size);
  447. source.addTask(task);
  448. target.incScheduledSize(task.getSize());
  449. //加入分发器中
  450. dispatcher.add(source,target);
  451. LOG.info("Decidedtomove"+StringUtils.byteDesc(size)+"bytesfrom"
  452. +source.getDisplayName()+"to"+target.getDisplayName());
  453. }
  454. /**Chooseacandidateforthegivendatanode.*/
  455. private<GextendsStorageGroup,CextendsStorageGroup>
  456. CchooseCandidate(Gg,Iterator<C>candidates,Matchermatcher){
  457. if(g.hasSpaceForScheduling()){
  458. for(;candidates.hasNext();){
  459. finalCc=candidates.next();
  460. if(!c.hasSpaceForScheduling()){
  461. candidates.remove();
  462. }elseif(matcher.match(dispatcher.getCluster(),
  463. g.getDatanodeInfo(),c.getDatanodeInfo())){
  464. //如果满足匹配的条件,则返回值
  465. returnc;
  466. }
  467. }
  468. }
  469. returnnull;
  470. }
  471. /*resetallfieldsinabalancerpreparingforthenextiteration*/
  472. privatevoidresetData(Configurationconf){
  473. this.overUtilized.clear();
  474. this.aboveAvgUtilized.clear();
  475. this.belowAvgUtilized.clear();
  476. this.underUtilized.clear();
  477. this.policy.reset();
  478. dispatcher.reset(conf);;
  479. }
  480. /**Runaniterationforalldatanodes.*/
  481. privateExitStatusrun(intiteration,Formatterformatter,
  482. Configurationconf){
  483. try{
  484. finalList<DatanodeStorageReport>reports=dispatcher.init();
  485. finallongbytesLeftToMove=init(reports);
  486. if(bytesLeftToMove==0){
  487. System.out.println("Theclusterisbalanced.Exiting...");
  488. returnExitStatus.SUCCESS;
  489. }else{
  490. LOG.info("Needtomove"+StringUtils.byteDesc(bytesLeftToMove)
  491. +"tomaketheclusterbalanced.");
  492. }
  493. /*Decideallthenodesthatwillparticipateintheblockmoveand
  494. *thenumberofbytesthatneedtobemovedfromonenodetoanother
  495. *inthisiteration.Maximumbytestobemovedpernodeis
  496. *Min(1Bandworthofbytes,MAX_SIZE_TO_MOVE).
  497. */
  498. finallongbytesToMove=chooseStorageGroups();
  499. if(bytesToMove==0){
  500. System.out.println("Noblockcanbemoved.Exiting...");
  501. returnExitStatus.NO_MOVE_BLOCK;
  502. }else{
  503. LOG.info("Willmove"+StringUtils.byteDesc(bytesToMove)+
  504. "inthisiteration");
  505. }
  506. formatter.format("%-24s%10d%19s%18s%17s%n",
  507. DateFormat.getDateTimeInstance().format(newDate()),
  508. iteration,
  509. StringUtils.byteDesc(dispatcher.getBytesMoved()),
  510. StringUtils.byteDesc(bytesLeftToMove),
  511. StringUtils.byteDesc(bytesToMove)
  512. );
  513. /*Foreachpairof<source,target>,startathreadthatrepeatedly
  514. *decideablocktobemovedanditsproxysource,
  515. *theninitiatesthemoveuntilallbytesaremovedornomoreblock
  516. *availabletomove.
  517. *Exitnobytehasbeenmovedfor5consecutiveiterations.
  518. *
  519. *如果发现在5次连续的迭代中还是没有字节被移动,则退出
  520. */
  521. if(!dispatcher.dispatchAndCheckContinue()){
  522. returnExitStatus.NO_MOVE_PROGRESS;
  523. }
  524. returnExitStatus.IN_PROGRESS;
  525. }catch(IllegalArgumentExceptione){
  526. System.out.println(e+".Exiting...");
  527. returnExitStatus.ILLEGAL_ARGUMENTS;
  528. }catch(IOExceptione){
  529. System.out.println(e+".Exiting...");
  530. returnExitStatus.IO_EXCEPTION;
  531. }catch(InterruptedExceptione){
  532. System.out.println(e+".Exiting...");
  533. returnExitStatus.INTERRUPTED;
  534. }finally{
  535. dispatcher.shutdownNow();
  536. }
  537. }
  538. /**
  539. *Balanceallnamenodes.
  540. *Foreachiteration,
  541. *foreachnamenode,
  542. *executea{@linkBalancer}toworkthroughalldatanodesonce.
  543. *
  544. *开放给外部调用的run方法
  545. */
  546. staticintrun(Collection<URI>namenodes,finalParametersp,
  547. Configurationconf)throwsIOException,InterruptedException{
  548. finallongsleeptime=2000*conf.getLong(
  549. DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
  550. DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
  551. LOG.info("namenodes="+namenodes);
  552. LOG.info("parameters="+p);
  553. finalFormatterformatter=newFormatter(System.out);
  554. System.out.println("TimeStampIteration#BytesAlreadyMovedBytesLeftToMoveBytesBeingMoved");
  555. finalList<NameNodeConnector>connectors
  556. =newArrayList<NameNodeConnector>(namenodes.size());
  557. try{
  558. for(URIuri:namenodes){
  559. finalNameNodeConnectornnc=newNameNodeConnector(
  560. Balancer.class.getSimpleName(),uri,BALANCER_ID_PATH,conf);
  561. nnc.getKeyManager().startBlockKeyUpdater();
  562. connectors.add(nnc);
  563. }
  564. booleandone=false;
  565. for(intiteration=0;!done;iteration++){
  566. done=true;
  567. Collections.shuffle(connectors);
  568. for(NameNodeConnectornnc:connectors){
  569. //初始化均衡器具
  570. finalBalancerb=newBalancer(nnc,p,conf);
  571. //均衡器执行balance操作
  572. finalExitStatusr=b.run(iteration,formatter,conf);
  573. //cleanalllists
  574. b.resetData(conf);
  575. if(r==ExitStatus.IN_PROGRESS){
  576. done=false;
  577. }elseif(r!=ExitStatus.SUCCESS){
  578. //mustbeanerrorstatue,return.
  579. returnr.getExitCode();
  580. }
  581. }
  582. if(!done){
  583. Thread.sleep(sleeptime);
  584. }
  585. }
  586. }finally{
  587. for(NameNodeConnectornnc:connectors){
  588. nnc.close();
  589. }
  590. }
  591. returnExitStatus.SUCCESS.getExitCode();
  592. }
  593. /*GivenelaspedTimeinms,returnaprintablestring*/
  594. privatestaticStringtime2Str(longelapsedTime){
  595. Stringunit;
  596. doubletime=elapsedTime;
  597. if(elapsedTime<1000){
  598. unit="milliseconds";
  599. }elseif(elapsedTime<60*1000){
  600. unit="seconds";
  601. time=time/1000;
  602. }elseif(elapsedTime<3600*1000){
  603. unit="minutes";
  604. time=time/(60*1000);
  605. }else{
  606. unit="hours";
  607. time=time/(3600*1000);
  608. }
  609. returntime+""+unit;
  610. }
  611. staticclassParameters{
  612. staticfinalParametersDEFAULT=newParameters(
  613. BalancingPolicy.Node.INSTANCE,10.0,
  614. Collections.<String>emptySet(),Collections.<String>emptySet());
  615. finalBalancingPolicypolicy;
  616. finaldoublethreshold;
  617. //excludethenodesinthissetfrombalancingoperations
  618. Set<String>nodesToBeExcluded;
  619. //includeonlythesenodesinbalancingoperations
  620. Set<String>nodesToBeIncluded;
  621. Parameters(BalancingPolicypolicy,doublethreshold,
  622. Set<String>nodesToBeExcluded,Set<String>nodesToBeIncluded){
  623. this.policy=policy;
  624. this.threshold=threshold;
  625. this.nodesToBeExcluded=nodesToBeExcluded;
  626. this.nodesToBeIncluded=nodesToBeIncluded;
  627. }
  628. @Override
  629. publicStringtoString(){
  630. returnBalancer.class.getSimpleName()+"."+getClass().getSimpleName()
  631. +"["+policy+",threshold="+threshold+
  632. ",numberofnodestobeexcluded="+nodesToBeExcluded.size()+
  633. ",numberofnodestobeincluded="+nodesToBeIncluded.size()+"]";
  634. }
  635. }
  636. staticclassCliextendsConfiguredimplementsTool{
  637. /**
  638. *ParseargumentsandthenrunBalancer.
  639. *
  640. *@paramargscommandspecificarguments.
  641. *@returnexitcode.0indicatessuccess,non-zeroindicatesfailure.
  642. */
  643. @Override
  644. publicintrun(String[]args){
  645. finallongstartTime=Time.now();
  646. finalConfigurationconf=getConf();
  647. try{
  648. checkReplicationPolicyCompatibility(conf);
  649. finalCollection<URI>namenodes=DFSUtil.getNsServiceRpcUris(conf);
  650. returnBalancer.run(namenodes,parse(args),conf);
  651. }catch(IOExceptione){
  652. System.out.println(e+".Exiting...");
  653. returnExitStatus.IO_EXCEPTION.getExitCode();
  654. }catch(InterruptedExceptione){
  655. System.out.println(e+".Exiting...");
  656. returnExitStatus.INTERRUPTED.getExitCode();
  657. }finally{
  658. System.out.format("%-24s",DateFormat.getDateTimeInstance().format(newDate()));
  659. System.out.println("Balancingtook"+time2Str(Time.now()-startTime));
  660. }
  661. }
  662. /**parsecommandlinearguments*/
  663. staticParametersparse(String[]args){
  664. BalancingPolicypolicy=Parameters.DEFAULT.policy;
  665. doublethreshold=Parameters.DEFAULT.threshold;
  666. Set<String>nodesTobeExcluded=Parameters.DEFAULT.nodesToBeExcluded;
  667. Set<String>nodesTobeIncluded=Parameters.DEFAULT.nodesToBeIncluded;
  668. if(args!=null){
  669. try{
  670. for(inti=0;i<args.length;i++){
  671. if("-threshold".equalsIgnoreCase(args[i])){
  672. checkArgument(++i<args.length,
  673. "Thresholdvalueismissing:args="+Arrays.toString(args));
  674. try{
  675. threshold=Double.parseDouble(args[i]);
  676. if(threshold<1||threshold>100){
  677. thrownewIllegalArgumentException(
  678. "Numberoutofrange:threshold="+threshold);
  679. }
  680. LOG.info("Usingathresholdof"+threshold);
  681. }catch(IllegalArgumentExceptione){
  682. System.err.println(
  683. "Expectinganumberintherangeof[1.0,100.0]:"
  684. +args[i]);
  685. throwe;
  686. }
  687. }elseif("-policy".equalsIgnoreCase(args[i])){
  688. checkArgument(++i<args.length,
  689. "Policyvalueismissing:args="+Arrays.toString(args));
  690. try{
  691. policy=BalancingPolicy.parse(args[i]);
  692. }catch(IllegalArgumentExceptione){
  693. System.err.println("Illegalpolicyname:"+args[i]);
  694. throwe;
  695. }
  696. }elseif("-exclude".equalsIgnoreCase(args[i])){
  697. checkArgument(++i<args.length,
  698. "Listofnodestoexclude|-f<filename>ismissing:args="
  699. +Arrays.toString(args));
  700. if("-f".equalsIgnoreCase(args[i])){
  701. checkArgument(++i<args.length,
  702. "Filecontainingnodestoexcludeisnotspecified:args="
  703. +Arrays.toString(args));
  704. nodesTobeExcluded=Util.getHostListFromFile(args[i],"exclude");
  705. }else{
  706. nodesTobeExcluded=Util.parseHostList(args[i]);
  707. }
  708. }elseif("-include".equalsIgnoreCase(args[i])){
  709. checkArgument(++i<args.length,
  710. "Listofnodestoinclude|-f<filename>ismissing:args="
  711. +Arrays.toString(args));
  712. if("-f".equalsIgnoreCase(args[i])){
  713. checkArgument(++i<args.length,
  714. "Filecontainingnodestoincludeisnotspecified:args="
  715. +Arrays.toString(args));
  716. nodesTobeIncluded=Util.getHostListFromFile(args[i],"include");
  717. }else{
  718. nodesTobeIncluded=Util.parseHostList(args[i]);
  719. }
  720. }else{
  721. thrownewIllegalArgumentException("args="
  722. +Arrays.toString(args));
  723. }
  724. }
  725. checkArgument(nodesTobeExcluded.isEmpty()||nodesTobeIncluded.isEmpty(),
  726. "-excludeand-includeoptionscannotbespecifiedtogether.");
  727. }catch(RuntimeExceptione){
  728. printUsage(System.err);
  729. throwe;
  730. }
  731. }
  732. returnnewParameters(policy,threshold,nodesTobeExcluded,nodesTobeIncluded);
  733. }
  734. privatestaticvoidprintUsage(PrintStreamout){
  735. out.println(USAGE+"\n");
  736. }
  737. }
  738. /**
  739. *Runabalancer
  740. *@paramargsCommandlinearguments
  741. */
  742. publicstaticvoidmain(String[]args){
  743. if(DFSUtil.parseHelpArgument(args,USAGE,System.out,true)){
  744. System.exit(0);
  745. }
  746. try{
  747. System.exit(ToolRunner.run(newHdfsConfiguration(),newCli(),args));
  748. }catch(Throwablee){
  749. LOG.error("Exitingbalancerdueanexception",e);
  750. System.exit(-1);
  751. }
  752. }
  753. }
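To recap how one iteration of Balancer.run(int, Formatter, Configuration) above decides its exit status, the essential branches can be reduced to the following sketch; the real method also prints the progress line and maps the exception cases to their own exit codes.

    // Condensed decision logic of one balancer iteration, mirroring Balancer.run() above.
    public class IterationOutcomeDemo {
        enum ExitStatus { SUCCESS, NO_MOVE_BLOCK, NO_MOVE_PROGRESS, IN_PROGRESS }

        static ExitStatus runIteration(long bytesLeftToMove, long bytesToMove, boolean dispatchProgressed) {
            if (bytesLeftToMove == 0) {
                return ExitStatus.SUCCESS;           // the cluster is already balanced
            }
            if (bytesToMove == 0) {
                return ExitStatus.NO_MOVE_BLOCK;     // nothing could be scheduled at all
            }
            if (!dispatchProgressed) {
                return ExitStatus.NO_MOVE_PROGRESS;  // no bytes moved for five consecutive iterations
            }
            return ExitStatus.IN_PROGRESS;           // keep iterating
        }

        public static void main(String[] args) {
            System.out.println(runIteration(0, 0, false));             // SUCCESS
            System.out.println(runIteration(1 << 20, 0, false));       // NO_MOVE_BLOCK
            System.out.println(runIteration(1 << 20, 1 << 20, false)); // NO_MOVE_PROGRESS
            System.out.println(runIteration(1 << 20, 1 << 20, true));  // IN_PROGRESS
        }
    }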
Next is the annotated Dispatcher.java:

  1. /**
  2. *LicensedtotheApacheSoftwareFoundation(ASF)underone
  3. *ormorecontributorlicenseagreements.SeetheNOTICEfile
  4. *distributedwiththisworkforadditionalinformation
  5. *regardingcopyrightownership.TheASFlicensesthisfile
  6. *toyouundertheApacheLicense,Version2.0(the
  7. *"License");youmaynotusethisfileexceptincompliance
  8. *withtheLicense.YoumayobtainacopyoftheLicenseat
  9. *
  10. *http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. *Unlessrequiredbyapplicablelaworagreedtoinwriting,software
  13. *distributedundertheLicenseisdistributedonan"ASIS"BASIS,
  14. *WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied.
  15. *SeetheLicenseforthespecificlanguagegoverningpermissionsand
  16. *limitationsundertheLicense.
  17. */
  18. packageorg.apache.hadoop.hdfs.server.balancer;
  19. importstaticorg.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
  20. importjava.io.BufferedInputStream;
  21. importjava.io.BufferedOutputStream;
  22. importjava.io.DataInputStream;
  23. importjava.io.DataOutputStream;
  24. importjava.io.IOException;
  25. importjava.io.InputStream;
  26. importjava.io.OutputStream;
  27. importjava.net.Socket;
  28. importjava.util.ArrayList;
  29. importjava.util.Arrays;
  30. importjava.util.Collection;
  31. importjava.util.EnumMap;
  32. importjava.util.HashMap;
  33. importjava.util.HashSet;
  34. importjava.util.Iterator;
  35. importjava.util.List;
  36. importjava.util.Map;
  37. importjava.util.Set;
  38. importjava.util.concurrent.ExecutionException;
  39. importjava.util.concurrent.ExecutorService;
  40. importjava.util.concurrent.Executors;
  41. importjava.util.concurrent.Future;
  42. importjava.util.concurrent.atomic.AtomicLong;
  43. importorg.apache.commons.logging.Log;
  44. importorg.apache.commons.logging.LogFactory;
  45. importorg.apache.hadoop.classification.InterfaceAudience;
  46. importorg.apache.hadoop.conf.Configuration;
  47. importorg.apache.hadoop.fs.CommonConfigurationKeys;
  48. importorg.apache.hadoop.hdfs.DFSUtil;
  49. importorg.apache.hadoop.hdfs.StorageType;
  50. importorg.apache.hadoop.hdfs.protocol.Block;
  51. importorg.apache.hadoop.hdfs.protocol.DatanodeInfo;
  52. importorg.apache.hadoop.hdfs.protocol.ExtendedBlock;
  53. importorg.apache.hadoop.hdfs.protocol.HdfsConstants;
  54. importorg.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
  55. importorg.apache.hadoop.hdfs.protocol.datatransfer.Sender;
  56. importorg.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
  57. importorg.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
  58. importorg.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
  59. importorg.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
  60. importorg.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
  61. importorg.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
  62. importorg.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
  63. importorg.apache.hadoop.hdfs.server.common.HdfsServerConstants;
  64. importorg.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
  65. importorg.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
  66. importorg.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
  67. importorg.apache.hadoop.io.IOUtils;
  68. importorg.apache.hadoop.net.NetUtils;
  69. importorg.apache.hadoop.net.NetworkTopology;
  70. importorg.apache.hadoop.security.token.Token;
  71. importorg.apache.hadoop.util.HostsFileReader;
  72. importorg.apache.hadoop.util.StringUtils;
  73. importorg.apache.hadoop.util.Time;
  74. importcom.google.common.base.Preconditions;
  75. /**Dispatchingblockreplicamovesbetweendatanodes.*/
  76. @InterfaceAudience.Private
  77. publicclassDispatcher{
  78. staticfinalLogLOG=LogFactory.getLog(Dispatcher.class);
  79. privatestaticfinallongGB=1L<<30;//1GB
  80. privatestaticfinallongMAX_BLOCKS_SIZE_TO_FETCH=2*GB;
  81. privatestaticfinalintMAX_NO_PENDING_MOVE_ITERATIONS=5;
  82. privatestaticfinallongDELAY_AFTER_ERROR=10*1000L;//10seconds
  83. privatefinalNameNodeConnectornnc;
  84. privatefinalSaslDataTransferClientsaslClient;
  85. /**Setofdatanodestobeexcluded.*/
  86. privatefinalSet<String>excludedNodes;
  87. /**Restricttothefollowingnodes.*/
  88. privatefinalSet<String>includedNodes;
  89. privatefinalCollection<Source>sources=newHashSet<Source>();
  90. privatefinalCollection<StorageGroup>targets=newHashSet<StorageGroup>();
  91. privatefinalGlobalBlockMapglobalBlocks=newGlobalBlockMap();
  92. privatefinalMovedBlocks<StorageGroup>movedBlocks;
  93. /**Map(datanodeUuid,storageType->StorageGroup)*/
  94. privatefinalStorageGroupMapstorageGroupMap=newStorageGroupMap();
  95. privateNetworkTopologycluster;
  96. privatefinalExecutorServicemoveExecutor;
  97. privatefinalExecutorServicedispatchExecutor;
  98. /**Themaximumnumberofconcurrentblocksmovesatadatanode*/
  99. privatefinalintmaxConcurrentMovesPerNode;
  100. privatefinalAtomicLongbytesMoved=newAtomicLong();
  101. privatestaticclassGlobalBlockMap{
  102. privatefinalMap<Block,DBlock>map=newHashMap<Block,DBlock>();
  103. /**
  104. *Gettheblockfromthemap;
  105. *iftheblockisnotfound,createanewblockandputitinthemap.
  106. */
  107. privateDBlockget(Blockb){
  108. DBlockblock=map.get(b);
  109. if(block==null){
  110. block=newDBlock(b);
  111. map.put(b,block);
  112. }
  113. returnblock;
  114. }
  115. /**Removeallblocksexceptforthemovedblocks.*/
  116. privatevoidremoveAllButRetain(MovedBlocks<StorageGroup>movedBlocks){
  117. for(Iterator<Block>i=map.keySet().iterator();i.hasNext();){
  118. if(!movedBlocks.contains(i.next())){
  119. i.remove();
  120. }
  121. }
  122. }
  123. }
  124. staticclassStorageGroupMap{
  125. privatestaticStringtoKey(StringdatanodeUuid,StorageTypestorageType){
  126. returndatanodeUuid+":"+storageType;
  127. }
  128. privatefinalMap<String,StorageGroup>map=newHashMap<String,StorageGroup>();
  129. StorageGroupget(StringdatanodeUuid,StorageTypestorageType){
  130. returnmap.get(toKey(datanodeUuid,storageType));
  131. }
  132. voidput(StorageGroupg){
  133. finalStringkey=toKey(g.getDatanodeInfo().getDatanodeUuid(),g.storageType);
  134. finalStorageGroupexisting=map.put(key,g);
  135. Preconditions.checkState(existing==null);
  136. }
  137. intsize(){
  138. returnmap.size();
  139. }
  140. voidclear(){
  141. map.clear();
  142. }
  143. }
  144. /**Thisclasskeepstrackofascheduledblockmove*/
  145. privateclassPendingMove{
  146. privateDBlockblock;
  147. privateSourcesource;
  148. privateDDatanodeproxySource;
  149. privateStorageGrouptarget;
  150. privatePendingMove(){
  151. }
  152. @Override
  153. publicStringtoString(){
  154. finalBlockb=block.getBlock();
  155. returnb+"withsize="+b.getNumBytes()+"from"
  156. +source.getDisplayName()+"to"+target.getDisplayName()
  157. +"through"+proxySource.datanode;
  158. }
  159. /**
  160. *Chooseablock&aproxysourceforthispendingMovewhosesource&
  161. *targethavealreadybeenchosen.
  162. *
  163. *@returntrueifablockanditsproxyarechosen;falseotherwise
  164. */
  165. privatebooleanchooseBlockAndProxy(){
  166. //iterateallsource'sblocksuntilfindagoodone
  167. for(Iterator<DBlock>i=source.getBlockIterator();i.hasNext();){
  168. if(markMovedIfGoodBlock(i.next())){
  169. i.remove();
  170. returntrue;
  171. }
  172. }
  173. returnfalse;
  174. }
  175. /**
  176. *@returntrueifthegivenblockisgoodforthetentativemove.
  177. */
  178. privatebooleanmarkMovedIfGoodBlock(DBlockblock){
  179. synchronized(block){
  180. synchronized(movedBlocks){
  181. if(isGoodBlockCandidate(source,target,block)){
  182. this.block=block;
  183. if(chooseProxySource()){
  184. movedBlocks.put(block);
  185. if(LOG.isDebugEnabled()){
  186. LOG.debug("Decidedtomove"+this);
  187. }
  188. returntrue;
  189. }
  190. }
  191. }
  192. }
  193. returnfalse;
  194. }
  195. /**
  196. *Chooseaproxysource.
  197. *
  198. *@returntrueifaproxyisfound;otherwisefalse
  199. */
  200. privatebooleanchooseProxySource(){
  201. finalDatanodeInfotargetDN=target.getDatanodeInfo();
  202. //ifnodegroupissupported,firsttryaddnodesinthesamenodegroup
  203. if(cluster.isNodeGroupAware()){
  204. for(StorageGrouploc:block.getLocations()){
  205. if(cluster.isOnSameNodeGroup(loc.getDatanodeInfo(),targetDN)
  206. &&addTo(loc)){
  207. returntrue;
  208. }
  209. }
  210. }
  211. //checkifthereisreplicawhichisonthesamerackwiththetarget
  212. for(StorageGrouploc:block.getLocations()){
  213. if(cluster.isOnSameRack(loc.getDatanodeInfo(),targetDN)&&addTo(loc)){
  214. returntrue;
  215. }
  216. }
  217. //findoutanon-busyreplica
  218. for(StorageGrouploc:block.getLocations()){
  219. if(addTo(loc)){
  220. returntrue;
  221. }
  222. }
  223. returnfalse;
  224. }
  225. /**addtoaproxysourceforspecificblockmovement*/
  226. privatebooleanaddTo(StorageGroupg){
  227. finalDDatanodedn=g.getDDatanode();
  228. if(dn.addPendingBlock(this)){
  229. proxySource=dn;
  230. returntrue;
  231. }
  232. returnfalse;
  233. }
  234. /**Dispatchthemovetotheproxysource&waitfortheresponse.*/
  235. privatevoiddispatch(){
  236. if(LOG.isDebugEnabled()){
  237. LOG.debug("Startmoving"+this);
  238. }
  239. Socketsock=newSocket();
  240. DataOutputStreamout=null;
  241. DataInputStreamin=null;
  242. try{
  243. sock.connect(
  244. NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
  245. HdfsServerConstants.READ_TIMEOUT);
  246. sock.setKeepAlive(true);
  247. OutputStreamunbufOut=sock.getOutputStream();
  248. InputStreamunbufIn=sock.getInputStream();
  249. ExtendedBlockeb=newExtendedBlock(nnc.getBlockpoolID(),
  250. block.getBlock());
  251. finalKeyManagerkm=nnc.getKeyManager();
  252. Token<BlockTokenIdentifier>accessToken=km.getAccessToken(eb);
  253. IOStreamPairsaslStreams=saslClient.socketSend(sock,unbufOut,
  254. unbufIn,km,accessToken,target.getDatanodeInfo());
  255. unbufOut=saslStreams.out;
  256. unbufIn=saslStreams.in;
  257. out=newDataOutputStream(newBufferedOutputStream(unbufOut,
  258. HdfsConstants.IO_FILE_BUFFER_SIZE));
  259. in=newDataInputStream(newBufferedInputStream(unbufIn,
  260. HdfsConstants.IO_FILE_BUFFER_SIZE));
  261. sendRequest(out,eb,accessToken);
  262. receiveResponse(in);
  263. bytesMoved.addAndGet(block.getNumBytes());
  264. LOG.info("Successfullymoved"+this);
  265. }catch(IOExceptione){
  266. LOG.warn("Failedtomove"+this+":"+e.getMessage());
  267. //Proxyortargetmayhavesomeissues,delaybeforeusingthesenodes
  268. //furtherinordertoavoidapotentialstormof"threadsquota
  269. //exceeded"warningswhenthedispatchergetsoutofsyncwithwork
  270. //goingonindatanodes.
  271. proxySource.activateDelay(DELAY_AFTER_ERROR);
  272. target.getDDatanode().activateDelay(DELAY_AFTER_ERROR);
  273. }finally{
  274. IOUtils.closeStream(out);
  275. IOUtils.closeStream(in);
  276. IOUtils.closeSocket(sock);
  277. proxySource.removePendingBlock(this);
  278. target.getDDatanode().removePendingBlock(this);
  279. synchronized(this){
  280. reset();
  281. }
  282. synchronized(Dispatcher.this){
  283. Dispatcher.this.notifyAll();
  284. }
  285. }
  286. }
  287. /**Sendablockreplacerequesttotheoutputstream*/
  288. privatevoidsendRequest(DataOutputStreamout,ExtendedBlockeb,
  289. Token<BlockTokenIdentifier>accessToken)throwsIOException{
  290. newSender(out).replaceBlock(eb,target.storageType,accessToken,
  291. source.getDatanodeInfo().getDatanodeUuid(),proxySource.datanode);
  292. }
  293. /**Receiveablockcopyresponsefromtheinputstream*/
  294. privatevoidreceiveResponse(DataInputStreamin)throwsIOException{
  295. BlockOpResponseProtoresponse=
  296. BlockOpResponseProto.parseFrom(vintPrefixed(in));
  297. while(response.getStatus()==Status.IN_PROGRESS){
  298. //readintermediateresponses
  299. response=BlockOpResponseProto.parseFrom(vintPrefixed(in));
  300. }
  301. if(response.getStatus()!=Status.SUCCESS){
  302. if(response.getStatus()==Status.ERROR_ACCESS_TOKEN){
  303. thrownewIOException("blockmovefailedduetoaccesstokenerror");
  304. }
  305. thrownewIOException("blockmoveisfailed:"+response.getMessage());
  306. }
  307. }
  308. /**resettheobject*/
  309. privatevoidreset(){
  310. block=null;
  311. source=null;
  312. proxySource=null;
  313. target=null;
  314. }
  315. }
  316. /**Aclassforkeepingtrackofblocklocationsinthedispatcher.*/
  317. privatestaticclassDBlockextendsMovedBlocks.Locations<StorageGroup>{
  318. DBlock(Blockblock){
  319. super(block);
  320. }
  321. }
  322. /**Theclassrepresentsadesiredmove.*/
  323. staticclassTask{
  324. privatefinalStorageGrouptarget;
  325. privatelongsize;//bytesscheduledtomove
  326. Task(StorageGrouptarget,longsize){
  327. this.target=target;
  328. this.size=size;
  329. }
  330. longgetSize(){
  331. returnsize;
  332. }
  333. }
  334. /**Aclassthatkeepstrackofadatanode.*/
  335. staticclassDDatanode{
  336. /**Agroupofstoragesinadatanodewiththesamestoragetype.*/
  337. classStorageGroup{
  338. finalStorageTypestorageType;
  339. finallongmaxSize2Move;
  340. privatelongscheduledSize=0L;
  341. privateStorageGroup(StorageTypestorageType,longmaxSize2Move){
  342. this.storageType=storageType;
  343. this.maxSize2Move=maxSize2Move;
  344. }
  345. privateDDatanodegetDDatanode(){
  346. returnDDatanode.this;
  347. }
  348. DatanodeInfogetDatanodeInfo(){
  349. returnDDatanode.this.datanode;
  350. }
  351. /**Decideifstillneedtomovemorebytes*/
  352. synchronizedbooleanhasSpaceForScheduling(){
  353. returnavailableSizeToMove()>0L;
  354. }
  355. /**@returnthetotalnumberofbytesthatneedtobemoved*/
  356. synchronizedlongavailableSizeToMove(){
  357. returnmaxSize2Move-scheduledSize;
  358. }
  359. /**incrementscheduledsize*/
  360. synchronizedvoidincScheduledSize(longsize){
  361. scheduledSize+=size;
  362. }
  363. /**@returnscheduledsize*/
  364. synchronizedlonggetScheduledSize(){
  365. returnscheduledSize;
  366. }
  367. /**Resetscheduledsizetozero.*/
  368. synchronizedvoidresetScheduledSize(){
  369. scheduledSize=0L;
  370. }
  371. /**@returnthenamefordisplay*/
  372. StringgetDisplayName(){
  373. returndatanode+":"+storageType;
  374. }
  375. @Override
  376. publicStringtoString(){
  377. returngetDisplayName();
  378. }
  379. }
  380. finalDatanodeInfodatanode;
  381. finalEnumMap<StorageType,StorageGroup>storageMap
  382. =newEnumMap<StorageType,StorageGroup>(StorageType.class);
  383. protectedlongdelayUntil=0L;
  384. /**blocksbeingmovedbutnotconfirmedyet*/
  385. privatefinalList<PendingMove>pendings;
  386. privatefinalintmaxConcurrentMoves;
  387. @Override
  388. publicStringtoString(){
  389. returngetClass().getSimpleName()+":"+datanode+":"+storageMap.values();
  390. }
  391. privateDDatanode(DatanodeStorageReportr,intmaxConcurrentMoves){
  392. this.datanode=r.getDatanodeInfo();
  393. this.maxConcurrentMoves=maxConcurrentMoves;
  394. this.pendings=newArrayList<PendingMove>(maxConcurrentMoves);
  395. }
  396. privatevoidput(StorageTypestorageType,StorageGroupg){
  397. finalStorageGroupexisting=storageMap.put(storageType,g);
  398. Preconditions.checkState(existing==null);
  399. }
  400. StorageGroupaddStorageGroup(StorageTypestorageType,longmaxSize2Move){
  401. finalStorageGroupg=newStorageGroup(storageType,maxSize2Move);
  402. put(storageType,g);
  403. returng;
  404. }
  405. SourceaddSource(StorageTypestorageType,longmaxSize2Move,Dispatcherd){
  406. finalSources=d.newSource(storageType,maxSize2Move,this);
  407. put(storageType,s);
  408. returns;
  409. }
  410. synchronizedprivatevoidactivateDelay(longdelta){
  411. delayUntil=Time.monotonicNow()+delta;
  412. }
  413. synchronizedprivatebooleanisDelayActive(){
  414. if(delayUntil==0||Time.monotonicNow()>delayUntil){
  415. delayUntil=0;
  416. returnfalse;
  417. }
  418. returntrue;
  419. }
  420. /**Checkifthenodecanschedulemoreblockstomove*/
  421. synchronizedbooleanisPendingQNotFull(){
  422. returnpendings.size()<maxConcurrentMoves;
  423. }
  424. /**Checkifallthedispatchedmovesaredone*/
  425. synchronizedbooleanisPendingQEmpty(){
  426. returnpendings.isEmpty();
  427. }
  428. /**Addascheduledblockmovetothenode*/
  429. synchronizedbooleanaddPendingBlock(PendingMovependingBlock){
  430. if(!isDelayActive()&&isPendingQNotFull()){
  431. returnpendings.add(pendingBlock);
  432. }
  433. returnfalse;
  434. }
  435. /**Removeascheduledblockmovefromthenode*/
  436. synchronizedbooleanremovePendingBlock(PendingMovependingBlock){
  437. returnpendings.remove(pendingBlock);
  438. }
  439. }
  440. /**Anodethatcanbethesourcesofablockmove*/
  441. classSourceextendsDDatanode.StorageGroup{
  442. privatefinalList<Task>tasks=newArrayList<Task>(2);
  443. privatelongblocksToReceive=0L;
  444. /**
  445. *Sourceblockspointtotheobjectsin{@linkDispatcher#globalBlocks}
  446. *becausewewanttokeeponecopyofablockandbeawarethatthe
  447. *locationsarechangingovertime.
  448. */
  449. privatefinalList<DBlock>srcBlocks=newArrayList<DBlock>();
  450. privateSource(StorageTypestorageType,longmaxSize2Move,DDatanodedn){
  451. dn.super(storageType,maxSize2Move);
  452. }
  453. /**Addatask*/
  454. voidaddTask(Tasktask){
  455. Preconditions.checkState(task.target!=this,
  456. "Sourceandtargetarethesamestoragegroup"+getDisplayName());
  457. incScheduledSize(task.size);
  458. tasks.add(task);
  459. }
  460. /**@returnaniteratortothissource'sblocks*/
  461. Iterator<DBlock>getBlockIterator(){
  462. returnsrcBlocks.iterator();
  463. }
  464. /**
  465. *Fetchnewblocksofthissourcefromnamenodeandupdatethissource's
  466. *blocklist&{@linkDispatcher#globalBlocks}.
  467. *
  468. *@returnthetotalsizeofthereceivedblocksinthenumberofbytes.
  469. */
  470. privatelonggetBlockList()throwsIOException{
  471. finallongsize=Math.min(MAX_BLOCKS_SIZE_TO_FETCH,blocksToReceive);
  472. finalBlocksWithLocationsnewBlocks=nnc.getBlocks(getDatanodeInfo(),size);
  473. longbytesReceived=0;
  474. for(BlockWithLocationsblk:newBlocks.getBlocks()){
  475. bytesReceived+=blk.getBlock().getNumBytes();
  476. synchronized(globalBlocks){
  477. finalDBlockblock=globalBlocks.get(blk.getBlock());
  478. synchronized(block){
  479. block.clearLocations();
  480. //updatelocations
  481. finalString[]datanodeUuids=blk.getDatanodeUuids();
  482. finalStorageType[]storageTypes=blk.getStorageTypes();
  483. for(inti=0;i<datanodeUuids.length;i++){
  484. finalStorageGroupg=storageGroupMap.get(
  485. datanodeUuids[i],storageTypes[i]);
  486. if(g!=null){//notunknown
  487. block.addLocation(g);
  488. }
  489. }
  490. }
  491. if(!srcBlocks.contains(block)&&isGoodBlockCandidate(block)){
  492. //filterbadcandidates
  493. srcBlocks.add(block);
  494. }
  495. }
  496. }
  497. returnbytesReceived;
  498. }
  499. /**Decideifthegivenblockisagoodcandidatetomoveornot*/
  500. privatebooleanisGoodBlockCandidate(DBlockblock){
  501. for(Taskt:tasks){
  502. if(Dispatcher.this.isGoodBlockCandidate(this,t.target,block)){
  503. returntrue;
  504. }
  505. }
  506. returnfalse;
  507. }
  508. /**
  509. *Chooseamoveforthesource.Theblock'ssource,target,andproxy
  510. *aredeterminedtoo.Whenchoosingproxyandtarget,source&
  511. *targetthrottlinghasbeenconsidered.Theyarechosenonlywhenthey
  512. *havethecapacitytosupportthisblockmove.Theblockshouldbe
  513. *dispatchedimmediatelyafterthismethodisreturned.
  514. *
  515. *@returnamovethat'sgoodforthesourcetodispatchimmediately.
  516. */
  517. privatePendingMovechooseNextMove(){
  518. for(Iterator<Task>i=tasks.iterator();i.hasNext();){
  519. finalTasktask=i.next();
  520. finalDDatanodetarget=task.target.getDDatanode();
  521. PendingMovependingBlock=newPendingMove();
  522. if(target.addPendingBlock(pendingBlock)){
  523. //targetisnotbusy,sodoatentativeblockallocation
  524. pendingBlock.source=this;
  525. pendingBlock.target=task.target;
  526. if(pendingBlock.chooseBlockAndProxy()){
  527. longblockSize=pendingBlock.block.getNumBytes();
  528. incScheduledSize(-blockSize);
  529. task.size-=blockSize;
  530. if(task.size==0){
  531. i.remove();
  532. }
  533. returnpendingBlock;
  534. }else{
  535. //cancelthetentativemove
  536. target.removePendingBlock(pendingBlock);
  537. }
  538. }
  539. }
  540. returnnull;
  541. }
  542. /**Iterateallsource'sblockstoremovemovedones*/
  543. privatevoidremoveMovedBlocks(){
  544. for(Iterator<DBlock>i=getBlockIterator();i.hasNext();){
  545. if(movedBlocks.contains(i.next().getBlock())){
  546. i.remove();
  547. }
  548. }
  549. }
  550. privatestaticfinalintSOURCE_BLOCKS_MIN_SIZE=5;
  551. /**@returnifshouldfetchmoreblocksfromnamenode*/
  552. privatebooleanshouldFetchMoreBlocks(){
  553. returnsrcBlocks.size()<SOURCE_BLOCKS_MIN_SIZE&&blocksToReceive>0;
  554. }
  555. privatestaticfinallongMAX_ITERATION_TIME=20*60*1000L;//20mins
  556. /**
  557. *Thismethoditerativelydoesthefollowing:itfirstselectsablockto
  558. *move,thensendsarequesttotheproxysourcetostarttheblockmove
  559. *whenthesource'sblocklistfallsbelowathreshold,itasksthe
  560. *namenodeformoreblocks.Itterminateswhenithasdispatchenoughblock
  561. *movetasksorithasreceivedenoughblocksfromthenamenode,orthe
  562. *elapsedtimeoftheiterationhasexceededthemaxtimelimit.
  563. */
  564. privatevoiddispatchBlocks(){
  565. finallongstartTime=Time.monotonicNow();
  566. this.blocksToReceive=2*getScheduledSize();
  567. booleanisTimeUp=false;
  568. intnoPendingMoveIteration=0;
  569. while(!isTimeUp&&getScheduledSize()>0
  570. &&(!srcBlocks.isEmpty()||blocksToReceive>0)){
  571. finalPendingMovep=chooseNextMove();
  572. if(p!=null){
  573. //movetheblock
  574. moveExecutor.execute(newRunnable(){
  575. @Override
  576. publicvoidrun(){
  577. p.dispatch();
  578. }
  579. });
  580. continue;
  581. }
  582. //Sincewecannotscheduleanyblocktomove,
  583. //removeanymovedblocksfromthesourceblocklistand
  584. removeMovedBlocks();//filteralreadymovedblocks
  585. //checkifweshouldfetchmoreblocksfromthenamenode
  586. if(shouldFetchMoreBlocks()){
  587. //fetchnewblocks
  588. try{
  589. blocksToReceive-=getBlockList();
  590. continue;
  591. }catch(IOExceptione){
  592. LOG.warn("Exceptionwhilegettingblocklist",e);
  593. return;
  594. }
  595. }else{
  596. //sourcenodecannotfindapendingblocktomove,iteration+1
  597. noPendingMoveIteration++;
  598. //incasenoblockscanbemovedforsourcenode'stask,
  599. //jumpoutofwhile-loopafter5iterations.
  600. if(noPendingMoveIteration>=MAX_NO_PENDING_MOVE_ITERATIONS){
  601. resetScheduledSize();
  602. }
  603. }
  604. //checkiftimeisupornot
  605. if(Time.monotonicNow()-startTime>MAX_ITERATION_TIME){
  606. isTimeUp=true;
  607. continue;
  608. }
  609. //Nowwecannotscheduleanyblocktomoveandthereare
  610. //nonewblocksaddedtothesourceblocklist,sowewait.
  611. try{
  612. synchronized(Dispatcher.this){
  613. Dispatcher.this.wait(1000);//waitfortargets/sourcestobeidle
  614. }
  615. }catch(InterruptedExceptionignored){
  616. }
  617. }
  618. }
  619. }
  public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
      Set<String> excludedNodes, long movedWinWidth, int moverThreads,
      int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
    this.nnc = nnc;
    this.excludedNodes = excludedNodes;
    this.includedNodes = includedNodes;
    this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth);

    this.cluster = NetworkTopology.getInstance(conf);

    this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
    this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);
    this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;

    final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
    this.saslClient = new SaslDataTransferClient(
        DataTransferSaslUtil.getSaslPropertiesResolver(conf),
        TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed);
  }
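The constructor wires up two fixed-size thread pools: dispatchExecutor runs one scheduling loop per source (Source.dispatchBlocks()), while moveExecutor carries out the individual block transfers (PendingMove.dispatch()), and it also builds the SASL client used to talk to DataNodes. Below is a minimal standalone sketch of this two-level pool layout; the pool sizes and task names are made up for illustration, it is not the Balancer's code.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TwoPoolDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService dispatchers = Executors.newFixedThreadPool(2); // per-source loops
    ExecutorService movers = Executors.newFixedThreadPool(8);      // per-block transfers
    for (int src = 0; src < 2; src++) {
      final int source = src;
      dispatchers.execute(() -> {
        // each "dispatcher" loop hands individual transfers to the mover pool
        for (int b = 0; b < 4; b++) {
          final int block = b;
          movers.execute(() ->
              System.out.println("moving block " + block + " of source " + source));
        }
      });
    }
    dispatchers.shutdown();
    dispatchers.awaitTermination(1, TimeUnit.MINUTES);
    movers.shutdown();
    movers.awaitTermination(1, TimeUnit.MINUTES);
  }
}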
  StorageGroupMap getStorageGroupMap() {
    return storageGroupMap;
  }

  NetworkTopology getCluster() {
    return cluster;
  }

  long getBytesMoved() {
    return bytesMoved.get();
  }

  long bytesToMove() {
    Preconditions.checkState(
        storageGroupMap.size() >= sources.size() + targets.size(),
        "Mismatched number of storage groups (" + storageGroupMap.size()
            + " < " + sources.size() + " sources + " + targets.size()
            + " targets)");
    long b = 0L;
    for (Source src : sources) {
      b += src.getScheduledSize();
    }
    return b;
  }

  void add(Source source, StorageGroup target) {
    sources.add(source);
    targets.add(target);
  }

  private boolean shouldIgnore(DatanodeInfo dn) {
    // ignore decommissioned nodes
    final boolean decommissioned = dn.isDecommissioned();
    // ignore decommissioning nodes
    final boolean decommissioning = dn.isDecommissionInProgress();
    // ignore nodes in exclude list
    final boolean excluded = Util.isExcluded(excludedNodes, dn);
    // ignore nodes not in the include list (if include list is not empty)
    final boolean notIncluded = !Util.isIncluded(includedNodes, dn);

    if (decommissioned || decommissioning || excluded || notIncluded) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
            + decommissioning + ", " + excluded + ", " + notIncluded);
      }
      return true;
    }
    return false;
  }
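shouldIgnore() drops decommissioned and decommissioning nodes and then applies the exclude/include lists. A minimal standalone sketch of the same include/exclude rule follows; the host names are made up and this is not Balancer code.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FilterDemo {
  static boolean shouldIgnore(String host, Set<String> include, Set<String> exclude) {
    boolean excluded = exclude.contains(host);
    // an include list only restricts membership when it is non-empty
    boolean notIncluded = !include.isEmpty() && !include.contains(host);
    return excluded || notIncluded;
  }

  public static void main(String[] args) {
    Set<String> include = new HashSet<>(Arrays.asList("dn1", "dn2"));
    Set<String> exclude = new HashSet<>(Arrays.asList("dn2"));
    List<String> nodes = Arrays.asList("dn1", "dn2", "dn3");
    for (String n : nodes) {
      System.out.println(n + " ignored? " + shouldIgnore(n, include, exclude));
    }
    // dn1 -> false, dn2 -> true (excluded), dn3 -> true (not in include list)
  }
}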
  /** Get live datanode storage reports and then build the network topology. */
  List<DatanodeStorageReport> init() throws IOException {
    final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
    final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>();
    // create network topology and classify utilization collections:
    // over-utilized, above-average, below-average and under-utilized.
    for (DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
      final DatanodeInfo datanode = r.getDatanodeInfo();
      if (shouldIgnore(datanode)) {
        continue;
      }
      trimmed.add(r);
      cluster.add(datanode);
    }
    return trimmed;
  }
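init() shuffles the storage reports before adding live DataNodes to the topology, presumably so that later pairing does not always walk the nodes in the same order. A minimal standalone sketch of this shuffle-then-filter step (made-up node names, not Balancer code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ShuffleFilterDemo {
  public static void main(String[] args) {
    List<String> reports = new ArrayList<>(Arrays.asList("dn1", "dn2", "dn3", "dn4"));
    Collections.shuffle(reports);        // like DFSUtil.shuffle(reports)
    List<String> trimmed = new ArrayList<>();
    for (String dn : reports) {
      if (dn.equals("dn3")) {            // pretend dn3 should be ignored
        continue;
      }
      trimmed.add(dn);
    }
    System.out.println(trimmed);
  }
}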
  public DDatanode newDatanode(DatanodeStorageReport r) {
    return new DDatanode(r, maxConcurrentMovesPerNode);
  }

  public boolean dispatchAndCheckContinue() throws InterruptedException {
    return nnc.shouldContinue(dispatchBlockMoves());
  }

  /**
   * Dispatch block moves for each source. The thread selects blocks to move &
   * sends request to proxy source to initiate block move. The process is flow
   * controlled. Block selection is blocked if there are too many un-confirmed
   * block moves.
   *
   * @return the total number of bytes successfully moved in this iteration.
   */
  private long dispatchBlockMoves() throws InterruptedException {
    final long bytesLastMoved = bytesMoved.get();
    final Future<?>[] futures = new Future<?>[sources.size()];

    final Iterator<Source> i = sources.iterator();
    for (int j = 0; j < futures.length; j++) {
      final Source s = i.next();
      futures[j] = dispatchExecutor.submit(new Runnable() {
        @Override
        public void run() {
          s.dispatchBlocks();
        }
      });
    }

    // wait for all dispatcher threads to finish
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (ExecutionException e) {
        LOG.warn("Dispatcher thread failed", e.getCause());
      }
    }

    // wait for all block moving to be done
    waitForMoveCompletion();

    return bytesMoved.get() - bytesLastMoved;
  }
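dispatchBlockMoves() fans out one task per source onto dispatchExecutor and then blocks on every Future, so an iteration only finishes after all per-source loops have returned. A minimal standalone sketch of this fan-out/fan-in pattern (made-up names, not Balancer code):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FanOutDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    String[] sources = {"dn1", "dn2", "dn3"};
    Future<?>[] futures = new Future<?>[sources.length];
    for (int j = 0; j < sources.length; j++) {
      final String s = sources[j];
      futures[j] = pool.submit(() -> System.out.println("dispatching blocks of " + s));
    }
    for (Future<?> f : futures) {
      try {
        f.get();                       // wait for each per-source task
      } catch (ExecutionException e) {
        System.err.println("dispatcher task failed: " + e.getCause());
      }
    }
    pool.shutdown();
  }
}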
  /** The sleeping period before checking if block move is completed again */
  static private long blockMoveWaitTime = 30000L;

  /** set the sleeping period for block move completion check */
  static void setBlockMoveWaitTime(long time) {
    blockMoveWaitTime = time;
  }

  /** Wait for all block move confirmations. */
  private void waitForMoveCompletion() {
    for (;;) {
      boolean empty = true;
      for (StorageGroup t : targets) {
        if (!t.getDDatanode().isPendingQEmpty()) {
          empty = false;
          break;
        }
      }
      if (empty) {
        return; // all pending queues are empty
      }
      try {
        Thread.sleep(blockMoveWaitTime);
      } catch (InterruptedException ignored) {
      }
    }
  }
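waitForMoveCompletion() is a plain polling loop: it sleeps blockMoveWaitTime (30 seconds by default) and re-checks until every target's pending queue is empty. A minimal standalone sketch of the same poll-until-drained idea (not Balancer code, with a shorter sleep):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Queue;

public class PollDrainDemo {
  public static void main(String[] args) throws InterruptedException {
    Queue<String> pending = new ConcurrentLinkedQueue<>();
    pending.add("block-1");
    pending.add("block-2");
    // a worker confirms the moves after a short delay
    new Thread(() -> {
      try { Thread.sleep(200); } catch (InterruptedException ignored) { }
      pending.clear();
    }).start();
    while (!pending.isEmpty()) {   // the real code checks every target's pending queue
      Thread.sleep(100);           // and sleeps blockMoveWaitTime between checks
    }
    System.out.println("all block moves confirmed");
  }
}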
  /**
   * Decide if the block is a good candidate to be moved from source to target.
   * A block is a good candidate if
   * 1. the block is not in the process of being moved / has not been moved;
   *    (the chosen block must not be one that is already being moved)
   * 2. the block does not have a replica on the target;
   *    (the target must not already hold a replica of this block)
   * 3. doing the move does not reduce the number of racks that the block has
   *    (after the move, the number of racks holding the block must not decrease)
   */
  private boolean isGoodBlockCandidate(Source source, StorageGroup target,
      DBlock block) {
    if (source.storageType != target.storageType) {
      return false;
    }
    // check if the block is moved or not
    // if the block is already in the moved-blocks window, return false
    if (movedBlocks.contains(block.getBlock())) {
      return false;
    }
    // if the target already stores a replica of this block, return false
    // and do not move it
    if (block.isLocatedOn(target)) {
      return false;
    }
    // if node-group awareness is enabled, the target must not share a node
    // group with another replica of this block
    if (cluster.isNodeGroupAware()
        && isOnSameNodeGroupWithReplicas(target, block, source)) {
      return false;
    }
    // the move must not reduce the number of racks holding the block
    if (reduceNumOfRacks(source, target, block)) {
      return false;
    }
    return true;
  }
  /**
   * Determine whether moving the given block replica from source to target
   * would reduce the number of racks of the block replicas.
   */
  private boolean reduceNumOfRacks(Source source, StorageGroup target,
      DBlock block) {
    final DatanodeInfo sourceDn = source.getDatanodeInfo();
    if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
      // source and target are on the same rack
      return false;
    }
    boolean notOnSameRack = true;
    synchronized (block) {
      for (StorageGroup loc : block.getLocations()) {
        if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) {
          notOnSameRack = false;
          break;
        }
      }
    }
    if (notOnSameRack) {
      // target is not on the same rack as any replica
      return false;
    }
    for (StorageGroup g : block.getLocations()) {
      if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) {
        // source is on the same rack of another replica
        return false;
      }
    }
    return true;
  }
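In plain terms, reduceNumOfRacks() returns true exactly when the source holds the only replica on its rack and the target's rack already holds another replica, because that move would shrink the set of racks covering the block. The snippet below is a simplified standalone version of the same check using plain rack-name strings; the names are made up and this is not the Balancer's own code.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RackCountDemo {
  static boolean reducesRackCount(List<String> replicaRacks,
      String sourceRack, String targetRack) {
    Set<String> before = new HashSet<>(replicaRacks);
    Set<String> after = new HashSet<>(replicaRacks);
    after.remove(sourceRack);
    // the source rack is only lost if no other replica remains on it
    if (replicaRacks.stream().filter(r -> r.equals(sourceRack)).count() > 1) {
      after.add(sourceRack);
    }
    after.add(targetRack);
    return after.size() < before.size();
  }

  public static void main(String[] args) {
    // three replicas on racks r1, r1, r2; moving the only r2 replica onto rack r1
    System.out.println(reducesRackCount(Arrays.asList("r1", "r1", "r2"), "r2", "r1")); // true
    // moving one of the r1 replicas to a new rack r3 keeps the rack set intact
    System.out.println(reducesRackCount(Arrays.asList("r1", "r1", "r2"), "r1", "r3")); // false
  }
}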
  /**
   * Check if there are any replica (other than source) on the same node group
   * with target. If true, then target is not a good candidate for placing
   * specific replica as we don't want 2 replicas under the same node group.
   *
   * @return true if there are any replica (other than source) on the same node
   *         group with target
   */
  private boolean isOnSameNodeGroupWithReplicas(
      StorageGroup target, DBlock block, Source source) {
    final DatanodeInfo targetDn = target.getDatanodeInfo();
    for (StorageGroup g : block.getLocations()) {
      if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) {
        return true;
      }
    }
    return false;
  }

  /** Reset all fields in order to prepare for the next iteration */
  void reset(Configuration conf) {
    cluster = NetworkTopology.getInstance(conf);
    storageGroupMap.clear();
    sources.clear();
    targets.clear();

    globalBlocks.removeAllButRetain(movedBlocks);
    movedBlocks.cleanup();
  }

  /** shutdown thread pools */
  void shutdownNow() {
    dispatchExecutor.shutdownNow();
    moveExecutor.shutdownNow();
  }

  static class Util {
    /** @return true if data node is part of the excludedNodes. */
    static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) {
      return isIn(excludedNodes, dn);
    }

    /**
     * @return true if includedNodes is empty or data node is part of the
     *         includedNodes.
     */
    static boolean isIncluded(Set<String> includedNodes, DatanodeInfo dn) {
      return (includedNodes.isEmpty() || isIn(includedNodes, dn));
    }

    /**
     * Match is checked using host name, ip address with and without port
     * number.
     *
     * @return true if the datanode's transfer address matches the set of nodes.
     */
    private static boolean isIn(Set<String> datanodes, DatanodeInfo dn) {
      return isIn(datanodes, dn.getPeerHostName(), dn.getXferPort())
          || isIn(datanodes, dn.getIpAddr(), dn.getXferPort())
          || isIn(datanodes, dn.getHostName(), dn.getXferPort());
    }

    /** @return true if nodes contains host or host:port */
    private static boolean isIn(Set<String> nodes, String host, int port) {
      if (host == null) {
        return false;
      }
      return (nodes.contains(host) || nodes.contains(host + ":" + port));
    }

    /**
     * Parse a comma separated string to obtain set of host names
     *
     * @return set of host names
     */
    static Set<String> parseHostList(String string) {
      String[] addrs = StringUtils.getTrimmedStrings(string);
      return new HashSet<String>(Arrays.asList(addrs));
    }

    /**
     * Read set of host names from a file
     *
     * @return set of host names
     */
    static Set<String> getHostListFromFile(String fileName, String type) {
      Set<String> nodes = new HashSet<String>();
      try {
        HostsFileReader.readFileToSet(type, fileName, nodes);
        return StringUtils.getTrimmedStrings(nodes);
      } catch (IOException e) {
        throw new IllegalArgumentException(
            "Failed to read host list from file: " + fileName);
      }
    }
  }
}
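Finally, note how Util matches a DataNode against the include/exclude lists: the lists may contain either a bare host (or IP) or a host:port entry, and a node is checked by its peer host name, IP address and host name, each combined with its transfer port. A minimal standalone sketch of this host / host:port matching (made-up host names, not Balancer code):

import java.util.HashSet;
import java.util.Set;

public class HostMatchDemo {
  static Set<String> parseHostList(String csv) {
    Set<String> set = new HashSet<>();
    for (String s : csv.split(",")) {
      if (!s.trim().isEmpty()) {
        set.add(s.trim());
      }
    }
    return set;
  }

  static boolean isIn(Set<String> nodes, String host, int port) {
    // a node matches if either the bare host or the host:port form is listed
    return host != null && (nodes.contains(host) || nodes.contains(host + ":" + port));
  }

  public static void main(String[] args) {
    Set<String> excluded = parseHostList("dn1.example.com, dn2.example.com:50010");
    System.out.println(isIn(excluded, "dn1.example.com", 50010)); // true, bare host match
    System.out.println(isIn(excluded, "dn2.example.com", 50010)); // true, host:port match
    System.out.println(isIn(excluded, "dn2.example.com", 50020)); // false, port differs
  }
}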