Cleanup

While our example from the previous section works, there are still a number of things that are not optimal with it. We will use this section to make the whole code a bit nicer, and also look into a way to deploy the final code, since we cannot really rely on sbt being present on all target machines.

Configuration Files

First of all, you might have noticed that we have a lot of redundancy in the passing around of parameters between the different component Init instances. Furthermore, our reading from the command line is not the safest. Sure, there are good libraries for command line options, but this is not really what we want. What we want is to define a bunch of values once and then be able to access them from anywhere within the code. The right solution for this problem is a configuration file, where we write IPs, ports, and similar values, together with a configuration library that knows how to give us access to those values in our code. Kompics comes with its own configuration library, which by default uses Typesafe Config as a backend.

If you prefer a different configuration library, you may add support for it by wrapping it in an implementation of BaselineConfig, passing it into the Config.Factory, and finally replacing the default config with your custom one using Kompics.setConfig before starting the runtime.

For the tutorial we are going to stick to Typesafe Config as a baseline. We are thus going to add a src/main/resources/reference.conf file, where we describe default values for all our config options. This is not strictly speaking necessary, but it is generally a good idea to have one place your users can look at where all possible config values are outlined. While we are at it, we also make the timeout period configurable.

pingpong {
	ponger.addr = "127.0.0.1:34567"
	pinger {
		addr {
			host = "127.0.0.1"
			port = 34568
		}
		timeout = 1000
		pongeraddr {
			host = "127.0.0.1"
			port = 34567
		}
	}
}

Now that we have a configuration file, we can simply throw away all the Init classes we created before, and pull out the desired values from the config in the Pinger and Ponger constructors. Something along the lines of:

Java
try {
  InetAddress selfIp = InetAddress.getByName(config().getValue("pingpong.pinger.addr.host", String.class));
  int selfPort = config().getValue("pingpong.pinger.addr.port", Integer.class);
  this.self = new TAddress(selfIp, selfPort);
  InetAddress pongerIp = InetAddress.getByName(config().getValue("pingpong.pinger.pongeraddr.host", String.class));
  int pongerPort = config().getValue("pingpong.pinger.pongeraddr.port", Integer.class);
  this.ponger = new TAddress(pongerIp, pongerPort);
  this.timeoutPeriod = config().getValue("pingpong.pinger.timeout", Long.class);
} catch (UnknownHostException ex) {
  throw new RuntimeException(ex);
}
Scala
val selfIp = InetAddress.getByName(cfg.getValue[String]("pingpong.pinger.addr.host"));
val selfPort = cfg.getValue[Int]("pingpong.pinger.addr.port");
val self = TAddress(selfIp, selfPort);
val pongerIp = InetAddress.getByName(cfg.getValue[String]("pingpong.pinger.pongeraddr.host"));
val pongerPort = cfg.getValue[Int]("pingpong.pinger.pongeraddr.port");
val ponger = TAddress(pongerIp, pongerPort); // use the ponger values here, not the self values
val timeoutPeriod = cfg.getValue[Long]("pingpong.pinger.timeout");

However, we require addresses, and in particular the self address, in a number of places: Pinger, Ponger, and NettyNetwork. While certainly possible, it may not be the greatest idea to just copy and paste the construction code for it to every place where we need it.

There is another solution, though, that gives a lot more concise code. Kompics’ configuration system supports so-called conversions, which are used to convert compatible types from the config values to the requested values. For example, it would be unnecessary to throw an exception when the user is asking for an instance of Long but the value is returned as Integer (Int in Scala) by Typesafe Config. Instead, the config library will look through a number of Converter instances that are registered at Conversions (this is very similar to how the serialisation framework is used) and try to find one that can convert an Integer instance to a Long instance. Thus we can use this system to write a Converter that takes the configuration object at "pingpong.self", for example, and converts it to a TAddress. It turns out that, because of the way we wrote the reference.conf entries, Typesafe Config will give us a Map with the subvalues as keys. In this case, we can pull out the values, convert them to String and Integer instances respectively, and then construct the TAddress as before. Just to exemplify the method, we are additionally going to support an alternative way to write a TAddress in the config file: a single String in the format "127.0.0.1:34567".

TAddressConverter

Java
package jexamples.networking.pingpongcleaned;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import se.sics.kompics.config.Conversions;
import se.sics.kompics.config.Converter;

public class TAddressConverter implements Converter<TAddress> {

  @Override
  public TAddress convert(Object o) {
    if (o instanceof Map) {
      try {
        Map m = (Map) o;
        String hostname = Conversions.convert(m.get("host"), String.class);
        int port = Conversions.convert(m.get("port"), Integer.class);
        InetAddress ip = InetAddress.getByName(hostname);
        return new TAddress(ip, port);
      } catch (UnknownHostException ex) {
        return null;
      }
    } else if (o instanceof String) {
      try {
        String[] ipport = ((String) o).split(":");
        InetAddress ip = InetAddress.getByName(ipport[0]);
        int port = Integer.parseInt(ipport[1]);
        return new TAddress(ip, port);
      } catch (UnknownHostException ex) {
        return null;
      }
    } else {
      return null;
    }
  }

  @Override
  public Class<TAddress> type() {
    return TAddress.class;
  }
}
Scala
package sexamples.networking.pingpongcleaned

import java.net.{InetAddress, UnknownHostException}
import java.util.Map
import se.sics.kompics.config.{Conversions, Converter}

object TAddressConverter extends Converter[TAddress] {

  override def convert(o: AnyRef): TAddress = {
    o match {
      case m: Map[String, Any] @unchecked => {
        val hostname = Conversions.convert(m.get("host"), classOf[String]);
        val port: Int = Conversions.convert(m.get("port"), classOf[Integer]);
        val ip = InetAddress.getByName(hostname);
        TAddress(ip, port)
      }
      case s: String => {
        val ipport = s.split(":");
        val ip = InetAddress.getByName(ipport(0));
        val port = Integer.parseInt(ipport(1));
        TAddress(ip, port)
      }
      case _ => null
    }
  }

  override def `type`(): Class[TAddress] = classOf[TAddress];
}
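The string branch of the converter above assumes that the value always has the form host:port. A value such as "127.0.0.1" without a port would fail with an ArrayIndexOutOfBoundsException, and a non-numeric port with a NumberFormatException. The following is a minimal sketch of a stricter parse that returns null for malformed input instead. It uses only the JDK: AddressStrings and parseHostPort are hypothetical names that are not part of Kompics, and InetSocketAddress stands in for TAddress.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Hypothetical helper, not part of Kompics; InetSocketAddress stands in for TAddress.
final class AddressStrings {

  private AddressStrings() {}

  // Parses "host:port"; returns null for malformed input, like the converter above.
  static InetSocketAddress parseHostPort(String s) {
    int sep = s.lastIndexOf(':');
    if (sep <= 0 || sep == s.length() - 1) {
      return null; // missing host or missing port
    }
    try {
      int port = Integer.parseInt(s.substring(sep + 1));
      if (port < 0 || port > 65535) {
        return null; // not a valid port number
      }
      InetAddress ip = InetAddress.getByName(s.substring(0, sep));
      return new InetSocketAddress(ip, port);
    } catch (NumberFormatException | UnknownHostException ex) {
      return null; // non-numeric port or unresolvable host
    }
  }
}
```

The same checks could be moved into the String branch of TAddressConverter if malformed config values should be rejected quietly rather than crash the conversion.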

Additionally, we need to register the new Converter in the Main class (or object, in Scala). Then we can get an instance of TAddress from the config by simply calling config().getValue("pingpong.pinger.addr", TAddress.class); in Java, or cfg.getValue[TAddress]("pingpong.pinger.addr") in Scala, for example.

Java
static {
  // register
  Serializers.register(new NetSerializer(), "netS");
  Serializers.register(new PingPongSerializer(), "ppS");
  // map
  Serializers.register(TAddress.class, "netS");
  Serializers.register(THeader.class, "netS");
  Serializers.register(TMessage.class, "netS");
  Serializers.register(Ping.class, "ppS");
  Serializers.register(Pong.class, "ppS");
  // conversions
  Conversions.register(new TAddressConverter());
}
Scala
// register
Serializers.register(NetSerializer, "netS");
Serializers.register(PingPongSerializer, "ppS");
// map
Serializers.register(classOf[TAddress], "netS");
Serializers.register(classOf[THeader], "netS");
Serializers.register(classOf[TMessage], "netS");
Serializers.register(Ping.getClass, "ppS");
Serializers.register(Pong.getClass, "ppS");
// conversions
Conversions.register(TAddressConverter);
Note

We are still doing the same work as before to get our TAddress instances. Technically, even more than before, since we have to look through all the registered Converter instances now. It simply looks a lot nicer like this, as there is less unnecessary code duplication. At this time, none of the converted values are cached anywhere. Thus, it is generally recommended to always write values from the configuration that are used often into local fields, instead of pulling them out of the config on demand every time they are needed.

It is possible to “cache” converted config entries, by reading them out of the config in the converted object, and then writing them back into it, overwriting the original, unconverted entry that was in their place. Config updates are somewhat involved, however, so we will not be treating them here.
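The recommendation in the note above, reading a value once and keeping it in a local field, can be illustrated with a small sketch. ConvertingLookup is a hypothetical stand-in for a config that converts on every lookup (it is not the Kompics Config API), and CachedTimeout is a hypothetical consumer; the counter only exists to make the number of conversions visible.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a config lookup that converts on every call.
final class ConvertingLookup {
  private final Map<String, String> raw;
  int conversions = 0; // counts how often a value was converted

  ConvertingLookup(Map<String, String> raw) {
    this.raw = raw;
  }

  <T> T getValue(String key, Function<String, T> converter) {
    conversions++;
    return converter.apply(raw.get(key));
  }
}

// Hypothetical consumer: reads the timeout once and keeps it in a final field.
final class CachedTimeout {
  private final long timeoutPeriod;

  CachedTimeout(ConvertingLookup config) {
    // the only lookup and conversion happens here, at construction time
    this.timeoutPeriod = config.<Long>getValue("pingpong.pinger.timeout", Long::parseLong);
  }

  long timeoutPeriod() {
    return timeoutPeriod; // no lookup, no conversion
  }
}
```

However often timeoutPeriod() is called afterwards, the lookup counter stays at one, which is the effect the final fields in Pinger and Ponger below rely on.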

Pattern Matching Messages

Another thing that feels awkward with our code is how we write network messages: our TMessage class does almost nothing except define what kind of header we expect, and all actual network messages like Ping and Pong have to implement all these annoying constructors that TMessage requires, instead of focusing on their business logic (which is trivially simple). We would much rather have TMessage act as a kind of container for data, with Ping and Pong simply being payloads. But then, how would the handlers of Pinger and Ponger know which TMessage instances are for them, i.e. contain Ping and Pong instances respectively, and which are for other classes? They would have to match on TMessage and handle all network messages. That would be way too expensive in a large system; under no circumstances do we want to schedule components unnecessarily. In Kompics Java, the solution to our problem can be found in ClassMatchedHandler, which provides a very simple form of pattern matching. Instead of matching on a single event type, it matches on two event types: the context type, which we will define as TMessage, and the content type, which will be Ping and Pong respectively. In Kompics Scala, we instead use pattern matching in the handler to select only the appropriate TMessage instances.

We shall rewrite TMessage to carry any kind of KompicsEvent as a payload and, in the Java version, to act as PatternExtractor for the payload. We will also move its serialisation logic into the NetSerializer, leaving the PingPongSerializer rather trivial as a result.
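To make the idea of matching on a context type and a content type concrete, here is a minimal sketch of class-based dispatch in plain Java. It is not how Kompics implements ClassMatchedHandler: Envelope and PayloadDispatcher are hypothetical names, and the lookup matches on the exact payload class only, which may differ from the matching rules Kompics applies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical container: carries a payload and exposes its class as the pattern.
final class Envelope {
  final String source;
  final Object payload;

  Envelope(String source, Object payload) {
    this.source = source;
    this.payload = payload;
  }

  Class<?> extractPattern() {
    return payload.getClass();
  }
}

// Hypothetical dispatcher: selects handlers by the payload class only.
final class PayloadDispatcher {
  private final Map<Class<?>, BiConsumer<Object, Envelope>> handlers = new HashMap<>();

  <T> void subscribe(Class<T> contentType, BiConsumer<T, Envelope> handler) {
    // wrap the typed handler so that it can be stored under its content class
    handlers.put(
        contentType, (content, context) -> handler.accept(contentType.cast(content), context));
  }

  // Returns true if a handler matched; other envelopes are skipped without running any handler.
  boolean dispatch(Envelope env) {
    BiConsumer<Object, Envelope> handler = handlers.get(env.extractPattern());
    if (handler == null) {
      return false;
    }
    handler.accept(env.payload, env);
    return true;
  }
}
```

A handler subscribed for one payload class is never invoked for envelopes carrying a different payload, which is the property we want for Pinger and Ponger: the decision is made from the extracted pattern before any handler code runs.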

TMessage

Java
package jexamples.networking.pingpongcleaned;

import se.sics.kompics.KompicsEvent;
import se.sics.kompics.PatternExtractor;
import se.sics.kompics.network.Msg;
import se.sics.kompics.network.Transport;

public class TMessage
    implements Msg<TAddress, THeader>, PatternExtractor<Class<Object>, KompicsEvent> {

  public final THeader header;
  public final KompicsEvent payload;

  public TMessage(TAddress src, TAddress dst, Transport protocol, KompicsEvent payload) {
    this.header = new THeader(src, dst, protocol);
    this.payload = payload;
  }

  TMessage(THeader header, KompicsEvent payload) {
    this.header = header;
    this.payload = payload;
  }

  @Override
  public THeader getHeader() {
    return this.header;
  }

  @SuppressWarnings("deprecation")
  @Override
  public TAddress getSource() {
    return this.header.src;
  }

  @SuppressWarnings("deprecation")
  @Override
  public TAddress getDestination() {
    return this.header.dst;
  }

  @SuppressWarnings("deprecation")
  @Override
  public Transport getProtocol() {
    return this.header.proto;
  }

  @SuppressWarnings("unchecked")
  @Override
  public Class<Object> extractPattern() {
    Class c = payload.getClass();
    return (Class<Object>) c;
  }

  @Override
  public KompicsEvent extractValue() {
    return payload;
  }
}
Scala
object TMessage {
  def apply(src: TAddress, dst: TAddress, payload: KompicsEvent): TMessage = {
    val header = THeader(src, dst, Transport.TCP);
    TMessage(header, payload)
  }
}
case class TMessage(header: THeader, payload: KompicsEvent) extends Msg[TAddress, THeader] {
  override def getHeader(): THeader = header;
  override def getSource(): TAddress = header.src;
  override def getDestination(): TAddress = header.dst;
  override def getProtocol(): Transport = header.proto;
}

Ping

Java
package jexamples.networking.pingpongcleaned;

import se.sics.kompics.KompicsEvent;

public class Ping implements KompicsEvent {
  public static final Ping EVENT = new Ping();
}
Scala
object Ping extends KompicsEvent;

Pong

Java
package jexamples.networking.pingpongcleaned;

import se.sics.kompics.KompicsEvent;

public class Pong implements KompicsEvent {
  public static final Pong EVENT = new Pong();
}
Scala
object Pong extends KompicsEvent;

NetSerializer

Java
package jexamples.networking.pingpongcleaned;

import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;

import se.sics.kompics.KompicsEvent;
import se.sics.kompics.network.Transport;
import se.sics.kompics.network.netty.serialization.Serializer;
import se.sics.kompics.network.netty.serialization.Serializers;

public class NetSerializer implements Serializer {

  private static final byte ADDR = 1;
  private static final byte HEADER = 2;
  private static final byte MSG = 3;

  @Override
  public int identifier() {
    return 100;
  }

  @Override
  public void toBinary(Object o, ByteBuf buf) {
    if (o instanceof TAddress) {
      TAddress addr = (TAddress) o;
      buf.writeByte(ADDR); // mark which type we are serialising (1 byte)
      addressToBinary(addr, buf); // 6 bytes
      // total 7 bytes
    } else if (o instanceof THeader) {
      THeader header = (THeader) o;
      buf.writeByte(HEADER); // mark which type we are serialising (1 byte)
      headerToBinary(header, buf); // 13 bytes
      // total 14 bytes
    } else if (o instanceof TMessage) {
      TMessage msg = (TMessage) o;
      buf.writeByte(MSG); // mark which type we are serialising (1 byte)
      headerToBinary(msg.header, buf); // 13 bytes
      Serializers.toBinary(msg.payload, buf); // no idea what it is, let the framework deal with it
    }
  }

  @Override
  public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
    byte type = buf.readByte(); // read the first byte to figure out the type
    switch (type) {
      case ADDR:
        return addressFromBinary(buf);
      case HEADER:
        return headerFromBinary(buf);
      case MSG:
        {
          THeader header = headerFromBinary(buf); // 13 bytes
          KompicsEvent payload =
              (KompicsEvent)
                  Serializers.fromBinary(
                      buf,
                      Optional
                          .empty()); // don't know what it is but KompicsEvent is the upper bound
          return new TMessage(header, payload);
        }
    }
    return null; // strange things happened^^
  }

  private void headerToBinary(THeader header, ByteBuf buf) {
    addressToBinary(header.src, buf); // 6 bytes
    addressToBinary(header.dst, buf); // 6 bytes
    buf.writeByte(header.proto.ordinal()); // 1 byte is enough
    // total of 13 bytes
  }

  private THeader headerFromBinary(ByteBuf buf) {
    TAddress src = addressFromBinary(buf); // 6 bytes
    TAddress dst = addressFromBinary(buf); // 6 bytes
    int protoOrd = buf.readByte(); // 1 byte
    Transport proto = Transport.values()[protoOrd];
    return new THeader(src, dst, proto); // total of 13 bytes, check
  }

  private void addressToBinary(TAddress addr, ByteBuf buf) {
    buf.writeBytes(addr.getIp().getAddress()); // 4 bytes IP (assume it's IPv4)
    buf.writeShort(addr.getPort()); // we only need 2 bytes here
    // total of 6 bytes
  }

  private TAddress addressFromBinary(ByteBuf buf) {
    byte[] ipBytes = new byte[4];
    buf.readBytes(ipBytes); // 4 bytes
    try {
      InetAddress ip = InetAddress.getByAddress(ipBytes);
      int port = buf.readUnsignedShort(); // 2 bytes
      return new TAddress(ip, port); // total of 6, check
    } catch (UnknownHostException ex) {
      throw new RuntimeException(ex); // let Netty deal with this
    }
  }
}
Scala
package sexamples.networking.pingpongcleaned

import io.netty.buffer.ByteBuf
import java.net.{InetAddress, UnknownHostException}
import java.util.Optional
import se.sics.kompics.KompicsEvent
import se.sics.kompics.network.Transport
import se.sics.kompics.network.netty.serialization.{Serializer, Serializers}

object NetSerializer extends Serializer {
  private val ADDR: Byte = 1;
  private val HEADER: Byte = 2;
  private val MSG: Byte = 3;

  private val NO_HINT: Optional[AnyRef] = Optional.empty();

  override def identifier(): Int = 100;

  override def toBinary(o: AnyRef, buf: ByteBuf): Unit = {
    o match {
      case addr: TAddress => {
        buf.writeByte(ADDR); // mark which type we are serialising (1 byte)
        addressToBinary(addr, buf); // 6 bytes
        // total 7 bytes
      }
      case header: THeader => {
        buf.writeByte(HEADER); // mark which type we are serialising (1 byte)
        headerToBinary(header, buf); // 13 bytes
        // total 14 bytes
      }
      case msg: TMessage => {
        buf.writeByte(MSG); // mark which type we are serialising (1 byte)
        headerToBinary(msg.header, buf); // 13 bytes
        Serializers.toBinary(msg.payload, buf); // no idea what it is, let the framework deal with it
      }
    }
  }

  override def fromBinary(buf: ByteBuf, hint: Optional[AnyRef]): AnyRef = {
    val typeFlag = buf.readByte(); // read the first byte to figure out the type
    typeFlag match {
      case ADDR => {
        return addressFromBinary(buf);
      }
      case HEADER => {
        return headerFromBinary(buf);
      }
      case MSG => {
        val header = headerFromBinary(buf); // 13 bytes
        val payload = Serializers.fromBinary(buf, NO_HINT).asInstanceOf[KompicsEvent]; // don't know what it is but KompicsEvent is the upper bound
        return TMessage(header, payload);
      }
      case _ => {
        Console.err.println(s"Got invalid byte flag=$typeFlag");
        return null;
      }
    }
  }

  private def headerToBinary(header: THeader, buf: ByteBuf): Unit = {
    addressToBinary(header.src, buf); // 6 bytes
    addressToBinary(header.dst, buf); // 6 bytes
    buf.writeByte(header.proto.ordinal()); // 1 byte is enough
    // total of 13 bytes
  }

  private def headerFromBinary(buf: ByteBuf): THeader = {
    val src = addressFromBinary(buf); // 6 bytes
    val dst = addressFromBinary(buf); // 6 bytes
    val protoOrd = buf.readByte(); // 1 byte
    val proto = Transport.values()(protoOrd);
    return THeader(src, dst, proto); // total of 13 bytes, check
  }

  private def addressToBinary(addr: TAddress, buf: ByteBuf): Unit = {
    buf.writeBytes(addr.getIp().getAddress()); // 4 bytes IP (assume it's IPv4)
    buf.writeShort(addr.getPort()); // we only need 2 bytes here
    // total of 6 bytes
  }

  private def addressFromBinary(buf: ByteBuf): TAddress = {
    val ipBytes = Array.ofDim[Byte](4);
    buf.readBytes(ipBytes); // 4 bytes
    val ip = InetAddress.getByAddress(ipBytes);
    val port = buf.readUnsignedShort(); // 2 bytes
    return TAddress(ip, port); // total of 6, check
  }
}
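The 6-byte address layout used by addressToBinary and addressFromBinary can be tried out without Netty. The sketch below is an assumption-laden stand-in: it uses java.nio.ByteBuffer instead of Netty's ByteBuf and InetSocketAddress instead of TAddress, and AddressCodec is a hypothetical name. It also shows why the port has to be read back as an unsigned value.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

// Hypothetical stand-alone codec; ByteBuffer replaces ByteBuf, InetSocketAddress replaces TAddress.
final class AddressCodec {

  static final int ENCODED_SIZE = 6; // 4 bytes IPv4 + 2 bytes port

  private AddressCodec() {}

  static void write(InetSocketAddress addr, ByteBuffer buf) {
    byte[] ip = addr.getAddress().getAddress();
    if (ip.length != 4) {
      throw new IllegalArgumentException("only IPv4 addresses fit the 6-byte layout");
    }
    buf.put(ip); // 4 bytes
    buf.putShort((short) addr.getPort()); // 2 bytes; ports above 32767 become negative shorts
  }

  static InetSocketAddress read(ByteBuffer buf) {
    byte[] ip = new byte[4];
    buf.get(ip); // 4 bytes
    int port = buf.getShort() & 0xFFFF; // mask to undo the sign, like readUnsignedShort
    try {
      return new InetSocketAddress(InetAddress.getByAddress(ip), port);
    } catch (UnknownHostException ex) {
      throw new RuntimeException(ex); // only thrown for byte arrays of illegal length
    }
  }
}
```

Writing an address and reading it back from the flipped buffer should yield an equal IP and port, including for ports in the upper half of the range such as 54321.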

PingPongSerializer

Java
package jexamples.networking.pingpongcleaned;

import java.util.Optional;
import io.netty.buffer.ByteBuf;
import se.sics.kompics.network.netty.serialization.Serializer;

public class PingPongSerializer implements Serializer {

  private static final byte PING = 1;
  private static final byte PONG = 2;

  @Override
  public int identifier() {
    return 200;
  }

  @Override
  public void toBinary(Object o, ByteBuf buf) {
    if (o instanceof Ping) {
      buf.writeByte(PING); // 1 byte
      // total 1 bytes
    } else if (o instanceof Pong) {
      buf.writeByte(PONG); // 1 byte
      // total 1 bytes
    }
  }

  @Override
  public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
    byte type = buf.readByte(); // 1 byte
    switch (type) {
      case PING:
        return Ping.EVENT; // 1 bytes total, check
      case PONG:
        return Pong.EVENT; // 1 bytes total, check
    }
    return null;
  }
}
Scala
package sexamples.networking.pingpongcleaned

import io.netty.buffer.ByteBuf
import java.util.Optional
import se.sics.kompics.network.netty.serialization.Serializer

object PingPongSerializer extends Serializer {
  private val PING: Byte = 1;
  private val PONG: Byte = 2;

  private val NO_HINT: Optional[AnyRef] = Optional.empty();

  override def identifier(): Int = 200;

  override def toBinary(o: AnyRef, buf: ByteBuf): Unit = {
    o match {
      case Ping => {
        buf.writeByte(PING); // 1 byte
        // total 1 bytes
      }
      case Pong => {
        buf.writeByte(PONG); // 1 byte
        // total 1 bytes
      }
    }
  }

  override def fromBinary(buf: ByteBuf, hint: Optional[AnyRef]): AnyRef = {
    val typeFlag = buf.readByte(); // 1 byte
    typeFlag match {
      case PING => {
        return Ping; // 1 bytes total, check
      }
      case PONG => {
        return Pong; // 1 bytes total, check
      }
      case _ => {
        Console.err.println(s"Got invalid byte flag=$typeFlag");
        return null;
      }
    }
  }
}

Pinger

Java
package jexamples.networking.pingpongcleaned;

import se.sics.kompics.Kompics;
import se.sics.kompics.ClassMatchedHandler;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.Handler;
import se.sics.kompics.Start;
import se.sics.kompics.timer.*;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.Transport;

import java.util.UUID;

public class Pinger extends ComponentDefinition {

  Positive<Network> net = requires(Network.class);
  Positive<Timer> timer = requires(Timer.class);

  private long counter = 0;
  private UUID timerId;
  private final TAddress self;
  private final TAddress ponger;
  private final long timeoutPeriod;

  public Pinger() {
    this.self = config().getValue("pingpong.pinger.addr", TAddress.class);
    this.ponger = config().getValue("pingpong.pinger.pongeraddr", TAddress.class);
    this.timeoutPeriod = config().getValue("pingpong.pinger.timeout", Long.class);
  }

  Handler<Start> startHandler =
      new Handler<Start>() {
        public void handle(Start event) {
          SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, timeoutPeriod);
          PingTimeout timeout = new PingTimeout(spt);
          spt.setTimeoutEvent(timeout);
          trigger(spt, timer);
          timerId = timeout.getTimeoutId();
        }
      };

  ClassMatchedHandler<Pong, TMessage> pongHandler =
      new ClassMatchedHandler<Pong, TMessage>() {

        @Override
        public void handle(Pong content, TMessage context) {
          counter++;
          logger.info("Got Pong #{}!", counter);
          if (counter > 10) {
            Kompics.asyncShutdown();
          }
        }
      };
  Handler<PingTimeout> timeoutHandler =
      new Handler<PingTimeout>() {
        public void handle(PingTimeout event) {
          trigger(new TMessage(self, ponger, Transport.TCP, Ping.EVENT), net);
        }
      };

  {
    subscribe(startHandler, control);
    subscribe(pongHandler, net);
    subscribe(timeoutHandler, timer);
  }

  @Override
  public void tearDown() {
    trigger(new CancelPeriodicTimeout(timerId), timer);
  }

  public static class PingTimeout extends Timeout {
    public PingTimeout(SchedulePeriodicTimeout spt) {
      super(spt);
    }
  }
}
Scala
package sexamples.networking.pingpongcleaned

import se.sics.kompics.sl._
import se.sics.kompics.network.Network
import se.sics.kompics.timer.{CancelPeriodicTimeout, SchedulePeriodicTimeout, Timeout, Timer}

import java.util.UUID

object Pinger {
  class PingTimeout(_spt: SchedulePeriodicTimeout) extends Timeout(_spt);
}
class Pinger extends ComponentDefinition {
  import Pinger.PingTimeout;

  val self = cfg.getValue[TAddress]("pingpong.pinger.addr");
  val ponger = cfg.getValue[TAddress]("pingpong.pinger.pongeraddr");
  val timeoutPeriod = cfg.getValue[Long]("pingpong.pinger.timeout");

  val net = requires[Network];
  val timer = requires[Timer];

  private var counter: Long = 0L;
  private var timerId: Option[UUID] = None;

  ctrl uponEvent {
    case _: Start => {
      val spt = new SchedulePeriodicTimeout(0, timeoutPeriod);
      val timeout = new PingTimeout(spt);
      spt.setTimeoutEvent(timeout);
      trigger(spt -> timer);
      timerId = Some(timeout.getTimeoutId());
    }
  }

  net uponEvent {
    case TMessage(_, Pong) => {
      counter += 1L;
      log.info(s"Got Pong #${counter}!");
      if (counter > 10) {
        Kompics.asyncShutdown();
      }
    }
  }

  timer uponEvent {
    case _: PingTimeout => {
      trigger(TMessage(self, ponger, Ping) -> net);
    }
  }

  override def tearDown(): Unit = {
    timerId match {
      case Some(id) => {
        trigger(new CancelPeriodicTimeout(id) -> timer);
      }
      case None => () // no cleanup necessary
    }
  }

}

Ponger

Java
package jexamples.networking.pingpongcleaned;

import se.sics.kompics.ClassMatchedHandler;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.Handler;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.Transport;

public class Ponger extends ComponentDefinition {

  Positive<Network> net = requires(Network.class);

  private long counter = 0;
  private final TAddress self;

  public Ponger() {
    this.self = config().getValue("pingpong.ponger.addr", TAddress.class);
  }

  ClassMatchedHandler<Ping, TMessage> pingHandler =
      new ClassMatchedHandler<Ping, TMessage>() {
        @Override
        public void handle(Ping content, TMessage context) {
          counter++;
          logger.info("Got Ping #{}!", counter);
          trigger(new TMessage(self, context.getSource(), Transport.TCP, Pong.EVENT), net);
        }
      };

  {
    subscribe(pingHandler, net);
  }
}
Scala
package sexamples.networking.pingpongcleaned

import se.sics.kompics.sl._
import se.sics.kompics.network.Network

class Ponger extends ComponentDefinition {

  val self = cfg.getValue[TAddress]("pingpong.ponger.addr");

  val net = requires[Network];

  private var counter: Long = 0L;

  net uponEvent {
    case TMessage(header, Ping) => {
      counter += 1L;
      log.info(s"Got Ping #${counter}!");
      trigger(TMessage(self, header.getSource(), Pong) -> net);
    }
  }
}

And, of course, we must remember to register TMessage to the "netS" serialiser in the Main class (or object, in Scala).

Note

The ClassMatchedHandler is in fact only a specialisation of the more general MatchedHandler, which can use any kind of pattern to select values, and not just Class instances. The advantage of the ClassMatchedHandler is that the pattern to match against can be automatically extracted from the signature of the handle method using Java’s reflection API. For more general MatchedHandler usages, the pattern would have to be supplied manually by overriding the pattern method.

Assembly

Finally, we need to move away from using sbt to run our code. We need to be able to deploy, configure, and run complete artifacts with all dependencies included. To achieve that goal we are going to need four things:

  1. The sbt assembly plugin,
  2. a folder named dist, where we collect all deployment artifacts,
  3. an application.conf file in the dist folder, that is used to override configuration values from the reference.conf file, which we need to customise for a specific deployment, and
  4. two bash scripts pinger.sh and ponger.sh, that hide away the ugly JVM configuration parameters from the users (or simply save us time typing them).

For the assembly plugin to select the correct main file, if we have more than one, add the following to the build settings:

Java
mainClass in assembly := Some("jexamples.networking.pingpongcleaned.Main")
Scala
mainClass in assembly := Some("sexamples.networking.pingpongcleaned.Main")

Then simply call assembly in sbt.

After we create the new dist folder and move the new fat jar from the target folder into it, we create the two scripts and the application.conf file such that the content looks similar to the following:

$ ls -ohn
total 9984
-rw-r--r--  1 501   165B Dec 26 18:43 application.conf
-rw-r--r--  1 501   4.9M Dec 26 18:28 ping-pong-1.0-SNAPSHOT-fat.jar
-rwxr-xr-x  1 501    93B Dec 26 18:35 pinger.sh
-rwxr-xr-x  1 501    93B Dec 26 18:33 ponger.sh

Note the executable flag set on the bash scripts. Now write the following into the newly created files.

application.conf

pingpong.ponger.addr = "" // insert ponger self address
pingpong.pinger.addr = "" // insert pinger self address
pingpong.pinger.pongeraddr = "" // insert ponger target address

pinger.sh

#!/bin/bash

java -Dconfig.file=./application.conf -jar ping-pong-1.0-SNAPSHOT-fat.jar pinger

ponger.sh

#!/bin/bash

java -Dconfig.file=./application.conf -jar ping-pong-1.0-SNAPSHOT-fat.jar ponger
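Both scripts start the same jar and differ only in the single argument, pinger or ponger. The Main class is not shown in this section, so the following is only a sketch of how such an argument could be mapped to a role, assuming Main receives it as its only command line argument. Role and fromArgs are hypothetical names.

```java
import java.util.Locale;

// Hypothetical role selection; the real Main class is not shown in this section.
enum Role {
  PINGER,
  PONGER;

  // Maps the single command line argument passed by pinger.sh and ponger.sh to a role.
  static Role fromArgs(String[] args) {
    if (args.length != 1) {
      throw new IllegalArgumentException("expected exactly one argument: pinger or ponger");
    }
    switch (args[0].toLowerCase(Locale.ROOT)) {
      case "pinger":
        return PINGER;
      case "ponger":
        return PONGER;
      default:
        throw new IllegalArgumentException("unknown role: " + args[0]);
    }
  }
}
```

Failing fast on a missing or misspelled argument is a design choice of this sketch; it avoids starting a component tree that does not match what the operator intended.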

And now you can simply pack up the dist folder and distribute it to two machines that are connected via the network, unpack, and fill in the necessary fields in application.conf on both machines.

Finally, start first the ponger with:

./ponger.sh

And then the pinger:

./pinger.sh
Note

Of course, the bash files only work on *nix machines. If you need this to run on Windows, you’ll either have to write .bat files, or use one of the application packaging tools that generate .exe files from .jar files, and you’ll have to fix all the paths.

The source code for this page can be found here.