Akka的Cluster源码分析

1.概述

Akka这样一个scala世界里的明星,给我们提供了各种各样吸引人的功能和特性,尤其在分布式、高并发领域。但就像任何其他优秀的框架,它的实现也必然会有其复杂性,在Roland Kuhn(Akka Tech Lead)的带领下,Akka的实现原理吸收了各个领域内成熟、领先的理论。尤其是Akkacluster的实现,更是体现了非常多的优秀理论和实战经验。为了更方便大家在实际使用中碰到问题的时候,可以试着进行分析和解决。从今天开始,我将从以下几个方面试着说明cluster的实现(基于2.3.1版本)

#集群的启动

 

#Gossip协议的实现

 

#一个普通节点的一生

 

2.集群启动

要使用一个cluster首先要启动它,所以我们先从启动这个步骤的实现开始进行分析。

Akka集群的启动首先就是要启动一种叫做种子节点(SeedNode)的节点们。只有种子节点启动成功,其他节点才能选择任意一个种子节点加入集群。

         种子节点默认可配置多个,它们之间没有任何区别,种子节点的启动分以下几种情况:

1.某种子节点启动,它首先判断自己的ip是否在种子节点配置列表中,如果在并且是第一个,则它在一个规定时间内(默认是5),向其他种子节点发送‘InitJoin’消息,如果有确认消息返回,则加入第一个返回确认的种子节点所在的cluster中,否则,它自己将创建一个新的cluster(这些任务由FirstSeedNodeProcess这个Actor完成,任务完成后它就销毁自己)

 

2.某种子节点启动,它首先判断自己的ip是否在种子节点配置中,但不是第一个,则它向其他种子节点发送消息,如果在一个规定时间内(默认是5)没有收到任何确认消息,则它将不断重试,直到有一个种子节点返回正确的确认消息,然后就加入这个种子节点所在的cluster中。(这里注意以下,它不会自己创建一个新cluster)(这些任务由JoinSeedNodeProcess这个Actor完成,任务完成后它就销毁自己)

         从上面的分析,我们可以得出下面的一些结论:

#.一个集群第一次启动成功,那一定是种子节点配置列表中排在第一位的节点,由它来创建出集群。但是随着时间的推移,排在第一的种子节点有可能重启了,那这个时候,它将首选加入到其他种子节点去。

#一个种子节点可以加入任何一个其他节点,不用非得都加到排第一位的节点上。

 

下面我们举例说明,有种子节点123

* 1. seed2启动, 但是没有收到seed1 seed3的确认。

* 2. seed3启动,没有收到seed1 的确认消息(seed2处在’inactive’状态)

* 3. seed1 启动,创建cluster并加入到自己中。

* 4. seed2 重试加入过程,收到seed1的确认, 加入到seed1

* 5. seed3重试加入过程,先收到seed2的确认, 加入到seed2

 

为了更好说明源码实现,我们先对节点的状态做一个介绍,下面是节点状态变迁图:


Akka的Cluster源码分析
 
下面具体说明状态的变迁:

#节点初始状态是joining (对应源码new Member(uniqueAddress, Int.MaxValue, Joining, roles))

#通过gossip该信息被传递到所有节点上。如果gossip能够收敛(convergence ,后面会详细介绍gossip的实现),那么该节点的状态变成up

#用户可以人工调用leavedown命令,让节点状态发生改变。

#由于Akka提供基于心跳的故障检测模块fd*(failure detector),所以当故障检测模块发现某个节点offline,则会把该节点置为unreachable*(它不是一种节点状态),当该节点online后,则自动恢复到当前状态(如图所示:unreachable*和其他节点状态之间是用虚线表示,说明其并没有发生状态的变迁)

#从图中可以看出节点的最终状态都是removed

#从上图还可以看出一个节点如果状态变更路径是joining-àup-àleaving-àexiting-àremoved,则它要经过4gossip收敛(图中的leader action,图中漏了一个把removed传递给其他节点的gossip收敛)

 

3.Gossip协议的实现

为了更好的分析Akkagossip协议实现, 我将从3方面来进行说明:算法说明、实例演示、源码分析。

3.1 算法说明

由于Akka采用无中性化的集群设计,在这种架构下为了能更好的传递彼此间的消息,Akka使用了Gossip协议。

         为了下面更好的理解Gossip执行过程,我们先介绍几个概念:

#Vector Clock(矢量钟)

1. 每个节点都有自己对应的id  + 计数器

2. 在每个节点的矢量钟实例里,包含了它能接触到的所有节点对应的计数器。如有可相互通讯的ABC3个节点,则A节点的矢量钟也包含了BC节点的计数器。

3.如果在更新矢量钟某个节点的值时,以max(计数器原值、新值)作为当前值。

       它主要用来检测在分布式环境下,是否存在并发的更新(冲突)。如果没有冲突则说明它们满足causality,如果有冲突,解决的方式也较简单:max

#发起gossip的节点叫gossiper接收者叫做recipient

#节点间gossip协议采用请求/应答模式。

#Akkagossip协议发送的具体内容。

case class Gossip(

       members: immutable.SortedSet[Member],

       seen: Set[UniqueAddress],

       reachability: Reachability,

      version: VectorClock

    )

     *members:存放该节点知道的所有其他节点。

     *seen:已经收到本次gossip的节点们,每个节点当接收到一个新的gossip时,会把自己放到seen这个队列中,作为响应返回给发送者。

     *reachability:这个队列由心跳模块来维护,用来判断节点们是否存活。

     *version:矢量钟。

 

3.1.1一个节点加入集群的消息交互 

     下面我们介绍一下一个节点加入集群的消息交互和状态变化过程,如下图所示:


Akka的Cluster源码分析
 节点NodeA随机选择SeedNode1作为入口加入到集群:

#它首先处于uninitialized状态(这里的状态并不是具体存在的,只是为了方便算法说明),发送InitJoin消息给SeedNode1;

 

# SeedNode1接收该消息,返回Ack进行确认;

 

#NodeA发送Join消息给SeedNode1,SeedNode1接收该消息后,会调用ClusterCoreDaemon实例的joining方法,它会调用另外一个非常重要的方法:updateLatestGossip。updateLatestGossip的作用有两个:1.更新当前gossip的矢量钟;2.清空当前gossip的seen队列,然后把自己加在里面。(后续发起gossip交互时,会优先选择那些没在seen队列中的成员)

 

# SeedNode1返回Welcome消息给NodeA。

 

#NodeA接收Welcome消息后,用SeedNode1的gossip作为自己当前的gossip,并且把自己加到seen队列中,然后把当前gossip再返回给SeedNode1。到此NodeA就完成了全部加入过程。

 

3.1.2 Gossip的执行过程

         Gossip的执行过程包含这几步:

1、 某节点定时发起一次gossip:

选择gossip接收者的算法具体如下(代码在ClusterCoreDaemon类的gossip方法里):

           首先选择那些没有在当前gossip的seen队列中的members,如果存在这样的members则从中随机选择一个节点,向它发起一次gossip交互。如果所有成员都已经在

seen队列中了,则随机选择一个节点向它仅仅发送当前gossip的版本信息(矢量钟),这个是对gossip协议的一个优化,因为在大部分的时候,节点间发送的都是彼此的矢量钟。

2、每个节点都会定时判断本次gossip是否结束(收敛)了(判断收敛的标志是:所有节点都看到了当前的gossip)。

3、节点发现某次gossip结束了,则判断自己是否是‘leader‘,是的话则执行相关的leader动作。

 

3.2 实例演示

下面我们用一个简单的场景来分析gossip协议的具体交互过程,为了去掉不必要的干扰把gossip-interval这个参数改成30秒,默认是1秒。场景说明:

firstSeedNode(在application.conf配置文件seed-nodes里排在第一)先启动,然后再启动SeedNode2,为了方便说明把firstSeedNode简写成S1,SeedNode2简写成S2。

下面是gossip交互示意图:


Akka的Cluster源码分析
 上图中的T0、T1、…这些都是表示时间轴,具体说明如下:

#T0时刻:S1启动,这时候的gossip为空,如(members = [], overview = GossipOverview(reachability = [], seen = []), version = VectorClock())。

#T1-T2:这是S1把自己加到cluster中去,具体可以参看‘一个节点加入集群的消息交互’章节。T2时刻结束的时候,版本为2。

#T3:S2启动后S1收到Join消息,经过一系列操作,S1的gossip版本变成3、成员新增一个Joining状态的S2。这个gossip会通过Welcome消息发给S2,版本是3了。

#T3_1:S2收到Welcome中的gossip,用它来初始化自己的gossip。

(这里重点说明一下后续的T4和T5其实它们没有必然的时间前后关系,这只是我测试时发生的,如果大家在测试的时候发现和我的顺序不一致,这没有问题的,因为原理是一样的)

#T4:Leader选举后,S1还是Leader,并且把S2的状态变成Up,版本是4。

#T5:S2在T3_1处理完后,紧接着把自己的gossip再发给S1,版本是3。

#T5_1:S1收到S2的gossip,和自己的进行比较,由于自己版本是4比3大,所以把自己的gossip再发给S2。

#T5_2:S2收到S1版本4的gossip,比自己的版本高,于是更新自己的gossip,同时发现S1的gossip的seen队列里并不包含自己,于是它又把自己更新过后gossip(版本是4)发给S1。

#T5_3:S1发现收到的gossip和自己版本一样,于是只是合并了一下彼此的seen队列。

 

到现在S1和S2的gossip完全相同了。上面虽然只是2个节点间的gossip交互,但其原理可以适用于任意多个节点。

 

3.3 源码分析

Akka在Cluster上的实现还算比较清晰,2.3.1版本只有15个主要直接相关的scala类,而文件大小最大的是ClusterDaemon.scala,它也是整个实现的核心类,这里我们重点对它进行一下分析。

这个文件中最重要的类是ClusterCoreDaemon,它的主要功能包括下面几个方面:

#节点状态管理:主要是对InitJoin、JoinTo、Join等消息的处理,这个细节可以参看前面章节的描述。

 

#gossip协议的管理:

1)发送gossip:Akka采用定时(通过gossip-interval参数配置的,默认1秒)的方式来发起新一轮的Gossip协议,这样做的好处是能有效的减少gossip的交互次数(1秒内的多个成员的状态变化,通过一轮gossip就完成了),Akka里每一轮gossip都是由这个定时器触发的,对应源码gossipTick方法。

 

2)选择哪个节点发起新一次gossip交互:

Akka采用有偏好的随机选择算法,它首先会选择当前gossip结构体中不在seen队列里的members队列中的成员,如果有这样的成员则随机选择一个进行gossip交互。如果没有这样的成员,则随机选择members队列中的一个成员进行gossip交互。对应源码gossip方法里。

 

3)接收gossip首先拿remote gossip(对端)和本地gossip进行版本(矢量钟)比较,对应源码VectorClock类的compareOnlyTo方法。比较的结果有3种:

# Same:相同,则进行seen队列合并就可以了。

# Before:本地新,则向对端发送最新的gossip,本地不变。

# After:对端新,则更新本地gossip。如果remote gossip里的seen队列里没有包含该本地ip,则发送最新的gossip给对端,以减少一次它们两个间的gossip交互。

         对应源码receiveGossip方法里。

        

#Leader的管理:

         Akka采用定时(通过leader-actions-interval参数配置的,默认1秒)的方式来判断Leader的产生。这个算法具体又包括几个小部分:

   1) 判断本节点是否是Leader:

Akka的Leader’选举’其实比较简单,就是判断当前 members队列(有序队列)里面排第一位置的节点就是当前整个集群的Leader。Leader不一定非得是SeedNode,普通节点也可以。具体member的排序算法在Member类的addressOrdering。举例说明:

如我们有2个seedNode(s1:10.10.10.101:2551,s2:10.10.10.102:2552),1个普通节点(n1:10.10.10.100:20000),当n1加入集群后,n1就成为Leader了,因为它的ip地址最小。

对应源码Gossip类的leaderOf方法。

2)本轮gossip是否收敛:

首先判断所有被故障检测模块fd检测出有问题的节点,它们的状态应该是Down, Exiting。如果不是的话,则等待人工设置问题节点的状态或由Leader自动执行auto-down。

当上面这个条件满足后,再判断是否所有成员状态是Up, Leaving,都在seen队列里,如果这个条件满足的话,则认为本轮gossip收敛。

对应源码Gossip类的convergence方法里。

3)执行Leader的职责:

        它重点关注两类成员:changedMembers和removedUnreachable; 

changedMembers是指那些有状态变化的成员,如JOINING ---> UP、LEAVING-àEXITING。而removedUnreachable是指被故障检测模块fd检测出有问题的节点,如果它们的状态是Down, Exiting中的一种。只要有满足上述任何一类的成员,Leader就会执行一系列相关操作,这些操作比较直观,请参看源码leaderActionsOnConvergence方法。