Serialisation

With NettyNetwork comes a serialisation framework that builds on Netty’s ByteBuf facilities. The Kompics part is two-fold: It consists of the Serializer interface, and a registration, mapping, and lookup service for such serialisers in Serializers.

Serialiser Identifiers

For every Serializer you write and register you will be required to pick a unique intInt identifier. You are required to keep track of the identifiers you use yourself. Kompics reserves the range 0-16 for internal use. By default only a single byteByte worth of identifier space is used, in order to minimise the overhead of the framework. If you are assigning larger identifiers than 255 (or their signed equivalents), make sure to use the resize method before registering the respective serialisers.

Note

For larger projects it can become quite difficult to keep track of which identifiers are used and which are free. It is recommended to either assign them all in a single place (e.g., static fields of a classfields of an object, or a configuration file), or assign subranges to submodules of the project and then do the same single-location-assignment per module.

Serialiser Methods

When implementing the toBinary and fromBinary methods you are free to do as you like, as long as at the end of toBinary the whole object is in the ByteBuf and at the end of fromBinary the same number of bytes that you put into the ByteBuf are removed from it again.

Serializers

The Serializers singleton class has two important functions. First, it registers serialisers into the system, so that they can be looked up when messages come in that have been serialised with them (as defined by their identifier). And second, it maps types to specific Serializer instances that are already registered. The most convenient way to do that is by registering a short name to the serialiser instance via register(Serializer, String), and then register the class mappings to that name using register(Class, String). Alternatively, the identifier can be used as well with register(Class, int) or both can be achieved at once for a single type-serialiser-mapping using register(Class, Serializer).

The lookup of serialisers for types is hierarchical, that is, when there is no exact mapping found for the type, the system will traverse its type hierarchy upwards to find a matching supertype Serializer and attempt to use that one instead. For example, if the object to be serialised implements Java’s Serializable interface and there is no more specific Serializer registered, the object will be serialised with Java object serialisation (which is registered by default).

Default Serialisers

The other default serialisers (and their identifiers) are:

Note

For people familiar with common Java serialisation frameworks it might be obvious that Kryo is missing from the above list. This is on purpose. Kompics’ previous messaging middleware used to rely on Kryo for serialisation, but Kryo’s low level object layout hacks lead to a host of problems with deployments that included different types of JVMs (e.g., Oracle JRE and Open JRE, or different versions). For this reason we have purposefully moved away from Kryo serialisation. You are welcome to implement your own Kryo-based Serializer, just know that in some setups you might encounter a lot of very difficult to pinpoint bugs.

PingPong Serialisers

In our PingPong example, the classes that need to be serialised are: TAddress, THeader, Ping, and Pong. Note that there is no necessesity to serialise TMessage separately from Ping and Pong at this point, as it has no content of its own and cannot be instantied. Of course, we could simply have all of those classes implement Serializable (as the case classes already do) and be done with it. Since this is a tutorial (and Java’s object serialisation is terribly inefficient), we are instead going to write our own custom serialisers. As preparation for future sections and to show off the system, we will split the serialisation logic into two classes: NetSerializer will deal with TAddress and THeader instances, while PingPongSerializer will deal with Ping and Pong messages. Since THeader is part of the messages, we will of course call the NetSerializer from the PingPongSerializer, but we will act as if we do not know what kind of Header we are dealing with and let the serialisation framework figure that out. Similarly, TAddresses are part of a THeader, so we will invoke the NetSerializer from itself. Note, that the difference between two approaches is whether or not the serialiser’s identifier will be prepended. If we invoke a serialiser directly from within another serialiser, it is assumed that it will be known at serialisation and deserialisation time what the object is and the identifier can be omitted. If we instead use the Serializers facilities, the system will keep track of that itself using the identifier.

In order to keep our identifier spaces separate we’ll pick id=100 for the NetSerializer and id=200 for the PingPongSerializer. And since we only have two instances to deal with, we’ll just define the identifiers within the classes in the lazy way.

We also added some extra constructors to the messages, to make it easier to pass THeader instances from another serialiser during deserialisation.

NetSerializer

Java
package jexamples.networking.pingpongdistributed;

import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.UnknownHostException;

import java.util.Optional;
import se.sics.kompics.network.Transport;
import se.sics.kompics.network.netty.serialization.Serializer;

public class NetSerializer implements Serializer {

  private static final byte ADDR = 1;
  private static final byte HEADER = 2;

  @Override
  public int identifier() {
    return 100;
  }

  @Override
  public void toBinary(Object o, ByteBuf buf) {
    if (o instanceof TAddress) {
      TAddress addr = (TAddress) o;
      buf.writeByte(ADDR); // mark which type we are serialising (1 byte)
      buf.writeBytes(addr.getIp().getAddress()); // 4 bytes IP (let's assumee it's IPv4)
      buf.writeShort(addr.getPort()); // we only need 2 bytes here
      // total 7 bytes
    } else if (o instanceof THeader) {
      THeader header = (THeader) o;
      buf.writeByte(HEADER); // mark which type we are serialising (1 byte)
      this.toBinary(header.src, buf); // use this serialiser again (7 bytes)
      this.toBinary(header.dst, buf); // use this serialiser again (7 bytes)
      buf.writeByte(header.proto.ordinal()); // 1 byte is enough
      // total 16 bytes
    }
  }

  @Override
  public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
    byte type = buf.readByte(); // read the first byte to figure out the type
    switch (type) {
      case ADDR:
        {
          byte[] ipBytes = new byte[4];
          buf.readBytes(ipBytes);
          try {
            InetAddress ip = InetAddress.getByAddress(ipBytes); // 4 bytes
            int port = buf.readUnsignedShort(); // 2 bytes
            return new TAddress(ip, port); // total of 7, check
          } catch (UnknownHostException ex) {
            throw new RuntimeException(ex); // let Netty deal with this
          }
        }
      case HEADER:
        {
          TAddress src =
              (TAddress)
                  this.fromBinary(
                      buf, Optional.empty()); // We already know what it's going to be (7 bytes)
          TAddress dst = (TAddress) this.fromBinary(buf, Optional.empty()); // same here (7 bytes)
          int protoOrd = buf.readByte(); // 1 byte
          Transport proto = Transport.values()[protoOrd];
          return new THeader(src, dst, proto); // total of 16 bytes, check
        }
    }
    return null; // strange things happened^^
  }
}
Scala
package sexamples.networking.pingpongdistributed

import io.netty.buffer.ByteBuf
import java.net.{InetAddress, UnknownHostException}
import java.util.Optional
import se.sics.kompics.network.Transport
import se.sics.kompics.network.netty.serialization.Serializer

object NetSerializer extends Serializer {
  private val ADDR: Byte = 1;
  private val HEADER: Byte = 2;

  private val NO_HINT: Optional[AnyRef] = Optional.empty();

  override def identifier(): Int = 100;

  override def toBinary(o: AnyRef, buf: ByteBuf): Unit = {
    o match {
      case addr: TAddress => {
        buf.writeByte(ADDR); // mark which type we are serialising (1 byte)
        buf.writeBytes(addr.getIp().getAddress()); // 4 bytes IP (let's assume it's IPv4)
        buf.writeShort(addr.getPort()); // we only need 2 bytes here
        // total 7 bytes
      }
      case header: THeader => {
        buf.writeByte(HEADER); // mark which type we are serialising (1 byte)
        this.toBinary(header.src, buf); // use this serialiser again (7 bytes)
        this.toBinary(header.dst, buf); // use this serialiser again (7 bytes)
        buf.writeByte(header.proto.ordinal()); // 1 byte is enough
        // total 16 bytes
      }
    }
  }

  override def fromBinary(buf: ByteBuf, hint: Optional[AnyRef]): AnyRef = {
    val typeFlag = buf.readByte(); // read the first byte to figure out the type
    typeFlag match {
      case ADDR => {
        val ipBytes = Array.ofDim[Byte](4);
        buf.readBytes(ipBytes);
        val ip = InetAddress.getByAddress(ipBytes); // 4 bytes
        val port = buf.readUnsignedShort(); // 2 bytes
        return TAddress(ip, port); // total of 7, check
      }
      case HEADER => {
        val src = this.fromBinary(buf, NO_HINT).asInstanceOf[TAddress]; // We already know what it's going to be (7 bytes)
        val dst = this.fromBinary(buf, NO_HINT).asInstanceOf[TAddress]; // We already know what it's going to be (7 bytes)
        val protoOrd = buf.readByte(); // 1 byte
        val proto = Transport.values()(protoOrd);
        return THeader(src, dst, proto); // total of 16 bytes, check
      }
      case _ => {
        Console.err.println(s"Got invalid byte flag=$typeFlag");
        return null;
      }
    }
  }
}

PingPongSerializer

Java
package jexamples.networking.pingpongdistributed;

import java.util.Optional;
import io.netty.buffer.ByteBuf;
import se.sics.kompics.network.netty.serialization.Serializer;
import se.sics.kompics.network.netty.serialization.Serializers;

public class PingPongSerializer implements Serializer {

  private static final byte PING = 1;
  private static final byte PONG = 2;

  @Override
  public int identifier() {
    return 200;
  }

  @Override
  public void toBinary(Object o, ByteBuf buf) {
    if (o instanceof Ping) {
      Ping ping = (Ping) o;
      buf.writeByte(PING); // 1 byte
      Serializers.toBinary(ping.header, buf); // 1 byte serialiser id + 16 bytes THeader
      // total 18 bytes
    } else if (o instanceof Pong) {
      Pong pong = (Pong) o;
      buf.writeByte(PONG); // 1 byte
      Serializers.toBinary(pong.header, buf); // 1 byte serialiser id + 16 bytes THeader
      // total 18 bytes
    }
  }

  @Override
  public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
    byte type = buf.readByte(); // 1 byte
    switch (type) {
      case PING:
        {
          THeader header =
              (THeader)
                  Serializers.fromBinary(
                      buf, Optional.empty()); // 1 byte serialiser id + 16 bytes THeader
          return new Ping(header); // 18 bytes total, check
        }
      case PONG:
        {
          THeader header =
              (THeader)
                  Serializers.fromBinary(
                      buf, Optional.empty()); // 1 byte serialiser id + 16 bytes THeader
          return new Pong(header); // 18 bytes total, check
        }
    }
    return null;
  }
}
Scala
package sexamples.networking.pingpongdistributed

import io.netty.buffer.ByteBuf
import java.net.{InetAddress, UnknownHostException}
import java.util.Optional
import se.sics.kompics.network.Transport
import se.sics.kompics.network.netty.serialization.{Serializer, Serializers}

object PingPongSerializer extends Serializer {
  private val PING: Byte = 1;
  private val PONG: Byte = 2;

  private val NO_HINT: Optional[AnyRef] = Optional.empty();

  override def identifier(): Int = 200;

  override def toBinary(o: AnyRef, buf: ByteBuf): Unit = {
    o match {
      case ping: Ping => {
        buf.writeByte(PING); // 1 byte
        Serializers.toBinary(ping.header, buf); // 1 byte serialiser id + 16 bytes THeader
        // total 18 bytes
      }
      case pong: Pong => {
        buf.writeByte(PONG); // 1 byte
        Serializers.toBinary(pong.header, buf); // 1 byte serialiser id + 16 bytes THeader
        // total 18 bytes
      }
    }
  }

  override def fromBinary(buf: ByteBuf, hint: Optional[AnyRef]): AnyRef = {
    val typeFlag = buf.readByte(); // 1 byte
    typeFlag match {
      case PING => {
        val header = Serializers.fromBinary(buf, NO_HINT).asInstanceOf[THeader]; // 1 byte serialiser id + 16 bytes THeader
        return new Ping(header); // 18 bytes total, check
      }
      case PONG => {
        val header = Serializers.fromBinary(buf, NO_HINT).asInstanceOf[THeader]; // 1 byte serialiser id + 16 bytes THeader
        return new Pong(header); // 18 bytes total, check
      }
      case _ => {
        Console.err.println(s"Got invalid byte flag=$typeFlag");
        return null;
      }
    }
  }
}
Note

The serialisation above code is not particularly space efficient, since that was not the purpose. To get an idea how to use bit-fields to get a significant space reduction on these kind of frequently used serialisation procedures, take a look at the source code of the SpecialSerializers class.

All that is left is now is to register the new serialisers and map the right types to them. We add the following to the Main classobject:

Java
static {
  // register
  Serializers.register(new NetSerializer(), "netS");
  Serializers.register(new PingPongSerializer(), "ppS");
  // map
  Serializers.register(TAddress.class, "netS");
  Serializers.register(THeader.class, "netS");
  Serializers.register(Ping.class, "ppS");
  Serializers.register(Pong.class, "ppS");
}
Scala
// register
Serializers.register(NetSerializer, "netS");
Serializers.register(PingPongSerializer, "ppS");
// map
Serializers.register(classOf[TAddress], "netS");
Serializers.register(classOf[THeader], "netS");
Serializers.register(classOf[Ping], "ppS");
Serializers.register(classOf[Pong], "ppS");

Distributed PingPong

Now we are prepared for a true distributed deployment of our PingPong example. There are a number of changes we need to make to the way we set up the component hierarchy. First of all, we want to deploy Pinger and Ponger separately, and they also need different parameters. The Ponger is purely reactive and it only needs to know its own self address. The Pinger, on the other hand, needs to know both its own address and a Ponger’s. We are going to make use of that distinction in the Main classobject, in order to decide which one to start. If we see two commandline arguments (1 IP and 1 port), we are going to start a Ponger. If, however, we see four commandline arguments (2 IPs and 2 ports), we are going to start a Pinger. Since our application components now need different setup, we are also going to split the Parent into a PingerParent and a PongerParent. Note, that it is always a good idea to have a class without any business logic that sets up the Network and Timer connections, as you will see in the section on simulation.

PingerParent

Java
package jexamples.networking.pingpongdistributed;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;

public class PingerParent extends ComponentDefinition {

  public PingerParent(Init init) {
    Component timer = create(JavaTimer.class, Init.NONE);
    Component network = create(NettyNetwork.class, new NettyInit(init.self));
    Component pinger = create(Pinger.class, new Pinger.Init(init.self, init.ponger));

    connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);

    connect(pinger.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
  }

  public static class Init extends se.sics.kompics.Init<PingerParent> {

    public final TAddress self;
    public final TAddress ponger;

    public Init(TAddress self, TAddress ponger) {
      this.self = self;
      this.ponger = ponger;
    }
  }
}
Scala
class PingerParent(init: Init[PingerParent]) extends ComponentDefinition {

  val Init(self: TAddress, ponger: TAddress) = init;

  val timer = create[JavaTimer];
  val network = create[NettyNetwork](new NettyInit(self));
  val pinger = create[Pinger](Init[Pinger](self, ponger));

  connect[Timer](timer -> pinger);

  connect[Network](network -> pinger);
}

PongerParent

Java
package jexamples.networking.pingpongdistributed;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;

public class PongerParent extends ComponentDefinition {

  public PongerParent(Init init) {
    Component network = create(NettyNetwork.class, new NettyInit(init.self));
    Component ponger = create(Ponger.class, new Ponger.Init(init.self));

    connect(ponger.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
  }

  public static class Init extends se.sics.kompics.Init<PongerParent> {

    public final TAddress self;

    public Init(TAddress self) {
      this.self = self;
    }
  }
}
Scala
class PongerParent(init: Init[PongerParent]) extends ComponentDefinition {

  val Init(self: TAddress) = init;

  val network = create[NettyNetwork](new NettyInit(self));
  val ponger = create[Ponger](Init[Ponger](self));

  connect[Network](network -> ponger);
}

Main

Java
package jexamples.networking.pingpongdistributed;

import java.net.InetAddress;
import java.net.UnknownHostException;
import se.sics.kompics.Kompics;
import se.sics.kompics.network.netty.serialization.Serializers;

public class Main {

  static {
    // register
    Serializers.register(new NetSerializer(), "netS");
    Serializers.register(new PingPongSerializer(), "ppS");
    // map
    Serializers.register(TAddress.class, "netS");
    Serializers.register(THeader.class, "netS");
    Serializers.register(Ping.class, "ppS");
    Serializers.register(Pong.class, "ppS");
  }

  public static void main(String[] args) {
    try {
      if (args.length == 2) { // start Ponger
        InetAddress ip = InetAddress.getByName(args[0]);
        int port = Integer.parseInt(args[1]);
        TAddress self = new TAddress(ip, port);
        Kompics.createAndStart(PongerParent.class, new PongerParent.Init(self), 2);
        System.out.println("Starting Ponger at " + self);
        Kompics.waitForTermination();
        // will never actually terminate...act like a server and keep running until externally
        // exited
      } else if (args.length == 4) { // start Pinger
        InetAddress myIp = InetAddress.getByName(args[0]);
        int myPort = Integer.parseInt(args[1]);
        TAddress self = new TAddress(myIp, myPort);
        InetAddress pongerIp = InetAddress.getByName(args[2]);
        int pongerPort = Integer.parseInt(args[3]);
        TAddress ponger = new TAddress(pongerIp, pongerPort);
        Kompics.createAndStart(PingerParent.class, new PingerParent.Init(self, ponger), 2);
        System.out.println("Starting Pinger at" + self + " to " + ponger);
        Kompics.waitForTermination();
        System.exit(0);
      } else {
        System.err.println("Invalid number of parameters (2 for Ponger, 4 for Pinger)");
        System.exit(1);
      }
    } catch (UnknownHostException ex) {
      System.err.println(ex);
      System.exit(1);
    } catch (InterruptedException ex) {
      System.err.println(ex);
      System.exit(1);
    }
  }
}
Scala
package sexamples.networking.pingpongdistributed

import se.sics.kompics.sl._
import se.sics.kompics.network.netty.serialization.Serializers
import java.net.{InetAddress, UnknownHostException}

object Main {

  // register
  Serializers.register(NetSerializer, "netS");
  Serializers.register(PingPongSerializer, "ppS");
  // map
  Serializers.register(classOf[TAddress], "netS");
  Serializers.register(classOf[THeader], "netS");
  Serializers.register(classOf[Ping], "ppS");
  Serializers.register(classOf[Pong], "ppS");

  def main(args: Array[String]): Unit = {
    if (args.length == 2) { // start Ponger
      val ip = InetAddress.getByName(args(0));
      val port = Integer.parseInt(args(1));
      val self = TAddress(ip, port);
      Kompics.createAndStart(classOf[PongerParent], Init[PongerParent](self), 2);
      println(s"Starting Ponger at $self");
      Kompics.waitForTermination();
      // will never actually terminate...act like a server and keep running until externally exited
    } else if (args.length == 4) {
      val ip = InetAddress.getByName(args(0));
      val port = Integer.parseInt(args(1));
      val self = TAddress(ip, port);
      val pongerIp = InetAddress.getByName(args(2));
      val pongerPort = Integer.parseInt(args(3));
      val ponger = TAddress(pongerIp, pongerPort);
      Kompics.createAndStart(classOf[PingerParent], Init[PingerParent](self, ponger), 2);
      Kompics.waitForTermination();
      System.exit(0);
    } else {
      System.err.println("Invalid number of parameters (2 for Ponger, 4 for Pinger)");
      System.exit(1);
    }
  }
}

Execution

Now we are finally ready to try this. The following commands should be run in different terminals (different sbt instances), preferably side by side so you can clearly see how things are happening. For all the examples, feel free to change the IPs and ports as you like, but be sure they match up correctly.

Start the Ponger first with:

runMain jexamples.networking.pingpongdistributed.Main 127.0.0.1 34567
runMain sexamples.networking.pingpongdistributed.Main 127.0.0.1 34567

Then start a Pinger with:

runMain jexamples.networking.pingpongdistributed.Main 127.0.0.1 34568 127.0.0.1 34567
runMain sexamples.networking.pingpongdistributed.Main 127.0.0.1 34568 127.0.0.1 34567

You can start the same Pinger multiple times or start as many Pinger JVMs in parallel as you like. Make sure they all have their own port, though!

You will see that there is no Ping amplification anymore as we saw in the last example. All messages are now correctly addressed and replied to. If you struggle to clearly see what is going on among all the logging output, try to reduce the logging level to info in the logback.xml file. Finally, use Control-c to shutdown the Ponger, as it will run forever otherwise.

The source code for this page can be found here.