Messages, Addresses, and Headers

The Network port only allows three kinds of events: A Msg may pass in both directions (indication and request), while MessageNotify.Req is a request and MessageNotify.Resp is an indication. The latter two can be used to ask the Network service for feedback about whether or not a specific message was sent, how long it took, and how big the serialised version was. This is convenient for systems that need to keep statistics about their messages, or where resources should be freed as soon as a message crosses the wire. All messages that go over the network have to implement the Msg interface, which merely stipulates that a message has to have some kind of Header. The deprecated methods still need to be implemented for backwards compatibility, but should simply forward fields from the header.

Header

The header itself requires three fields:

  1. A source address, which on the sender side is typically the self address of the node, and on the receiver side refers to the sender.
  2. A destination address, which tells the network on the sender side, where the message should go, and is typically the self address on the receiver side.
  3. The transport protocol to be used for the message. There is no requirement for all Network implementations to implement all possible protocols. NettyNetwork implements TCP, UDP, and UDT only.

Address

The Address interface has four required methods:

  1. IP and
  2. port are pretty self explanatory.
  3. asSocket should get a combined represenation of IP and port. It is strongly recommended to keep the internal representation of the address as InetSocketAddress, since this method will be called a lot more often than the first two, and creating a new object for it on the fly is rather expensive.
  4. Finally, the sameHostAs method is used for avoiding serialisation overhead when the message would end up in the same JVM anyway. This can be used for local addressing of components and is typically important for virtual nodes, where components on the same JVM will often communicate via the network port.
Note

None of the interface presented above make any requirements as to the (im-)mutability of their fields. It is generally recommended to make all of them immutable whenever possible. However, certain setups may require mutable headers, for example. A routing component might be implemented in such a way.

Implementations

Since almost all application will need custom fields in addition to the fields in the Msg, Header, or Address interfaces, almost everyone will want to write their own implementations. However, if that should not be the case, a simple default implementation can be found in netty.DirectMessage, netty.DirectHeader, and NettyAddress. For the purpose of this tutorial, however, we will write our own, which we will prefix with the letter T (for Tutorial) to easily differentiate it from the interfaces.

TAddress

Java
package jexamples.networking.pingpong;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import se.sics.kompics.network.Address;

public class TAddress implements Address {

  private final InetSocketAddress isa;

  public TAddress(InetAddress addr, int port) {
    this.isa = new InetSocketAddress(addr, port);
  }

  @Override
  public InetAddress getIp() {
    return this.isa.getAddress();
  }

  @Override
  public int getPort() {
    return this.isa.getPort();
  }

  @Override
  public InetSocketAddress asSocket() {
    return this.isa;
  }

  @Override
  public boolean sameHostAs(Address other) {
    return this.isa.equals(other.asSocket());
  }

  // Not required but strongly recommended

  @Override
  public final String toString() {
    return isa.toString();
  }

  @Override
  public int hashCode() {
    int hash = 3;
    hash = 11 * hash + (this.isa != null ? this.isa.hashCode() : 0);
    return hash;
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == null) {
      return false;
    }
    if (getClass() != obj.getClass()) {
      return false;
    }
    final TAddress other = (TAddress) obj;
    if (this.isa != other.isa && (this.isa == null || !this.isa.equals(other.isa))) {
      return false;
    }
    return true;
  }
}
Scala
object TAddress {
  def apply(addr: InetAddress, port: Int): TAddress = {
    val isa = new InetSocketAddress(addr, port);
    TAddress(isa)
  }
}
case class TAddress(isa: InetSocketAddress) extends Address {

  override def getIp(): InetAddress = isa.getAddress();
  override def getPort(): Int = isa.getPort();
  override def asSocket(): InetSocketAddress = isa;
  override def sameHostAs(other: Address): Boolean = {
    this.isa.equals(other.asSocket())
  }
}

THeader

Java
package jexamples.networking.pingpong;

import se.sics.kompics.network.Address;
import se.sics.kompics.network.Header;
import se.sics.kompics.network.Transport;

public class THeader implements Header<TAddress> {

  public final TAddress src;
  public final TAddress dst;
  public final Transport proto;

  public THeader(TAddress src, TAddress dst, Transport proto) {
    this.src = src;
    this.dst = dst;
    this.proto = proto;
  }

  @Override
  public TAddress getSource() {
    return src;
  }

  @Override
  public TAddress getDestination() {
    return dst;
  }

  @Override
  public Transport getProtocol() {
    return proto;
  }
}
Scala
case class THeader(src: TAddress, dst: TAddress, proto: Transport) extends Header[TAddress] {
  override def getSource(): TAddress = src;
  override def getDestination(): TAddress = dst;
  override def getProtocol(): Transport = proto;
}

TMessage

Java
package jexamples.networking.pingpong;

import se.sics.kompics.network.Address;
import se.sics.kompics.network.Header;
import se.sics.kompics.network.Msg;
import se.sics.kompics.network.Transport;

public abstract class TMessage implements Msg<TAddress, THeader> {

  public final THeader header;

  public TMessage(TAddress src, TAddress dst, Transport protocol) {
    this.header = new THeader(src, dst, protocol);
  }

  @Override
  public THeader getHeader() {
    return this.header;
  }

  @SuppressWarnings("deprecation")
  @Override
  public TAddress getSource() {
    return this.header.src;
  }

  @SuppressWarnings("deprecation")
  @Override
  public TAddress getDestination() {
    return this.header.dst;
  }

  @SuppressWarnings("deprecation")
  @Override
  public Transport getProtocol() {
    return this.header.proto;
  }
}
Scala
abstract class TMessage(val header: THeader) extends Msg[TAddress, THeader] {
  def this(src: TAddress, dst: TAddress, proto: Transport) {
    this(THeader(src, dst, proto))
  }

  override def getHeader(): THeader = header;
  override def getSource(): TAddress = header.src;
  override def getDestination(): TAddress = header.dst;
  override def getProtocol(): Transport = header.proto;
}

For our PingPong example we shall have both Ping and Pong extend TMessage instead of using the direct request-response. We also hard-code TCP as protocol for both for now.

Ping

Java
package jexamples.networking.pingpong;

import se.sics.kompics.network.Transport;

public class Ping extends TMessage {
  public Ping(TAddress src, TAddress dst) {
    super(src, dst, Transport.TCP);
  }
}
Scala
object Ping {
  def apply(src: TAddress, dst: TAddress): Ping = new Ping(THeader(src, dst, Transport.TCP));
}
class Ping(_header: THeader) extends TMessage(_header);

Pong

Java
package jexamples.networking.pingpong;

import se.sics.kompics.network.Transport;

public class Pong extends TMessage {
  public Pong(TAddress src, TAddress dst) {
    super(src, dst, Transport.TCP);
  }
}
Scala
object Pong {
  def apply(src: TAddress, dst: TAddress): Pong = new Pong(THeader(src, dst, Transport.TCP));
}
class Pong(_header: THeader) extends TMessage(_header);

Now we need to change the Pinger and the Ponger to take a self address as init-parameter, require the Network port and feed the new constructor arguments to the Ping and Pong classes. Since we are using the network port now for the ping-pong exchange, we can remove the requirement for the PingPongPort. In this first example we shall make our lives somewhat simple and use the self address as source and destination for both classes. This corresponds to the local reflection example mentioned above.

Pinger

Java
package jexamples.networking.pingpong;

import se.sics.kompics.Kompics;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.Handler;
import se.sics.kompics.Start;
import se.sics.kompics.timer.*;
import se.sics.kompics.network.Network;

import java.util.UUID;

public class Pinger extends ComponentDefinition {

  Positive<Network> net = requires(Network.class);
  Positive<Timer> timer = requires(Timer.class);

  private long counter = 0;
  private UUID timerId;
  private final TAddress self;

  public Pinger(Init init) {
    this.self = init.self;
  }

  Handler<Start> startHandler =
      new Handler<Start>() {
        public void handle(Start event) {
          SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, 1000);
          PingTimeout timeout = new PingTimeout(spt);
          spt.setTimeoutEvent(timeout);
          trigger(spt, timer);
          timerId = timeout.getTimeoutId();
        }
      };
  Handler<Pong> pongHandler =
      new Handler<Pong>() {
        public void handle(Pong event) {
          counter++;
          logger.info("Got Pong #{}!", counter);
          if (counter > 10) {
            Kompics.asyncShutdown();
          }
        }
      };
  Handler<PingTimeout> timeoutHandler =
      new Handler<PingTimeout>() {
        public void handle(PingTimeout event) {
          trigger(new Ping(self, self), net);
        }
      };

  {
    subscribe(startHandler, control);
    subscribe(pongHandler, net);
    subscribe(timeoutHandler, timer);
  }

  @Override
  public void tearDown() {
    trigger(new CancelPeriodicTimeout(timerId), timer);
  }

  public static class PingTimeout extends Timeout {
    public PingTimeout(SchedulePeriodicTimeout spt) {
      super(spt);
    }
  }

  public static class Init extends se.sics.kompics.Init<Pinger> {
    public final TAddress self;

    public Init(TAddress self) {
      this.self = self;
    }
  }
}
Scala
package sexamples.networking.pingpong

import se.sics.kompics.sl._
import se.sics.kompics.network.Network
import se.sics.kompics.timer.{CancelPeriodicTimeout, SchedulePeriodicTimeout, Timeout, Timer}

import java.util.UUID

object Pinger {
  class PingTimeout(_spt: SchedulePeriodicTimeout) extends Timeout(_spt);
}
class Pinger(init: Init[Pinger]) extends ComponentDefinition {
  import Pinger.PingTimeout;

  val Init(self: TAddress) = init;

  val net = requires[Network];
  val timer = requires[Timer];

  private var counter: Long = 0L;
  private var timerId: Option[UUID] = None;

  ctrl uponEvent {
    case _: Start => {
      val spt = new SchedulePeriodicTimeout(0, 1000);
      val timeout = new PingTimeout(spt);
      spt.setTimeoutEvent(timeout);
      trigger(spt -> timer);
      timerId = Some(timeout.getTimeoutId());
    }
  }

  net uponEvent {
    case _: Pong => {
      counter += 1L;
      log.info(s"Got Pong #${counter}!");
      if (counter > 10) {
        Kompics.asyncShutdown();
      }
    }
  }

  timer uponEvent {
    case _: PingTimeout => {
      trigger(Ping(self, self) -> net);
    }
  }

  override def tearDown(): Unit = {
    timerId match {
      case Some(id) => {
        trigger(new CancelPeriodicTimeout(id) -> timer);
      }
      case None => () // no cleanup necessary
    }
  }

}

Ponger

Java
package jexamples.networking.pingpong;

import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.Handler;
import se.sics.kompics.network.Network;

public class Ponger extends ComponentDefinition {

  Positive<Network> net = requires(Network.class);

  private long counter = 0;
  private final TAddress self;

  public Ponger(Init init) {
    this.self = init.self;
  }

  Handler<Ping> pingHandler =
      new Handler<Ping>() {
        public void handle(Ping event) {
          counter++;
          logger.info("Got Ping #{}!", counter);
          trigger(new Pong(self, event.getSource()), net);
        }
      };

  {
    subscribe(pingHandler, net);
  }

  public static class Init extends se.sics.kompics.Init<Ponger> {
    public final TAddress self;

    public Init(TAddress self) {
      this.self = self;
    }
  }
}
Scala
package sexamples.networking.pingpong

import se.sics.kompics.sl._
import se.sics.kompics.network.Network

class Ponger(init: Init[Ponger]) extends ComponentDefinition {

  val Init(self: TAddress) = init;

  val net = requires[Network];

  private var counter: Long = 0L;

  net uponEvent {
    case ping: Ping => {
      counter += 1L;
      log.info(s"Got Ping #${counter}!");
      trigger(Pong(self, ping.getSource()) -> net);
    }
  }

}

Parent

Finally, we have to create the NettyNetwork component in the Parent class, and connect it appropriately.

Java
package jexamples.networking.pingpong;

import se.sics.kompics.Channel;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Component;
import se.sics.kompics.Init;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.network.netty.NettyInit;

public class Parent extends ComponentDefinition {
  public Parent(Init init) {
    Component timer = create(JavaTimer.class, Init.NONE);
    Component network = create(NettyNetwork.class, new NettyInit(init.self));
    Component pinger = create(Pinger.class, new Pinger.Init(init.self));
    Component ponger = create(Ponger.class, new Ponger.Init(init.self));
    Component pinger2 = create(Pinger.class, new Pinger.Init(init.self));

    connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
    connect(pinger2.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);

    connect(pinger.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
    connect(
        pinger2.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
    connect(ponger.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
  }

  public static class Init extends se.sics.kompics.Init<Parent> {
    public final TAddress self;

    public Init(TAddress self) {
      this.self = self;
    }
  }
}
Scala
package sexamples.networking.pingpong

import se.sics.kompics.sl._
import se.sics.kompics.timer.Timer
import se.sics.kompics.timer.java.JavaTimer
import se.sics.kompics.network.Network
import se.sics.kompics.network.netty.{NettyInit, NettyNetwork}

class Parent(init: Init[Parent]) extends ComponentDefinition {

  val Init(self: TAddress) = init;

  val timer = create[JavaTimer];
  val network = create[NettyNetwork](new NettyInit(self));
  val pinger = create[Pinger](Init[Pinger](self));
  val pinger2 = create[Pinger](Init[Pinger](self));
  val ponger = create[Ponger](Init[Ponger](self));

  connect[Timer](timer -> pinger);
  connect[Timer](timer -> pinger2);

  connect[Network](network -> pinger);
  connect[Network](network -> pinger2);
  connect[Network](network -> ponger);
}

Main

As a preparation for later, we are going to take the port for self address as a commandline argument in the Main class, and hardcode 127.0.0.1 as IP address.

Java
package jexamples.networking.pingpong;

import se.sics.kompics.Kompics;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class Main {
  public static void main(String[] args) throws InterruptedException, UnknownHostException {
    InetAddress ip = InetAddress.getLocalHost();
    int port = Integer.parseInt(args[0]);
    TAddress self = new TAddress(ip, port);
    Kompics.createAndStart(Parent.class, new Parent.Init(self), 2);
    Kompics.waitForTermination();
  }
}
Scala
package sexamples.networking.pingpong

import se.sics.kompics.sl._
import java.net.{InetAddress, UnknownHostException}

object Main {
  def main(args: Array[String]): Unit = {
    val ip = InetAddress.getLocalHost();
    val port = Integer.parseInt(args(0));
    val self = TAddress(ip, port);
    Kompics.createAndStart(classOf[Parent], Init[Parent](self), 2);
    Kompics.waitForTermination();
  }
}

Execution

At this point we can compile and run the code with a slight variation to the usual commands, since we need the port as an argument now (pick a different port if 34567 is in use on your system):

runMain jexamples.networking.pingpong.Main 34567
runMain sexamples.networking.pingpong.Main 34567

Problems

We can see from the output that our setup technically works, but we are back to the problem of getting four Pongs on our two Pings. The reason is, of course, that we are cheating. We are running all components in the same JVM, but we are not using proper virtual network support, yet. We also do not actually want to use virtual nodes here, but instead each Pinger and Ponger should be on a different host (or at least in a different JVM).

Before we can fix this, though, we have to fix another problem that we do not even see, yet: None of our messages are currently serialisable.

The source code for this page can be found here.