WPaxos是58同城开源的一致性算法Paxos的生产级高性能Java实现,用于解决高并发、高可靠分布式系统中多副本数据状态不一致问题以及分布式共识问题。
背景
Paxos是什么
Paxos是1990年Leslie Lamport在论文《The Part-Time Parliament》提出的一种基于消息传递且具有高度容错性的一致性算法,是解决分布式一致性问题最有效的算法之一。在基于异步通信的分布式环境中,机器宕机、网络异常等情况时常发生,引入Paxos算法可以保证发生以上异常情况时,分布式系统仍然可以对某个提案快速达成一致,同时不会破坏系统已有的一致性状态。但论文中提到的Paxos协议比较难于理解,又叫Basic Paxos。
Basic Paxos算法中每次发起的数据同步请求,称为一个提案,每个提案都拥有编号(M)以及提案内容(V),分布式集群中的每个节点都同时具有Proposer(提案的发起者)、Acceptor(提案的接受者)、Learner(提案的学习者)三个角色,每个节点都可以作为Proposer并发发起提案。
为了保证多节点提案能够达成一致,提案的选定有以下两个原则:
P1:每个Acceptor必须批准它接收到的第一个提案;
P2:如果一个提案[M0,V0]被选定后,任何Proposer提出更高编码的提案,最终选定的值都为V0;
基于这两个原则,Paxos算法提案选定过程分为以下两个阶段:
算法解析
直观的理解Basic Paxos算法可能会有很多疑惑,比方说为什么需要经过Prepare、Accept两个阶段过半节点响应才能提交提案,多个节点并发发起提案发生冲突如何处理等等,《The Part-Time Parliament》文章中有对算法进行详细的数学推理论证,本文不做重复论述,仅结合一些异常场景简单介绍下Basic Paxos算法的鲁棒性。
假如分布式系统中有三个节点,分别有Proposer P1 P2 P3,Acceptor A1 A2 A3,编号相同的Proposer与Acceptor为同一个节点,通过Paxos算法实现三个节点数据强一致性。
而图2的场景下,P1与P3都能与过半节点通信,但是由于A2节点只能接受编号更高的提案,所以P1与P3发起的提案,只有一个能够最终被确定提交,而另外一个节点,也会通过Learner将已提交的提案学习同步过来,保证数据一致;
其它网络异常的场景还有很多,Paxos算法要求任何提案必须经过过半节点响应,而 N个节点中任意两个N/2+1集合必有交集,所以出现网络异常时,不会出现数据分裂情况。但是是否仅通过过半节点响应机制就可以保证数据强一致性?假如在图2的场景下省去Prepare阶段,每个节点发起Accept请求被过半节点响应后就Commit提交会怎样?如图3所示:
图5展示了,如果Prepare阶段Acceptor未将已批准的值返回给Proposer的影响。P1、P3并发发起提案,P1发起的提案得到过半响应后,P3节点此时也并发发起了编号更高的提案,A2节点如果接收到P3的Prepare请求未返回已批准的P1提案值,A2优先接收到P3的Commit请求,就会把P1已经批准过进入Commit阶段的数据覆盖。而图6展示的是Acceptor将已批准值返回给Proposer的正常交互过程,最终节点数据状态是一致的。
在上面场景下,假如P1发起的Accept请求并未得到过半节点响应,如图7所示,之后P3发起提案P[m3,v3],若A1在接收到P3节点广播的Prepare请求时,先于A2节点将已Accept存储但未批准通过的值v1给P3,P3最终将提交的值替换为了v1,而P3原来预提交的值v3,会通过重新发起新的提案进行提交。这种情况下,作为发起提案P[m1,v1]的上层调用方,得到的反馈结果虽然是提案提交失败的,实际却是提交成功的状态。这时Paxos只能保证多节点数据最终状态一致,但保证不了事务执行的准确性。
工程化实现
总结
public int newValue(byte[] value) {
// 初始化新的提案值,若上次提案提交成功,在准备接收新的instance记录前,value清空,反之,提交失败进行重试时,value不更新仍然为原有值;
if(this.proposerState.getValue().length == 0) {
this.proposerState.setValue(value);
}
…
…
// canSkipPrepare : 是否可以跳过prepare阶段;首次执行或上次propose执行失败时不可跳过;
// wasRejectBySomeone :上次propose过程是否被其它节点拒绝过;
if(this.canSkipPrepare && !this.wasRejectBySomeone) {
// 其它节点承诺过得提案编号未发生变化,本次可跳过prepare阶段
accept();
} else {
//if not reject by someone, no need to increase ballot
prepare(this.wasRejectBySomeone);
}
return 0;
}
public void prepare(boolean needNewBallot) {
// 退出上轮Accept阶段
exitAccept();
this.isPreparing = true;
this.canSkipPrepare = false;
this.wasRejectBySomeone = false;
// 清空历史prepare阶段,其它节点返回的已accept的提案信息
this.proposerState.resetHighestOtherPreAcceptBallot();
if(needNewBallot) {
// 提升proposeID:条件为首次propose或上次propose过程被其它节点拒绝过
this.proposerState.newPrepare();
}
PaxosMsg paxosMsg = new PaxosMsg();
paxosMsg.setMsgType(PaxosMsgType.paxosPrepare.getValue());
paxosMsg.setInstanceID(getInstanceID());
paxosMsg.setNodeID(this.pConfig.getMyNodeID());
paxosMsg.setProposalID(this.proposerState.getProposalID());
// 初始化新的Prepare阶段统计值
this.msgCounter.startNewRound();
// 添加prepare请求超时定时器,超时后,随机延迟一定时间后重新发起prepare请求
addPrepareTimer(0);
// Proposer同时也为Acceptor
int runSelfFirst = BroadcastMessageType.BroadcastMessage_Type_RunSelf_First.getType();
int sendType = MessageSendType.UDP.getValue();
paxosMsg.setTimestamp(System.currentTimeMillis());
// UDP广播请求到其它节点
broadcastMessage(paxosMsg, runSelfFirst, sendType);
}
public void newPrepare() {
// 取已知提案编号的最大值+1
long maxProposalId = this.proposalID > this.highestOtherProposalID ? this.proposalID : this.highestOtherProposalID;
this.proposalID = maxProposalId + 1;
}
public int onPrepare(PaxosMsg paxosMsg) {
PaxosMsg replyPaxosMsg = new PaxosMsg();
replyPaxosMsg.setInstanceID(getInstanceID());
replyPaxosMsg.setNodeID(this.pConfig.getMyNodeID());
replyPaxosMsg.setProposalID(
paxosMsg.getProposalID());
replyPaxosMsg.setMsgType(
PaxosMsgType.paxosPrepareReply.getValue());
// BallotNumber类用于封装提案编号
BallotNumber ballot = new BallotNumber(paxosMsg.getProposalID(), paxosMsg.getNodeID());
BallotNumber pbn = this.acceptorState.getPromiseBallot();
if(ballot.gt(pbn)) {
// PaxosMsg中的提案编号比当前承诺的提案编号大,承若不再接受比ballot更小的值
int ret = updateAcceptorState4Prepare(replyPaxosMsg, ballot);
if(ret != 0) return ret;
} else {
// PaxosMsg中的提案编号小于等于当前承诺的提案编号,拒绝prepare请求,并返回当前已承诺的最大提案编号
replyPaxosMsg.setRejectByPromiseID(this.acceptorState.getPromiseBallot().getProposalID());
}
long replyNodeId = paxosMsg.getNodeID();
sendMessage(replyNodeId, replyPaxosMsg);
return 0;
}
private int updateAcceptorState4Prepare(PaxosMsg replyPaxosMsg, BallotNumber ballot) {
// 返回已批准但未提交的最大提案编号,未批准过值时为0
replyPaxosMsg.setPreAcceptID(this.acceptorState.getAcceptedBallot().getProposalID());
replyPaxosMsg.setPreAcceptNodeID(this.acceptorState.getAcceptedBallot().getNodeId());
if(this.acceptorState.getAcceptedBallot().getProposalID() > 0) {
// 返回当前已批准但未提交的值
replyPaxosMsg.setValue(this.acceptorState.getAcceptedValue());
}
this.acceptorState.setPromiseBallot(ballot);
// acceptorState同步持久化存储
int ret = this.acceptorState.persist(getInstanceID(), getLastChecksum());
if(ret != 0) {
return -1;
}
return 0;
}
public void onPrepareReply(PaxosMsg paxosMsg) {
// prepare阶段已经退出,说明当前接收到的是一条历史过期prepare回复,不做处理
if(!this.isPreparing) {
return ;
}
// 与当前prepare阶段不一致,不做处理
if(paxosMsg.getProposalID() != this.proposerState.getProposalID()) {
return;
}
this.msgCounter.addReceive(paxosMsg.getNodeID());
if(paxosMsg.getRejectByPromiseID() == 0) {
// prepare请求得到响应
BallotNumber ballot = new BallotNumber(paxosMsg.getPreAcceptID(),paxosMsg.getPreAcceptNodeID());
// 统计已经批准通过的节点
this.msgCounter.addPromiseOrAccept(paxosMsg.getNodeID());
// 更新其它节点已经批准的最大提案值
this.proposerState.addPreAcceptValue(ballot, paxosMsg.getValue());
} else {
// prepare请求被拒
this.msgCounter.addReject(paxosMsg.getNodeID());
this.wasRejectBySomeone = true;
// 更新其它节点已承诺的最大提案编号
this.proposerState.setOtherProposalID(paxosMsg.getRejectByPromiseID());
}
if(this.msgCounter.isPassedOnThisRound()) {
// 过半节点通过则进入accept阶段
int useTimeMs = this.timeStat.point();
this.canSkipPrepare = true;
accept();
} else if(this.msgCounter.isRejectedOnThisRound() || this.msgCounter.isAllReceiveOnThisRound()) {
// 过半节点未通过,随机延迟10-40毫秒后再进行重试,此处随机重试机制在讲Paxos算法活性时提到过,可一定程度上降低提案冲突的概率
addPrepareTimer(OtherUtils.fastRand() % 30 + 10);
}
}
若接收到的其它节点已批准的提案值,则取最大编号的提案值作为Accept阶段预提交的提案值
public void addPreAcceptValue(BallotNumber otherPreAcceptBallot, byte[] otherPreAcceptValue) {
if(otherPreAcceptBallot.isNull()) {
return ;
}
if(otherPreAcceptBallot.gt(this.highestOtherPreAcceptBallot)) {
this.highestOtherPreAcceptBallot = otherPreAcceptBallot;
this.value = otherPreAcceptValue;
}
}
public void accept() {
this.timeStat.point();
// 退出上轮prepare阶段
exitPrepare();
this.isAccepting = true;
PaxosMsg paxosMsg = new PaxosMsg();
paxosMsg.setMsgType(PaxosMsgType.paxosAccept.getValue());
paxosMsg.setInstanceID(getInstanceID());
paxosMsg.setNodeID(this.pConfig.getMyNodeID());
paxosMsg.setProposalID(this.proposerState.getProposalID());
paxosMsg.setValue(this.proposerState.getValue());
paxosMsg.setLastChecksum(getLastChecksum());
// 启动新的Accept阶段统计计数
this.msgCounter.startNewRound();
// 添加Accept请求超时定时器,超时后,重新发起prepare请求,当请求未得到正常响应时,很可能其它节点承诺过得最大提案编号已经发生变化,此时为了减少冲突,不直接进入Accept阶段重试;
addAcceptTimer(0);
int runSelfFirst = BroadcastMessageType.BroadcastMessage_Type_RunSelf_Final.getType();
int sendType = MessageSendType.UDP.getValue();
// 广播Accept请求到所有Acceptor,包括Proposer节点自己
broadcastMessage(paxosMsg, runSelfFirst, sendType);
}
public void onAccept(PaxosMsg paxosMsg) {
PaxosMsg replyPaxosMsg = new PaxosMsg();
replyPaxosMsg.setInstanceID(getInstanceID());
replyPaxosMsg.setNodeID(this.pConfig.getMyNodeID());
replyPaxosMsg.setProposalID(paxosMsg.getProposalID());
replyPaxosMsg.setMsgType(PaxosMsgType.paxosAcceptReply.getValue());
BallotNumber ballot = new BallotNumber(paxosMsg.getProposalID(), paxosMsg.getNodeID());
BallotNumber promiseBallot = this.acceptorState.getPromiseBallot();
if(ballot.ge(promiseBallot)) {
// 提案编号大于等于已承诺过得最大提案编号,则批准提案
this.acceptorState.setPromiseBallot(ballot);
BallotNumber acceptedBallot = new BallotNumber(ballot.getProposalID(),ballot.getNodeId());
this.acceptorState.setAcceptedBallot(acceptedBallot);
this.acceptorState.setAcceptedValue(paxosMsg.getValue());
// 持久化存储已批准提案状态
updateAcceptorState4Accept(replyPaxosMsg);
} else {
// 提案编号小于已承诺过得最大提案编号,则拒绝请求,并返回已承诺的最大提案编号
replyPaxosMsg.setRejectByPromiseID(this.acceptorState.getPromiseBallot().getProposalID());
}
long replyNodeId = paxosMsg.getNodeID();
sendMessage(replyNodeId, replyPaxosMsg);
}
Accept请求返回结果处理同样是统计过半节点返回ack情况,过半节点通过后,广播提案提交请求到所有节点,优先由本节点的Learner执行状态机,若执行成功并且Propose请求未超时,则可以返回给上层逻辑执行数据提交成功。最终提案提交阶段,Proposer不需要等待其它节点执行成功的反馈。
public void onAcceptReply(PaxosMsg paxosMsg) {
if(!this.isAccepting) {
return ;
}
if(paxosMsg.getProposalID() != this.proposerState.getProposalID()) {
return ;
}
this.msgCounter.addReceive(paxosMsg.getNodeID());
if(paxosMsg.getRejectByPromiseID() == 0) {
this.msgCounter.addPromiseOrAccept(paxosMsg.getNodeID());
} else {
this.msgCounter.addReject(paxosMsg.getNodeID());
this.wasRejectBySomeone = true;
this.proposerState.setOtherProposalID(paxosMsg.getRejectByPromiseID());
}
if(this.msgCounter.isPassedOnThisRound()) {
// 向所有节点广播提交提案
int useTimeMs = this.timeStat.point();
exitAccept();
this.learner.proposerSendSuccess(getInstanceID(),this.proposerState.getProposalID());
} else if(this.msgCounter.isRejectedOnThisRound() || this.msgCounter.isAllReceiveOnThisRound()) {
// 同样随机延迟10-40ms后,重新发起prepare请求
addAcceptTimer(OtherUtils.fastRand() % 30 +10);
}
}
public void proposerSendSuccess(long learnInstanceID, long proposalID) {
PaxosMsg msg = new PaxosMsg();
msg.setMsgType(PaxosMsgType.paxosLearnerProposerSendSuccess.getValue());
msg.setInstanceID(learnInstanceID);
msg.setNodeID(this.pConfig.getMyNodeID());
msg.setProposalID(proposalID);
msg.setLastChecksum(getLastChecksum());
// 同样通过UDP广播给所有节点,进行提案提交
broadcastMessage(msg, BroadcastMessageType.
BroadcastMessage_Type_RunSelf_First.getType(),
MessageSendType.UDP.getValue());
}
public void onProposerSendSuccess(PaxosMsg paxosMsg) {
if(paxosMsg.getInstanceID() != getInstanceID()) {
//insantceID不一致时,不是期望写入的数据序列
logger.debug("InstanceID not same, skip msg, paxosMsg instanceID {}, now instanceID{}.", paxosMsg.getInstanceID(), getInstanceID());
return ;
}
if(this.acceptor.getAcceptorState().getAcceptedBallot().getProposalID() == 0) {
//尚未批准过任何值
logger.debug("I haven\'t accpeted any proposal");
return ;
}
BallotNumber ballot = new BallotNumber(paxosMsg.getProposalID(), paxosMsg.getNodeID());
BallotNumber thisBallot = this.acceptor.getAcceptorState().getAcceptedBallot();
if(thisBallot.getProposalID() != ballot.getProposalID() || thisBallot.getNodeId() != ballot.getNodeId()) {
// 与批准的提案编号不一致,有可能提案值发生变化
logger.warn("ProposalBallot not same to AcceptedBallot");
return ;
}
// 学习数据并执行状态机
this.learnerState.learnValueWithoutWrite(paxosMsg.getInstanceID(),this.acceptor.getAcceptorState().getAcceptedValue(),this.acceptor.getAcceptorState().getCheckSum());
// 将数据转发给当前节点的follower节点
transmitToFollower();
}
{
SMCtx smCtx = new SMCtx();
boolean isMyCommit =this.commitCtx.isMycommit(this.learner.getInstanceID(),this.learner.getLearnValue(), smCtx);
// ……
// 执行状态机
if (!smExecute(this.learner.getInstanceID(), this.learner.getLearnValue(), isMyCommit, smCtx)) {
// 状态机执行失败时,设置Commit结果提交失败
this.commitCtx.setResult(PaxosTryCommitRet.PaxosTryCommitRet_ExecuteFail.getRet(),this.learner.getInstanceID(), this.learner.getLearnValue());
// 取消跳过prepare阶段
this.proposer.cancelSkipPrepare();
return -1;
}
// 状态机执行成功时,设置Commit结果提交成功
this.commitCtx.setResult(PaxosTryCommitRet.PaxosTryCommitRet_OK.getRet(),this.learner.getInstanceID(), this.learner.getLearnValue());
// ……
// 更新checksum值
this.lastChecksum = this.learner.getNewChecksum();
// 清空instance状态,准备接收下一条instance数据
newInstance();
}