diff options
| author | Paul Buetow <paul@buetow.org> | 2008-05-26 22:14:52 +0000 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2008-05-26 22:14:52 +0000 |
| commit | 923b0503c91aa4a9c0bc94489caddf9ac94c5ad8 (patch) | |
| tree | 74bf4a51494085c9ecb107d3facc31bb585669b4 /sources/protocols/implementations/TwoPhaseCommitProtocol.java | |
| parent | faee8241ff7972ceeb622e0793c655f301ef0bd0 (diff) | |
Two phase commit protocol works.
Diffstat (limited to 'sources/protocols/implementations/TwoPhaseCommitProtocol.java')
| -rw-r--r-- | sources/protocols/implementations/TwoPhaseCommitProtocol.java | 133 |
1 files changed, 94 insertions, 39 deletions
diff --git a/sources/protocols/implementations/TwoPhaseCommitProtocol.java b/sources/protocols/implementations/TwoPhaseCommitProtocol.java index 05ac8cf..f86ff46 100644 --- a/sources/protocols/implementations/TwoPhaseCommitProtocol.java +++ b/sources/protocols/implementations/TwoPhaseCommitProtocol.java @@ -16,11 +16,14 @@ import core.VSMessage; public class TwoPhaseCommitProtocol extends VSAbstractProtocol { private static final long serialVersionUID = 1L; - /* Server variables, coordinator */ - private ArrayList<Integer> pids; + /** PIDs of all processes which still have to vote */ + private ArrayList<Integer> votePids; - /* Client variables */ - private boolean ackSent; + /** PIDs of all processes which have to acknowledge that they recv the global vote result */ + private ArrayList<Integer> ackPids; + + /** The gloal vote result */ + private boolean voteResult; /** * Instantiates a one phase commit protocol. @@ -33,8 +36,12 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol { vec.add(2); vec.add(3); + /* Server */ initVector("pids", vec, "PIDs beteilitger Prozesse"); initLong("timeout", 5000, "Zeit bis erneuerter Anfrage", "ms"); + + /* Client */ + initInteger("ackProb", 100, "Festschreibw'keit", 0, 100, "%"); } /* (non-Javadoc) @@ -47,26 +54,42 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol { * @see protocols.VSAbstractProtocol#onClientReset() */ protected void onClientReset() { - pids.clear(); - pids.addAll(getVector("pids")); + if (votePids != null) { + voteResult = true; + votePids.clear(); + votePids.addAll(getVector("pids")); + ackPids.clear(); + ackPids.addAll(getVector("pids")); + } } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientStart() */ protected void onClientStart() { - if (pids == null) { - pids = new ArrayList<Integer>(); - pids.addAll(getVector("pids")); - + if (votePids == null) { + voteResult = true; + votePids = new ArrayList<Integer>(); + votePids.addAll(getVector("pids")); + ackPids = new ArrayList<Integer>(); + ackPids.addAll(getVector("pids")); } - if (pids.size() != 0) { + if (votePids.size() != 0) { long timeout = getLong("timeout") + process.getTime(); scheduleAt(timeout); /* Will run onClientSchedule() at the specified local time */ VSMessage message = new VSMessage(); - message.setBoolean("wantAck", true); + message.setBoolean("wantVote", true); + sendMessage(message); + + } else if (ackPids.size() != 0) { + long timeout = getLong("timeout") + process.getTime(); + scheduleAt(timeout); /* Will run onClientSchedule() at the specified local time */ + + VSMessage message = new VSMessage(); + message.setBoolean("isVoteResult", true); + message.setBoolean("voteResult", voteResult); sendMessage(message); } } @@ -75,18 +98,38 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol { * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) */ protected void onClientRecv(VSMessage recvMessage) { - if (pids.size() == 0) - return; - - if (recvMessage.getBoolean("isAck")) { + if (votePids.size() != 0 && recvMessage.getBoolean("isVote")) { Integer pid = recvMessage.getIntegerObj("pid"); - if (pids.contains(pid)) - pids.remove(pid); - - logg("ACK von Prozess " + pid + " erhalten!"); - - if (pids.size() == 0) - logg("ACKs von allen beteiligten Prozessen erhalten!"); + if (votePids.contains(pid)) + votePids.remove(pid); + else + return; + + boolean vote = recvMessage.getBoolean("vote"); + logg("Abstimmung von Prozess " + pid + " erhalten! Ergebnis: " + vote); + + if (!vote) + voteResult = false; + + if (votePids.size() == 0) { + logg("Abstimmungen von allen beteiligten Prozessen erhalten! Globales Ergebnis: " + voteResult); + /* Remove the active schedule which has been created in the onClientStart method */ + removeSchedules(); + /* Create a new schedule and send the vote result */ + onClientStart(); + } + } else if (ackPids.size() != 0 && recvMessage.getBoolean("isAck")) { + Integer pid = recvMessage.getIntegerObj("pid"); + if (ackPids.contains(pid)) + ackPids.remove(pid); + else + return; + + if (ackPids.size() == 0) { + /* Remove the active schedule which has been created in the onClientStart method */ + removeSchedules(); + logg("Alle Teilnehmer haben die Abstimmung erhalten"); + } } } @@ -97,25 +140,44 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol { onClientStart(); } + /* Client variables */ + private boolean voteSent; + private boolean myVote; + /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onServerReset() */ protected void onServerReset() { - ackSent = false; + voteSent = false; } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) */ protected void onServerRecv(VSMessage recvMessage) { - if (ackSent) - return; - - VSMessage message = new VSMessage(); - message.setBoolean("isAck", true); - message.setInteger("pid", process.getProcessID()); - sendMessage(message); - ackSent = true; + if (recvMessage.getBoolean("wantVote")) { + if (!voteSent) { + voteSent = true; + myVote = process.getRandomPercentage() <= getInteger("ackProb"); + } + + VSMessage message = new VSMessage(); + message.setBoolean("isVote", true); + message.setBoolean("vote", myVote); + message.setInteger("pid", process.getProcessID()); + sendMessage(message); + + logg("Abstimmung " + myVote + " versendet"); + + } else if (recvMessage.getBoolean("isVoteResult")) { + boolean voteResult = recvMessage.getBoolean("voteResult"); + logg("Globales Abstimmungsergebnis erhalten. Ergebnis: " + voteResult); + + VSMessage message = new VSMessage(); + message.setBoolean("isAck", true); + message.setInteger("pid", process.getProcessID()); + sendMessage(message); + } } /* (non-Javadoc) @@ -123,11 +185,4 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol { */ protected void onServerSchedule() { } - - /* (non-Javadoc) - * @see protocols.VSAbstractProtocol#toString() - */ - public String toString() { - return super.toString(); - } } |
