From 71182306390990e5ef9726e73924bc5a9f070282 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 26 Mar 2026 23:32:34 +0200 Subject: Implement Raft vote handling for 04c78b3c-2267-495b-9aca-84b544a1882f --- .../protocols/implementations/VSRaftProtocol.java | 120 ++++++++++++++++++++- 1 file changed, 119 insertions(+), 1 deletion(-) (limited to 'src/main/java/protocols/implementations/VSRaftProtocol.java') diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java index 6257538..597c1e0 100644 --- a/src/main/java/protocols/implementations/VSRaftProtocol.java +++ b/src/main/java/protocols/implementations/VSRaftProtocol.java @@ -3,6 +3,7 @@ package protocols.implementations; import java.util.ArrayList; import java.util.Vector; +import core.VSInternalProcess; import core.VSMessage; import protocols.VSAbstractProtocol; @@ -87,12 +88,14 @@ public class VSRaftProtocol extends VSAbstractProtocol { * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) */ public void onServerRecv(VSMessage recvMessage) { + handleMessage(recvMessage); } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) */ public void onClientRecv(VSMessage recvMessage) { + handleMessage(recvMessage); } /* (non-Javadoc) @@ -157,10 +160,23 @@ public class VSRaftProtocol extends VSAbstractProtocol { private void becomeLeader() { isLeader = true; isCandidate = false; - removeSchedules(); + votesReceived = 0; leaderId = process.getProcessID(); lastHeartbeatTime = process.getTime(); + isServer(true); + + if (!getLongKeySet().contains("heartbeatInterval")) { + onServerInit(); + } + + boolean previousContextIsServer = currentContextIsServer(); + + currentContextIsServer(false); + removeSchedules(); + + currentContextIsServer(true); sendHeartbeat(); + currentContextIsServer(previousContextIsServer); } /** @@ -218,6 +234,7 @@ public class VSRaftProtocol extends VSAbstractProtocol { isCandidate = true; leaderId = -1; lastHeartbeatTime = process.getTime(); + isServer(true); VSMessage voteRequest = new VSMessage(); voteRequest.setString("type", "voteRequest"); @@ -241,4 +258,105 @@ public class VSRaftProtocol extends VSAbstractProtocol { lastHeartbeatTime = process.getTime(); scheduleAt(process.getTime() + getLong("heartbeatInterval")); } + + /** + * Dispatches Raft messages to the relevant handlers. + * + * @param recvMessage the received message + */ + private void handleMessage(VSMessage recvMessage) { + String messageType = recvMessage.getString("type"); + + if ("voteRequest".equals(messageType)) { + handleVoteRequest(recvMessage); + } else if ("voteResponse".equals(messageType)) { + handleVoteResponse(recvMessage); + } + } + + /** + * Handles an incoming vote request from a candidate. + * + * @param recvMessage the vote request + */ + private void handleVoteRequest(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); + int candidateId = recvMessage.getInteger("candidateId"); + boolean voteGranted = false; + + if (messageTerm >= currentTerm && + (votedFor == -1 || votedFor == candidateId)) { + becomeFollower(messageTerm, -1); + votedFor = candidateId; + voteGranted = true; + } + + VSMessage voteResponse = new VSMessage(); + voteResponse.setString("type", "voteResponse"); + voteResponse.setInteger("term", currentTerm); + voteResponse.setInteger("pid", process.getProcessID()); + voteResponse.setBoolean("voteGranted", voteGranted); + voteResponse.setInteger("targetPid", candidateId); + sendMessage(voteResponse); + } + + /** + * Handles an incoming vote response for an active election. + * + * @param recvMessage the vote response + */ + private void handleVoteResponse(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); + + if (messageTerm > currentTerm) { + becomeFollower(messageTerm, -1); + return; + } + + if (!isCandidate || !isForMe(recvMessage) || + !recvMessage.getBoolean("voteGranted") || + messageTerm != currentTerm) { + return; + } + + votesReceived++; + + if (votesReceived > getClusterSize() / 2) { + becomeLeader(); + } + } + + /** + * Checks whether a directed response is meant for this process. + * + * @param recvMessage the received message + * @return true if the message targets this process or has no target field + */ + private boolean isForMe(VSMessage recvMessage) { + if (!recvMessage.getIntegerKeySet().contains("targetPid")) { + return true; + } + + return recvMessage.getInteger("targetPid") == process.getProcessID(); + } + + /** + * Determines the cluster size used for majority calculations. + * + * @return the number of processes participating in the election + */ + private int getClusterSize() { + VSInternalProcess internalProcess = (VSInternalProcess) process; + int numProcesses = internalProcess.getSimulatorCanvas().getNumProcesses(); + + if (numProcesses > 0) { + return numProcesses; + } + + if (getVectorKeySet().contains("pids")) { + return getVector("pids").size() + 1; + } + + return 1; + } } -- cgit v1.2.3