Global View
Sometimes you might want to observe some state and do something special (e.g., stop the simulation) in case the state matches a special condition that you have considered. For this purpose, the simulator offers a GlobalView
, which allows you to do three things:
- Check which nodes are dead or alive,
- set/get key-value-pairs shared globally, and
- tell the simulator to terminate this simulation.
You can access the GlobalView
from your config, using the key "simulation.globalview"
, as in:
GlobalView gv = config().getValue("simulation.globalview", GlobalView.class);
val gv = cfg.getValue[GlobalView]("simulation.globalview");
Obviously, this key is only available while actually running in simulation, so you must take care not access it wrongly when you are sharing code between simulation and deployment.
Terminate on Condition
So far, all simulation scenarios have terminated at a specific time using terminateAfterTerminationOf(10000, pinger)
.andThen(10.seconds).afterTermination(Terminate)
. This time we want to check for certain conditions and terminate when one such condition occurs. In this section, we will to terminate the current simulation when at least 100 pongs have been received by the pingers or when at least 3 nodes have died.
SimulationObserver
To do so, we write an observer that will periodically inspect the global state and check whether any conditions have been met.
- Java
-
package jexamples.simulation.pingpongglobal; import java.util.UUID; import se.sics.kompics.ComponentDefinition; import se.sics.kompics.Handler; import se.sics.kompics.Positive; import se.sics.kompics.Start; import se.sics.kompics.network.Network; import se.sics.kompics.simulator.util.GlobalView; import se.sics.kompics.timer.CancelPeriodicTimeout; import se.sics.kompics.timer.SchedulePeriodicTimeout; import se.sics.kompics.timer.Timeout; import se.sics.kompics.timer.Timer; public class SimulationObserver extends ComponentDefinition { Positive<Timer> timer = requires(Timer.class); Positive<Network> network = requires(Network.class); private final int minPings; private final int minDeadNodes; private UUID timerId; public SimulationObserver(Init init) { minPings = init.minPings; minDeadNodes = init.minDeadNodes; subscribe(handleStart, control); subscribe(handleCheck, timer); } Handler<Start> handleStart = new Handler<Start>() { @Override public void handle(Start event) { schedulePeriodicCheck(); } }; @Override public void tearDown() { trigger(new CancelPeriodicTimeout(timerId), timer); } Handler<CheckTimeout> handleCheck = new Handler<CheckTimeout>() { @Override public void handle(CheckTimeout event) { GlobalView gv = config().getValue("simulation.globalview", GlobalView.class); if (gv.getValue("simulation.pongs", Integer.class) > minPings) { logger.info("Terminating simulation as the minimum pings:{} is achieved", minPings); gv.terminate(); } if (gv.getDeadNodes().size() > minDeadNodes) { logger.info( "Terminating simulation as the min dead nodes:{} is achieved", minDeadNodes); gv.terminate(); } } }; public static class Init extends se.sics.kompics.Init<SimulationObserver> { public final int minPings; public final int minDeadNodes; public Init(int minPings, int minDeadNodes) { this.minPings = minPings; this.minDeadNodes = minDeadNodes; } } private void schedulePeriodicCheck() { long period = config().getValue("pingpong.simulation.checktimeout", Long.class); SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(period, period); CheckTimeout timeout = new CheckTimeout(spt); spt.setTimeoutEvent(timeout); trigger(spt, timer); timerId = timeout.getTimeoutId(); } public static class CheckTimeout extends Timeout { public CheckTimeout(SchedulePeriodicTimeout spt) { super(spt); } } }
- Scala
-
package sexamples.simulation.pingpongglobal import se.sics.kompics.sl._ import se.sics.kompics.simulator.util.GlobalView import se.sics.kompics.network.Network import se.sics.kompics.timer.{CancelPeriodicTimeout, SchedulePeriodicTimeout, Timeout, Timer} import java.util.UUID object SimulationObserver { class CheckTimeout(_spt: SchedulePeriodicTimeout) extends Timeout(_spt); } class SimulationObserver(init: Init[SimulationObserver]) extends ComponentDefinition { import SimulationObserver._; val timer = requires[Timer]; val network = requires[Network]; val Init(minPings: Int, minDeadNodes: Int) = init; private var timerId: Option[UUID] = None; ctrl uponEvent { case _: Start => { val period = cfg.getValue[Long]("pingpong.simulation.checktimeout"); val spt = new SchedulePeriodicTimeout(period, period); val timeout = new CheckTimeout(spt); spt.setTimeoutEvent(timeout); trigger(spt -> timer); timerId = Some(timeout.getTimeoutId()); } } timer uponEvent { case _: CheckTimeout => { val gv = cfg.getValue[GlobalView]("simulation.globalview"); // Java API only :( if (gv.getValue("simulation.pongs", classOf[Integer]) > minPings) { log.info(s"Terminating simulation as the minimum pings=$minPings is achieved"); gv.terminate(); } if (gv.getDeadNodes().size() > minDeadNodes) { log.info(s"Terminating simulation as the min dead nodes=$minDeadNodes is achieved"); gv.terminate(); } } } override def tearDown(): Unit = { timerId match { case Some(id) => { trigger(new CancelPeriodicTimeout(id) -> timer); } case None => () // nothing to cancel } } }
Pinger
We also add a small bit of code to the Pinger
to record their pongs. We must be careful here, to only access the GlobalView
when we are actually in simulation mode, because otherwise our program will fail in deployment.
- Java
-
ClassMatchedHandler<Pong, TMessage> pongHandler = new ClassMatchedHandler<Pong, TMessage>() { @Override public void handle(Pong content, TMessage context) { counter++; logger.info("Got Pong #{}!", counter); ponged(); } }; private void ponged() { Optional<GlobalView> gvo = config().readValue("simulation.globalview", GlobalView.class); if (gvo.isPresent()) { // only in simulation GlobalView gv = gvo.get(); gv.setValue("simulation.pongs", gv.getValue("simulation.pongs", Integer.class) + 1); } }
- Scala
-
net uponEvent { case TMessage(_, Pong) => { counter += 1L; log.info(s"Got Pong #${counter}!"); ponged() } } private def ponged(): Unit = { val gvo: Option[GlobalView] = cfg.readValue[GlobalView]("simulation.globalview"); gvo match { case Some(gv: GlobalView) => { // Java API only :( gv.setValue("simulation.pongs", gv.getValue("simulation.pongs", classOf[Integer]) + 1); } case None => throw new RuntimeException("Why are we not in simulation?") // not in simulation } }
ScenarioGen Setup
Since we are using a custom key-value-pair from the GlobalView
, we need to initialize this value before we read it the first time. We can do this from SetupEvent
, by overriding the setupGlobalView
methodSetup
, by initiuating with a function that takes a GlobalView
instance and modifies it:
- Java
-
static Operation<SetupEvent> setupOp = new Operation<SetupEvent>() { @Override public SetupEvent generate() { return new SetupEvent() { @Override public void setupGlobalView(GlobalView gv) { gv.setValue("simulation.pongs", 0); } }; } };
- Scala
-
val setupOp = Op { (_: Unit) => Setup { (gv: GlobalView) => gv.setValue("simulation.pongs", 0) }.build() };
ScenarioGen Killing Nodes
We also want to be able to kill nodes, specifically Ponger
s, so we also add an operation for a KillNodeEvent
, which requires overriding only the getNodeAddress
method to identify the node we want to killKillNode
, which requires us to pass in the address of the node we want to kill:
- Java
-
static Operation1<KillNodeEvent, Integer> killPongerOp = new Operation1<KillNodeEvent, Integer>() { @Override public KillNodeEvent generate(final Integer self) { return new KillNodeEvent() { TAddress selfAdr; { try { selfAdr = new TAddress(InetAddress.getByName(IP_PREFIX + self), PORT); } catch (UnknownHostException ex) { throw new RuntimeException(ex); } } @Override public Address getNodeAddress() { return selfAdr; } @Override public String toString() { return "KillPonger<" + selfAdr.toString() + ">"; } }; } };
- Scala
-
val killPongerOp = Op { (self: java.lang.Integer) => KillNode(makeAddress(self)) };
ScenarioGen Simple Ping Scenario
We modify now the old scenario simplePing
, that starts 5 pongers and 5 pingers and we expect the SimulationObserver
to terminate it early, when at least 100 pongs have been received. In the case where our observer’s termination conditions are not met, for example due to some bug, the simulation might not stop but run forever. To avoid this, we still want to keep the scenario termination time (a very high one) as a safety net. In this case, we added a 10.000s termination time, but the SimulationObserver
should terminate the simulation within a couple of tens of seconds of simulated time.
- Java
-
public static SimulationScenario simplePing() { SimulationScenario scen = new SimulationScenario() { { SimulationScenario.StochasticProcess setup = new SimulationScenario.StochasticProcess() { { raise(1, setupOp); } }; SimulationScenario.StochasticProcess observer = new SimulationScenario.StochasticProcess() { { raise(1, startObserverOp); } }; SimulationScenario.StochasticProcess ponger = new SimulationScenario.StochasticProcess() { { eventInterarrivalTime(constant(1000)); raise(5, startPongerOp, new BasicIntSequentialDistribution(1)); } }; SimulationScenario.StochasticProcess pinger = new SimulationScenario.StochasticProcess() { { eventInterarrivalTime(constant(1000)); raise( 5, startPingerOp, new BasicIntSequentialDistribution(6), new BasicIntSequentialDistribution(1)); } }; setup.start(); observer.startAfterTerminationOf(0, setup); ponger.startAfterTerminationOf(1000, observer); pinger.startAfterTerminationOf(1000, ponger); terminateAfterTerminationOf(1000 * 10000, pinger); } }; return scen; }
- Scala
-
val simplePingScenario = raise(1, setupOp) .arrival(constant(0.seconds)) .andThen(0.seconds) .afterTermination( raise(1, startObserverOp).arrival(constant(0.seconds)) ) .andThen(1.second) .afterTermination( raise(5, startPongerOp, 1.toN) .arrival(constant(1.second)) ) .andThen(1.second) .afterTermination( raise(5, startPingerOp, 6.toN, 1.toN).arrival(constant(1.second)) ) .andThen(10000.seconds) .afterTermination(Terminate);
ScenarioGen Kill Pongers Scenario
The second scenario, killPongers
, will start killing the pongers, which the observer should notice and then stop the simulation. In this case both conditions – number of pings and number of dead nodes – can be met, but for the given code (seed, timing conditions) the number of dead nodes will be met first.
- Java
-
public static SimulationScenario killPongers() { SimulationScenario scen = new SimulationScenario() { { SimulationScenario.StochasticProcess setup = new SimulationScenario.StochasticProcess() { { raise(1, setupOp); } }; SimulationScenario.StochasticProcess observer = new SimulationScenario.StochasticProcess() { { raise(1, startObserverOp); } }; SimulationScenario.StochasticProcess ponger = new SimulationScenario.StochasticProcess() { { eventInterarrivalTime(constant(1000)); raise(5, startPongerOp, new BasicIntSequentialDistribution(1)); } }; SimulationScenario.StochasticProcess pinger = new SimulationScenario.StochasticProcess() { { eventInterarrivalTime(constant(1000)); raise( 5, startPingerOp, new BasicIntSequentialDistribution(6), new BasicIntSequentialDistribution(1)); } }; SimulationScenario.StochasticProcess killPonger = new SimulationScenario.StochasticProcess() { { eventInterarrivalTime(constant(0)); raise(5, killPongerOp, new BasicIntSequentialDistribution(1)); } }; setup.start(); observer.startAfterTerminationOf(0, setup); ponger.startAfterTerminationOf(1000, observer); pinger.startAfterTerminationOf(1000, ponger); killPonger.startAfterTerminationOf(1000, pinger); terminateAfterTerminationOf(1000 * 10000, pinger); } }; return scen; }
- Scala
-
val killPongersScenario = raise(1, setupOp) .arrival(constant(0.seconds)) .andThen(0.seconds) .afterTermination( raise(1, startObserverOp).arrival(constant(0.seconds)) ) .andThen(1.second) .afterTermination( raise(5, startPongerOp, 1.toN) .arrival(constant(1.second)) ) .andThen(1.second) .afterTermination( raise(5, startPingerOp, 6.toN, 1.toN).arrival(constant(1.second)) ) .andThen(1.second) .afterTermination( raise(5, killPongerOp, 1.toN).arrival(constant(0.seconds)) ) .andThen(10000.seconds) .afterTermination(Terminate);
Main
We extend our Main
classobject yet again, to differentiate the two simulation we want to run:
- Java
-
} else if (args[0].equalsIgnoreCase("simulation")) { long seed = 123; SimulationScenario.setSeed(seed); if (args.length > 1) { SimulationScenario scenario; if (args[1].equalsIgnoreCase("simple")) { scenario = ScenarioGen.simplePing(); } else if (args[1].equalsIgnoreCase("kill")) { scenario = ScenarioGen.killPongers(); } else { throw new RuntimeException("Invalid argument: " + args[1]); } System.out.println("Starting a Simulation"); scenario.simulate(LauncherComp.class); } else { System.err.println("Invalid number of parameters"); System.exit(1); } } else {
- Scala
-
} else if (args(0).equalsIgnoreCase("simulation")) { val seed = 123; SimulationScenario.setSeed(seed); if (args.length > 1) { val scenario = if (args(1).equalsIgnoreCase("simple")) { ScenarioGen.simplePingScenario } else if (args(1).equalsIgnoreCase("kill")) { ScenarioGen.killPongersScenario } else { throw new RuntimeException(s"Invalid argument: ${args(1)}"); }; System.out.println("Starting a Simulation"); scenario.simulate(classOf[LauncherComp]); } else { System.err.println("Invalid number of parameters"); System.exit(1); } } else {
Execution
Now we can simply run out simulation from within sbt as before with:
Simple Ping Scenario
runMain jexamples.simulation.pingpongglobal.Main simulation simple
runMain sexamples.simulation.pingpongglobal.Main simulation simple
Kill Pongers Scenario
runMain jexamples.simulation.pingpongglobal.Main simulation kill
runMain sexamples.simulation.pingpongglobal.Main simulation kill