Introduction to Simulation

First, we have to include the simulator dependency in our pom file. The simulator version is the same as kompics:

<dependency>
        <groupId>se.sics.kompics.simulator</groupId>
        <artifactId>core</artifactId>
        <version>${kompics.version}</version>
</dependency>

Next we write the simulation scenario. A scenario is a parallel and/or sequential composition of stochastic processes. A stochastic process here is a finite random sequence of events with a specified distribution of inter-arrival times. In other words, stochastic processes define the series of events that occur in a simulation.

The following snippet creates one stochastic process that generates 1000 events of type SimpleEvent1 with an inter-arrival time of 2000 ms (2 s) of simulated time.

static Operation<SimpleEvent1> simpleEvent1Gen = new Operation<SimpleEvent1>() {

        @Override
        public SimpleEvent1 generate() {
                // each call produces one new event instance
                return new SimpleEvent1();
        }
};
...
StochasticProcess p1 = new StochasticProcess() {
        {
                eventInterArrivalTime(constant(2000));
                raise(1000, simpleEvent1Gen);
        }
};

However, in the example above all 1000 generated events are identical. We can generate events that differ slightly by having the event take a numeric parameter (a Long here) and drawing this parameter randomly.

static Operation1<SimpleEvent2, Long> simpleEvent2Gen = new Operation1<SimpleEvent2, Long>() {

        @Override
        public SimpleEvent2 generate(Long param) {
                // param is drawn from the distribution passed to raise(...)
                return new SimpleEvent2(param);
        }
};
...
StochasticProcess p2 = new StochasticProcess() {
        {
                eventInterArrivalTime(constant(2000));
                raise(500, simpleEvent2Gen, uniform(1000, 2000));
        }
};

The newly generated events have a parameter that takes uniformly distributed random long values between 1000 and 2000. Both the inter-arrival time and the event parameters are defined as se.sics.kompics.simulator.adaptor.distributions.Distribution, and constant and uniform are just helper methods that create these distributions.

There are several types of operations Operation(_,1,2,…,5) that can be interpreted by the simulator and they differ in the number of parameters they take to customize the generated events. The parameters are given as distributions. There are a number of distributions provided by the simulator, or you can write your own distribution by extending se.sics.kompics.simulator.adaptor.distributions.Distribution.
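
To make the role of a distribution concrete, here is a minimal, library-free sketch. It does not use the Kompics classes: the ParamSource interface and the constant, uniform and sequence factories are hypothetical stand-ins, and treating both bounds of uniform as inclusive is an assumption of the sketch rather than documented simulator behaviour. What it illustrates is that every raised event draws a fresh parameter value, and that a fixed seed makes the drawn sequence repeatable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical stand-in for a parameter distribution; not the Kompics Distribution class. */
interface ParamSource {
    long draw();
}

final class ParamSources {

    /** Always yields the same value, like a fixed inter-arrival time. */
    static ParamSource constant(long value) {
        return () -> value;
    }

    /** Yields values in [min, max]; inclusive bounds are an assumption of this sketch. */
    static ParamSource uniform(long min, long max, Random rng) {
        return () -> min + (long) (rng.nextDouble() * (max - min + 1));
    }

    /** A "custom" source: yields start, start + 1, start + 2, ... */
    static ParamSource sequence(long start) {
        long[] next = {start};
        return () -> next[0]++;
    }

    /** Draws one parameter per event, as a process would when raising count events. */
    static List<Long> drawAll(ParamSource source, int count) {
        List<Long> values = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            values.add(source.draw());
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(drawAll(constant(2000), 3));
        System.out.println(drawAll(sequence(1), 5));
        System.out.println(drawAll(uniform(1000, 2000, new Random(123)), 3));
    }
}
```

Passing the Random instance in explicitly keeps the sketch deterministic for a given seed, which mirrors the reason the scenario seed is set before a simulation is run.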

In order to start the stochastic processes and to define the iterative/parallel behaviour, the se.sics.kompics.simulator.SimulationScenario.StochasticProcess has a number of start/terminate methods.

If we want to start the two stochastic processes above, a possible example would be:

p1.start();
p2.startAfterTerminationOf(1000, p1);
terminateAfterTerminationOf(2000, p2);

The above example would start the stochastic process p1 at the beginning of the simulation. After all 1000 events defined in p1 are generated, the simulation waits another 1000ms of simulated time and starts the second stochastic process p2. After generating all 500 p2 defined events and waiting another 2000ms of simulated time, the simulation will end.
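
The timeline described above can be checked with a small calculation that does not touch the Kompics API. The ProcessTimeline class below is hypothetical, and it assumes that a process raises its first event the moment it starts and each following event one inter-arrival time later. Whether the simulator raises the first event immediately or only after one interval is not stated here, so the absolute numbers are illustrative.

```java
/** Hypothetical helper, not part of Kompics: models constant inter-arrival timelines. */
final class ProcessTimeline {

    /**
     * Simulated time (ms) at which the last of the given number of events is raised,
     * assuming the first event fires at start and the rest follow at a fixed spacing.
     */
    static long lastEventTime(long start, int events, long interArrivalMs) {
        if (events <= 0) {
            return start; // nothing to raise, the process ends where it started
        }
        return start + (long) (events - 1) * interArrivalMs;
    }

    /** End of the whole scenario: p1, a 1000 ms gap, p2, then a 2000 ms gap. */
    static long scenarioEnd() {
        long p1End = lastEventTime(0, 1000, 2000);      // p1.start()
        long p2Start = p1End + 1000;                    // p2.startAfterTerminationOf(1000, p1)
        long p2End = lastEventTime(p2Start, 500, 2000); // 500 events of p2
        return p2End + 2000;                            // terminateAfterTerminationOf(2000, p2)
    }

    public static void main(String[] args) {
        System.out.println("p1 ends at " + lastEventTime(0, 1000, 2000) + " ms");
        System.out.println("simulation ends at " + scenarioEnd() + " ms");
    }
}
```

Under the stated assumption, p1 finishes at 1998000 ms and the simulation ends at 2999000 ms of simulated time, which is far longer than the wall-clock time a simulated run would normally take.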

The stochastic processes created and their order will be defined within the SimulationScenario:

static Operation<SimpleEvent1> simpleEvent1Gen = new Operation<SimpleEvent1>() {

        @Override
        public SimpleEvent1 generate() {
                return new SimpleEvent1();
        }
};

static Operation1<SimpleEvent2, Long> simpleEvent2Gen = new Operation1<SimpleEvent2, Long>() {

        @Override
        public SimpleEvent2 generate(Long param) {
                return new SimpleEvent2(param);
        }
};

public static SimulationScenario simpleSimulation() {
        SimulationScenario scen = new SimulationScenario() {
                {
                        // 1000 identical events, one every 2000 ms
                        StochasticProcess p1 = new StochasticProcess() {
                                {
                                        eventInterArrivalTime(constant(2000));
                                        raise(1000, simpleEvent1Gen);
                                }
                        };

                        // 500 events with a random parameter, one every 2000 ms
                        StochasticProcess p2 = new StochasticProcess() {
                                {
                                        eventInterArrivalTime(constant(2000));
                                        raise(500, simpleEvent2Gen, uniform(1000, 2000));
                                }
                        };

                        p1.start();
                        p2.startAfterTerminationOf(1000, p1);
                        terminateAfterTerminationOf(2000, p2);
                }
        };

        return scen;
}

Events that can be generated by stochastic processes and interpreted by the simulator are:

  1. SetupEvent
  2. StartNodeEvent
  3. KillNodeEvent
  4. TerminateExperiment

You can read more about Operations, Distributions and StochasticProcesses in Simulation Scenarios Reference.

In order to run the simulation scenario, the following code is required:

public static void main(String[] args) {
        long seed = 123;
        SimulationScenario.setSeed(seed);
        SimulationScenario simpleBootScenario = simpleSimulation();
        simpleBootScenario.simulate(LauncherComp.class);
}

In the above code, we set the simulation scenario seed (more about this seed in later sections), we construct the simulation scenario and we give it to the se.sics.kompics.simulator.run.LauncherComp to execute it.

PingPong Simulation

Starting from the Distributed PingPong, in this section we will see how we can run the pinger/ponger configuration as a simulation. As stated in the previous section, do not forget to include the simulator dependency in your pom file.

The first change we make, so that the deployment code can be reused in simulation with minimal changes, is to introduce a Parent and Host division. The Parent typically connects all subcomponents of the system to each other and to a Network and Timer port.

In deployment, the Host will create the Timer and Network instances and connect them to the Parent, while in simulation, the Simulator will provide the Timer and Network and will connect them to the Parent.

package se.sics.test.system;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.network.Network;
import se.sics.kompics.timer.Timer;
import se.sics.test.Pinger;
import se.sics.test.TAddress;

public class PingerParent extends ComponentDefinition {

    Positive<Network> network = requires(Network.class);
    Positive<Timer> timer = requires(Timer.class);

    public PingerParent(Init init) {
        //create and connect all components except timer and network
        Component pinger = create(Pinger.class, new Pinger.Init(init.self, init.ponger));

        //connect required internal components to network and timer
        connect(pinger.getNegative(Network.class), network, Channel.TWO_WAY);
        connect(pinger.getNegative(Timer.class), timer, Channel.TWO_WAY);
    }

    public static class Init extends se.sics.kompics.Init<PingerParent> {

        public final TAddress self;
        public final TAddress ponger;

        public Init(TAddress self, TAddress ponger) {
            this.self = self;
            this.ponger = ponger;
        }
    }
}

package se.sics.test.system;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.test.TAddress;

public class PingerHost extends ComponentDefinition {
    public PingerHost(Init init) {
        Component network = create(NettyNetwork.class, new NettyInit(init.self));
        Component timer = create(JavaTimer.class, Init.NONE);
        Component pingerParent = create(PingerParent.class, new PingerParent.Init(init.self, init.ponger));
        
        connect(pingerParent.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
        connect(pingerParent.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
    }
    
    public static class Init extends se.sics.kompics.Init<PingerHost> {

        public final TAddress self;
        public final TAddress ponger;

        public Init(TAddress self, TAddress ponger) {
            this.self = self;
            this.ponger = ponger;
        }
    }
}

package se.sics.test.system;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.network.Network;
import se.sics.kompics.timer.Timer;
import se.sics.test.Ponger;
import se.sics.test.TAddress;

public class PongerParent extends ComponentDefinition {

    Positive<Network> network = requires(Network.class);
    Positive<Timer> timer = requires(Timer.class);
    
    public PongerParent(Init init) {
        //create and connect all components except timer and network
        Component ponger = create(Ponger.class, new Ponger.Init(init.self));

        //connect required internal components to network and timer
        connect(ponger.getNegative(Network.class), network, Channel.TWO_WAY);
    }

    public static class Init extends se.sics.kompics.Init<PongerParent> {

        public final TAddress self;

        public Init(TAddress self) {
            this.self = self;
        }
    }
}

package se.sics.test.system;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.test.TAddress;

public class PongerHost extends ComponentDefinition {
    
    public PongerHost(Init init) {
        Component network = create(NettyNetwork.class, new NettyInit(init.self));
        Component timer = create(JavaTimer.class, Init.NONE);
        Component pongerParent = create(PongerParent.class, new PongerParent.Init(init.self));
        
        connect(pongerParent.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
        connect(pongerParent.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
    }
    
    public static class Init extends se.sics.kompics.Init<PongerHost> {

        public final TAddress self;

        public Init(TAddress self) {
            this.self = self;
        }
    }
}

In the simulation scenario we define two types of StartNodeEvent, one for the Ponger and one for the Pinger. The methods that must be overridden are simple getters for the Parent ComponentDefinition, its Init and the node Address. The scenario starts 5 ponger nodes and 5 pinger nodes. The sequential distributions give IPs ending in \(x=\{1,2,3,4,5\}\) to the pongers and IPs ending in \(y=\{6,7,8,9,10\}\) to the pingers, and the <x,y> relation is {<1,6>, <2,7>, <3,8>, <4,9>, <5,10>} (that is, \(y = x+5\)).

package se.sics.test.sim;

import java.net.InetAddress;
import java.net.UnknownHostException;
import se.sics.kompics.Init;
import se.sics.kompics.network.Address;
import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.adaptor.Operation1;
import se.sics.kompics.simulator.adaptor.Operation2;
import se.sics.kompics.simulator.adaptor.distributions.extra.BasicIntSequentialDistribution;
import se.sics.kompics.simulator.events.system.StartNodeEvent;
import se.sics.test.TAddress;
import se.sics.test.system.PingerParent;
import se.sics.test.system.PongerParent;

public class ScenarioGen {

    static Operation1 startPongerOp = new Operation1<StartNodeEvent, Integer>() {

        @Override
        public StartNodeEvent generate(final Integer self) {
            return new StartNodeEvent() {
                TAddress selfAdr;

                {
                    try {
                        selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
                    } catch (UnknownHostException ex) {
                        throw new RuntimeException(ex);
                    }
                }

                @Override
                public Address getNodeAddress() {
                    return selfAdr;
                }

                @Override
                public Class getComponentDefinition() {
                    return PongerParent.class;
                }

                @Override
                public Init getComponentInit() {
                    return new PongerParent.Init(selfAdr);
                }
                
                @Override
                public String toString() {
                    return "StartPonger<" + selfAdr.toString() + ">";
                }
            };
        }
    };
    
    static Operation2 startPingerOp = new Operation2<StartNodeEvent, Integer, Integer>() {

        @Override
        public StartNodeEvent generate(final Integer self, final Integer ponger) {
            return new StartNodeEvent() {
                TAddress selfAdr;
                TAddress pongerAdr;

                {
                    try {
                        selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
                        pongerAdr = new TAddress(InetAddress.getByName("192.193.0." + ponger), 10000);
                    } catch (UnknownHostException ex) {
                        throw new RuntimeException(ex);
                    }
                }

                @Override
                public Address getNodeAddress() {
                    return selfAdr;
                }

                @Override
                public Class getComponentDefinition() {
                    return PingerParent.class;
                }

                @Override
                public Init getComponentInit() {
                    return new PingerParent.Init(selfAdr, pongerAdr);
                }
                
                @Override
                public String toString() {
                    return "StartPinger<" + selfAdr.toString() + ">";
                }
            };
        }
    };

    public static SimulationScenario simplePing() {
        SimulationScenario scen = new SimulationScenario() {
            {
                SimulationScenario.StochasticProcess ponger = new SimulationScenario.StochasticProcess() {
                    {
                        eventInterArrivalTime(constant(1000));
                        raise(5, startPongerOp, new BasicIntSequentialDistribution(1));
                    }
                };
                
                SimulationScenario.StochasticProcess pinger = new SimulationScenario.StochasticProcess() {
                    {
                        eventInterArrivalTime(constant(1000));
                        raise(5, startPingerOp, new BasicIntSequentialDistribution(6), new BasicIntSequentialDistribution(1));
                    }
                };

                ponger.start();
                pinger.startAfterTerminationOf(1000, ponger);
                terminateAfterTerminationOf(10000, pinger);
            }
        };

        return scen;
    }
}

package se.sics.test.sim;

import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.run.LauncherComp;

public class ScenarioLauncher {
    public static void main(String[] args) {
        long seed = 123;
        SimulationScenario.setSeed(seed);
        SimulationScenario simpleBootScenario = ScenarioGen.simplePing();
        simpleBootScenario.simulate(LauncherComp.class);
    }
}
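
The pinger/ponger pairing produced by the sequential distributions in the scenario can be reproduced without the simulator. In the sketch below, IntSequence is a hypothetical stand-in for a sequential integer distribution (it simply counts upwards from its start value, which is what the text above describes), and SequentialPairing is an illustrative name that does not exist in Kompics.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: shows how two counting sequences pair pingers with pongers. */
final class SequentialPairing {

    /** Hypothetical stand-in for a sequential distribution: start, start + 1, ... */
    static final class IntSequence {
        private int next;

        IntSequence(int start) {
            this.next = start;
        }

        int draw() {
            return next++;
        }
    }

    /** One entry per pinger start event: "pinger address -> ponger address". */
    static List<String> pingerToPonger(int count) {
        IntSequence self = new IntSequence(6);   // first parameter of the pinger operation
        IntSequence ponger = new IntSequence(1); // second parameter of the pinger operation
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String pingerIp = "192.193.0." + self.draw();
            String pongerIp = "192.193.0." + ponger.draw();
            pairs.add(pingerIp + " -> " + pongerIp);
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (String pair : pingerToPonger(5)) {
            System.out.println(pair);
        }
    }
}
```

Because both sequences advance once per raised event, the n-th pinger is always paired with the n-th ponger, which gives the y = x + 5 relation.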

Note

In some cases, you might start the simulation and the only output you get are some warnings from log4j, including:

log4j:WARN No appenders could be found for logger (CodeInstrumentation).
log4j:WARN Please initialize the log4j system properly.

In that case you should check your log4j.properties file, as it might be missing or misconfigured.

Remember to set the following in your log4j.properties files, so that the logger output is manageable and related to your logs only.

log4j.logger.Kompics=WARN
log4j.logger.se.sics.kompics.simulator.core.impl.P2pSimulator=WARN
log4j.logger.se.sics.kompics.simulator.core.impl.SimulatorMngrComp=WARN
log4j.logger.CodeInstrumentation=WARN

When debugging, if the source of error is not clear, you might want to turn the loggers above to TRACE. The order should be:

  1. SimulatorMngrComp - to see if the stochastic process generated events order is as expected.
  2. P2pSimulator - to see if network messages and timeouts occur as expected
  3. Kompics - mostly Component control events
  4. CodeInstrumentation - only if you declared instrumentation exceptions (advanced).

The code until here can be downloaded here.

Configuration

We will now move to the version of the code from Cleanup: Config files, ClassMatchers, and Assembly and try to run it in simulation. The configuration now contains two types of parameters:

  1. node-specific parameters (like addresses)
  2. system-parameters (like timeout) that do not change with each node.

In simulation we will have one reference.conf config file with system-parameters, and we will tell the simulator to add the node-specific parameters.

Thus the configuration file now contains only the timeout.

pingpong.pinger.timeout = 1000

We now want to tell the simulator to add the self and ponger addresses to the config of each individual node. For this we will need to override the se.sics.kompics.simulator.events.system.StartNodeEvent.initConfigUpdate() method. The default implementation of this method returns an empty Map which corresponds to no change to the config. The returned Map is of type <config-key, config-value>. We can add the addresses to the config in two ways. We can add the components of the address:

@Override
public Map<String, Object> initConfigUpdate() {
        HashMap<String, Object> config = new HashMap<>();
        config.put("pingpong.self.host", selfAdr.getIp().getHostName());
        config.put("pingpong.self.port", selfAdr.getPort());
        config.put("pingpong.pinger.pongeraddr.host", pongerAdr.getIp().getHostName());
        config.put("pingpong.pinger.pongeraddr.port", pongerAdr.getPort());
        return config;
}

Or we can add the TAddress object into the config directly:

@Override
public Map<String, Object> initConfigUpdate() {
        HashMap<String, Object> config = new HashMap<>();
        config.put("pingpong.self", selfAdr);
        config.put("pingpong.pinger.pongeraddr", pongerAdr);
        return config;
}
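
The effect of the returned map can be pictured as layering node-specific values over the shared configuration. The sketch below is a library-free model under that assumption: NodeConfigSketch and nodeConfig are hypothetical names, and how the simulator actually combines the update with reference.conf internally is not shown in this tutorial.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of layering node-specific values over shared config; not Kompics code. */
final class NodeConfigSketch {

    /** Returns a new map holding the shared values with the node's update applied on top. */
    static Map<String, Object> nodeConfig(Map<String, Object> shared, Map<String, Object> update) {
        Map<String, Object> merged = new HashMap<>(shared);
        merged.putAll(update); // node-specific keys are added, or override shared ones
        return merged;
    }

    public static void main(String[] args) {
        // system parameter shared by every node
        Map<String, Object> shared = new HashMap<>();
        shared.put("pingpong.pinger.timeout", 1000);

        // node-specific values for one pinger
        Map<String, Object> pingerUpdate = new HashMap<>();
        pingerUpdate.put("pingpong.self.host", "192.193.0.6");
        pingerUpdate.put("pingpong.self.port", 10000);

        System.out.println(nodeConfig(shared, pingerUpdate));
        // an empty update leaves the shared config as it is
        System.out.println(nodeConfig(shared, Collections.emptyMap()));
    }
}
```

Copying the shared map before applying the update keeps one node's values from leaking into another node's view, which is the property that matters when many nodes are started from the same scenario.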

In this simulation scenario we have overridden the config method.

package se.sics.test.sim;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import se.sics.kompics.Init;
import se.sics.kompics.network.Address;
import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.adaptor.Operation1;
import se.sics.kompics.simulator.adaptor.Operation2;
import se.sics.kompics.simulator.adaptor.distributions.extra.BasicIntSequentialDistribution;
import se.sics.kompics.simulator.events.system.StartNodeEvent;
import se.sics.test.TAddress;
import se.sics.test.system.PingerParent;
import se.sics.test.system.PongerParent;

public class ScenarioGen {

    static Operation1 startPongerOp = new Operation1<StartNodeEvent, Integer>() {

        @Override
        public StartNodeEvent generate(final Integer self) {
            return new StartNodeEvent() {
                TAddress selfAdr;

                {
                    try {
                        selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
                    } catch (UnknownHostException ex) {
                        throw new RuntimeException(ex);
                    }
                }

                @Override
                public Map<String, Object> initConfigUpdate() {
                    HashMap<String, Object> config = new HashMap<>();
                    config.put("pingpong.self", selfAdr);
                    return config;
                }

                @Override
                public Address getNodeAddress() {
                    return selfAdr;
                }

                @Override
                public Class getComponentDefinition() {
                    return PongerParent.class;
                }

                @Override
                public Init getComponentInit() {
                    return Init.NONE;
                }

                @Override
                public String toString() {
                    return "StartPonger<" + selfAdr.toString() + ">";
                }
            };
        }
    };

    static Operation2 startPingerOp = new Operation2<StartNodeEvent, Integer, Integer>() {

        @Override
        public StartNodeEvent generate(final Integer self, final Integer ponger) {
            return new StartNodeEvent() {
                TAddress selfAdr;
                TAddress pongerAdr;

                {
                    try {
                        selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
                        pongerAdr = new TAddress(InetAddress.getByName("192.193.0." + ponger), 10000);
                    } catch (UnknownHostException ex) {
                        throw new RuntimeException(ex);
                    }
                }
                
                @Override
                public Map<String, Object> initConfigUpdate() {
                    HashMap<String, Object> config = new HashMap<>();
                    config.put("pingpong.self", selfAdr);
                    config.put("pingpong.pinger.pongeraddr", pongerAdr);
                    return config;
                }

                @Override
                public Address getNodeAddress() {
                    return selfAdr;
                }

                @Override
                public Class getComponentDefinition() {
                    return PingerParent.class;
                }

                @Override
                public Init getComponentInit() {
                    return Init.NONE;
                }

                @Override
                public String toString() {
                    return "StartPinger<" + selfAdr.toString() + ">";
                }
            };
        }
    };

    public static SimulationScenario simplePing() {
        SimulationScenario scen = new SimulationScenario() {
            {
                SimulationScenario.StochasticProcess ponger = new SimulationScenario.StochasticProcess() {
                    {
                        eventInterArrivalTime(constant(1000));
                        raise(5, startPongerOp, new BasicIntSequentialDistribution(1));
                    }
                };

                SimulationScenario.StochasticProcess pinger = new SimulationScenario.StochasticProcess() {
                    {
                        eventInterArrivalTime(constant(1000));
                        raise(5, startPingerOp, new BasicIntSequentialDistribution(6), new BasicIntSequentialDistribution(1));
                    }
                };

                ponger.start();
                pinger.startAfterTerminationOf(1000, ponger);
                terminateAfterTerminationOf(10000, pinger);
            }
        };

        return scen;
    }
}

The Host and Parent are also changed to use the config.

package se.sics.test.system;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.Positive;
import se.sics.kompics.network.Network;
import se.sics.kompics.timer.Timer;
import se.sics.test.Pinger;
import se.sics.test.TAddress;

public class PingerParent extends ComponentDefinition {

    Positive<Network> network = requires(Network.class);
    Positive<Timer> timer = requires(Timer.class);

    public PingerParent() {
        //create and connect all components except timer and network
        Class<? extends ComponentDefinition> p = Pinger.class;
        Component pinger = create(p, Init.NONE);

        //connect required internal components to network and timer
        connect(pinger.getNegative(Network.class), network, Channel.TWO_WAY);
        connect(pinger.getNegative(Timer.class), timer, Channel.TWO_WAY);
    }
}

package se.sics.test.system;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.test.TAddress;

public class PingerHost extends ComponentDefinition {
    public PingerHost() {
        TAddress self = config().getValue("pingpong.self", TAddress.class);
        
        Component network = create(NettyNetwork.class, new NettyInit(self));
        Component timer = create(JavaTimer.class, Init.NONE);
        Component pingerParent = create(PingerParent.class, Init.NONE);
        
        connect(pingerParent.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
        connect(pingerParent.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
    }
}

package se.sics.test.system;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.Positive;
import se.sics.kompics.network.Network;
import se.sics.kompics.timer.Timer;
import se.sics.test.Ponger;
import se.sics.test.TAddress;

public class PongerParent extends ComponentDefinition {

    Positive<Network> network = requires(Network.class);
    Positive<Timer> timer = requires(Timer.class);
    
    public PongerParent() {
        //create and connect all components except timer and network
        Component ponger = create(Ponger.class, Init.NONE);

        //connect required internal components to network and timer
        connect(ponger.getNegative(Network.class), network, Channel.TWO_WAY);
    }
}

package se.sics.test.system;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.test.TAddress;

public class PongerHost extends ComponentDefinition {
    
    public PongerHost() {
        TAddress self = config().getValue("pingpong.self", TAddress.class);
        
        Component network = create(NettyNetwork.class, new NettyInit(self));
        Component timer = create(JavaTimer.class, Init.NONE);
        Component pongerParent = create(PongerParent.class, Init.NONE);
        
        connect(pongerParent.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
        connect(pongerParent.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
    }
}

The code until here can be downloaded here.

Global View

Sometimes you might want to observe some state and do something special (like stop the simulation) in case the state matches a special case that you have considered. With this in mind the simulator offers a se.sics.kompics.simulator.util.GlobalView. The GlobalView allows you to do three things:

  1. check which nodes are dead or alive
  2. set/get <key,values> shared globally
  3. tell the simulator to terminate this simulation

You can access the GlobalView from your config, using the key “simulation.globalview”:

GlobalView gv = config().getValue("simulation.globalview", GlobalView.class);

The previous simulation scenarios terminated at a specific time:

terminateAfterTerminationOf(10000, pinger);

This time we want to check for certain conditions and terminate when one such condition occurs. In this section we want to terminate the current simulation when at least 100 pongs have been received by the pingers or when at least 3 nodes have died.
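
The two stop conditions can be written down as a single predicate. The TerminationCheck class below is a hypothetical, library-free sketch and not part of Kompics. It uses >= so that it matches the "at least" wording; a strict > comparison would need one more pong or one more dead node before stopping.

```java
/** Hypothetical helper expressing the two stop conditions; not part of Kompics. */
final class TerminationCheck {

    /** True once either threshold is reached ("at least" is read as >=). */
    static boolean shouldTerminate(int pongs, int deadNodes, int minPongs, int minDeadNodes) {
        return pongs >= minPongs || deadNodes >= minDeadNodes;
    }

    public static void main(String[] args) {
        System.out.println(shouldTerminate(99, 2, 100, 3));  // false: neither threshold reached
        System.out.println(shouldTerminate(100, 0, 100, 3)); // true: enough pongs
        System.out.println(shouldTerminate(10, 3, 100, 3));  // true: enough dead nodes
    }
}
```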

We write an observer that will check periodically on the global state and verify if any conditions have been met.

package se.sics.test.sim;

import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.network.Network;
import se.sics.kompics.simulator.util.GlobalView;
import se.sics.kompics.timer.CancelPeriodicTimeout;
import se.sics.kompics.timer.SchedulePeriodicTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;

public class SimulationObserver extends ComponentDefinition {

    private static final Logger LOG = LoggerFactory.getLogger(SimulationObserver.class);
    
    Positive<Timer> timer = requires(Timer.class);
    Positive<Network> network = requires(Network.class);

    private final int minPings;
    private final int minDeadNodes;

    private UUID timerId;

    public SimulationObserver(Init init) {
        minPings = init.minPings;
        minDeadNodes = init.minDeadNodes;

        subscribe(handleStart, control);
        subscribe(handleCheck, timer);
    }

    Handler<Start> handleStart = new Handler<Start>() {
        @Override
        public void handle(Start event) {
            schedulePeriodicCheck();
        }
    };

    @Override
    public void tearDown() {
        trigger(new CancelPeriodicTimeout(timerId), timer);
    }

    Handler<CheckTimeout> handleCheck = new Handler<CheckTimeout>() {
        @Override
        public void handle(CheckTimeout event) {
            GlobalView gv = config().getValue("simulation.globalview", GlobalView.class);
            
            if(gv.getValue("simulation.pongs", Integer.class) > minPings) {
                LOG.info("Terminating simulation as the minimum pings:{} is achieved", minPings);
                gv.terminate();
            }
            if(gv.getDeadNodes().size() > minDeadNodes) {
                LOG.info("Terminating simulation as the min dead nodes:{} is achieved", minDeadNodes);
                gv.terminate();
            }
        }
    };

    public static class Init extends se.sics.kompics.Init<SimulationObserver> {

        public final int minPings;
        public final int minDeadNodes;

        public Init(int minPings, int minDeadNodes) {
            this.minPings = minPings;
            this.minDeadNodes = minDeadNodes;
        }
    }

    private void schedulePeriodicCheck() {
        long period = config().getValue("pingpong.simulation.checktimeout", Long.class);
        SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(period, period);
        CheckTimeout timeout = new CheckTimeout(spt);
        spt.setTimeoutEvent(timeout);
        trigger(spt, timer);
        timerId = timeout.getTimeoutId();
    }

    public static class CheckTimeout extends Timeout {

        public CheckTimeout(SchedulePeriodicTimeout spt) {
            super(spt);
        }
    }
}

We also add a small bit of code to the Pinger to record the pongs it receives.

ClassMatchedHandler<Pong, TMessage> pongHandler = new ClassMatchedHandler<Pong, TMessage>() {

        @Override
        public void handle(Pong content, TMessage context) {
                counter++;
                LOG.info("{} Got Pong #{}! from:{}", new Object[]{self, counter, context.header.src});
                ponged();
        }
};

private void ponged() {
        GlobalView gv = config().getValue("simulation.globalview", GlobalView.class);
        gv.setValue("simulation.pongs", gv.getValue("simulation.pongs", Integer.class) + 1);
}

Since we are using a custom <key, value> pair in the GlobalView, we should initialize its value before any component reads it. We can do this with a se.sics.kompics.simulator.events.system.SetupEvent, by overriding its setupGlobalView method:

static Operation setupOp = new Operation<SetupEvent>() {
        @Override
        public SetupEvent generate() {
                return new SetupEvent() {
                        @Override
                        public void setupGlobalView(GlobalView gv) {
                                gv.setValue("simulation.pongs", 0);
                        }
                };
        }
};
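
To illustrate why the initial value matters, here is a minimal sketch that replaces the GlobalView with a plain map. PongCounterSketch is a hypothetical stand-in, and the assumption that a missing key reads as null belongs to this sketch only; the real GlobalView may behave differently. With that assumption, the read-increment-write step used in ponged() fails on the first pong unless the key was set beforehand.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a key/value view; not the Kompics GlobalView.
final class PongCounterSketch {

    private final Map<String, Object> values = new HashMap<>();

    void setValue(String key, Object value) {
        values.put(key, value);
    }

    // Assumption of this sketch: a missing key yields null.
    <T> T getValue(String key, Class<T> type) {
        return type.cast(values.get(key));
    }

    // Same read-increment-write pattern as ponged() in the Pinger.
    void ponged() {
        setValue("simulation.pongs", getValue("simulation.pongs", Integer.class) + 1);
    }

    public static void main(String[] args) {
        PongCounterSketch view = new PongCounterSketch();
        view.setValue("simulation.pongs", 0); // what setupGlobalView does
        view.ponged();
        view.ponged();
        System.out.println(view.getValue("simulation.pongs", Integer.class));
    }
}
```

In the sketch, skipping the setValue call makes the first ponged() throw a NullPointerException when the missing value is unboxed.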

We also want to be able to kill nodes, specifically pongers, so we write a se.sics.kompics.simulator.events.system.KillNodeEvent, which requires overriding only the getNodeAddress method to identify the node we want to kill:

static Operation1 killPongerOp = new Operation1<KillNodeEvent, Integer>() {
        @Override
        public KillNodeEvent generate(final Integer self) {
                return new KillNodeEvent() {
                        TAddress selfAdr;

                        {
                                try {
                                        selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
                                } catch (UnknownHostException ex) {
                                        throw new RuntimeException(ex);
                                }
                        }

                        @Override
                        public Address getNodeAddress() {
                                return selfAdr;
                        }

                        @Override
                        public String toString() {
                                return "KillPonger<" + selfAdr.toString() + ">";
                        }
                };
        }
};
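
The kill operation rebuilds the address from the same integer id that the start operation uses, so both events refer to the same node. A minimal sketch of that correspondence, using only java.net.InetAddress: NodeAddressSketch and nodeIp are hypothetical helpers, and TAddress and the port are left out for brevity.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper showing how an integer id maps to a node IP.
final class NodeAddressSketch {

    // Builds the IP the same way the start and kill operations do.
    static InetAddress nodeIp(int id) {
        try {
            // A literal IP string is parsed directly, without a name lookup.
            return InetAddress.getByName("192.193.0." + id);
        } catch (UnknownHostException ex) {
            throw new RuntimeException(ex);
        }
    }

    public static void main(String[] args) {
        InetAddress started = nodeIp(3); // id handed to the start operation
        InetAddress killed = nodeIp(3);  // same id handed to the kill operation
        System.out.println(started.getHostAddress() + " matches: " + started.equals(killed));
    }
}
```

Feeding the kill process the same id sequence as the start process is what makes each KillNodeEvent hit a ponger that actually exists.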

We now modify the old scenario simplePing, which starts 5 pongers and 5 pingers, and expect the SimulationObserver to terminate it early, once more than 100 pongs have been received. If the observer's termination conditions are never met, for example because of a bug, the simulation would run forever, so we keep the scenario termination time (a very high one) as a safety net. In this case we set a termination time of 10,000 s, but the SimulationObserver should terminate the simulation within a few tens of seconds of simulated time.

The second scenario, killPongers, additionally kills the pongers, which the observer should notice and then stop the simulation. In this case both conditions (number of pongs and number of dead nodes) can be met, but for the given code (seed, timing conditions) the dead-node condition will be met first.
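
As a reading aid for the scenario code, the sketch below spells out which pinger talks to which ponger and how long the safety net is. It assumes that BasicIntSequentialDistribution(n) yields n, n+1, n+2, and so on, one value per raised event; PairingSketch and its members are hypothetical and do not use the simulator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reading aid; assumes sequential ids starting at the given value.
final class PairingSketch {

    // Same expression as the scenario's termination delay, in milliseconds.
    static final long SAFETY_NET_MS = 1000L * 10000L;

    // Pairs the i-th pinger id with the i-th ponger id.
    static List<String> pairs(int count, int firstPinger, int firstPonger) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add("192.193.0." + (firstPinger + i) + " -> 192.193.0." + (firstPonger + i));
        }
        return out;
    }

    public static void main(String[] args) {
        // 5 pingers with ids from 6, 5 pongers with ids from 1, as in the scenario.
        pairs(5, 6, 1).forEach(System.out::println);
        System.out.println("safety net: " + SAFETY_NET_MS / 1000 + " s");
    }
}
```

Under that assumption, the kill process, which also starts its sequence at 1, targets exactly the five pongers.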

package se.sics.test.sim;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.network.Address;
import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.adaptor.Operation;
import se.sics.kompics.simulator.adaptor.Operation1;
import se.sics.kompics.simulator.adaptor.Operation2;
import se.sics.kompics.simulator.adaptor.distributions.extra.BasicIntSequentialDistribution;
import se.sics.kompics.simulator.events.system.SetupEvent;
import se.sics.kompics.simulator.events.system.KillNodeEvent;
import se.sics.kompics.simulator.events.system.StartNodeEvent;
import se.sics.kompics.simulator.util.GlobalView;
import se.sics.test.TAddress;
import se.sics.test.system.PingerParent;
import se.sics.test.system.PongerParent;

public class ScenarioGen {

    static Operation setupOp = new Operation<SetupEvent>() {
        @Override
        public SetupEvent generate() {
            return new SetupEvent() {
                @Override
                public void setupGlobalView(GlobalView gv) {
                    gv.setValue("simulation.pongs", 0);
                }
            };
        }
    };

    static Operation startObserverOp = new Operation<StartNodeEvent>() {
        @Override
        public StartNodeEvent generate() {
            return new StartNodeEvent() {
                TAddress selfAdr;

                {
                    try {
                        selfAdr = new TAddress(InetAddress.getByName("0.0.0.0"), 0);
                    } catch (UnknownHostException ex) {
                        throw new RuntimeException(ex);
                    }
                }

                @Override
                public Map<String, Object> initConfigUpdate() {
                    HashMap<String, Object> config = new HashMap<>();
                    config.put("pingpong.simulation.checktimeout", 2000);
                    return config;
                }
                
                @Override
                public Address getNodeAddress() {
                    return selfAdr;
                }

                @Override
                public Class getComponentDefinition() {
                    return SimulationObserver.class;
                }

                @Override
                public Init getComponentInit() {
                    return new SimulationObserver.Init(100, 2);
                }
            };
        }
    };

    static Operation1 startPongerOp = new Operation1<StartNodeEvent, Integer>() {

        @Override
        public StartNodeEvent generate(final Integer self) {
            return new StartNodeEvent() {
                TAddress selfAdr;

                {
                    try {
                        selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
                    } catch (UnknownHostException ex) {
                        throw new RuntimeException(ex);
                    }
                }

                @Override
                public Map<String, Object> initConfigUpdate() {
                    HashMap<String, Object> config = new HashMap<>();
                    config.put("pingpong.self", selfAdr);
                    return config;
                }

                @Override
                public Address getNodeAddress() {
                    return selfAdr;
                }

                @Override
                public Class getComponentDefinition() {
                    return PongerParent.class;
                }

                @Override
                public Init getComponentInit() {
                    return Init.NONE;
                }

                @Override
                public String toString() {
                    return "StartPonger<" + selfAdr.toString() + ">";
                }
            };
        }
    };
    
    static Operation1 killPongerOp = new Operation1<KillNodeEvent, Integer>() {
        @Override
        public KillNodeEvent generate(final Integer self) {
            return new KillNodeEvent() {
                TAddress selfAdr;

                {
                    try {
                        selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
                    } catch (UnknownHostException ex) {
                        throw new RuntimeException(ex);
                    }
                }
                
                @Override
                public Address getNodeAddress() {
                    return selfAdr;
                }
                
                @Override
                public String toString() {
                    return "KillPonger<" + selfAdr.toString() + ">";
                }
            };
        }
    };

    static Operation2 startPingerOp = new Operation2<StartNodeEvent, Integer, Integer>() {

        @Override
        public StartNodeEvent generate(final Integer self, final Integer ponger) {
            return new StartNodeEvent() {
                TAddress selfAdr;
                TAddress pongerAdr;

                {
                    try {
                        selfAdr = new TAddress(InetAddress.getByName("192.193.0." + self), 10000);
                        pongerAdr = new TAddress(InetAddress.getByName("192.193.0." + ponger), 10000);
                    } catch (UnknownHostException ex) {
                        throw new RuntimeException(ex);
                    }
                }

                @Override
                public Map<String, Object> initConfigUpdate() {
                    HashMap<String, Object> config = new HashMap<>();
                    config.put("pingpong.self", selfAdr);
                    config.put("pingpong.pinger.pongeraddr", pongerAdr);
                    return config;
                }

                @Override
                public Address getNodeAddress() {
                    return selfAdr;
                }

                @Override
                public Class getComponentDefinition() {
                    return PingerParent.class;
                }

                @Override
                public Init getComponentInit() {
                    return Init.NONE;
                }

                @Override
                public String toString() {
                    return "StartPinger<" + selfAdr.toString() + ">";
                }
            };
        }
    };

    public static SimulationScenario simplePing() {
        SimulationScenario scen = new SimulationScenario() {
            {
                SimulationScenario.StochasticProcess setup = new SimulationScenario.StochasticProcess() {
                    {
                        raise(1, setupOp);
                    }
                };

                SimulationScenario.StochasticProcess observer = new SimulationScenario.StochasticProcess() {
                    {
                        raise(1, startObserverOp);
                    }
                };

                SimulationScenario.StochasticProcess ponger = new SimulationScenario.StochasticProcess() {
                    {
                        eventInterArrivalTime(constant(1000));
                        raise(5, startPongerOp, new BasicIntSequentialDistribution(1));
                    }
                };

                SimulationScenario.StochasticProcess pinger = new SimulationScenario.StochasticProcess() {
                    {
                        eventInterArrivalTime(constant(1000));
                        raise(5, startPingerOp, new BasicIntSequentialDistribution(6), new BasicIntSequentialDistribution(1));
                    }
                };

                setup.start();
                observer.startAfterTerminationOf(0, setup);
                ponger.startAfterTerminationOf(1000, observer);
                pinger.startAfterTerminationOf(1000, ponger);
                terminateAfterTerminationOf(1000*10000, pinger);
            }
        };

        return scen;
    }
    
    public static SimulationScenario killPongers() {
        SimulationScenario scen = new SimulationScenario() {
            {
                SimulationScenario.StochasticProcess setup = new SimulationScenario.StochasticProcess() {
                    {
                        raise(1, setupOp);
                    }
                };

                SimulationScenario.StochasticProcess observer = new SimulationScenario.StochasticProcess() {
                    {
                        raise(1, startObserverOp);
                    }
                };

                SimulationScenario.StochasticProcess ponger = new SimulationScenario.StochasticProcess() {
                    {
                        eventInterArrivalTime(constant(1000));
                        raise(5, startPongerOp, new BasicIntSequentialDistribution(1));
                    }
                };

                SimulationScenario.StochasticProcess pinger = new SimulationScenario.StochasticProcess() {
                    {
                        eventInterArrivalTime(constant(1000));
                        raise(5, startPingerOp, new BasicIntSequentialDistribution(6), new BasicIntSequentialDistribution(1));
                    }
                };
                
                SimulationScenario.StochasticProcess killPonger = new SimulationScenario.StochasticProcess() {
                    {
                        eventInterArrivalTime(constant(0));
                        raise(5, killPongerOp, new BasicIntSequentialDistribution(1));
                    }  
                };

                setup.start();
                observer.startAfterTerminationOf(0, setup);
                ponger.startAfterTerminationOf(1000, observer);
                pinger.startAfterTerminationOf(1000, ponger);
                killPonger.startAfterTerminationOf(1000, pinger);
                terminateAfterTerminationOf(1000*10000, pinger);
            }
        };

        return scen;
    }
}

package se.sics.test.sim;

import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.run.LauncherComp;

public class SimplePingLauncher {
    public static void main(String[] args) {
        long seed = 123;
        SimulationScenario.setSeed(seed);
        SimulationScenario simpleBootScenario = ScenarioGen.simplePing();
        simpleBootScenario.simulate(LauncherComp.class);
    }
}

package se.sics.test.sim;

import se.sics.kompics.simulator.SimulationScenario;
import se.sics.kompics.simulator.run.LauncherComp;

public class KillPongersLauncher {

    public static void main(String[] args) {
        long seed = 123;
        SimulationScenario.setSeed(seed);
        SimulationScenario killPongersScenario = ScenarioGen.killPongers();
        killPongersScenario.simulate(LauncherComp.class);
    }
}
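
Both launchers fix the seed so that a run can be repeated. The general principle can be sketched with java.util.Random: the same seed always produces the same sequence of draws. How the simulator consumes the seed internally is not shown here, and SeedSketch and draw are hypothetical names.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical illustration of seeded, repeatable randomness.
final class SeedSketch {

    // Draws n pseudo-random longs from a generator created with the given seed.
    static long[] draw(long seed, int n) {
        Random rnd = new Random(seed);
        long[] out = new long[n];
        for (int i = 0; i < n; i++) {
            out[i] = rnd.nextLong();
        }
        return out;
    }

    public static void main(String[] args) {
        long[] first = draw(123, 5);
        long[] second = draw(123, 5);
        // Same seed, same sequence.
        System.out.println("repeatable: " + Arrays.equals(first, second));
    }
}
```

Changing the seed in a launcher is therefore a simple way to explore a different sequence of random draws with the same scenario.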

The code up to this point can be downloaded here.