A Full Example – PingPong

As a proper example of how to use Kompics Scala, we are going to rewrite the Distributed Ping Pong example from the section Cleanup: Config files, ClassMatchers, and Assembly.

SBT Setup

Set up your SBT project as follows, or simply download the whole project here.

$ ls -ohn
total 6
drwxr-xr-x  6 501   204B Apr  1 16:39 distPinger
drwxr-xr-x  6 501   204B Apr  1 16:39 distPonger
-rw-r--r--  1 501   861B Apr  1 16:40 pingpong.sbt
drwxr-xr-x  5 501   170B Apr  1 16:16 project
drwxr-xr-x  3 501   102B Mar 30 13:14 src

The two folders distPinger and distPonger will be used later for running the application with different config files.

You need an SBT file pingpong.sbt and a project/plugins.sbt similar to the ones below:

pingpong.sbt
name := "Ping Pong"

organization := "se.sics.test"

version := "1.1"

scalaVersion := "2.11.12"

val kompicsVersion = "1.0.0"
val kompicsScalaVersion = "1.0.1"

resolvers += Resolver.bintrayRepo("kompics", "Maven")

libraryDependencies += "se.sics.kompics" %% "kompics-scala" % kompicsScalaVersion
libraryDependencies += "se.sics.kompics.basic" % "kompics-component-netty-network" % kompicsVersion
libraryDependencies += "se.sics.kompics.basic" % "kompics-component-java-timer" % kompicsVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "org.scala-lang.modules" %% "scala-pickling" % "0.10.1" // no Scala 2.12 version, yet
libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0"

mainClass in assembly := Some("se.sics.test.Main")

plugins.sbt
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

Messages

As before we’ll have to implement our own Address, Header and Msg types, as well as the Ping and Pong events. To allow for convenient pattern matching in our handlers later, we are going to use only case classes and case objects for these types. We are also going to make significant use of the fact that Scala allows us to define multiple types in the same file. While this makes types sometimes difficult to locate in a code base, it provides convenient grouping and locality while working and reduces the number of times one has to write the same set of imports. For convenience we also define the Ping and Pong events in the package object.

TestNetTypes.scala
package se.sics.test

import java.net.InetAddress;
import java.net.InetSocketAddress;
import se.sics.kompics.network.{ Address, Header, Msg, Transport };
import se.sics.kompics.KompicsEvent;

final case class TAddress(isa: InetSocketAddress) extends Address {
    override def asSocket(): InetSocketAddress = isa;
    override def getIp(): InetAddress = isa.getAddress;
    override def getPort(): Int = isa.getPort;
    override def sameHostAs(other: Address): Boolean = {
        this.isa.equals(other.asSocket());
    }
}

final case class THeader(src: TAddress, dst: TAddress, proto: Transport) extends Header[TAddress] {
    override def getDestination(): TAddress = dst;
    override def getProtocol(): Transport = proto;
    override def getSource(): TAddress = src;
}

final case class TMessage[C <: KompicsEvent](header: THeader, payload: C) extends Msg[TAddress, THeader] {
    override def getDestination(): TAddress = header.dst;
    override def getHeader(): THeader = header;
    override def getProtocol(): Transport = header.proto;
    override def getSource(): TAddress = header.src;
}

Note that we limited our TMessage class to subtypes of KompicsEvent, but we could just as well have allowed any kind of type that can be serialized as a payload.

package.scala
package se.sics

import se.sics.kompics.KompicsEvent

package object test {
    case object Ping extends KompicsEvent
    case object Pong extends KompicsEvent
}

Components

Now that we have messages, we can write the components that use them.

The Ponger is rather short, and as before it simply keeps a counter, which it increments every time it receives a TMessage with the payload Ping. The underscore in the pattern means that we don’t care about the THeader, which is the first argument of TMessage.
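The pattern used in the Ponger's handler can be tried without any Kompics dependency. The following is a minimal sketch only: `Header`, `Message`, and `Payload` are hypothetical stand-ins for THeader, TMessage, and KompicsEvent, and `reply` is an illustrative name, not part of the Kompics API.

```scala
// Hypothetical stand-ins for THeader/TMessage and Ping/Pong; no Kompics dependency.
object PatternSketch {
  sealed trait Payload
  case object Ping extends Payload
  case object Pong extends Payload

  final case class Header(src: String, dst: String)
  final case class Message(header: Header, payload: Payload)

  // `_` ignores the header in the pattern; `m @` binds the whole message,
  // so the header is still reachable when building the reply.
  def reply(msg: Message): Option[Message] = msg match {
    case m @ Message(_, Ping) =>
      Some(Message(Header(m.header.dst, m.header.src), Pong))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(reply(Message(Header("pinger", "ponger"), Ping)))
    // prints: Some(Message(Header(ponger,pinger),Pong))
  }
}
```

Because Ping is a case object, the pattern compares by equality rather than binding a new variable, which is why the handler only fires for that payload.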

We also define the setup component PongerParent in the same file. There is no DSL call for create, yet, but note the new Scala signature for connect. The arrow here always goes from positive to negative, or from “service provider” to “service consumer”, if you like. The arrow is also used in the DSL trigger method, where it simply goes from event to port.

See also

There are also new, thread-safe public versions of these APIs that are not bound to the ComponentDefinition context. They are defined in the se.sics.kompics.sl package object and somewhat imitate the Kola syntax: `!trigger` (event -> port) and `!connect`[PortType](positive -> negative)

Ponger.scala
package se.sics.test

import se.sics.kompics.sl._

import se.sics.kompics.Init
import se.sics.kompics.network.{Network, Transport}
import se.sics.kompics.network.netty.{ NettyNetwork, NettyInit }

class PongerParent extends ComponentDefinition {
    val self = cfg.getValue[TAddress]("pingpong.self");
    val network = create(classOf[NettyNetwork], new NettyInit(self));
    val ponger = create(classOf[Ponger], Init.NONE);

    connect[Network](network -> ponger);
}

class Ponger extends ComponentDefinition {

    val net = requires[Network];

    private var counter: Long = 0;
    private val self = cfg.getValue[TAddress]("pingpong.self");

    net uponEvent {
        case context@TMessage(_, Ping) => handle {
            counter += 1;
            logger.info("Got Ping #{}!", counter);
            trigger (TMessage(THeader(self, context.getSource, Transport.TCP), Pong) -> net)
        }
    }
}

Note

The method cfg returns a memoized Scala wrapper for Kompics’ config(). You can also access the Java API as before by using the latter method. The Scala API is more condensed, as it avoids the annoying {type}.class/classOf[{type}] calls. Note that there are no unchecked methods in the Scala API.

The Pinger is slightly more involved, but most of that really comes from the use of the Timer. As with the message types, the Timeout is defined as a case class to allow pattern matching in the handler. New in this section is the use of the Scala DSL Init, which accepts any sequence of parameters that can then be matched into internal fields in the component. The idea is simply to avoid creating a large number of init classes that act as nothing but reference carriers and are then discarded. Classes with such minimal reusability are in some way an abuse of the object-oriented paradigm and should be avoided in general.
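To see how a single parameter-carrying type can replace many dedicated init classes, here is a minimal sketch in plain Scala. `SketchInit` is a hypothetical stand-in, not the Kompics DSL Init itself; it only assumes that a case class with a repeated parameter can be destructured positionally and by type.

```scala
object InitSketch {
  // Hypothetical stand-in for the DSL's Init: a case class with a repeated parameter.
  final case class SketchInit(params: Any*)

  // Destructure positionally and by type, much like the Pinger extracts its address.
  def describe(init: SketchInit): String = init match {
    case SketchInit(addr: String, period: Long) => s"addr=$addr period=$period"
    case SketchInit(addr: String)               => s"addr=$addr"
    case _                                      => "unsupported init"
  }

  def main(args: Array[String]): Unit = {
    println(describe(SketchInit("127.0.0.1:34567", 1000L)))
    // prints: addr=127.0.0.1:34567 period=1000
  }
}
```

The trade-off is that a mismatch between the parameters passed and the pattern expected only shows up at runtime, so a fallback case (or a deliberate failure) is worth considering.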

The custom tearDown method is very similar to the Java version, except that it uses an Option instead of checking for null, which is better style.

Pinger.scala
package se.sics.test

import se.sics.kompics.sl._

import se.sics.kompics.network.{ Network, Transport }
import se.sics.kompics.network.netty.{ NettyNetwork, NettyInit }
import se.sics.kompics.timer.{ Timer, SchedulePeriodicTimeout, Timeout, CancelPeriodicTimeout }
import se.sics.kompics.timer.java.JavaTimer
import se.sics.kompics.Start

import java.util.UUID

class PingerParent extends ComponentDefinition {
    val self = cfg.getValue[TAddress]("pingpong.self");
    val ponger = cfg.getValue[TAddress]("pingpong.pinger.pongeraddr");
    
    val timer = create(classOf[JavaTimer], se.sics.kompics.Init.NONE);
    val network = create(classOf[NettyNetwork], new NettyInit(self));
    val pinger = create(classOf[Pinger], Init[Pinger](ponger));

    connect[Network](network -> pinger);
    connect[Timer](timer -> pinger);
}

case class PingTimeout(spt: SchedulePeriodicTimeout) extends Timeout(spt)

class Pinger(init: Init[Pinger]) extends ComponentDefinition {
    val net = requires[Network];
    val timer = requires[Timer];

    private val self = cfg.getValue[TAddress]("pingpong.self");
    private val ponger = init match {
        case Init(pongerAddr: TAddress) => pongerAddr
    }
    private var counter: Long = 0;
    private var timerId: Option[UUID] = None;

    ctrl uponEvent {
        case _: Start => handle {
            val period = cfg.getValue[Long]("pingpong.pinger.timeout");
            val spt = new SchedulePeriodicTimeout(0, period);
            val timeout = PingTimeout(spt);
            spt.setTimeoutEvent(timeout);
            trigger(spt -> timer);
            timerId = Some(timeout.getTimeoutId());
        }
    }

    net uponEvent {
        case context @ TMessage(_, Pong) => handle {
            counter += 1;
            logger.info("Got Pong #{}!", counter);
        }
    }

    timer uponEvent {
        case PingTimeout(_) => handle {
            trigger(TMessage(THeader(self, ponger, Transport.TCP), Ping) -> net);
        }
    }

    override def tearDown(): Unit = {
        timerId match {
            case Some(id) =>
                trigger(new CancelPeriodicTimeout(id) -> timer);
            case None => // nothing
        }
    }
}
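The Option-based bookkeeping in tearDown can be tried in isolation. The sketch below is an illustration under assumptions: `TimerOwner` and its `cancelled` list are hypothetical and merely record what a real component would send as a cancel event.

```scala
import java.util.UUID

object TearDownSketch {
  // Hypothetical component stand-in; `cancelled` records the ids that a real
  // component would pass to a cancel event on its timer port.
  final class TimerOwner {
    private var timerId: Option[UUID] = None
    var cancelled: List[UUID] = Nil

    def start(): UUID = {
      val id = UUID.randomUUID()
      timerId = Some(id)
      id
    }

    // foreach runs its body only when an id is present,
    // which is equivalent to the Some/None match in the Pinger.
    def tearDown(): Unit = {
      timerId.foreach(id => cancelled = id :: cancelled)
      timerId = None
    }
  }

  def main(args: Array[String]): Unit = {
    val idle = new TimerOwner
    idle.tearDown() // never started: nothing to cancel
    println(idle.cancelled.isEmpty) // prints: true

    val running = new TimerOwner
    val id = running.start()
    running.tearDown()
    running.tearDown() // second call is a no-op
    println(running.cancelled == List(id)) // prints: true
  }
}
```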

Before we can get to the Main object, we are still missing two important things that we had to take care of in Java as well: Serialization and Config Converters.

Serialization

In order to be able to send things over the network, we need to be able to serialize all our classes. In the Java version we used custom serializers for all our classes. In Scala, on the other hand, we have a very good library called Scala Pickling which can take care of all the case classes/objects and Scala primitives, leaving us only to deal with Java’s enums and InetSocketAddress.

The two custom “picklers” in the following code might seem a bit arcane, but this is a tutorial on Kompics Scala and not on Scala Pickling, so feel free to gloss over the details.

Note the use of an object instead of a class for the PickleSerializer. We only want a single instance of our serializers to be registered anyway, so unless there is custom state that needs to be passed in, there is no reason to ever have more than one instance and thus to use a class.

PickleSerializer.scala
package se.sics.test

import scala.language.implicitConversions

import se.sics.kompics.network.netty.serialization.Serializer;
import se.sics.kompics.network.Transport
import io.netty.buffer.ByteBuf;
import com.google.common.base.Optional;
import scala.pickling._
import scala.pickling.Defaults._
import scala.pickling.binary._
//import scala.pickling.json._ //alternatively JSON pickling

// Custom Serialization for TAddress (the case class itself is fine, but the InetSocketAddress is problematic)
object TAddressPickler extends Pickler[TAddress] with Unpickler[TAddress] with pickler.PrimitivePicklers with pickler.PrimitiveArrayPicklers {
    import java.net.InetSocketAddress
    import java.net.InetAddress

    override val tag = FastTypeTag[TAddress]

    override def pickle(picklee: TAddress, builder: PBuilder): Unit = {
        builder.hintTag(tag) // This is always required
        builder.beginEntry(picklee)
        builder.putField("ip", { fieldBuilder =>
            fieldBuilder.hintTag(byteArrayPickler.tag)
            fieldBuilder.hintStaticallyElidedType()
            byteArrayPickler.pickle(picklee.isa.getAddress.getAddress, fieldBuilder)
        })
        builder.putField("port", { fieldBuilder =>
            fieldBuilder.hintTag(intPickler.tag)
            fieldBuilder.hintStaticallyElidedType()
            intPickler.pickle(picklee.isa.getPort, fieldBuilder)
        })
        builder.endEntry()
    }

    override def unpickle(tag: String, reader: PReader): Any = {
        val ipReader = reader.readField("ip");
        ipReader.hintStaticallyElidedType();
        val ip = byteArrayPickler.unpickleEntry(ipReader).asInstanceOf[Array[Byte]];
        val portReader = reader.readField("port");
        portReader.hintStaticallyElidedType();
        val port = intPickler.unpickleEntry(portReader).asInstanceOf[Int];
        new TAddress(new InetSocketAddress(InetAddress.getByAddress(ip), port));
    }
}

// Custom Serialization for Transport because enum picklers don't work properly in this version
object TransportPickler extends Pickler[Transport] with Unpickler[Transport] with pickler.PrimitivePicklers {

    override val tag = FastTypeTag[Transport]

    override def pickle(picklee: Transport, builder: PBuilder): Unit = {
        builder.hintTag(tag) // This is always required
        builder.beginEntry(picklee)
        builder.putField("ordinal", { fieldBuilder =>
            fieldBuilder.hintTag(bytePickler.tag)
            fieldBuilder.hintStaticallyElidedType()
            bytePickler.pickle(picklee.ordinal().toByte, fieldBuilder)
        })
        builder.endEntry()
    }

    override def unpickle(tag: String, reader: PReader): Any = {
        val ordinalReader = reader.readField("ordinal");
        ordinalReader.hintStaticallyElidedType();
        val ordinal = bytePickler.unpickleEntry(ordinalReader).asInstanceOf[Byte].toInt;

        Transport.values()(ordinal);
    }
}

// serialize all objects with Scala pickling
object PickleSerializer extends Serializer {
    override def identifier(): Int = 100;

    // register our custom picklers for use with reflection picklers
    implicit val addressPickler = TAddressPickler
    scala.pickling.runtime.GlobalRegistry.picklerMap += (addressPickler.tag.key -> (x => addressPickler))
    scala.pickling.runtime.GlobalRegistry.unpicklerMap += (addressPickler.tag.key -> addressPickler)
    implicit val transportPickler = TransportPickler
    scala.pickling.runtime.GlobalRegistry.picklerMap += (transportPickler.tag.key -> (x => transportPickler))
    scala.pickling.runtime.GlobalRegistry.unpicklerMap += (transportPickler.tag.key -> transportPickler)

    override def toBinary(o: Any, buf: ByteBuf): Unit = {
        val ser = o.pickle;
        val bytes = ser.value;
        buf.writeInt(bytes.length);
        buf.writeBytes(bytes);
    }

    override def fromBinary(buf: ByteBuf, hint: Optional[Object]): Object = {
        val len = buf.readInt();
        val ser = Array.ofDim[Byte](len);
        buf.readBytes(ser);
        val o = ser.unpickle[Any];
        o.asInstanceOf[Object];
    }

    // a nice implicit conversion from Scala's Option to Guava's Optional
    // in case anyone wants to call our serializer manually from Scala code
    implicit def optional2Option[T](o: Option[T]): Optional[T] = o match {
        case Some(x) => Optional.of(x)
        case None    => Optional.absent()
    }
}
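The toBinary and fromBinary methods above use a simple length-prefixed framing: a 4-byte length followed by the pickled bytes. The following sketch shows the same framing idea with the JDK's `java.nio.ByteBuffer` in place of Netty's ByteBuf and plain UTF-8 strings in place of pickled objects. `writeFrame`, `readFrame`, and `roundTrip` are hypothetical helper names.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object FramingSketch {
  // Write a 4-byte length, then the payload bytes.
  def writeFrame(payload: Array[Byte], buf: ByteBuffer): Unit = {
    buf.putInt(payload.length)
    buf.put(payload)
  }

  // Read the length first, then exactly that many bytes.
  def readFrame(buf: ByteBuffer): Array[Byte] = {
    val len = buf.getInt()
    val bytes = new Array[Byte](len)
    buf.get(bytes)
    bytes
  }

  // Write all messages back to back, then read them out again in order.
  def roundTrip(messages: List[String]): List[String] = {
    val size = messages.map(_.getBytes(UTF_8).length + 4).sum
    val buf = ByteBuffer.allocate(size)
    messages.foreach(m => writeFrame(m.getBytes(UTF_8), buf))
    buf.flip() // switch the buffer from writing to reading
    messages.map(_ => new String(readFrame(buf), UTF_8))
  }

  def main(args: Array[String]): Unit = {
    println(roundTrip(List("Ping", "Pong"))) // prints: List(Ping, Pong)
  }
}
```

The length prefix is what lets the reader know where one serialized object ends when several are written into the same buffer.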

Config Converters

The Kompics Scala package already adds a number of new Converters for Scala primitives, but we require one more, to deal with our TAddress properly.

There is no new Scala API for Converter, so some of the code looks a bit Java-like. Notice especially the awkward `type` method, which is a workaround for the fact that type is a reserved keyword in Scala, but not in Java.

The @unchecked annotation on the pattern simply suppresses a compiler warning that the generics of the pattern cannot be checked by the pattern matcher, as they are removed by erasure. The same thing happens in Java; Scala is just nice enough to tell you about it by default, and you can explicitly ignore the problem if you choose to.

TAddressConverter.scala
package se.sics.test

import java.net.{ InetAddress, InetSocketAddress };
import java.util.Map;
import se.sics.kompics.config.Conversions;
import se.sics.kompics.config.Converter;
import com.typesafe.scalalogging.LazyLogging;

object TAddressConverter extends Converter[TAddress] with LazyLogging {

    override def convert(o: Object): TAddress = {
        o match {
            case m: Map[String, Any] @unchecked => {
                try {
                    val hostname = Conversions.convert(m.get("host"), classOf[String]);
                    val port = Conversions.convert(m.get("port"), classOf[Integer]);
                    val ip = InetAddress.getByName(hostname);
                    return TAddress(new InetSocketAddress(ip, port));
                } catch {
                    case ex: Throwable =>
                        logger.error(s"Could not convert $m to TAddress", ex)
                        return null;
                }
            }
            case s: String => {
                try {
                    val ipport = s.split(":");
                    val ip = InetAddress.getByName(ipport(0));
                    val port = Integer.parseInt(ipport(1));
                    return TAddress(new InetSocketAddress(ip, port));
                } catch {
                    case ex: Throwable =>
                        logger.error(s"Could not convert '$s' to TAddress", ex)
                        return null;
                }
            }
            case _ =>
                // without this default case, any other input would throw a MatchError
                logger.warn(s"Could not convert $o to TAddress");
                return null;
        }
    }

    override def `type`(): Class[TAddress] = {
        return classOf[TAddress];
    }

}
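The string branch of the converter can also be written without explicit returns or null. This is a minimal sketch, not a drop-in replacement: the Converter interface shown above returns the converted value directly, so the Option result here is an assumption made for illustration, and `parse` is a hypothetical helper name. Note that it only handles the `ip:port` form; IPv6 literals, which contain colons themselves, are rejected.

```scala
import java.net.{ InetAddress, InetSocketAddress }
import scala.util.Try

object AddressParseSketch {
  // Parse "ip:port"; any malformed input yields None instead of null.
  def parse(s: String): Option[InetSocketAddress] = s.split(":") match {
    case Array(host, port) =>
      // Try captures NumberFormatException, UnknownHostException, and
      // the IllegalArgumentException thrown for out-of-range ports.
      Try(new InetSocketAddress(InetAddress.getByName(host), port.toInt)).toOption
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(parse("127.0.0.1:34567").map(_.getPort)) // prints: Some(34567)
    println(parse("127.0.0.1"))                      // prints: None
  }
}
```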

Main Object and Running the Code

Now we are finally in a position to write the Main object. The code is nothing fancy, and is pretty much a one-to-one translation of the Java version, just with one serializer less.

Main.scala
package se.sics.test

import se.sics.kompics.network.netty.serialization.Serializers
import se.sics.kompics.config.Conversions
import se.sics.kompics.Kompics
import com.typesafe.scalalogging.StrictLogging

object Main extends StrictLogging {

    // register serializer
    Serializers.register(PickleSerializer, "pickleS");
    // map types to serializer
    Serializers.register(classOf[TAddress], "pickleS");
    Serializers.register(classOf[THeader], "pickleS");
    Serializers.register(classOf[TMessage[_]], "pickleS");
    // conversions
    Conversions.register(TAddressConverter);

    def main(args: Array[String]): Unit = {
        if (args.length == 1) {
            try {
                if (args(0).equalsIgnoreCase("ponger")) {
                    Kompics.createAndStart(classOf[PongerParent], 2);
                    System.out.println("Starting Ponger");
                    Kompics.waitForTermination();
                    // no shutdown this time...act like a server and keep running until externally exited
                } else if (args(0).equalsIgnoreCase("pinger")) {
                    Kompics.createAndStart(classOf[PingerParent], 2);
                    System.out.println("Starting Pinger");
                   
                    Thread.sleep(10000);
                    
                    Kompics.shutdown();
                    System.exit(0);
                }
            } catch {
                case e: Throwable =>
                    logger.error("Error while pinging!", e);
                    System.exit(1);
            }
        } else {
            System.err.println("Invalid number of parameters");
            System.exit(1);
        }
    }
}

Also include the following reference.conf in src/main/resources and a logback.xml of your choosing (for example the one from the Hello World in Kompics Scala example) in the same folder.

pingpong {
	self {
		host = "127.0.0.1"
		port = 34567
	}
	pinger {
		timeout = 1000
		pongeraddr {
			host = "127.0.0.1"
			port = 45678
		}
	}
}

Now that we have all of that we can finally compile and assemble the project:

sbt
> compile
> assembly

Then move the resulting assembled jar from the target/scala-2.11 folder to both distPinger and distPonger (or symlink it, whichever you prefer).

All we need now is some quick configuration files and maybe a nice script to run each version.

distPonger/application.conf
pingpong.self.host = "127.0.0.1"

The Ponger simply uses the default port 34567.
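The way application.conf settings shadow the defaults from reference.conf can be modelled with plain maps. This is a deliberately simplified sketch: it does not use the actual config library, the flat string keys are an assumption made for illustration, and `effective` is a hypothetical helper name.

```scala
object ConfigLayerSketch {
  // Defaults, as they would come from reference.conf.
  val reference: Map[String, String] = Map(
    "pingpong.self.host" -> "127.0.0.1",
    "pingpong.self.port" -> "34567"
  )

  // Right-hand entries win in ++, so application settings shadow the defaults.
  def effective(application: Map[String, String]): Map[String, String] =
    reference ++ application

  def main(args: Array[String]): Unit = {
    val ponger = effective(Map("pingpong.self.host" -> "127.0.0.1"))
    val pinger = effective(Map("pingpong.self.port" -> "45678"))
    println(ponger("pingpong.self.port")) // prints: 34567
    println(pinger("pingpong.self.port")) // prints: 45678
  }
}
```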

distPinger/application.conf
pingpong.self.host = "127.0.0.1"
pingpong.self.port = 45678
// pinger only
pingpong.pinger.pongeraddr.host = "127.0.0.1"
pingpong.pinger.pongeraddr.port = 34567

distPonger/ponger.sh
#!/bin/bash

java -Dconfig.file=./application.conf -jar Ping\ Pong-assembly-1.1.jar ponger

distPinger/pinger.sh
#!/bin/bash

java -Dconfig.file=./application.conf -jar Ping\ Pong-assembly-1.1.jar pinger

Then simply run ponger.sh first, and then pinger.sh.

You can download the whole project here.