Virtual Networks

Quite a few distributed systems require some notion of vnodes, that is, multiple instances of the same node functionality within one host machine. In a P2P system you might join different overlays, in a DHT you might need to artificially increase the number of nodes to meet the statistical requirements for reasonable load-balancing, and so on. There are many ways of realising such vnodes. The easiest might be to simply start one JVM for each node. This requires no change in your code whatsoever, just maybe a new startup script that decides how many nodes to start and keeps track of their PIDs. However, JVM instances aren’t cheap. They eat up significant resources. This approach might scale to a few tens of nodes, but certainly not to hundreds or thousands of vnodes.

An alternative approach is to start all (or at least some) of the vnodes in the same JVM instance. With Kompics this is also pretty easy: Instead of creating a new startup script, we create a new parent component that manages the different instances of the old parent component. Apart from that, we leave the whole component tree as it was before. Of course, we have to assign different ports to all the different NettyNetwork instances that are being started, so our config files might grow a bit bigger unless we find a programmatic way of assigning them, but nothing that can’t be overcome. This approach might scale to a few hundreds of vnodes, but after that we are probably going to have problems finding free ports for Netty to bind on, since it actually uses twice as many ports as we ask it to (one for TCP and UDP and another one for UDT) and most systems limit a single JVM’s (or any process’) port usage quite drastically. Netty also creates some threads when it starts, so we might end up driving the JVM’s thread scheduler insane.
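
One programmatic way of assigning ports could look like the following minimal sketch. It assumes a contiguous block of ports starting at a base port is free, and it does not check availability. The class PortPlanSketch, the method assignPorts and the stride parameter are hypothetical names for illustration and are not part of Kompics or Netty. A stride of 2 is only an assumption that leaves a gap for a second port per instance.

```java
// Hypothetical helper for illustration; not part of Kompics or Netty.
final class PortPlanSketch {

    /**
     * Returns count ports starting at basePort, spaced stride apart.
     * Throws if any port would fall outside the valid range 1..65535.
     */
    static int[] assignPorts(int basePort, int count, int stride) {
        if (count < 0 || stride < 1) {
            throw new IllegalArgumentException("count must be >= 0 and stride >= 1");
        }
        long last = (long) basePort + (long) stride * (count - 1);
        if (basePort < 1 || basePort > 65535 || (count > 0 && last > 65535)) {
            throw new IllegalArgumentException("ports must stay within 1..65535");
        }
        int[] ports = new int[count];
        for (int i = 0; i < count; i++) {
            ports[i] = basePort + i * stride;
        }
        return ports;
    }

    public static void main(String[] args) {
        // Three instances, leaving one spare port after each (assumed layout).
        for (int port : assignPorts(34567, 3, 2)) {
            System.out.println(port);
        }
    }
}
```

Each returned port could then be written into the configuration of one instance of the old parent component before it is created.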

Finally, we can exploit the fact that most of our vnode implementations wouldn’t actually use the full capacity of their Network and Timer implementations, and instead share them among all vnodes. In fact, we might try to share as much common functionality as we can manage to extract from the vnodes. For this to work, however, we need a way to send messages only along a specific channel, or we will end up with the same problem our very first version of networked PingPong had, where the Pongs reached all Pingers and not just the one that sent the Ping that caused it. Part of the solution is the Request-Response Events that were introduced in the section about timers. But as already mentioned there, the request-response pattern is not enough. What if we simply want to send an event to one of the vnodes without having an outstanding request? What if a network message for that vnode comes in? The solution to this problem is Channel Selectors.

Channel Selectors

When a Channel is connected it can optionally be given an instance of se.sics.kompics.ChannelSelector, which both extracts some value from an event (i.e. a specific field to use as a selection key) and defines the specific value for which an event should travel along this Channel. The internal implementation of this is fairly efficient, but requires the selection value for a Channel to be immutable. Note that despite its previous name, a ChannelSelector cannot be abused as a filter. It’s better to think of it as a way to create a routing table (which is in fact pretty much what happens internally). If you want to compare it to the handler pattern matching described earlier (see Cleanup: Config files, ClassMatchers, and Assembly), you can think of a ChannelSelector as being both the PatternExtractor and the MatchedHandler.
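
To make the routing-table picture concrete, here is a minimal sketch in plain Java. It is not the Kompics implementation: the classes SelectorRoutingSketch, Msg and Router are hypothetical, and a channel is modelled as a simple inbox list. It only illustrates the idea of extracting a key from an event and looking up the channels registered for that value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; these classes are NOT the Kompics implementation.
final class SelectorRoutingSketch {

    /** Toy event carrying the field we select on. */
    static final class Msg {
        final String dstId;
        final String payload;

        Msg(String dstId, String payload) {
            this.dstId = dstId;
            this.payload = payload;
        }
    }

    /** Maps each selection value to the inboxes of the channels registered for it. */
    static final class Router<E, K> {
        private final Function<E, K> extractor;
        private final Map<K, List<List<E>>> table = new HashMap<>();

        Router(Function<E, K> extractor) {
            this.extractor = extractor;
        }

        /** Registers a channel (modelled as a plain inbox list) for one fixed value. */
        void connect(K value, List<E> inbox) {
            table.computeIfAbsent(value, k -> new ArrayList<>()).add(inbox);
        }

        /** Delivers the event to all matching inboxes and returns how many received it. */
        int route(E event) {
            K key = extractor.apply(event);
            List<List<E>> targets = table.getOrDefault(key, Collections.emptyList());
            for (List<E> inbox : targets) {
                inbox.add(event);
            }
            return targets.size();
        }
    }

    public static void main(String[] args) {
        Router<Msg, String> router = new Router<>(m -> m.dstId);
        List<Msg> pinger0 = new ArrayList<>();
        List<Msg> pinger1 = new ArrayList<>();
        router.connect("0", pinger0);
        router.connect("1", pinger1);

        router.route(new Msg("1", "pong"));
        System.out.println(pinger0.size() + " " + pinger1.size()); // prints "0 1"
    }
}
```

Because the table is a hash map keyed by the selection value, a key whose equals or hashCode result changed after registration would no longer be found. That is one way to see why an immutable selection value is required.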

We will update our PingPong example such that we start a configurable number of Pingers per host. We stick to a single Ponger for now, since otherwise we’d have to figure out how to load-balance across them. We shall add a byte[] id field to TAddress that we will use as a selection key. However, when creating those ids we shall be a bit lazy and simply use ascending integers in their byte[] representation. You will see later why we picked byte[] and not int directly. Of course, we’ll also have to update the NetSerializer, which we won’t show again here, since it’s pretty big by now and not quite relevant to the point.
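
As a small illustration of what such ids look like, the following sketch encodes ascending integers as 4-byte big-endian arrays using only the JDK. The class VnodeIdSketch and the method toId are hypothetical names. The sketch also shows one practical consequence of using byte[]: arrays compare by identity, so they need to be wrapped (for example in a ByteBuffer, as the IdChannelSelector listing does) before they can serve as lookup keys.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch for illustration only.
final class VnodeIdSketch {

    /** Big-endian 4-byte encoding of an int, using only the JDK. */
    static byte[] toId(int i) {
        return ByteBuffer.allocate(4).putInt(i).array();
    }

    public static void main(String[] args) {
        byte[] a = toId(1);
        byte[] b = toId(1);

        System.out.println(Arrays.toString(a)); // prints [0, 0, 0, 1]
        System.out.println(a.equals(b));        // prints false: arrays compare by identity
        System.out.println(ByteBuffer.wrap(a).equals(ByteBuffer.wrap(b))); // prints true

        // Wrapped ids behave as value-based keys in a hash map.
        Map<ByteBuffer, String> table = new HashMap<>();
        table.put(ByteBuffer.wrap(a), "pinger-1");
        System.out.println(table.get(ByteBuffer.wrap(b))); // prints pinger-1
    }
}
```

Note that a wrapped ByteBuffer shares the underlying array, so the array must not be modified after it has been used as a key.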

reference.conf
pingpong {
	self {
		host = "127.0.0.1"
		port = 34567
	}
	pinger {
		num = 2
		timeout = 1000
		pongeraddr {
			host = "127.0.0.1"
			port = 45678
		}
	}
}
TAddress.java
package se.sics.test;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Objects;
import se.sics.kompics.network.Address;

public class TAddress implements Address {

    private final InetSocketAddress isa;
    private final byte[] id;

    public TAddress(InetAddress addr, int port) {
        this(addr, port, null);
    }

    public TAddress(InetAddress addr, int port, byte[] id) {
        this.isa = new InetSocketAddress(addr, port);
        this.id = id;
    }

    @Override
    public InetAddress getIp() {
        return this.isa.getAddress();
    }

    @Override
    public int getPort() {
        return this.isa.getPort();
    }

    public byte[] getId() {
        return this.id;
    }
    
    public TAddress withVirtual(byte[] id) {
        return new TAddress(isa.getAddress(), isa.getPort(), id);
    }

    @Override
    public InetSocketAddress asSocket() {
        return this.isa;
    }

    @Override
    public boolean sameHostAs(Address other) {
        return this.isa.equals(other.asSocket());
        /* note that we don't include the id here, since nodes with different 
         * ids but the same socket are still on the same machine
         */
    }

    // Not required but strongly recommended
    @Override
    public final String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(isa.getHostString());
        sb.append(":");
        sb.append(isa.getPort());
        if (id != null) {
            sb.append(":");
            sb.append(Arrays.toString(id));
        }
        return sb.toString();
    }

    @Override
    public int hashCode() {
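        // Hash the array by content; Objects.hash(isa, id) would use the array's identity hash
        return 31 * Objects.hashCode(isa) + Arrays.hashCode(id);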
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final TAddress other = (TAddress) obj;
        // Compare against the other instance's id, by content
        return Objects.equals(this.isa, other.isa) && Arrays.equals(this.id, other.id);
    }

}
IdChannelSelector.java
package se.sics.test;

import java.nio.ByteBuffer;
import se.sics.kompics.ChannelSelector;

public class IdChannelSelector extends ChannelSelector<TMessage, ByteBuffer> {

    public IdChannelSelector(byte[] id) {
        super(TMessage.class, ByteBuffer.wrap(id), true);
    }
    
    @Override
    public ByteBuffer getValue(TMessage event) {
        return ByteBuffer.wrap(event.header.dst.getId());
    }
    
}

Additionally, we need to tell the Pinger components now what their id is, since we can’t pull that out of the configuration file, as that is global to the whole Kompics runtime. We can, however, pass a modified version of the Config object to child components. So we are going to pull the TAddress from the application.conf in the PingerParent and use it as a base address that we pass to NettyNetwork. And then for each Pinger we create, we are going to write a virtual address with the id into the child Config. This way we don’t have to change any code in the Pinger itself.
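
The idea of handing each child its own view of a shared configuration can be modelled with plain maps, as in the sketch below. This is not the Kompics Config API: ChildConfigSketch and withOverride are hypothetical names, and addresses are represented as strings purely for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "parent config plus per-child override"; not the Kompics Config API.
final class ChildConfigSketch {

    /** Returns a read-only copy of base with one key replaced; base itself is not modified. */
    static Map<String, Object> withOverride(Map<String, Object> base, String key, Object value) {
        Map<String, Object> child = new HashMap<>(base);
        child.put(key, value);
        return Collections.unmodifiableMap(child);
    }

    public static void main(String[] args) {
        Map<String, Object> parent = new HashMap<>();
        parent.put("pingpong.self", "127.0.0.1:34567");
        parent.put("pingpong.pinger.timeout", 1000);

        for (int i = 0; i < 2; i++) {
            // Each child sees its own virtual address under the same key.
            Map<String, Object> child = withOverride(parent, "pingpong.self", "127.0.0.1:34567:" + i);
            System.out.println(child.get("pingpong.self"));
        }
        // The parent's value is untouched.
        System.out.println(parent.get("pingpong.self"));
    }
}
```

Since every child reads the same key, the code inside the child does not need to know whether it runs as a vnode or on its own.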

PingerParent.java
package se.sics.test;

import com.google.common.primitives.Ints;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.config.Config;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;

public class PingerParent extends ComponentDefinition {

    public PingerParent() {
        TAddress baseSelf = config().getValue("pingpong.self", TAddress.class);

        Component timer = create(JavaTimer.class, Init.NONE);
        Component network = create(NettyNetwork.class, new NettyInit(baseSelf));
        int num = config().getValue("pingpong.pinger.num", Integer.class);
        for (int i = 0; i < num; i++) {
            byte[] id = Ints.toByteArray(i);
            Config.Builder cbuild = config().modify(id());
            cbuild.setValue("pingpong.self", baseSelf.withVirtual(id));
            Component pinger = create(Pinger.class, Init.NONE, cbuild.finalise());

            connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);

            connect(pinger.getNegative(Network.class), network.getPositive(Network.class), 
                    new IdChannelSelector(id), Channel.TWO_WAY);
        }

    }

}

That’s it. Compile and package (mvn clean package), copy the fat .jar to the dist folder, and distribute that folder to wherever you want to run from. Run the Ponger first, then the Pinger. You’ll see one Pong for every Ping, as expected.

The full example code can be downloaded here.

Virtual Network Channel

There is another way we can realise the channel routing that we implemented with the channel selector in the previous section. We can write a custom se.sics.kompics.ChannelCore implementation that takes care of the routing internally. This is functionally more or less equivalent, but saves us quite a few object creations (just think of all those channel and selector instances if we have 1000 vnodes or more). However, Channel code is not quite trivial, as there can be concurrency issues, the discussion of which would go beyond the scope of this tutorial. Luckily, there already is an implementation of exactly the type of byte[] id switching network channel that we want. All we have to do is replace the kompics-port-network dependency in our pom.xml with kompics-port-virtual-network. Then we change TAddress to implement se.sics.kompics.network.virtual.Address and THeader to implement se.sics.kompics.network.virtual.Header instead, and modify our setup code in the PingerParent a bit to use se.sics.kompics.network.virtual.VirtualNetworkChannel instead of the IdChannelSelector, which we can delete now.

new PingerParent.java
package se.sics.test;

import com.google.common.primitives.Ints;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Init;
import se.sics.kompics.config.Config;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.network.virtual.VirtualNetworkChannel;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;

public class PingerParent extends ComponentDefinition {

    public PingerParent() {
        TAddress baseSelf = config().getValue("pingpong.self", TAddress.class);

        Component timer = create(JavaTimer.class, Init.NONE);
        Component network = create(NettyNetwork.class, new NettyInit(baseSelf));
        VirtualNetworkChannel vnc = VirtualNetworkChannel.connect(network.getPositive(Network.class), proxy);
        int num = config().getValue("pingpong.pinger.num", Integer.class);
        for (int i = 0; i < num; i++) {
            byte[] id = Ints.toByteArray(i);
            Config.Builder cbuild = config().modify(id());
            cbuild.setValue("pingpong.self", baseSelf.withVirtual(id));
            Component pinger = create(Pinger.class, Init.NONE, cbuild.finalise());

            connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);

            vnc.addConnection(id, pinger.getNegative(Network.class));
        }

    }

}

Again, the full example code can be downloaded here.