Timers
In distributed systems it is quite common to have certain events happen after some time has passed, or on a regular schedule. While globally synchronised clocks are rarely available, local clock drift is typically small enough that algorithms can rely on local time differences (or intervals).
There are a number of timer facilities for Java and Scala, ranging from the simple tree-based Timer to the significantly more advanced HashedWheelTimer.
However, Kompics provides its own abstraction for scheduled or periodic events. If you want to use the Kompics Simulation framework described in the simulation section, it is very important that your code relies only on this abstraction, as described in this section of the tutorial. The reasons for this restriction are described in detail later, in the simulation part of the tutorial. In short, it has to do with being able to transparently replace real time with simulation time.
This section introduces the default Kompics timer facilities and the concept of request-response-type events.
Request-Response Events
In the previous section we described how Kompics events are broadcast along all connected channels of their respective ports. We also showed a PingPong application that was already sending messages back and forth, so in a sense we were using the Ping like a request for a Pong response. However, imagine we add a second Ponger component, which we connect in the same way as the first one. Now every Ping would get two Pongs, one from each Ponger component, and our counters would be totally confused. The same is true the other way around. Imagine we add a second Pinger component. Now there would be two Pings going out in parallel, answered by two Pongs, each of which is broadcast to both Pinger components. Each Pinger then answers each of those Pongs with another Ping, resulting in four Pings arriving at the Ponger the next time, and so on. While the first type of behaviour might in fact often be what we want, the second type is clearly rarely going to be desired.
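The growth described above can be reproduced with a few lines of arithmetic. The following is a minimal plain-Java model, not Kompics code: the class BroadcastModel and its method pingsPerRound are hypothetical names introduced only for this illustration, and the model assumes that every Pong reaches every Pinger and that each Pinger answers every Pong with exactly one Ping.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of broadcast delivery (hypothetical helper, not part of Kompics).
class BroadcastModel {

    /**
     * Returns the number of Pings arriving at the single Ponger in each round,
     * assuming every Pong is broadcast to all Pingers and each Pinger answers
     * every Pong it sees with one new Ping.
     */
    static List<Long> pingsPerRound(int pingers, int rounds) {
        List<Long> arrivals = new ArrayList<>();
        long pings = pingers; // on start, every Pinger sends one Ping
        for (int round = 0; round < rounds; round++) {
            arrivals.add(pings);
            long pongs = pings;      // the Ponger answers each Ping with one Pong
            pings = pongs * pingers; // each Pong reaches every Pinger, which sends a Ping
        }
        return arrivals;
    }

    public static void main(String[] args) {
        System.out.println(pingsPerRound(1, 4)); // prints [1, 1, 1, 1]
        System.out.println(pingsPerRound(2, 4)); // prints [2, 4, 8, 16]
    }
}
```

With a single Pinger the load stays constant, while with two Pingers it doubles every round, which is the runaway behaviour the request-response patterns are meant to avoid.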
If the first type of behaviour is not desired then Channel Selectors might provide a solution.
In order to get around the second type of behaviour (two Pinger – one Ponger) where necessary, Kompics supports two types of request-response patterns:
- A normal (or old-style) pattern that records the whole path the request takes, and every response to that request simply backtracks along the channels and components recorded. Events using this pattern have to extend Request and Response respectively.
- A direct (newer) pattern that only remembers the origin port of the request and triggers the response directly on that port. Events using this pattern have to extend Direct.Request and implement Direct.Response respectively.
It is generally preferred to use the newer direct pattern when possible, because it is more efficient and simpler to understand. However, the disadvantage is that a component will receive responses even after all its channels have been disconnected, which can lead to confusion.
With the old-style pattern, responses will travel along the recorded path back to the original port. However, if that port is chained, i.e. there are channels connected on the outgoing side, the response will travel further along those channels like a normal event.
The direct pattern does not exhibit this behaviour.
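The idea behind the direct pattern can be sketched without any framework. The following plain-Java model is an illustration only and does not use the Kompics API: the nested classes Pinger, Ping and Ponger are simplified stand-ins, and the origin field is an assumed simplification of the origin port that a direct request remembers.

```java
import java.util.Arrays;

// Plain-Java model of "answer only the origin" (hypothetical, not Kompics code).
class DirectResponseModel {

    /** Stand-in for a Pinger: it only counts the Pongs it receives. */
    static class Pinger {
        int pongs = 0;
    }

    /** Stand-in for a direct request: it remembers who sent it. */
    static class Ping {
        final Pinger origin;

        Ping(Pinger origin) {
            this.origin = origin;
        }
    }

    /** Stand-in for the Ponger: counts Pings and answers the origin only. */
    static class Ponger {
        int pings = 0;

        void handle(Ping ping) {
            pings++;
            ping.origin.pongs++; // delivered to the origin only, never broadcast
        }
    }

    /** Runs the given number of rounds; returns {Ponger pings, Pongs at a, Pongs at b}. */
    static int[] run(int rounds) {
        Pinger a = new Pinger();
        Pinger b = new Pinger();
        Ponger ponger = new Ponger();
        for (int i = 0; i < rounds; i++) {
            ponger.handle(new Ping(a));
            ponger.handle(new Ping(b));
        }
        return new int[] {ponger.pings, a.pongs, b.pongs};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(5))); // prints [10, 5, 5]
    }
}
```

Because each response goes back only to the sender of the matching request, the Ponger sees twice as many Pings as each Pinger sees Pongs, and the counts no longer grow from round to round.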
In order to show examples for both types of request-response patterns, we will convert the PingPong example to the direct pattern now, and we will show the old-style pattern in the next section where the Kompics Timer is introduced, since it still relies on this pattern.
First of all, Ping now extends Direct.Request, parametrised by Pong, indicating that we expect an answer of type Pong.
- Java
-
package jexamples.basics.pingpongdirect;

import se.sics.kompics.Direct;

public class Ping extends Direct.Request<Pong> {}
- Scala
-
case class Ping() extends Direct.Request[Pong.type];
And, of course, Pong has to implement Direct.Response in that case.
- Java
-
package jexamples.basics.pingpongdirect;

import se.sics.kompics.Direct;

public class Pong implements Direct.Response {}
- Scala
-
object Pong extends Direct.Response;
Additionally, we have to change our handler for Ping events in the Ponger to use the answer method instead of trigger, which causes Kompics to use the origin port provided in the request instead of a local port.
- Java
-
Handler<Ping> pingHandler = new Handler<Ping>() {
    public void handle(Ping event) {
        counter++;
        logger.info("Got Ping #{}!", counter);
        answer(event, new Pong());
    }
};
- Scala
-
ppp uponEvent {
  case ping: Ping => {
    counter += 1L;
    log.info(s"Got Ping #${counter}!");
    answer(ping, Pong);
  }
}
To show that it actually behaves as expected, we add a second Pinger in the Parent and connect it like the first one.
- Java
-
package jexamples.basics.pingpongdirect;

import se.sics.kompics.Channel;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Component;
import se.sics.kompics.Init;

public class Parent extends ComponentDefinition {
    Component pinger = create(Pinger.class, Init.NONE);
    Component ponger = create(Ponger.class, Init.NONE);
    Component pinger2 = create(Pinger.class, Init.NONE);

    {
        connect(
            pinger.getNegative(PingPongPort.class),
            ponger.getPositive(PingPongPort.class),
            Channel.TWO_WAY);
        connect(
            pinger2.getNegative(PingPongPort.class),
            ponger.getPositive(PingPongPort.class),
            Channel.TWO_WAY);
    }
}
- Scala
-
package sexamples.basics.pingpongdirect

import se.sics.kompics.sl._

class Parent extends ComponentDefinition {
  val pinger = create[Pinger];
  val pinger2 = create[Pinger];
  val ponger = create[Ponger];

  connect(PingPongPort)(ponger -> pinger);
  connect(PingPongPort)(ponger -> pinger2);
}
Now if we compile and run this, we can see that the single Ponger is counting twice as many Ping instances as each Pinger counts Pong instances:
runMain jexamples.basics.pingpongdirect.Main
runMain sexamples.basics.pingpongdirect.Main
Kompics Timer
This section describes the Kompics Timer port and the default implementation JavaTimer.
In order to use the timer port in Kompics, an additional dependency in our build file is required:
- sbt
libraryDependencies += "se.sics.kompics.basic" % "kompics-port-timer" % "1.2.1"
- Maven
<dependency> <groupId>se.sics.kompics.basic</groupId> <artifactId>kompics-port-timer</artifactId> <version>1.2.1</version> </dependency>
- Gradle
dependencies { compile group: 'se.sics.kompics.basic', name: 'kompics-port-timer', version: '1.2.1' }
The Timer port itself allows four types of requests and only a single indication: Timeout, which extends the old-style Response and must be extended further by any component using Kompics’ timer facilities.
The requests come in two pairs: a schedule and a cancel for one-shot timers (ScheduleTimeout and CancelTimeout), and another pair for periodic timers (SchedulePeriodicTimeout and CancelPeriodicTimeout). In both cases the schedule event extends the old-style request and requires a prepared timeout response instance to be passed along to the Timer component, using the setTimeoutEvent method. Additionally, each Timeout instance generates a random UUID upon creation, which the scheduling component should store somewhere, since it is needed to cancel the timeout later. This is especially important for periodic timeouts, which should always be cancelled when they are no longer required, in order to reduce the resource usage of the system.
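The bookkeeping described above (create an identifier when scheduling, keep it, and use it to cancel) can be illustrated with a plain-Java analogy built on java.util.concurrent. This sketch does not use the Kompics Timer port: the class TimeoutRegistry and its methods are hypothetical names for this illustration, and a ScheduledExecutorService stands in for the timer component.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy for id-based scheduling and cancellation (not Kompics code).
class TimeoutRegistry {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Map<UUID, ScheduledFuture<?>> active = new ConcurrentHashMap<>();

    /** Schedules a periodic action and returns the id needed to cancel it later. */
    UUID schedulePeriodic(long delayMs, long periodMs, Runnable action) {
        UUID id = UUID.randomUUID();
        active.put(id, executor.scheduleAtFixedRate(
                action, delayMs, periodMs, TimeUnit.MILLISECONDS));
        return id;
    }

    /** Cancels the timeout with the given id; returns false if the id is unknown. */
    boolean cancel(UUID id) {
        ScheduledFuture<?> future = active.remove(id);
        return future != null && future.cancel(false);
    }

    /** Number of periodic timeouts that are still consuming resources. */
    int activeCount() {
        return active.size();
    }

    /** Stops the underlying executor thread. */
    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        TimeoutRegistry registry = new TimeoutRegistry();
        UUID id = registry.schedulePeriodic(0, 1000, () -> System.out.println("tick"));
        System.out.println("cancelled: " + registry.cancel(id));
        registry.shutdown();
    }
}
```

A caller that loses the returned id has no way to cancel the periodic task in this model, so the task keeps running until the whole executor is shut down. That is the reason for storing the identifier at scheduling time.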
It should be pointed out that the only guarantee provided by Kompics’ Timer facilities is that timeouts are never handled before their delay/period has passed. Due to propagation and scheduling delays, there is no fixed bound on how soon after a timeout was triggered it is handled. In particular, timeout events can be caught in long port queues on the receiver side, as there is no notion of priority for events in Kompics.
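The effect of a long port queue can be made concrete with a small deterministic model in virtual time. This is a plain-Java sketch under simplifying assumptions, not Kompics code: the class PortQueueModel is a hypothetical name, the queue is strictly first-in-first-out, and every queued event is assumed to take the same fixed time to handle.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java model of a FIFO port queue without priorities (not Kompics code).
class PortQueueModel {

    /**
     * Returns the virtual time at which a timeout is handled, if it is enqueued
     * at firedAtMs behind queuedEvents earlier events that each take
     * handlingCostMs to handle.
     */
    static long handledAt(long firedAtMs, int queuedEvents, long handlingCostMs) {
        Deque<String> queue = new ArrayDeque<>();
        for (int i = 0; i < queuedEvents; i++) {
            queue.addLast("event-" + i);
        }
        queue.addLast("timeout"); // no priority: the timeout waits its turn

        long now = firedAtMs;
        while (!"timeout".equals(queue.peekFirst())) {
            queue.removeFirst();   // handle one earlier event
            now += handlingCostMs; // virtual time advances by its handling cost
        }
        return now;
    }

    public static void main(String[] args) {
        System.out.println(handledAt(1000, 0, 5));   // prints 1000
        System.out.println(handledAt(1000, 200, 5)); // prints 2000
    }
}
```

In this model the timeout is never handled before the time at which it fired, but with 200 events ahead of it the handling is delayed by a full second, so only the lower bound holds.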
The default component providing the Kompics Timer service is JavaTimer, which can be accessed after adding the following dependency to the build file:
- sbt
libraryDependencies += "se.sics.kompics.basic" % "kompics-component-java-timer" % "1.2.1"
- Maven
<dependency> <groupId>se.sics.kompics.basic</groupId> <artifactId>kompics-component-java-timer</artifactId> <version>1.2.1</version> </dependency>
- Gradle
dependencies { compile group: 'se.sics.kompics.basic', name: 'kompics-component-java-timer', version: '1.2.1' }
The JavaTimer is based on Java’s default Timer and should scale to all but the most extreme needs, with a single instance per Kompics component tree. Since the Timer port uses the old-style request-response pattern, it is recommended that it not be chained through multiple layers of the system, but instead be connected directly to all components that require it. Otherwise a timeout can end up being handled by more than one component, because old-style responses continue along chained channels, as was pointed out in the previous section. Direct connections also incur slightly less latency than chained ones, which is important for the timely handling of timeouts.
To update our example, we now want to trigger Pings only periodically, say one per second, and no longer in response to Pongs.
First we extend our dual-Pinger setup from before with an instance of JavaTimer and connect everything appropriately.
- Java
-
package jexamples.basics.pingpongtimer;

import se.sics.kompics.Channel;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Component;
import se.sics.kompics.Init;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;

public class Parent extends ComponentDefinition {
    Component pinger = create(Pinger.class, Init.NONE);
    Component ponger = create(Ponger.class, Init.NONE);
    Component pinger2 = create(Pinger.class, Init.NONE);
    Component timer = create(JavaTimer.class, Init.NONE);

    {
        connect(
            pinger.getNegative(PingPongPort.class),
            ponger.getPositive(PingPongPort.class),
            Channel.TWO_WAY);
        connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
        connect(
            pinger2.getNegative(PingPongPort.class),
            ponger.getPositive(PingPongPort.class),
            Channel.TWO_WAY);
        connect(pinger2.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
    }
}
- Scala
-
package sexamples.basics.pingpongtimer

import se.sics.kompics.sl._
import se.sics.kompics.timer.Timer
import se.sics.kompics.timer.java.JavaTimer

class Parent extends ComponentDefinition {
  val pinger = create[Pinger];
  val pinger2 = create[Pinger];
  val ponger = create[Ponger];
  val timer = create[JavaTimer];

  connect(PingPongPort)(ponger -> pinger);
  connect(PingPongPort)(ponger -> pinger2);
  connect[Timer](timer -> pinger);
  connect[Timer](timer -> pinger2);
}
Then we add Timer as a required port to the Pinger and change the handlers’ behaviour to schedule a PingTimeout on start and then send a Ping whenever the PingTimeout is received. We’ll schedule the timeout periodically every second with no initial delay. We also override the tearDown method to cancel our periodic timeout when we are being stopped. It is simply good form to clean up after oneself.
- Java
-
package jexamples.basics.pingpongtimer;

import se.sics.kompics.Kompics;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.Handler;
import se.sics.kompics.Start;
import se.sics.kompics.timer.*;

import java.util.UUID;

public class Pinger extends ComponentDefinition {

    Positive<PingPongPort> ppp = requires(PingPongPort.class);
    Positive<Timer> timer = requires(Timer.class);

    private long counter = 0;
    private UUID timerId;

    Handler<Start> startHandler = new Handler<Start>() {
        public void handle(Start event) {
            SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, 1000);
            PingTimeout timeout = new PingTimeout(spt);
            spt.setTimeoutEvent(timeout);
            trigger(spt, timer);
            timerId = timeout.getTimeoutId();
        }
    };

    Handler<Pong> pongHandler = new Handler<Pong>() {
        public void handle(Pong event) {
            counter++;
            logger.info("Got Pong #{}!", counter);
            if (counter > 10) {
                Kompics.asyncShutdown();
            }
        }
    };

    Handler<PingTimeout> timeoutHandler = new Handler<PingTimeout>() {
        public void handle(PingTimeout event) {
            trigger(new Ping(), ppp);
        }
    };

    {
        subscribe(startHandler, control);
        subscribe(pongHandler, ppp);
        subscribe(timeoutHandler, timer);
    }

    @Override
    public void tearDown() {
        trigger(new CancelPeriodicTimeout(timerId), timer);
    }

    public static class PingTimeout extends Timeout {
        public PingTimeout(SchedulePeriodicTimeout spt) {
            super(spt);
        }
    }
}
- Scala
-
package sexamples.basics.pingpongtimer

import se.sics.kompics.sl._
import se.sics.kompics.timer.{CancelPeriodicTimeout, SchedulePeriodicTimeout, Timeout, Timer}

import java.util.UUID

object Pinger {
  class PingTimeout(_spt: SchedulePeriodicTimeout) extends Timeout(_spt);
}

class Pinger extends ComponentDefinition {
  import Pinger.PingTimeout;

  val ppp = requires(PingPongPort);
  val timer = requires[Timer];

  private var counter: Long = 0L;
  private var timerId: Option[UUID] = None;

  ctrl uponEvent {
    case _: Start => {
      val spt = new SchedulePeriodicTimeout(0, 1000);
      val timeout = new PingTimeout(spt);
      spt.setTimeoutEvent(timeout);
      trigger(spt -> timer);
      timerId = Some(timeout.getTimeoutId());
    }
  }

  ppp uponEvent {
    case Pong => {
      counter += 1L;
      log.info(s"Got Pong #${counter}!");
      if (counter > 10) {
        Kompics.asyncShutdown();
      }
    }
  }

  timer uponEvent {
    case _: PingTimeout => {
      trigger(Ping() -> ppp);
    }
  }

  override def tearDown(): Unit = {
    timerId match {
      case Some(id) => {
        trigger(new CancelPeriodicTimeout(id) -> timer);
      }
      case None => () // no cleanup necessary
    }
  }
}
If we again compile and run this, we should see a significantly lower rate of Ping and Pong event production:
runMain jexamples.basics.pingpongtimer.Main
runMain sexamples.basics.pingpongtimer.Main