summaryrefslogtreecommitdiff
path: root/sources/protocols
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2008-05-26 22:14:52 +0000
committerPaul Buetow <paul@buetow.org>2008-05-26 22:14:52 +0000
commit923b0503c91aa4a9c0bc94489caddf9ac94c5ad8 (patch)
tree74bf4a51494085c9ecb107d3facc31bb585669b4 /sources/protocols
parentfaee8241ff7972ceeb622e0793c655f301ef0bd0 (diff)
Two phase commit protocol works.
Diffstat (limited to 'sources/protocols')
-rw-r--r--sources/protocols/VSAbstractProtocol.java51
-rw-r--r--sources/protocols/implementations/BerkelyTimeProtocol.java8
-rw-r--r--sources/protocols/implementations/OnePhaseCommitProtocol.java9
-rw-r--r--sources/protocols/implementations/TwoPhaseCommitProtocol.java133
4 files changed, 143 insertions, 58 deletions
diff --git a/sources/protocols/VSAbstractProtocol.java b/sources/protocols/VSAbstractProtocol.java
index 05864fb..7f459c5 100644
--- a/sources/protocols/VSAbstractProtocol.java
+++ b/sources/protocols/VSAbstractProtocol.java
@@ -4,6 +4,8 @@
*/
package protocols;
+import java.util.ArrayList;
+
import events.internal.*;
import events.*;
import core.*;
@@ -23,6 +25,12 @@ abstract public class VSAbstractProtocol extends VSAbstractEvent {
/** The current protocol object's context is a server. */
private boolean currentContextIsServer;
+ /** The protocol's server schedules */
+ private ArrayList<VSTask> serverSchedules = new ArrayList<VSTask>();
+
+ /** The protocol's client schedules */
+ private ArrayList<VSTask> clientSchedules = new ArrayList<VSTask>();
+
/**
* Send a message.
*
@@ -139,31 +147,50 @@ abstract public class VSAbstractProtocol extends VSAbstractEvent {
* Reset.
*/
public void reset() {
- if (isServer) {
- currentContextIsServer = true;
- isServer = false;
- onServerReset();
- }
-
- if (isClient) {
- currentContextIsServer = false;
- isClient = false;
- onClientReset();
- }
+ //if (isServer) {
+ currentContextIsServer = true;
+ isServer = false;
+ onServerReset();
+ serverSchedules.clear();
+ //}
+
+ //if (isClient) {
+ currentContextIsServer = false;
+ isClient = false;
+ onClientReset();
+ clientSchedules.clear();
+ //}
}
/**
* Reschedules the protocol for a new time and runs onClientSchedule or onServerSchedule
*
- * @param isClient the is client
+ * @param time The process' local time to run the schedule at.
*/
protected final void scheduleAt(long time) {
VSAbstractEvent scheduleEvent = new ProtocolScheduleEvent(this, currentContextIsServer);
VSTask scheduleTask = new VSTask(time, process, scheduleEvent, VSTask.LOCAL);
+ if (currentContextIsServer)
+ serverSchedules.add(scheduleTask);
+ else
+ clientSchedules.add(scheduleTask);
process.getSimulationCanvas().getTaskManager().addTask(scheduleTask);
}
/**
+ * Removes all schedules of the protocol (server or client)
+ */
+ protected final void removeSchedules() {
+ if (currentContextIsServer) {
+ process.getSimulationCanvas().getTaskManager().removeAllTasks(serverSchedules);
+ serverSchedules.clear();
+ } else {
+ process.getSimulationCanvas().getTaskManager().removeAllTasks(clientSchedules);
+ clientSchedules.clear();
+ }
+ }
+
+ /**
* On client start.
*/
abstract protected void onClientStart();
diff --git a/sources/protocols/implementations/BerkelyTimeProtocol.java b/sources/protocols/implementations/BerkelyTimeProtocol.java
index 411a9e8..890221a 100644
--- a/sources/protocols/implementations/BerkelyTimeProtocol.java
+++ b/sources/protocols/implementations/BerkelyTimeProtocol.java
@@ -44,7 +44,7 @@ public class BerkelyTimeProtocol extends VSAbstractProtocol {
Vector<Integer> vec = new Vector<Integer>();
vec.add(2);
vec.add(3);
- initVector("processPIDs", vec, "PIDs beteiliger Prozesse");
+ initVector("pids", vec, "PIDs beteiliger Prozesse");
}
/* (non-Javadoc)
@@ -61,14 +61,14 @@ public class BerkelyTimeProtocol extends VSAbstractProtocol {
recvTimes.clear();
realTimesRTT.clear();
peers.clear();
- peers.addAll(getVector("processPIDs"));
+ peers.addAll(getVector("pids"));
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onClientStart()
*/
protected void onClientStart() {
- peers.addAll(getVector("processPIDs"));
+ peers.addAll(getVector("pids"));
requestTime = process.getTime();
VSMessage message = new VSMessage();
message.setBoolean("isRequest", true);
@@ -130,7 +130,7 @@ public class BerkelyTimeProtocol extends VSAbstractProtocol {
}
/* Include the time of the local process */
sum += process.getTime();
- return (long) sum / (getVector("processPIDs").size() + 1);
+ return (long) sum / (getVector("pids").size() + 1);
}
/**
diff --git a/sources/protocols/implementations/OnePhaseCommitProtocol.java b/sources/protocols/implementations/OnePhaseCommitProtocol.java
index a8d8d75..bc42c38 100644
--- a/sources/protocols/implementations/OnePhaseCommitProtocol.java
+++ b/sources/protocols/implementations/OnePhaseCommitProtocol.java
@@ -47,8 +47,10 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol {
* @see protocols.VSAbstractProtocol#onClientReset()
*/
protected void onClientReset() {
- pids.clear();
- pids.addAll(getVector("pids"));
+ if (pids != null) {
+ pids.clear();
+ pids.addAll(getVector("pids"));
+ }
}
/* (non-Javadoc)
@@ -58,7 +60,6 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol {
if (pids == null) {
pids = new ArrayList<Integer>();
pids.addAll(getVector("pids"));
-
}
if (pids.size() != 0) {
@@ -82,6 +83,8 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol {
Integer pid = recvMessage.getIntegerObj("pid");
if (pids.contains(pid))
pids.remove(pid);
+ else
+ return;
logg("ACK von Prozess " + pid + " erhalten!");
diff --git a/sources/protocols/implementations/TwoPhaseCommitProtocol.java b/sources/protocols/implementations/TwoPhaseCommitProtocol.java
index 05ac8cf..f86ff46 100644
--- a/sources/protocols/implementations/TwoPhaseCommitProtocol.java
+++ b/sources/protocols/implementations/TwoPhaseCommitProtocol.java
@@ -16,11 +16,14 @@ import core.VSMessage;
public class TwoPhaseCommitProtocol extends VSAbstractProtocol {
private static final long serialVersionUID = 1L;
- /* Server variables, coordinator */
- private ArrayList<Integer> pids;
+ /** PIDs of all processes which still have to vote */
+ private ArrayList<Integer> votePids;
- /* Client variables */
- private boolean ackSent;
+ /** PIDs of all processes which have to acknowledge that they recv the global vote result */
+ private ArrayList<Integer> ackPids;
+
+ /** The gloal vote result */
+ private boolean voteResult;
/**
* Instantiates a one phase commit protocol.
@@ -33,8 +36,12 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol {
vec.add(2);
vec.add(3);
+ /* Server */
initVector("pids", vec, "PIDs beteilitger Prozesse");
initLong("timeout", 5000, "Zeit bis erneuerter Anfrage", "ms");
+
+ /* Client */
+ initInteger("ackProb", 100, "Festschreibw'keit", 0, 100, "%");
}
/* (non-Javadoc)
@@ -47,26 +54,42 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol {
* @see protocols.VSAbstractProtocol#onClientReset()
*/
protected void onClientReset() {
- pids.clear();
- pids.addAll(getVector("pids"));
+ if (votePids != null) {
+ voteResult = true;
+ votePids.clear();
+ votePids.addAll(getVector("pids"));
+ ackPids.clear();
+ ackPids.addAll(getVector("pids"));
+ }
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onClientStart()
*/
protected void onClientStart() {
- if (pids == null) {
- pids = new ArrayList<Integer>();
- pids.addAll(getVector("pids"));
-
+ if (votePids == null) {
+ voteResult = true;
+ votePids = new ArrayList<Integer>();
+ votePids.addAll(getVector("pids"));
+ ackPids = new ArrayList<Integer>();
+ ackPids.addAll(getVector("pids"));
}
- if (pids.size() != 0) {
+ if (votePids.size() != 0) {
long timeout = getLong("timeout") + process.getTime();
scheduleAt(timeout); /* Will run onClientSchedule() at the specified local time */
VSMessage message = new VSMessage();
- message.setBoolean("wantAck", true);
+ message.setBoolean("wantVote", true);
+ sendMessage(message);
+
+ } else if (ackPids.size() != 0) {
+ long timeout = getLong("timeout") + process.getTime();
+ scheduleAt(timeout); /* Will run onClientSchedule() at the specified local time */
+
+ VSMessage message = new VSMessage();
+ message.setBoolean("isVoteResult", true);
+ message.setBoolean("voteResult", voteResult);
sendMessage(message);
}
}
@@ -75,18 +98,38 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol {
* @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage)
*/
protected void onClientRecv(VSMessage recvMessage) {
- if (pids.size() == 0)
- return;
-
- if (recvMessage.getBoolean("isAck")) {
+ if (votePids.size() != 0 && recvMessage.getBoolean("isVote")) {
Integer pid = recvMessage.getIntegerObj("pid");
- if (pids.contains(pid))
- pids.remove(pid);
-
- logg("ACK von Prozess " + pid + " erhalten!");
-
- if (pids.size() == 0)
- logg("ACKs von allen beteiligten Prozessen erhalten!");
+ if (votePids.contains(pid))
+ votePids.remove(pid);
+ else
+ return;
+
+ boolean vote = recvMessage.getBoolean("vote");
+ logg("Abstimmung von Prozess " + pid + " erhalten! Ergebnis: " + vote);
+
+ if (!vote)
+ voteResult = false;
+
+ if (votePids.size() == 0) {
+ logg("Abstimmungen von allen beteiligten Prozessen erhalten! Globales Ergebnis: " + voteResult);
+ /* Remove the active schedule which has been created in the onClientStart method */
+ removeSchedules();
+ /* Create a new schedule and send the vote result */
+ onClientStart();
+ }
+ } else if (ackPids.size() != 0 && recvMessage.getBoolean("isAck")) {
+ Integer pid = recvMessage.getIntegerObj("pid");
+ if (ackPids.contains(pid))
+ ackPids.remove(pid);
+ else
+ return;
+
+ if (ackPids.size() == 0) {
+ /* Remove the active schedule which has been created in the onClientStart method */
+ removeSchedules();
+ logg("Alle Teilnehmer haben die Abstimmung erhalten");
+ }
}
}
@@ -97,25 +140,44 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol {
onClientStart();
}
+ /* Client variables */
+ private boolean voteSent;
+ private boolean myVote;
+
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onServerReset()
*/
protected void onServerReset() {
- ackSent = false;
+ voteSent = false;
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage)
*/
protected void onServerRecv(VSMessage recvMessage) {
- if (ackSent)
- return;
-
- VSMessage message = new VSMessage();
- message.setBoolean("isAck", true);
- message.setInteger("pid", process.getProcessID());
- sendMessage(message);
- ackSent = true;
+ if (recvMessage.getBoolean("wantVote")) {
+ if (!voteSent) {
+ voteSent = true;
+ myVote = process.getRandomPercentage() <= getInteger("ackProb");
+ }
+
+ VSMessage message = new VSMessage();
+ message.setBoolean("isVote", true);
+ message.setBoolean("vote", myVote);
+ message.setInteger("pid", process.getProcessID());
+ sendMessage(message);
+
+ logg("Abstimmung " + myVote + " versendet");
+
+ } else if (recvMessage.getBoolean("isVoteResult")) {
+ boolean voteResult = recvMessage.getBoolean("voteResult");
+ logg("Globales Abstimmungsergebnis erhalten. Ergebnis: " + voteResult);
+
+ VSMessage message = new VSMessage();
+ message.setBoolean("isAck", true);
+ message.setInteger("pid", process.getProcessID());
+ sendMessage(message);
+ }
}
/* (non-Javadoc)
@@ -123,11 +185,4 @@ public class TwoPhaseCommitProtocol extends VSAbstractProtocol {
*/
protected void onServerSchedule() {
}
-
- /* (non-Javadoc)
- * @see protocols.VSAbstractProtocol#toString()
- */
- public String toString() {
- return super.toString();
- }
}