April 29, 2016 | Software Consultancy
In my last post on Concursus, I showed how an Event is constructed using a TupleSchema which represents the names and types of the event’s expected parameters, and how an EventTypeMatcher is used to map serialised event data back into the Java type system. The reflection-based method mapping demonstrated in the previous two posts (part 1 and part 2) builds on these mechanisms to provide a convenient Java 8 API for emitting and receiving events.
In this post, I’ll demonstrate an alternative API which uses some of the advanced language features of the new Kotlin language from JetBrains. As Kotlin is a JVM-based language, it interoperates seamlessly with Concursus’s Java 8 classes; however, it also offers powerful ways to extend their functionality.
A sealed class in Kotlin is an abstract class that can only be extended by subclasses defined as nested classes within the sealed class itself – a kind of souped-up Enum (this is the rule in Kotlin 1.0; later releases also allow subclasses declared in the same file). This means that the compiler can always tell whether an expression which matches objects of the sealed class by type covers all of the possible cases. Sealed classes are ideal for defining algebraic data types, such as the following:
sealed class BinaryTree<T> {
    class EmptyNode<T> : BinaryTree<T>()
    class Branch<T>(
        val left: BinaryTree<T>,
        val value: T,
        val right: BinaryTree<T>): BinaryTree<T>()
}
When we want to write an operation on BinaryTree, we can use Kotlin’s when syntax to match against its nested types:
fun <T> toList(tree: BinaryTree<T>): List<T> =
    when(tree) {
        is EmptyNode -> emptyList()
        is Branch -> toList(tree.left) + tree.value + toList(tree.right)
    }
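To try this out end to end, here is a minimal self-contained sketch of the same idea. The names Tree, Leaf, Node, flatten and sampleTree are made up for this example (they are not the classes above), and the nested types are referred to by their qualified names so that no imports are needed.

```kotlin
// Hypothetical names, chosen so this sketch does not clash with BinaryTree above.
sealed class Tree<T> {
    class Leaf<T> : Tree<T>()
    class Node<T>(val left: Tree<T>, val value: T, val right: Tree<T>) : Tree<T>()
}

// In-order traversal. No `else` branch is needed: the compiler knows that
// Leaf and Node are the only subclasses of the sealed class.
fun <T> flatten(tree: Tree<T>): List<T> = when (tree) {
    is Tree.Leaf -> emptyList()
    is Tree.Node -> flatten(tree.left) + tree.value + flatten(tree.right)
}

// A three-element tree with 2 at the root, 1 on the left and 3 on the right.
fun sampleTree(): Tree<Int> =
    Tree.Node(
        Tree.Node(Tree.Leaf(), 1, Tree.Leaf()),
        2,
        Tree.Node(Tree.Leaf(), 3, Tree.Leaf())
    )
```

Calling flatten(sampleTree()) yields the values in order, [1, 2, 3]. Deleting either branch of the when should make flatten fail to compile, because the expression would no longer cover every subclass.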
This suggests an alternative “encoding” for a collection of events addressed to the same type of object:
sealed class LightbulbEvent {
    class Created(val wattage: Int) : LightbulbEvent()
    class ScrewedIn(val location: String) : LightbulbEvent()
    class Unscrewed() : LightbulbEvent()
    class SwitchedOn() : LightbulbEvent()
    class SwitchedOff() : LightbulbEvent()
}
A handler for LightbulbEvents could then use the when syntax to match on the type of event it had received:
val message = when (event) {
    is Created -> "Lightbulb created with wattage " + event.wattage
    is ScrewedIn -> "Lightbulb screwed in @ " + event.location
    is Unscrewed -> "Lightbulb unscrewed"
    is SwitchedOn -> "Lightbulb switched on"
    is SwitchedOff -> "Lightbulb switched off"
}
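Assembled into a single file, the two fragments above might look like the following sketch. The function name describe is an assumption made for this example, and the nested classes are written with their qualified names so that the code compiles without imports.

```kotlin
sealed class LightbulbEvent {
    class Created(val wattage: Int) : LightbulbEvent()
    class ScrewedIn(val location: String) : LightbulbEvent()
    class Unscrewed : LightbulbEvent()
    class SwitchedOn : LightbulbEvent()
    class SwitchedOff : LightbulbEvent()
}

// `describe` is a hypothetical helper, not part of Concursus.
// Inside each branch, `event` is smart-cast to the matched subclass,
// which is why event.wattage and event.location are accessible.
fun describe(event: LightbulbEvent): String = when (event) {
    is LightbulbEvent.Created -> "Lightbulb created with wattage " + event.wattage
    is LightbulbEvent.ScrewedIn -> "Lightbulb screwed in @ " + event.location
    is LightbulbEvent.Unscrewed -> "Lightbulb unscrewed"
    is LightbulbEvent.SwitchedOn -> "Lightbulb switched on"
    is LightbulbEvent.SwitchedOff -> "Lightbulb switched off"
}
```

If a new event class is later added to LightbulbEvent, describe stops compiling until a matching branch is written, which is the main practical benefit of this encoding.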
In fact, we need a little more information to represent a complete Concursus event:
data class KEvent<E : Any>(val timestamp: StreamTimestamp, val aggregateId: String, val data: E) {
    fun toEvent(): Event = TODO("reflective mapping to Concursus Event happens here")
}
but we now no longer have to depend on the convention that event method definitions (in the Java 8 API) must have timestamp and id values for their first two parameters.
Companion objects enable us to add the equivalent of Java’s static methods to a Kotlin class, with the added advantage that companion objects can be passed around as objects in their own right. We can equip the LightbulbEvent class with a create method that returns a Concursus Event like so:
open class KEventFactory<E : Any> {
    fun create(timestamp: StreamTimestamp, aggregateId: String, data: E): Event =
        KEvent(timestamp, aggregateId, data).toEvent()
}
sealed class LightbulbEvent {
    companion object Factory : KEventFactory<LightbulbEvent>()
    class Created(val wattage: Int) : LightbulbEvent()
    class ScrewedIn(val location: String) : LightbulbEvent()
    class Unscrewed() : LightbulbEvent()
    class SwitchedOn() : LightbulbEvent()
    class SwitchedOff() : LightbulbEvent()
}
val event: Event = LightbulbEvent.create(StreamTimestamp.now(), "id1", Created(60))
Because companion objects can be passed around, we can write a function that collects multiple events into a batch:
class KEventWriter<E : Any>(val factory: KEventFactory<E>, val receiver: (Event) -> Unit) {
    fun write(timestamp: StreamTimestamp, aggregateId: String, data: E): Unit =
        receiver(factory.create(timestamp, aggregateId, data))
}
fun <E : Any> createBatch(factory: KEventFactory<E>, writeEvents: KEventWriter<E>.() -> Unit): List<Event> {
    val eventCollector = mutableListOf<Event>()
    KEventWriter(factory, { eventCollector.add(it) }).writeEvents()
    return eventCollector.toList()
}
val timestamp = StreamTimestamp.now()
val events = createBatch(LightbulbEvent.Factory) {
    write(timestamp, "id1", Created(60))
    write(timestamp.plus(1, MINUTES), "id1", ScrewedIn("hallway"))
    write(timestamp.plus(2, MINUTES), "id1", SwitchedOn())
}
Here, the companion object which provides LightbulbEvent’s “static” factory method also functions as a factory object that KEventWriter can use to create new events. Better still, we can use Kotlin’s support for function literals with receivers to create a function that takes a block in which the KEventWriter which adds events to the batch is the implicit receiver of every write method invocation. This results in a syntax with the fluency of a “method-chaining fluent API”, but without the need to actually chain methods together.
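The batching pattern can be tried without Concursus on the classpath. In the sketch below, SketchEvent, SketchEventFactory and SketchEventWriter are stand-ins invented for this example (with a plain Long in place of StreamTimestamp), so it shows only the shape of the technique: a companion object passed as a factory, and a function literal with receiver supplying the write calls.

```kotlin
// Stand-in for Concursus's Event: just enough structure to inspect a batch.
data class SketchEvent(val timestampMillis: Long, val aggregateId: String, val data: Any)

// Stand-in for KEventFactory.
open class SketchEventFactory<E : Any> {
    fun create(timestampMillis: Long, aggregateId: String, data: E): SketchEvent =
        SketchEvent(timestampMillis, aggregateId, data)
}

sealed class LightbulbEvent {
    // The companion object is an ordinary object, so it can be passed around as a factory.
    companion object Factory : SketchEventFactory<LightbulbEvent>()

    class Created(val wattage: Int) : LightbulbEvent()
    class ScrewedIn(val location: String) : LightbulbEvent()
    class SwitchedOn : LightbulbEvent()
}

// Stand-in for KEventWriter: builds each event and hands it to the receiver callback.
class SketchEventWriter<E : Any>(
    private val factory: SketchEventFactory<E>,
    private val receiver: (SketchEvent) -> Unit
) {
    fun write(timestampMillis: Long, aggregateId: String, data: E) {
        receiver(factory.create(timestampMillis, aggregateId, data))
    }
}

// `writeEvents` is a function literal with receiver: inside the block,
// `this` is the writer, so `write(...)` needs no qualifier.
fun <E : Any> createBatch(
    factory: SketchEventFactory<E>,
    writeEvents: SketchEventWriter<E>.() -> Unit
): List<SketchEvent> {
    val collected = mutableListOf<SketchEvent>()
    val writer = SketchEventWriter(factory) { event -> collected.add(event) }
    writer.writeEvents()
    return collected.toList()
}

fun sampleBatch(): List<SketchEvent> =
    createBatch(LightbulbEvent.Factory) {
        write(0L, "id1", LightbulbEvent.Created(60))
        write(60_000L, "id1", LightbulbEvent.ScrewedIn("hallway"))
        write(120_000L, "id1", LightbulbEvent.SwitchedOn())
    }
```

Because the block's receiver type is tied to the factory's event type, writing an event of some other sealed class inside the block would be rejected at compile time.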
The Java 8 implementation of state classes in Concursus routes events to methods which mutate an object. In Kotlin, we can create a fully immutable alternative using data classes. To begin with, we need a way to define a state transition as a function of a possibly existing state and an event:
interface Transitions<S, E : Any> {
    fun update(previousState: S?, event: KEvent<E>): S? =
        if (previousState == null) initial(event.timestamp, event.data)
        else next(previousState, event.timestamp, event.data)
    fun initial(timestamp: StreamTimestamp, data: E): S?
    fun next(previousState: S, timestamp: StreamTimestamp, data: E): S
    fun runAll(events: List<KEvent<E>>, initialState: S? = null): S? {
        var workingState: S? = initialState
        events.forEach { workingState = update(workingState, it) }
        return workingState
    }
}
(We cheat a little on immutability here by using a mutable workingState variable inside runAll, instead of defining a fold over the list of events.) The update method determines whether an event should be considered as an “initial” event that might create a new state from nothing, or a “next” event that creates a new state from an existing state; a class implementing Transitions must provide implementations of both initial and next. We now create such an implementation for a data class representing the state of a lightbulb:
data class LightbulbState(val wattage: Int, val location: String?, val isSwitchedOn: Boolean,
                          val switchedOnAt: Instant?, val millisecondsActive: Long) {
    companion object LightbulbTransitions : Transitions<LightbulbState, LightbulbEvent> {
        override fun initial(timestamp: StreamTimestamp, data: LightbulbEvent): LightbulbState? = when(data) {
            is Created -> LightbulbState(data.wattage, null, false, null, 0)
            else -> null
        }
        override fun next(
                previousState: LightbulbState,
                timestamp: StreamTimestamp,
                data: LightbulbEvent): LightbulbState = when(data) {
            is Created -> previousState
            is ScrewedIn -> previousState.copy(location = data.location)
            is Unscrewed -> switchedOff(previousState.copy(location = null), timestamp)
            is SwitchedOn -> switchedOn(previousState, timestamp)
            is SwitchedOff -> switchedOff(previousState, timestamp)
        }
        private fun switchedOn(state: LightbulbState, timestamp: StreamTimestamp) =
            if (state.isSwitchedOn) state
            else state.copy(isSwitchedOn = true, switchedOnAt = timestamp.timestamp)
        private fun switchedOff(state: LightbulbState, timestamp: StreamTimestamp): LightbulbState =
            if (!state.isSwitchedOn) state
            else state.copy(
                isSwitchedOn = false,
                switchedOnAt = null,
                millisecondsActive = state.millisecondsActive +
                    millisSwitchedOn(state, timestamp.timestamp))
        private fun millisSwitchedOn(state: LightbulbState, timestamp: Instant): Long =
            state.switchedOnAt?.let { Duration.between(it, timestamp).toMillis() } ?: 0
    }
    fun millisecondsActiveAt(time: Instant): Long = millisecondsActive + millisSwitchedOn(this, time)
    fun kwhAt(time: Instant): Double =
        wattage.toDouble() / 1000 * (millisecondsActiveAt(time).toDouble() / 3600000)
}
Each transition creates a copy of the current data class instance, selectively replacing the values of updated fields. We can now play a series of events to the LightbulbState class, and get back an instance which we can use to calculate the power consumption of the lightbulb to date:
val kevents = events.map { it.toKEvent(LightbulbEvent::class) }
val state = LightbulbTransitions.runAll(kevents)
val kwh = state?.kwhAt(Instant.now()) ?: 0.0
Note that we first have to map our Concursus Events into KEvents, so that the transition code can interpret them. Here we do this with an extension method added to Event – which brings us to one final trick Kotlin can perform for us…
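Before moving on, the copy-based transitions, and the fold that runAll stops short of using, can both be tried in isolation. The following is a trimmed-down sketch under stated assumptions: BulbEvent, Timed and BulbState are stand-ins invented for this example, java.time.Instant replaces StreamTimestamp, and only three event types are modelled.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical, reduced event set for this sketch.
sealed class BulbEvent {
    class Created(val wattage: Int) : BulbEvent()
    object SwitchedOn : BulbEvent()
    object SwitchedOff : BulbEvent()
}

// Stand-in for KEvent: an event payload paired with the time it occurred.
data class Timed(val at: Instant, val data: BulbEvent)

data class BulbState(
    val wattage: Int,
    val switchedOnAt: Instant? = null,
    val millisecondsActive: Long = 0
)

// One transition step: a possibly absent previous state plus an event gives the next state.
fun update(previous: BulbState?, event: Timed): BulbState? {
    val data = event.data
    if (previous == null) {
        // Only a Created event can bring a state into existence.
        return if (data is BulbEvent.Created) BulbState(data.wattage) else null
    }
    return when (data) {
        is BulbEvent.Created -> previous
        is BulbEvent.SwitchedOn ->
            if (previous.switchedOnAt != null) previous
            else previous.copy(switchedOnAt = event.at)
        is BulbEvent.SwitchedOff -> {
            val since = previous.switchedOnAt
            if (since == null) previous
            else previous.copy(
                switchedOnAt = null,
                millisecondsActive = previous.millisecondsActive +
                    Duration.between(since, event.at).toMillis()
            )
        }
    }
}

// runAll written as a fold: the accumulator takes the place of a mutable working variable.
fun runAll(events: List<Timed>, initialState: BulbState? = null): BulbState? =
    events.fold(initialState) { state, event -> update(state, event) }
```

Every step returns either the previous instance unchanged or a fresh copy, so no state object is ever mutated after it has been created.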
In Concursus’s Java 8 code, core classes such as Event and EventSource are defined without reference to any mapping layer, which means that when we want an EventSource that knows how to map event types to methods on an interface and dispatch them to a handler object, we have to create a DispatchingEventSource object that wraps an existing EventSource:
DispatchingEventSource dispatchingEventSource = DispatchingEventSource.dispatching(
    eventSource,
    LightbulbEvents.class);
This isn’t an enormous hardship, but it’s even nicer to be able to define an extension method in Kotlin that is directly added to EventSource, so that we can simply write:
val messages = eventSource.replaying(LightbulbEvent::class, lightbulbId)
    .inAscendingCausalOrder()
    .collectAll { event ->
        event.data.let {
            when (it) {
                is Created -> "Lightbulb created with wattage " + it.wattage
                is ScrewedIn -> "Lightbulb screwed in @ " + it.location
                is Unscrewed -> "Lightbulb unscrewed"
                is SwitchedOn -> "Lightbulb switched on"
                is SwitchedOff -> "Lightbulb switched off"
            }
        } + " at " + event.timestamp.timestamp
    }
In this way, “outer” modules can progressively layer functionality on top of “inner” modules, adding a sprinkle of reflection-based magic to the more pedestrian core implementation.
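As a rough illustration of that layering, the sketch below adds a typed replay method to a class that knows nothing about types. SketchEventSource and replayingAs are names invented for this example and do not correspond to the Concursus API; the point is only that the extension function lives outside the class it extends.

```kotlin
import kotlin.reflect.KClass

// Stand-in for an "inner" core class: it stores untyped (timestamp, payload) pairs
// and has no notion of any mapping layer.
class SketchEventSource(private val stored: Map<String, List<Pair<Long, Any>>>) {
    fun eventsFor(aggregateId: String): List<Pair<Long, Any>> = stored[aggregateId].orEmpty()
}

// "Outer" layer: an extension function declared outside SketchEventSource.
// It returns the payloads of the requested type in ascending timestamp order,
// without wrapping or modifying the core class.
fun <E : Any> SketchEventSource.replayingAs(type: KClass<E>, aggregateId: String): List<E> =
    eventsFor(aggregateId)
        .sortedBy { it.first }
        .map { it.second }
        .filterIsInstance(type.java)

fun sampleSource(): SketchEventSource {
    val stored: Map<String, List<Pair<Long, Any>>> = mapOf(
        "bulb1" to listOf(2L to "switched on", 1L to "created", 3L to 42)
    )
    return SketchEventSource(stored)
}
```

A call such as sampleSource().replayingAs(String::class, "bulb1") then reads as though replayingAs were a member of the core class, even though the core class was compiled without it.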
While Kotlin clearly has a lot to offer us here, the modular design of Concursus means that it should be straightforward to build custom APIs making use of the syntax and idioms of other JVM languages, such as Scala, Clojure and Ceylon.
This blog is written exclusively by the OpenCredo team. We do not accept external contributions.