package core; import java.awt.Color; import core.time.VSVectorTime; import events.VSAbstractEvent; import events.VSRegisteredEvents; import events.implementations.VSProcessCrashEvent; import events.implementations.VSVectorClockMonitor; import prefs.VSPrefs; import protocols.VSAbstractProtocol; import simulator.VSLogging; import simulator.VSSimulatorVisualization; import utils.VSPriorityQueue; /** * Internal process implementation for the distributed systems simulator. * This class extends {@link VSAbstractProcess} with simulation-specific features * including: *
Each process maintains its own local time that drifts from global time * based on configured clock variance, simulating realistic clock skew in * distributed systems.
* * @see VSAbstractProcess * @see VSSimulatorVisualization * @author Paul C. Buetow */ public class VSInternalProcess extends VSAbstractProcess { /** The vector clock monitor for timestamp-triggered events */ private VSVectorClockMonitor vectorClockMonitor; /** Optional message handler for decoupled message sending */ private simulator.messaging.MessageHandler messageHandler; /** * Instantiates a new process. * * @param prefs the simulator's default prefs * @param processNum the process num * @param simulatorVisualization the simulator canvas * @param loging the loging object */ public VSInternalProcess(VSPrefs prefs, int processNum, VSSimulatorVisualization simulatorVisualization, VSLogging loging) { super(prefs, processNum, simulatorVisualization, loging); vectorClockMonitor = new VSVectorClockMonitor(this); } /** * Updates this process's configuration from its preference settings. * Called by {@link simulator.VSProcessEditor} after editing is complete * to apply any configuration changes. * *This method updates:
*This method saves:
*The synchronization algorithm:
*Called repeatedly by {@link VSSimulatorVisualization} during simulation.
* * @param globalTime the current global simulation time */ public synchronized void syncTime(final long globalTime) { final long currentGlobalTimestep = globalTime - this.globalTime; this.globalTime = globalTime; localTime += currentGlobalTimestep; clockOffset += currentGlobalTimestep * (double) clockVariance; while (clockOffset >= 1) { clockOffset -= 1; ++localTime; } while (clockOffset <= -1) { clockOffset += 1; --localTime; } /* We do not want a negative time */ if (localTime < 0) localTime = 0; } /** * Activates visual highlighting for this process. * The process's current color is saved and replaced with the highlight color. * Use {@link #highlightOff()} to restore the original color. * * @see #highlightOff() */ public synchronized void highlightOn() { tmpColor = currentColor; currentColor = getColor("col.process.highlight"); isHighlighted = true; } /** * Unhighlights the process. */ public synchronized void highlightOff() { currentColor = tmpColor; isHighlighted = false; } /** * Resets the process. */ public synchronized void reset() { isPaused = true; isCrashed = false; hasCrashed = false; localTime = 0; globalTime = 0; clockOffset = 0; for (VSAbstractProtocol protocol : protocolsToReset) protocol.reset(); setCurrentColor(getColor("col.process.default")); resetTimeFormats(); // Clear any vector clock monitor events if (vectorClockMonitor != null) { vectorClockMonitor.clearVectorEvents(); } } /** * Creates the random crash task. The crash task will be created only if * the process is not crashed atm. and if * VSInternalProcess.getARandomCrashTime() * returns a non-negative value. * The random crash task uses the simulaion's global time for its * scheduling. */ public synchronized void createRandomCrashTask() { if (!isCrashed) { VSTaskManager taskManager = simulatorVisualization.getTaskManager(); long crashTime = getARandomCrashTime(); if (crashTime < 0) return; if (randomCrashTask != null) taskManager.removeTask(randomCrashTask); if (crashTime >= getGlobalTime()) { VSAbstractEvent event = new VSProcessCrashEvent(); randomCrashTask = new VSTask(crashTime, this, event, VSTask.GLOBAL); taskManager.addTask(randomCrashTask); } else { randomCrashTask = null; } } } /** * Generates a random percentage value between 0 and 100 (inclusive). * Uses this process's dedicated random number generator to ensure * reproducible results for a given random seed. * *This method is commonly used for:
*The clock offset accumulates fractional time units that are * converted to whole time units during {@link #syncTime(long)}.
* * @param add the amount to add to the clock offset (can be negative) * @see VSTaskManager#runTasks(long) */ public synchronized void addClockOffset(long add) { this.clockOffset += add; } /** * Transitions this process to the 'playing' state. * The process resumes normal operation and its color changes * to indicate running status. * *Called by the simulator when starting or resuming simulation.
* * @see #pause() * @see #finish() */ public synchronized void play() { isPaused = false; setCurrentColor(getColor("col.process.running")); } /** * Transitions this process to the 'paused' state. * The process stops executing and its color changes * to indicate stopped status. * *Called by the simulator when pausing simulation.
* * @see #play() * @see #finish() */ public synchronized void pause() { isPaused = true; setCurrentColor(getColor("col.process.stopped")); } /** * Transitions this process to the 'finished' state. * The process stops executing and returns to its default color. * *Called by the simulator when ending simulation.
* * @see #play() * @see #pause() */ public synchronized void finish() { isPaused = true; setCurrentColor(getColor("col.process.default")); } /** * Gets the current process' color. * * @return the current color of the process. */ public synchronized Color getColor() { return currentColor; } /** * Gets the color of this process if it's crashed. * * @return the crashed color */ public synchronized Color getCrashedColor() { return crashedColor; } /** * Checks if this process's time has been modified by a task. * The task manager uses this flag to determine if clock offset * adjustments are needed after task execution. * *This flag is set when tasks directly modify the process's * local time, requiring synchronization adjustments.
* * @return true if the time has been modified since last check * @see VSTaskManager#runTasks(long) */ public synchronized boolean timeModified() { return timeModified; } /** * Sets if the time has been modified by a task. * * @param timeModified true, if it has been modified. */ public synchronized void timeModified(boolean timeModified) { this.timeModified = timeModified; } /** * Sets the global time. * * @param globalTime the new global time */ public synchronized void setGlobalTime(final long globalTime) { this.globalTime = globalTime >= 0 ? globalTime : 0; } /* Gets the duration time of a message to send. * * @return the duration time */ public synchronized long getDurationTime() { final long maxDurationTime = getLong("message.sendingtime.max"); final long minDurationTime = getLong("message.sendingtime.min"); if (maxDurationTime <= minDurationTime) return minDurationTime; final int diff = (int) (maxDurationTime - minDurationTime); /* Integer overflow */ if (diff <= 0) return minDurationTime; return minDurationTime + random.nextInt(diff+1); } /** * Gets the a random message outage time. * * @param durationTime the duration time * * @return the a random message outage time. It will be -1 if the message * will not get lost at all. */ public synchronized long getARandomMessageOutageTime(long durationTime, VSInternalProcess receiverProcess) { int percentage = (int) ((getInteger("message.prob.outage") + receiverProcess.getInteger( "message.prob.outage")) / 2); /* Check if the message will have an outage or not */ if (getRandomPercentage() < percentage) { /* Calculate the random outage time! */ long outageTime = globalTime + random.nextLong(durationTime+1) % simulatorVisualization.getUntilTime(); return outageTime; } /* No outage */ return -1; } /** * Gets the random crash task. * * @return the random crash task */ public synchronized VSTask getCrashTask() { return randomCrashTask; } /** * Checks if the process is paused. * * @return true, if is paused */ public synchronized boolean isPaused() { return isPaused; } /** * Called by a task if the process sends a message. * * @param message the message to send. */ public synchronized void sendMessage(VSMessage message) { StringBuffer buffer = new StringBuffer(); buffer.append(prefs.getString("lang.message.sent")); buffer.append("; "); buffer.append(message.toStringFull()); log(buffer.toString()); // Use message handler if available (for decoupled operation) if (messageHandler != null) { messageHandler.handleMessage(message); } else { // Fallback to direct visualization call for backward compatibility simulatorVisualization.sendMessage(message); } } /** * Sets the message handler for decoupled message sending. * @param handler the message handler to use */ public void setMessageHandler(simulator.messaging.MessageHandler handler) { this.messageHandler = handler; } /** * Gets the simulator canvas. * * @return the simulator canvas */ public VSSimulatorVisualization getSimulatorCanvas() { return simulatorVisualization; } /** * Removes the process at the specified index. Called by the simulator * canvas if a process has been removed from the simulator. Needed in * order to update the vector time and the local processNum. * * @param index the index the process has to get removed. */ public synchronized void removedAProcessAtIndex(int index) { if (index < processNum) --processNum; vectorTime.remove(index); for (VSVectorTime vectorTime : vectorTimeHistory) vectorTime.remove(index); } /** * Added a process. Needed in order to update the vector time's size. * Called by the simulator canvas if a process has been added to the * simulator. */ public synchronized void addedAProcess() { vectorTime.add(Long.valueOf(0)); for (VSVectorTime vectorTime : vectorTimeHistory) vectorTime.add(Long.valueOf(0)); } /** * Gets the tasks of the process. * * @return The tasks */ public VSPriorityQueue