diff options
| author | Paul Buetow <paul@buetow.org> | 2008-05-26 20:25:30 +0000 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2008-05-26 20:25:30 +0000 |
| commit | 230ddbf8753db60f1baa517e4d9a4d08885fac04 (patch) | |
| tree | dd2f9f4016ce8162768552cfb52d15a31372a4f6 | |
| parent | 065bbc74eda368148851f0a8e8d9e4a016e48e28 (diff) | |
1 phase comit protocol works
5 files changed, 78 insertions, 20 deletions
diff --git a/sources/events/internal/MessageReceiveEvent.java b/sources/events/internal/MessageReceiveEvent.java index 300c4b3..f86b05f 100644 --- a/sources/events/internal/MessageReceiveEvent.java +++ b/sources/events/internal/MessageReceiveEvent.java @@ -59,7 +59,7 @@ public class MessageReceiveEvent extends VSAbstractEvent { } else { final VSAbstractProtocol protocol = (VSAbstractProtocol) protocolObj; logg(buffer.toString()); - protocol.onMessageRecv(message); + protocol.onMessageRecvStart(message); } } diff --git a/sources/events/internal/ProtocolScheduleEvent.java b/sources/events/internal/ProtocolScheduleEvent.java index 42f8836..12fbd7b 100644 --- a/sources/events/internal/ProtocolScheduleEvent.java +++ b/sources/events/internal/ProtocolScheduleEvent.java @@ -13,12 +13,23 @@ import protocols.VSAbstractProtocol; public class ProtocolScheduleEvent extends VSAbstractEvent { private static final long serialVersionUID = 1L; - /** The event is a client protocol schedule. */ - private boolean isClientSchedule; /* true = client, false = server */ + /** The event is a server protocol schedule. */ + private boolean isServerSchedule; /* true = server, false = client */ /** The reference to the protocol object to schedule. */ private VSAbstractProtocol protocol; + /** + * Create a ProtocolScheduleEvent object + * + * @param protocol the protocol + * @param isServerSchedule the event is a client protocol schedule if false, else server schedule + */ + public ProtocolScheduleEvent(VSAbstractProtocol protocol, boolean isServerSchedule) { + this.protocol = protocol; + this.isServerSchedule = isServerSchedule; + } + /* (non-Javadoc) * @see events.VSAbstractEvent#onInit() */ @@ -29,19 +40,19 @@ public class ProtocolScheduleEvent extends VSAbstractEvent { /** * Checks if is client protocol schedule. * - * @param isClientSchedule the event is a client protocol schedule if true, else server schedule + * @param isServerSchedule the event is a client protocol schedule if false, else server schedule */ - public void isClientSchedule(boolean isClientSchedule) { - this.isClientSchedule = isClientSchedule; + public void isServerSchedule(boolean isServerSchedule) { + this.isServerSchedule = isServerSchedule; } /** * Checks if is client protocol. * - * @return true, if is client protocol schedule, else server protocol schedule + * @return false, if is client protocol schedule, else server protocol schedule */ - public boolean isClientSchedule() { - return isClientSchedule; + public boolean isServerSchedule() { + return isServerSchedule; } /** @@ -66,9 +77,9 @@ public class ProtocolScheduleEvent extends VSAbstractEvent { * @see events.VSAbstractEvent#onStart() */ public void onStart() { - if (isClientSchedule) - protocol.onClientScheduleStart(); - else + if (isServerSchedule) protocol.onServerScheduleStart(); + else + protocol.onClientScheduleStart(); } } diff --git a/sources/protocols/VSAbstractProtocol.java b/sources/protocols/VSAbstractProtocol.java index 2c84d96..05864fb 100644 --- a/sources/protocols/VSAbstractProtocol.java +++ b/sources/protocols/VSAbstractProtocol.java @@ -4,6 +4,7 @@ */ package protocols; +import events.internal.*; import events.*; import core.*; @@ -83,7 +84,7 @@ abstract public class VSAbstractProtocol extends VSAbstractEvent { * * @param message the message */ - public final void onMessageRecv(VSMessage message) { + public final void onMessageRecvStart(VSMessage message) { if (isIncorrectProtocol(message)) return; @@ -152,6 +153,17 @@ abstract public class VSAbstractProtocol extends VSAbstractEvent { } /** + * Reschedules the protocol for a new time and runs onClientSchedule or onServerSchedule + * + * @param isClient the is client + */ + protected final void scheduleAt(long time) { + VSAbstractEvent scheduleEvent = new ProtocolScheduleEvent(this, currentContextIsServer); + VSTask scheduleTask = new VSTask(time, process, scheduleEvent, VSTask.LOCAL); + process.getSimulationCanvas().getTaskManager().addTask(scheduleTask); + } + + /** * On client start. */ abstract protected void onClientStart(); diff --git a/sources/protocols/implementations/BerkelyTimeProtocol.java b/sources/protocols/implementations/BerkelyTimeProtocol.java index 635b1cd..411a9e8 100644 --- a/sources/protocols/implementations/BerkelyTimeProtocol.java +++ b/sources/protocols/implementations/BerkelyTimeProtocol.java @@ -83,7 +83,7 @@ public class BerkelyTimeProtocol extends VSAbstractProtocol { if (!recvMessage.getBoolean("isResponse")) return; - Integer processID = new Integer(recvMessage.getInteger("processID")); + Integer processID = recvMessage.getIntegerObj("processID"); if (peers.contains(processID)) peers.remove(processID); diff --git a/sources/protocols/implementations/OnePhaseCommitProtocol.java b/sources/protocols/implementations/OnePhaseCommitProtocol.java index c8dd7f2..3e4ae52 100644 --- a/sources/protocols/implementations/OnePhaseCommitProtocol.java +++ b/sources/protocols/implementations/OnePhaseCommitProtocol.java @@ -4,6 +4,7 @@ */ package protocols.implementations; +import java.util.ArrayList; import java.util.Vector; import protocols.VSAbstractProtocol; @@ -15,7 +16,8 @@ import core.VSMessage; public class OnePhaseCommitProtocol extends VSAbstractProtocol { private static final long serialVersionUID = 1L; - /* Client variables, coordinator */ + /* Server variables, coordinator */ + private ArrayList<Integer> pids; /* Client variables */ private boolean ackSent; @@ -30,9 +32,9 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol { Vector<Integer> vec = new Vector<Integer>(); vec.add(2); vec.add(3); - vec.add(4); initVector("pids", vec, "PIDs beteilitger Prozesse"); + initLong("timeout", 5000, "Zeit bis erneuerter Anfrage", "ms"); } /* (non-Javadoc) @@ -45,29 +47,54 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol { * @see protocols.VSAbstractProtocol#onClientReset() */ protected void onClientReset() { + pids.clear(); + pids.addAll(getVector("pids")); } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientStart() */ protected void onClientStart() { - int numProcesses = getInteger("numProcesses"); - VSMessage message = new VSMessage(); - message.setBoolean("wantAck", true); - sendMessage(message); + if (pids == null) { + pids = new ArrayList<Integer>(); + pids.addAll(getVector("pids")); + + } + + if (pids.size() != 0) { + long timeout = getLong("timeout") + process.getTime(); + scheduleAt(timeout); /* Will run onClientSchedule() at the specified local time */ + + VSMessage message = new VSMessage(); + message.setBoolean("wantAck", true); + sendMessage(message); + } } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientRecv(core.VSMessage) */ protected void onClientRecv(VSMessage recvMessage) { + if (pids.size() == 0) + return; + if (recvMessage.getBoolean("isAck")) { + Integer pid = recvMessage.getIntegerObj("pid"); + if (pids.contains(pid)) + pids.remove(pid); + + logg("ACK von Prozess " + pid + " erhalten!"); + + if (pids.size() == 0) + logg("ACKs von allen beteiligten Prozessen erhalten!"); + } } /* (non-Javadoc) * @see protocols.VSAbstractProtocol#onClientSchedule() */ protected void onClientSchedule() { + onClientStart(); } /* (non-Javadoc) @@ -81,6 +108,14 @@ public class OnePhaseCommitProtocol extends VSAbstractProtocol { * @see protocols.VSAbstractProtocol#onServerRecv(core.VSMessage) */ protected void onServerRecv(VSMessage recvMessage) { + if (ackSent) + return; + + VSMessage message = new VSMessage(); + message.setBoolean("isAck", true); + message.setInteger("pid", process.getProcessID()); + sendMessage(message); + ackSent = true; } /* (non-Javadoc) |
