diff options
Diffstat (limited to 'src/main/java/protocols/implementations')
11 files changed, 1341 insertions, 0 deletions
diff --git a/src/main/java/protocols/implementations/README b/src/main/java/protocols/implementations/README new file mode 100644 index 0000000..fd8effa --- /dev/null +++ b/src/main/java/protocols/implementations/README @@ -0,0 +1,10 @@ +How to add a new protocol: + +1. Copy the file VSDummyProtocol.java into VSYourOwnNiceProtocol.java + +2. Edit VSYourOwnNiceProtocol.java and replace the classname! + +3. Edit the initialize method of events.RegisteredEvents.java and add the classname +of your protocol. E.g.: "protocols.implementations.VSYourOwnNiceProtocol" + + diff --git a/src/main/java/protocols/implementations/VSBasicMulticastProtocol.java b/src/main/java/protocols/implementations/VSBasicMulticastProtocol.java new file mode 100644 index 0000000..cfd5399 --- /dev/null +++ b/src/main/java/protocols/implementations/VSBasicMulticastProtocol.java @@ -0,0 +1,86 @@ +package protocols.implementations; + +import core.VSMessage; +import protocols.VSAbstractProtocol; + +/** + * The class VSBasicMulticastProtocol, an implementation of the basic multicast + * protocol. + * + * @author Paul C. Buetow + */ +public class VSBasicMulticastProtocol extends VSAbstractProtocol { + /** + * Instantiates a new VSBasicMulticast object. + */ + public VSBasicMulticastProtocol() { + super(VSAbstractProtocol.HAS_ON_CLIENT_START); + setClassname(getClass().toString()); + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onClientInit() + */ + public void onClientInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientReset() + */ + public void onClientReset() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientStart() + */ + public void onClientStart() { + VSMessage message = new VSMessage(); + message.setBoolean("isMulticast", true); + sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) + */ + public void onClientRecv(VSMessage recvMessage) { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientSchedule() + */ + public void onClientSchedule() { + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onServerInit() + */ + public void onServerInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerReset() + */ + public void onServerReset() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) + */ + public void onServerRecv(VSMessage recvMessage) { + if (recvMessage.getBoolean("isMulticast")) + log("Multicast received"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerSchedule() + */ + public void onServerSchedule() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#toString() + */ + public String toString() { + return super.toString(); + } +} diff --git a/src/main/java/protocols/implementations/VSBerkelyTimeProtocol.java b/src/main/java/protocols/implementations/VSBerkelyTimeProtocol.java new file mode 100644 index 0000000..05d0eae --- /dev/null +++ b/src/main/java/protocols/implementations/VSBerkelyTimeProtocol.java @@ -0,0 +1,204 @@ +package protocols.implementations; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Vector; + +import core.VSMessage; +import protocols.VSAbstractProtocol; + +/** + * The class VSBerkelyTimeProtocol, an implementation of the berkely time + * protocol. + * + * @author Paul C. Buetow + */ +public class VSBerkelyTimeProtocol extends VSAbstractProtocol { + /** + * Instantiates a new berkely time protocol. + */ + public VSBerkelyTimeProtocol() { + super(VSAbstractProtocol.HAS_ON_SERVER_START); + setClassname(getClass().toString()); + } + + /** Integer: Process ID, Long: Local time of the process */ + private HashMap<Integer,Long> processTimes = new HashMap<Integer,Long>(); + + /** Integer: Process ID, Long: Time of receiving the response from the + * process + */ + private HashMap<Integer,Long> recvTimes = new HashMap<Integer,Long>(); + + /** Integer: Process ID, Long: Calculated process times (using the RTT) */ + private HashMap<Integer,Long> realTimesRTT = new HashMap<Integer,Long>(); + + /** Contains all process IDs of processes which want to justify their + * time + */ + private ArrayList<Integer> peers = new ArrayList<Integer>(); + + /** Time the request/response has started */ + private long requestTime; + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onServerInit() + */ + public void onServerInit() { + Vector<Integer> vec = new Vector<Integer>(); + vec.add(1); + vec.add(3); + initVector("pids", vec, "PIDs of participating processes"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerReset() + */ + public void onServerReset() { + //System.out.println("FOOBAR"); + processTimes.clear(); + recvTimes.clear(); + realTimesRTT.clear(); + peers.clear(); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerStart() + */ + public void onServerStart() { + //System.out.println("FOO"); + peers.addAll(getVector("pids")); + requestTime = process.getTime(); + VSMessage message = new VSMessage(); + message.setBoolean("isRequest", true); + sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) + */ + public void onServerRecv(VSMessage recvMessage) { + /* Ignore all protocol messages which are not a response message, + e.g. itself */ + if (!recvMessage.getBoolean("isResponse")) + return; + + Integer processID = recvMessage.getIntegerObj("processID"); + + if (peers.contains(processID)) + peers.remove(processID); + else + return; /* Process has been handled already or is not listed */ + + Long time = Long.valueOf(recvMessage.getLong("time")); + + processTimes.put(processID, time); + recvTimes.put(processID, Long.valueOf(process.getTime())); + + /* All peers have told their times */ + if (peers.size() == 0) { + long avgTime = calculateAverageTime(); + /* Set the local's process time to the new avg reference time */ + process.setTime(avgTime); + /* Tell all other processes what to do in order to justify their + times */ + sendJustifyRequests(avgTime); + /* Start "clean" next time */ + onServerReset(); + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerSchedule() + */ + public void onServerSchedule() { + } + + /** + * Calculate the new average time. + * + * @return the long + */ + private long calculateAverageTime() { + long sum = 0; + for (Integer processID : processTimes.keySet()) { + Long localTime = processTimes.get(processID); + Long recvTime = recvTimes.get(processID); + long rtt = recvTime.longValue() - requestTime; + long realProcessTime = localTime + (long) (rtt / 2); + realTimesRTT.put(processID, Long.valueOf(realProcessTime)); + sum += realProcessTime; + } + /* Include the time of the local process */ + sum += process.getTime(); + return (long) sum / (getVector("pids").size() + 1); + } + + /** + * Sends to all clients a value to justify their local clocks. + * + * @param avgTime the avg time + */ + private void sendJustifyRequests(long avgTime) { + for (Integer processID : processTimes.keySet()) { + long realProcessTime = realTimesRTT.get(processID).longValue(); + long diff = avgTime - realProcessTime; + VSMessage message = new VSMessage(); + message.setBoolean("isJustify", true); + message.setLong("timeDiff", diff); + message.setInteger("receiverProcessID", processID); + sendMessage(message); + } + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onClientInit() + */ + public void onClientInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientReset() + */ + public void onClientReset() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) + */ + public void onClientRecv(VSMessage recvMessage) { + if (recvMessage.getBoolean("isRequest")) { + VSMessage message = new VSMessage(); + message.setInteger("processID", process.getProcessID()); + message.setLong("time", process.getTime()); + message.setBoolean("isResponse", true); + sendMessage(message); + + } else if (recvMessage.getBoolean("isJustify")) { + /* Check if it's "my" justify message */ + if (recvMessage.getInteger("receiverProcessID") != + process.getProcessID()) + return; + + long timeDiff = recvMessage.getLong("timeDiff"); + //long recvTime = process.getTime(); + long newTime = process.getTime() + timeDiff; + log("New time: " + newTime); + + process.setTime(newTime); + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientSchedule() + */ + public void onClientSchedule() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#toString() + */ + public String toString() { + return super.toString(); + } +} diff --git a/src/main/java/protocols/implementations/VSBroadcastProtocol.java b/src/main/java/protocols/implementations/VSBroadcastProtocol.java new file mode 100644 index 0000000..54fd3b5 --- /dev/null +++ b/src/main/java/protocols/implementations/VSBroadcastProtocol.java @@ -0,0 +1,103 @@ +package protocols.implementations; + +import java.util.ArrayList; + +import core.VSMessage; +import protocols.VSAbstractProtocol; + +/** + * The class VSBroadcastProtocol, an implementation of the broadcast + * sturm protocol. + * + * @author Paul C. Buetow + */ +public class VSBroadcastProtocol extends VSAbstractProtocol { + /** The sent messages. */ + private ArrayList<Integer> sentMessages; + + /** The broadcast count. */ + private static int broadcastCount; + + /** + * Instantiates a new broadcast sturm protocol. + */ + public VSBroadcastProtocol() { + super(VSAbstractProtocol.HAS_ON_CLIENT_START); + setClassname(getClass().toString()); + sentMessages = new ArrayList<Integer>(); + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onClientInit() + */ + public void onClientInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientReset() + */ + public void onClientReset() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientStart() + */ + public void onClientStart() { + VSMessage message = new VSMessage(); + message.setInteger("Broadcast", broadcastCount++); + sentMessages.add(message.getIntegerObj("Broadcast")); + sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) + */ + public void onClientRecv(VSMessage recvMessage) { + onServerRecv(recvMessage); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientSchedule() + */ + public void onClientSchedule() { + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onServerInit() + */ + public void onServerInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerReset() + */ + public void onServerReset() { + sentMessages.clear(); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerSchedule() + */ + public void onServerSchedule() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) + */ + public void onServerRecv(VSMessage recvMessage) { + if (!sentMessages.contains(recvMessage.getIntegerObj("Broadcast"))) { + VSMessage message = new VSMessage(); + message.setInteger("Broadcast", + recvMessage.getInteger("Broadcast")); + sentMessages.add(message.getIntegerObj("Broadcast")); + sendMessage(message); + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#toString() + */ + public String toString() { + return super.toString(); + } +} diff --git a/src/main/java/protocols/implementations/VSDummyProtocol.java b/src/main/java/protocols/implementations/VSDummyProtocol.java new file mode 100644 index 0000000..e9e6837 --- /dev/null +++ b/src/main/java/protocols/implementations/VSDummyProtocol.java @@ -0,0 +1,100 @@ +package protocols.implementations; + +import core.VSMessage; +import protocols.VSAbstractProtocol; + +/** + * The class VSDummyProtocol, can be used as a template in order to create + * own protocols. + * + * @author Paul C. Buetow + */ +public class VSDummyProtocol extends VSAbstractProtocol { + /** + * Instantiates a new dummy protocol object. + */ + public VSDummyProtocol() { + super(VSAbstractProtocol.HAS_ON_CLIENT_START); + setClassname(getClass().toString()); + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onClientInit() + */ + public void onClientInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientReset() + */ + public void onClientReset() { + log("onClientReset()"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientStart() + */ + public void onClientStart() { + log("onClientStart()"); + + VSMessage message = new VSMessage(); + message.setString("Greeting", "Hello World!"); + message.setInteger("A number", 1); + message.setBoolean("A boolean", true); + message.setFloat("A float", 1.2f); + sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) + */ + public void onClientRecv(VSMessage recvMessage) { + log("onClientRecv("+recvMessage+")"); + + /* + String s = recvMessage.getString("Greeting"); + int n = recvMessage.getInteger("A number"); + boolean b = recvMessage.getBoolean("A boolean"); + float f = recvMessage.getFloat("A float"); + */ + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientSchedule() + */ + public void onClientSchedule() { + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onServerInit() + */ + public void onServerInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerReset() + */ + public void onServerReset() { + log("onClientReset()"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) + */ + public void onServerRecv(VSMessage recvMessage) { + log("onServerRecv("+recvMessage+")"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerSchedule() + */ + public void onServerSchedule() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#toString() + */ + public String toString() { + return super.toString() + "; Dummy Test"; + } +} diff --git a/src/main/java/protocols/implementations/VSExternalTimeSyncProtocol.java b/src/main/java/protocols/implementations/VSExternalTimeSyncProtocol.java new file mode 100644 index 0000000..f044861 --- /dev/null +++ b/src/main/java/protocols/implementations/VSExternalTimeSyncProtocol.java @@ -0,0 +1,118 @@ +package protocols.implementations; + +import core.VSMessage; +import protocols.VSAbstractProtocol; + +/** + * The class VSExternalTimeSyncProtocol, an implementation of the external + * time synchronisation protocol. + * + * @author Paul C. Buetow + */ +public class VSExternalTimeSyncProtocol extends VSAbstractProtocol { + /** The request time. */ + private long requestTime; + + /** The server is waiting for response, if true. */ + private boolean waitingForResponse; + + /** + * Instantiates a new external time sync protocol object. + */ + public VSExternalTimeSyncProtocol() { + super(VSAbstractProtocol.HAS_ON_CLIENT_START); + setClassname(getClass().toString()); + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onClientInit() + */ + public void onClientInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientReset() + */ + public void onClientReset() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientStart() + */ + public void onClientStart() { + requestTime = process.getTime(); + waitingForResponse = true; + + /* Multicast message to all processes */ + VSMessage message = new VSMessage(); + message.setBoolean("isClientRequest", true); + sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) + */ + public void onClientRecv(VSMessage recvMessage) { + if (!recvMessage.getBoolean("isServerResponse")) + return; + + if (waitingForResponse) + waitingForResponse = false; + else + return; + + long recvTime = process.getTime(); + long roundTripTime = recvTime - requestTime; + long serverTime = recvMessage.getLong("time"); + long newTime = serverTime + (long) (roundTripTime / 2); + + log("Server time: " + serverTime + "; RTT: " + roundTripTime + "; Old time: " + recvTime + "; New time: " + newTime + "; Offset: " + (newTime - recvTime)); + + process.setTime(newTime); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientSchedule() + */ + public void onClientSchedule() { + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onServerInit() + */ + public void onServerInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerReset() + */ + public void onServerReset() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) + */ + public void onServerRecv(VSMessage recvMessage) { + if (!recvMessage.getBoolean("isClientRequest")) + return; + + /* Multicast message to all processes */ + VSMessage message = new VSMessage(); + message.setLong("time", process.getTime()); + message.setBoolean("isServerResponse", true); + sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerSchedule() + */ + public void onServerSchedule() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#toString() + */ + public String toString() { + return super.toString(); + } +} diff --git a/src/main/java/protocols/implementations/VSInternalTimeSyncProtocol.java b/src/main/java/protocols/implementations/VSInternalTimeSyncProtocol.java new file mode 100644 index 0000000..e3bd181 --- /dev/null +++ b/src/main/java/protocols/implementations/VSInternalTimeSyncProtocol.java @@ -0,0 +1,123 @@ +package protocols.implementations; + +import core.VSMessage; +import protocols.VSAbstractProtocol; + +/** + * The class VSInternalTimeSyncProtocol, an implementation of the internal + * time synchronisation protocol. + * + * @author Paul C. Buetow + */ +public class VSInternalTimeSyncProtocol extends VSAbstractProtocol { + /** The waiting for response. */ + private boolean waitingForResponse; + + /** + * Instantiates a new internal time sync protocol. + */ + public VSInternalTimeSyncProtocol() { + super(VSAbstractProtocol.HAS_ON_CLIENT_START); + setClassname(getClass().toString()); + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onClientInit() + */ + public void onClientInit() { + initLong("t_min", 2000, "Max. transmission time", "ms"); + initLong("t_max", 500, "Min. transmission time", "ms"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientReset() + */ + public void onClientReset() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientStart() + */ + public void onClientStart() { + waitingForResponse = true; + + /* Multicast message to all processes */ + VSMessage message = new VSMessage(); + message.setBoolean("isClientRequest", true); + sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) + */ + public void onClientRecv(VSMessage recvMessage) { + /* Ignore all protocol messages which are not a response message, + e.g. itself */ + if (!recvMessage.getBoolean("isServerResponse")) + return; + + if (waitingForResponse) + waitingForResponse = false; + else + return; + + long tMax = getLong("t_max"); + long tMin = getLong("t_min"); + long serverTime = recvMessage.getLong("time"); + long newTime = serverTime + (long) ((tMax + tMin) / 2 ); + + log("Server time: " + serverTime + "; (t_min,t_max): (" + tMin + + "," + tMax + "); Old time: " + process.getTime() + + "; New time: " + newTime + "; Offset: " + + (process.getTime() - newTime)); + + process.setTime(newTime); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientSchedule() + */ + public void onClientSchedule() { + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onServerInit() + */ + public void onServerInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerReset() + */ + public void onServerReset() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) + */ + public void onServerRecv(VSMessage recvMessage) { + /* Ignore all protocol messages which are not a request message, + e.g. itself */ + if (!recvMessage.getBoolean("isClientRequest")) + return; + + /* Multicast message to all processes */ + VSMessage message = new VSMessage(); + message.setLong("time", process.getTime()); + message.setBoolean("isServerResponse", true); + sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerSchedule() + */ + public void onServerSchedule() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#toString() + */ + public String toString() { + return super.toString(); + } +} diff --git a/src/main/java/protocols/implementations/VSOnePhaseCommitProtocol.java b/src/main/java/protocols/implementations/VSOnePhaseCommitProtocol.java new file mode 100644 index 0000000..4dcfe7e --- /dev/null +++ b/src/main/java/protocols/implementations/VSOnePhaseCommitProtocol.java @@ -0,0 +1,148 @@ +package protocols.implementations; + +import java.util.ArrayList; +import java.util.Vector; + +import core.VSMessage; +import protocols.VSAbstractProtocol; + +/** + * The class VSOnePhaseCommitProtocol, an implementation of the one phase + * commit protocol. + * + * @author Paul C. Buetow + */ +public class VSOnePhaseCommitProtocol extends VSAbstractProtocol { + /* Server variables */ + private boolean ackSent; + + /** + * Instantiates a one phase commit protocol. + */ + public VSOnePhaseCommitProtocol() { + super(VSAbstractProtocol.HAS_ON_SERVER_START); + setClassname(getClass().toString()); + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onServerInit() + */ + public void onServerInit() { + /* Can be changed via GUI variables editor of each process */ + Vector<Integer> vec = new Vector<Integer>(); + vec.add(1); + vec.add(3); + + // TODO: Translate + initVector("pids", vec, "PIDs beteiligter Prozesse"); + initLong("timeout", 2500, "Zeit bis erneute Anfrage", "ms"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerReset() + */ + public void onServerReset() { + if (pids != null) { + pids.clear(); + pids.addAll(getVector("pids")); + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerStart() + */ + public void onServerStart() { + if (pids == null) { + pids = new ArrayList<Integer>(); + pids.addAll(getVector("pids")); + } + + if (pids.size() != 0) { + long timeout = getLong("timeout") + process.getTime(); + /* Will run onServerSchedule() at the specified local time */ + scheduleAt(timeout); + + VSMessage message = new VSMessage(); + message.setBoolean("wantAck", true); + sendMessage(message); + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) + */ + public void onServerRecv(VSMessage recvMessage) { + if (pids.size() == 0) + return; + + if (recvMessage.getBoolean("isAck")) { + Integer pid = recvMessage.getIntegerObj("pid"); + if (pids.contains(pid)) + pids.remove(pid); + else + return; + + log("ACK from process " + pid + " received!"); + + if (pids.size() == 0) { + log("ACKs received from all participating processes! " + + "Committed!"); + + /* Remove the active schedule which has been created in the + onServerStart method */ + removeSchedules(); + } + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerSchedule() + */ + public void onServerSchedule() { + onServerStart(); + } + + /* Client variables, coordinator */ + private ArrayList<Integer> pids; + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onClientInit() + */ + public void onClientInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientReset() + */ + public void onClientReset() { + ackSent = false; + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) + */ + public void onClientRecv(VSMessage recvMessage) { + if (ackSent) + return; + + VSMessage message = new VSMessage(); + message.setBoolean("isAck", true); + message.setInteger("pid", process.getProcessID()); + sendMessage(message); + ackSent = true; + log("Committed"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientSchedule() + */ + public void onClientSchedule() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#toString() + */ + public String toString() { + return super.toString(); + } +} diff --git a/src/main/java/protocols/implementations/VSPingPongProtocol.java b/src/main/java/protocols/implementations/VSPingPongProtocol.java new file mode 100644 index 0000000..b1f3d20 --- /dev/null +++ b/src/main/java/protocols/implementations/VSPingPongProtocol.java @@ -0,0 +1,110 @@ +package protocols.implementations; + +import core.VSMessage; +import protocols.VSAbstractProtocol; + +/** + * The class VSPingPongProtocol, an implementation of the ping pong protocol. + * + * @author Paul C. Buetow + */ +public class VSPingPongProtocol extends VSAbstractProtocol { + /** The client counter. */ + private int clientCounter; + + /** The server counter. */ + private int serverCounter; + + /** + * Instantiates a new ping pong protocol. + */ + public VSPingPongProtocol() { + super(VSAbstractProtocol.HAS_ON_CLIENT_START); + setClassname(getClass().toString()); + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onClientInit() + */ + public void onClientInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientReset() + */ + public void onClientReset() { + clientCounter = 0; + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientStart() + */ + public void onClientStart() { + VSMessage message = new VSMessage(); + message.setBoolean("fromClient", true); + message.setInteger("counter", ++clientCounter); + super.sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) + */ + public void onClientRecv(VSMessage recvMessage) { + if (!recvMessage.getBoolean("fromServer")) + return; + + super.log("message: " + recvMessage.getInteger("counter")); + + VSMessage message = new VSMessage(); + message.setBoolean("fromClient", true); + message.setInteger("counter", ++clientCounter); + super.sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientSchedule() + */ + public void onClientSchedule() { + } + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onServerInit() + */ + public void onServerInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerReset() + */ + public void onServerReset() { + serverCounter = 0; + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) + */ + public void onServerRecv(VSMessage recvMessage) { + if (!recvMessage.getBoolean("fromClient")) + return; + + super.log("message: " + recvMessage.getInteger("counter")); + + VSMessage message = new VSMessage(); + message.setBoolean("fromServer", true); + message.setInteger("counter", ++serverCounter); + super.sendMessage(message); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerSchedule() + */ + public void onServerSchedule() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#toString() + */ + public String toString() { + return super.toString() + "; New message afterwards"; + } +} diff --git a/src/main/java/protocols/implementations/VSReliableMulticastProtocol.java b/src/main/java/protocols/implementations/VSReliableMulticastProtocol.java new file mode 100644 index 0000000..170533a --- /dev/null +++ b/src/main/java/protocols/implementations/VSReliableMulticastProtocol.java @@ -0,0 +1,143 @@ +package protocols.implementations; + +import java.util.ArrayList; +import java.util.Vector; + +import core.VSMessage; +import protocols.VSAbstractProtocol; + +/** + * The class VSReliableMulticastProtocol, an implementation of the reliable + * multicast protocol. + * + * @author Paul C. Buetow + */ +public class VSReliableMulticastProtocol extends VSAbstractProtocol { + /** + * Instantiates a two phase commit protocol object. + */ + public VSReliableMulticastProtocol() { + super(VSAbstractProtocol.HAS_ON_CLIENT_START); + setClassname(getClass().toString()); + } + + /** PIDs of all processes which still have to send an ACK */ + private ArrayList<Integer> pids; + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onClientInit() + */ + public void onClientInit() { + Vector<Integer> vec = new Vector<Integer>(); + vec.add(1); + vec.add(3); + + super.initVector("pids", vec, "PIDs beteiligter Prozesse"); + super.initLong("timeout", 2500, "Zeit bis erneute Anfrage", "ms"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientReset() + */ + public void onClientReset() { + if (pids != null) { + pids.clear(); + pids.addAll(getVector("pids")); + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientStart() + */ + public void onClientStart() { + if (pids == null) { + pids = new ArrayList<Integer>(); + pids.addAll(getVector("pids")); + } + + if (pids.size() != 0) { + long timeout = getLong("timeout") + process.getTime(); + /* Will run onClientSchedule() at the specified local time */ + scheduleAt(timeout); + + VSMessage message = new VSMessage(); + message.setBoolean("isMulticast", true); + sendMessage(message); + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) + */ + public void onClientRecv(VSMessage recvMessage) { + if (pids.size() != 0 && recvMessage.getBoolean("isAck")) { + Integer pid = recvMessage.getIntegerObj("pid"); + + if (pids.contains(pid)) + pids.remove(pid); + else + return; + + log("ACK from process " + pid + " received!"); + + + if (pids.size() == 0) { + log("ACKs from all involved processes received!"); + + /* Remove the active schedule which has been created in the + onClientStart method */ + removeSchedules(); + } + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientSchedule() + */ + public void onClientSchedule() { + onClientStart(); + } + + /** True if ACK has been sent already */ + private boolean ackSent; + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onServerInit() + */ + public void onServerInit() { + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerReset() + */ + public void onServerReset() { + ackSent = false; + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) + */ + public void onServerRecv(VSMessage recvMessage) { + if (recvMessage.getBoolean("isMulticast")) { + VSMessage message = new VSMessage(); + message.setBoolean("isAck", true); + message.setInteger("pid", process.getProcessID()); + sendMessage(message); + + if (ackSent) { + log("ACK sent again"); + + } else { + log("ACK sent"); + + ackSent = true; + } + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerSchedule() + */ + public void onServerSchedule() { + } +} diff --git a/src/main/java/protocols/implementations/VSTwoPhaseCommitProtocol.java b/src/main/java/protocols/implementations/VSTwoPhaseCommitProtocol.java new file mode 100644 index 0000000..8f9e4a3 --- /dev/null +++ b/src/main/java/protocols/implementations/VSTwoPhaseCommitProtocol.java @@ -0,0 +1,196 @@ +package protocols.implementations; + +import java.util.ArrayList; +import java.util.Vector; + +import core.VSMessage; +import protocols.VSAbstractProtocol; + +/** + * The class VSTwoPhaseCommitProtocol, an implementation of the two phase + * commit protocol. + * + * @author Paul C. Buetow + */ +public class VSTwoPhaseCommitProtocol extends VSAbstractProtocol { + /** + * Instantiates a two phase commit protocol object. + */ + public VSTwoPhaseCommitProtocol() { + super(VSAbstractProtocol.HAS_ON_SERVER_START); + setClassname(getClass().toString()); + } + + /** PIDs of all processes which still have to vote */ + private ArrayList<Integer> votePids; + + /** PIDs of all processes which have to acknowledge that they recv the + * global vote result + */ + private ArrayList<Integer> ackPids; + + /** The gloal vote result */ + private boolean voteResult; + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onServerInit() + */ + public void onServerInit() { + Vector<Integer> vec = new Vector<Integer>(); + vec.add(2); + vec.add(3); + + initVector("pids", vec, "PIDs beteiligter Prozesse"); + initLong("timeout", 2500, "Zeit bis erneute Anfrage", "ms"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerReset() + */ + public void onServerReset() { + if (votePids != null) { + voteResult = true; + votePids.clear(); + votePids.addAll(getVector("pids")); + ackPids.clear(); + ackPids.addAll(getVector("pids")); + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerStart() + */ + public void onServerStart() { + if (votePids == null) { + voteResult = true; + votePids = new ArrayList<Integer>(); + votePids.addAll(getVector("pids")); + ackPids = new ArrayList<Integer>(); + ackPids.addAll(getVector("pids")); + } + + if (votePids.size() != 0) { + long timeout = getLong("timeout") + process.getTime(); + /* Will run onServerSchedule() at the specified local time */ + scheduleAt(timeout); + + VSMessage message = new VSMessage(); + message.setBoolean("wantVote", true); + sendMessage(message); + + } else if (ackPids.size() != 0) { + long timeout = getLong("timeout") + process.getTime(); + /* Will run onServerSchedule() at the specified local time */ + scheduleAt(timeout); + + VSMessage message = new VSMessage(); + message.setBoolean("isVoteResult", true); + message.setBoolean("voteResult", voteResult); + sendMessage(message); + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) + */ + public void onServerRecv(VSMessage recvMessage) { + if (votePids.size() != 0 && recvMessage.getBoolean("isVote")) { + Integer pid = recvMessage.getIntegerObj("pid"); + if (votePids.contains(pid)) + votePids.remove(pid); + else + return; + + boolean vote = recvMessage.getBoolean("vote"); + log("Vote from process " + pid + " received! Result: " + vote); + + if (!vote) + voteResult = false; + + if (votePids.size() == 0) { + log("Votes from all involved processes received! Global result: " + voteResult); + + /* Remove the active schedule which has been created in the + onServerStart method */ + removeSchedules(); + /* Create a new schedule and send the vote result */ + onServerStart(); + } + + } else if (ackPids.size() != 0 && recvMessage.getBoolean("isAck")) { + Integer pid = recvMessage.getIntegerObj("pid"); + if (ackPids.contains(pid)) + ackPids.remove(pid); + else + return; + + if (ackPids.size() == 0) { + /* Remove the active schedule which has been created in the + onServerStart method */ + removeSchedules(); + log("All participants have received the vote"); + } + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onServerSchedule() + */ + public void onServerSchedule() { + onServerStart(); + } + + /* Server variables */ + private boolean voteSent; + private boolean myVote; + + /* (non-Javadoc) + * @see events.VSAbstractProtocol#onClientInit() + */ + public void onClientInit() { + initInteger("ackProb", 50, "Festschreibw'keit", 0, 100, "%"); + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientReset() + */ + public void onClientReset() { + voteSent = false; + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) + */ + public void onClientRecv(VSMessage recvMessage) { + if (recvMessage.getBoolean("wantVote")) { + if (!voteSent) { + voteSent = true; + myVote = process.getRandomPercentage() <= getInteger("ackProb"); + } + + VSMessage message = new VSMessage(); + message.setBoolean("isVote", true); + message.setBoolean("vote", myVote); + message.setInteger("pid", process.getProcessID()); + sendMessage(message); + + log("Vote " + myVote + " sent"); + + + } else if (recvMessage.getBoolean("isVoteResult")) { + boolean voteResult = recvMessage.getBoolean("voteResult"); + log("Global voting result received. Result: " + voteResult); + + VSMessage message = new VSMessage(); + message.setBoolean("isAck", true); + message.setInteger("pid", process.getProcessID()); + sendMessage(message); + } + } + + /* (non-Javadoc) + * @see protocols.VSAbstractProtocol#onClientSchedule() + */ + public void onClientSchedule() { + } +} |
