Channel Selectors

When a Channel is connected, it can optionally be given an instance of ChannelSelector (at least using the Java API for this), which is both a way to extract some value from an event (i.e. a specific field to use as a selection key) and also define a specific value for which this event should travel along this Channel. The internal implementation of this is fairly efficient, but requires the selection value for a Channel to be immutable. Note that, despite its previous name, a ChannelSelector can not be abused as a filter. It is better to think of it as a way to create a routing table (which is in fact pretty much what happens internally).

Support for Node Ids

We will update our PingPong example such that we start a configurable number of Pingers per host. We will stick to a single Ponger for now, since otherwise we would have to figure out how to load-balance across them. We shall add a byte[]Array[Byte] id field to TAddress, that we will use as a selection key. However, when creating those ids we shall be a bit lazy and simply use ascending integers in their byte[]Array[Byte] representation. You will see later why we picked byte[]Array[Byte] and not intInt directly. Of course, we will also have to update the NetSerializer, which we will not show again here, since it has become pretty big by now and is not terribly relevant to the point.

Reference.conf

pingpong {
	ponger.addr = "127.0.0.1:34567"
	pinger {
		addr {
			host = "127.0.0.1"
			port = 34568
		}
		num = 2
		timeout = 1000
		pongeraddr {
			host = "127.0.0.1"
			port = 34567
		}
	}
}

TAddress

Java
package jexamples.virtualnetworking.pingpongselectors;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Objects;
import se.sics.kompics.network.Address;

public class TAddress implements Address {

  private final InetSocketAddress isa;
  private final byte[] id;

  public TAddress(InetAddress addr, int port) {
    this(addr, port, null);
  }

  public TAddress(InetAddress addr, int port, byte[] id) {
    this.isa = new InetSocketAddress(addr, port);
    this.id = id;
  }

  @Override
  public InetAddress getIp() {
    return this.isa.getAddress();
  }

  @Override
  public int getPort() {
    return this.isa.getPort();
  }

  public byte[] getId() {
    return this.id;
  }

  public TAddress withVirtual(byte[] id) {
    return new TAddress(isa.getAddress(), isa.getPort(), id);
  }

  @Override
  public InetSocketAddress asSocket() {
    return this.isa;
  }

  @Override
  public boolean sameHostAs(Address other) {
    return this.isa.equals(other.asSocket());
    /* note that we don't include the id here, since nodes with different
     * ids but the same socket are still on the same machine
     */
  }

  // Not required but strongly recommended

  @Override
  public final String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(isa.getHostString());
    sb.append(":");
    sb.append(isa.getPort());
    if (id != null) {
      sb.append(":");
      sb.append(Arrays.toString(id));
    }
    return sb.toString();
  }

  @Override
  public int hashCode() {
    return Objects.hash(isa, id);
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == null) {
      return false;
    }
    if (getClass() != obj.getClass()) {
      return false;
    }
    final TAddress other = (TAddress) obj;
    return Objects.equals(this.isa, other.isa) && Objects.deepEquals(this.id, this.id);
  }
}
Scala
object TAddress {
  def apply(addr: InetAddress, port: Int): TAddress = {
    val isa = new InetSocketAddress(addr, port);
    TAddress(isa, None)
  }
  def apply(addr: InetAddress, port: Int, id: Array[Byte]): TAddress = {
    val isa = new InetSocketAddress(addr, port);
    TAddress(isa, Some(id))
  }
}
case class TAddress(isa: InetSocketAddress, id: Option[Array[Byte]]) extends Address {

  override def getIp(): InetAddress = isa.getAddress();
  override def getPort(): Int = isa.getPort();
  override def asSocket(): InetSocketAddress = isa;
  override def sameHostAs(other: Address): Boolean = {
    /* note that we don't include the id here, since nodes with different
     * ids but the same socket are still on the same machine
     */
    this.isa.equals(other.asSocket())
  }
}

IdChannelSelector

Java
package jexamples.virtualnetworking.pingpongselectors;

import java.nio.ByteBuffer;
import se.sics.kompics.ChannelSelector;

public class IdChannelSelector extends ChannelSelector<TMessage, ByteBuffer> {

  public IdChannelSelector(byte[] id) {
    super(TMessage.class, ByteBuffer.wrap(id), true);
  }

  @Override
  public ByteBuffer getValue(TMessage event) {
    return ByteBuffer.wrap(event.header.dst.getId());
  }
}
Scala
package sexamples.virtualnetworking.pingpongselectors

import java.nio.ByteBuffer
import se.sics.kompics.ChannelSelector

class IdChannelSelector(id: Array[Byte])
    extends ChannelSelector[TMessage, ByteBuffer](classOf[TMessage], ByteBuffer.wrap(id), true) {
  override def getValue(event: TMessage): ByteBuffer = {
    event.header.dst.id match {
      case Some(eventId) => ByteBuffer.wrap(eventId)
      case None          => null
    }
  }
}

Custom Child Configuration

Additionally, we need to tell the Pinger components now what their id is, since we can not pull that out of the configuration file, as that is global to the whole Kompics runtime. We can, however, pass a modified version of the Config object to child components. So we are going to pull the TAddress from the application.conf in the PingerParent and use it as a base address that we pass to NettyNetwork. And then for each Pinger we create, we are going to write a virtual address with the id into the child Config. This way we do not have to change any code in the Pinger itself.

PingerParent

Java
package jexamples.virtualnetworking.pingpongselectors;

import com.google.common.primitives.Ints;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.config.Config;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;

public class PingerParent extends ComponentDefinition {

  public PingerParent() {
    TAddress baseSelf = config().getValue("pingpong.pinger.addr", TAddress.class);

    Component timer = create(JavaTimer.class, Init.NONE);
    Component network = create(NettyNetwork.class, new NettyInit(baseSelf));

    int num = config().getValue("pingpong.pinger.num", Integer.class);
    for (int i = 0; i < num; i++) {
      byte[] id = Ints.toByteArray(i);
      Config.Builder cbuild = config().modify(this.id());
      cbuild.setValue("pingpong.pinger.addr", baseSelf.withVirtual(id));
      Component pinger = create(Pinger.class, Init.NONE, cbuild.finalise());

      connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);

      connect(
          pinger.getNegative(Network.class),
          network.getPositive(Network.class),
          new IdChannelSelector(id),
          Channel.TWO_WAY);
    }
  }
}
Scala
class PingerParent extends ComponentDefinition {

  val baseSelf = cfg.getValue[TAddress]("pingpong.pinger.addr");

  val timer = create[JavaTimer];
  val network = create[NettyNetwork](new NettyInit(baseSelf));

  val num = cfg.getValue[Int]("pingpong.pinger.num");
  for (i <- 0 until num) {
    val id = Ints.toByteArray(i);
    val cbuild = this.config().modify(this.id());
    cbuild.setValue("pingpong.pinger.addr", baseSelf.copy(id = Some(id)));
    val pinger = create[Pinger](Init.none[Pinger], cbuild.finalise());

    connect[Timer](timer -> pinger);

    // must use the Java API here
    connect(pinger.getNegative(classOf[Network]),
            network.getPositive(classOf[Network]),
            new IdChannelSelector(id),
            Channel.TWO_WAY);
  }
}

Execution

That is it. Either run it again from within sbt, or assemble and copy the fat .jar to the dist folder, and then distribute that folder to where you want to run from and run the Ponger first, then the Pinger. You will see one Pong for every Ping, as expected.

The source code for this page can be found here.