diff options
Diffstat (limited to 'sources/protocols/implementations/VSReliableMulticastProtocol.java')
| -rw-r--r-- | sources/protocols/implementations/VSReliableMulticastProtocol.java | 80 |
1 files changed, 71 insertions, 9 deletions
diff --git a/sources/protocols/implementations/VSReliableMulticastProtocol.java b/sources/protocols/implementations/VSReliableMulticastProtocol.java index 7e41d8a..d95d202 100644 --- a/sources/protocols/implementations/VSReliableMulticastProtocol.java +++ b/sources/protocols/implementations/VSReliableMulticastProtocol.java @@ -23,6 +23,9 @@ package protocols.implementations; +import java.util.ArrayList; +import java.util.Vector; + import protocols.VSAbstractProtocol; import core.VSMessage; @@ -34,46 +37,97 @@ import core.VSMessage; */ public class VSReliableMulticastProtocol extends VSAbstractProtocol { /** The serial version uid */ - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; /** - * Instantiates a new reliable multicast protocol object. + * Instantiates a two phase commit protocol object. */ public VSReliableMulticastProtocol() { super(VSAbstractProtocol.HAS_ON_CLIENT_START); setClassname(getClass().toString()); } + /** PIDs of all processes which still have to send an ACK */ + private ArrayList<Integer> pids; + /* (non-Javadoc) * @see events.VSAbstractProtocol#onClientInit() */ public void onClientInit() { + Vector<Integer> vec = new Vector<Integer>(); + vec.add(2); + vec.add(3); + vec.add(4); + + initVector("pids", vec, "PIDs beteilitger Prozesse"); + initLong("timeout", 2500, "Zeit bis erneuerter Anfrage", "ms"); } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientReset() */ public void onClientReset() { + if (pids != null) { + pids.clear(); + pids.addAll(getVector("pids")); + } } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientStart() */ public void onClientStart() { + if (pids == null) { + pids = new ArrayList<Integer>(); + pids.addAll(getVector("pids")); + } + + if (pids.size() != 0) { + long timeout = getLong("timeout") + process.getTime(); + /* Will run onClientSchedule() at the specified local time */ + scheduleAt(timeout); + + VSMessage message = new VSMessage(); + message.setBoolean("isMulticast", true); + sendMessage(message); + } } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) */ public void onClientRecv(VSMessage recvMessage) { + if (pids.size() != 0 && recvMessage.getBoolean("isAck")) { + Integer pid = recvMessage.getIntegerObj("pid"); + + if (pids.contains(pid)) + pids.remove(pid); + else + return; + + logg("ACK von Prozess " + pid + " erhalten!"); + + if (pids.size() == 0) { + logg("ACKs von allen beteiligten Prozessen " + + "erhalten!"); + + /* Remove the active schedule which has been created in the + onClientStart method */ + removeSchedules(); + } + } } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientSchedule() */ public void onClientSchedule() { + onClientStart(); } + /** True if ACK has been sent already */ + private boolean ackSent; + /* (non-Javadoc) * @see events.VSAbstractProtocol#onServerInit() */ @@ -84,12 +138,27 @@ public class VSReliableMulticastProtocol extends VSAbstractProtocol { * @see protocols.VSAbstractProtocol#onServerReset() */ public void onServerReset() { + ackSent = false; } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) */ public void onServerRecv(VSMessage recvMessage) { + if (recvMessage.getBoolean("isMulticast")) { + VSMessage message = new VSMessage(); + message.setBoolean("isAck", true); + message.setInteger("pid", process.getProcessID()); + sendMessage(message); + + if (ackSent) { + logg("ACK erneuert versendet"); + + } else { + logg("ACK versendet"); + ackSent = true; + } + } } /* (non-Javadoc) @@ -97,11 +166,4 @@ public class VSReliableMulticastProtocol extends VSAbstractProtocol { */ public void onServerSchedule() { } - - /* (non-Javadoc) - * @see protocols.VSAbstractProtocol#toString() - */ - public String toString() { - return super.toString(); - } } |
