KafkaController 分区Rebalance平衡机制
private def
checkAndTriggerPartitionRebalance(): Unit = {
if (isActive()) {
trace("checking need to trigger partition rebalance")
// 获取(存活的broker,AR副本集)
=> (2,Map([message,0]-> List(2, 0), [hadoop,0] -> List(2, 1)))
var preferredReplicasForTopicsByBrokers:
Map[Int, Map[TopicAndPartition,
Seq[Int]]] =
null
inLock(controllerContext.controllerLock) {
preferredReplicasForTopicsByBrokers=
controllerContext.partitionReplicaAssignment.filterNot(p
=> deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy
{
case(topicAndPartition,
assignedReplicas) =>
assignedReplicas.head
}
}
debug("preferred replicas by broker "
+ preferredReplicasForTopicsByBrokers)
// 过滤每一个存活的broker,检查是否需要一个preferredreplica
选举被触发
preferredReplicasForTopicsByBrokers.foreach
{
case(leaderBroker,
topicAndPartitionsForBroker) => {
var imbalanceRatio: Double =
0
var topicsNotInPreferredReplica:
Map[TopicAndPartition,
Seq[Int]] =
null
inLock(controllerContext.controllerLock) {
// 我们知道,正常情况下,broker的AR副本集第一个副本(preferred
replica )就是leader
// 如果leader不是preferred replica,比如 Leader
: 0 ISR[2,0]
// 我们需要过滤出这种topicPartition,然后我们好进行在平衡
topicsNotInPreferredReplica=
topicAndPartitionsForBroker.filter
{
case(topicPartition,
replicas) => {
controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader
!= leaderBroker
}
}
debug("topics not in preferred replica "
+ topicsNotInPreferredReplica)
// broker 的AR副本数量
val totalTopicPartitionsForBroker
= topicAndPartitionsForBroker.size
// 过滤出的leader不是AR副本集的preferred
replica的数量
val totalTopicPartitionsNotLedByBroker
= topicsNotInPreferredReplica.size
// 计算(过滤出的leader不是AR副本集的preferred
replica的数量)/(broker
的AR副本数量)不平衡比例
imbalanceRatio=
totalTopicPartitionsNotLedByBroker.toDouble /
totalTopicPartitionsForBroker
trace("leader imbalance ratio for broker %d is %f".format(leaderBroker,
imbalanceRatio))
}
// 如果比例大于我们配置的leader.imbalance.per.broker.percentage参数,比如50%,就触发这个topic
partitions的再平衡操作
if (imbalanceRatio
> (config.leaderImbalancePerBrokerPercentage.toDouble /
100)) {
topicsNotInPreferredReplica.foreach
{
case(topicPartition,
replicas) => {
inLock(controllerContext.controllerLock) {
// 首先确保broker存活,而且没有分区正在重新分配或者没有进行preferredreplica
选举,且没有分区将被删除
if (controllerContext.liveBrokerIds.contains(leaderBroker)
&&
controllerContext.partitionsBeingReassigned.isEmpty
&&
controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty
&&
!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
controllerContext.allTopics.contains(topicPartition.topic))
{
// 然后真正触发Prederred Replica选举操作
onPreferredReplicaElection(Set(topicPartition),
true)
}
}
}
}
}
}
}
}
}