etcd 是 CoreOS 团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd 内部采用 raft 协议作为一致性, 现在就来看看 raft 在 etcd 中是如何实现的。必须先阅读 raft 协议的论文,再对协议论文有深刻理解的情况下,理解实现会更加轻松。
一、分析流程
阅读readMe.md文件
运行单节点raftexample
首先启动一个raftexample的单成员集群:
1 | raftexample --id 1 --cluster http://127.0.0.1:12379 --port 12380 |
每个raftexample进程都维护一个raft实例和一个键值服务器。
进程的逗号分隔对等体(–cluster)列表,其对等列表(–id)的raft ID索引和http键值服务器端口(–port)通过命令行传递。
接下来,将值(“hello”)存储到键(“my-key”):
1 | curl -L http://127.0.0.1:12380/my-key -XPUT -d hello |
最后,检索存储的密钥:
1 | curl -L http://127.0.0.1:12380/my-key |
运行本地群集
首先安装[goreman](https://github.com/mattn/goreman),它管理基于Procfile的应用程序。
[Procfile脚本](./ Procfile)将设置本地示例集群。从以下开始:
1 | goreman start |
这将带来三个raftexample实例。
现在可以将键值对写入集群的任何成员,并同样从任何成员中检索它。
容错
要测试群集恢复,首先启动群集并写入值“foo”:
1 | goreman start |
接下来,删除节点并将值替换为“bar”以检查群集可用性:
1 | goreman run stop raftexample2 |
最后,重新启动节点并使用更新后的值“bar”验证它是否恢复:
1 | goreman run start raftexample2 |
动态集群重新配置
可以使用对REST API的请求将节点添加到正在运行的集群中或从中删除节点。
例如,假设我们有一个使用命令启动的3节点集群:
1 | raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12380 |
可以通过发出POST来添加ID为4的第四个节点:
1 | curl -L http://127.0.0.1:12380/4 -XPOST -d http://127.0.0.1:42379 |
然后使用–join选项可以像其他节点一样启动新节点:
1 | raftexample --id 4 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379,http://127.0.0.1:42379 --port 42380 --join |
新节点应加入群集,并能够为密钥/值请求提供服务。
我们可以使用DELETE请求删除节点:
1 | curl -L http://127.0.0.1:12380/3 -XDELETE |
一旦集群处理了此请求,节点3就应该自行关闭。
设计
raftexample由三个组件组成:一个由raft支持的键值存储,一个REST API服务器和一个基于etcd的raft实现的raft共识服务器。
支持raft的键值存储是一个键值映射,它包含所有已提交的键值。该存储桥接了raft服务器和REST服务器之间的通信。键值更新通过存储发送到raft服务器。一旦raft报告提交更新,存储就会更新其地图。
REST服务器通过访问raft支持的键值存储来公开当前的raft共识。GET命令在存储中查找键并返回值(如果有)。键值PUT命令向存储发出更新提议。
raft服务器与其集群对等方达成共识。当REST服务器提交提议时,raft服务器将提议发送给其对等方。当raft达成共识时,服务器通过提交通道发布所有已提交的更新。对于raftexample,此提交通道由键值存储使用。
二、介绍raft库代码结构及核心数据结构
为什么要首先介绍核心数据结构,如果不介绍核心数据结构就可能看不懂后面我的分析,每一个Msg具体代表什么含义,所以先看核心数据结构,这里只需要大概浏览一遍就可以了,忘记了可以在这里看。
MsgHup消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgHup | 不用于节点间通信,仅用于发送给本节点让本节点进行选举 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
MsgBeat消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgBeat | 不用于节点间通信 ,仅用于leader节点在heartbeat定时器到期时向集群中其他节点发送心跳消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
MsgProp消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgProp | raft库使用者提议(propose)数据 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
entries | Entry | 日志条目数组 |
MsgApp/MsgSnap消息
MsgApp消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgApp | 用于leader向集群中其他节点同步数据的消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
entries | Entry 日志条目数组 | |
logTerm | uint64 日志所处的任期ID | |
index | uint64 索引ID |
MsgSnap消息
成员 类型 | 作用 |
---|---|
type | MsgSnap 用于leader向follower同步数据用的快照消息 |
to uint64 | 消息接收者的节点ID |
from | uint64 本节点ID |
snapshot | Snapshot 快照数据 |
MsgAppResp消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgAppResp | 集群中其他节点针对leader的MsgApp/MsgSnap消息的应答消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
index | uint64 | 日志索引ID,用于节点向leader汇报自己已经commit的日志数据ID |
reject | bool | 是否拒绝同步日志的请求 |
rejectHint | uint64 | 拒绝同步日志请求时返回的当前节点日志ID,用于被拒绝方快速定位到下一次合适的同步日志位置 |
MsgVote/MsgPreVote消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgVote/MsgPreVote | 节点投票给自己以进行新一轮的选举 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
term | uint64 | 任期ID |
index | uint64 | 日志索引ID,用于节点向leader汇报自己已经commit的日志数据ID |
logTerm | uint64 | 日志所处的任期ID |
context | bytes | 上下文数据 |
MsgVoteResp/MsgPreVoteResp消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgVoteResp/MsgPreVoteResp | 投票应答消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
reject | bool | 是否拒绝 |
三、分析源码
按照上面README.md文档开始分析,首先把单个实例跑起来。然后根据源码的顺序一个一个的看。
main
1 | func main() { |
proposeC
创建一个提议channel,用于提议请求的通信confChangeC
创建一个配置更改的channel,用于配置更改的通信newRaftNode
创建raft协议的node节点,然后返回一个提交的channel,和一个错误通知的channelsnapshotterReady
用于等待快照创建完毕,然后执行newKVStore
方法创建一个key存储容器serveHttpKVAPI
创建一个http服务
main.newKVStore
1 | func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore { |
创建一个KV存储对象,读取提交信息到kv存储容器中
1 | type kvstore struct { |
proposeC
是一个提议channel,sync.RWMutex
因为是在多个 go 协程中运行的,所以需要加锁,存储实际上就是一个map
类型
1 | type Snapshotter struct { |
Snapshotter
类型则是一个zap类型的日志,会持久化到磁盘中
1 | func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) { |
根据提交然后加载快照中的信息,从快照中的信息恢复到kv存储容器中,然后再从commit日志中恢复信息
main.serveHttpKVAPI
1 | // serveHttpKVAPI starts a key-value server with a GET/PUT API and listens. |
开启http服务
1 | func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
可以分为两类,一类是属性信息,第二类是节点配置的信息
- 1.属性信息
PUT
通过h.store.Propose(key, string(v))
该方法往s.proposeC
中提交建议GET
通过h.store.Lookup(key)
在v, ok := s.kvStore[key]
集合中查找数据,然后返回
- 2.节点配置的信息
POST
因为设置的Type: raftpb.ConfChangeAddNode
信息为添加类型,再由h.confChangeC <- cc
传输到配置更改的channel,实现添加节点DELTE
因为设置的Type: raftpb.ConfChangeRemoveNode
信息为添加类型,再由h.confChangeC <- cc
传输到配置更改的channel,实现删除节点
main.newRaftNode
1 | func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string, |
创建一个raft的node节点,然后启用协程开启调用startRaft()
,启动raft协议,后面的方法会等待snapshotterReady
快照信息准备完毕
main.newRaftNode.startRaft
1 | func (rc *raftNode) startRaft() { |
- 是否本地时候存在快照信息
- 如果有快照就读取之前快照信息
- 否则就在本地初始化创建一个快照
- 是否本地存在wal日志,
- 如果存在wal日志,就根据之前的配置来恢复这个节点
- 否则就创建node节点,启动node节点
- 启动传输服务启动,并将对等节点放入到要传入的服务中
- 启动节点之间需要的tcp连接
- 监听各个节点给他发送的消息
1 | func (t *Transport) Start() error { |
创建一个传输服务,分为stream类型,和pipeline类型, 其中stream类型是一个长链接,而pipeline类型是一个短链接类型。
1 | streamRt http.RoundTripper // roundTripper used by streams |
- stream类型
- 用来发送心跳和日志的信息
- pipeline类型
- 用来传输快照
- 用来发送心跳和日志的信息(仅当stream类型不可用是,才会使用)
1 | func (rc *raftNode) serveRaft() { |
时间这个方法只是打开了一个tcp连接,然后将rc.transport.Handler()
传入
1 | func (t *Transport) Handler() http.Handler { |
传入了四种handler,分别对应四种类型
1 | RaftPrefix = "/raft" |
管道类型,探测类型,流类型,快照类型,至于每一个类型handler的httpServer实现,我这里就不讲了。
1 | func (rc *raftNode) serveChannels() { |
其中有有两个 for 循环,一个是来接收提议和配置更改的信息 ,一个是类处理node节点之间的信息
- 接收提议和配置更改的信息的for循环
case prop, ok := <-rc.proposeC:
然后调用rc.node.Propose(context.TODO(), []byte(prop))
发送提议case cc, ok := <-rc.confChangeC:
然后调用rc.node.ProposeConfChange(context.TODO(), cc)
发送配置更改
- 处理node节点之间的信息的 for 循环
case <-ticker.C:
使用rc.node.Tick()
用来处理超时和心跳的逻辑rd := <-rc.node.Ready():
返回当前时间点状态的通道,将条目存储到wal,然后通过提交通道发布,然后调用Advance
它准备节点返回下一个可用的Ready。case err := <-rc.transport.ErrorC:
使用rc.writeError(err)产生错误就把错误写到通道里case <-rc.stopc:
停止该节点
1 | func RestartNode(c *Config) Node { |
如果重启节点,就从配置中恢复节点的状态
1 | // StartNode 返回一个新的Node给定配置和一个raft对等列表。 |
- 首先创建一个raft协议,成为第1任期的追随者并应用第1任期的初始配置条目,将领导者设置为空
- 并将条目添到wal中
- 添加对等节点
- 运行
run
方法,调用处理过程
main.newRaftNode.startRaft.run
该方法是一个相当重要的方法
1 | func (n *node) run(r *raft) { |
raftNode结构体主循环:
- 如果proposeC中有数据写入(外部通信):调用状态机进行处理
- 如果recvc中有数据写入(内部通信),调用状态机进行处理
- 如果confChangeC中有数据写入:调用node.Node.ProposeConfChange向raft库提交配置变更数据
- 如果tick定时器到期:,调用node.Tick函数进行raft库的定时操作
- 如果node.Ready()函数返回的Ready结构体channel有数据变更:依次处理Ready结构体中各成员数据
- 处理完毕之后调用node.Advance函数,进行持久化或者快照操作
- 如果状态有变更就变更状态
- 监听节点是否停止的消息
main.newRaftNode.startRaft.Step
状态机器处理过程,这是raft的核心逻辑
1 | // 状态机器理过程 |
当条件是:
- 如果任期为0,不做处理
- 如果消息任期大于当前节点的任期
- 如果是预投票或者和投票类型
- 如果非强制,而且又在租约期以内,就不做任何处理(见论文的4.2.3,这是为了阻止已经离开集群的节点再次发起投票请求) ⚠️结束
- 如果是MsgPreVote类型,在应答一个prevote消息时不对任期term做修改 (防止分区导致的,领导人重新选举)
- 如果是MsgPreVoteResp类型并且没有拒绝
- 如果上面两者都不是的话
- 如果是领导者给跟随者发的消息或者收到了心跳或者收到了快照信息,就将当前节点,设置为跟随者。(根据论文 5.2 领导人选举 这一节)
- 否则就将领导者设置为空,因为不满足上面的条件不应当处理
- 如果是预投票或者和投票类型
- 如果传入消息的任期小于当前节点的任期
- 在等待投票的时候,候选人可能会从其他的服务器接收到声明它是领导人的附加日志项 RPC。如果这个领导人的任期号(包含在此次的 RPC中)不小于候选人当前的任期号,那么候选人会承认领导人合法并回到跟随者状态 (根据论文 5.2 领导人选举 这一节)
- 如果是MsgPreVote类型,会拒绝,因为候选人的任期没有当前节点的任期大,日志不是最新的。
- 否则忽略其他类型 ⚠️结束
- 如果是选举类型 (该类型只会由本节点发送给自己)
- 如果状态不是领导者
- 如果待处理的配置更改要应用,因此无法进行选举 ⚠️结束
- 如果状态是预投票,就开始预选举
- 如果状态是正式投票,就开始正式选举
- 如果是领导者就是忽略这条消息
- 如果状态不是领导者
- 如果是预投票或者和投票类型
- 如果该节点是学习者,学习者不能投票 ⚠️结束
- 如果 (1.)之前已经投过票的节点 或者 2.)当前没有给任何节点进行过投票,并且没有领导者 或者 3.)消息是预投票,并且消息的任期号更大)并且(该节点的日志数据是最新的) 就投票给这个消息节点,将修改raft.Vote为消息发送者ID
- 否则 将应答msg.Reject=true,拒绝该节点的投票消息
- 如果上面两种状态都不是,就进入特有身份处理步骤中
- stepLeader
- stepCandidate
- stepFollower
1 | func stepLeader(r *raft, m pb.Message) error { |
case pb.MsgBeat:
向所有跟随者,广播心跳,⚠️结束case pb.MsgCheckQuorum:
检查是否有一半以上的跟随者在自己的状态机中处于活跃,⚠️结束case pb.MsgProp:
raft库的使用者向raft库propose数据时,最后会封装成这个类型的消息来进行提交,不同类型的节点处理还不尽相同。⚠️结束case pb.MsgReadIndex:
- 如果总节点人数大于一个,也就是除了自己还有其他节点
- 首先如果该leader在成为新的leader之后没有提交过任何值,那么会直接返回不做处理。
- 然后检查只读类型是
ReadOnlySafe
还是ReadOnlyLeaseBased
,- 如果是
ReadOnlySafe
, 保存该MsgreadIndex请求到来时的commit索引,向集群中所有其他节点广播一个心跳消息MsgHeartbeat,并且在其中带上该读请求的唯一标识。 - 如果是
ReadOnlyLeaseBased
- 如果消息是当前成员,如果没有提交过任何数据,那么在它所在的这个任期(term)内的commit索引当时是并不知道的,因此在成为leader之后,需要马上提交一个no-op的空日志,这样拿到该任期的第一个commit索引。
- 否则就发送消息回应给跟随者
- 如果是
- 否则
- 如果消息是当前成员,如果没有提交过任何数据,那么在它所在的这个任期(term)内的commit索引当时是并不知道的,因此在成为leader之后,需要马上提交一个no-op的空日志,这样拿到该任期的第一个commit索引。
- 否则就回应该消息,因为是因为是来自学习者
- ⚠️结束
- 如果总节点人数大于一个,也就是除了自己还有其他节点
- 获取跟随者的进度
case pb.MsgAppResp:
- msg.Reject为true的情况,说明节点拒绝了前面的MsgApp/MsgSnap消息,根据msg.RejectHint成员回退leader上保存的关于该节点的日志记录状态。比如leader前面认为从日志索引为10的位置开始向节点A同步数据,但是节点A拒绝了这次数据同步,同时返回RejectHint为2,说明节点A告知leader在它上面保存的最大日志索引ID为2,这样下一次leader就可以直接从索引为2的日志数据开始同步数据到节点A。而如果没有这个RejectHint成员,leader只能在每次被拒绝数据同步后都递减1进行下一次数据同步,显然这样是低效的。
- 因为上面节点拒绝了这次数据同步,所以节点的状态可能存在一些异常,此时如果leader上保存的节点状态为ProgressStateReplicate,那么将切换到ProgressStateProbe状态(关于这几种状态,下面会谈到)。
- 前面已经按照msg.RejectHint修改了leader上关于该节点日志状态的索引数据,接着再次尝试按照这个新的索引数据向该节点再次同步数据。
- msg.Reject为false的情况
- 更新进度,如果不是过时的
- 如果该节点之前在ProgressStateProbe状态,说明之前处于探测状态,此时可以切换到ProgressStateReplicate,开始正常的接收leader的同步数据了。
- 如果之前处于ProgressStateSnapshot状态,即还在同步副本,说明节点之前可能落后leader数据比较多才采用了接收副本的状态。这里还需要多做一点解释,因为在节点落后leader数据很多的情况下,可能leader会多次通过snapshot同步数据给节点,而当 pr.Match >= pr.PendingSnapshot的时候,说明通过快照来同步数据的流程完成了,这时可以进入正常的接收同步数据状态了,这就是函数Progress.needSnapshotAbort要做的判断。
- 如果之前处于ProgressStateReplicate状态,此时可以修改leader关于这个节点的滑动窗口索引,释放掉这部分数据索引,好让节点可以接收新的数据了。关于这个滑动窗口设计,见下面详细解释。
- 更新进度,如果不是过时的
- 判断是否有新的数据可以提交(commit)了。因为raft的提交数据的流程是这样的:首先节点将数据提议(propose)给leader,leader在将数据写入到自己的日志成功之后,再通过MsgApp把这些提议的数据广播给集群中的其他节点,在某一条日志数据收到超过半数(qurom)的节点同意之后,才认为是可以提交(commit)的。因此每次leader节点在收到一条MsgAppResp类型消息,同时msg.Reject又是false的情况下,都需要去检查当前有哪些日志是超过半数的节点同意的,再将这些可以提交(commit)的数据广播出去。而在没有数据可以提交的情况下,如果之前节点处于暂停状态,那么将继续向该节点同步数据。
- 最后还要做一个跟leader迁移相关的操作。如果该消息节点是准备迁移过去的新leader节点(raft.leadTransferee == msg.From),而且此时该节点上的Match索引已经跟旧的leader的日志最大索引一致,说明新旧节点的日志数据已经同步,可以正式进行集群leader迁移操作了。
- msg.Reject为true的情况,说明节点拒绝了前面的MsgApp/MsgSnap消息,根据msg.RejectHint成员回退leader上保存的关于该节点的日志记录状态。比如leader前面认为从日志索引为10的位置开始向节点A同步数据,但是节点A拒绝了这次数据同步,同时返回RejectHint为2,说明节点A告知leader在它上面保存的最大日志索引ID为2,这样下一次leader就可以直接从索引为2的日志数据开始同步数据到节点A。而如果没有这个RejectHint成员,leader只能在每次被拒绝数据同步后都递减1进行下一次数据同步,显然这样是低效的。
case pb.MsgHeartbeatResp:
- 将进度设置为活跃
- 为完整滑动窗口释放一个插槽以允许进度
- 如果消息节点是日志索引是落后的就发送追加
- leader在接收到MsgHeartbeatResp消息后,如果其中有ctx字段,说明该MsgHeartbeatResp消息对应的MsgHeartbeat消息,是收到ReadIndex时leader消息为了确认自己还是集群leader发送的心跳消息
- 通知raft状态机收到的只读结构对只读请求附加的心跳的确认上下文,根据消息中的ctx字段,到全局的pendingReadIndex中查找是否有保存该ctx的带处理的readIndex请求,如果有就在acks map中记录下该follower已经进行了应答
- 当ack数量超过了集群半数时,意味着该leader仍然还是集群的leader,此时调用r.readOnly.advance(m)函数
- 将该readIndex之前的所有readIndex请求都认为是已经成功进行确认的了,所有成功确认的readIndex请求,将会加入到readStates数组中,同时leader也会向follower发送MsgReadIndexResp。
case pb.MsgSnapStatus:
仅leader处理这类消息- 如果reject为false:表示接收快照成功,将切换该节点状态到探测状态。
- 否则接收失败,将切换该节点状态到探测状态。
- 当Paused为true时,raft应暂停向此对等方发送复制消息。
case pb.MsgUnreachable:
在乐观复制期间,如果远程无法访问,MsgApp很有可能丢失。- 如果远程节点状态变为复制状态,就变为探测状态
case pb.MsgTransferLeader:
这类消息follower将转发给leader处理,因为follower并没有修改集群配置状态的权限。- 如果是学习者 就不能进行转发给leader
- 如果当前的raft.leadTransferee成员不为空,说明有正在进行的leader迁移流程。此时会判断是否与这次迁移是同样的新leader ID,如果是则忽略该消息直接返回;否则将终止前面还没有完毕的迁移流程。
- 判断是否转让过来的leader是否本节点,如果是也直接返回,因为本节点已经是leader了
- 如果日志已经匹配了,那么就发送timeoutnow协议过去
- 否则继续追加日志到新的领导者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53// stepCandidate 由StateCandidate和StatePreCandidate共享;不同的是
//它们是否响应MsgVoteResp或MsgPreVoteResp。
func stepCandidate(r *raft, m pb.Message) error {
// 只处理与我们的候选资格相对应的投票回复 (当在StateCandidate, 在这个任期中,我们可能会收到陈旧的MsgPreVoteResp消息从我们的 pre-candidate 状态).
var myVoteRespType pb.MessageType
if r.state == StatePreCandidate {
myVoteRespType = pb.MsgPreVoteResp
} else {
myVoteRespType = pb.MsgVoteResp
}
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
//1.节点调用raft.poll函数,其中传入msg.Reject参数表示发送者是否同意这次选举,根据这些来计算当前集群中有多少节点给这次选举投了同意票。
//2.如果有半数的节点同意了,如果选举类型是PreVote,那么进行Vote状态正式进行一轮选举;否则该节点就成为了新的leader,调用raft.becomeLeader函数切换状态,然后开始同步日志数据给集群中其他节点了。
//3.而如果半数以上的节点没有同意,那么重新切换到follower状态。
// 计算当前集群中有多少节点给自己投了票
gr := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
switch r.quorum() {
case gr: // 如果进行投票的节点数量正好是半数以上节点数量
//如果选举类型是PreVote,那么进行Vote状态正式进行一轮选举;
if r.state == StatePreCandidate {
r.campaign(campaignElection)
//vote状态正式的一轮选举
} else {
// 变成leader
r.becomeLeader()
r.bcastAppend()
}
case len(r.votes) - gr: // 如果是半数以上节点拒绝了投票
// 变成follower
// pb.MsgPreVoteResp包含未来候选人的期限
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}
pb.MsgProp:
如果是提议属性消息,那么就直接放弃,因为在候选人阶段是不能够添加日志。pb.MsgApp:
如果收到领导人消息,直接将当前节点转为跟随者,并且向领导人发送当前的commitidpb.MsgHeartbeat:
如果收到心跳,也变为跟随者,然后处理心跳pb.MsgSnap:
如果收到快照,也变为跟随者,然后处理快照case myVoteRespType:
- 节点调用raft.poll函数,其中传入msg.Reject参数表示发送者是否同意这次选举,根据这些来计算当前集群中有多少节点给这次选举投了同意票。
- 如果有半数的节点同意了,如果选举类型是PreVote,那么进行Vote状态正式进行一轮选举;否则该节点就成为了新的leader,调用raft.becomeLeader函数切换状态,然后开始同步日志数据给集群中其他节点了。
- 而如果半数以上的节点没有同意,那么重新切换到follower状态。
case pb.MsgTimeoutNow:
忽略这条信息,因为状态不对。
1 | func stepFollower(r *raft, m pb.Message) error { |
case pb.MsgProp:
将消息转发给领导人case pb.MsgApp:
收到了领导人了消息,重置弹性超时时间,并且添加日志case pb.MsgHeartbeat:
收到心跳,重置弹性超时时间,处理心跳case pb.MsgSnap:
收到快照,重置弹性超时时间,处理快照case pb.MsgTransferLeader:
转移领导人case pb.MsgTimeoutNow:
新的leader节点,在还未迁移之前仍然是follower,在收到这条消息后,就可以进行迁移了,此时会调用前面分析MsgVote时说过的campaign函数,传入的参数是campaignTransfer,表示这是一次由于迁移leader导致的选举流程。case pb.MsgReadIndex:
像领导人发送读请求case pb.MsgReadIndexResp:
从远程条目里面,添加到本地条目到读状态数组
四、总结
源码还是比较难的,还有一些地方我还需要仔细分析,在后面会慢慢加上去,目前就先分析主要流程。最核心的代码依然是在状态机中,Step()函数,以及三个身份的步骤的状态函数。