From c5e06e480d01f4f87d02b5f04e873f44a679c741 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Fri, 27 Mar 2026 06:09:31 +0200 Subject: Fix Raft append replication review issues b85586a4-4eb9-4686-93c7-0ab14173baa5 --- .../protocols/implementations/VSRaftProtocol.java | 23 ++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) (limited to 'src/main/java/protocols') diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java index 983c8d3..8c36b68 100644 --- a/src/main/java/protocols/implementations/VSRaftProtocol.java +++ b/src/main/java/protocols/implementations/VSRaftProtocol.java @@ -277,6 +277,8 @@ public class VSRaftProtocol extends VSAbstractProtocol { * Sends a simplified append-entry request for the configured log entry. */ private void sendAppendEntry() { + ackPids.clear(); + if (getVectorKeySet().contains("pids")) { ackPids.addAll(getVector("pids")); } @@ -381,6 +383,7 @@ public class VSRaftProtocol extends VSAbstractProtocol { private void handleAppendEntry(VSMessage recvMessage) { int messageTerm = recvMessage.getInteger("term"); int messageLeaderId = recvMessage.getInteger("leaderId"); + int messageLogIndex = recvMessage.getInteger("logIndex"); if (messageTerm > currentTerm) { becomeFollower(messageTerm, messageLeaderId); @@ -393,13 +396,17 @@ public class VSRaftProtocol extends VSAbstractProtocol { return; } - logIndex++; + if (messageLogIndex != logIndex + 1) { + return; + } + + logIndex = messageLogIndex; VSMessage appendAck = new VSMessage(); appendAck.setString("type", "appendAck"); appendAck.setInteger("term", currentTerm); appendAck.setInteger("pid", process.getProcessID()); - appendAck.setInteger("logIndex", logIndex); + appendAck.setInteger("logIndex", messageLogIndex); appendAck.setInteger("targetPid", messageLeaderId); sendMessage(appendAck); } @@ -410,17 +417,25 @@ public class VSRaftProtocol extends VSAbstractProtocol { * @param recvMessage the append acknowledgement */ private void handleAppendAck(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); Integer responderPid = recvMessage.getIntegerObj("pid"); + int ackLogIndex = recvMessage.getInteger("logIndex"); + + if (messageTerm > currentTerm) { + becomeFollower(messageTerm, -1); + return; + } if (!isLeader || !isForMe(recvMessage) || responderPid == null || + messageTerm != currentTerm || ackLogIndex != logIndex || !ackPids.contains(responderPid)) { return; } ackPids.remove(responderPid); - if (ackPids.isEmpty()) { - commitIndex++; + if (ackPids.isEmpty() && commitIndex < ackLogIndex) { + commitIndex = ackLogIndex; log("Committed log index " + commitIndex); } } -- cgit v1.2.3