Virtual Networks
Quite a few distributed systems require some notion of vnodes, that is, multiple instances of the same node functionality within one host machine. In a P2P system you might join different overlays, in a DHT you might need to artificially increase the number of nodes to meet the statistical requirements for reasonable load balancing, and so on. There are many ways of realising such vnodes. The easiest might be to simply start one JVM for each node. This requires no change to your code whatsoever, just perhaps a new startup script that keeps track of how many nodes to start and of the PIDs of all the started nodes. However, JVM instances aren’t cheap: each one eats up significant resources. This approach might scale to a few tens of nodes, but certainly not to hundreds or thousands of vnodes.
An alternative approach is to start all (or at least some) of the vnodes in the same JVM instance. With Kompics this is also pretty easy: instead of creating a new startup script, we create a new parent component that manages the different instances of the old parent component. Apart from that, we leave the whole component tree as it was before. Of course, we have to assign different ports to all the NettyNetwork instances being started, so our config files might grow a bit bigger unless we find a programmatic way of assigning them, but nothing that can’t be overcome. This approach might scale to a few hundred vnodes. Beyond that we are probably going to have trouble finding free ports for Netty to bind on, since it actually uses twice as many ports as we ask it to (one for TCP and UDP and another one for UDT), and most systems limit a single JVM’s (or any process’) port usage quite drastically. Netty also creates some threads when it starts, so we might end up driving the JVM’s thread scheduler insane.
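If we go the one-JVM, many-parents route, the port bookkeeping can be done in code rather than in the config file. The sketch below is a hypothetical helper (`PortPlan` and `assignPorts` are our own names, not part of Kompics) that hands each instance its own block of ports starting from a base port. A block size of two per instance is only a conservative convention inspired by the observation above that Netty uses twice as many ports as requested. The sketch does not know which ports Netty actually binds, and it does not check whether a port is free.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Kompics API: plans one block of ports per instance.
final class PortPlan {
    // Highest valid TCP/UDP port number.
    private static final int MAX_PORT = 65535;

    /**
     * Returns the first port of each instance's block. Instance i is given
     * basePort + i * portsPerInstance up to basePort + (i + 1) * portsPerInstance - 1.
     */
    static List<Integer> assignPorts(int basePort, int numInstances, int portsPerInstance) {
        if (basePort < 1 || numInstances < 0 || portsPerInstance < 1) {
            throw new IllegalArgumentException("invalid arguments");
        }
        // Use long arithmetic so large inputs cannot overflow before the range check.
        long lastPort = (long) basePort + (long) numInstances * portsPerInstance - 1;
        if (numInstances > 0 && lastPort > MAX_PORT) {
            throw new IllegalArgumentException("not enough ports above " + basePort);
        }
        List<Integer> ports = new ArrayList<>(numInstances);
        for (int i = 0; i < numInstances; i++) {
            ports.add(basePort + i * portsPerInstance);
        }
        return ports;
    }

    public static void main(String[] args) {
        // Three instances, two ports reserved per instance.
        System.out.println(assignPorts(34567, 3, 2)); // [34567, 34569, 34571]
    }
}
```

Each entry would then be written into the corresponding child's configuration instead of being listed by hand.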
Finally, we can exploit the fact that most of our vnodes’ implementations wouldn’t actually use the full capacity of their Network and Timer implementations, and instead share those among all vnodes. In fact, we might try to share as much common functionality as we can extract from the vnodes. For this to work, however, we need a way to send messages only along a specific channel. Otherwise we will end up with the same problem our very first version of networked PingPong had, where the Pongs reached all Pingers and not just the one that sent the Ping that caused them. The Request-Response Events introduced in the section about timers are part of the solution. But as already mentioned there, the request-response pattern is not enough. What if we simply want to send an event to one of the vnodes without having an outstanding request? What if a network message for that vnode comes in? Channel Selectors solve this problem.
Channel Selectors
When a Channel is connected, it can optionally be given an instance of se.sics.kompics.ChannelSelector. A selector both extracts some value from an event (i.e. a specific field to use as a selection key) and defines the specific value for which this event should travel along this Channel. The internal implementation of this is fairly efficient, but requires the selection value for a Channel to be immutable. Note that, despite its previous name, a ChannelSelector cannot be abused as a filter: it only matches the extracted value against its fixed selection value. It’s better to think of it as a way to create a routing table (which is in fact pretty much what happens internally). If you want to compare it to the handler pattern matching described earlier (see Cleanup: Config files, ClassMatchers, and Assembly), you can think of a ChannelSelector as both the PatternExtractor and the MatchedHandler.
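To make the routing-table analogy concrete, here is a toy model in plain Java. `ToyRouter` is a hypothetical class and not how Kompics implements channel selectors. It only illustrates the idea: each connected channel registers one fixed selection value, and an event is delivered to the channels whose value equals the one extracted from the event. It also shows why that value must be immutable, since it is used as a hash-map key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model of selector-based routing, NOT the Kompics implementation.
final class ToyRouter<E, K> {
    // Extracts the selection key from an event (the "pattern extractor" part).
    private final Function<E, K> extractor;
    // Routing table: selection value -> channels registered with that value.
    private final Map<K, List<Consumer<E>>> table = new HashMap<>();

    ToyRouter(Function<E, K> extractor) {
        this.extractor = extractor;
    }

    // Connect a channel with a fixed selection value. The value is a map key,
    // so it must not change afterwards (hence the immutability requirement).
    void connect(K selectionValue, Consumer<E> channel) {
        table.computeIfAbsent(selectionValue, k -> new ArrayList<>()).add(channel);
    }

    // Deliver an event with one hash lookup instead of asking every channel.
    // Returns the number of channels the event was delivered to.
    int deliver(E event) {
        List<Consumer<E>> channels =
                table.getOrDefault(extractor.apply(event), Collections.emptyList());
        for (Consumer<E> channel : channels) {
            channel.accept(event);
        }
        return channels.size();
    }
}
```

For example, with a `ToyRouter<String, String>` whose extractor returns the part of a message before the first `:`, the message `"a:ping"` reaches only the channels registered under `"a"`. Equality with a single value is the only test available, which is why a selector cannot act as an arbitrary filter.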
We will update our PingPong example so that we start a configurable number of Pingers per host. We stick to a single Ponger for now, since otherwise we’d have to figure out how to load-balance across them. We shall add a byte[] id field to TAddress that we will use as a selection key. However, when creating those ids we shall be a bit lazy and simply use ascending integers in their byte[] representation. You will see later why we picked byte[] and not int directly. Of course, we’ll also have to update the NetSerializer, which we won’t show again here, since it’s pretty big by now and not really relevant to the point.
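The “ascending integers in their byte[] representation” can be produced with the JDK alone. The hypothetical `VnodeIds` helper below uses `ByteBuffer`, whose default byte order is big-endian. That yields the same 4-byte layout as Guava’s `Ints.toByteArray`, which the PingerParent code uses.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: converts vnode numbers to and from 4-byte ids.
final class VnodeIds {
    // Big-endian, 4-byte encoding of an int (same layout as Guava's Ints.toByteArray).
    static byte[] toId(int i) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(i).array();
    }

    // Inverse of toId, e.g. for logging or debugging.
    static int fromId(byte[] id) {
        return ByteBuffer.wrap(id).getInt();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(toId(0)));   // [0, 0, 0, 0]
        System.out.println(Arrays.toString(toId(258))); // [0, 0, 1, 2]
    }
}
```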
pingpong {
    self {
        host = "127.0.0.1"
        port = 34567
    }
    pinger {
        num = 2
        timeout = 1000
        pongeraddr {
            host = "127.0.0.1"
            port = 45678
        }
    }
}
package se.sics.test;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Objects;
import se.sics.kompics.network.Address;

public class TAddress implements Address {

    private final InetSocketAddress isa;
    private final byte[] id;

    public TAddress(InetAddress addr, int port) {
        this(addr, port, null);
    }

    public TAddress(InetAddress addr, int port, byte[] id) {
        this.isa = new InetSocketAddress(addr, port);
        this.id = id;
    }

    @Override
    public InetAddress getIp() {
        return this.isa.getAddress();
    }

    @Override
    public int getPort() {
        return this.isa.getPort();
    }

    public byte[] getId() {
        return this.id;
    }

    public TAddress withVirtual(byte[] id) {
        return new TAddress(isa.getAddress(), isa.getPort(), id);
    }

    @Override
    public InetSocketAddress asSocket() {
        return this.isa;
    }

    @Override
    public boolean sameHostAs(Address other) {
        /* note that we don't include the id here, since nodes with different
         * ids but the same socket are still on the same machine
         */
        return this.isa.equals(other.asSocket());
    }

    // Not required but strongly recommended
    @Override
    public final String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(isa.getHostString());
        sb.append(":");
        sb.append(isa.getPort());
        if (id != null) {
            sb.append(":");
            sb.append(Arrays.toString(id));
        }
        return sb.toString();
    }

    @Override
    public int hashCode() {
        // hash the array's contents (not its identity), consistent with equals()
        return 31 * Objects.hashCode(isa) + Arrays.hashCode(id);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final TAddress other = (TAddress) obj;
        // compare against other.id (not this.id), by content
        return Objects.equals(this.isa, other.isa) && Arrays.equals(this.id, other.id);
    }
}
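Because `id` is a `byte[]`, `equals` and `hashCode` have to compare and hash the array contents. `Objects.equals` and `Objects.hash` treat an array by identity, so two addresses with equal ids could end up unequal or in different hash buckets. The stripped-down class below shows a pair of methods that stay consistent with each other. `VId` is a hypothetical name, and a `String` stands in for the socket address.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical value class with the same byte[] field pattern as TAddress.
final class VId {
    private final String host; // stands in for the InetSocketAddress
    private final byte[] id;   // may be null for a non-virtual address

    VId(String host, byte[] id) {
        this.host = host;
        this.id = id;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        VId other = (VId) obj;
        // Compare array contents, and compare against the *other* instance.
        return Objects.equals(host, other.host) && Arrays.equals(id, other.id);
    }

    @Override
    public int hashCode() {
        // Arrays.hashCode hashes contents (and returns 0 for null),
        // so equal objects always get equal hash codes.
        return 31 * Objects.hashCode(host) + Arrays.hashCode(id);
    }
}
```

`Arrays.equals` and `Arrays.hashCode` also handle a null id, which the non-virtual `TAddress` constructor produces.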
package se.sics.test;

import java.nio.ByteBuffer;
import se.sics.kompics.ChannelSelector;

public class IdChannelSelector extends ChannelSelector<TMessage, ByteBuffer> {

    public IdChannelSelector(byte[] id) {
        super(TMessage.class, ByteBuffer.wrap(id), true);
    }

    @Override
    public ByteBuffer getValue(TMessage event) {
        return ByteBuffer.wrap(event.header.dst.getId());
    }
}
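`IdChannelSelector` wraps the id in a `ByteBuffer` rather than using the `byte[]` directly as the selection value. Assume the selection value ends up as a hash-map key, as the routing-table description above suggests. A quick plain-Java check then shows why the wrapping matters: `byte[]` keys only match the very same array object, while `ByteBuffer`’s `equals` and `hashCode` depend on the remaining bytes. `KeyDemo` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo: byte[] vs ByteBuffer as hash-map keys.
final class KeyDemo {
    // Stores value under `stored` in a byte[]-keyed map, then looks it up with `probe`.
    static String arrayKeyLookup(byte[] stored, byte[] probe, String value) {
        Map<byte[], String> map = new HashMap<>();
        map.put(stored, value);
        return map.get(probe); // byte[] keys use identity equality
    }

    // Same, but with the arrays wrapped in ByteBuffers.
    static String bufferKeyLookup(byte[] stored, byte[] probe, String value) {
        Map<ByteBuffer, String> map = new HashMap<>();
        map.put(ByteBuffer.wrap(stored), value);
        return map.get(ByteBuffer.wrap(probe)); // compares the remaining bytes
    }

    public static void main(String[] args) {
        byte[] a = {0, 0, 0, 1};
        byte[] b = {0, 0, 0, 1}; // same contents, different array object
        System.out.println(arrayKeyLookup(a, b, "pinger-1"));  // null
        System.out.println(bufferKeyLookup(a, b, "pinger-1")); // pinger-1
    }
}
```

This also ties back to the immutability requirement: a `ByteBuffer`’s hash code changes if the wrapped array is modified, so an id must not be mutated after its selector has been created.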
Additionally, we now need to tell the Pinger components what their id is. We can’t pull that out of the configuration file, since the file is global to the whole Kompics runtime. We can, however, pass a modified version of the Config object to child components. So in the PingerParent we pull the TAddress from the application.conf and use it as a base address that we pass to NettyNetwork. Then, for each Pinger we create, we write a virtual address with the id into the child Config. This way we don’t have to change any code in the Pinger itself.
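The idea of handing each child a modified Config can be pictured as a layered lookup. The child sees its own overrides first and falls back to the shared, unmodified configuration otherwise. `LayeredConfig` below is a toy model for illustration only. It is not the Kompics `Config` or `Config.Builder` API; the real usage of that API appears in the PingerParent code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy model only, NOT the Kompics Config API: a child-specific view that
// overrides some keys of a shared parent configuration without changing it.
final class LayeredConfig {
    private final Map<String, Object> parent;    // shared by all children, read-only here
    private final Map<String, Object> overrides; // private to this child

    LayeredConfig(Map<String, Object> parent, Map<String, Object> overrides) {
        this.parent = Collections.unmodifiableMap(parent);
        this.overrides = new HashMap<>(overrides);
    }

    // Child overrides win; everything else falls through to the parent.
    Object get(String key) {
        return overrides.containsKey(key) ? overrides.get(key) : parent.get(key);
    }

    public static void main(String[] args) {
        Map<String, Object> global = new HashMap<>();
        global.put("pingpong.self", "127.0.0.1:34567");
        global.put("pingpong.pinger.timeout", 1000);

        for (int i = 0; i < 2; i++) {
            Map<String, Object> mine = new HashMap<>();
            // hypothetical string form of a virtual address
            mine.put("pingpong.self", "127.0.0.1:34567:" + i);
            LayeredConfig child = new LayeredConfig(global, mine);
            System.out.println(child.get("pingpong.self")
                    + " timeout=" + child.get("pingpong.pinger.timeout"));
        }
    }
}
```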
package se.sics.test;

import com.google.common.primitives.Ints;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.config.Config;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;

public class PingerParent extends ComponentDefinition {

    public PingerParent() {
        TAddress baseSelf = config().getValue("pingpong.self", TAddress.class);

        // one Timer and one Network, shared by all Pingers
        Component timer = create(JavaTimer.class, Init.NONE);
        Component network = create(NettyNetwork.class, new NettyInit(baseSelf));

        int num = config().getValue("pingpong.pinger.num", Integer.class);
        for (int i = 0; i < num; i++) {
            byte[] id = Ints.toByteArray(i);
            // child config: same as ours, but with a virtual self address
            Config.Builder cbuild = config().modify(id());
            cbuild.setValue("pingpong.self", baseSelf.withVirtual(id));
            Component pinger = create(Pinger.class, Init.NONE, cbuild.finalise());

            connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
            // incoming messages reach this Pinger only if their destination id matches
            connect(pinger.getNegative(Network.class), network.getPositive(Network.class),
                    new IdChannelSelector(id), Channel.TWO_WAY);
        }
    }
}
That’s it. Compile and package (mvn clean package), copy the fat .jar to the dist folder, distribute that folder to wherever you want to run from, and run the Ponger first, then the Pinger. You’ll see one Pong for every Ping, as expected.
The full example code can be downloaded here.
Virtual Network Channel
There is another way to realise the channel routing that we implemented with the channel selector in the previous section. We can write a custom se.sics.kompics.ChannelCore implementation that takes care of the routing internally. This is essentially equivalent in function, but saves us quite a few object creations (just think of all those channels and selector instances if we have 1000 vnodes or more). However, Channel code is not quite trivial, as there can be concurrency issues, and discussing them would go beyond the scope of this tutorial. Luckily, there already is an implementation of exactly the kind of byte[]-id-switching network channel that we want. All we have to do is replace the kompics-port-network dependency in our pom.xml with kompics-port-virtual-network. Then we change TAddress to implement se.sics.kompics.network.virtual.Address and THeader to implement se.sics.kompics.network.virtual.Header instead. Finally, we modify our setup code in the PingerParent a bit to use se.sics.kompics.network.virtual.VirtualNetworkChannel instead of the IdChannelSelector, which we can now delete.
package se.sics.test;

import com.google.common.primitives.Ints;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.config.Config;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.network.virtual.VirtualNetworkChannel;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;

public class PingerParent extends ComponentDefinition {

    public PingerParent() {
        TAddress baseSelf = config().getValue("pingpong.self", TAddress.class);

        Component timer = create(JavaTimer.class, Init.NONE);
        Component network = create(NettyNetwork.class, new NettyInit(baseSelf));
        // a single channel that routes network events by destination id
        VirtualNetworkChannel vnc = VirtualNetworkChannel.connect(network.getPositive(Network.class), proxy);

        int num = config().getValue("pingpong.pinger.num", Integer.class);
        for (int i = 0; i < num; i++) {
            byte[] id = Ints.toByteArray(i);
            Config.Builder cbuild = config().modify(id());
            cbuild.setValue("pingpong.self", baseSelf.withVirtual(id));
            Component pinger = create(Pinger.class, Init.NONE, cbuild.finalise());

            connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
            // register this Pinger's Network port under its id
            vnc.addConnection(id, pinger.getNegative(Network.class));
        }
    }
}
Again, the full example code can be downloaded here.