A Full Example – PingPong¶
As a proper example of the usage of Kompics Scala we are going to rewrite the Distributed Ping Pong example from section Cleanup: Config files, ClassMatchers, and Assembly.
SBT Setup¶
Set up your SBT project as follows, or simply download the whole project here
.
$ ls -ohn
total 6
drwxr-xr-x 6 501 204B Apr 1 16:39 distPinger
drwxr-xr-x 6 501 204B Apr 1 16:39 distPonger
-rw-r--r-- 1 501 861B Apr 1 16:40 pingpong.sbt
drwxr-xr-x 5 501 170B Apr 1 16:16 project
drwxr-xr-x 3 501 102B Mar 30 13:14 src
The two distPinger
and distPinger
folders will be used later for running the application with different config files.
You need a SBT file pingpong.sbt
and a project/plugins.sbt
similar to below:
name := "Ping Pong"
organization := "se.sics.test"
version := "1.1"
scalaVersion := "2.11.12"
val kompicsVersion = "1.0.0"
val kompicsScalaVersion = "1.0.1"
resolvers += Resolver.bintrayRepo("kompics", "Maven")
libraryDependencies += "se.sics.kompics" %% "kompics-scala" % kompicsScalaVersion
libraryDependencies += "se.sics.kompics.basic" % "kompics-component-netty-network" % kompicsVersion
libraryDependencies += "se.sics.kompics.basic" % "kompics-component-java-timer" % kompicsVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "org.scala-lang.modules" %% "scala-pickling" % "0.10.1" // no Scala 2.12 version, yet
libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0"
mainClass in assembly := Some("se.sics.test.Main")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
Messages¶
As before we’ll have to implement our own Address
, Header
and Msg
types, as well as the Ping
and Pong
events. To allow for convenient pattern matching in our handlers later, we are going to use only case classes and case objects for these types. We are also going to make significant use of the fact that Scala allows us to define multiple types in the same file. While this makes types sometimes difficult to locate in a code base, it provides convenient grouping and locality while working and reduces the number of times one has to write the same set of imports. For convenience we also define the Ping
and Pong
events in the package object.
package se.sics.test
import java.net.InetAddress;
import java.net.InetSocketAddress;
import se.sics.kompics.network.{ Address, Header, Msg, Transport };
import se.sics.kompics.KompicsEvent;
final case class TAddress(isa: InetSocketAddress) extends Address {
override def asSocket(): InetSocketAddress = isa;
override def getIp(): InetAddress = isa.getAddress;
override def getPort(): Int = isa.getPort;
override def sameHostAs(other: Address): Boolean = {
this.isa.equals(other.asSocket());
}
}
final case class THeader(src: TAddress, dst: TAddress, proto: Transport) extends Header[TAddress] {
override def getDestination(): TAddress = dst;
override def getProtocol(): Transport = proto;
override def getSource(): TAddress = src;
}
final case class TMessage[C <: KompicsEvent](header: THeader, payload: C) extends Msg[TAddress, THeader] {
override def getDestination(): TAddress = header.dst;
override def getHeader(): THeader = header;
override def getProtocol(): Transport = header.proto;
override def getSource(): TAddress = header.src;
}
Note that we limited our TMessage
class to subtypes of KompicsEvent
, but we could just as well have allowed any kind of type that can be serialized as a payload.
package se.sics
import se.sics.kompics.KompicsEvent
package object test {
case object Ping extends KompicsEvent
case object Pong extends KompicsEvent
}
Components¶
Now that we have messages, we can write the components that use them.
The Ponger
is rather short, and as before it simply keeps a counter, which it increments every time it received a TMessage
with the payload Ping
. The underscore in the pattern means that we don’t care about the THeader
which is the first argument of TMessage
.
We also define the setup component PongerParent
in the same file. There is no DSL call for create
, yet, but note the new Scala signature for connect
. The arrow here always goes from positive to negative, or from “service provider” to “service consumer”, if you like. The arrow is also used in the DSL trigger
method, where it simply goes from event to port.
See also
There’s also a new public version for the APIs that are thread-safe, which is not bound to the ComponentDefinition
context. These are defined in the se.sics.kompics.sl
package object and imitate the Kola syntax somewhat: `!trigger` (event -> port)
and `!connect`[PortType](positive -> negative)
package se.sics.test
import se.sics.kompics.sl._
import se.sics.kompics.Init
import se.sics.kompics.network.{Network, Transport}
import se.sics.kompics.network.netty.{ NettyNetwork, NettyInit }
class PongerParent extends ComponentDefinition {
val self = cfg.getValue[TAddress]("pingpong.self");
val network = create(classOf[NettyNetwork], new NettyInit(self));
val ponger = create(classOf[Ponger], Init.NONE);
connect[Network](network -> ponger);
}
class Ponger extends ComponentDefinition {
val net = requires[Network];
private var counter: Long = 0;
private val self = cfg.getValue[TAddress]("pingpong.self");
net uponEvent {
case context@TMessage(_, Ping) => handle {
counter += 1;
logger.info("Got Ping #{}!", counter);
trigger (TMessage(THeader(self, context.getSource, Transport.TCP), Pong) -> net)
}
}
}
Note
The method cfg
returns a memoized Scala wrapper for Kompics’ config()
. You can also access the Java API as before by using the latter method. The Scala API is more condensed, as it avoids the annoying {type}.class
/classOf[{type}]
calls. Note that there are no unchecked methods in the Scala API.
The Pinger
is slightly more involved, but most of that really comes from the use of the Timer
. As with the message types, the Timeout
is defined as a case class to allow pattern matching in the handler. New in this section is the use of Scala DSL Init
which allows any sequence of parameters, which can then be matched into internal fields in the component. The idea is simply to avoid the creation of a large number of init classes which act as nothing but reference carriers and are then discarded. Classes with such minimal reusability are in some way an abuse of the object-oriented paradigm and should be avoided in general.
The custom tearDown
method, is very similar to the Java version, just using an Option
instead of checking for null
, which is better style.
package se.sics.test
import se.sics.kompics.sl._
import se.sics.kompics.network.{ Network, Transport }
import se.sics.kompics.network.netty.{ NettyNetwork, NettyInit }
import se.sics.kompics.timer.{ Timer, SchedulePeriodicTimeout, Timeout, CancelPeriodicTimeout }
import se.sics.kompics.timer.java.JavaTimer
import se.sics.kompics.Start
import java.util.UUID
class PingerParent extends ComponentDefinition {
val self = cfg.getValue[TAddress]("pingpong.self");
val ponger = cfg.getValue[TAddress]("pingpong.pinger.pongeraddr");
val timer = create(classOf[JavaTimer], se.sics.kompics.Init.NONE);
val network = create(classOf[NettyNetwork], new NettyInit(self));
val pinger = create(classOf[Pinger], Init[Pinger](ponger));
connect[Network](network -> pinger);
connect[Timer](timer -> pinger);
}
case class PingTimeout(spt: SchedulePeriodicTimeout) extends Timeout(spt)
class Pinger(init: Init[Pinger]) extends ComponentDefinition {
val net = requires[Network];
val timer = requires[Timer];
private val self = cfg.getValue[TAddress]("pingpong.self");
private val ponger = init match {
case Init(pongerAddr: TAddress) => pongerAddr
}
private var counter: Long = 0;
private var timerId: Option[UUID] = None;
ctrl uponEvent {
case _: Start => handle {
val period = cfg.getValue[Long]("pingpong.pinger.timeout");
val spt = new SchedulePeriodicTimeout(0, period);
val timeout = PingTimeout(spt);
spt.setTimeoutEvent(timeout);
trigger(spt -> timer);
timerId = Some(timeout.getTimeoutId());
}
}
net uponEvent {
case context @ TMessage(_, Pong) => handle {
counter += 1;
logger.info("Got Pong #{}!", counter);
}
}
timer uponEvent {
case PingTimeout(_) => handle {
trigger(TMessage(THeader(self, ponger, Transport.TCP), Ping) -> net);
}
}
override def tearDown(): Unit = {
timerId match {
case Some(id) =>
trigger(new CancelPeriodicTimeout(id) -> timer);
case None => // nothing
}
}
}
Before we can get to the Main
object, we are still missing two important things, that we had to take care of in Java as well: Serialization and Config Converters.
Serialization¶
In order to be able to send things over the network, we need to be able to serialize all our classes. In the Java version we used custom serializers for all our classes. In Scala, on the other hand, we have a very good library called Scala Pickling which can take care of all the case classes/objects and Scala primitives, leaving us only to deal with Java’s enums and InetSocketAddress
.
The two custom “picklers” in the following code might seem a bit arcane, but this is a tutorial on Kompics Scala and not on Scala Pickling, so feel free to gloss over the details.
Note the use an object instead of a class for the PickleSerializer
. We anyway only want a single instance of our serializers to be registered, so unless there is custom state that needs to be passed in, there is no reason to ever have more than one instance and thus use a class.
package se.sics.test
import scala.language.implicitConversions
import se.sics.kompics.network.netty.serialization.Serializer;
import se.sics.kompics.network.Transport
import io.netty.buffer.ByteBuf;
import com.google.common.base.Optional;
import scala.pickling._
import scala.pickling.Defaults._
import scala.pickling.binary._
//import scala.pickling.json._ //alternatively JSON pickling
// Custom Serialization for TAddress (the case class itself is fine, but the InetSocketAddress is problematic)
object TAddressPickler extends Pickler[TAddress] with Unpickler[TAddress] with pickler.PrimitivePicklers with pickler.PrimitiveArrayPicklers {
import java.net.InetSocketAddress
import java.net.InetAddress
override val tag = FastTypeTag[TAddress]
override def pickle(picklee: TAddress, builder: PBuilder): Unit = {
builder.hintTag(tag) // This is always required
builder.beginEntry(picklee)
builder.putField("ip", { fieldBuilder =>
fieldBuilder.hintTag(byteArrayPickler.tag)
fieldBuilder.hintStaticallyElidedType()
byteArrayPickler.pickle(picklee.isa.getAddress.getAddress, fieldBuilder)
})
builder.putField("port", { fieldBuilder =>
fieldBuilder.hintTag(intPickler.tag)
fieldBuilder.hintStaticallyElidedType()
intPickler.pickle(picklee.isa.getPort, fieldBuilder)
})
builder.endEntry()
}
override def unpickle(tag: String, reader: PReader): Any = {
val ipReader = reader.readField("ip");
ipReader.hintStaticallyElidedType();
val ip = byteArrayPickler.unpickleEntry(ipReader).asInstanceOf[Array[Byte]];
val portReader = reader.readField("port");
portReader.hintStaticallyElidedType();
val port = intPickler.unpickleEntry(portReader).asInstanceOf[Int];
new TAddress(new InetSocketAddress(InetAddress.getByAddress(ip), port));
}
}
// Custom Serialization for Transport because enum picklers don't work properly in this version
object TransportPickler extends Pickler[Transport] with Unpickler[Transport] with pickler.PrimitivePicklers {
override val tag = FastTypeTag[Transport]
override def pickle(picklee: Transport, builder: PBuilder): Unit = {
builder.hintTag(tag) // This is always required
builder.beginEntry(picklee)
builder.putField("ordinal", { fieldBuilder =>
fieldBuilder.hintTag(bytePickler.tag)
fieldBuilder.hintStaticallyElidedType()
bytePickler.pickle(picklee.ordinal().toByte, fieldBuilder)
})
builder.endEntry()
}
override def unpickle(tag: String, reader: PReader): Any = {
val ordinalReader = reader.readField("ordinal");
ordinalReader.hintStaticallyElidedType();
val ordinal = bytePickler.unpickleEntry(ordinalReader).asInstanceOf[Byte].toInt;
Transport.values()(ordinal);
}
}
// serialize all object with Scala pickling
object PickleSerializer extends Serializer {
override def identifier(): Int = 100;
// register our custom picklers for use with reflection picklers
implicit val addressPickler = TAddressPickler
scala.pickling.runtime.GlobalRegistry.picklerMap += (addressPickler.tag.key -> (x => addressPickler))
scala.pickling.runtime.GlobalRegistry.unpicklerMap += (addressPickler.tag.key -> addressPickler)
implicit val transportPickler = TransportPickler
scala.pickling.runtime.GlobalRegistry.picklerMap += (transportPickler.tag.key -> (x => transportPickler))
scala.pickling.runtime.GlobalRegistry.unpicklerMap += (transportPickler.tag.key -> transportPickler)
override def toBinary(o: Any, buf: ByteBuf): Unit = {
val ser = o.pickle;
val bytes = ser.value;
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
}
override def fromBinary(buf: ByteBuf, hint: Optional[Object]): Object = {
val len = buf.readInt();
val ser = Array.ofDim[Byte](len);
buf.readBytes(ser);
val o = ser.unpickle[Any];
o.asInstanceOf[Object];
}
// a nice implicit conversion between Guava's Optional and Scala's Option
// in case anyone wants to call our serializer manually from Scala code
implicit def optional2Option[T](o: Option[T]): Optional[T] = o match {
case Some(x) => Optional.of(x)
case None => Optional.absent()
}
}
Config Converters¶
The Kompics Scala package already adds a number of new Converter
s for Scala primitives, but we require one more, to deal with our TAddress
properly.
There is no new Scala API for Converter
so some of the code looks a bit Java:y. Notice especially the awkward `type`
method, which is a workaround for the fact that type
is a reserved keyword in Scala, but not in Java.
The @unchecked
pattern simply surpresses a compiler warning that the generics of the pattern would be unchecked by the pattern matcher, as they are removed by erasure. The same thing happens in Java, just Scala is nice and tells you about it by default, and you can explicitly ignore the problem if you choose to.
package se.sics.test
import java.net.{ InetAddress, InetSocketAddress };
import java.util.Map;
import se.sics.kompics.config.Conversions;
import se.sics.kompics.config.Converter;
import com.typesafe.scalalogging.LazyLogging;
object TAddressConverter extends Converter[TAddress] with LazyLogging {
override def convert(o: Object): TAddress = {
o match {
case m: Map[String, Any] @unchecked => {
try {
val hostname = Conversions.convert(m.get("host"), classOf[String]);
val port = Conversions.convert(m.get("port"), classOf[Integer]);
val ip = InetAddress.getByName(hostname);
return TAddress(new InetSocketAddress(ip, port));
} catch {
case ex: Throwable =>
logger.error(s"Could not convert $m to TAddress", ex)
return null;
}
}
case s: String => {
try {
val ipport = s.split(":");
val ip = InetAddress.getByName(ipport(0));
val port = Integer.parseInt(ipport(1));
return TAddress(new InetSocketAddress(ip, port));
} catch {
case ex: Throwable =>
logger.error(s"Could not convert '$s' to TAddress", ex)
return null;
}
}
}
logger.warn(s"Could not convert $o to TAddress");
return null;
}
override def `type`(): Class[TAddress] = {
return classOf[TAddress];
}
}
Main Object and Running the Code¶
Now we are finally in a position to write the Main
object. The code is nothing fancy, and is pretty much a one-to-one translation of the Java version, just with one serializer less.
package se.sics.test
import se.sics.kompics.network.netty.serialization.Serializers
import se.sics.kompics.config.Conversions
import se.sics.kompics.Kompics
import com.typesafe.scalalogging.StrictLogging
object Main extends StrictLogging {
// register serializer
Serializers.register(PickleSerializer, "pickleS");
// map types to serializer
Serializers.register(classOf[TAddress], "pickleS");
Serializers.register(classOf[THeader], "pickleS");
Serializers.register(classOf[TMessage[_]], "pickleS");
// conversions
Conversions.register(TAddressConverter);
def main(args: Array[String]): Unit = {
if (args.length == 1) {
try {
if (args(0).equalsIgnoreCase("ponger")) {
Kompics.createAndStart(classOf[PongerParent], 2);
System.out.println("Starting Ponger");
Kompics.waitForTermination();
// no shutdown this time...act like a server and keep running until externally exited
} else if (args(0).equalsIgnoreCase("pinger")) {
Kompics.createAndStart(classOf[PingerParent], 2);
System.out.println("Starting Pinger");
Thread.sleep(10000);
Kompics.shutdown();
System.exit(0);
}
} catch {
case e: Throwable =>
logger.error("Error while pinging!", e);
System.exit(1);
}
} else {
System.err.println("Invalid number of parameters");
System.exit(1);
}
}
}
Also include the following reference.conf
in src/main/resources
and a logback.xml
of your choosing (for example the one from the Hello World in Kompics Scala example) in the same folder.
pingpong {
self {
host = "127.0.0.1"
port = 34567
}
pinger {
timeout = 1000
pongeraddr {
host = "127.0.0.1"
port = 45678
}
}
}
Now that we have all of that we can finally compile and assemble the project:
sbt
> compile
> assembly
The move the resulting assembled jar from the target/scala_2.11
folder both to distPinger
and distPonger
(or symlink it, whichever you prefer).
All we need now is some quick configuration files and maybe a nice script to run each version.
pingpong.self.host = "127.0.0.1"
The Ponger
simply uses the default port 34567.
pingpong.self.host = "127.0.0.1"
pingpong.self.port = 45678
// pinger only
pingpong.pinger.pongeraddr.host = "127.0.0.1"
pingpong.pinger.pongeraddr.port = 34567
#!/bin/bash
java -Dconfig.file=./application.conf -jar Ping\ Pong-assembly-1.1.jar ponger
#!/bin/bash
java -Dconfig.file=./application.conf -jar Ping\ Pong-assembly-1.1.jar pinger
Then simply run the ponger.sh
first, and then the pinger.sh
.
You can download the whole project here
.