summaryrefslogtreecommitdiff
path: root/src/main/java/protocols/implementations
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-26 23:32:34 +0200
committerPaul Buetow <paul@buetow.org>2026-03-26 23:32:34 +0200
commit71182306390990e5ef9726e73924bc5a9f070282 (patch)
treed576759a4f1e6cd9c11613105e9043094b0cd6f8 /src/main/java/protocols/implementations
parent3820f2fe179995aa6aa12e1fd2ab9b07a7938620 (diff)
Implement Raft vote handling for 04c78b3c-2267-495b-9aca-84b544a1882f
Diffstat (limited to 'src/main/java/protocols/implementations')
-rw-r--r--src/main/java/protocols/implementations/VSRaftProtocol.java120
1 files changed, 119 insertions, 1 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java
index 6257538..597c1e0 100644
--- a/src/main/java/protocols/implementations/VSRaftProtocol.java
+++ b/src/main/java/protocols/implementations/VSRaftProtocol.java
@@ -3,6 +3,7 @@ package protocols.implementations;
import java.util.ArrayList;
import java.util.Vector;
+import core.VSInternalProcess;
import core.VSMessage;
import protocols.VSAbstractProtocol;
@@ -87,12 +88,14 @@ public class VSRaftProtocol extends VSAbstractProtocol {
* @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage)
*/
public void onServerRecv(VSMessage recvMessage) {
+ handleMessage(recvMessage);
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage)
*/
public void onClientRecv(VSMessage recvMessage) {
+ handleMessage(recvMessage);
}
/* (non-Javadoc)
@@ -157,10 +160,23 @@ public class VSRaftProtocol extends VSAbstractProtocol {
private void becomeLeader() {
isLeader = true;
isCandidate = false;
- removeSchedules();
+ votesReceived = 0;
leaderId = process.getProcessID();
lastHeartbeatTime = process.getTime();
+ isServer(true);
+
+ if (!getLongKeySet().contains("heartbeatInterval")) {
+ onServerInit();
+ }
+
+ boolean previousContextIsServer = currentContextIsServer();
+
+ currentContextIsServer(false);
+ removeSchedules();
+
+ currentContextIsServer(true);
sendHeartbeat();
+ currentContextIsServer(previousContextIsServer);
}
/**
@@ -218,6 +234,7 @@ public class VSRaftProtocol extends VSAbstractProtocol {
isCandidate = true;
leaderId = -1;
lastHeartbeatTime = process.getTime();
+ isServer(true);
VSMessage voteRequest = new VSMessage();
voteRequest.setString("type", "voteRequest");
@@ -241,4 +258,105 @@ public class VSRaftProtocol extends VSAbstractProtocol {
lastHeartbeatTime = process.getTime();
scheduleAt(process.getTime() + getLong("heartbeatInterval"));
}
+
+ /**
+ * Dispatches Raft messages to the relevant handlers.
+ *
+ * @param recvMessage the received message
+ */
+ private void handleMessage(VSMessage recvMessage) {
+ String messageType = recvMessage.getString("type");
+
+ if ("voteRequest".equals(messageType)) {
+ handleVoteRequest(recvMessage);
+ } else if ("voteResponse".equals(messageType)) {
+ handleVoteResponse(recvMessage);
+ }
+ }
+
+ /**
+ * Handles an incoming vote request from a candidate.
+ *
+ * @param recvMessage the vote request
+ */
+ private void handleVoteRequest(VSMessage recvMessage) {
+ int messageTerm = recvMessage.getInteger("term");
+ int candidateId = recvMessage.getInteger("candidateId");
+ boolean voteGranted = false;
+
+ if (messageTerm >= currentTerm &&
+ (votedFor == -1 || votedFor == candidateId)) {
+ becomeFollower(messageTerm, -1);
+ votedFor = candidateId;
+ voteGranted = true;
+ }
+
+ VSMessage voteResponse = new VSMessage();
+ voteResponse.setString("type", "voteResponse");
+ voteResponse.setInteger("term", currentTerm);
+ voteResponse.setInteger("pid", process.getProcessID());
+ voteResponse.setBoolean("voteGranted", voteGranted);
+ voteResponse.setInteger("targetPid", candidateId);
+ sendMessage(voteResponse);
+ }
+
+ /**
+ * Handles an incoming vote response for an active election.
+ *
+ * @param recvMessage the vote response
+ */
+ private void handleVoteResponse(VSMessage recvMessage) {
+ int messageTerm = recvMessage.getInteger("term");
+
+ if (messageTerm > currentTerm) {
+ becomeFollower(messageTerm, -1);
+ return;
+ }
+
+ if (!isCandidate || !isForMe(recvMessage) ||
+ !recvMessage.getBoolean("voteGranted") ||
+ messageTerm != currentTerm) {
+ return;
+ }
+
+ votesReceived++;
+
+ if (votesReceived > getClusterSize() / 2) {
+ becomeLeader();
+ }
+ }
+
+ /**
+ * Checks whether a directed response is meant for this process.
+ *
+ * @param recvMessage the received message
+ * @return true if the message targets this process or has no target field
+ */
+ private boolean isForMe(VSMessage recvMessage) {
+ if (!recvMessage.getIntegerKeySet().contains("targetPid")) {
+ return true;
+ }
+
+ return recvMessage.getInteger("targetPid") == process.getProcessID();
+ }
+
+ /**
+ * Determines the cluster size used for majority calculations.
+ *
+ * @return the number of processes participating in the election
+ */
+ private int getClusterSize() {
+ VSInternalProcess internalProcess = (VSInternalProcess) process;
+ int numProcesses = internalProcess.getSimulatorCanvas().getNumProcesses();
+
+ if (numProcesses > 0) {
+ return numProcesses;
+ }
+
+ if (getVectorKeySet().contains("pids")) {
+ return getVector("pids").size() + 1;
+ }
+
+ return 1;
+ }
}