diff options
Diffstat (limited to 'src/main/java/protocols')
| -rw-r--r-- | src/main/java/protocols/implementations/VSRaftProtocol.java | 87 |
1 files changed, 79 insertions, 8 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java index 6828b34..53ead3c 100644 --- a/src/main/java/protocols/implementations/VSRaftProtocol.java +++ b/src/main/java/protocols/implementations/VSRaftProtocol.java @@ -306,15 +306,86 @@ public class VSRaftProtocol extends VSAbstractProtocol { private void handleMessage(VSMessage recvMessage) { String messageType = recvMessage.getString("type"); - if ("voteRequest".equals(messageType)) { - handleVoteRequest(recvMessage); - } else if ("voteResponse".equals(messageType)) { - handleVoteResponse(recvMessage); - } else if ("appendEntry".equals(messageType)) { - handleAppendEntry(recvMessage); - } else if ("appendAck".equals(messageType)) { - handleAppendAck(recvMessage); + if (messageType == null) { + return; + } + + switch (messageType) { + case "heartbeat": + handleHeartbeat(recvMessage); + break; + case "heartbeatAck": + handleHeartbeatAck(recvMessage); + break; + case "voteRequest": + handleVoteRequest(recvMessage); + break; + case "voteResponse": + handleVoteResponse(recvMessage); + break; + case "appendEntry": + handleAppendEntry(recvMessage); + break; + case "appendAck": + handleAppendAck(recvMessage); + break; + default: + break; + } + } + + /** + * Handles an incoming heartbeat from the current leader. + * + * @param recvMessage the heartbeat message + */ + private void handleHeartbeat(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); + int messageLeaderId = recvMessage.getInteger("leaderId"); + + if (messageTerm < currentTerm) { + return; } + + if (messageTerm > currentTerm) { + becomeFollower(messageTerm, messageLeaderId); + } else { + leaderId = messageLeaderId; + isLeader = false; + isCandidate = false; + resetElectionTimeout(); + } + + lastHeartbeatTime = process.getTime(); + + VSMessage heartbeatAck = new VSMessage(); + heartbeatAck.setString("type", "heartbeatAck"); + heartbeatAck.setInteger("term", currentTerm); + heartbeatAck.setInteger("pid", process.getProcessID()); + heartbeatAck.setInteger("targetPid", messageLeaderId); + sendMessage(heartbeatAck); + } + + /** + * Handles an incoming heartbeat acknowledgement on the leader. + * + * @param recvMessage the heartbeat acknowledgement + */ + private void handleHeartbeatAck(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); + Integer responderPid = recvMessage.getIntegerObj("pid"); + + if (messageTerm > currentTerm) { + becomeFollower(messageTerm, -1); + return; + } + + if (!isLeader || !isForMe(recvMessage) || responderPid == null || + messageTerm != currentTerm) { + return; + } + + log("Heartbeat ACK from process " + responderPid + " received"); } /** |
