Channel Selectors
When a Channel is connected, it can optionally be given an instance of ChannelSelector (at least when using the Java API), which serves two purposes: it extracts a value from an event (i.e. a specific field to use as a selection key), and it defines the specific value that an event must carry in order to travel along this Channel. The internal implementation is fairly efficient, but it requires the selection value for a Channel to be immutable. Note that, despite its previous name, a ChannelSelector cannot be abused as a filter. It is better to think of it as a way to create a routing table (which is, in fact, pretty much what happens internally).
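To make the routing-table picture concrete, here is a minimal, self-contained sketch. It is a hypothetical illustration and not the Kompics implementation: the class name RoutingTableSketch, its methods, and the use of plain strings as stand-ins for channels are all assumptions made for this example. It shows an extractor function playing the role of the selector's value extraction, and a hash map from selection values to the channels registered under them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only; this is NOT how Kompics is implemented.
final class RoutingTableSketch<E, K> {
    // Plays the role of the selector's "extract a value from an event" step.
    private final Function<E, K> keyExtractor;
    // Selection value -> names of the channels registered under that value.
    private final Map<K, List<String>> routes = new HashMap<>();

    RoutingTableSketch(Function<E, K> keyExtractor) {
        this.keyExtractor = keyExtractor;
    }

    // Register a channel under a fixed selection value.
    void connect(K selectionValue, String channelName) {
        routes.computeIfAbsent(selectionValue, k -> new ArrayList<>()).add(channelName);
    }

    // Look up the channels that an event should travel along.
    List<String> route(E event) {
        List<String> targets = routes.get(keyExtractor.apply(event));
        return targets == null ? Collections.<String>emptyList() : targets;
    }

    public static void main(String[] args) {
        // Toy events are strings, and the selection key is the string length.
        RoutingTableSketch<String, Integer> table = new RoutingTableSketch<>(String::length);
        table.connect(4, "channel-a");
        table.connect(5, "channel-b");
        System.out.println(table.route("ping"));  // [channel-a]
        System.out.println(table.route("hello")); // [channel-b]
        System.out.println(table.route("hi"));    // []
    }
}
```

In a structure like this, routing is a single equality lookup on the extracted key, which is why it cannot express arbitrary predicates the way a filter could. It also suggests why a selection value would need to be immutable: a key that changes after it has been inserted into a hash map can no longer be found.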
Support for Node Ids
We will update our PingPong example so that we start a configurable number of Pingers per host. We will stick to a single Ponger for now, since otherwise we would have to figure out how to load-balance across them. We shall add an id field of type byte[] (Java) or Array[Byte] (Scala) to TAddress, which we will use as the selection key. However, when creating those ids we shall be a bit lazy and simply use ascending integers in their byte-array representation. You will see later why we picked a byte array and not an int (Java) or Int (Scala) directly. Of course, we will also have to update the NetSerializer, which we will not show again here, since it has become pretty big by now and is not terribly relevant to the point.
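As a small illustration of what "ascending integers in their byte-array representation" looks like, the sketch below encodes an int as four big-endian bytes using only the JDK. The PingerParent further down uses Guava's Ints.toByteArray for this, which produces the same layout; the class name IdEncodingSketch and its helper methods are hypothetical and exist only for this example.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, used only to illustrate the id layout.
final class IdEncodingSketch {
    // Encode an int as 4 bytes in big-endian order (ByteBuffer's default byte order).
    static byte[] toId(int i) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(i).array();
    }

    // Decode a 4-byte big-endian id back into the int it was created from.
    static int fromId(byte[] id) {
        return ByteBuffer.wrap(id).getInt();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println(i + " -> " + Arrays.toString(toId(i)));
        }
        // prints:
        // 0 -> [0, 0, 0, 0]
        // 1 -> [0, 0, 0, 1]
        // 2 -> [0, 0, 0, 2]
    }
}
```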
Reference.conf
pingpong {
    ponger.addr = "127.0.0.1:34567"
    pinger {
        addr {
            host = "127.0.0.1"
            port = 34568
        }
        num = 2
        timeout = 1000
        pongeraddr {
            host = "127.0.0.1"
            port = 34567
        }
    }
}
TAddress
- Java

package jexamples.virtualnetworking.pingpongselectors;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Objects;

import se.sics.kompics.network.Address;

public class TAddress implements Address {

    private final InetSocketAddress isa;
    private final byte[] id;

    public TAddress(InetAddress addr, int port) {
        this(addr, port, null);
    }

    public TAddress(InetAddress addr, int port, byte[] id) {
        this.isa = new InetSocketAddress(addr, port);
        this.id = id;
    }

    @Override
    public InetAddress getIp() {
        return this.isa.getAddress();
    }

    @Override
    public int getPort() {
        return this.isa.getPort();
    }

    public byte[] getId() {
        return this.id;
    }

    public TAddress withVirtual(byte[] id) {
        return new TAddress(isa.getAddress(), isa.getPort(), id);
    }

    @Override
    public InetSocketAddress asSocket() {
        return this.isa;
    }

    @Override
    public boolean sameHostAs(Address other) {
        /*
         * note that we don't include the id here, since nodes with different
         * ids but the same socket are still on the same machine
         */
        return this.isa.equals(other.asSocket());
    }

    // Not required but strongly recommended
    @Override
    public final String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(isa.getHostString());
        sb.append(":");
        sb.append(isa.getPort());
        if (id != null) {
            sb.append(":");
            sb.append(Arrays.toString(id));
        }
        return sb.toString();
    }

    @Override
    public int hashCode() {
        // hash the array contents (not its identity) to stay consistent with equals
        return Objects.hash(isa, Arrays.hashCode(id));
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final TAddress other = (TAddress) obj;
        // compare against the other address's id, by content
        return Objects.equals(this.isa, other.isa) && Objects.deepEquals(this.id, other.id);
    }
}
- Scala

import java.net.{InetAddress, InetSocketAddress}

import se.sics.kompics.network.Address

object TAddress {
  def apply(addr: InetAddress, port: Int): TAddress = {
    val isa = new InetSocketAddress(addr, port);
    TAddress(isa, None)
  }
  def apply(addr: InetAddress, port: Int, id: Array[Byte]): TAddress = {
    val isa = new InetSocketAddress(addr, port);
    TAddress(isa, Some(id))
  }
}

case class TAddress(isa: InetSocketAddress, id: Option[Array[Byte]]) extends Address {
  override def getIp(): InetAddress = isa.getAddress();
  override def getPort(): Int = isa.getPort();
  override def asSocket(): InetSocketAddress = isa;
  override def sameHostAs(other: Address): Boolean = {
    /* note that we don't include the id here, since nodes with different
     * ids but the same socket are still on the same machine */
    this.isa.equals(other.asSocket())
  }
}
IdChannelSelector
- Java

package jexamples.virtualnetworking.pingpongselectors;

import java.nio.ByteBuffer;

import se.sics.kompics.ChannelSelector;

public class IdChannelSelector extends ChannelSelector<TMessage, ByteBuffer> {

    public IdChannelSelector(byte[] id) {
        super(TMessage.class, ByteBuffer.wrap(id), true);
    }

    @Override
    public ByteBuffer getValue(TMessage event) {
        return ByteBuffer.wrap(event.header.dst.getId());
    }
}
- Scala

package sexamples.virtualnetworking.pingpongselectors

import java.nio.ByteBuffer

import se.sics.kompics.ChannelSelector

class IdChannelSelector(id: Array[Byte])
    extends ChannelSelector[TMessage, ByteBuffer](classOf[TMessage], ByteBuffer.wrap(id), true) {

  override def getValue(event: TMessage): ByteBuffer = {
    event.header.dst.id match {
      case Some(eventId) => ByteBuffer.wrap(eventId)
      case None          => null
    }
  }
}
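Both selector versions wrap the id in a ByteBuffer instead of using the raw array as the selection value. The text does not state the reason, but one plausible explanation (an assumption on our part) is that Java arrays compare by identity, whereas ByteBuffer compares by content and hashes accordingly, which is what a value-based lookup needs. The following sketch demonstrates only that difference; the class name ByteBufferKeySketch and the "pinger-1" label are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo of array identity vs. ByteBuffer content equality.
final class ByteBufferKeySketch {
    // Arrays inherit Object.equals, so this is true only for the very same array object.
    static boolean arraysMatch(byte[] a, byte[] b) {
        return a.equals(b);
    }

    // ByteBuffer.equals compares the remaining bytes element by element.
    static boolean buffersMatch(byte[] a, byte[] b) {
        return ByteBuffer.wrap(a).equals(ByteBuffer.wrap(b));
    }

    public static void main(String[] args) {
        byte[] a = {0, 0, 0, 1};
        byte[] b = {0, 0, 0, 1}; // same content, different array object

        System.out.println(arraysMatch(a, b));  // false
        System.out.println(buffersMatch(a, b)); // true

        // A wrapped id therefore works as a hash-map key across separate array instances.
        Map<ByteBuffer, String> routes = new HashMap<>();
        routes.put(ByteBuffer.wrap(a), "pinger-1");
        System.out.println(routes.get(ByteBuffer.wrap(b))); // pinger-1
    }
}
```

Note that ByteBuffer.wrap does not copy the array, so modifying the underlying bytes after registration would change the buffer's hash code. That fits the requirement stated earlier that the selection value be immutable.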
Custom Child Configuration
Additionally, we now need to tell the Pinger components what their id is. We cannot pull that out of the configuration file, since that is global to the whole Kompics runtime. We can, however, pass a modified version of the Config object to child components. So we are going to pull the TAddress from the application.conf in the PingerParent and use it as a base address that we pass to NettyNetwork. Then, for each Pinger we create, we are going to write a virtual address with the id into the child Config. This way we do not have to change any code in the Pinger itself.
PingerParent
- Java

package jexamples.virtualnetworking.pingpongselectors;

import com.google.common.primitives.Ints;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.config.Config;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;

public class PingerParent extends ComponentDefinition {

    public PingerParent() {
        TAddress baseSelf = config().getValue("pingpong.pinger.addr", TAddress.class);

        Component timer = create(JavaTimer.class, Init.NONE);
        Component network = create(NettyNetwork.class, new NettyInit(baseSelf));

        int num = config().getValue("pingpong.pinger.num", Integer.class);
        for (int i = 0; i < num; i++) {
            byte[] id = Ints.toByteArray(i);
            Config.Builder cbuild = config().modify(this.id());
            cbuild.setValue("pingpong.pinger.addr", baseSelf.withVirtual(id));
            Component pinger = create(Pinger.class, Init.NONE, cbuild.finalise());

            connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
            connect(
                pinger.getNegative(Network.class),
                network.getPositive(Network.class),
                new IdChannelSelector(id),
                Channel.TWO_WAY);
        }
    }
}
- Scala

class PingerParent extends ComponentDefinition {

  val baseSelf = cfg.getValue[TAddress]("pingpong.pinger.addr");

  val timer = create[JavaTimer];
  val network = create[NettyNetwork](new NettyInit(baseSelf));

  val num = cfg.getValue[Int]("pingpong.pinger.num");
  for (i <- 0 until num) {
    val id = Ints.toByteArray(i);
    val cbuild = this.config().modify(this.id());
    cbuild.setValue("pingpong.pinger.addr", baseSelf.copy(id = Some(id)));
    val pinger = create[Pinger](Init.none[Pinger], cbuild.finalise());

    connect[Timer](timer -> pinger);
    // must use the Java API here
    connect(pinger.getNegative(classOf[Network]),
            network.getPositive(classOf[Network]),
            new IdChannelSelector(id),
            Channel.TWO_WAY);
  }
}
Execution
That is it. Either run it again from within sbt, or assemble the fat .jar, copy it to the dist folder, distribute that folder to wherever you want to run from, and then run the Ponger first, followed by the Pinger. You will see one Pong for every Ping, as expected.