Basic Networking
In contrast to many other languages and frameworks commonly used for writing distributed applications (e.g., Erlang and Akka), Kompics does not handle networking transparently. Instead, Kompics requires explicit addressing of messages, that is, events that (could) go over the network. There are two reasons for this design. First, events in Kompics are normally not messages, in that they are not addressed but follow channels instead. The logical extension of this pattern to a distributed setting would be to create cross-network channels manually. However, this would be very unintuitive for a programmer, as it would be far too static a setup. Second, it would be misleading to assume that local events and network messages have the same semantics. Network messages have to deal with unavailable nodes, connection loss, partitions, and so on. Instead of trying to force a one-size-fits-all solution onto the programmer, as systems like Akka and the venerable Java RMI do, Kompics exposes these challenges and allows system designers to find application-appropriate solutions.
The following sections will describe the Kompics Network port and its default implementation, NettyNetwork. The latter also features a serialisation framework, which will be described in this part of the tutorial as well. As our example program we will stick to the PingPong code from the previous tutorials and extend it with networking capabilities.
The examples in this tutorial require two new dependencies in the pom.xml file:
<dependency>
    <groupId>se.sics.kompics.basic</groupId>
    <artifactId>kompics-port-network</artifactId>
    <version>${kompics.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>se.sics.kompics.basic</groupId>
    <artifactId>kompics-component-netty-network</artifactId>
    <version>${kompics.version}</version>
    <scope>compile</scope>
</dependency>
Messages, Addresses, and Headers
The se.sics.kompics.network.Network port only allows three kinds of events: se.sics.kompics.network.Msg may pass in both directions (indication and request), while se.sics.kompics.network.MessageNotify.Req is a request and se.sics.kompics.network.MessageNotify.Resp is an indication. The latter two can be used to ask the Network service for feedback about whether or not a specific message was sent, how long it took, and how big the serialised version was. This is convenient for systems that need to keep statistics about their messages, or where resources should be freed as soon as a message crosses the wire.
All messages that go over the network have to implement the Msg interface, which merely stipulates that a message has to have some kind of header (se.sics.kompics.network.Header). The deprecated methods still need to be implemented for backwards compatibility, but should simply forward fields from the Header.
The header itself requires three fields:
- A source address, which on the sender side is typically the self address of the node, and on the receiver side refers to the sender.
- A destination address, which tells the network on the sender side, where the message should go, and is typically the self address on the receiver side.
- The transport protocol to be used for the message. There is no requirement for all Network implementations to implement all possible protocols. NettyNetwork implements TCP, UDP, and UDT only.
The se.sics.kompics.network.Address interface has four required methods:
- IP and port (getIp and getPort) are pretty self-explanatory.
- asSocket should return a combined representation of IP and port. It is strongly recommended to keep the internal representation of the address as an InetSocketAddress, since this method will be called a lot more often than the first two, and creating a new object for it on the fly is rather expensive.
- sameHostAs is used for avoiding serialisation overhead when the message would end up in the same JVM anyway. This can be used for local addressing of components and is typically important for virtual nodes (see section Virtual Networks), where components on the same JVM will often communicate via the network port.
Note
None of the interfaces presented above makes any requirements as to the (im-)mutability of its fields. It is typically recommended to make all of them immutable. However, certain setups will require things like mutable headers; routing, for example, might be implemented in such a way.
Since almost all applications will need custom fields in addition to the fields in the Msg, Header, or Address interfaces, almost everyone will want to write their own implementations. However, if that should not be the case, a simple default implementation can be found in se.sics.kompics.network.netty.DirectMessage, se.sics.kompics.network.netty.DirectHeader, and se.sics.kompics.network.netty.NettyAddress. For the purpose of this tutorial, however, we will write our own, which we will prefix with T (for Tutorial) to avoid naming conflicts, since Java lacks support for import aliases.
package se.sics.test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import se.sics.kompics.network.Address;
public class TAddress implements Address {
private final InetSocketAddress isa;
public TAddress(InetAddress addr, int port) {
this.isa = new InetSocketAddress(addr, port);
}
@Override
public InetAddress getIp() {
return this.isa.getAddress();
}
@Override
public int getPort() {
return this.isa.getPort();
}
@Override
public InetSocketAddress asSocket() {
return this.isa;
}
@Override
public boolean sameHostAs(Address other) {
return this.isa.equals(other.asSocket());
}
// Not required but strongly recommended
@Override
public final String toString() {
return isa.toString();
}
@Override
public int hashCode() {
int hash = 3;
hash = 11 * hash + (this.isa != null ? this.isa.hashCode() : 0);
return hash;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final TAddress other = (TAddress) obj;
if (this.isa != other.isa && (this.isa == null || !this.isa.equals(other.isa))) {
return false;
}
return true;
}
}
package se.sics.test;
import se.sics.kompics.network.Address;
import se.sics.kompics.network.Header;
import se.sics.kompics.network.Transport;
public class THeader implements Header<TAddress> {
public final TAddress src;
public final TAddress dst;
public final Transport proto;
public THeader(TAddress src, TAddress dst, Transport proto) {
this.src = src;
this.dst = dst;
this.proto = proto;
}
@Override
public TAddress getSource() {
return src;
}
@Override
public TAddress getDestination() {
return dst;
}
@Override
public Transport getProtocol() {
return proto;
}
}
package se.sics.test;
import se.sics.kompics.network.Address;
import se.sics.kompics.network.Header;
import se.sics.kompics.network.Msg;
import se.sics.kompics.network.Transport;
public abstract class TMessage implements Msg<TAddress, THeader> {
public final THeader header;
public TMessage(TAddress src, TAddress dst, Transport protocol) {
this.header = new THeader(src, dst, protocol);
}
@Override
public THeader getHeader() {
return this.header;
}
@Override
public TAddress getSource() {
return this.header.src;
}
@Override
public TAddress getDestination() {
return this.header.dst;
}
@Override
public Transport getProtocol() {
return this.header.proto;
}
}
For our PingPong example we shall have both Ping and Pong extend TMessage instead of the direct request-response. We also hard-code TCP as the protocol for both for now.
package se.sics.test;
import se.sics.kompics.network.Transport;
public class Ping extends TMessage {
public Ping(TAddress src, TAddress dst) {
super(src, dst, Transport.TCP);
}
}
package se.sics.test;
import se.sics.kompics.network.Transport;
public class Pong extends TMessage {
public Pong(TAddress src, TAddress dst) {
super(src, dst, Transport.TCP);
}
}
Now we need to change the Pinger and the Ponger to take a self address as init-parameter, require the Network port, and feed the new constructor arguments to the Ping and Pong classes. Since we are now using the network port for the ping-pong, we can remove the requirement for the PingPongPort. In this first example we’ll make our lives somewhat simple and use the self address as source and destination for both classes. This corresponds to the local reflection example mentioned above.
package se.sics.test;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.Handler;
import se.sics.kompics.Start;
import se.sics.kompics.timer.*;
import se.sics.kompics.network.Network;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
public class Pinger extends ComponentDefinition {
private static final Logger LOG = LoggerFactory.getLogger(Pinger.class);
Positive<Network> net = requires(Network.class);
Positive<Timer> timer = requires(Timer.class);
private long counter = 0;
private UUID timerId;
private final TAddress self;
public Pinger(Init init) {
this.self = init.self;
}
Handler<Start> startHandler = new Handler<Start>(){
public void handle(Start event) {
SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, 1000);
PingTimeout timeout = new PingTimeout(spt);
spt.setTimeoutEvent(timeout);
trigger(spt, timer);
timerId = timeout.getTimeoutId();
}
};
Handler<Pong> pongHandler = new Handler<Pong>(){
public void handle(Pong event) {
counter++;
LOG.info("Got Pong #{}!", counter);
}
};
Handler<PingTimeout> timeoutHandler = new Handler<PingTimeout>() {
public void handle(PingTimeout event) {
trigger(new Ping(self, self), net);
}
};
{
subscribe(startHandler, control);
subscribe(pongHandler, net);
subscribe(timeoutHandler, timer);
}
@Override
public void tearDown() {
trigger(new CancelPeriodicTimeout(timerId), timer);
}
public static class PingTimeout extends Timeout {
public PingTimeout(SchedulePeriodicTimeout spt) {
super(spt);
}
}
public static class Init extends se.sics.kompics.Init<Pinger> {
public final TAddress self;
public Init(TAddress self) {
this.self = self;
}
}
}
package se.sics.test;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.Handler;
import se.sics.kompics.network.Network;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Ponger extends ComponentDefinition {
private static final Logger LOG = LoggerFactory.getLogger(Ponger.class);
Positive<Network> net = requires(Network.class);
private long counter = 0;
private final TAddress self;
public Ponger(Init init) {
this.self = init.self;
}
Handler<Ping> pingHandler = new Handler<Ping>(){
public void handle(Ping event) {
counter++;
LOG.info("Got Ping #{}!", counter);
trigger(new Pong(self, event.getSource()), net);
}
};
{
subscribe(pingHandler, net);
}
public static class Init extends se.sics.kompics.Init<Ponger> {
public final TAddress self;
public Init(TAddress self) {
this.self = self;
}
}
}
Finally, we have to create the NettyNetwork in the Parent class, and connect it appropriately. As a preparation for later, we are going to take the port for the self address as a commandline argument in the Main class, and use the address of the local host (via InetAddress.getLocalHost()) as IP address.
package se.sics.test;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Component;
import se.sics.kompics.Init;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.network.netty.NettyInit;
public class Parent extends ComponentDefinition {
public Parent(Init init) {
Component timer = create(JavaTimer.class, Init.NONE);
Component network = create(NettyNetwork.class, new NettyInit(init.self));
Component pinger = create(Pinger.class, new Pinger.Init(init.self));
Component ponger = create(Ponger.class, new Ponger.Init(init.self));
Component pinger2 = create(Pinger.class, new Pinger.Init(init.self));
connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class));
connect(pinger2.getNegative(Timer.class), timer.getPositive(Timer.class));
connect(pinger.getNegative(Network.class), network.getPositive(Network.class));
connect(pinger2.getNegative(Network.class), network.getPositive(Network.class));
connect(ponger.getNegative(Network.class), network.getPositive(Network.class));
}
public static class Init extends se.sics.kompics.Init<Parent> {
public final TAddress self;
public Init(TAddress self) {
this.self = self;
}
}
}
package se.sics.test;
import se.sics.kompics.Kompics;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class Main {
public static void main(String[] args) {
try {
InetAddress ip = InetAddress.getLocalHost();
int port = Integer.parseInt(args[0]);
TAddress self = new TAddress(ip, port);
Kompics.createAndStart(Parent.class, new Parent.Init(self), 2);
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
System.exit(1);
}
Kompics.shutdown();
System.exit(0);
} catch (UnknownHostException ex) {
System.err.println(ex);
System.exit(1);
}
}
}
At this point we can compile and run the code with a slight variation to the usual commands, since we need the port as an argument now (pick a different port if 34567 is in use on your system):
mvn clean compile
mvn exec:java -Dexec.mainClass="se.sics.test.Main" -Dexec.args="34567"
The code until here can be found here.
We can see from the output that our setup technically works, but we are back to the problem of getting four Pongs for our two Pings. The reason is, of course, that we are cheating. We are running all components in the same JVM, but we are not using proper Virtual Networks support yet. We also do not actually want to use virtual nodes here; instead each Pinger and Ponger should be on a different host (or at least in a different JVM).
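One way to picture the amplification is the following toy model. It is not Kompics code: the class and method names are made up for illustration, and it assumes only that every component connected to the shared Network port sees every message, because all of them use the same self address.

```java
// Toy model (not Kompics code): all components share one address, so every
// message sent to that address is handed to every component that handles it.
final class AmplificationSketch {

    /** Counts the Pongs handled by all Pingers after one timeout round. */
    static int pongsAfterOneRound(int pingers, int pongers) {
        // Each Pinger sends one Ping to the shared self address.
        int pingsSent = pingers;
        // Every Ponger sees every Ping and answers each with one Pong.
        int pongsSent = pingsSent * pongers;
        // Every Pinger sees every Pong, because they all share the address.
        return pongsSent * pingers;
    }

    public static void main(String[] args) {
        // Two Pingers and one Ponger, as in the Parent class above.
        System.out.println(pongsAfterOneRound(2, 1));
    }
}
```

With two Pingers and one Ponger the model yields four handled Pongs per round, which matches the output described above.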
Before we can fix this, though, we have to fix another problem that we don’t even see yet: None of our messages are currently serialisable.
Serialisation
With NettyNetwork comes a serialisation framework that builds on Netty’s io.netty.buffer.ByteBuf. The Kompics part is two-fold: it consists of the se.sics.kompics.network.netty.serialization.Serializer interface, and a Serializer registration, mapping, and lookup service in se.sics.kompics.network.netty.serialization.Serializers.
For every Serializer you write and register you will be required to pick a unique int identifier. You are required to keep track of the identifiers you use yourself. Kompics reserves 0-16 for internal use. By default only a single byte worth of identifier space is used, to minimise the overhead of the framework. If you are assigning identifiers larger than 255, make sure to use the resize method in Serializers before registering the respective Serializers.
For larger projects it can become quite difficult to keep track of which identifiers are used and which are free. It is recommended to either assign them all in a single place (e.g., static fields of a class, or a configuration file), or assign subranges to submodules of the project and then do the same single-location-assignment per module.
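The single-place assignment could look roughly like the following sketch. The class name, constant names, and sub-ranges are hypothetical choices for illustration; only the reserved range 0-16 and the default single-byte limit of 255 come from the text above.

```java
// Hypothetical central registry of serialiser identifiers for one project.
// The names and ranges are illustrative assumptions, not part of Kompics.
final class SerializerIds {
    private SerializerIds() {}

    // Kompics reserves 0-16, so project identifiers start above that.
    static final int FIRST_FREE = 17;

    // Assumed sub-range for a networking module: 100-199.
    static final int NET_BASE = 100;
    static final int NET_SERIALIZER = NET_BASE;

    // Assumed sub-range for an application module: 200-255.
    static final int APP_BASE = 200;
    static final int PING_PONG_SERIALIZER = APP_BASE;

    // Largest identifier that fits the default single-byte identifier space.
    static final int MAX_SINGLE_BYTE = 255;

    /** True if the identifier can be used without resizing the identifier space. */
    static boolean fitsDefaultSpace(int id) {
        return id >= FIRST_FREE && id <= MAX_SINGLE_BYTE;
    }
}
```

A Serializer would then return one of these constants from its identifier method instead of a literal number, so that clashes show up in one file.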
In the toBinary and fromBinary methods of the Serializer interface you are free to do as you like, as long as at the end of toBinary the whole object is in the ByteBuf, and at the end of fromBinary the same number of bytes that you put into the ByteBuf have been removed from it again.
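The byte-count contract can be illustrated with a small round trip. This sketch uses java.nio.ByteBuffer as a stand-in for Netty’s ByteBuf so that it runs without extra dependencies; PortPair and the method layout are hypothetical and do not implement the actual Serializer interface.

```java
import java.nio.ByteBuffer;

// Stand-in sketch: java.nio.ByteBuffer replaces Netty's ByteBuf here, and
// PortPair is a made-up example type, not something from Kompics.
final class RoundTripSketch {

    static final class PortPair {
        final int from;
        final int to;

        PortPair(int from, int to) {
            this.from = from;
            this.to = to;
        }
    }

    // Writes exactly 4 bytes: two 16-bit port numbers.
    static void toBinary(PortPair p, ByteBuffer buf) {
        buf.putShort((short) p.from);
        buf.putShort((short) p.to);
    }

    // Reads back exactly the 4 bytes written by toBinary, no more and no less.
    static PortPair fromBinary(ByteBuffer buf) {
        int from = buf.getShort() & 0xFFFF; // mask to recover the unsigned value
        int to = buf.getShort() & 0xFFFF;
        return new PortPair(from, to);
    }

    /** Returns how many bytes are left unread after one write/read round trip. */
    static int leftoverAfterRoundTrip(PortPair p) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        toBinary(p, buf);
        buf.flip(); // switch the buffer from writing to reading
        fromBinary(buf);
        return buf.remaining();
    }
}
```

If leftoverAfterRoundTrip ever returned a non-zero value, the next object in the same buffer would be decoded from the wrong offset, which is the situation the rule above is meant to prevent.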
The Serializers class has two important functions. First, it registers Serializers into the system so they can be looked up when messages come in that have been serialised with them (as defined by their identifier). Second, it maps types to specific Serializer instances that are already registered. The most convenient way to do that is by registering a short name for the Serializer instance, and then registering the class mappings to that name. However, the identifier works as well.
The lookup of Serializers for types is hierarchical, that is, when there is no exact mapping found for the type, the system will traverse its supertype (and interface) hierarchy to find a matching supertype Serializer and use that one instead. For example, if the object to be serialised implements Java’s Serializable interface and there is no more specific Serializer registered, the object will be serialised with Java object serialisation (which is registered by default).
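The idea of a hierarchical lookup can be modelled in a few lines of plain Java. This is an illustration only, not the Kompics implementation: the class and method names are invented, and the traversal order (interfaces first, then the superclass) is an assumption made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model of a hierarchical type lookup. This is NOT the Kompics
// implementation; names and traversal order are assumptions for the sketch.
final class HierarchicalLookupSketch {
    private final Map<Class<?>, String> mappings = new HashMap<>();

    void register(Class<?> type, String serializerName) {
        mappings.put(type, serializerName);
    }

    /** Returns the name mapped to the type or to a registered supertype, or null. */
    String lookup(Class<?> type) {
        if (type == null) {
            return null; // ran past the top of the hierarchy
        }
        String exact = mappings.get(type);
        if (exact != null) {
            return exact; // an exact mapping always wins
        }
        // Otherwise try the directly implemented interfaces (recursively)...
        for (Class<?> iface : type.getInterfaces()) {
            String found = lookup(iface);
            if (found != null) {
                return found;
            }
        }
        // ...and finally continue with the superclass.
        return lookup(type.getSuperclass());
    }
}
```

For example, with only java.io.Serializable mapped to a name such as "java", a lookup for String.class falls back to that mapping, while a lookup for Thread.class finds nothing.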
The default Serializers, listed in the order of their identifiers, are:
- se.sics.kompics.network.netty.serialization.SpecialSerializers.NullSerializer for null
- se.sics.kompics.network.netty.serialization.SpecialSerializers.ByteSerializer for byte[]
- se.sics.kompics.network.netty.serialization.SpecialSerializers.AddressSerializer for NettyAddress
- se.sics.kompics.network.netty.serialization.JavaSerializer for anything that implements Serializable
- se.sics.kompics.network.netty.serialization.ProtobufSerializer for Protocol Buffers (not registered by default)
- se.sics.kompics.network.netty.NettySerializer for NettyNetwork internal messages
- se.sics.kompics.network.netty.serialization.SpecialSerializers.UUIDSerializer for UUID
- se.sics.kompics.network.netty.serialization.AvroSerializer for Avro (not registered by default)
- The remaining identifiers up to 16 are reserved for future additions.
Note
For people familiar with common Java serialisation frameworks it might be obvious that Kryo is missing from the above list. This is on purpose. Kompics’ previous messaging middleware used to rely on Kryo for serialisation, but Kryo’s low-level object layout hacks led to a host of problems with deployments that included different types of JVMs (e.g., Oracle JRE and Open JRE, or different versions). For this reason we have purposefully moved away from Kryo serialisation. You are welcome to implement your own Kryo-based Serializer; just know that in some setups you might encounter a lot of very difficult to pinpoint bugs.
In our PingPong example, the classes that need to be serialised are: TAddress, THeader, Ping, and Pong. Note that there is no necessity to serialise TMessage separately from Ping and Pong at this point, as it has no content of its own. Of course, we could simply have all of those classes implement Serializable and be done with it. Since this is a tutorial (and Java’s object serialisation is terribly inefficient) we are instead going to write our own custom serialisers. As preparation for future sections and to show off the system, we will split the serialisation logic into two classes: NetSerializer will deal with TAddress and THeader instances, while PingPongSerializer will deal with Ping and Pong messages. Since THeader is part of the messages, we will of course call the NetSerializer from the PingPongSerializer, but we will act as if we don’t know what kind of Header we are dealing with and let the serialisation framework figure that out. Similarly, TAddresses are part of a THeader, so we will invoke the NetSerializer from itself. Note that the difference between the two approaches is whether or not the identifier will be prepended. If we invoke a Serializer manually from within another Serializer, it is assumed that it will be known at serialisation and deserialisation time what the object is, and the identifier can be omitted. Otherwise the system will figure it out itself using the identifier.
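The difference between the two invocation styles comes down to one extra identifier in front of the payload. The following toy model shows this with java.nio.ByteBuffer as a stand-in for Netty’s ByteBuf; the class, the method names, and the single-byte identifier layout are assumptions for illustration and not the Kompics API.

```java
import java.nio.ByteBuffer;

// Toy model of the two invocation styles. java.nio.ByteBuffer stands in for
// Netty's ByteBuf, and all names here are hypothetical, not the Kompics API.
final class PrefixSketch {
    static final int PORT_SERIALIZER_ID = 100; // assumed identifier for the sketch

    // "Manual" call: both sides know the type, so only the payload is written.
    static void writePort(int port, ByteBuffer buf) {
        buf.putShort((short) port); // 2 bytes
    }

    // "Framework" call: prepend the serialiser identifier so that the reader
    // can pick the right serialiser without knowing the type in advance.
    static void writeViaFramework(int port, ByteBuffer buf) {
        buf.put((byte) PORT_SERIALIZER_ID); // 1 byte identifier
        writePort(port, buf);               // 2 bytes payload
    }

    // Reader for the framework style: dispatch on the identifier first.
    static int readViaFramework(ByteBuffer buf) {
        int id = buf.get() & 0xFF;
        if (id != PORT_SERIALIZER_ID) {
            throw new IllegalStateException("Unknown serialiser id " + id);
        }
        return buf.getShort() & 0xFFFF;
    }
}
```

In this model the manual style costs 2 bytes and the framework style 3 bytes for the same value, which is the kind of saving that manual invocation buys at the price of having to know the type on both sides.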
In order to keep our identifier spaces separate we’ll pick 100 for the NetSerializer and 200 for the PingPongSerializer. And since we only have two instances to deal with, we’ll just define the identifiers within the classes in the lazy way.
We also added some extra constructors to the messages that accept a THeader directly (used as new Ping(header) and new Pong(header) below), to make it easier to use THeader instances from another Serializer during deserialisation.
package se.sics.test;
import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.UnknownHostException;
import se.sics.kompics.network.Transport;
import se.sics.kompics.network.netty.serialization.Serializer;
public class NetSerializer implements Serializer {
private static final byte ADDR = 1;
private static final byte HEADER = 2;
@Override
public int identifier() {
return 100;
}
@Override
public void toBinary(Object o, ByteBuf buf) {
if (o instanceof TAddress) {
TAddress addr = (TAddress) o;
buf.writeByte(ADDR); // mark which type we are serialising (1 byte)
buf.writeBytes(addr.getIp().getAddress()); // 4 bytes IP (let's hope it's IPv4^^)
buf.writeShort(addr.getPort()); // we only need 2 bytes here
// total 7 bytes
} else if (o instanceof THeader) {
THeader header = (THeader) o;
buf.writeByte(HEADER); // mark which type we are serialising (1 byte)
this.toBinary(header.src, buf); // use this serialiser again (7 bytes)
this.toBinary(header.dst, buf); // use this serialiser again (7 bytes)
buf.writeByte(header.proto.ordinal()); // 1 byte is enough
// total 16 bytes
}
}
@Override
public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
byte type = buf.readByte(); // read the first byte to figure out the type
switch (type) {
case ADDR: {
byte[] ipBytes = new byte[4];
buf.readBytes(ipBytes);
try {
InetAddress ip = InetAddress.getByAddress(ipBytes); // 4 bytes
int port = buf.readUnsignedShort(); // 2 bytes
return new TAddress(ip, port); // total of 7, check
} catch (UnknownHostException ex) {
throw new RuntimeException(ex); // let Netty deal with this
}
}
case HEADER: {
TAddress src = (TAddress) this.fromBinary(buf, Optional.absent()); // We already know what it's going to be (7 bytes)
TAddress dst = (TAddress) this.fromBinary(buf, Optional.absent()); // same here (7 bytes)
int protoOrd = buf.readByte(); // 1 byte
Transport proto = Transport.values()[protoOrd];
return new THeader(src, dst, proto); // total of 16 bytes, check
}
}
return null; // strange things happened^^
}
}
package se.sics.test;
import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import se.sics.kompics.network.netty.serialization.Serializer;
import se.sics.kompics.network.netty.serialization.Serializers;
public class PingPongSerializer implements Serializer {
private static final byte PING = 1;
private static final byte PONG = 2;
@Override
public int identifier() {
return 200;
}
@Override
public void toBinary(Object o, ByteBuf buf) {
if (o instanceof Ping) {
Ping ping = (Ping) o;
buf.writeByte(PING); // 1 byte
Serializers.toBinary(ping.header, buf); // 1 byte serialiser id + 16 bytes THeader
// total 18 bytes
} else if (o instanceof Pong) {
Pong pong = (Pong) o;
buf.writeByte(PONG); // 1 byte
Serializers.toBinary(pong.header, buf); // 1 byte serialiser id + 16 bytes THeader
// total 18 bytes
}
}
@Override
public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
byte type = buf.readByte(); // 1 byte
switch (type) {
case PING: {
THeader header = (THeader) Serializers.fromBinary(buf, Optional.absent()); // 1 byte serialiser id + 16 bytes THeader
return new Ping(header); // 18 bytes total, check
}
case PONG: {
THeader header = (THeader) Serializers.fromBinary(buf, Optional.absent()); // 1 byte serialiser id + 16 bytes THeader
return new Pong(header); // 18 bytes total, check
}
}
return null;
}
}
Note
The serialisation code above is not particularly space efficient, since that was not the purpose. To get an idea of how to use bit-fields to get a significant space reduction in this kind of frequently used serialisation procedure, take a look at the source code of the SpecialSerializers class over on Github.
All that is left now is to register the new Serializers and map the right types to them. We add the following to the Main class:
static {
    // register
    Serializers.register(new NetSerializer(), "netS");
    Serializers.register(new PingPongSerializer(), "ppS");
    // map
    Serializers.register(TAddress.class, "netS");
    Serializers.register(THeader.class, "netS");
    Serializers.register(Ping.class, "ppS");
    Serializers.register(Pong.class, "ppS");
}
Distributed PingPong
Now we are prepared for a true distributed deployment of our PingPong example. There are a number of changes we need to make to the way we set up the component hierarchy. First of all, we want to deploy Pinger and Ponger separately, and they also need different parameters. The Ponger is purely reactive and it only needs to know its own self address. The Pinger, on the other hand, needs to know both its own address and a Ponger’s. We are going to make use of that distinction in the Main class to decide which one to start. If we see two commandline arguments (1 IP and 1 port), we are going to start a Ponger. If, however, we see four commandline arguments (2 IPs and 2 ports), we are going to start a Pinger. Since our application classes now need different setup, we are also going to split the Parent into a PingerParent and a PongerParent. Note that it is always a good idea to have a class without any business logic that sets up the Network and Timer connections, as you will see in the section on Simulation.
package se.sics.test;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.timer.java.JavaTimer;
public class PingerParent extends ComponentDefinition {
public PingerParent(Init init) {
Component timer = create(JavaTimer.class, Init.NONE);
Component network = create(NettyNetwork.class, new NettyInit(init.self));
Component pinger = create(Pinger.class, new Pinger.Init(init.self, init.ponger));
connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY);
connect(pinger.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
}
public static class Init extends se.sics.kompics.Init<PingerParent> {
public final TAddress self;
public final TAddress ponger;
public Init(TAddress self, TAddress ponger) {
this.self = self;
this.ponger = ponger;
}
}
}
package se.sics.test;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.NettyInit;
import se.sics.kompics.network.netty.NettyNetwork;
public class PongerParent extends ComponentDefinition {
public PongerParent(Init init) {
Component network = create(NettyNetwork.class, new NettyInit(init.self));
Component ponger = create(Ponger.class, new Ponger.Init(init.self));
connect(ponger.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY);
}
public static class Init extends se.sics.kompics.Init<PongerParent> {
public final TAddress self;
public Init(TAddress self) {
this.self = self;
}
}
}
package se.sics.test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import se.sics.kompics.Kompics;
import se.sics.kompics.network.netty.serialization.Serializers;
public class Main {
static {
// register
Serializers.register(new NetSerializer(), "netS");
Serializers.register(new PingPongSerializer(), "ppS");
// map
Serializers.register(TAddress.class, "netS");
Serializers.register(THeader.class, "netS");
Serializers.register(Ping.class, "ppS");
Serializers.register(Pong.class, "ppS");
}
public static void main(String[] args) {
try {
if (args.length == 2) { // start Ponger
InetAddress ip = InetAddress.getByName(args[0]);
int port = Integer.parseInt(args[1]);
TAddress self = new TAddress(ip, port);
Kompics.createAndStart(PongerParent.class, new PongerParent.Init(self), 2);
System.out.println("Starting Ponger at " + self);
// no shutdown this time...act like a server and keep running until externally exited
} else if (args.length == 4) { // start Pinger
InetAddress myIp = InetAddress.getByName(args[0]);
int myPort = Integer.parseInt(args[1]);
TAddress self = new TAddress(myIp, myPort);
InetAddress pongerIp = InetAddress.getByName(args[2]);
int pongerPort = Integer.parseInt(args[3]);
TAddress ponger = new TAddress(pongerIp, pongerPort);
Kompics.createAndStart(PingerParent.class, new PingerParent.Init(self, ponger), 2);
System.out.println("Starting Pinger at " + self + " to " + ponger);
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
System.exit(1);
}
Kompics.shutdown();
System.exit(0);
} else {
System.err.println("Invalid number of parameters (2 for Ponger, 4 for Pinger)");
System.exit(1);
}
} catch (UnknownHostException ex) {
System.err.println(ex);
System.exit(1);
}
}
}
Now we are finally ready to try this. Use the usual command to compile the code:
mvn clean compile
Now the following commands should be run in different terminals, preferably side by side so you can clearly see how things are happening. For all the examples feel free to change the IPs and ports as you like, but be sure they match up correctly.
Start the Ponger first with:
mvn exec:java -Dexec.mainClass="se.sics.test.Main" -Dexec.args="127.0.0.1 34567"
Then start a Pinger with:
mvn exec:java -Dexec.mainClass="se.sics.test.Main" -Dexec.args="127.0.0.1 45678 127.0.0.1 34567"
You can start the same Pinger multiple times or start as many Pinger JVMs in parallel as you like. Make sure they all have their own port, though!
You will see that there is no Ping amplification anymore, unlike in the last example. All messages are now correctly addressed and replied to. If you can’t clearly see what’s going on among all the logging output, try reducing the logging level to INFO in the log4j.properties file. Finally, use Control-c to shut down the Ponger, as it will run forever otherwise.
As always the code until here can be downloaded here.
Cleanup: Config files, ClassMatchers, and Assembly
While our example from the previous section works, there are still a number of things that are not optimal with it. We’ll use this section to make the whole code a bit nicer, and also change the way we deploy, since we can’t really rely on Maven being present on all target machines.
Configuration Files
First of all, you might have noticed that we have a lot of redundancy in the passing around of parameters between the different component Init objects. Furthermore, our reading from the commandline is not the safest. Sure, there are good libraries for commandline options, but this is not really what we want. What we want is to define a bunch of values once and then be able to access them from anywhere within the code. The right solution for this problem is using a configuration file, where we write IPs and ports and such things, and a configuration library that knows how to give us access to the values in our code. Kompics has its own configuration library, which by default uses Typesafe Config as a backend.
If you prefer a different configuration library, you may of course wrap it in an implementation of se.sics.kompics.config.BaselineConfig, pass it into se.sics.kompics.config.Config.Factory, and then replace the default config with your custom one in Kompics.setConfig before starting the runtime.
For the tutorial we are going to stick to Typesafe Config as a baseline. We are thus going to add a src/main/resources/reference.conf file, where we describe default values for all our config options. This is not strictly speaking necessary, but it is generally a good idea to have one place your users can look at where all possible config values are outlined. While we are at it, we also make the timeout period configurable.
pingpong {
    self {
        host = "127.0.0.1"
        port = 34567
    }
    pinger {
        timeout = 1000
        pongeraddr {
            host = "127.0.0.1"
            port = 45678
        }
    }
}
Now that we have a configuration file, we can simply throw away all the Init classes we created before, and pull out the desired values from the config in the Pinger and Ponger constructors.
public Pinger() {
try {
InetAddress selfIp = InetAddress.getByName(config().getValue("pingpong.self.host", String.class));
int selfPort = config().getValue("pingpong.self.port", Integer.class);
this.self = new TAddress(selfIp, selfPort);
InetAddress pongerIp = InetAddress.getByName(config().getValue("pingpong.pinger.pongeraddr.host", String.class));
int pongerPort = config().getValue("pingpong.pinger.pongeraddr.port", Integer.class);
this.ponger = new TAddress(pongerIp, pongerPort);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
Handler<Start> startHandler = new Handler<Start>() {
@Override
public void handle(Start event) {
long period = config().getValue("pingpong.pinger.timeout", Long.class);
SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, period);
PingTimeout timeout = new PingTimeout(spt);
spt.setTimeoutEvent(timeout);
trigger(spt, timer);
timerId = timeout.getTimeoutId();
}
};
public Ponger() {
try {
InetAddress selfIp = InetAddress.getByName(config().getValue("pingpong.self.host", String.class));
int selfPort = config().getValue("pingpong.self.port", Integer.class);
this.self = new TAddress(selfIp, selfPort);
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
However, since the NettyNetwork
needs to know the self address as well, we are going to have to duplicate some of this work in the PingerParent
and PongerParent
. Alternatively we could continue to pass in the self address via the Pinger
and Ponger
Init
classes and only construct it once in the respective parent class.
There is another solution, though, that yields much more concise code. Kompics’ configuration system supports so-called conversions, which are used to convert compatible types from the config values to the requested values. For example, it would be unnecessary to throw an exception when the user is asking for an instance of Long
but the value is returned as Integer
by Typesafe Config. Instead the config library will look through a number of se.sics.kompics.config.Converter
instances that are registered at se.sics.kompics.config.Conversions
(this is very similar to how the serialisation framework is used) and try to find one that can convert an Integer
object to a Long
object. Thus we can use this system to write a Converter
that takes the object at "pingpong.self"
for example and converts it to a TAddress
. It turns out that, given the way we wrote the reference.conf
, Typesafe Config will give us a Map
with the subvalues as keys. In this case we can pull out the values, convert them to String
and Integer
instances respectively and then construct the TAddress
as before. As an example, we are also going to support an alternative way to write a TAddress
, which is a single String
in the following format: "127.0.0.1:34567"
.
package se.sics.test;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import se.sics.kompics.config.Conversions;
import se.sics.kompics.config.Converter;

public class TAddressConverter implements Converter<TAddress> {

    @Override
    public TAddress convert(Object o) {
        if (o instanceof Map) { // map notation: { host = ..., port = ... }
            try {
                Map m = (Map) o;
                String hostname = Conversions.convert(m.get("host"), String.class);
                int port = Conversions.convert(m.get("port"), Integer.class);
                InetAddress ip = InetAddress.getByName(hostname);
                return new TAddress(ip, port);
            } catch (UnknownHostException ex) {
                return null;
            }
        }
        if (o instanceof String) { // string notation: "host:port"
            try {
                String[] ipport = ((String) o).split(":");
                InetAddress ip = InetAddress.getByName(ipport[0]);
                int port = Integer.parseInt(ipport[1]);
                return new TAddress(ip, port); // without this return the string form would always yield null
            } catch (UnknownHostException ex) {
                return null;
            }
        }
        return null; // unsupported input type
    }

    @Override
    public Class<TAddress> type() {
        return TAddress.class;
    }
}
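The String branch of the converter assumes well-formed input: a value without a colon or with a non-numeric port would fail with an ArrayIndexOutOfBoundsException or a NumberFormatException instead of returning null. The following is a minimal sketch of more defensive "host:port" parsing using only the Java standard library. The class HostPortParser and its parse method are hypothetical names introduced for illustration and are not part of Kompics; the sketch returns an InetSocketAddress rather than a TAddress so that it runs on its own.

```java
import java.net.InetSocketAddress;

// Hypothetical helper (not part of Kompics): parses "host:port" defensively.
final class HostPortParser {

    private HostPortParser() {
    }

    /**
     * Returns the parsed address, or null if the input is not of the form
     * host:port with a port in the range 0..65535.
     */
    static InetSocketAddress parse(String s) {
        if (s == null) {
            return null;
        }
        int idx = s.lastIndexOf(':');
        if (idx <= 0 || idx == s.length() - 1) {
            return null; // no separator, empty host, or empty port
        }
        String host = s.substring(0, idx);
        int port;
        try {
            port = Integer.parseInt(s.substring(idx + 1));
        } catch (NumberFormatException ex) {
            return null; // port is not a number
        }
        if (port < 0 || port > 65535) {
            return null; // port does not fit into 16 bits
        }
        // createUnresolved avoids a DNS lookup, so the sketch needs no network access
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        System.out.println(parse("127.0.0.1:34567") != null); // prints true
        System.out.println(parse("127.0.0.1") != null);       // prints false
    }
}
```

The same checks could be folded into the String branch of TAddressConverter, returning null for every malformed value so that the config library can report a failed conversion in one consistent way.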
We also need to register the new Converter
in the static initialisation block of the Main
class and then we can get a TAddress
by simply calling config().getValue("pingpong.self", TAddress.class);
, for example.
static {
    // register
    Serializers.register(new NetSerializer(), "netS");
    Serializers.register(new PingPongSerializer(), "ppS");
    // map
    Serializers.register(TAddress.class, "netS");
    Serializers.register(THeader.class, "netS");
    Serializers.register(Ping.class, "ppS");
    Serializers.register(Pong.class, "ppS");
    // conversions
    Conversions.register(new TAddressConverter());
}
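With the converter registered, an address in the configuration can be written in either notation, assuming the converter returns a TAddress for both input forms as intended. The fragment below is only an illustration: it reuses the values from reference.conf and mixes both notations in one file purely for demonstration.

```
pingpong {
    self = "127.0.0.1:34567"    // single-string notation
    pinger {
        timeout = 1000
        pongeraddr {            // map notation, as in reference.conf
            host = "127.0.0.1"
            port = 45678
        }
    }
}
```

In both cases the lookup in the component stays the same, for example config().getValue("pingpong.self", TAddress.class).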
Note
We are still doing the same work as before, technically even more since we have to look through all the Converter
instances now. It simply looks a lot nicer like this, as there is less unnecessary code duplication. At this time none of the converted values are cached anywhere, so it is recommended to store frequently used configuration values in local fields instead of pulling them out of the config every time they are needed.
Message Matching Handlers¶
Another thing that feels awkward with our code is how we write network messages. Our TMessage
class does almost nothing except define what kind of header we expect, and all actual network messages like Ping
and Pong
have to implement all these annoying constructors that TMessage
requires instead of focusing on their business logic (which is trivially simple to non-existent^^). We would much rather have the TMessage
act as a kind of container for data and then Ping
and Pong
would simply be payloads. But then how would the handlers of Pinger
and Ponger
know which messages are for them, i.e. are Ping
and Pong
respectively, and which are for other classes? They would have to match on TMessage
and handle all network messages. That would be way too expensive in a large system. Under no circumstance do we want to schedule components unnecessarily. The solution to our problem can be found in se.sics.kompics.ClassMatchedHandler
which provides a very simple form of pattern matching for Kompics. Instead of matching on a single event type, it matches on two event types: The context type, which we will define as TMessage
, and the content type which will be Ping
and Pong
respectively.
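To make the idea of matching on a context type and a content type more concrete, here is a toy dispatcher written with the Java standard library only. It is an illustration of the concept and not how Kompics implements ClassMatchedHandler: the names ClassDispatchSketch, Envelope, subscribe, and deliver are hypothetical, and Envelope merely stands in for TMessage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustration only: a toy dispatcher, not the Kompics implementation.
final class ClassDispatchSketch {

    /** Stand-in for TMessage: a context object wrapping an arbitrary payload. */
    static final class Envelope {
        final String source;
        final Object payload;

        Envelope(String source, Object payload) {
            this.source = source;
            this.payload = payload;
        }
    }

    static final class Ping {
    }

    static final class Pong {
    }

    // content class -> handler(content, context)
    private final Map<Class<?>, BiConsumer<Object, Envelope>> handlers = new HashMap<>();

    <T> void subscribe(Class<T> contentType, BiConsumer<T, Envelope> handler) {
        handlers.put(contentType, (content, ctx) -> handler.accept(contentType.cast(content), ctx));
    }

    /** Returns true if a handler matched; unmatched envelopes reach no handler at all. */
    boolean deliver(Envelope env) {
        BiConsumer<Object, Envelope> h = handlers.get(env.payload.getClass());
        if (h == null) {
            return false;
        }
        h.accept(env.payload, env);
        return true;
    }

    public static void main(String[] args) {
        ClassDispatchSketch ponger = new ClassDispatchSketch();
        List<String> log = new ArrayList<>();
        ponger.subscribe(Ping.class, (ping, ctx) -> log.add("ping from " + ctx.source));
        ponger.deliver(new Envelope("pinger", new Ping())); // matched
        ponger.deliver(new Envelope("pinger", new Pong())); // ignored, no handler runs
        System.out.println(log); // prints [ping from pinger]
    }
}
```

The point of the sketch is that the decision is made from the payload's class before any handler code runs, which is why a component that only subscribes to Ping is not bothered with Pong messages.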
We shall rewrite TMessage
to carry any kind of KompicsEvent
as a payload, and to act as se.sics.kompics.PatternExtractor
for the payload. We’ll also move its serialisation logic into the NetSerializer
leaving the PingPongSerializer
rather trivial as a result.
package se.sics.test;
import se.sics.kompics.KompicsEvent;
import se.sics.kompics.PatternExtractor;
import se.sics.kompics.network.Msg;
import se.sics.kompics.network.Transport;
public class TMessage implements Msg<TAddress, THeader>, PatternExtractor<Class<Object>, KompicsEvent> {
public final THeader header;
public final KompicsEvent payload;
public TMessage(TAddress src, TAddress dst, Transport protocol, KompicsEvent payload) {
this.header = new THeader(src, dst, protocol);
this.payload = payload;
}
TMessage(THeader header, KompicsEvent payload) {
this.header = header;
this.payload = payload;
}
@Override
public THeader getHeader() {
return this.header;
}
@Override
public TAddress getSource() {
return this.header.src;
}
@Override
public TAddress getDestination() {
return this.header.dst;
}
@Override
public Transport getProtocol() {
return this.header.proto;
}
@Override
public Class<Object> extractPattern() {
Class c = payload.getClass();
return (Class<Object>) c;
}
@Override
public KompicsEvent extractValue() {
return payload;
}
}
package se.sics.test;
import se.sics.kompics.KompicsEvent;
public class Ping implements KompicsEvent {
}
package se.sics.test;
import se.sics.kompics.KompicsEvent;
public class Pong implements KompicsEvent {
}
package se.sics.test;
import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.UnknownHostException;
import se.sics.kompics.KompicsEvent;
import se.sics.kompics.network.Transport;
import se.sics.kompics.network.netty.serialization.Serializer;
import se.sics.kompics.network.netty.serialization.Serializers;
public class NetSerializer implements Serializer {
private static final byte ADDR = 1;
private static final byte HEADER = 2;
private static final byte MSG = 3;
@Override
public int identifier() {
return 100;
}
@Override
public void toBinary(Object o, ByteBuf buf) {
if (o instanceof TAddress) {
TAddress addr = (TAddress) o;
buf.writeByte(ADDR); // mark which type we are serialising (1 byte)
addressToBinary(addr, buf); // 6 bytes
// total 7 bytes
} else if (o instanceof THeader) {
THeader header = (THeader) o;
buf.writeByte(HEADER); // mark which type we are serialising (1 byte)
headerToBinary(header, buf); // 13 bytes
// total 14 bytes
} else if (o instanceof TMessage) {
TMessage msg = (TMessage) o;
buf.writeByte(MSG); // mark which type we are serialising (1 byte)
headerToBinary(msg.header, buf); // 13 bytes
Serializers.toBinary(msg.payload, buf); // no idea what it is, let the framework deal with it
}
}
@Override
public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
byte type = buf.readByte(); // read the first byte to figure out the type
switch (type) {
case ADDR:
return addressFromBinary(buf);
case HEADER:
return headerFromBinary(buf);
case MSG: {
THeader header = headerFromBinary(buf); // 13 bytes
KompicsEvent payload = (KompicsEvent) Serializers.fromBinary(buf, Optional.absent()); // don't know what it is but KompicsEvent is the upper bound
return new TMessage(header, payload);
}
}
return null; // strange things happened^^
}
private void headerToBinary(THeader header, ByteBuf buf) {
addressToBinary(header.src, buf); // 6 bytes
addressToBinary(header.dst, buf); // 6 bytes
buf.writeByte(header.proto.ordinal()); // 1 byte is enough
// total of 13 bytes
}
private THeader headerFromBinary(ByteBuf buf) {
TAddress src = addressFromBinary(buf); // 6 bytes
TAddress dst = addressFromBinary(buf); // 6 bytes
int protoOrd = buf.readByte(); // 1 byte
Transport proto = Transport.values()[protoOrd];
return new THeader(src, dst, proto); // total of 13 bytes, check
}
private void addressToBinary(TAddress addr, ByteBuf buf) {
buf.writeBytes(addr.getIp().getAddress()); // 4 bytes IP (let's hope it's IPv4^^)
buf.writeShort(addr.getPort()); // we only need 2 bytes here
// total of 6 bytes
}
private TAddress addressFromBinary(ByteBuf buf) {
byte[] ipBytes = new byte[4];
buf.readBytes(ipBytes); // 4 bytes
try {
InetAddress ip = InetAddress.getByAddress(ipBytes);
int port = buf.readUnsignedShort(); // 2 bytes
return new TAddress(ip, port); // total of 6, check
} catch (UnknownHostException ex) {
throw new RuntimeException(ex); // let Netty deal with this
}
}
}
package se.sics.test;
import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import se.sics.kompics.network.netty.serialization.Serializer;
public class PingPongSerializer implements Serializer {
private static final byte PING = 1;
private static final byte PONG = 2;
@Override
public int identifier() {
return 200;
}
@Override
public void toBinary(Object o, ByteBuf buf) {
if (o instanceof Ping) {
Ping ping = (Ping) o;
buf.writeByte(PING); // 1 byte
// total 1 bytes
} else if (o instanceof Pong) {
Pong pong = (Pong) o;
buf.writeByte(PONG); // 1 byte
// total 1 bytes
}
}
@Override
public Object fromBinary(ByteBuf buf, Optional<Object> hint) {
byte type = buf.readByte(); // 1 byte
switch (type) {
case PING:
return new Ping(); // 1 bytes total, check
case PONG:
return new Pong(); // 1 bytes total, check
}
return null;
}
}
package se.sics.test;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ClassMatchedHandler;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.Transport;
import se.sics.kompics.timer.CancelPeriodicTimeout;
import se.sics.kompics.timer.SchedulePeriodicTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;
public class Pinger extends ComponentDefinition {
private static final Logger LOG = LoggerFactory.getLogger(Pinger.class);
Positive<Network> net = requires(Network.class);
Positive<Timer> timer = requires(Timer.class);
private long counter = 0;
private UUID timerId;
private final TAddress self;
private final TAddress ponger;
public Pinger() {
this.self = config().getValue("pingpong.self", TAddress.class);
this.ponger = config().getValue("pingpong.pinger.pongeraddr", TAddress.class);
}
Handler<Start> startHandler = new Handler<Start>() {
@Override
public void handle(Start event) {
long period = config().getValue("pingpong.pinger.timeout", Long.class);
SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, period);
PingTimeout timeout = new PingTimeout(spt);
spt.setTimeoutEvent(timeout);
trigger(spt, timer);
timerId = timeout.getTimeoutId();
}
};
ClassMatchedHandler<Pong, TMessage> pongHandler = new ClassMatchedHandler<Pong, TMessage>() {
@Override
public void handle(Pong content, TMessage context) {
counter++;
LOG.info("Got Pong #{}!", counter);
}
};
Handler<PingTimeout> timeoutHandler = new Handler<PingTimeout>() {
@Override
public void handle(PingTimeout event) {
trigger(new TMessage(self, ponger, Transport.TCP, new Ping()), net);
}
};
{
subscribe(startHandler, control);
subscribe(pongHandler, net);
subscribe(timeoutHandler, timer);
}
@Override
public void tearDown() {
trigger(new CancelPeriodicTimeout(timerId), timer);
}
public static class PingTimeout extends Timeout {
public PingTimeout(SchedulePeriodicTimeout spt) {
super(spt);
}
}
}
package se.sics.test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ClassMatchedHandler;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Positive;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.Transport;
public class Ponger extends ComponentDefinition {
private static final Logger LOG = LoggerFactory.getLogger(Ponger.class);
Positive<Network> net = requires(Network.class);
private long counter = 0;
private final TAddress self;
public Ponger() {
this.self = config().getValue("pingpong.self", TAddress.class);
}
ClassMatchedHandler<Ping, TMessage> pingHandler = new ClassMatchedHandler<Ping, TMessage>() {
@Override
public void handle(Ping content, TMessage context) {
counter++;
LOG.info("Got Ping #{}!", counter);
trigger(new TMessage(self, context.getSource(), Transport.TCP, new Pong()), net);
}
};
{
subscribe(pingHandler, net);
}
}
And of course remember to register TMessage
to the "netS"
serialiser in the static initialisation block of Main
.
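One detail of the NetSerializer shown earlier is worth spelling out: the port is written with writeShort but read back with readUnsignedShort. Both tutorial ports (34567 and 45678) are larger than 32767, so a signed 16-bit read would return a negative number. The sketch below demonstrates the effect with java.nio.ByteBuffer instead of Netty's ByteBuf, which is an assumption made so that it runs without extra dependencies; PortEncodingSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;

// Illustration with java.nio.ByteBuffer rather than Netty's ByteBuf.
final class PortEncodingSketch {

    private PortEncodingSketch() {
    }

    /** Writes the low 16 bits of the port as a 2-byte field. */
    static byte[] encode(int port) {
        return ByteBuffer.allocate(2).putShort((short) port).array();
    }

    /** Reads the 2 bytes as a signed short: wrong for ports above 32767. */
    static int decodeSigned(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getShort();
    }

    /** Masks to 16 bits, which is equivalent to an unsigned short read. */
    static int decodeUnsigned(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getShort() & 0xFFFF;
    }

    public static void main(String[] args) {
        byte[] wire = encode(45678);
        System.out.println(decodeSigned(wire));   // prints -19858
        System.out.println(decodeUnsigned(wire)); // prints 45678
    }
}
```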
Note
The ClassMatchedHandler
is in fact only a specialisation of the more general se.sics.kompics.MatchedHandler
which can use any kind of pattern to select values, and not just Class
instances. The advantage of the ClassMatchedHandler
is that the pattern to match against can be automatically extracted from the signature of the handle
method using Java’s reflection API. For more general MatchedHandler
usages the pattern would have to be supplied manually by overriding the pattern
method.
Assembly¶
Finally, we need to move away from Maven to run our code. We need to be able to deploy, configure, and run complete artifacts with all dependencies included. To achieve that goal we are going to need four things:
- A Maven assembly plugin,
- a dist folder where we collect all deployment artifacts,
- an application.conf in the dist folder that is used to override the configuration values from the reference.conf that we need to customise for a specific deployment, and
- two bash scripts pinger.sh and ponger.sh that hide away the ugly JVM configuration parameters from the users.
For the assembly plugin we have two options. Either we use the default Maven Assembly Plugin, which is a bit simpler, or we use Maven Shade Plugin, which is a bit more powerful. For the tutorial we are going to use the Shade plugin, as it is usually the better long-term choice.
Our new pom.xml
then looks as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>se.sics.test</groupId>
<artifactId>ping-pong</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>PingPong</name>
<description>Ping Pong
</description>
<properties>
<java.compiler.version>1.7</java.compiler.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kompics.version>1.0.0</kompics.version>
</properties>
<dependencies>
<dependency>
<groupId>se.sics.kompics</groupId>
<artifactId>kompics-core</artifactId>
<version>${kompics.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>se.sics.kompics.basic</groupId>
<artifactId>kompics-port-timer</artifactId>
<version>${kompics.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>se.sics.kompics.basic</groupId>
<artifactId>kompics-component-java-timer</artifactId>
<version>${kompics.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>se.sics.kompics.basic</groupId>
<artifactId>kompics-port-network</artifactId>
<version>${kompics.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>se.sics.kompics.basic</groupId>
<artifactId>kompics-component-netty-network</artifactId>
<version>${kompics.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<inherited>true</inherited>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
<source>${java.compiler.version}</source>
<target>${java.compiler.version}</target>
<debug>true</debug>
<optimize>true</optimize>
<showDeprecations>true</showDeprecations>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.2</version>
<executions>
<execution>
<!--<phase>package</phase>//-->
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>fat</shadedClassifierName>
<!-- Any name that makes sense -->
<createDependencyReducedPom>true</createDependencyReducedPom>
<dependencyReducedPomLocation>${java.io.tmpdir}/dependency-reduced-pom.xml
</dependencyReducedPomLocation>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>se.sics.test.Main</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>log4j:log4j</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>commons-logging:commons-logging</artifact>
<includes>
<include>**</include>
</includes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<snapshots>
<enabled>false
</enabled>
</snapshots>
<id>bintray-kompics-Maven
</id>
<name>bintray
</name>
<url>https://dl.bintray.com/kompics/Maven
</url>
</repository>
</repositories>
</project>
After we create the new dist
folder and move the new shaded fat jar from the target
folder into it, we create the two scripts and the application.conf
such that the content is as follows:
$ ls -ohn
total 9984
-rw-r--r-- 1 501 165B Dec 26 18:43 application.conf
-rw-r--r-- 1 501 4.9M Dec 26 18:28 ping-pong-1.0-SNAPSHOT-fat.jar
-rwxr-xr-x 1 501 93B Dec 26 18:35 pinger.sh
-rwxr-xr-x 1 501 93B Dec 26 18:33 ponger.sh
Note the executable flag set on the bash scripts. Now write the following into the newly created files: first application.conf, then ponger.sh, and finally pinger.sh.
pingpong.self.host = "" // insert self hostname
// pinger only
pingpong.pinger.pongeraddr.host = "" // insert target hostname
pingpong.pinger.pongeraddr.port = 34567
#!/bin/bash
java -Dconfig.file=./application.conf -jar ping-pong-1.0-SNAPSHOT-fat.jar ponger
#!/bin/bash
java -Dconfig.file=./application.conf -jar ping-pong-1.0-SNAPSHOT-fat.jar pinger
And now simply pack up the dist
folder and distribute it to two machines that are connected via the network, unpack and fill in the necessary fields in application.conf
on both machines.
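As an illustration of a filled-in configuration, assume the pinger machine has the address 192.0.2.10 and the ponger machine has the address 192.0.2.20. Both addresses are placeholders from a documentation-only range and must be replaced with the real ones. On the pinger machine the application.conf could then look like this:

```
pingpong.self.host = "192.0.2.10"
// pinger only
pingpong.pinger.pongeraddr.host = "192.0.2.20"
pingpong.pinger.pongeraddr.port = 34567
```

On the ponger machine only the self address is relevant:

```
pingpong.self.host = "192.0.2.20"
```

The target port 34567 works because neither file overrides pingpong.self.port, so the ponger listens on the default value from reference.conf.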
Finally, start the ponger first with:
./ponger.sh
And then the pinger:
./pinger.sh
As always the final code can be downloaded here
.
Note
Of course the bash files only work on *nix machines. If you need this to run on Windows, you’ll either have to write .bat
files or use one of the application packaging tools that generate .exe
files from .jar
files and you’ll have to fix all the paths.