Messages, Addresses, and Headers
The Network port only allows three kinds of events: a Msg may pass in both directions (indication and request), while MessageNotify.Req is a request and MessageNotify.Resp is an indication. The latter two can be used to ask the Network service for feedback about whether or not a specific message was sent, how long it took, and how big the serialised version was. This is convenient for systems that need to keep statistics about their messages, or where resources should be freed as soon as a message crosses the wire.
All messages that go over the network have to implement the Msg interface, which merely stipulates that a message has to have some kind of Header. The deprecated methods still need to be implemented for backwards compatibility, but should simply forward fields from the header.
Header
The header itself requires three fields:
- A source address, which on the sender side is typically the self address of the node, and on the receiver side refers to the sender.
- A destination address, which on the sender side tells the network where the message should go, and which on the receiver side is typically the self address.
- The transport protocol to be used for the message. There is no requirement for all Network implementations to implement all possible protocols. NettyNetwork implements TCP, UDP, and UDT only.
Address
The Address interface has four required methods:
- IP and port are pretty self-explanatory.
- asSocket should get a combined representation of IP and port. It is strongly recommended to keep the internal representation of the address as InetSocketAddress, since this method will be called a lot more often than the first two, and creating a new object for it on the fly is rather expensive.
- Finally, the sameHostAs method is used for avoiding serialisation overhead when the message would end up in the same JVM anyway. This can be used for local addressing of components and is typically important for virtual nodes, where components on the same JVM will often communicate via the network port.
None of the interfaces presented above makes any requirements as to the (im-)mutability of its fields. It is generally recommended to make all of them immutable whenever possible. However, certain setups may require mutable headers; a routing component, for example, might be implemented in such a way.
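The trade-off between immutable and mutable headers can be illustrated with a minimal sketch. The names below (SketchHeader, RoutingSketch, reroute) are hypothetical stand-ins that use plain strings; they are not the Kompics interfaces. The sketch only shows how a routing step could produce a modified copy of an immutable header instead of changing it in place.

```scala
// Hypothetical stand-in for a header; NOT the Kompics Header interface.
final case class SketchHeader(src: String, dst: String, proto: String)

object RoutingSketch {
  // Returns a new header that points at the next hop.
  // The original header is left untouched, so it stays safe to share.
  def reroute(h: SketchHeader, nextHop: String): SketchHeader =
    h.copy(dst = nextHop)
}
```

A mutable header would save the extra allocation for the copy, but every component holding a reference to the same header object would then observe the change.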
Implementations
Since almost all applications will need custom fields in addition to the fields in the Msg, Header, or Address interfaces, almost everyone will want to write their own implementations. However, if that should not be the case, a simple default implementation can be found in netty.DirectMessage, netty.DirectHeader, and NettyAddress. For the purpose of this tutorial, however, we will write our own, which we will prefix with the letter T (for Tutorial) to easily differentiate them from the interfaces.
TAddress
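    package sexamples.networking.pingpong

    import java.net.{InetAddress, InetSocketAddress}
    import se.sics.kompics.network.Address

    object TAddress {
      // Convenience constructor from a separate IP and port.
      def apply(addr: InetAddress, port: Int): TAddress = {
        val isa = new InetSocketAddress(addr, port);
        TAddress(isa)
      }
    }

    // Keeps the address as an InetSocketAddress, so that asSocket() is cheap.
    case class TAddress(isa: InetSocketAddress) extends Address {
      override def getIp(): InetAddress = isa.getAddress();
      override def getPort(): Int = isa.getPort();
      override def asSocket(): InetSocketAddress = isa;
      override def sameHostAs(other: Address): Boolean = {
        this.isa.equals(other.asSocket())
      }
    }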
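Note that InetSocketAddress.equals compares both the IP address and the port, so the sameHostAs implementation above treats two addresses on the same machine with different ports as different hosts. The following sketch uses only JDK types to show that behaviour; SameHostSketch, sameSocket, and addr are hypothetical helper names and not part of the tutorial code.

```scala
import java.net.{InetAddress, InetSocketAddress}

object SameHostSketch {
  // Mirrors the comparison used in TAddress.sameHostAs: socket equality.
  def sameSocket(a: InetSocketAddress, b: InetSocketAddress): Boolean =
    a.equals(b)

  // Builds a loopback socket address from raw bytes; no DNS lookup is involved.
  def addr(port: Int): InetSocketAddress =
    new InetSocketAddress(InetAddress.getByAddress(Array[Byte](127, 0, 0, 1)), port)
}
```

Here SameHostSketch.sameSocket(SameHostSketch.addr(34567), SameHostSketch.addr(34567)) is true, while comparing ports 34567 and 34568 gives false, even though both sockets are on the same IP.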
THeader
    package sexamples.networking.pingpong

    import se.sics.kompics.network.{Header, Transport}

    case class THeader(src: TAddress, dst: TAddress, proto: Transport) extends Header[TAddress] {
      override def getSource(): TAddress = src;
      override def getDestination(): TAddress = dst;
      override def getProtocol(): Transport = proto;
    }
TMessage
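    package sexamples.networking.pingpong

    import se.sics.kompics.network.{Msg, Transport}

    abstract class TMessage(val header: THeader) extends Msg[TAddress, THeader] {
      // Convenience constructor that builds the header from its parts.
      def this(src: TAddress, dst: TAddress, proto: Transport) = this(THeader(src, dst, proto))

      override def getHeader(): THeader = header;
      // The remaining methods are deprecated in Msg and simply forward to the header.
      override def getSource(): TAddress = header.src;
      override def getDestination(): TAddress = header.dst;
      override def getProtocol(): Transport = header.proto;
    }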
For our PingPong example we shall have both Ping and Pong extend TMessage instead of using the direct request-response. We also hard-code TCP as the protocol for both for now.
Ping
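    package sexamples.networking.pingpong

    import se.sics.kompics.network.Transport

    object Ping {
      // Always uses TCP for now.
      def apply(src: TAddress, dst: TAddress): Ping = new Ping(THeader(src, dst, Transport.TCP));
    }

    class Ping(_header: THeader) extends TMessage(_header);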
Pong
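    package sexamples.networking.pingpong

    import se.sics.kompics.network.Transport

    object Pong {
      // Always uses TCP for now.
      def apply(src: TAddress, dst: TAddress): Pong = new Pong(THeader(src, dst, Transport.TCP));
    }

    class Pong(_header: THeader) extends TMessage(_header);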
Now we need to change the Pinger and the Ponger to take a self address as init-parameter, require the Network port, and feed the new constructor arguments to the Ping and Pong classes. Since we are using the network port now for the ping-pong exchange, we can remove the requirement for the PingPongPort. In this first example we shall make our lives somewhat simple and use the self address as source and destination for both classes. This corresponds to the local reflection example mentioned above.
Pinger
    package sexamples.networking.pingpong

    import se.sics.kompics.sl._
    import se.sics.kompics.network.Network
    import se.sics.kompics.timer.{CancelPeriodicTimeout, SchedulePeriodicTimeout, Timeout, Timer}

    import java.util.UUID

    object Pinger {
      class PingTimeout(_spt: SchedulePeriodicTimeout) extends Timeout(_spt);
    }

    class Pinger(init: Init[Pinger]) extends ComponentDefinition {
      import Pinger.PingTimeout;

      val Init(self: TAddress) = init;

      val net = requires[Network];
      val timer = requires[Timer];

      private var counter: Long = 0L;
      private var timerId: Option[UUID] = None;

      ctrl uponEvent {
        case _: Start => {
          val spt = new SchedulePeriodicTimeout(0, 1000);
          val timeout = new PingTimeout(spt);
          spt.setTimeoutEvent(timeout);
          trigger(spt -> timer);
          timerId = Some(timeout.getTimeoutId());
        }
      }

      net uponEvent {
        case _: Pong => {
          counter += 1L;
          log.info(s"Got Pong #${counter}!");
          if (counter > 10) {
            Kompics.asyncShutdown();
          }
        }
      }

      timer uponEvent {
        case _: PingTimeout => {
          trigger(Ping(self, self) -> net);
        }
      }

      override def tearDown(): Unit = {
        timerId match {
          case Some(id) => {
            trigger(new CancelPeriodicTimeout(id) -> timer);
          }
          case None => () // no cleanup necessary
        }
      }
    }
Ponger
    package sexamples.networking.pingpong

    import se.sics.kompics.sl._
    import se.sics.kompics.network.Network

    class Ponger(init: Init[Ponger]) extends ComponentDefinition {

      val Init(self: TAddress) = init;

      val net = requires[Network];

      private var counter: Long = 0L;

      net uponEvent {
        case ping: Ping => {
          counter += 1L;
          log.info(s"Got Ping #${counter}!");
          trigger(Pong(self, ping.getSource()) -> net);
        }
      }
    }
Parent
Finally, we have to create the NettyNetwork component in the Parent class, and connect it appropriately.
    package sexamples.networking.pingpong

    import se.sics.kompics.sl._
    import se.sics.kompics.timer.Timer
    import se.sics.kompics.timer.java.JavaTimer
    import se.sics.kompics.network.Network
    import se.sics.kompics.network.netty.{NettyInit, NettyNetwork}

    class Parent(init: Init[Parent]) extends ComponentDefinition {

      val Init(self: TAddress) = init;

      val timer = create[JavaTimer];
      val network = create[NettyNetwork](new NettyInit(self));
      val pinger = create[Pinger](Init[Pinger](self));
      val pinger2 = create[Pinger](Init[Pinger](self));
      val ponger = create[Ponger](Init[Ponger](self));

      connect[Timer](timer -> pinger);
      connect[Timer](timer -> pinger2);

      connect[Network](network -> pinger);
      connect[Network](network -> pinger2);
      connect[Network](network -> ponger);
    }
Main
As preparation for later, we are going to take the port for the self address as a command-line argument in the Main class, and hard-code 127.0.0.1 as the IP address.
    package sexamples.networking.pingpong

    import se.sics.kompics.sl._

    import java.net.InetAddress

    object Main {
      def main(args: Array[String]): Unit = {
        // Hard-coded loopback address; a literal IP is parsed without a DNS lookup.
        val ip = InetAddress.getByName("127.0.0.1");
        // The port is taken from the first command-line argument.
        val port = Integer.parseInt(args(0));
        val self = TAddress(ip, port);
        Kompics.createAndStart(classOf[Parent], Init[Parent](self), 2);
        Kompics.waitForTermination();
      }
    }
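The Main class above assumes that a valid port is always supplied as the first argument. A slightly more defensive variant could validate the argument before building the address. The helper below is a hypothetical sketch in plain Scala; the names PortArgSketch and parsePort, as well as the error messages, are assumptions and not part of the tutorial code.

```scala
import scala.util.Try

object PortArgSketch {
  // Parses the first command-line argument as a port in the range 1..65535.
  // Returns Left with a description of the problem if that is not possible.
  def parsePort(args: Array[String]): Either[String, Int] =
    args.headOption match {
      case None => Left("missing port argument")
      case Some(raw) =>
        Try(raw.trim.toInt).toOption match {
          case Some(p) if p >= 1 && p <= 65535 => Right(p)
          case Some(p)                         => Left(s"port out of range: $p")
          case None                            => Left(s"not a number: $raw")
        }
    }
}
```

In main, the result could then be matched on: a Right carries the port to pass to TAddress, while a Left could be printed before exiting.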
Execution
At this point we can compile and run the code with a slight variation to the usual commands, since we need the port as an argument now (pick a different port if 34567 is in use on your system):
    runMain sexamples.networking.pingpong.Main 34567
Problems
We can see from the output that our setup technically works, but we are back to the problem of getting four Pongs on our two Pings. The reason is, of course, that we are cheating. We are running all components in the same JVM, but we are not using proper virtual network support, yet. We also do not actually want to use virtual nodes here, but instead each Pinger and Ponger should be on a different host (or at least in a different JVM).
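The doubling can be reproduced with a small simulation. It rests on a simplifying assumption: every message on the shared Network port is offered to every connected component, and each component only handles the message types it subscribes to. All names below are hypothetical and no Kompics types are used.

```scala
import scala.collection.mutable

object SharedPortSketch {
  sealed trait SketchMsg
  case object SketchPing extends SketchMsg
  case object SketchPong extends SketchMsg

  // Runs one round in which every pinger sends a single Ping, and returns
  // the total number of Pongs handled by all pingers together.
  def pongsReceived(pingers: Int, pongers: Int): Int = {
    val queue = mutable.Queue[SketchMsg]()
    var received = 0
    // Each pinger puts one Ping on the shared port.
    for (_ <- 1 to pingers) queue.enqueue(SketchPing)
    while (queue.nonEmpty) {
      queue.dequeue() match {
        // Every ponger handles every Ping and answers with one Pong.
        case SketchPing => for (_ <- 1 to pongers) queue.enqueue(SketchPong)
        // Every pinger handles every Pong, regardless of who sent the Ping.
        case SketchPong => received += pingers
      }
    }
    received
  }
}
```

With two pingers and one ponger, pongsReceived(2, 1) evaluates to 4, which matches the four Pongs per two Pings seen in the output. With a single pinger the count drops to one Pong per Ping.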
Before we can fix this, though, we have to fix another problem that we do not even see, yet: None of our messages are currently serialisable.