logo

raft实现之logcabin代码分析

代码目录

Client 客户端
Cluster 集群配置
Core 通用基础代码
Event 事件循环相关,基于epoll的封装
Examples 工具
Protocol protobuf相关
RPC rpc
Server 服务端
Storage 存储
Tree tree基本操作
include 头文件

raft概念

raft重要概念复制状态机(Replicated state machines)图解及对应代码。

Server:入口代码server/Global.cc,在Globals::init()初始化了server里面的服务,consensus(RaftConsensus),state machine (StateMachine),以及为client提供rpc调用和consensus之间相互调用的rpc服务(controlService,raftService,clientService)
Consensus Module:主要在server/RaftConsensus.cc
State Machine:代码server/StateMachine.cc
Log:代码storage/log.h

server启动

参数解析,写pid文件等
初始化 // globals.init();
初始化rpc调用服务CONTROL_SERVICE,RAFT_SERVICE,CLIENT_SERVICE
初始化Consensus Module // raft->init();
(启动线程leaderDiskThreadMain,timerThreadMain,stateMachineUpdaterThreadMain,stepDownThreadMain)
初始化StateMachine(启动线程applyThreadMain,snapshotThreadMain,snapshotWatchdogThreadMain)
是否bootstrap
是 bootstrap
否 启动运行globals.run();
eventLoop.runForever();
file.handleFileEvent(events[i].events);

Reconfigure

Reconfigure客户端工具(或者gmcn的lcReconfig选项)用来配置logcabin集群,配置完成后,集群才可用。reconfig时候会启动重要的线程peerThreadMain,用于节点间同步和心跳。每个节点线程peerThreadMain启动个数 = 新的节点数 – 已知的节点数
peerThreadMain线程是如何启动的?
file.handleFileEvent(events[i].events);//MessageSocket::ReceiveSocket::handleFileEvent(uint32_t events)
messageSocket.readable();
handler.handleReceivedMessage(inbound.header.messageId, std::move(inbound.message));
server->rpcHandler.handleRPC(std::move(rpc)); // void
Server::RPCHandler::handleRPC(OpaqueServerRPC opaqueRPC)
service->handleRPC(std::move(rpc));
ClientService::handleRPC(RPC::ServerRPC rpc)
客户端reconfig发起OpCode::SET_CONFIGURATION请求。
服务端handleRPC 调用setConfiguration(std::move(rpc));
globals.raft->setConfiguration(request, response);
configuration->setStagingServers(nextConfiguration);
getServer(it->server_id());
peer->startThread(peer);
std::thread(&RaftConsensus::peerThreadMain, &consensus, self).detach();
启动peerThreadMain线程。

read/write

读操作流程:

lient向Server发送一个读操作的rpc (Protocol::Client::ReadOnlyTree),Server通过ClientService的stateMachineQuery来响应;
2.ClientService::stateMachineQuery获取getLastCommitIndex,然后调用readOnlyTreeRPC,readOnlyTreeRPC根据read条件,调用tree.read获取数据返回。

写操作流程:

1.Client向Server发送一个写操作的rpc (Protocol::Client::ReadWriteTree),Server通过ClientService的stateMachineCommand来响应;
2.ClientService::stateMachineCommand通过调用globals.raft->replicate(cmdBuffer),log->append(entries)把操作写入日志,并通知等待stateChanged的所有线程唤醒;
3.leaderDiskThreadMain线程被唤醒,把日志flush到磁盘;
4.peerThreadMain被唤醒,作为leader,发现有操作需要同步或者心跳时间到,发送操作给其它server,处理follower的返回的response的时候,通过advanceCommitIndex()更新commitIndex,再次通过stateChanged唤醒其它线程。
5.applyThreadMain被唤醒,从log里面得到已经提交的操作,并执行。


ccj 于 1 年前 修改
0 回复
需要 登录 后方可回复, 如果你还没有账号你可以 注册 一个帐号。