From 923b0503c91aa4a9c0bc94489caddf9ac94c5ad8 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 26 May 2008 22:14:52 +0000 Subject: Two phase commit protocol works. --- sources/protocols/VSAbstractProtocol.java | 51 ++++++-- .../implementations/BerkelyTimeProtocol.java | 8 +- .../implementations/OnePhaseCommitProtocol.java | 9 +- .../implementations/TwoPhaseCommitProtocol.java | 133 +++++++++++++++------ 4 files changed, 143 insertions(+), 58 deletions(-) (limited to 'sources/protocols') diff --git a/sources/protocols/VSAbstractProtocol.java b/sources/protocols/VSAbstractProtocol.java index 05864fb..7f459c5 100644 --- a/sources/protocols/VSAbstractProtocol.java +++ b/sources/protocols/VSAbstractProtocol.java @@ -4,6 +4,8 @@ */ package protocols; +import java.util.ArrayList; + import events.internal.*; import events.*; import core.*; @@ -23,6 +25,12 @@ abstract public class VSAbstractProtocol extends VSAbstractEvent { /** The current protocol object's context is a server. */ private boolean currentContextIsServer; + /** The protocol's server schedules */ + private ArrayList serverSchedules = new ArrayList(); + + /** The protocol's client schedules */ + private ArrayList clientSchedules = new ArrayList(); + /** * Send a message. * @@ -139,30 +147,49 @@ abstract public class VSAbstractProtocol extends VSAbstractEvent { * Reset. */ public void reset() { - if (isServer) { - currentContextIsServer = true; - isServer = false; - onServerReset(); - } - - if (isClient) { - currentContextIsServer = false; - isClient = false; - onClientReset(); - } + //if (isServer) { + currentContextIsServer = true; + isServer = false; + onServerReset(); + serverSchedules.clear(); + //} + + //if (isClient) { + currentContextIsServer = false; + isClient = false; + onClientReset(); + clientSchedules.clear(); + //} } /** * Reschedules the protocol for a new time and runs onClientSchedule or onServerSchedule * - * @param isClient the is client + * @param time The process' local time to run the schedule at. */ protected final void scheduleAt(long time) { VSAbstractEvent scheduleEvent = new ProtocolScheduleEvent(this, currentContextIsServer); VSTask scheduleTask = new VSTask(time, process, scheduleEvent, VSTask.LOCAL); + if (currentContextIsServer) + serverSchedules.add(scheduleTask); + else + clientSchedules.add(scheduleTask); process.getSimulationCanvas().getTaskManager().addTask(scheduleTask); } + /** + * Removes all schedules of the protocol (server or client) + */ + protected final void removeSchedules() { + if (currentContextIsServer) { + process.getSimulationCanvas().getTaskManager().removeAllTasks(serverSchedules); + serverSchedules.clear(); + } else { + process.getSimulationCanvas().getTaskManager().removeAllTasks(clientSchedules); + clientSchedules.clear(); + } + } + /** * On client start. */ diff --git a/sources/protocols/implementations/BerkelyTimeProtocol.java b/sources/protocols/implementations/BerkelyTimeProtocol.java index 411a9e8..890221a 100644 --- a/sources/protocols/implementations/BerkelyTimeProtocol.java +++ b/sources/protocols/implementations/BerkelyTimeProtocol.java @@ -44,7 +44,7 @@ public class BerkelyTimeProtocol extends VSAbstractProtocol { Vector vec = new Vector(); vec.add(2); vec.add(3); - initVector("processPIDs", vec, "PIDs beteiliger Prozesse"); + initVector("pids", vec, "PIDs beteiliger Prozesse"); } /* (non-Javadoc) @@ -61,14 +61,14 @@ public class BerkelyTimeProtocol extends VSAbstractProtocol { recvTimes.clear(); realTimesRTT.clear(); peers.clear(); - peers.addAll(getVector("processPIDs")); + peers.addAll(getVector("pids")); } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientStart() */ protected void onClientStart() { - peers.addAll(getVector("processPIDs")); + peers.addAll(getVector("pids")); requestTime = process.getTime(); VSMessage message = new VSMessage(); message.setBoolean("isRequest", true); @@ -130,7 +130,7 @@ public class BerkelyTimeProtocol extends VSAbstractProtocol { } /* Include the time of the local process */ sum += process.getTime(); - return (long) sum / (getVector("processPIDs").size() + 1); + return (long) sum / (getVector("pids").size() + 1); } /** diff --git a/sources/protocols/implementations/OnePhaseCommitProtocol.java b/sources/protocols/implementations/OnePhaseCommitProtocol.java index a8d8d75..bc42c38 100644 --- a/sources/protocols/implementations/OnePhaseCommitProtocol.java +++ b/sources/protocols/implementations/OnePhaseCommitProtocol.java @@ -47,8 +47,10 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol { * @see protocols.VSAbstractProtocol#onClientReset() */ protected void onClientReset() { - pids.clear(); - pids.addAll(getVector("pids")); + if (pids != null) { + pids.clear(); + pids.addAll(getVector("pids")); + } } /* (non-Javadoc) @@ -58,7 +60,6 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol { if (pids == null) { pids = new ArrayList(); pids.addAll(getVector("pids")); - } if (pids.size() != 0) { @@ -82,6 +83,8 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol { Integer pid = recvMessage.getIntegerObj("pid"); if (pids.contains(pid)) pids.remove(pid); + else + return; logg("ACK von Prozess " + pid + " erhalten!"); diff --git a/sources/protocols/implementations/TwoPhaseCommitProtocol.java b/sources/protocols/implementations/TwoPhaseCommitProtocol.java index 05ac8cf..f86ff46 100644 --- a/sources/protocols/implementations/TwoPhaseCommitProtocol.java +++ b/sources/protocols/implementations/TwoPhaseCommitProtocol.java @@ -16,11 +16,14 @@ import core.VSMessage; public class TwoPhaseCommitProtocol extends VSAbstractProtocol { private static final long serialVersionUID = 1L; - /* Server variables, coordinator */ - private ArrayList pids; + /** PIDs of all processes which still have to vote */ + private ArrayList votePids; - /* Client variables */ - private boolean ackSent; + /** PIDs of all processes which have to acknowledge that they recv the global vote result */ + private ArrayList ackPids; + + /** The gloal vote result */ + private boolean voteResult; /** * Instantiates a one phase commit protocol. @@ -33,8 +36,12 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol { vec.add(2); vec.add(3); + /* Server */ initVector("pids", vec, "PIDs beteilitger Prozesse"); initLong("timeout", 5000, "Zeit bis erneuerter Anfrage", "ms"); + + /* Client */ + initInteger("ackProb", 100, "Festschreibw'keit", 0, 100, "%"); } /* (non-Javadoc) @@ -47,26 +54,42 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol { * @see protocols.VSAbstractProtocol#onClientReset() */ protected void onClientReset() { - pids.clear(); - pids.addAll(getVector("pids")); + if (votePids != null) { + voteResult = true; + votePids.clear(); + votePids.addAll(getVector("pids")); + ackPids.clear(); + ackPids.addAll(getVector("pids")); + } } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientStart() */ protected void onClientStart() { - if (pids == null) { - pids = new ArrayList(); - pids.addAll(getVector("pids")); - + if (votePids == null) { + voteResult = true; + votePids = new ArrayList(); + votePids.addAll(getVector("pids")); + ackPids = new ArrayList(); + ackPids.addAll(getVector("pids")); } - if (pids.size() != 0) { + if (votePids.size() != 0) { long timeout = getLong("timeout") + process.getTime(); scheduleAt(timeout); /* Will run onClientSchedule() at the specified local time */ VSMessage message = new VSMessage(); - message.setBoolean("wantAck", true); + message.setBoolean("wantVote", true); + sendMessage(message); + + } else if (ackPids.size() != 0) { + long timeout = getLong("timeout") + process.getTime(); + scheduleAt(timeout); /* Will run onClientSchedule() at the specified local time */ + + VSMessage message = new VSMessage(); + message.setBoolean("isVoteResult", true); + message.setBoolean("voteResult", voteResult); sendMessage(message); } } @@ -75,18 +98,38 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol { * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) */ protected void onClientRecv(VSMessage recvMessage) { - if (pids.size() == 0) - return; - - if (recvMessage.getBoolean("isAck")) { + if (votePids.size() != 0 && recvMessage.getBoolean("isVote")) { Integer pid = recvMessage.getIntegerObj("pid"); - if (pids.contains(pid)) - pids.remove(pid); - - logg("ACK von Prozess " + pid + " erhalten!"); - - if (pids.size() == 0) - logg("ACKs von allen beteiligten Prozessen erhalten!"); + if (votePids.contains(pid)) + votePids.remove(pid); + else + return; + + boolean vote = recvMessage.getBoolean("vote"); + logg("Abstimmung von Prozess " + pid + " erhalten! Ergebnis: " + vote); + + if (!vote) + voteResult = false; + + if (votePids.size() == 0) { + logg("Abstimmungen von allen beteiligten Prozessen erhalten! Globales Ergebnis: " + voteResult); + /* Remove the active schedule which has been created in the onClientStart method */ + removeSchedules(); + /* Create a new schedule and send the vote result */ + onClientStart(); + } + } else if (ackPids.size() != 0 && recvMessage.getBoolean("isAck")) { + Integer pid = recvMessage.getIntegerObj("pid"); + if (ackPids.contains(pid)) + ackPids.remove(pid); + else + return; + + if (ackPids.size() == 0) { + /* Remove the active schedule which has been created in the onClientStart method */ + removeSchedules(); + logg("Alle Teilnehmer haben die Abstimmung erhalten"); + } } } @@ -97,25 +140,44 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol { onClientStart(); } + /* Client variables */ + private boolean voteSent; + private boolean myVote; + /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onServerReset() */ protected void onServerReset() { - ackSent = false; + voteSent = false; } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) */ protected void onServerRecv(VSMessage recvMessage) { - if (ackSent) - return; - - VSMessage message = new VSMessage(); - message.setBoolean("isAck", true); - message.setInteger("pid", process.getProcessID()); - sendMessage(message); - ackSent = true; + if (recvMessage.getBoolean("wantVote")) { + if (!voteSent) { + voteSent = true; + myVote = process.getRandomPercentage() <= getInteger("ackProb"); + } + + VSMessage message = new VSMessage(); + message.setBoolean("isVote", true); + message.setBoolean("vote", myVote); + message.setInteger("pid", process.getProcessID()); + sendMessage(message); + + logg("Abstimmung " + myVote + " versendet"); + + } else if (recvMessage.getBoolean("isVoteResult")) { + boolean voteResult = recvMessage.getBoolean("voteResult"); + logg("Globales Abstimmungsergebnis erhalten. Ergebnis: " + voteResult); + + VSMessage message = new VSMessage(); + message.setBoolean("isAck", true); + message.setInteger("pid", process.getProcessID()); + sendMessage(message); + } } /* (non-Javadoc) @@ -123,11 +185,4 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol { */ protected void onServerSchedule() { } - - /* (non-Javadoc) - * @see protocols.VSAbstractProtocol#toString() - */ - public String toString() { - return super.toString(); - } } -- cgit v1.2.3