summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2008-06-25 23:42:04 +0000
committerPaul Buetow <paul@buetow.org>2008-06-25 23:42:04 +0000
commit06c3a2b043c676247ebcfce333a825e3872fd471 (patch)
tree7a1dad8fa5f438c761d5c9c55d9a1f70bac41c2d
parent18729d5c4b1b8282e293466cf2a802337ba7e397 (diff)
reliable multicast works
-rw-r--r--sources/protocols/implementations/VSReliableMulticastProtocol.java80
1 files changed, 71 insertions, 9 deletions
diff --git a/sources/protocols/implementations/VSReliableMulticastProtocol.java b/sources/protocols/implementations/VSReliableMulticastProtocol.java
index 7e41d8a..d95d202 100644
--- a/sources/protocols/implementations/VSReliableMulticastProtocol.java
+++ b/sources/protocols/implementations/VSReliableMulticastProtocol.java
@@ -23,6 +23,9 @@
package protocols.implementations;
+import java.util.ArrayList;
+import java.util.Vector;
+
import protocols.VSAbstractProtocol;
import core.VSMessage;
@@ -34,46 +37,97 @@ import core.VSMessage;
*/
public class VSReliableMulticastProtocol extends VSAbstractProtocol {
/** The serial version uid */
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
/**
- * Instantiates a new reliable multicast protocol object.
+ * Instantiates a two phase commit protocol object.
*/
public VSReliableMulticastProtocol() {
super(VSAbstractProtocol.HAS_ON_CLIENT_START);
setClassname(getClass().toString());
}
+ /** PIDs of all processes which still have to send an ACK */
+ private ArrayList<Integer> pids;
+
/* (non-Javadoc)
* @see events.VSAbstractProtocol#onClientInit()
*/
public void onClientInit() {
+ Vector<Integer> vec = new Vector<Integer>();
+ vec.add(2);
+ vec.add(3);
+ vec.add(4);
+
+ initVector("pids", vec, "PIDs beteilitger Prozesse");
+ initLong("timeout", 2500, "Zeit bis erneuerter Anfrage", "ms");
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onClientReset()
*/
public void onClientReset() {
+ if (pids != null) {
+ pids.clear();
+ pids.addAll(getVector("pids"));
+ }
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onClientStart()
*/
public void onClientStart() {
+ if (pids == null) {
+ pids = new ArrayList<Integer>();
+ pids.addAll(getVector("pids"));
+ }
+
+ if (pids.size() != 0) {
+ long timeout = getLong("timeout") + process.getTime();
+ /* Will run onClientSchedule() at the specified local time */
+ scheduleAt(timeout);
+
+ VSMessage message = new VSMessage();
+ message.setBoolean("isMulticast", true);
+ sendMessage(message);
+ }
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage)
*/
public void onClientRecv(VSMessage recvMessage) {
+ if (pids.size() != 0 && recvMessage.getBoolean("isAck")) {
+ Integer pid = recvMessage.getIntegerObj("pid");
+
+ if (pids.contains(pid))
+ pids.remove(pid);
+ else
+ return;
+
+ logg("ACK von Prozess " + pid + " erhalten!");
+
+ if (pids.size() == 0) {
+ logg("ACKs von allen beteiligten Prozessen " +
+ "erhalten!");
+
+ /* Remove the active schedule which has been created in the
+ onClientStart method */
+ removeSchedules();
+ }
+ }
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onClientSchedule()
*/
public void onClientSchedule() {
+ onClientStart();
}
+ /** True if ACK has been sent already */
+ private boolean ackSent;
+
/* (non-Javadoc)
* @see events.VSAbstractProtocol#onServerInit()
*/
@@ -84,12 +138,27 @@ public class VSReliableMulticastProtocol extends VSAbstractProtocol {
* @see protocols.VSAbstractProtocol#onServerReset()
*/
public void onServerReset() {
+ ackSent = false;
}
/* (non-Javadoc)
* @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage)
*/
public void onServerRecv(VSMessage recvMessage) {
+ if (recvMessage.getBoolean("isMulticast")) {
+ VSMessage message = new VSMessage();
+ message.setBoolean("isAck", true);
+ message.setInteger("pid", process.getProcessID());
+ sendMessage(message);
+
+ if (ackSent) {
+ logg("ACK erneuert versendet");
+
+ } else {
+ logg("ACK versendet");
+ ackSent = true;
+ }
+ }
}
/* (non-Javadoc)
@@ -97,11 +166,4 @@ public class VSReliableMulticastProtocol extends VSAbstractProtocol {
*/
public void onServerSchedule() {
}
-
- /* (non-Javadoc)
- * @see protocols.VSAbstractProtocol#toString()
- */
- public String toString() {
- return super.toString();
- }
}