Introduction to Simulation¶
First, we have to include the simulator dependency in our pom file. The simulator version is the same as kompics:
<dependency>
<groupId>se.sics.kompics.simulator</groupId>
<artifactId>core</artifactId>
<version>${kompics.version}</version>
</dependency>
Next we write the simulation scenario. A scenario is a parallel and/or sequential composition of stochastic processes. We call a stochastic process, a finite random sequence of events, with a specified distribution of inter-arrival times. In other words, stochastic processes define series of events that occur in simulation.
The following snippet of code creates one stochastic process that will generate 1000 events of type SimpleEvent
with an inter-arrival time of 2000ms(2s)(simulated time).
static Operation simpleEvent1Gen = new Operation<SimpleEvent>() {
@Override
public SimpleEvent1 generate() {
new SimpleEvent1();
}
};
...
StochasticProcess p1 = new StochasticProcess() {
{
eventInterArrivalTime(constant(2000));
raise(1000, simpleEventGen);
}
};
However, in the example above, all events are the same and there is no variation to the 1000 events generated. We can generate events that differ slightly, by having the event take an integer parameter and generating this parameter randomly.
static Operation1 simpleEvent2Gen = new Operation1<SimpleEvent, Long>() {
@Override
public SimpleEvent2 generate(Long param) {
new SimpleEvent2(param);
}
};
...
StochasticProcess p2 = new StochasticProcess() {
{
eventInterArrivalTime(constant(2000));
raise(500, simpleEventGen, uniform(1000, 2000));
}
};
The new events generated have a parameter that takes uniform long random values between 1000 and 2000. Both the inter-arrival time and the event parameters are defined as se.sics.kompics.simulator.adaptor.distributions.Distribution
, and the constant and uniform are just helper methods to generate these distributions.
There are several types of operations Operation(_,1,2,…,5) that can be interpreted by the simulator and they differ in the number of parameters they take to customize the generated events. The parameters are given as distributions. There are a number of distributions provided by the simulator, or you can write your own distribution by extending se.sics.kompics.simulator.adaptor.distributions.Distribution
.
In order to start the stochastic processes and to define the iterative/parallel behaviour, the se.sics.kompics.simulator.SimulationScenario.StochasticProcess
has a number of start/terminate methods.
If we want to start the two stochastic process above, a possible example would be:
p1.start();
p2.startAfterTerminationOf(1000, p1);
terminateAfterTerminationOf(2000, p2);
The above example would start the stochastic process p1 at the beginning of the simulation. After all 1000 events defined in p1 are generated, the simulation waits another 1000ms of simulated time and starts the second stochastic process p2. After generating all 500 p2 defined events and waiting another 2000ms of simulated time, the simulation will end.
The stochastic processes created and their order will be defined within the SimulationScenario:
static Operation simpleEvent1Gen = new Operation<SimpleEvent>() {
@Override
public SimpleEvent1 generate() {
new SimpleEvent1();
}
};
static Operation1 simpleEvent2Gen = new Operation1<SimpleEvent, Long>() {
@Override
public SimpleEvent2 generate(Long param) {
new SimpleEvent2(param);
}
};
public static SimulationScenario simpleSimulation() {
SimulationScenario scen = new SimulationScenario() {
{
StochasticProcess p1 = new StochasticProcess() {
{
eventInterArrivalTime(constant(2000));
raise(500, simpleEventGen, uniform(1000, 2000));
};
StochasticProcess p2 = new StochasticProcess() {
{
eventInterArrivalTime(constant(2000));
raise(500, simpleEventGen, uniform(1000, 2000));
};
p1.start();
p2.startAfterTerminationOf(1000, p1);
terminateAfterTerminationOf(2000, p2);
}
};
}
Events that can be generated by stochastic processes and interpreted by the simulator are:
- SetupEvent
- StartNodeEvent
- KillNodeEvent
- TerminateExperiment
You can read more about Operations, Distributions and StochasticProcesses in Simulation Scenarios Reference.
In order to run the simulation scenario, the following code is required:
public static void main(String[] args) {
long seed = 123;
SimulationScenario.setSeed(seed);
SimulationScenario simpleBootScenario = simpleSimulation();
simpleBootScenario.simulate(LauncherComp.class);
}
In the above code, we set the simulation scenario seed (more about this seed in later sections), we construct the simulation scenario and we give it to the se.sics.kompics.simulator.run.LauncherComp
to execute it.
PingPong Simulation¶
Starting from the Distributed PingPong, in this section we will see how we can run the pinger/ponger configuration as a simulation. As stated in the previous section, do not forget to include the simulator dependency in your pom file.
First change we want to do in order to be able to use the deploy code in simulation (with minimal changes), is use a Parent
and Host
division. The Parent
will typically connect all subcomponents of the system between themselves and to a Network
and Timer
port.
In deployment, the Host
will create the Timer
and Network
instances and connect them to the Parent
, while in simulation, the Simulator will provide the Timer
and Network
and will connect them to the Parent
.
package se.sics.test.system;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.network.Network;
import se.sics.kompics.timer.Timer;
import se.sics.test.Pinger;
import se.sics.test.TAddress;
public class PingerParent extends ComponentDefinition {
Positive<Network> network = requires(Network.class);
Positive<Timer> timer = requires(Timer.class);
public PingerParent(Init init) {
//create and connect all components except timer and network
Component pinger = create(Pinger.class, new Pinger.Init(init.self, init.ponger));
//connect required internal components to network and timer
connect(pinger.getNegative(Network.class), network, Channel.TWO_WAY);
connect(pinger.getNegative(Timer.class), timer, Channel.TWO_WAY);
}
public static class Init extends se.sics.kompics.Init<PingerParent> {
public final TAddress self;
public final TAddress ponger;
public Init(TAddress self, TAddress ponger) {
this.self = self;
this.ponger = ponger;
}
}
}
package se.sics.test.system;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.test.TAddress;
public class PingerHost extends ComponentDefinition {
public PingerHost(Init init) {
Component network = create(NettyNetwork.class, new NettyInit(init.self));
Component timer = create(JavaTimer.class, Init.NONE);
Component pingerParent = create(PingerParent.class, new PingerParent.Init(init.self, init.ponger));
connect(pingerParent.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
connect(pingerParent.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
}
public static class Init extends se.sics.kompics.Init<PingerHost> {
public final TAddress self;
public final TAddress ponger;
public Init(TAddress self, TAddress ponger) {
this.self = self;
this.ponger = ponger;
}
}
}
package se.sics.test.system;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.network.Network;
import se.sics.kompics.timer.Timer;
import se.sics.test.Ponger;
import se.sics.test.TAddress;
public class PongerParent extends ComponentDefinition {
Positive<Network> network = requires(Network.class);
Positive<Timer> timer = requires(Timer.class);
public PongerParent(Init init) {
//create and connect all components except timer and network
Component ponger = create(Ponger.class, new Ponger.Init(init.self));
//connect required internal components to network and timer
connect(ponger.getNegative(Network.class), network, Channel.TWO_WAY);
}
public static class Init extends se.sics.kompics.Init<PongerParent> {
public final TAddress self;
public Init(TAddress self) {
this.self = self;
}
}
}
package se.sics.test.system;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.test.TAddress;
public class PongerHost extends ComponentDefinition {
public PongerHost(Init init) {
Component network = create(NettyNetwork.class, new NettyInit(init.self));
Component timer = create(JavaTimer.class, Init.NONE);
Component pongerParent = create(PongerParent.class, new PongerParent.Init(init.self));
connect(pongerParent.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
connect(pongerParent.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
}
public static class Init extends se.sics.kompics.Init<PongerHost> {
public final TAddress self;
public Init(TAddress self) {
this.self = self;
}
}
}
In the simulation scenario we define two types of StartNodeEvent
events for Ponger and Pinger. The methods required to be overridden are very simple as they are getter methods for the Parent
ComponentDefinition
, Init
and the node Address
.
The scenario is set to start 5 ponger nodes and 5 pinger nodes. The sequential distributions will give IPs ending in: \(x=\{1,2,3,4,5\}\) to the pongers and IPs ending in: \(y=\{6,7,8,9,10\}\) to pingers and the <x,y> relation is: {<1,6>, <2, 7>, <3,8>, <4,9>, <5,10>} (or \(y = x+5\)).
package se.sics.test.sim;
import java.net.InetAddress;
import java.net.UnknownHostException;
import se.sics.kompics.Init;
import se.sics.kompics.network.Address;
import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.adaptor.Operation1;
import se.sics.kompics.simulator.adaptor.Operation2;
import se.sics.kompics.simulator.adaptor.distributions.extra.BasicIntSequentialDistribution;
import se.sics.kompics.simulator.events.system.StartNodeEvent;
import se.sics.test.TAddress;
import se.sics.test.system.PingerParent;
import se.sics.test.system.PongerParent;
public class ScenarioGen {
static Operation1 startPongerOp = new Operation1<StartNodeEvent, Integer>() {
@Override
public StartNodeEvent generate(final Integer self) {
return new StartNodeEvent() {
TAddress selfAdr;
{
try {
selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Address getNodeAddress() {
return selfAdr;
}
@Override
public Class getComponentDefinition() {
return PongerParent.class;
}
@Override
public Init getComponentInit() {
return new PongerParent.Init(selfAdr);
}
@Override
public String toString() {
return "StartPonger<" + selfAdr.toString() + ">";
}
};
}
};
static Operation2 startPingerOp = new Operation2<StartNodeEvent, Integer, Integer>() {
@Override
public StartNodeEvent generate(final Integer self, final Integer ponger) {
return new StartNodeEvent() {
TAddress selfAdr;
TAddress pongerAdr;
{
try {
selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
pongerAdr = new TAddress(InetAddress.getByName("192.193.0." + ponger), 10000);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Address getNodeAddress() {
return selfAdr;
}
@Override
public Class getComponentDefinition() {
return PingerParent.class;
}
@Override
public Init getComponentInit() {
return new PingerParent.Init(selfAdr, pongerAdr);
}
@Override
public String toString() {
return "StartPinger<" + selfAdr.toString() + ">";
}
};
}
};
public static SimulationScenario simplePing() {
SimulationScenario scen = new SimulationScenario() {
{
SimulationScenario.StochasticProcess ponger = new SimulationScenario.StochasticProcess() {
{
eventInterArrivalTime(constant(1000));
raise(5, startPongerOp, new BasicIntSequentialDistribution(1));
}
};
SimulationScenario.StochasticProcess pinger = new SimulationScenario.StochasticProcess() {
{
eventInterArrivalTime(constant(1000));
raise(5, startPingerOp, new BasicIntSequentialDistribution(6), new BasicIntSequentialDistribution(1));
}
};
ponger.start();
pinger.startAfterTerminationOf(1000, ponger);
terminateAfterTerminationOf(10000, pinger);
}
};
return scen;
}
}
package se.sics.test.sim;
import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.run.LauncherComp;
public class ScenarioLauncher {
public static void main(String[] args) {
long seed = 123;
SimulationScenario.setSeed(seed);
SimulationScenario simpleBootScenario = ScenarioGen.simplePing();
simpleBootScenario.simulate(LauncherComp.class);
}
}
Note
In some cases, you might start the simulation and the only output you get are some warnings from log4j, including:
log4j:WARN No appenders could be found for logger (CodeInstrumentation).
log4j:WARN Please initialize the log4j system properly.
In the case you should check your log4j.properties files as it might be missing or miss-configured.
Remember to set the following in your log4j.properties files, so that the logger output is manageable and related to your logs only.
log4j.logger.Kompics=WARN
log4j.logger.se.sics.kompics.simulator.core.impl.P2pSimulator=WARN
log4j.logger.se.sics.kompics.simulator.core.impl.SimulatorMngrComp=WARN
log4j.logger.CodeInstrumentation=WARN
When debugging, if the source of error is not clear, you might want to turn the loggers above to TRACE. The order should be:
- SimulatorMngrComp - to see if the stochastic process generated events order is as expected.
- P2pSimulator - to see if network messages and timeouts occur as expected
- Kompics - mostly Component control events
- CodeInstrumentation - only if you declared instrumentation exceptions(advanced).
The code until here can be downloaded here
.
Configuration¶
We will now change the code to Cleanup: Config files, ClassMatchers, and Assembly version and try to run it in simulation. The configuration now contains two types of parameters:
- node-specific parameters (like addresses)
- system-parameters (like timeout) that do not change with each node.
In simulation we will have one reference.conf
config file with system-parameters, and we will tell the simulator to add the node-specific parameters.
Thus the configuration file now contains only the timeout.
pingpong.pinger.timeout = 1000
We now want to tell the simulator to add the self and ponger addresses to the config of each individual node. For this we will need to override the se.sics.kompics.simulator.events.system.StartNodeEvent.initConfigUpdate()
method. The default implementation of this method returns an empty Map which corresponds to no change to the config. The returned Map is of type <config-key, config-value>. We can add the addresses to the config in two ways. We can add the components of the address:
@Override
public Map<String, Object> initConfigUpdate() {
HashMap<String, Object> config = new HashMap<>();
config.put("pingpong.self.host", selfAdr.getIp().getHostName());
config.put("pingpong.self.port", selfAdr.getPort());
config.put("pingpong.pinger.pongeraddr.host", pongerAdr.getIp().getHostName());
config.put("pingpong.pinger.pongeraddr.port", pongerAdr.getPort);
return config;
}
Or we can add the TAddress
object into the config directly:
@Override
public Map<String, Object> initConfigUpdate() {
HashMap<String, Object> config = new HashMap<>();
config.put("pingpong.self", selfAdr);
config.put("pingpong.pinger.pongeraddr", pongerAdr);
return config;
}
In this simulation scenario we have overridden the config method.
package se.sics.test.sim;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import se.sics.kompics.Init;
import se.sics.kompics.network.Address;
import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.adaptor.Operation1;
import se.sics.kompics.simulator.adaptor.Operation2;
import se.sics.kompics.simulator.adaptor.distributions.extra.BasicIntSequentialDistribution;
import se.sics.kompics.simulator.events.system.StartNodeEvent;
import se.sics.test.TAddress;
import se.sics.test.system.PingerParent;
import se.sics.test.system.PongerParent;
public class ScenarioGen {
static Operation1 startPongerOp = new Operation1<StartNodeEvent, Integer>() {
@Override
public StartNodeEvent generate(final Integer self) {
return new StartNodeEvent() {
TAddress selfAdr;
{
try {
selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Map<String, Object> initConfigUpdate() {
HashMap<String, Object> config = new HashMap<>();
config.put("pingpong.self", selfAdr);
return config;
}
@Override
public Address getNodeAddress() {
return selfAdr;
}
@Override
public Class getComponentDefinition() {
return PongerParent.class;
}
@Override
public Init getComponentInit() {
return Init.NONE;
}
@Override
public String toString() {
return "StartPonger<" + selfAdr.toString() + ">";
}
};
}
};
static Operation2 startPingerOp = new Operation2<StartNodeEvent, Integer, Integer>() {
@Override
public StartNodeEvent generate(final Integer self, final Integer ponger) {
return new StartNodeEvent() {
TAddress selfAdr;
TAddress pongerAdr;
{
try {
selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
pongerAdr = new TAddress(InetAddress.getByName("192.193.0." + ponger), 10000);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Map<String, Object> initConfigUpdate() {
HashMap<String, Object> config = new HashMap<>();
config.put("pingpong.self", selfAdr);
config.put("pingpong.pinger.pongeraddr", pongerAdr);
return config;
}
@Override
public Address getNodeAddress() {
return selfAdr;
}
@Override
public Class getComponentDefinition() {
return PingerParent.class;
}
@Override
public Init getComponentInit() {
return Init.NONE;
}
@Override
public String toString() {
return "StartPinger<" + selfAdr.toString() + ">";
}
};
}
};
public static SimulationScenario simplePing() {
SimulationScenario scen = new SimulationScenario() {
{
SimulationScenario.StochasticProcess ponger = new SimulationScenario.StochasticProcess() {
{
eventInterArrivalTime(constant(1000));
raise(5, startPongerOp, new BasicIntSequentialDistribution(1));
}
};
SimulationScenario.StochasticProcess pinger = new SimulationScenario.StochasticProcess() {
{
eventInterArrivalTime(constant(1000));
raise(5, startPingerOp, new BasicIntSequentialDistribution(6), new BasicIntSequentialDistribution(1));
}
};
ponger.start();
pinger.startAfterTerminationOf(1000, ponger);
terminateAfterTerminationOf(10000, pinger);
}
};
return scen;
}
}
The Host
and Parent
are also changed to use the config.
package se.sics.test.system;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.Positive;
import se.sics.kompics.network.Network;
import se.sics.kompics.timer.Timer;
import se.sics.test.Pinger;
import se.sics.test.TAddress;
public class PingerParent extends ComponentDefinition {
Positive<Network> network = requires(Network.class);
Positive<Timer> timer = requires(Timer.class);
public PingerParent() {
//create and connect all components except timer and network
Class<? extends ComponentDefinition> p = Pinger.class;
Component pinger = create(p, Init.NONE);
//connect required internal components to network and timer
connect(pinger.getNegative(Network.class), network, Channel.TWO_WAY);
connect(pinger.getNegative(Timer.class), timer, Channel.TWO_WAY);
}
}
package se.sics.test.system;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.test.TAddress;
public class PingerHost extends ComponentDefinition {
public PingerHost() {
TAddress self = config().getValue("pingpong.self", TAddress.class);
Component network = create(NettyNetwork.class, new NettyInit(self));
Component timer = create(JavaTimer.class, Init.NONE);
Component pingerParent = create(PingerParent.class, Init.NONE);
connect(pingerParent.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
connect(pingerParent.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
}
}
package se.sics.test.system;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.Positive;
import se.sics.kompics.network.Network;
import se.sics.kompics.timer.Timer;
import se.sics.test.Ponger;
import se.sics.test.TAddress;
public class PongerParent extends ComponentDefinition {
Positive<Network> network = requires(Network.class);
Positive<Timer> timer = requires(Timer.class);
public PongerParent() {
//create and connect all components except timer and network
Component ponger = create(Ponger.class, Init.NONE);
//connect required internal components to network and timer
connect(ponger.getNegative(Network.class), network, Channel.TWO_WAY);
}
}
package se.sics.test.system;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.test.TAddress;
public class PongerHost extends ComponentDefinition {
public PongerHost(Init init) {
TAddress self = config().getValue("pingpong.self", TAddress.class);
Component network = create(NettyNetwork.class, new NettyInit(self));
Component timer = create(JavaTimer.class, Init.NONE);
Component pongerParent = create(PongerParent.class, Init.NONE);
connect(pongerParent.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
connect(pongerParent.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
}
}
The code until here can be downloaded here
.
Global View¶
Sometimes you might want to observe some state and do something special (like stop the simulation) in case the state matches a special case that you have considered. With this in mind the simulator offers a se.sics.kompics.simulator.util.GlobalView
. The GlobalView
allows you to do three things:
- check which nodes are dead or alive
- set/get <key,values> shared globally
- tell the simulator to terminate this simulation
You can access the GlobalView
from your config, using the key “simulation.globalview”:
GlobalView gv = config().getValue("simulation.globalview", GlobalView.class);
The simulation scenarios before terminated at a specific time:
terminateAfterTerminationOf(10000, pinger);
This time we want to check for certain conditions and terminate when one such condition occurs. In this section we want to terminate the current simulation when at least 100 pongs have been received by the pingers or when at least 3 nodes have died.
We write an observer that will check periodically on the global state and verify if any conditions have been met.
package se.sics.test.sim;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.network.Network;
import se.sics.kompics.simulator.util.GlobalView;
import se.sics.kompics.timer.CancelPeriodicTimeout;
import se.sics.kompics.timer.SchedulePeriodicTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;
public class SimulationObserver extends ComponentDefinition {
private static final Logger LOG = LoggerFactory.getLogger(SimulationObserver.class);
Positive<Timer> timer = requires(Timer.class);
Positive<Network> network = requires(Network.class);
private final int minPings;
private final int minDeadNodes;
private UUID timerId;
public SimulationObserver(Init init) {
minPings = init.minPings;
minDeadNodes = init.minDeadNodes;
subscribe(handleStart, control);
subscribe(handleCheck, timer);
}
Handler<Start> handleStart = new Handler<Start>() {
@Override
public void handle(Start event) {
schedulePeriodicCheck();
}
};
@Override
public void tearDown() {
trigger(new CancelPeriodicTimeout(timerId), timer);
}
Handler<CheckTimeout> handleCheck = new Handler<CheckTimeout>() {
@Override
public void handle(CheckTimeout event) {
GlobalView gv = config().getValue("simulation.globalview", GlobalView.class);
if(gv.getValue("simulation.pongs", Integer.class) > minPings) {
LOG.info("Terminating simulation as the minimum pings:{} is achieved", minPings);
gv.terminate();
}
if(gv.getDeadNodes().size() > minDeadNodes) {
LOG.info("Terminating simulation as the min dead nodes:{} is achieved", minDeadNodes);
gv.terminate();
}
}
};
public static class Init extends se.sics.kompics.Init<SimulationObserver> {
public final int minPings;
public final int minDeadNodes;
public Init(int minPings, int minDeadNodes) {
this.minPings = minPings;
this.minDeadNodes = minDeadNodes;
}
}
private void schedulePeriodicCheck() {
long period = config().getValue("pingpong.simulation.checktimeout", Long.class);
SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(period, period);
CheckTimeout timeout = new CheckTimeout(spt);
spt.setTimeoutEvent(timeout);
trigger(spt, timer);
timerId = timeout.getTimeoutId();
}
public static class CheckTimeout extends Timeout {
public CheckTimeout(SchedulePeriodicTimeout spt) {
super(spt);
}
}
}
We also add a small bit of code to the Pinger
to record their pongs.
ClassMatchedHandler<Pong, TMessage> pongHandler = new ClassMatchedHandler<Pong, TMessage>() {
@Override
public void handle(Pong content, TMessage context) {
counter++;
LOG.info("{} Got Pong #{}! from:{}", new Object[]{self, counter, context.header.src});
ponged();
}
};
private void ponged() {
GlobalView gv = config().getValue("simulation.globalview", GlobalView.class);
gv.setValue("simulation.pongs", gv.getValue("simulation.pongs", Integer.class) + 1);
}
Since we are using a custom <key, value> from the GlobalView
, we might want to initialize this value before. We can do this within the se.sics.kompics.simulator.events.system.SetupEvent
, by overriding the setupGlobalView
method:
static Operation setupOp = new Operation<SetupEvent>() {
@Override
public SetupEvent generate() {
return new SetupEvent() {
@Override
public void setupGlobalView(GlobalView gv) {
gv.setValue("simulation.pongs", 0);
}
};
}
};
We also want to be able to kill nodes, specifically pongers, so we write a se.sics.kompics.simulator.events.system.KillNodeEvent
, which requires overriding only the getNodeAddress
method to identify the node we want to kill:
static Operation1 killPongerOp = new Operation1<KillNodeEvent, Integer>() {
@Override
public KillNodeEvent generate(final Integer self) {
return new KillNodeEvent() {
TAddress selfAdr;
{
try {
selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Address getNodeAddress() {
return selfAdr;
}
@Override
public String toString() {
return "KillPonger<" + selfAdr.toString() + ">";
}
};
}
};
We modify now the old scenario simplePing
, that starts 5 pongers and 5 pingers and we expect the SimulationObserver
to terminate it early, when at least 100 pongs have been received. In case our observer’s termination conditions are not met due to bugs, the simulation might not stop and run forever, so we still want to keep the scenario termination time (a very high one) as a safety net. In this case we added a 10.000s termination time, but the SimulationObserver
should terminate the simulation within a couple of tens of seconds of simulated time.
The second scenario killPongers
will start killing the pongers, which the observer should notice and then stop the simulation. In this case both conditions – number of pings and number of dead nodes – can be met, but for the given code (seed, timing conditions) the number of dead nodes will be met first.
package se.sics.test.sim;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.network.Address;
import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.adaptor.Operation;
import se.sics.kompics.simulator.adaptor.Operation1;
import se.sics.kompics.simulator.adaptor.Operation2;
import se.sics.kompics.simulator.adaptor.distributions.extra.BasicIntSequentialDistribution;
import se.sics.kompics.simulator.events.system.SetupEvent;
import se.sics.kompics.simulator.events.system.KillNodeEvent;
import se.sics.kompics.simulator.events.system.StartNodeEvent;
import se.sics.kompics.simulator.util.GlobalView;
import se.sics.test.TAddress;
import se.sics.test.system.PingerParent;
import se.sics.test.system.PongerParent;
public class ScenarioGen {
static Operation setupOp = new Operation<SetupEvent>() {
@Override
public SetupEvent generate() {
return new SetupEvent() {
@Override
public void setupGlobalView(GlobalView gv) {
gv.setValue("simulation.pongs", 0);
}
};
}
};
static Operation startObserverOp = new Operation<StartNodeEvent>() {
@Override
public StartNodeEvent generate() {
return new StartNodeEvent() {
TAddress selfAdr;
{
try {
selfAdr = new TAddress(InetAddress.getByName("0.0.0.0"), 0);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Map<String, Object> initConfigUpdate() {
HashMap<String, Object> config = new HashMap<>();
config.put("pingpong.simulation.checktimeout", 2000);
return config;
}
@Override
public Address getNodeAddress() {
return selfAdr;
}
@Override
public Class getComponentDefinition() {
return SimulationObserver.class;
}
@Override
public Init getComponentInit() {
return new SimulationObserver.Init(100, 2);
}
};
}
};
static Operation1 startPongerOp = new Operation1<StartNodeEvent, Integer>() {
@Override
public StartNodeEvent generate(final Integer self) {
return new StartNodeEvent() {
TAddress selfAdr;
{
try {
selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Map<String, Object> initConfigUpdate() {
HashMap<String, Object> config = new HashMap<>();
config.put("pingpong.self", selfAdr);
return config;
}
@Override
public Address getNodeAddress() {
return selfAdr;
}
@Override
public Class getComponentDefinition() {
return PongerParent.class;
}
@Override
public Init getComponentInit() {
return Init.NONE;
}
@Override
public String toString() {
return "StartPonger<" + selfAdr.toString() + ">";
}
};
}
};
static Operation1 killPongerOp = new Operation1<KillNodeEvent, Integer>() {
@Override
public KillNodeEvent generate(final Integer self) {
return new KillNodeEvent() {
TAddress selfAdr;
{
try {
selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Address getNodeAddress() {
return selfAdr;
}
@Override
public String toString() {
return "KillPonger<" + selfAdr.toString() + ">";
}
};
}
};
static Operation2 startPingerOp = new Operation2<StartNodeEvent, Integer, Integer>() {
@Override
public StartNodeEvent generate(final Integer self, final Integer ponger) {
return new StartNodeEvent() {
TAddress selfAdr;
TAddress pongerAdr;
{
try {
selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
pongerAdr = new TAddress(InetAddress.getByName("192.193.0." + ponger), 10000);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Map<String, Object> initConfigUpdate() {
HashMap<String, Object> config = new HashMap<>();
config.put("pingpong.self", selfAdr);
config.put("pingpong.pinger.pongeraddr", pongerAdr);
return config;
}
@Override
public Address getNodeAddress() {
return selfAdr;
}
@Override
public Class getComponentDefinition() {
return PingerParent.class;
}
@Override
public Init getComponentInit() {
return Init.NONE;
}
@Override
public String toString() {
return "StartPinger<" + selfAdr.toString() + ">";
}
};
}
};
public static SimulationScenario simplePing() {
SimulationScenario scen = new SimulationScenario() {
{
SimulationScenario.StochasticProcess setup = new SimulationScenario.StochasticProcess() {
{
raise(1, setupOp);
}
};
SimulationScenario.StochasticProcess observer = new SimulationScenario.StochasticProcess() {
{
raise(1, startObserverOp);
}
};
SimulationScenario.StochasticProcess ponger = new SimulationScenario.StochasticProcess() {
{
eventInterArrivalTime(constant(1000));
raise(5, startPongerOp, new BasicIntSequentialDistribution(1));
}
};
SimulationScenario.StochasticProcess pinger = new SimulationScenario.StochasticProcess() {
{
eventInterArrivalTime(constant(1000));
raise(5, startPingerOp, new BasicIntSequentialDistribution(6), new BasicIntSequentialDistribution(1));
}
};
setup.start();
observer.startAfterTerminationOf(0, setup);
ponger.startAfterTerminationOf(1000, observer);
pinger.startAfterTerminationOf(1000, ponger);
terminateAfterTerminationOf(1000*10000, pinger);
}
};
return scen;
}
public static SimulationScenario killPongers() {
SimulationScenario scen = new SimulationScenario() {
{
SimulationScenario.StochasticProcess setup = new SimulationScenario.StochasticProcess() {
{
raise(1, setupOp);
}
};
SimulationScenario.StochasticProcess observer = new SimulationScenario.StochasticProcess() {
{
raise(1, startObserverOp);
}
};
SimulationScenario.StochasticProcess ponger = new SimulationScenario.StochasticProcess() {
{
eventInterArrivalTime(constant(1000));
raise(5, startPongerOp, new BasicIntSequentialDistribution(1));
}
};
SimulationScenario.StochasticProcess pinger = new SimulationScenario.StochasticProcess() {
{
eventInterArrivalTime(constant(1000));
raise(5, startPingerOp, new BasicIntSequentialDistribution(6), new BasicIntSequentialDistribution(1));
}
};
SimulationScenario.StochasticProcess killPonger = new SimulationScenario.StochasticProcess() {
{
eventInterArrivalTime(constant(0));
raise(5, killPongerOp, new BasicIntSequentialDistribution((1)));
}
};
setup.start();
observer.startAfterTerminationOf(0, setup);
ponger.startAfterTerminationOf(1000, observer);
pinger.startAfterTerminationOf(1000, ponger);
killPonger.startAfterTerminationOf(1000, pinger);
terminateAfterTerminationOf(1000*10000, pinger);
}
};
return scen;
}
}
package se.sics.test.sim;
import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.run.LauncherComp;
public class SimplePingLauncher {
public static void main(String[] args) {
long seed = 123;
SimulationScenario.setSeed(seed);
SimulationScenario simpleBootScenario = ScenarioGen.simplePing();
simpleBootScenario.simulate(LauncherComp.class);
}
}
package se.sics.test.sim;
import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.run.LauncherComp;
public class KillPongersLauncher {
public static void main(String[] args) {
long seed = 123;
SimulationScenario.setSeed(seed);
SimulationScenario killPongersScenario = ScenarioGen.killPongers();
killPongersScenario.simulate(LauncherComp.class);
}
}
The code until here can be downloaded here
.