diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-26 23:42:05 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-26 23:42:05 +0200 |
| commit | e3dfc7a62fc1eb27a9fb68dd530064cdd2d5bb07 (patch) | |
| tree | 1be8b851ca34015ac6e16d2c60316e5cc8bb6ef6 /src/main/java/protocols/implementations/VSRaftProtocol.java | |
| parent | 637c4a09bfbe7045b0a639a616cfffc983da7e05 (diff) | |
Implement Raft append replication b85586a4-4eb9-4686-93c7-0ab14173baa5
Diffstat (limited to 'src/main/java/protocols/implementations/VSRaftProtocol.java')
| -rw-r--r-- | src/main/java/protocols/implementations/VSRaftProtocol.java | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java index 230c6d9..983c8d3 100644 --- a/src/main/java/protocols/implementations/VSRaftProtocol.java +++ b/src/main/java/protocols/implementations/VSRaftProtocol.java @@ -171,6 +171,7 @@ public class VSRaftProtocol extends VSAbstractProtocol { isCandidate = false; votesReceived = 0; voteResponsePids.clear(); + ackPids.clear(); leaderId = process.getProcessID(); lastHeartbeatTime = process.getTime(); isServer(true); @@ -186,6 +187,7 @@ public class VSRaftProtocol extends VSAbstractProtocol { currentContextIsServer(true); sendHeartbeat(); + sendAppendEntry(); currentContextIsServer(previousContextIsServer); } @@ -272,6 +274,29 @@ public class VSRaftProtocol extends VSAbstractProtocol { } /** + * Sends a simplified append-entry request for the configured log entry. + */ + private void sendAppendEntry() { + if (getVectorKeySet().contains("pids")) { + ackPids.addAll(getVector("pids")); + } + + if (ackPids.isEmpty()) { + return; + } + + logIndex++; + + VSMessage appendEntry = new VSMessage(); + appendEntry.setString("type", "appendEntry"); + appendEntry.setInteger("term", currentTerm); + appendEntry.setInteger("leaderId", leaderId); + appendEntry.setString("entry", getString("logEntry")); + appendEntry.setInteger("logIndex", logIndex); + sendMessage(appendEntry); + } + + /** * Dispatches Raft messages to the relevant handlers. * * @param recvMessage the received message @@ -283,6 +308,10 @@ public class VSRaftProtocol extends VSAbstractProtocol { handleVoteRequest(recvMessage); } else if ("voteResponse".equals(messageType)) { handleVoteResponse(recvMessage); + } else if ("appendEntry".equals(messageType)) { + handleAppendEntry(recvMessage); + } else if ("appendAck".equals(messageType)) { + handleAppendAck(recvMessage); } } @@ -345,6 +374,58 @@ public class VSRaftProtocol extends VSAbstractProtocol { } /** + * Handles an incoming append-entry request from the current leader. + * + * @param recvMessage the append-entry message + */ + private void handleAppendEntry(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); + int messageLeaderId = recvMessage.getInteger("leaderId"); + + if (messageTerm > currentTerm) { + becomeFollower(messageTerm, messageLeaderId); + } else if (messageTerm == currentTerm) { + leaderId = messageLeaderId; + isLeader = false; + isCandidate = false; + resetElectionTimeout(); + } else { + return; + } + + logIndex++; + + VSMessage appendAck = new VSMessage(); + appendAck.setString("type", "appendAck"); + appendAck.setInteger("term", currentTerm); + appendAck.setInteger("pid", process.getProcessID()); + appendAck.setInteger("logIndex", logIndex); + appendAck.setInteger("targetPid", messageLeaderId); + sendMessage(appendAck); + } + + /** + * Handles an append-entry acknowledgement on the leader. + * + * @param recvMessage the append acknowledgement + */ + private void handleAppendAck(VSMessage recvMessage) { + Integer responderPid = recvMessage.getIntegerObj("pid"); + + if (!isLeader || !isForMe(recvMessage) || responderPid == null || + !ackPids.contains(responderPid)) { + return; + } + + ackPids.remove(responderPid); + + if (ackPids.isEmpty()) { + commitIndex++; + log("Committed log index " + commitIndex); + } + } + + /** * Checks whether a directed response is meant for this process. * * @param recvMessage the received message |
