summaryrefslogtreecommitdiff
path: root/sources/protocols
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2008-05-26 20:25:30 +0000
committerPaul Buetow <paul@buetow.org>2008-05-26 20:25:30 +0000
commit230ddbf8753db60f1baa517e4d9a4d08885fac04 (patch)
treedd2f9f4016ce8162768552cfb52d15a31372a4f6 /sources/protocols
parent065bbc74eda368148851f0a8e8d9e4a016e48e28 (diff)
1 phase comit protocol works
Diffstat (limited to 'sources/protocols')
-rw-r--r--sources/protocols/VSAbstractProtocol.java14
-rw-r--r--sources/protocols/implementations/BerkelyTimeProtocol.java2
-rw-r--r--sources/protocols/implementations/OnePhaseCommitProtocol.java47
3 files changed, 55 insertions, 8 deletions
diff --git a/sources/protocols/VSAbstractProtocol.java b/sources/protocols/VSAbstractProtocol.java
index 2c84d96..05864fb 100644
--- a/sources/protocols/VSAbstractProtocol.java
+++ b/sources/protocols/VSAbstractProtocol.java
@@ -4,6 +4,7 @@
*/
package protocols;
+import events.internal.*;
import events.*;
import core.*;
@@ -83,7 +84,7 @@ abstract public class VSAbstractProtocol extends VSAbstractEvent {
*
* @param message the message
*/
- public final void onMessageRecv(VSMessage message) {
+ public final void onMessageRecvStart(VSMessage message) {
if (isIncorrectProtocol(message))
return;
@@ -152,6 +153,17 @@ abstract public class VSAbstractProtocol extends VSAbstractEvent {
}
/**
+ * Reschedules the protocol for a new time and runs onClientSchedule or onServerSchedule
+ *
+ * @param isClient the is client
+ */
+ protected final void scheduleAt(long time) {
+ VSAbstractEvent scheduleEvent = new ProtocolScheduleEvent(this, currentContextIsServer);
+ VSTask scheduleTask = new VSTask(time, process, scheduleEvent, VSTask.LOCAL);
+ process.getSimulationCanvas().getTaskManager().addTask(scheduleTask);
+ }
+
+ /**
* On client start.
*/
abstract protected void onClientStart();
diff --git a/sources/protocols/implementations/BerkelyTimeProtocol.java b/sources/protocols/implementations/BerkelyTimeProtocol.java
index 635b1cd..411a9e8 100644
--- a/sources/protocols/implementations/BerkelyTimeProtocol.java
+++ b/sources/protocols/implementations/BerkelyTimeProtocol.java
@@ -83,7 +83,7 @@ public class BerkelyTimeProtocol extends VSAbstractProtocol {
if (!recvMessage.getBoolean("isResponse"))
return;
- Integer processID = new Integer(recvMessage.getInteger("processID"));
+ Integer processID = recvMessage.getIntegerObj("processID");
if (peers.contains(processID))
peers.remove(processID);
diff --git a/sources/protocols/implementations/OnePhaseCommitProtocol.java b/sources/protocols/implementations/OnePhaseCommitProtocol.java
index c8dd7f2..3e4ae52 100644
--- a/sources/protocols/implementations/OnePhaseCommitProtocol.java
+++ b/sources/protocols/implementations/OnePhaseCommitProtocol.java
@@ -4,6 +4,7 @@
*/
package protocols.implementations;
+import java.util.ArrayList;
import java.util.Vector;
import protocols.VSAbstractProtocol;
@@ -15,7 +16,8 @@ import core.VSMessage;
public class OnePhaseCommitProtocol extends VSAbstractProtocol {
private static final long serialVersionUID = 1L;
- /* Client variables, coordinator */
+ /* Server variables, coordinator */
+ private ArrayList<Integer> pids;
/* Client variables */
private boolean ackSent;
@@ -30,9 +32,9 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol {
Vector<Integer> vec = new Vector<Integer>();
vec.add(2);
vec.add(3);
- vec.add(4);
initVector("pids", vec, "PIDs beteilitger Prozesse");
+ initLong("timeout", 5000, "Zeit bis erneuerter Anfrage", "ms");
}
/* (non-Javadoc)
@@ -45,29 +47,54 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol {
* @see protocols.VSAbstractProtocol#onClientReset()
*/
protected void onClientReset() {
+ pids.clear();
+ pids.addAll(getVector("pids"));
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onClientStart()
*/
protected void onClientStart() {
- int numProcesses = getInteger("numProcesses");
- VSMessage message = new VSMessage();
- message.setBoolean("wantAck", true);
- sendMessage(message);
+ if (pids == null) {
+ pids = new ArrayList<Integer>();
+ pids.addAll(getVector("pids"));
+
+ }
+
+ if (pids.size() != 0) {
+ long timeout = getLong("timeout") + process.getTime();
+ scheduleAt(timeout); /* Will run onClientSchedule() at the specified local time */
+
+ VSMessage message = new VSMessage();
+ message.setBoolean("wantAck", true);
+ sendMessage(message);
+ }
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage)
*/
protected void onClientRecv(VSMessage recvMessage) {
+ if (pids.size() == 0)
+ return;
+ if (recvMessage.getBoolean("isAck")) {
+ Integer pid = recvMessage.getIntegerObj("pid");
+ if (pids.contains(pid))
+ pids.remove(pid);
+
+ logg("ACK von Prozess " + pid + " erhalten!");
+
+ if (pids.size() == 0)
+ logg("ACKs von allen beteiligten Prozessen erhalten!");
+ }
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onClientSchedule()
*/
protected void onClientSchedule() {
+ onClientStart();
}
/* (non-Javadoc)
@@ -81,6 +108,14 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol {
* @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage)
*/
protected void onServerRecv(VSMessage recvMessage) {
+ if (ackSent)
+ return;
+
+ VSMessage message = new VSMessage();
+ message.setBoolean("isAck", true);
+ message.setInteger("pid", process.getProcessID());
+ sendMessage(message);
+ ackSent = true;
}
/* (non-Javadoc)