Basic Networking

Unlike many other languages and frameworks commonly used for writing distributed applications (e.g., Erlang and Akka), Kompics does not handle networking transparently. Instead, Kompics requires explicit addressing of messages, that is, of events that (could) go over the network. There are two reasons for this design. First, events in Kompics are normally not messages: they are not addressed, but follow channels instead. The logical extension of this pattern to a distributed setting would be to create cross-network channels manually, but such a static setup would be very unintuitive for a programmer. Second, it would be misleading to assume that local events and network messages have the same semantics. Network messages have to deal with unavailable nodes, connection loss, partitions, and so on. Instead of trying to force a one-size-fits-all solution onto the programmer, as systems like Akka and the venerable Java RMI do, Kompics exposes these challenges and allows system designers to find application-appropriate solutions.

The following sections will describe the Kompics Network port, and its default implementation NettyNetwork. The latter also features a serialisation framework, which will be described as well in this part of the tutorial. As our example program we will stick to the PingPong code from the previous tutorials and extend it with networking capabilities.

The examples in this tutorial require two new dependencies in the pom.xml file:

<dependency>
    <groupId>se.sics.kompics.basic</groupId>
    <artifactId>kompics-port-network</artifactId>
    <version>${kompics.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>se.sics.kompics.basic</groupId>
    <artifactId>kompics-component-netty-network</artifactId>
    <version>${kompics.version}</version>
    <scope>compile</scope>
</dependency>

Messages, Addresses, and Headers

The se.sics.kompics.network.Network port only allows three kinds of events: se.sics.kompics.network.Msg may pass in both directions (indication and request), while se.sics.kompics.network.MessageNotify.Req is a request and se.sics.kompics.network.MessageNotify.Resp is an indication. The latter two can be used to ask the Network service for feedback about whether or not a specific message was sent, how long it took, and how big the serialised version was. This is convenient for systems that need to keep statistics about their messages, or where resources should be freed as soon as a message crosses the wire.

All messages that go over the network have to implement the Msg interface, which merely stipulates that a message has to have some kind of header (se.sics.kompics.network.Header). The deprecated methods still need to be implemented for backwards compatibility, but should simply forward fields from the Header. The header itself requires three fields:

  1. A source address, which on the sender side is typically the self address of the node, and on the receiver side refers to the sender.
  2. A destination address, which on the sender side tells the network where the message should go, and on the receiver side is typically the self address.
  3. The transport protocol to be used for the message. Network implementations are not required to support all possible protocols; NettyNetwork implements TCP, UDP, and UDT only.

The se.sics.kompics.network.Address interface has four required methods:

  1. getIp returns the IP address of the node.
  2. getPort returns its port.
  3. asSocket should return a combined representation of IP and port. It is strongly recommended to keep the internal representation of the address as an InetSocketAddress, since this method will be called a lot more often than the first two, and creating a new object for it on the fly is rather expensive.
  4. sameHostAs is used for avoiding serialisation overhead when the message would end up in the same JVM anyway. This can be used for local addressing of components and is typically important for virtual nodes (see section Virtual Networks), where components in the same JVM will often communicate via the network port.

Note

None of the interfaces presented above makes any requirements as to the (im-)mutability of its fields. It is typically recommended to make all of them immutable. However, certain setups will require things like mutable headers; routing, for example, might be implemented in such a way.

Since almost all applications will need custom fields in addition to the fields in the Msg, Header, or Address interfaces, almost everyone will want to write their own implementations. If that is not the case, however, a simple default implementation can be found in se.sics.kompics.network.netty.DirectMessage, se.sics.kompics.network.netty.DirectHeader, and se.sics.kompics.network.netty.NettyAddress. For the purpose of this tutorial, however, we will write our own, which we will prefix with T (for Tutorial) to avoid naming conflicts, since Java lacks support for import aliases.

package se.sics.test;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import se.sics.kompics.network.Address;

public class TAddress implements Address {

    private final InetSocketAddress isa;

    public TAddress(InetAddress addr, int port) {
        this.isa = new InetSocketAddress(addr, port);
    }

    @Override
    public InetAddress getIp() {
        return this.isa.getAddress();
    }

    @Override
    public int getPort() {
        return this.isa.getPort();
    }

    @Override
    public InetSocketAddress asSocket() {
        return this.isa;
    }

    @Override
    public boolean sameHostAs(Address other) {
        return this.isa.equals(other.asSocket());
    }

    // Not required but strongly recommended

    @Override
    public final String toString() {
        return isa.toString();
    }

    @Override
    public int hashCode() {
        int hash = 3;
        hash = 11 * hash + (this.isa != null ? this.isa.hashCode() : 0);
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final TAddress other = (TAddress) obj;
        if (this.isa != other.isa && (this.isa == null || !this.isa.equals(other.isa))) {
            return false;
        }
        return true;
    }

}

package se.sics.test;

import se.sics.kompics.network.Address;
import se.sics.kompics.network.Header;
import se.sics.kompics.network.Transport;

public class THeader implements Header<TAddress> {

    public final TAddress src;
    public final TAddress dst;
    public final Transport proto;

    public THeader(TAddress src, TAddress dst, Transport proto) {
        this.src = src;
        this.dst = dst;
        this.proto = proto;
    }

    @Override
    public TAddress getSource() {
        return src;
    }

    @Override
    public TAddress getDestination() {
        return dst;
    }

    @Override
    public Transport getProtocol() {
        return proto;
    }

}

package se.sics.test;

import se.sics.kompics.network.Address;
import se.sics.kompics.network.Header;
import se.sics.kompics.network.Msg;
import se.sics.kompics.network.Transport;

public abstract class TMessage implements Msg<TAddress, THeader> {
    
    public final THeader header;
    
    public TMessage(TAddress src, TAddress dst, Transport protocol) {
        this.header = new THeader(src, dst, protocol);
    }
    
    @Override
    public THeader getHeader() {
        return this.header;
    }

    @Override
    public TAddress getSource() {
        return this.header.src;
    }

    @Override
    public TAddress getDestination() {
        return this.header.dst;
    }

    @Override
    public Transport getProtocol() {
        return this.header.proto;
    }
    
}

For our PingPong example, both Ping and Pong will now extend TMessage instead of being plain request and response events. For now, we also hard-code TCP as the protocol for both.

package se.sics.test;

import se.sics.kompics.network.Transport;

public class Ping extends TMessage {
	public Ping(TAddress src, TAddress dst) {
		super(src, dst, Transport.TCP);
	}
}

package se.sics.test;

import se.sics.kompics.network.Transport;

public class Pong extends TMessage {
	public Pong(TAddress src, TAddress dst) {
		super(src, dst, Transport.TCP);
	}
}

Now we need to change the Pinger and the Ponger to take a self address as an init parameter, require the Network port, and feed the new constructor arguments to the Ping and Pong classes. Since we are now using the Network port for the ping-pong, we can remove the requirement for the PingPongPort. In this first example we’ll make our lives somewhat simple and use the self address as both source and destination for both classes. This corresponds to the local addressing case mentioned above in the description of sameHostAs.

package se.sics.test;

import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.Handler;
import se.sics.kompics.Start;
import se.sics.kompics.timer.*;
import se.sics.kompics.network.Network;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;

public class Pinger extends ComponentDefinition {

		private static final Logger LOG = LoggerFactory.getLogger(Pinger.class);

		Positive<Network> net = requires(Network.class);
		Positive<Timer> timer = requires(Timer.class);

		private long counter = 0;
		private UUID timerId;
		private final TAddress self;

		public Pinger(Init init) {
			this.self = init.self;
		}

		Handler<Start> startHandler = new Handler<Start>(){
			public void handle(Start event) {
				SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, 1000);
				PingTimeout timeout = new PingTimeout(spt);
				spt.setTimeoutEvent(timeout);
				trigger(spt, timer);
				timerId = timeout.getTimeoutId();
			}
		};
		Handler<Pong> pongHandler = new Handler<Pong>(){
			public void handle(Pong event) {
				counter++;
				LOG.info("Got Pong #{}!", counter);
			}
		};
		Handler<PingTimeout> timeoutHandler = new Handler<PingTimeout>() {
			public void handle(PingTimeout event) {
				trigger(new Ping(self, self), net);
			}
		};
		{
			subscribe(startHandler, control);
			subscribe(pongHandler, net);
			subscribe(timeoutHandler, timer);
		}

		@Override
		public void tearDown() {
			trigger(new CancelPeriodicTimeout(timerId), timer);
		}

		public static class PingTimeout extends Timeout {
			public PingTimeout(SchedulePeriodicTimeout spt) {
				super(spt);
			}
		}

		public static class Init extends se.sics.kompics.Init<Pinger> {
			public final TAddress self;
			public Init(TAddress self) {
				this.self = self;
			}
		}
	}

package se.sics.test;

import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.Handler;
import se.sics.kompics.network.Network;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Ponger extends ComponentDefinition {

	private static final Logger LOG = LoggerFactory.getLogger(Ponger.class);

	Positive<Network> net = requires(Network.class);

	private long counter = 0;
	private final TAddress self;

	public Ponger(Init init) {
		this.self = init.self;
	}

	Handler<Ping> pingHandler = new Handler<Ping>(){
		public void handle(Ping event) {
			counter++;
			LOG.info("Got Ping #{}!", counter);
			trigger(new Pong(self, event.getSource()), net);
		}
	};
	{
		subscribe(pingHandler, net);
	}
	public static class Init extends se.sics.kompics.Init<Ponger> {
		public final TAddress self;
		public Init(TAddress self) {
			this.self = self;
		}
	}
}

Finally, we have to create the NettyNetwork in the Parent class, and connect it appropriately. As preparation for later, we are going to take the port for the self address as a commandline argument in the Main class, and hardcode 127.0.0.1 as the IP address.

package se.sics.test;

import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Component;
import se.sics.kompics.Init;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.network.netty.NettyInit;

public class Parent extends ComponentDefinition {
	public Parent(Init init) {
		Component timer = create(JavaTimer.class, Init.NONE);
		Component network = create(NettyNetwork.class, new NettyInit(init.self));
		Component pinger = create(Pinger.class, new Pinger.Init(init.self));
		Component ponger = create(Ponger.class, new Ponger.Init(init.self));
		Component pinger2 = create(Pinger.class, new Pinger.Init(init.self));


		connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class));
		connect(pinger2.getNegative(Timer.class), timer.getPositive(Timer.class));

		connect(pinger.getNegative(Network.class), network.getPositive(Network.class));
		connect(pinger2.getNegative(Network.class), network.getPositive(Network.class));
		connect(ponger.getNegative(Network.class), network.getPositive(Network.class));
	}
	public static class Init extends se.sics.kompics.Init<Parent> {
		public final TAddress self;
		public Init(TAddress self) {
			this.self = self;
		}
	}
}

package se.sics.test;

import se.sics.kompics.Kompics;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class Main {
	public static void main(String[] args) {
		try {
			InetAddress ip = InetAddress.getByName("127.0.0.1"); // hard-coded loopback address, as described above
			int port = Integer.parseInt(args[0]);
			TAddress self = new TAddress(ip, port);
			Kompics.createAndStart(Parent.class, new Parent.Init(self), 2);
			try {
				Thread.sleep(10000);
			} catch (InterruptedException ex) {
				System.exit(1);
			}
			Kompics.shutdown();
			System.exit(0);
		} catch (UnknownHostException ex) {
			System.err.println(ex);
			System.exit(1);
		}
	}
}

At this point we can compile and run the code with a slight variation to the usual commands, since we need the port as an argument now (pick a different port if 34567 is in use on your system):

mvn clean compile
mvn exec:java -Dexec.mainClass="se.sics.test.Main" -Dexec.args="34567"

The code until here can be found here.

We can see from the output that our setup technically works, but we are back to the problem of getting four Pongs on our two Pings. The reason is, of course, that we are cheating. We are running all components in the same JVM, but we are not using proper Virtual Networks support, yet. We also do not actually want to use virtual nodes here, but instead each Pinger and Ponger should be on a different host (or at least in a different JVM).
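The arithmetic behind the four Pongs can be sketched with a toy model. This is not Kompics code: it only assumes that every component connected to the shared Network port sees every message delivered to the shared self address, and that each component handles only the message types it subscribed to. The class and method names are hypothetical.

```java
// Toy model (not Kompics API): counts deliveries when all components share one address.
final class AmplificationDemo {

    /** Total number of Pongs handled by all Pingers after each Pinger sends one Ping. */
    static int pongsHandled(int pingers, int pongers) {
        // Every Ping is delivered to every Ponger, because all share the same address.
        int pingsHandled = pingers * pongers;
        // Every handled Ping triggers exactly one Pong, addressed to the shared address.
        int pongsSent = pingsHandled;
        // Every Pong is in turn delivered to every Pinger subscribed on the port.
        return pongsSent * pingers;
    }
}
```

For the setup above (two Pingers and one Ponger), `AmplificationDemo.pongsHandled(2, 1)` evaluates to 4, which matches the four Pongs observed for two Pings.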

Before we can fix this, though, we have to fix another problem that we don’t even see yet: None of our messages are currently serialisable.

Serialisation

With NettyNetwork comes a serialisation framework that builds on Netty’s io.netty.buffer.ByteBuf. The Kompics part is two-fold, and consists of the se.sics.kompics.network.netty.serialization.Serializer interface, and a Serializer registration, mapping, and lookup service in se.sics.kompics.network.netty.serialization.Serializers.

For every Serializer you write and register you will be required to pick a unique int identifier. You are required to keep track of the identifiers you use yourself. Kompics reserves 0-16 for internal use. By default only a single byte worth of identifier space is used, to minimise the overhead of the framework. If you are assigning larger identifiers than 255, make sure to use the resize method in Serializers before registering the respective Serializers. For larger projects it can become quite difficult to keep track of which identifiers are used and which are free. It is recommended to either assign them all in a single place (e.g., static fields of a class, or a configuration file), or assign subranges to submodules of the project and then do the same single-location-assignment per module.
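One way to follow the single-location recommendation is sketched below. `SerializerIds` is a hypothetical helper and not part of Kompics; it only encodes the rules stated above (0-16 reserved, a single byte of identifier space by default) and rejects duplicates. The values 100 and 200 are the ones used later in this tutorial.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Kompics): a single place to assign serialiser ids.
final class SerializerIds {
    static final int RESERVED_MAX = 16;     // 0-16 are reserved for Kompics itself
    static final int SINGLE_BYTE_MAX = 255; // larger ids would need Serializers.resize first

    static final int NET = 100;       // NetSerializer
    static final int PING_PONG = 200; // PingPongSerializer

    private final Map<Integer, String> assigned = new LinkedHashMap<>();

    /** Records an id for an owner; rejects reserved, oversized, and duplicate ids. */
    void assign(int id, String owner) {
        if (id <= RESERVED_MAX) {
            throw new IllegalArgumentException("id " + id + " is reserved");
        }
        if (id > SINGLE_BYTE_MAX) {
            throw new IllegalArgumentException("id " + id + " does not fit into one byte");
        }
        String previous = assigned.putIfAbsent(id, owner);
        if (previous != null) {
            throw new IllegalStateException("id " + id + " already used by " + previous);
        }
    }

    /** Number of ids assigned so far. */
    int count() {
        return assigned.size();
    }
}
```

Calling `assign` once per Serializer at startup turns an accidental identifier clash into an immediate error instead of a confusing deserialisation failure later on.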

In the toBinary and fromBinary methods in the Serializer interface you are free to do as you like, as long as at the end of the toBinary the whole object is in the ByteBuf and at the end of the fromBinary the same number of bytes that you put into the ByteBuf are removed from it again.
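The "read exactly what you wrote" contract can be illustrated without any Kompics or Netty dependency. The following is a minimal sketch in which `java.nio.ByteBuffer` stands in for Netty's `ByteBuf` (an assumption made only to keep the example self-contained); `RoundTripSketch` and its methods are hypothetical names.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

// Sketch only: java.nio.ByteBuffer stands in for Netty's ByteBuf.
final class RoundTripSketch {

    /** Writes 4 bytes of IPv4 address plus 2 bytes of port and returns the byte count. */
    static int write(InetSocketAddress addr, ByteBuffer buf) {
        int start = buf.position();
        byte[] ip = addr.getAddress().getAddress();
        if (ip.length != 4) {
            throw new IllegalArgumentException("this sketch handles IPv4 only");
        }
        buf.put(ip);
        buf.putShort((short) addr.getPort()); // the low 16 bits are enough for a port
        return buf.position() - start;
    }

    /** Reads back exactly the 6 bytes that write() produced. */
    static InetSocketAddress read(ByteBuffer buf) {
        byte[] ip = new byte[4];
        buf.get(ip);
        int port = buf.getShort() & 0xFFFF; // undo the sign extension of short
        try {
            return new InetSocketAddress(InetAddress.getByAddress(ip), port);
        } catch (UnknownHostException ex) {
            throw new IllegalStateException(ex); // not expected for a 4-byte array
        }
    }
}
```

After a `write` followed by `flip()` and `read`, the buffer has no bytes remaining. If the reader consumed fewer or more bytes than the writer produced, whatever follows in the buffer would be misinterpreted.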

The Serializers class has two important functions. First it registers Serializers into the system so they can be looked up when messages come in that have been serialised with them (as defined by their identifier). And second it maps types to specific Serializer instances that are already registered. The most convenient way to do that is by registering a short name to the Serializer instance, and then register the class mappings to that name. However, the identifier works as well.

The lookup of Serializers for types is hierarchical, that is, when there is no exact mapping found for the type, the system will traverse its supertype (and interface) hierarchy to find a matching supertype Serializer and use that one instead. For example, if the object to be serialised implements Java’s Serializable interface and there is no more specific Serializer registered, the object will be serialised with Java object serialisation (which is registered by default).
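The idea of a hierarchical lookup can be sketched as follows. This is a simplified model and not the Kompics implementation: the breadth-first traversal order, the class name `HierarchicalLookup`, and the use of plain strings as Serializer names are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified model (not the Kompics implementation) of a hierarchical type lookup.
final class HierarchicalLookup {
    private final Map<Class<?>, String> mappings = new HashMap<>();

    /** Maps a type to the name of a (hypothetical) registered serialiser. */
    void register(Class<?> type, String serializerName) {
        mappings.put(type, serializerName);
    }

    /** Returns the mapping for the type or its nearest mapped supertype, or null. */
    String lookup(Class<?> type) {
        Deque<Class<?>> queue = new ArrayDeque<>();
        queue.add(type);
        while (!queue.isEmpty()) {
            Class<?> current = queue.poll();
            String name = mappings.get(current);
            if (name != null) {
                return name; // exact match, or closest supertype found so far
            }
            if (current.getSuperclass() != null) {
                queue.add(current.getSuperclass());
            }
            for (Class<?> iface : current.getInterfaces()) {
                queue.add(iface);
            }
        }
        return null; // nothing registered anywhere in the hierarchy
    }
}
```

With `java.io.Serializable` mapped to a fallback name, `lookup(String.class)` in this model resolves to that fallback, because `String` has no mapping of its own but implements `Serializable`.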

The other default Serializers (and their identifiers) are:

Note

For people familiar with common Java serialisation frameworks it might be obvious that Kryo is missing from the above list. This is on purpose. Kompics’ previous messaging middleware used to rely on Kryo for serialisation, but Kryo’s low-level object layout hacks led to a host of problems with deployments that included different types of JVMs (e.g., Oracle JRE and Open JRE, or different versions). For this reason we have purposefully moved away from Kryo serialisation. You are welcome to implement your own Kryo-based Serializer, just know that in some setups you might encounter a lot of very difficult-to-pinpoint bugs.

In our PingPong example, the classes that need to be serialised are: TAddress, THeader, Ping, and Pong. Note that there is no necessity to serialise TMessage separately from Ping and Pong at this point, as it has no content of its own. Of course, we could simply have all of those classes implement Serializable and be done with it. Since this is a tutorial (and Java’s object serialisation is terribly inefficient) we are instead going to write our own custom serialisers. As preparation for future sections and to show off the system, we will split the serialisation logic into two classes: NetSerializer will deal with TAddress and THeader instances, while PingPongSerializer will deal with Ping and Pong messages.

Since THeader is part of the messages, we will of course call the NetSerializer from the PingPongSerializer, but we will act as if we don’t know what kind of Header we are dealing with and let the serialisation framework figure that out. Similarly, TAddresses are part of a THeader, so we will invoke the NetSerializer from itself. Note that the difference between the two approaches is whether or not the identifier is prepended. If we invoke a Serializer manually from within another Serializer, it is assumed that both the serialising and the deserialising side know what the object is, so the identifier can be omitted. Otherwise, the system figures it out itself using the identifier.
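The size difference between the two invocation styles can be made concrete with a small sketch. It uses `java.nio.ByteBuffer` as a stand-in for `ByteBuf` and writes the identifier prefix by hand to mimic the lookup-based path; `FramingSketch`, the one-byte type tags, and the field layout are assumptions chosen for illustration.

```java
import java.nio.ByteBuffer;

// Sketch only: ByteBuffer stands in for ByteBuf; the id prefix is written by hand
// to mimic what the lookup-based path is described as doing.
final class FramingSketch {
    static final int NET_ID = 100; // identifier picked for NetSerializer in this tutorial
    static final byte ADDR = 1;    // assumed one-byte type tag for addresses
    static final byte HEADER = 2;  // assumed one-byte type tag for headers

    /** 1 type byte + 4 bytes IPv4 + 2 bytes port = 7 bytes. */
    static void writeAddress(byte[] ipv4, int port, ByteBuffer buf) {
        buf.put(ADDR);
        buf.put(ipv4, 0, 4);
        buf.putShort((short) port);
    }

    /** Direct invocation, no identifier: 1 + 7 + 7 + 1 = 16 bytes. */
    static void writeHeaderDirect(byte[] ipv4, int srcPort, int dstPort, int proto, ByteBuffer buf) {
        buf.put(HEADER);
        writeAddress(ipv4, srcPort, buf);
        writeAddress(ipv4, dstPort, buf);
        buf.put((byte) proto);
    }

    /** Lookup-based invocation: one identifier byte in front, 17 bytes in total. */
    static void writeHeaderViaLookup(byte[] ipv4, int srcPort, int dstPort, int proto, ByteBuffer buf) {
        buf.put((byte) NET_ID);
        writeHeaderDirect(ipv4, srcPort, dstPort, proto, buf);
    }
}
```

In this model the direct path produces 16 bytes and the lookup-based path 17, the extra byte being the identifier that lets the receiving side pick the right Serializer without prior knowledge of the type.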

In order to keep our identifier spaces separate we’ll pick 100 for the NetSerializer and 200 for the PingPongSerializer. And since we only have two instances to deal with, we’ll just define the identifiers within the classes in the lazy way. We also added some extra constructors to the messages, to make it easier to use THeader instances from another Serializer during deserialisation.

package se.sics.test;

import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.UnknownHostException;
import se.sics.kompics.network.Transport;
import se.sics.kompics.network.netty.serialization.Serializer;

public class NetSerializer implements Serializer {

    private static final byte ADDR = 1;
    private static final byte HEADER = 2;

    @Override
    public int identifier() {
        return 100;
    }

    @Override
    public void toBinary(Object o, ByteBuf buf) {
        if (o instanceof TAddress) {
            TAddress addr = (TAddress) o;
            buf.writeByte(ADDR); // mark which type we are serialising (1 byte)
            buf.writeBytes(addr.getIp().getAddress()); // 4 bytes IP (let's hope it's IPv4^^)
            buf.writeShort(addr.getPort()); // we only need 2 bytes here
            // total 7 bytes
        } else if (o instanceof THeader) {
            THeader header = (THeader) o;
            buf.writeByte(HEADER); // mark which type we are serialising (1 byte)
            this.toBinary(header.src, buf); // use this serialiser again (7 bytes)
            this.toBinary(header.dst, buf); // use this serialiser again (7 bytes)
            buf.writeByte(header.proto.ordinal()); // 1 byte is enough
            // total 16 bytes
        }
    }

    @Override
    public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
        byte type = buf.readByte(); // read the first byte to figure out the type
        switch (type) {
            case ADDR: {
                byte[] ipBytes = new byte[4];
                buf.readBytes(ipBytes);
                try {
                    InetAddress ip = InetAddress.getByAddress(ipBytes); // 4 bytes
                    int port = buf.readUnsignedShort(); // 2 bytes
                    return new TAddress(ip, port); // total of 7, check
                } catch (UnknownHostException ex) {
                    throw new RuntimeException(ex); // let Netty deal with this
                }
            }
            case HEADER: {
                TAddress src = (TAddress) this.fromBinary(buf, Optional.absent()); // We already know what it's going to be (7 bytes)
                TAddress dst = (TAddress) this.fromBinary(buf, Optional.absent()); // same here (7 bytes)
                int protoOrd = buf.readByte(); // 1 byte
                Transport proto = Transport.values()[protoOrd];
                return new THeader(src, dst, proto); // total of 16 bytes, check
            }
        }
        return null; // strange things happened^^
    }
}

package se.sics.test;

import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import se.sics.kompics.network.netty.serialization.Serializer;
import se.sics.kompics.network.netty.serialization.Serializers;

public class PingPongSerializer implements Serializer {

    private static final byte PING = 1;
    private static final byte PONG = 2;

    @Override
    public int identifier() {
        return 200;
    }

    @Override
    public void toBinary(Object o, ByteBuf buf) {
        if (o instanceof Ping) {
            Ping ping = (Ping) o;
            buf.writeByte(PING); // 1 byte
            Serializers.toBinary(ping.header, buf); // 1 byte serialiser id + 16 bytes THeader
            // total 18 bytes
        } else if (o instanceof Pong) {
            Pong pong = (Pong) o;
            buf.writeByte(PONG); // 1 byte
            Serializers.toBinary(pong.header, buf); // 1 byte serialiser id + 16 bytes THeader
            // total 18 bytes
        }
    }

    @Override
    public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
        byte type = buf.readByte(); // 1 byte
        switch (type) {
            case PING: {
                THeader header = (THeader) Serializers.fromBinary(buf, Optional.absent()); // 1 byte serialiser id + 16 bytes THeader
                return new Ping(header); // 18 bytes total, check
            }
            case PONG: {
                THeader header = (THeader) Serializers.fromBinary(buf, Optional.absent()); // 1 byte serialiser id + 16 bytes THeader
                return new Pong(header); // 18 bytes total, check
            }
        }
        return null;
    }
}

Note

The serialisation code above is not particularly space efficient, since that was not its purpose. To get an idea of how to use bit-fields to achieve a significant space reduction in these kinds of frequently used serialisation procedures, take a look at the source code of the SpecialSerializers class over on Github.
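As a rough illustration of the bit-field idea (and not a reproduction of what SpecialSerializers does), two small values such as a message type tag and a transport ordinal can share a single byte. The 4-bit/4-bit split and the name `BitFieldSketch` are assumptions made for this sketch.

```java
// Sketch only: packs two 4-bit values into one byte; the split is an assumption.
final class BitFieldSketch {

    /** Packs a type tag (high nibble) and a protocol ordinal (low nibble) into one byte. */
    static byte pack(int type, int protoOrdinal) {
        if (type < 0 || type > 15 || protoOrdinal < 0 || protoOrdinal > 15) {
            throw new IllegalArgumentException("each field must fit into 4 bits");
        }
        return (byte) ((type << 4) | protoOrdinal);
    }

    /** Extracts the type tag from the high nibble. */
    static int type(byte packed) {
        return (packed >> 4) & 0x0F; // mask removes sign-extension bits
    }

    /** Extracts the protocol ordinal from the low nibble. */
    static int protoOrdinal(byte packed) {
        return packed & 0x0F;
    }
}
```

Applied to the serialisers above, this kind of packing would save one byte per header by merging the type marker with the protocol byte, at the cost of limiting both fields to 16 distinct values.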

All that is left now is to register the new Serializers and map the right types to them. We add the following to the Main class:

static {
        // register
        Serializers.register(new NetSerializer(), "netS");
        Serializers.register(new PingPongSerializer(), "ppS");
        // map
        Serializers.register(TAddress.class, "netS");
        Serializers.register(THeader.class, "netS");
        Serializers.register(Ping.class, "ppS");
        Serializers.register(Pong.class, "ppS");
}

Distributed PingPong

Now we are prepared for a true distributed deployment of our PingPong example. There are a number of changes we need to make to the way we set up the component hierarchy. First of all, we want to deploy Pinger and Ponger separately, and they also need different parameters. The Ponger is purely reactive and it only needs to know its own self address. The Pinger on the other hand needs to know both its own address and a Ponger’s. We are going to make use of that distinction in the Main class to decide which one to start. If we see two commandline arguments (1 IP and 1 port), we are going to start a Ponger. If, however, we see four commandline arguments (2 IPs and 2 ports), we are going to start a Pinger. Since our application classes now need different setup, we are also going to split the Parent into a PingerParent and a PongerParent. Note that it is always a good idea to have a class without any business logic that sets up the Network and Timer connections, as you will see in the section on Simulation.

package se.sics.test;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;

public class PingerParent extends ComponentDefinition {

    public PingerParent(Init init) {
        Component timer = create(JavaTimer.class, Init.NONE);
        Component network = create(NettyNetwork.class, new NettyInit(init.self));
        Component pinger = create(Pinger.class, new Pinger.Init(init.self, init.ponger));

        connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);

        connect(pinger.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
    }

    public static class Init extends se.sics.kompics.Init<PingerParent> {

        public final TAddress self;
        public final TAddress ponger;

        public Init(TAddress self, TAddress ponger) {
            this.self = self;
            this.ponger = ponger;
        }
    }
}

package se.sics.test;

import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;

public class PongerParent extends ComponentDefinition {

    public PongerParent(Init init) {
        Component network = create(NettyNetwork.class, new NettyInit(init.self));
        Component ponger = create(Ponger.class, new Ponger.Init(init.self));

        connect(ponger.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
    }

    public static class Init extends se.sics.kompics.Init<PongerParent> {

        public final TAddress self;

        public Init(TAddress self) {
            this.self = self;
        }
    }
}

package se.sics.test;

import java.net.InetAddress;
import java.net.UnknownHostException;
import se.sics.kompics.Kompics;
import se.sics.kompics.network.netty.serialization.Serializers;

public class Main {

    
    static {
        // register
        Serializers.register(new NetSerializer(), "netS");
        Serializers.register(new PingPongSerializer(), "ppS");
        // map
        Serializers.register(TAddress.class, "netS");
        Serializers.register(THeader.class, "netS");
        Serializers.register(Ping.class, "ppS");
        Serializers.register(Pong.class, "ppS");
    }
    
    public static void main(String[] args) {
        try {
            if (args.length == 2) { // start Ponger
                InetAddress ip = InetAddress.getByName(args[0]);
                int port = Integer.parseInt(args[1]);
                TAddress self = new TAddress(ip, port);
                Kompics.createAndStart(PongerParent.class, new PongerParent.Init(self), 2);
                System.out.println("Starting Ponger at " + self);
                // no shutdown this time...act like a server and keep running until externally exited
            } else if (args.length == 4) { // start Pinger
                InetAddress myIp = InetAddress.getByName(args[0]);
                int myPort = Integer.parseInt(args[1]);
                TAddress self = new TAddress(myIp, myPort);
                InetAddress pongerIp = InetAddress.getByName(args[2]);
                int pongerPort = Integer.parseInt(args[3]);
                TAddress ponger = new TAddress(pongerIp, pongerPort);
                Kompics.createAndStart(PingerParent.class, new PingerParent.Init(self, ponger), 2);
                System.out.println("Starting Pinger at " + self + " to " + ponger);
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException ex) {
                    System.exit(1);
                }
                Kompics.shutdown();
                System.exit(0);
            } else {
                System.err.println("Invalid number of parameters (2 for Ponger, 4 for Pinger)");
                System.exit(1);
            }

        } catch (UnknownHostException ex) {
            System.err.println(ex);
            System.exit(1);
        }
    }
}

Now we are finally ready to try this. Use the usual command to compile the code:

mvn clean compile

Now the following commands should be run in different terminals, preferably side by side so you can clearly see how things are happening. For all the examples feel free to change the IPs and ports as you like, but be sure they match up correctly.

Start the Ponger first with:

mvn exec:java -Dexec.mainClass="se.sics.test.Main" -Dexec.args="127.0.0.1 34567"

Then start a Pinger with:

mvn exec:java -Dexec.mainClass="se.sics.test.Main" -Dexec.args="127.0.0.1 45678 127.0.0.1 34567"

You can start the same Pinger multiple times or start as many Pinger JVMs in parallel as you like. Make sure they all have their own port, though! You will see that there is no longer any Ping amplification of the kind we saw in the last example. All messages are now correctly addressed and replied to. If you can’t clearly see what’s going on among all the logging output, try reducing the logging level to INFO in the log4j.properties. Finally, use Control-c to shut down the Ponger, as it will run forever otherwise.

As always the code until here can be downloaded here.

Cleanup: Config files, ClassMatchers, and Assembly

While our example from the previous section works, there are still a number of things that are not optimal with it. We’ll use this section to make the whole code a bit nicer, and also change the way we deploy, since we can’t really rely on Maven being present on all target machines.

Configuration Files

First of all, you might have noticed that we have a lot of redundancy in the passing around of parameters between the different component Init objects. Furthermore, our reading from the commandline is not the safest. Sure, there are good libraries for commandline options, but this is not really what we want. What we want is to define a bunch of values once and then be able to access them from anywhere within the code. The right solution for this problem is using a configuration file, where we write IPs and ports and such things, and a configuration library that knows how to give us access to the values in our code. Kompics has its own configuration library, which by default uses Typesafe Config as a backend.

If you prefer a different configuration library, you may of course wrap it in an implementation of se.sics.kompics.config.BaselineConfig and pass it into se.sics.kompics.config.Config.Factory and then replace the default config with your custom one in Kompics.setConfig before starting the runtime.

For the tutorial we are going to stick to Typesafe Config as a baseline. We are thus going to add a src/main/resources/reference.conf file, where we describe default values for all our config options. This is not strictly speaking necessary, but it is generally a good idea to have one place your users can look at where all possible config values are outlined. While we are at it, we also make the timeout period configurable.

reference.conf
pingpong {
	self {
		host = "127.0.0.1"
		port = 34567
	}
	pinger {
		timeout = 1000
		pongeraddr {
			host = "127.0.0.1"
			port = 45678
		}
	}
}

Now that we have a configuration file, we can simply throw away all the Init classes we created before, and pull out the desired values from the config in the Pinger and Ponger constructors.

Pinger.java
public Pinger() {
    try {
        InetAddress selfIp = InetAddress.getByName(config().getValue("pingpong.self.host", String.class));
        int selfPort = config().getValue("pingpong.self.port", Integer.class);
        this.self = new TAddress(selfIp, selfPort);
        InetAddress pongerIp = InetAddress.getByName(config().getValue("pingpong.pinger.pongeraddr.host", String.class));
        int pongerPort = config().getValue("pingpong.pinger.pongeraddr.port", Integer.class);
        this.ponger = new TAddress(pongerIp, pongerPort);
    } catch (UnknownHostException ex) {
        throw new RuntimeException(ex);
    }
}

Handler<Start> startHandler = new Handler<Start>() {
    @Override
    public void handle(Start event) {
        long period = config().getValue("pingpong.pinger.timeout", Long.class);
        SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, period);
        PingTimeout timeout = new PingTimeout(spt);
        spt.setTimeoutEvent(timeout);
        trigger(spt, timer);
        timerId = timeout.getTimeoutId();
    }
};
Ponger.java
public Ponger() {
    try {
        InetAddress selfIp = InetAddress.getByName(config().getValue("pingpong.self.host", String.class));
        int selfPort = config().getValue("pingpong.self.port", Integer.class);
        this.self = new TAddress(selfIp, selfPort);
    } catch (UnknownHostException ex) {
        throw new RuntimeException(ex);
    }
}

However, since the NettyNetwork needs to know the self address as well, we are going to have to duplicate some of this work in the PingerParent and PongerParent. Alternatively we could continue to pass in the self address via the Pinger and Ponger Init classes and only construct it once in the respective parent class.

There is another solution, though, that gives much more concise code. Kompics’ configuration system supports so-called conversions, which are used to convert compatible types from the config values to the requested values. For example, it would be unnecessary to throw an exception when the user asks for an instance of Long but the value is returned as an Integer by Typesafe Config. Instead, the config library will look through a number of se.sics.kompics.config.Converter instances that are registered at se.sics.kompics.config.Conversions (this is very similar to how the serialisation framework is used) and try to find one that can convert an Integer object to a Long object. We can use this system to write a Converter that takes the object at "pingpong.self", for example, and converts it to a TAddress. It turns out that, the way we wrote the reference.conf, Typesafe Config will give us a Map with the subvalues as keys. In this case we can pull out the values, convert them to String and Integer instances respectively, and then construct the TAddress as before. As an example, we are also going to support an alternative way to write a TAddress, which is a single String in the following format: "127.0.0.1:34567".

TAddressConverter.java
package se.sics.test;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import se.sics.kompics.config.Conversions;
import se.sics.kompics.config.Converter;

public class TAddressConverter implements Converter<TAddress> {

    @Override
    public TAddress convert(Object o) {
        if (o instanceof Map) {
            try {
                Map<?, ?> m = (Map<?, ?>) o;
                String hostname = Conversions.convert(m.get("host"), String.class);
                int port = Conversions.convert(m.get("port"), Integer.class);
                InetAddress ip = InetAddress.getByName(hostname);
                return new TAddress(ip, port);
            } catch (UnknownHostException ex) {
                return null;
            }
        }
        if (o instanceof String) {
            try {
                String[] ipport = ((String) o).split(":");
                InetAddress ip = InetAddress.getByName(ipport[0]);
                int port = Integer.parseInt(ipport[1]);
                return new TAddress(ip, port);
            } catch (UnknownHostException ex) {
                return null;
            }
        }
        return null;
    }

    @Override
    public Class<TAddress> type() {
        return TAddress.class;
    }

}
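
The String branch of the converter splits on ":" and parses the port directly, so input without a port or with a non-numeric port would throw an unchecked exception instead of returning null. As a minimal, stdlib-only sketch of a more defensive parse, the hypothetical helper below (HostPortParser is not part of Kompics or the tutorial code) returns null for malformed input and uses an unresolved InetSocketAddress in place of TAddress to stay self-contained:

```java
import java.net.InetSocketAddress;

/** Hypothetical helper for illustration; not part of Kompics or the tutorial code. */
final class HostPortParser {

    /**
     * Parses "host:port" into an unresolved socket address.
     * Returns null on malformed input, following the converter's
     * convention of returning null when no conversion is possible.
     */
    static InetSocketAddress parse(String s) {
        if (s == null) {
            return null;
        }
        int idx = s.lastIndexOf(':');
        if (idx <= 0 || idx == s.length() - 1) {
            return null; // no separator, empty host, or empty port
        }
        String host = s.substring(0, idx).trim();
        if (host.isEmpty()) {
            return null; // host consisted of whitespace only
        }
        try {
            int port = Integer.parseInt(s.substring(idx + 1).trim());
            if (port < 0 || port > 65535) {
                return null; // outside the valid port range
            }
            // createUnresolved avoids a DNS lookup; the caller can resolve later
            return InetSocketAddress.createUnresolved(host, port);
        } catch (NumberFormatException ex) {
            return null; // port was not a number
        }
    }
}
```

In the real converter the same checks could guard the split result before InetAddress.getByName is called.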

Additionally we also need to register the new Converter in the static initialisation block of the Main class and then we can get a TAddress by simply calling config().getValue("pingpong.self", TAddress.class);, for example.

Main.java
static {
    // register
    Serializers.register(new NetSerializer(), "netS");
    Serializers.register(new PingPongSerializer(), "ppS");
    // map
    Serializers.register(TAddress.class, "netS");
    Serializers.register(THeader.class, "netS");
    Serializers.register(Ping.class, "ppS");
    Serializers.register(Pong.class, "ppS");
    // conversions
    Conversions.register(new TAddressConverter());
}
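
To picture the lookup described earlier (walking through registered converters until one produces the requested type), here is a stripped-down, stdlib-only sketch. SimpleConverter, SimpleConversions, and IntegerToLong are hypothetical names invented for this illustration; the actual Kompics Conversions class may work differently internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a converter; names are illustrative only. */
interface SimpleConverter<T> {
    T convert(Object o);   // returns null if this converter cannot handle o
    Class<T> type();       // the target type this converter produces
}

/** Hypothetical registry that tries converters for the requested target type in order. */
final class SimpleConversions {
    private static final List<SimpleConverter<?>> CONVERTERS = new ArrayList<>();

    static void register(SimpleConverter<?> c) {
        CONVERTERS.add(c);
    }

    static <T> T convert(Object o, Class<T> target) {
        if (o == null) {
            return null;
        }
        if (target.isInstance(o)) {
            return target.cast(o); // already the right type, no conversion needed
        }
        for (SimpleConverter<?> c : CONVERTERS) {
            if (c.type().equals(target)) {
                Object result = c.convert(o);
                if (result != null) {
                    return target.cast(result);
                }
            }
        }
        return null; // no registered converter could handle the value
    }
}

/** Example converter: widens an Integer to a Long. */
final class IntegerToLong implements SimpleConverter<Long> {
    @Override
    public Long convert(Object o) {
        return (o instanceof Integer) ? Long.valueOf(((Integer) o).longValue()) : null;
    }

    @Override
    public Class<Long> type() {
        return Long.class;
    }
}
```

After SimpleConversions.register(new IntegerToLong()), a call such as SimpleConversions.convert(1000, Long.class) yields a Long, which mirrors the Integer-to-Long case from the text.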

Note

We are still doing the same work as before, technically even more, since we now have to look through all the Converter instances. It simply looks a lot nicer like this, as there is less unnecessary code duplication. At this time none of the converted values are cached anywhere, so it is recommended to store frequently used configuration values in local fields instead of pulling them out of the config every time they are needed.

Message Matching Handlers

Another thing that feels awkward with our code is how we write network messages. Our TMessage class does almost nothing except define what kind of header we expect, and all actual network messages like Ping and Pong have to implement all these annoying constructors that TMessage requires instead of focusing on their business logic (which is trivially simple to non-existent^^). We would much rather have TMessage act as a kind of container for data, with Ping and Pong simply being payloads. But then how would the handlers of Pinger and Ponger know which messages are for them (i.e. are Ping and Pong respectively) and which are for other classes? They would have to match on TMessage and handle all network messages. That would be way too expensive in a large system, and under no circumstances do we want to schedule components unnecessarily. The solution to our problem can be found in se.sics.kompics.ClassMatchedHandler, which provides a very simple form of pattern matching for Kompics. Instead of matching on a single event type, it matches on two event types: the context type, which we will define as TMessage, and the content type, which will be Ping and Pong respectively.
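
The idea of matching on a context type plus a content type can be sketched without Kompics. In the stdlib-only illustration below, Envelope, ContentHandler, ContentDispatcher, DemoPing, and DemoPong are all hypothetical names; the sketch only shows the dispatch principle (look up a handler by the payload's class and skip the message if nobody subscribed) and says nothing about how Kompics schedules components.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical payload types used only for this illustration. */
final class DemoPing { }
final class DemoPong { }

/** Hypothetical container: addressing information plus an arbitrary payload. */
final class Envelope {
    final String source;
    final Object payload;

    Envelope(String source, Object payload) {
        this.source = source;
        this.payload = payload;
    }
}

/** Handler receiving the typed content and the surrounding context. */
interface ContentHandler<C> {
    void handle(C content, Envelope context);
}

final class ContentDispatcher {
    private final Map<Class<?>, ContentHandler<?>> handlers = new HashMap<>();

    <C> void subscribe(Class<C> contentType, ContentHandler<C> handler) {
        handlers.put(contentType, handler);
    }

    /** Returns true if a handler matched the payload's class and was run. */
    @SuppressWarnings("unchecked")
    boolean dispatch(Envelope msg) {
        ContentHandler<Object> h =
                (ContentHandler<Object>) handlers.get(msg.payload.getClass());
        if (h == null) {
            return false; // no subscription for this content type, nothing runs
        }
        h.handle(msg.payload, msg);
        return true;
    }
}
```

A dispatcher with only a DemoPing subscription runs its handler for an Envelope carrying a DemoPing and ignores one carrying a DemoPong, which is the behaviour we want from Ponger and Pinger respectively.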

We shall rewrite TMessage to carry any kind of KompicsEvent as a payload, and to act as se.sics.kompics.PatternExtractor for the payload. We’ll also move its serialisation logic into the NetSerializer leaving the PingPongSerializer rather trivial as a result.

TMessage.java
package se.sics.test;

import se.sics.kompics.KompicsEvent;
import se.sics.kompics.PatternExtractor;
import se.sics.kompics.network.Msg;
import se.sics.kompics.network.Transport;

public class TMessage implements Msg<TAddress, THeader>, PatternExtractor<Class<Object>, KompicsEvent> {

    public final THeader header;
    public final KompicsEvent payload;

    public TMessage(TAddress src, TAddress dst, Transport protocol, KompicsEvent payload) {
        this.header = new THeader(src, dst, protocol);
        this.payload = payload;
    }

    TMessage(THeader header, KompicsEvent payload) {
        this.header = header;
        this.payload = payload;
    }

    @Override
    public THeader getHeader() {
        return this.header;
    }

    @Override
    public TAddress getSource() {
        return this.header.src;
    }

    @Override
    public TAddress getDestination() {
        return this.header.dst;
    }

    @Override
    public Transport getProtocol() {
        return this.header.proto;
    }

    @Override
    public Class<Object> extractPattern() {
        Class c = payload.getClass();
        return (Class<Object>) c;
    }

    @Override
    public KompicsEvent extractValue() {
        return payload;
    }

}
Ping.java
package se.sics.test;

import se.sics.kompics.KompicsEvent;

public class Ping implements KompicsEvent {
}
Pong.java
package se.sics.test;

import se.sics.kompics.KompicsEvent;

public class Pong implements KompicsEvent {
}
NetSerializer.java
package se.sics.test;

import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.UnknownHostException;
import se.sics.kompics.KompicsEvent;
import se.sics.kompics.network.Transport;
import se.sics.kompics.network.netty.serialization.Serializer;
import se.sics.kompics.network.netty.serialization.Serializers;

public class NetSerializer implements Serializer {

    private static final byte ADDR = 1;
    private static final byte HEADER = 2;
    private static final byte MSG = 3;

    @Override
    public int identifier() {
        return 100;
    }

    @Override
    public void toBinary(Object o, ByteBuf buf) {
        if (o instanceof TAddress) {
            TAddress addr = (TAddress) o;
            buf.writeByte(ADDR); // mark which type we are serialising (1 byte)
            addressToBinary(addr, buf); // 6 bytes
            // total 7 bytes
        } else if (o instanceof THeader) {
            THeader header = (THeader) o;
            buf.writeByte(HEADER); // mark which type we are serialising (1 byte)
            headerToBinary(header, buf); // 13 bytes
            // total 14 bytes
        } else if (o instanceof TMessage) {
            TMessage msg = (TMessage) o;
            buf.writeByte(MSG); // mark which type we are serialising (1 byte)
            headerToBinary(msg.header, buf); // 13 bytes
            Serializers.toBinary(msg.payload, buf); // no idea what it is, let the framework deal with it
        }
    }

    @Override
    public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
        byte type = buf.readByte(); // read the first byte to figure out the type
        switch (type) {
            case ADDR:
                return addressFromBinary(buf);
            case HEADER:
                return headerFromBinary(buf);
            case MSG: {
                THeader header = headerFromBinary(buf); // 13 bytes
                KompicsEvent payload = (KompicsEvent) Serializers.fromBinary(buf, Optional.absent()); // don't know what it is but KompicsEvent is the upper bound
                return new TMessage(header, payload);
            }
        }
        return null; // strange things happened^^
    }

    private void headerToBinary(THeader header, ByteBuf buf) {
        addressToBinary(header.src, buf); // 6 bytes
        addressToBinary(header.dst, buf); // 6 bytes
        buf.writeByte(header.proto.ordinal()); // 1 byte is enough
        // total of 13 bytes
    }

    private THeader headerFromBinary(ByteBuf buf) {
        TAddress src = addressFromBinary(buf); // 6 bytes
        TAddress dst = addressFromBinary(buf); // 6 bytes
        int protoOrd = buf.readByte(); // 1 byte
        Transport proto = Transport.values()[protoOrd];
        return new THeader(src, dst, proto); // total of 13 bytes, check
    }

    private void addressToBinary(TAddress addr, ByteBuf buf) {
        buf.writeBytes(addr.getIp().getAddress()); // 4 bytes IP (let's hope it's IPv4^^)
        buf.writeShort(addr.getPort()); // we only need 2 bytes here
        // total of 6 bytes
    }

    private TAddress addressFromBinary(ByteBuf buf) {
        byte[] ipBytes = new byte[4];
        buf.readBytes(ipBytes); // 4 bytes
        try {
            InetAddress ip = InetAddress.getByAddress(ipBytes);
            int port = buf.readUnsignedShort(); // 2 bytes
            return new TAddress(ip, port); // total of 6, check
        } catch (UnknownHostException ex) {
            throw new RuntimeException(ex); // let Netty deal with this
        }
    }
}
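
The byte counts in the comments above (4 bytes of IPv4 address plus 2 bytes of port, 6 bytes per address) can be checked with a small stdlib-only sketch. It uses java.nio.ByteBuffer instead of Netty's ByteBuf, and AddressCodec is a hypothetical name; the point is the fixed layout and the masking needed to read a port above 32767 back as a non-negative int, which is what readUnsignedShort does on the Netty side.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical stdlib-only illustration of the 6-byte address layout. */
final class AddressCodec {

    static final int ADDRESS_SIZE = 6; // 4 bytes IPv4 + 2 bytes port

    static byte[] encode(byte[] ipv4, int port) {
        if (ipv4.length != 4) {
            throw new IllegalArgumentException("only IPv4 (4 bytes) fits this layout");
        }
        return ByteBuffer.allocate(ADDRESS_SIZE)
                .put(ipv4)               // 4 bytes
                .putShort((short) port)  // low 16 bits of the port, 2 bytes
                .array();
    }

    static byte[] decodeIp(byte[] wire) {
        return Arrays.copyOfRange(wire, 0, 4);
    }

    static int decodePort(byte[] wire) {
        // mask to undo sign extension: ports above 32767 are negative as a Java short
        return ByteBuffer.wrap(wire, 4, 2).getShort() & 0xFFFF;
    }
}
```

An IPv6 address would need 16 bytes and therefore a different layout, for example a length or family marker before the address bytes.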
PingPongSerializer.java
package se.sics.test;

import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import se.sics.kompics.network.netty.serialization.Serializer;

public class PingPongSerializer implements Serializer {

    private static final byte PING = 1;
    private static final byte PONG = 2;

    @Override
    public int identifier() {
        return 200;
    }

    @Override
    public void toBinary(Object o, ByteBuf buf) {
        if (o instanceof Ping) {
            Ping ping = (Ping) o;
            buf.writeByte(PING); // 1 byte
            // total 1 bytes
        } else if (o instanceof Pong) {
            Pong pong = (Pong) o;
            buf.writeByte(PONG); // 1 byte
            // total 1 bytes
        }
    }

    @Override
    public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
        byte type = buf.readByte(); // 1 byte
        switch (type) {
            case PING:
                return new Ping(); // 1 bytes total, check
            case PONG:
                return new Pong(); // 1 bytes total, check

        }
        return null;
    }
}
Pinger.java
package se.sics.test;

import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ClassMatchedHandler;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.Transport;
import se.sics.kompics.timer.CancelPeriodicTimeout;
import se.sics.kompics.timer.SchedulePeriodicTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;

public class Pinger extends ComponentDefinition {

    private static final Logger LOG = LoggerFactory.getLogger(Pinger.class);

    Positive<Network> net = requires(Network.class);
    Positive<Timer> timer = requires(Timer.class);

    private long counter = 0;
    private UUID timerId;
    private final TAddress self;
    private final TAddress ponger;

    public Pinger() {
        this.self = config().getValue("pingpong.self", TAddress.class);
        this.ponger = config().getValue("pingpong.pinger.pongeraddr", TAddress.class);
    }

    Handler<Start> startHandler = new Handler<Start>() {
        @Override
        public void handle(Start event) {
            long period = config().getValue("pingpong.pinger.timeout", Long.class);
            SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, period);
            PingTimeout timeout = new PingTimeout(spt);
            spt.setTimeoutEvent(timeout);
            trigger(spt, timer);
            timerId = timeout.getTimeoutId();
        }
    };
    ClassMatchedHandler<Pong, TMessage> pongHandler = new ClassMatchedHandler<Pong, TMessage>() {

        @Override
        public void handle(Pong content, TMessage context) {
            counter++;
            LOG.info("Got Pong #{}!", counter);
        }
    };
    Handler<PingTimeout> timeoutHandler = new Handler<PingTimeout>() {
        @Override
        public void handle(PingTimeout event) {
            trigger(new TMessage(self, ponger, Transport.TCP, new Ping()), net);
        }
    };

    {
        subscribe(startHandler, control);
        subscribe(pongHandler, net);
        subscribe(timeoutHandler, timer);
    }

    @Override
    public void tearDown() {
        trigger(new CancelPeriodicTimeout(timerId), timer);
    }

    public static class PingTimeout extends Timeout {

        public PingTimeout(SchedulePeriodicTimeout spt) {
            super(spt);
        }
    }
}
Ponger.java
package se.sics.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ClassMatchedHandler;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.Transport;

public class Ponger extends ComponentDefinition {

    private static final Logger LOG = LoggerFactory.getLogger(Ponger.class);

    Positive<Network> net = requires(Network.class);

    private long counter = 0;
    private final TAddress self;

    public Ponger() {
        this.self = config().getValue("pingpong.self", TAddress.class);
    }

    ClassMatchedHandler<Ping, TMessage> pingHandler = new ClassMatchedHandler<Ping, TMessage>() {
        @Override
        public void handle(Ping content, TMessage context) {
            counter++;
            LOG.info("Got Ping #{}!", counter);
            trigger(new TMessage(self, context.getSource(), Transport.TCP, new Pong()), net);
        }
    };

    {
        subscribe(pingHandler, net);
    }
}

And of course remember to register TMessage to the "netS" serialiser in the static initialisation block of Main.

Note

The ClassMatchedHandler is in fact only a specialisation of the more general se.sics.kompics.MatchedHandler which can use any kind of pattern to select values, and not just Class instances. The advantage of the ClassMatchedHandler is that the pattern to match against can be automatically extracted from the signature of the handle method using Java’s reflection API. For more general MatchedHandler usages the pattern would have to be supplied manually by overriding the pattern method.
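
To give a feel for how a pattern can be read off a handle method via reflection, here is a small stdlib-only sketch. TypedHandler is a hypothetical class written for this illustration, and the lookup shown is one possible approach, not necessarily the one Kompics uses internally.

```java
import java.lang.reflect.Method;

/** Hypothetical two-argument handler base type, standing in for a class-matched handler. */
abstract class TypedHandler<C, X> {

    public abstract void handle(C content, X context);

    /** Finds the content type by inspecting the concrete handle method of the subclass. */
    public Class<?> contentType() {
        for (Method m : getClass().getDeclaredMethods()) {
            // skip the compiler-generated bridge method handle(Object, Object)
            if (m.getName().equals("handle")
                    && !m.isBridge()
                    && m.getParameterTypes().length == 2) {
                return m.getParameterTypes()[0];
            }
        }
        throw new IllegalStateException("no concrete handle method found");
    }
}
```

For an anonymous subclass declared as new TypedHandler<String, Integer>() with a handle(String, Integer) method, contentType() returns String.class, so the pattern does not have to be supplied by hand.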

Assembly

Lastly, we need to move away from Maven to run our code. We need to be able to deploy, configure, and run complete artifacts with all dependencies included. To achieve that goal we are going to need four things:

  1. A Maven assembly plugin,
  2. a dist folder where we collect all deployment artifacts,
  3. an application.conf in the dist folder that is used to override configuration values from the reference.conf we need to customise for a specific deployment, and
  4. two bash scripts pinger.sh and ponger.sh that hide away the ugly JVM configuration parameters from the users.
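
The override relationship from item 3 can be pictured with plain java.util.Properties. This is only an analogy under stated assumptions: Typesafe Config is not involved, LayeredSettings is a hypothetical name, and 192.0.2.10 is a placeholder documentation address rather than a value from the tutorial.

```java
import java.util.Properties;

/** Hypothetical analogy using java.util.Properties; Typesafe Config is not involved here. */
final class LayeredSettings {

    static Properties layered() {
        // plays the role of reference.conf: defaults for every option
        Properties reference = new Properties();
        reference.setProperty("pingpong.self.host", "127.0.0.1");
        reference.setProperty("pingpong.self.port", "34567");
        reference.setProperty("pingpong.pinger.timeout", "1000");

        // plays the role of application.conf: falls back to reference for missing keys
        Properties application = new Properties(reference);
        application.setProperty("pingpong.self.host", "192.0.2.10"); // placeholder override
        return application;
    }
}
```

Looking up pingpong.self.host on the result gives the overridden value, while pingpong.self.port still comes from the defaults, so a deployment only has to list the values it changes.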

For the assembly plugin we have two options. Either we use the default Maven Assembly Plugin, which is a bit simpler, or we use the Maven Shade Plugin, which is a bit more powerful. For the tutorial we are going to use the Shade plugin, as it is usually the better long-term choice.

Our new pom.xml then looks as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>se.sics.test</groupId>
    <artifactId>ping-pong</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>PingPong</name>
    <description>Ping Pong
    </description>
    <properties>
        <java.compiler.version>1.7</java.compiler.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kompics.version>1.0.0</kompics.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>se.sics.kompics</groupId>
            <artifactId>kompics-core</artifactId>
            <version>${kompics.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>se.sics.kompics.basic</groupId>
            <artifactId>kompics-port-timer</artifactId>
            <version>${kompics.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>se.sics.kompics.basic</groupId>
            <artifactId>kompics-component-java-timer</artifactId>
            <version>${kompics.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>se.sics.kompics.basic</groupId>
            <artifactId>kompics-port-network</artifactId>
            <version>${kompics.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>se.sics.kompics.basic</groupId>
            <artifactId>kompics-component-netty-network</artifactId>
            <version>${kompics.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <inherited>true</inherited>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <encoding>${project.build.sourceEncoding}</encoding>
                    <source>${java.compiler.version}</source>
                    <target>${java.compiler.version}</target>
                    <debug>true</debug>
                    <optimize>true</optimize>
                    <showDeprecations>true</showDeprecations>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.2</version>
                <executions>
                    <execution>
                        <!--<phase>package</phase>//-->
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>fat</shadedClassifierName>
                            <!-- Any name that makes sense -->
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                            <dependencyReducedPomLocation>${java.io.tmpdir}/dependency-reduced-pom.xml</dependencyReducedPomLocation>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>se.sics.test.Main</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>log4j:log4j</artifact>
                                    <includes>
                                        <include>**</include>
                                    </includes>
                                </filter>
                                <filter>
                                    <artifact>commons-logging:commons-logging</artifact>
                                    <includes>
                                        <include>**</include>
                                    </includes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <id>bintray-kompics-Maven</id>
            <name>bintray</name>
            <url>https://dl.bintray.com/kompics/Maven</url>
        </repository>
    </repositories>
</project>

After we create the new dist folder and move the new shaded fat jar from the target folder into it, we create the two scripts and the application.conf such that the content is as follows:

$ ls -ohn
total 9984
-rw-r--r--  1 501   165B Dec 26 18:43 application.conf
-rw-r--r--  1 501   4.9M Dec 26 18:28 ping-pong-1.0-SNAPSHOT-fat.jar
-rwxr-xr-x  1 501    93B Dec 26 18:35 pinger.sh
-rwxr-xr-x  1 501    93B Dec 26 18:33 ponger.sh

Note the executable flag set on the bash scripts. Now write the following into the newly created files.

application.conf
pingpong.self.host = "" // insert self hostname
// pinger only
pingpong.pinger.pongeraddr.host = "" // insert target hostname
pingpong.pinger.pongeraddr.port = 34567
ponger.sh
#!/bin/bash

java -Dconfig.file=./application.conf -jar ping-pong-1.0-SNAPSHOT-fat.jar ponger
pinger.sh
#!/bin/bash

java -Dconfig.file=./application.conf -jar ping-pong-1.0-SNAPSHOT-fat.jar pinger

And now simply pack up the dist folder and distribute it to two machines that are connected via the network, unpack and fill in the necessary fields in application.conf on both machines.

Finally, start the ponger first with:

./ponger.sh

And then the pinger:

./pinger.sh

As always the final code can be downloaded here.

Note

Of course the bash files only work on *nix machines. If you need this to run on Windows, you’ll either have to write .bat files or use one of the application packaging tools that generate .exe files from .jar files and you’ll have to fix all the paths.