Global View

Sometimes you might want to observe some state and do something special (e.g., stop the simulation) in case the state matches a special condition that you have considered. For this purpose, the simulator offers a GlobalView, which allows you to do three things:

  1. Check which nodes are dead or alive,
  2. set/get key-value-pairs shared globally, and
  3. tell the simulator to terminate this simulation.

You can access the GlobalView from your config, using the key "simulation.globalview", as in:

GlobalView gv = config().getValue("simulation.globalview", GlobalView.class);
val gv = cfg.getValue[GlobalView]("simulation.globalview");

Obviously, this key is only available while actually running in simulation, so you must take care not access it wrongly when you are sharing code between simulation and deployment.

Terminate on Condition

So far, all simulation scenarios have terminated at a specific time using terminateAfterTerminationOf(10000, pinger).andThen(10.seconds).afterTermination(Terminate). This time we want to check for certain conditions and terminate when one such condition occurs. In this section, we will to terminate the current simulation when at least 100 pongs have been received by the pingers or when at least 3 nodes have died.

SimulationObserver

To do so, we write an observer that will periodically inspect the global state and check whether any conditions have been met.

Java
package jexamples.simulation.pingpongglobal;

import java.util.UUID;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.network.Network;
import se.sics.kompics.simulator.util.GlobalView;
import se.sics.kompics.timer.CancelPeriodicTimeout;
import se.sics.kompics.timer.SchedulePeriodicTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;

public class SimulationObserver extends ComponentDefinition {

  Positive<Timer> timer = requires(Timer.class);
  Positive<Network> network = requires(Network.class);

  private final int minPings;
  private final int minDeadNodes;

  private UUID timerId;

  public SimulationObserver(Init init) {
    minPings = init.minPings;
    minDeadNodes = init.minDeadNodes;

    subscribe(handleStart, control);
    subscribe(handleCheck, timer);
  }

  Handler<Start> handleStart =
      new Handler<Start>() {
        @Override
        public void handle(Start event) {
          schedulePeriodicCheck();
        }
      };

  @Override
  public void tearDown() {
    trigger(new CancelPeriodicTimeout(timerId), timer);
  }

  Handler<CheckTimeout> handleCheck =
      new Handler<CheckTimeout>() {
        @Override
        public void handle(CheckTimeout event) {
          GlobalView gv = config().getValue("simulation.globalview", GlobalView.class);

          if (gv.getValue("simulation.pongs", Integer.class) > minPings) {
            logger.info("Terminating simulation as the minimum pings:{} is achieved", minPings);
            gv.terminate();
          }
          if (gv.getDeadNodes().size() > minDeadNodes) {
            logger.info(
                "Terminating simulation as the min dead nodes:{} is achieved", minDeadNodes);
            gv.terminate();
          }
        }
      };

  public static class Init extends se.sics.kompics.Init<SimulationObserver> {

    public final int minPings;
    public final int minDeadNodes;

    public Init(int minPings, int minDeadNodes) {
      this.minPings = minPings;
      this.minDeadNodes = minDeadNodes;
    }
  }

  private void schedulePeriodicCheck() {
    long period = config().getValue("pingpong.simulation.checktimeout", Long.class);
    SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(period, period);
    CheckTimeout timeout = new CheckTimeout(spt);
    spt.setTimeoutEvent(timeout);
    trigger(spt, timer);
    timerId = timeout.getTimeoutId();
  }

  public static class CheckTimeout extends Timeout {

    public CheckTimeout(SchedulePeriodicTimeout spt) {
      super(spt);
    }
  }
}
Scala
package sexamples.simulation.pingpongglobal

import se.sics.kompics.sl._
import se.sics.kompics.simulator.util.GlobalView
import se.sics.kompics.network.Network
import se.sics.kompics.timer.{CancelPeriodicTimeout, SchedulePeriodicTimeout, Timeout, Timer}

import java.util.UUID

object SimulationObserver {
  class CheckTimeout(_spt: SchedulePeriodicTimeout) extends Timeout(_spt);
}
class SimulationObserver(init: Init[SimulationObserver]) extends ComponentDefinition {
  import SimulationObserver._;

  val timer = requires[Timer];
  val network = requires[Network];

  val Init(minPings: Int, minDeadNodes: Int) = init;

  private var timerId: Option[UUID] = None;

  ctrl uponEvent {
    case _: Start => {
      val period = cfg.getValue[Long]("pingpong.simulation.checktimeout");
      val spt = new SchedulePeriodicTimeout(period, period);
      val timeout = new CheckTimeout(spt);
      spt.setTimeoutEvent(timeout);
      trigger(spt -> timer);
      timerId = Some(timeout.getTimeoutId());
    }
  }

  timer uponEvent {
    case _: CheckTimeout => {
      val gv = cfg.getValue[GlobalView]("simulation.globalview");

      // Java API only :(
      if (gv.getValue("simulation.pongs", classOf[Integer]) > minPings) {
        log.info(s"Terminating simulation as the minimum pings=$minPings is achieved");
        gv.terminate();
      }
      if (gv.getDeadNodes().size() > minDeadNodes) {
        log.info(s"Terminating simulation as the min dead nodes=$minDeadNodes is achieved");
        gv.terminate();
      }
    }
  }

  override def tearDown(): Unit = {
    timerId match {
      case Some(id) => {
        trigger(new CancelPeriodicTimeout(id) -> timer);
      }
      case None => () // nothing to cancel
    }

  }

}

Pinger

We also add a small bit of code to the Pinger to record their pongs. We must be careful here, to only access the GlobalView when we are actually in simulation mode, because otherwise our program will fail in deployment.

Java
ClassMatchedHandler<Pong, TMessage> pongHandler =
    new ClassMatchedHandler<Pong, TMessage>() {

      @Override
      public void handle(Pong content, TMessage context) {
        counter++;
        logger.info("Got Pong #{}!", counter);
        ponged();
      }
    };
private void ponged() {
  Optional<GlobalView> gvo = config().readValue("simulation.globalview", GlobalView.class);
  if (gvo.isPresent()) { // only in simulation
    GlobalView gv = gvo.get();
    gv.setValue("simulation.pongs", gv.getValue("simulation.pongs", Integer.class) + 1);
  }
}
Scala
net uponEvent {
  case TMessage(_, Pong) => {
    counter += 1L;
    log.info(s"Got Pong #${counter}!");
    ponged()
  }
}
private def ponged(): Unit = {
  val gvo: Option[GlobalView] = cfg.readValue[GlobalView]("simulation.globalview");
  gvo match {
    case Some(gv: GlobalView) => {
      // Java API only :(
      gv.setValue("simulation.pongs", gv.getValue("simulation.pongs", classOf[Integer]) + 1);
    }
    case None => throw new RuntimeException("Why are we not in simulation?") // not in simulation
  }
}

ScenarioGen Setup

Since we are using a custom key-value-pair from the GlobalView, we need to initialize this value before we read it the first time. We can do this from SetupEvent, by overriding the setupGlobalView methodSetup, by initiuating with a function that takes a GlobalView instance and modifies it:

Java
static Operation<SetupEvent> setupOp =
    new Operation<SetupEvent>() {
      @Override
      public SetupEvent generate() {
        return new SetupEvent() {
          @Override
          public void setupGlobalView(GlobalView gv) {
            gv.setValue("simulation.pongs", 0);
          }
        };
      }
    };
Scala
val setupOp = Op { (_: Unit) =>
  Setup { (gv: GlobalView) =>
    gv.setValue("simulation.pongs", 0)
  }.build()
};

ScenarioGen Killing Nodes

We also want to be able to kill nodes, specifically Pongers, so we also add an operation for a KillNodeEvent, which requires overriding only the getNodeAddress method to identify the node we want to killKillNode, which requires us to pass in the address of the node we want to kill:

Java
static Operation1<KillNodeEvent, Integer> killPongerOp =
    new Operation1<KillNodeEvent, Integer>() {
      @Override
      public KillNodeEvent generate(final Integer self) {
        return new KillNodeEvent() {
          TAddress selfAdr;

          {
            try {
              selfAdr = new TAddress(InetAddress.getByName(IP_PREFIX + self), PORT);
            } catch (UnknownHostException ex) {
              throw new RuntimeException(ex);
            }
          }

          @Override
          public Address getNodeAddress() {
            return selfAdr;
          }

          @Override
          public String toString() {
            return "KillPonger<" + selfAdr.toString() + ">";
          }
        };
      }
    };
Scala
val killPongerOp = Op { (self: java.lang.Integer) =>
  KillNode(makeAddress(self))
};

ScenarioGen Simple Ping Scenario

We modify now the old scenario simplePing, that starts 5 pongers and 5 pingers and we expect the SimulationObserver to terminate it early, when at least 100 pongs have been received. In the case where our observer’s termination conditions are not met, for example due to some bug, the simulation might not stop but run forever. To avoid this, we still want to keep the scenario termination time (a very high one) as a safety net. In this case, we added a 10.000s termination time, but the SimulationObserver should terminate the simulation within a couple of tens of seconds of simulated time.

Java
public static SimulationScenario simplePing() {
  SimulationScenario scen =
      new SimulationScenario() {
        {
          SimulationScenario.StochasticProcess setup =
              new SimulationScenario.StochasticProcess() {
                {
                  raise(1, setupOp);
                }
              };

          SimulationScenario.StochasticProcess observer =
              new SimulationScenario.StochasticProcess() {
                {
                  raise(1, startObserverOp);
                }
              };

          SimulationScenario.StochasticProcess ponger =
              new SimulationScenario.StochasticProcess() {
                {
                  eventInterarrivalTime(constant(1000));
                  raise(5, startPongerOp, new BasicIntSequentialDistribution(1));
                }
              };

          SimulationScenario.StochasticProcess pinger =
              new SimulationScenario.StochasticProcess() {
                {
                  eventInterarrivalTime(constant(1000));
                  raise(
                      5,
                      startPingerOp,
                      new BasicIntSequentialDistribution(6),
                      new BasicIntSequentialDistribution(1));
                }
              };

          setup.start();
          observer.startAfterTerminationOf(0, setup);
          ponger.startAfterTerminationOf(1000, observer);
          pinger.startAfterTerminationOf(1000, ponger);
          terminateAfterTerminationOf(1000 * 10000, pinger);
        }
      };

  return scen;
}
Scala
val simplePingScenario = raise(1, setupOp)
  .arrival(constant(0.seconds))
  .andThen(0.seconds)
  .afterTermination(
    raise(1, startObserverOp).arrival(constant(0.seconds))
  )
  .andThen(1.second)
  .afterTermination(
    raise(5, startPongerOp, 1.toN)
      .arrival(constant(1.second))
  )
  .andThen(1.second)
  .afterTermination(
    raise(5, startPingerOp, 6.toN, 1.toN).arrival(constant(1.second))
  )
  .andThen(10000.seconds)
  .afterTermination(Terminate);

ScenarioGen Kill Pongers Scenario

The second scenario, killPongers, will start killing the pongers, which the observer should notice and then stop the simulation. In this case both conditions – number of pings and number of dead nodes – can be met, but for the given code (seed, timing conditions) the number of dead nodes will be met first.

Java
public static SimulationScenario killPongers() {
  SimulationScenario scen =
      new SimulationScenario() {
        {
          SimulationScenario.StochasticProcess setup =
              new SimulationScenario.StochasticProcess() {
                {
                  raise(1, setupOp);
                }
              };

          SimulationScenario.StochasticProcess observer =
              new SimulationScenario.StochasticProcess() {
                {
                  raise(1, startObserverOp);
                }
              };

          SimulationScenario.StochasticProcess ponger =
              new SimulationScenario.StochasticProcess() {
                {
                  eventInterarrivalTime(constant(1000));
                  raise(5, startPongerOp, new BasicIntSequentialDistribution(1));
                }
              };

          SimulationScenario.StochasticProcess pinger =
              new SimulationScenario.StochasticProcess() {
                {
                  eventInterarrivalTime(constant(1000));
                  raise(
                      5,
                      startPingerOp,
                      new BasicIntSequentialDistribution(6),
                      new BasicIntSequentialDistribution(1));
                }
              };

          SimulationScenario.StochasticProcess killPonger =
              new SimulationScenario.StochasticProcess() {
                {
                  eventInterarrivalTime(constant(0));
                  raise(5, killPongerOp, new BasicIntSequentialDistribution(1));
                }
              };

          setup.start();
          observer.startAfterTerminationOf(0, setup);
          ponger.startAfterTerminationOf(1000, observer);
          pinger.startAfterTerminationOf(1000, ponger);
          killPonger.startAfterTerminationOf(1000, pinger);
          terminateAfterTerminationOf(1000 * 10000, pinger);
        }
      };

  return scen;
}
Scala
val killPongersScenario = raise(1, setupOp)
  .arrival(constant(0.seconds))
  .andThen(0.seconds)
  .afterTermination(
    raise(1, startObserverOp).arrival(constant(0.seconds))
  )
  .andThen(1.second)
  .afterTermination(
    raise(5, startPongerOp, 1.toN)
      .arrival(constant(1.second))
  )
  .andThen(1.second)
  .afterTermination(
    raise(5, startPingerOp, 6.toN, 1.toN).arrival(constant(1.second))
  )
  .andThen(1.second)
  .afterTermination(
    raise(5, killPongerOp, 1.toN).arrival(constant(0.seconds))
  )
  .andThen(10000.seconds)
  .afterTermination(Terminate);

Main

We extend our Main classobject yet again, to differentiate the two simulation we want to run:

Java
} else if (args[0].equalsIgnoreCase("simulation")) {
  long seed = 123;
  SimulationScenario.setSeed(seed);
  if (args.length > 1) {
    SimulationScenario scenario;
    if (args[1].equalsIgnoreCase("simple")) {
      scenario = ScenarioGen.simplePing();
    } else if (args[1].equalsIgnoreCase("kill")) {
      scenario = ScenarioGen.killPongers();
    } else {
      throw new RuntimeException("Invalid argument: " + args[1]);
    }
    System.out.println("Starting a Simulation");
    scenario.simulate(LauncherComp.class);
  } else {
    System.err.println("Invalid number of parameters");
    System.exit(1);
  }
} else {
Scala
} else if (args(0).equalsIgnoreCase("simulation")) {
  val seed = 123;
  SimulationScenario.setSeed(seed);
  if (args.length > 1) {
    val scenario = if (args(1).equalsIgnoreCase("simple")) {
      ScenarioGen.simplePingScenario
    } else if (args(1).equalsIgnoreCase("kill")) {
      ScenarioGen.killPongersScenario
    } else {
      throw new RuntimeException(s"Invalid argument: ${args(1)}");
    };
    System.out.println("Starting a Simulation");
    scenario.simulate(classOf[LauncherComp]);
  } else {
    System.err.println("Invalid number of parameters");
    System.exit(1);
  }
} else {

Execution

Now we can simply run out simulation from within sbt as before with:

Simple Ping Scenario

runMain jexamples.simulation.pingpongglobal.Main simulation simple
runMain sexamples.simulation.pingpongglobal.Main simulation simple

Kill Pongers Scenario

runMain jexamples.simulation.pingpongglobal.Main simulation kill
runMain sexamples.simulation.pingpongglobal.Main simulation kill
The source code for this page can be found here.