Messages, Addresses, and Headers
The Network
port only allows three kinds of events: A Msg
may pass in both directions (indication and request), while MessageNotify.Req
is a request and MessageNotify.Resp
is an indication. The latter two can be used to ask the Network
service for feedback about whether or not a specific message was sent, how long it took, and how big the serialised version was. This is convenient for systems that need to keep statistics about their messages, or where resources should be freed as soon as a message crosses the wire. All messages that go over the network have to implement the Msg
interface, which merely stipulates that a message has to have some kind of Header
. The deprecated methods still need to be implemented for backwards compatibility, but should simply forward fields from the header.
Header
The header itself requires three fields:
- A source address, which on the sender side is typically the self address of the node, and on the receiver side refers to the sender.
- A destination address, which tells the network on the sender side, where the message should go, and is typically the self address on the receiver side.
- The transport protocol to be used for the message. There is no requirement for all
Network
implementations to implement all possible protocols.NettyNetwork
implementsTCP
,UDP
, andUDT
only.
Address
The Address
interface has four required methods:
IP
andport
are pretty self explanatory.asSocket
should get a combined represenation of IP and port. It is strongly recommended to keep the internal representation of the address asInetSocketAddress
, since this method will be called a lot more often than the first two, and creating a new object for it on the fly is rather expensive.- Finally, the
sameHostAs
method is used for avoiding serialisation overhead when the message would end up in the same JVM anyway. This can be used for local addressing of components and is typically important for virtual nodes, where components on the same JVM will often communicate via the network port.
None of the interface presented above make any requirements as to the (im-)mutability of their fields. It is generally recommended to make all of them immutable whenever possible. However, certain setups may require mutable headers, for example. A routing component might be implemented in such a way.
Implementations
Since almost all application will need custom fields in addition to the fields in the Msg
, Header
, or Address
interfaces, almost everyone will want to write their own implementations. However, if that should not be the case, a simple default implementation can be found in netty.DirectMessage
, netty.DirectHeader
, and NettyAddress
. For the purpose of this tutorial, however, we will write our own, which we will prefix with the letter T (for Tutorial) to easily differentiate it from the interfaces.
TAddress
- Java
-
package jexamples.networking.pingpong; import java.net.InetAddress; import java.net.InetSocketAddress; import se.sics.kompics.network.Address; public class TAddress implements Address { private final InetSocketAddress isa; public TAddress(InetAddress addr, int port) { this.isa = new InetSocketAddress(addr, port); } @Override public InetAddress getIp() { return this.isa.getAddress(); } @Override public int getPort() { return this.isa.getPort(); } @Override public InetSocketAddress asSocket() { return this.isa; } @Override public boolean sameHostAs(Address other) { return this.isa.equals(other.asSocket()); } // Not required but strongly recommended @Override public final String toString() { return isa.toString(); } @Override public int hashCode() { int hash = 3; hash = 11 * hash + (this.isa != null ? this.isa.hashCode() : 0); return hash; } @Override public boolean equals(Object obj) { if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } final TAddress other = (TAddress) obj; if (this.isa != other.isa && (this.isa == null || !this.isa.equals(other.isa))) { return false; } return true; } }
- Scala
-
object TAddress { def apply(addr: InetAddress, port: Int): TAddress = { val isa = new InetSocketAddress(addr, port); TAddress(isa) } } case class TAddress(isa: InetSocketAddress) extends Address { override def getIp(): InetAddress = isa.getAddress(); override def getPort(): Int = isa.getPort(); override def asSocket(): InetSocketAddress = isa; override def sameHostAs(other: Address): Boolean = { this.isa.equals(other.asSocket()) } }
THeader
- Java
-
package jexamples.networking.pingpong; import se.sics.kompics.network.Address; import se.sics.kompics.network.Header; import se.sics.kompics.network.Transport; public class THeader implements Header<TAddress> { public final TAddress src; public final TAddress dst; public final Transport proto; public THeader(TAddress src, TAddress dst, Transport proto) { this.src = src; this.dst = dst; this.proto = proto; } @Override public TAddress getSource() { return src; } @Override public TAddress getDestination() { return dst; } @Override public Transport getProtocol() { return proto; } }
- Scala
-
case class THeader(src: TAddress, dst: TAddress, proto: Transport) extends Header[TAddress] { override def getSource(): TAddress = src; override def getDestination(): TAddress = dst; override def getProtocol(): Transport = proto; }
TMessage
- Java
-
package jexamples.networking.pingpong; import se.sics.kompics.network.Address; import se.sics.kompics.network.Header; import se.sics.kompics.network.Msg; import se.sics.kompics.network.Transport; public abstract class TMessage implements Msg<TAddress, THeader> { public final THeader header; public TMessage(TAddress src, TAddress dst, Transport protocol) { this.header = new THeader(src, dst, protocol); } @Override public THeader getHeader() { return this.header; } @SuppressWarnings("deprecation") @Override public TAddress getSource() { return this.header.src; } @SuppressWarnings("deprecation") @Override public TAddress getDestination() { return this.header.dst; } @SuppressWarnings("deprecation") @Override public Transport getProtocol() { return this.header.proto; } }
- Scala
-
abstract class TMessage(val header: THeader) extends Msg[TAddress, THeader] { def this(src: TAddress, dst: TAddress, proto: Transport) { this(THeader(src, dst, proto)) } override def getHeader(): THeader = header; override def getSource(): TAddress = header.src; override def getDestination(): TAddress = header.dst; override def getProtocol(): Transport = header.proto; }
For our PingPong example we shall have both Ping
and Pong
extend TMessage
instead of using the direct request-response. We also hard-code TCP
as protocol for both for now.
Ping
- Java
-
package jexamples.networking.pingpong; import se.sics.kompics.network.Transport; public class Ping extends TMessage { public Ping(TAddress src, TAddress dst) { super(src, dst, Transport.TCP); } }
- Scala
-
object Ping { def apply(src: TAddress, dst: TAddress): Ping = new Ping(THeader(src, dst, Transport.TCP)); } class Ping(_header: THeader) extends TMessage(_header);
Pong
- Java
-
package jexamples.networking.pingpong; import se.sics.kompics.network.Transport; public class Pong extends TMessage { public Pong(TAddress src, TAddress dst) { super(src, dst, Transport.TCP); } }
- Scala
-
object Pong { def apply(src: TAddress, dst: TAddress): Pong = new Pong(THeader(src, dst, Transport.TCP)); } class Pong(_header: THeader) extends TMessage(_header);
Now we need to change the Pinger
and the Ponger
to take a self address as init-parameter, require the Network
port and feed the new constructor arguments to the Ping
and Pong
classes. Since we are using the network port now for the ping-pong exchange, we can remove the requirement for the PingPongPort
. In this first example we shall make our lives somewhat simple and use the self address as source and destination for both classes. This corresponds to the local reflection example mentioned above.
Pinger
- Java
-
package jexamples.networking.pingpong; import se.sics.kompics.Kompics; import se.sics.kompics.ComponentDefinition; import se.sics.kompics.Positive; import se.sics.kompics.Handler; import se.sics.kompics.Start; import se.sics.kompics.timer.*; import se.sics.kompics.network.Network; import java.util.UUID; public class Pinger extends ComponentDefinition { Positive<Network> net = requires(Network.class); Positive<Timer> timer = requires(Timer.class); private long counter = 0; private UUID timerId; private final TAddress self; public Pinger(Init init) { this.self = init.self; } Handler<Start> startHandler = new Handler<Start>() { public void handle(Start event) { SchedulePeriodicTimeout spt = new SchedulePeriodicTimeout(0, 1000); PingTimeout timeout = new PingTimeout(spt); spt.setTimeoutEvent(timeout); trigger(spt, timer); timerId = timeout.getTimeoutId(); } }; Handler<Pong> pongHandler = new Handler<Pong>() { public void handle(Pong event) { counter++; logger.info("Got Pong #{}!", counter); if (counter > 10) { Kompics.asyncShutdown(); } } }; Handler<PingTimeout> timeoutHandler = new Handler<PingTimeout>() { public void handle(PingTimeout event) { trigger(new Ping(self, self), net); } }; { subscribe(startHandler, control); subscribe(pongHandler, net); subscribe(timeoutHandler, timer); } @Override public void tearDown() { trigger(new CancelPeriodicTimeout(timerId), timer); } public static class PingTimeout extends Timeout { public PingTimeout(SchedulePeriodicTimeout spt) { super(spt); } } public static class Init extends se.sics.kompics.Init<Pinger> { public final TAddress self; public Init(TAddress self) { this.self = self; } } }
- Scala
-
package sexamples.networking.pingpong import se.sics.kompics.sl._ import se.sics.kompics.network.Network import se.sics.kompics.timer.{CancelPeriodicTimeout, SchedulePeriodicTimeout, Timeout, Timer} import java.util.UUID object Pinger { class PingTimeout(_spt: SchedulePeriodicTimeout) extends Timeout(_spt); } class Pinger(init: Init[Pinger]) extends ComponentDefinition { import Pinger.PingTimeout; val Init(self: TAddress) = init; val net = requires[Network]; val timer = requires[Timer]; private var counter: Long = 0L; private var timerId: Option[UUID] = None; ctrl uponEvent { case _: Start => { val spt = new SchedulePeriodicTimeout(0, 1000); val timeout = new PingTimeout(spt); spt.setTimeoutEvent(timeout); trigger(spt -> timer); timerId = Some(timeout.getTimeoutId()); } } net uponEvent { case _: Pong => { counter += 1L; log.info(s"Got Pong #${counter}!"); if (counter > 10) { Kompics.asyncShutdown(); } } } timer uponEvent { case _: PingTimeout => { trigger(Ping(self, self) -> net); } } override def tearDown(): Unit = { timerId match { case Some(id) => { trigger(new CancelPeriodicTimeout(id) -> timer); } case None => () // no cleanup necessary } } }
Ponger
- Java
-
package jexamples.networking.pingpong; import se.sics.kompics.ComponentDefinition; import se.sics.kompics.Positive; import se.sics.kompics.Handler; import se.sics.kompics.network.Network; public class Ponger extends ComponentDefinition { Positive<Network> net = requires(Network.class); private long counter = 0; private final TAddress self; public Ponger(Init init) { this.self = init.self; } Handler<Ping> pingHandler = new Handler<Ping>() { public void handle(Ping event) { counter++; logger.info("Got Ping #{}!", counter); trigger(new Pong(self, event.getSource()), net); } }; { subscribe(pingHandler, net); } public static class Init extends se.sics.kompics.Init<Ponger> { public final TAddress self; public Init(TAddress self) { this.self = self; } } }
- Scala
-
package sexamples.networking.pingpong import se.sics.kompics.sl._ import se.sics.kompics.network.Network class Ponger(init: Init[Ponger]) extends ComponentDefinition { val Init(self: TAddress) = init; val net = requires[Network]; private var counter: Long = 0L; net uponEvent { case ping: Ping => { counter += 1L; log.info(s"Got Ping #${counter}!"); trigger(Pong(self, ping.getSource()) -> net); } } }
Parent
Finally, we have to create the NettyNetwork
component in the Parent
class, and connect it appropriately.
- Java
-
package jexamples.networking.pingpong; import se.sics.kompics.Channel; import se.sics.kompics.ComponentDefinition; import se.sics.kompics.Component; import se.sics.kompics.Init; import se.sics.kompics.timer.Timer; import se.sics.kompics.timer.java.JavaTimer; import se.sics.kompics.network.Network; import se.sics.kompics.network.netty.NettyNetwork; import se.sics.kompics.network.netty.NettyInit; public class Parent extends ComponentDefinition { public Parent(Init init) { Component timer = create(JavaTimer.class, Init.NONE); Component network = create(NettyNetwork.class, new NettyInit(init.self)); Component pinger = create(Pinger.class, new Pinger.Init(init.self)); Component ponger = create(Ponger.class, new Ponger.Init(init.self)); Component pinger2 = create(Pinger.class, new Pinger.Init(init.self)); connect(pinger.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY); connect(pinger2.getNegative(Timer.class), timer.getPositive(Timer.class), Channel.TWO_WAY); connect(pinger.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY); connect( pinger2.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY); connect(ponger.getNegative(Network.class), network.getPositive(Network.class), Channel.TWO_WAY); } public static class Init extends se.sics.kompics.Init<Parent> { public final TAddress self; public Init(TAddress self) { this.self = self; } } }
- Scala
-
package sexamples.networking.pingpong import se.sics.kompics.sl._ import se.sics.kompics.timer.Timer import se.sics.kompics.timer.java.JavaTimer import se.sics.kompics.network.Network import se.sics.kompics.network.netty.{NettyInit, NettyNetwork} class Parent(init: Init[Parent]) extends ComponentDefinition { val Init(self: TAddress) = init; val timer = create[JavaTimer]; val network = create[NettyNetwork](new NettyInit(self)); val pinger = create[Pinger](Init[Pinger](self)); val pinger2 = create[Pinger](Init[Pinger](self)); val ponger = create[Ponger](Init[Ponger](self)); connect[Timer](timer -> pinger); connect[Timer](timer -> pinger2); connect[Network](network -> pinger); connect[Network](network -> pinger2); connect[Network](network -> ponger); }
Main
As a preparation for later, we are going to take the port for self address as a commandline argument in the Main
class, and hardcode 127.0.0.1
as IP address.
- Java
-
package jexamples.networking.pingpong; import se.sics.kompics.Kompics; import java.net.InetAddress; import java.net.UnknownHostException; public class Main { public static void main(String[] args) throws InterruptedException, UnknownHostException { InetAddress ip = InetAddress.getLocalHost(); int port = Integer.parseInt(args[0]); TAddress self = new TAddress(ip, port); Kompics.createAndStart(Parent.class, new Parent.Init(self), 2); Kompics.waitForTermination(); } }
- Scala
-
package sexamples.networking.pingpong import se.sics.kompics.sl._ import java.net.{InetAddress, UnknownHostException} object Main { def main(args: Array[String]): Unit = { val ip = InetAddress.getLocalHost(); val port = Integer.parseInt(args(0)); val self = TAddress(ip, port); Kompics.createAndStart(classOf[Parent], Init[Parent](self), 2); Kompics.waitForTermination(); } }
Execution
At this point we can compile and run the code with a slight variation to the usual commands, since we need the port as an argument now (pick a different port if 34567
is in use on your system):
runMain jexamples.networking.pingpong.Main 34567
runMain sexamples.networking.pingpong.Main 34567
Problems
We can see from the output that our setup technically works, but we are back to the problem of getting four Pong
s on our two Ping
s. The reason is, of course, that we are cheating. We are running all components in the same JVM, but we are not using proper virtual network support, yet. We also do not actually want to use virtual nodes here, but instead each Pinger
and Ponger
should be on a different host (or at least in a different JVM).
Before we can fix this, though, we have to fix another problem that we do not even see, yet: None of our messages are currently serialisable.