summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-26 23:42:05 +0200
committerPaul Buetow <paul@buetow.org>2026-03-26 23:42:05 +0200
commite3dfc7a62fc1eb27a9fb68dd530064cdd2d5bb07 (patch)
tree1be8b851ca34015ac6e16d2c60316e5cc8bb6ef6 /src/main
parent637c4a09bfbe7045b0a639a616cfffc983da7e05 (diff)
Implement Raft append replication b85586a4-4eb9-4686-93c7-0ab14173baa5
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/protocols/implementations/VSRaftProtocol.java81
1 files changed, 81 insertions, 0 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java
index 230c6d9..983c8d3 100644
--- a/src/main/java/protocols/implementations/VSRaftProtocol.java
+++ b/src/main/java/protocols/implementations/VSRaftProtocol.java
@@ -171,6 +171,7 @@ public class VSRaftProtocol extends VSAbstractProtocol {
isCandidate = false;
votesReceived = 0;
voteResponsePids.clear();
+ ackPids.clear();
leaderId = process.getProcessID();
lastHeartbeatTime = process.getTime();
isServer(true);
@@ -186,6 +187,7 @@ public class VSRaftProtocol extends VSAbstractProtocol {
currentContextIsServer(true);
sendHeartbeat();
+ sendAppendEntry();
currentContextIsServer(previousContextIsServer);
}
@@ -272,6 +274,29 @@ public class VSRaftProtocol extends VSAbstractProtocol {
}
/**
+ * Sends a simplified append-entry request for the configured log entry.
+ */
+ private void sendAppendEntry() {
+ if (getVectorKeySet().contains("pids")) {
+ ackPids.addAll(getVector("pids"));
+ }
+
+ if (ackPids.isEmpty()) {
+ return;
+ }
+
+ logIndex++;
+
+ VSMessage appendEntry = new VSMessage();
+ appendEntry.setString("type", "appendEntry");
+ appendEntry.setInteger("term", currentTerm);
+ appendEntry.setInteger("leaderId", leaderId);
+ appendEntry.setString("entry", getString("logEntry"));
+ appendEntry.setInteger("logIndex", logIndex);
+ sendMessage(appendEntry);
+ }
+
+ /**
* Dispatches Raft messages to the relevant handlers.
*
* @param recvMessage the received message
@@ -283,6 +308,10 @@ public class VSRaftProtocol extends VSAbstractProtocol {
handleVoteRequest(recvMessage);
} else if ("voteResponse".equals(messageType)) {
handleVoteResponse(recvMessage);
+ } else if ("appendEntry".equals(messageType)) {
+ handleAppendEntry(recvMessage);
+ } else if ("appendAck".equals(messageType)) {
+ handleAppendAck(recvMessage);
}
}
@@ -345,6 +374,58 @@ public class VSRaftProtocol extends VSAbstractProtocol {
}
/**
+ * Handles an incoming append-entry request from the current leader.
+ *
+ * @param recvMessage the append-entry message
+ */
+ private void handleAppendEntry(VSMessage recvMessage) {
+ int messageTerm = recvMessage.getInteger("term");
+ int messageLeaderId = recvMessage.getInteger("leaderId");
+
+ if (messageTerm > currentTerm) {
+ becomeFollower(messageTerm, messageLeaderId);
+ } else if (messageTerm == currentTerm) {
+ leaderId = messageLeaderId;
+ isLeader = false;
+ isCandidate = false;
+ resetElectionTimeout();
+ } else {
+ return;
+ }
+
+ logIndex++;
+
+ VSMessage appendAck = new VSMessage();
+ appendAck.setString("type", "appendAck");
+ appendAck.setInteger("term", currentTerm);
+ appendAck.setInteger("pid", process.getProcessID());
+ appendAck.setInteger("logIndex", logIndex);
+ appendAck.setInteger("targetPid", messageLeaderId);
+ sendMessage(appendAck);
+ }
+
+ /**
+ * Handles an append-entry acknowledgement on the leader.
+ *
+ * @param recvMessage the append acknowledgement
+ */
+ private void handleAppendAck(VSMessage recvMessage) {
+ Integer responderPid = recvMessage.getIntegerObj("pid");
+
+ if (!isLeader || !isForMe(recvMessage) || responderPid == null ||
+ !ackPids.contains(responderPid)) {
+ return;
+ }
+
+ ackPids.remove(responderPid);
+
+ if (ackPids.isEmpty()) {
+ commitIndex++;
+ log("Committed log index " + commitIndex);
+ }
+ }
+
+ /**
* Checks whether a directed response is meant for this process.
*
* @param recvMessage the received message