Timers

In distributed systems it is quite common to have certain events happen after some time has passed, or in a regular schedule. And while globally synchronised clocks are rarely available, local clock drift is typically small enough that algorithms can rely on local time differences (or intervals).

There are number of timer facilities for Java and Scala ranging from the simple tree based Timer to the significantly more advanced HashedWheelTimer.

However, Kompics provides its own abstraction for scheduled or periodic events. It is very important that your code only relies on this abstraction as described in this section of the tutorial, if you want to use the Kompics Simulation framework described in section simulation. The reasons for this restriction will be described in detail later in the simulation part of the tutorial, but very shortly it has to do with being able to transparently replace real time with simulation time.

This section introduces the default Kompics timer facilities and the concept of request-response-type events.

Request-Response Events

In the previous section we described how Kompics events are broadcasted along all connected channels of their respective ports. We also showed a PingPong application that was already sending messages back and forth. So in a sense we were using the Ping like a request for a Pong-response. However, imagine we add a second Ponger component which we connect in the same way as the first one. Now every Ping would get two Pongs, one from each Ponger component, and our counters would be totally confused. And the same is true the other way around. Imagine we add a second Pinger component. Now there would be two Pings going out in parallel, being answered by two Pongs which get broadcasted to each Pinger component, which then answers each of those Pongs with another Ping resulting in four Pings arriving at the Ponger the next time, and so on. While the first type of behaviour might often in fact be what we want, clearly, the second type is rarely going to be the desired.

Note

If the first type of behaviour is not desired then Channel Selectors might provide a solution.

In order to get around the second type of behaviour (two Pinger – one Ponger) where it is necessary, Kompics supports two types of request-response patterns:

  • A normal (or old-style) pattern that records the whole path the request takes, and every response to that request simply backtracks along the channels and components recorded. Events using this pattern have to extend Request and Response respectively.
  • A direct (newer) pattern, that only remembers the origin port of the request and triggers the response directly on that port. Events using this pattern have to extend Direct.Request and implement Direct.Response respectively.

It is generally preferred to use the newer direct pattern when possible, because it is more efficient and simpler to understand. However, the disadvantage is that a component will receive responses even after all its channels have been disconnected, which can lead to confusion.

Warning

With the old-style pattern, responses will travel along the recorded path back to the orginal port. However, if that port is chained, i.e. there are channels connected on the outgoing side, the response will travel further along those channels like a normal event.

The direct pattern does not exhibit this behaviour.

In order to show examples for both types of request-response patterns, we will convert the PingPong example to the direct pattern now, and we will show the old-style pattern in the next section where the Kompics Timer is introduced, since it still relies on this pattern.

First of all, Ping now extends Direct.Request, parametrised by Pong, indicating that we expect answer of type Pong.

Java
package jexamples.basics.pingpongdirect;

import se.sics.kompics.Direct;

public class Ping extends Direct.Request<Pong> {}
Scala
case class Ping() extends Direct.Request[Pong.type];

And, of course, Pong has to implement Direct.Response in that case.

Java
package jexamples.basics.pingpongdirect;

import se.sics.kompics.Direct;

public class Pong implements Direct.Response {}
Scala
object Pong extends Direct.Response;

Additionally, we have to change our handler for Ping events in the Ponger to use the answer method instead of trigger, which causes Kompics to use the origin port provided in the request instead of a local port.

Java
Handler<Ping> pingHandler =
    new Handler<Ping>() {
      public void handle(Ping event) {
        counter++;
        logger.info("Got Ping #{}!", counter);
        answer(event, new Pong());
      }
    };
Scala
ppp uponEvent {
  case ping: Ping => {
    counter += 1L;
    log.info(s"Got Ping #${counter}!");
    answer(ping, Pong);
  }
}

To show that it actually behaves as expected, we add a second Pinger in the Parent and connect it like the first one.

Java
package jexamples.basics.pingpongdirect;

import se.sics.kompics.Channel;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Component;
import se.sics.kompics.Init;

public class Parent extends ComponentDefinition {
  Component pinger = create(Pinger.class, Init.NONE);
  Component ponger = create(Ponger.class, Init.NONE);
  Component pinger2 = create(Pinger.class, Init.NONE);

  {
    connect(
        pinger.getNegative(PingPongPort.class),
        ponger.getPositive(PingPongPort.class),
        Channel.TWO_WAY);
    connect(
        pinger2.getNegative(PingPongPort.class),
        ponger.getPositive(PingPongPort.class),
        Channel.TWO_WAY);
  }
}
Scala
package sexamples.basics.pingpongdirect

import se.sics.kompics.sl._

class Parent extends ComponentDefinition {
  val pinger = create[Pinger];
  val pinger2 = create[Pinger];
  val ponger = create[Ponger];

  connect(PingPongPort)(ponger -> pinger);
  connect(PingPongPort)(ponger -> pinger2);
}

Now if we compile and run this we can see that the single Ponger is counting twice as many Ping instances as each Pinger counts Pong instances:

runMain jexamples.basics.pingpongdirect.Main
runMain sexamples.basics.pingpongdirect.Main

Kompics Timer

This section describes the Kompics Timer port and the default implementation JavaTimer.

In order to use the timer port in Kompics, an additional dependency in our build file is required:

sbt
libraryDependencies += "se.sics.kompics.basic" % "kompics-port-timer" % "1.2.1"
Maven
<dependency>
  <groupId>se.sics.kompics.basic</groupId>
  <artifactId>kompics-port-timer</artifactId>
  <version>1.2.1</version>
</dependency>
Gradle
dependencies {
  compile group: 'se.sics.kompics.basic', name: 'kompics-port-timer', version: '1.2.1'
}

The Timer port itself allows four types of requests and only a single indication: Timeout, which extends the old-style Response and must be extended further by any component using Kompics’ timer facilities.

The requests come in two pairs, a schedule and a cancel for one-shot timers (ScheduleTimeout and CancelTimeout) and anotherr pair for periodic timers (SchedulePeriodicTimeout and CancelPeriodicTimeout). In both cases the schedule event extends the old-style request and requires a prepared timeout response instance to be passed along to the Timer component, using the setTimeoutEvent method. Additionally, each Timeout instance generates a random UUID upon creation, which the scheduling component should store somewhere, since it is needed to cancel the timeout later. This is especially important for the periodic timeouts, which should always be cancelled when they are no longer required, in order to reduce resource usage of the system.

Warning

It should be pointed out that the semantics, provided by Kompics’ Timer facilities, are only that timeouts are never handled before their delay/period has passed. Due to propagation and scheduling delays there is no fixed bound on how soon after a timeout was triggered, it is handled. In particular, timeout events can be caught in long port queues on the receiver side, as there is no notion of priority for events in Kompics.

The default component providing the Kompics Timer service is JavaTimer which can be accessed after adding the following dependency to the build file:

sbt
libraryDependencies += "se.sics.kompics.basic" % "kompics-component-java-timer" % "1.2.1"
Maven
<dependency>
  <groupId>se.sics.kompics.basic</groupId>
  <artifactId>kompics-component-java-timer</artifactId>
  <version>1.2.1</version>
</dependency>
Gradle
dependencies {
  compile group: 'se.sics.kompics.basic', name: 'kompics-component-java-timer', version: '1.2.1'
}

The JavaTimer is based on Java’s default Timer and should scale to all but the most extreme needs, with a single instance per Kompics component tree. Since the Timer port uses the old-style request-response pattern, it is recommended that it not be chained through multiple layers of the system, but instead directly connected to all components that require it. Otherwise strange “multi timer handling”-behaviour can occur, as was pointed out in the previous section. The direct connections also incur slightly less latency compared to chained ones, which is important for the timely handling of timeouts.

To update our example, we now want to only trigger Pings periodically, say one per second, and not in response to Pongs anymore.

First we extend our dual-Pinger setup from before with an instance of JavaTimer and connect everything appropriately.

Java
package jexamples.basics.pingpongtimer;

import se.sics.kompics.Channel;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Component;
import se.sics.kompics.Init;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;

public class Parent extends ComponentDefinition {
  Component pinger = create(Pinger.class, Init.NONE);
  Component ponger = create(Ponger.class, Init.NONE);
  Component pinger2 = create(Pinger.class, Init.NONE);
  Component timer = create(JavaTimer.class, Init.NONE);

  {
    connect(
        pinger.getNegative(PingPongPort.class),
        ponger.getPositive(PingPongPort.class),
        Channel.TWO_WAY);
    connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
    connect(
        pinger2.getNegative(PingPongPort.class),
        ponger.getPositive(PingPongPort.class),
        Channel.TWO_WAY);
    connect(pinger2.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
  }
}
Scala
package sexamples.basics.pingpongtimer

import se.sics.kompics.sl._
import se.sics.kompics.timer.Timer
import se.sics.kompics.timer.java.JavaTimer

class Parent extends ComponentDefinition {
  val pinger = create[Pinger];
  val pinger2 = create[Pinger];
  val ponger = create[Ponger];

  val timer = create[JavaTimer];

  connect(PingPongPort)(ponger -> pinger);
  connect(PingPongPort)(ponger -> pinger2);
  connect[Timer](timer -> pinger);
  connect[Timer](timer -> pinger2);

}

Then we add Timer as a required port to the Pinger and change the handlers’ behaviour to schedule a PingTimeout on start and then send a Ping whenever the PingTimeout is received. We’ll schedule the timout periodically every second with no initial delay. We also override the tearDown method to cancel our periodic timeout when we are being stopped. It simply is good form to clean up after oneself.

Java
package jexamples.basics.pingpongtimer;

import se.sics.kompics.Kompics;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.Handler;
import se.sics.kompics.Start;
import se.sics.kompics.timer.*;

import java.util.UUID;

public class Pinger extends ComponentDefinition {

  Positive<PingPongPort> ppp = requires(PingPongPort.class);
  Positive<Timer> timer = requires(Timer.class);

  private long counter = 0;
  private UUID timerId;

  Handler<Start> startHandler =
      new Handler<Start>() {
        public void handle(Start event) {
          SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, 1000);
          PingTimeout timeout = new PingTimeout(spt);
          spt.setTimeoutEvent(timeout);
          trigger(spt, timer);
          timerId = timeout.getTimeoutId();
        }
      };
  Handler<Pong> pongHandler =
      new Handler<Pong>() {
        public void handle(Pong event) {
          counter++;
          logger.info("Got Pong #{}!", counter);
          if (counter > 10) {
            Kompics.asyncShutdown();
          }
        }
      };
  Handler<PingTimeout> timeoutHandler =
      new Handler<PingTimeout>() {
        public void handle(PingTimeout event) {
          trigger(new Ping(), ppp);
        }
      };

  {
    subscribe(startHandler, control);
    subscribe(pongHandler, ppp);
    subscribe(timeoutHandler, timer);
  }

  @Override
  public void tearDown() {
    trigger(new CancelPeriodicTimeout(timerId), timer);
  }

  public static class PingTimeout extends Timeout {
    public PingTimeout(SchedulePeriodicTimeout spt) {
      super(spt);
    }
  }
}
Scala
package sexamples.basics.pingpongtimer

import se.sics.kompics.sl._
import se.sics.kompics.timer.{CancelPeriodicTimeout, SchedulePeriodicTimeout, Timeout, Timer}

import java.util.UUID

object Pinger {
  class PingTimeout(_spt: SchedulePeriodicTimeout) extends Timeout(_spt);
}
class Pinger extends ComponentDefinition {
  import Pinger.PingTimeout;

  val ppp = requires(PingPongPort);
  val timer = requires[Timer];

  private var counter: Long = 0L;
  private var timerId: Option[UUID] = None;

  ctrl uponEvent {
    case _: Start => {
      val spt = new SchedulePeriodicTimeout(0, 1000);
      val timeout = new PingTimeout(spt);
      spt.setTimeoutEvent(timeout);
      trigger(spt -> timer);
      timerId = Some(timeout.getTimeoutId());
    }
  }

  ppp uponEvent {
    case Pong => {
      counter += 1L;
      log.info(s"Got Pong #${counter}!");
      if (counter > 10) {
        Kompics.asyncShutdown();
      }
    }
  }

  timer uponEvent {
    case _: PingTimeout => {
      trigger(Ping() -> ppp);
    }
  }

  override def tearDown(): Unit = {
    timerId match {
      case Some(id) => {
        trigger(new CancelPeriodicTimeout(id) -> timer);
      }
      case None => () // no cleanup necessary
    }
  }

}

If again compile and run this, we should see a significantly lower rate of Ping and Pong event production:

runMain jexamples.basics.pingpongtimer.Main
runMain sexamples.basics.pingpongtimer.Main
The source code for this page can be found here.