Open Credo

April 27, 2016 | Software Consultancy

The Concursus Programming Model: Events

Concursus is an open source Java 8 framework for building distributed systems using CQRS and event sourcing patterns. One of its major differences from other such frameworks (such as Jdon, Axon and ES4J) is that it eschews a programming model where each event type is represented by a separate Java class, instead mapping event types to methods on interfaces.

 

WRITTEN BY

Dominic Fox

Dominic Fox

The Concursus Programming Model: Events

Here is an example event-emitting interface, which defines the events for a lightbulb:

@HandlesEventsFor("lightbulb")
public interface LightbulbEvents {
    @Initial
    void created(StreamTimestamp timestamp, String id, int wattage);
    void screwedIn(StreamTimestamp timestamp, String id, String location);
    void switchedOn(StreamTimestamp timestamp, String id);
    void switchedOff(StreamTimestamp timestamp, String id);
    void unscrewed(StreamTimestamp timestamp, String id);
    @Terminal
    void deleted(StreamTimestamp timestamp, String id);
}

The conventions used here as as follows:

  • The interface itself is annotated with @HandlesEventsFor, and the annotation’s value indicates the kind of entity it defines events for.
  • The method name identifies the type of event to emit (although this can be overriden with a @Name annotation).
  • Each method returns void: calling it will emit an event, which will be processed downstream by being written into an event log and/or propagated to event handlers.
  • The first parameter of each method is a StreamTimestamp, which is a combination of a millisecond-resolution timestamp and a “stream identifier” which can be used to distinguish between events from different sources which occur during the same millisecond.
  • The second parameter of each method is a String identifier, which identifies the identity to which the event occurred.
  • The remaining parameters contain the event’s data; for example, a lightbulb is always created with a specific wattage, which never changes.
  • Events which cause an entity to come into existence are annotated with @Initial; events which must be the last thing to happen to an entity are annotated with @Terminal.

In order to use this interface to emit events, we need an implementation of the interface which translates each method call into a bundle of data representing an event, and passes this data on to something which can process it. We could write one manually, but as all of the information we need to define an event type is contained in the class definition and method signature for each method, it’s easy to create a proxy object that “interprets” each method call, using this information to create an event based on the method signature and arguments.

Concursus represents each event’s data as an instance of the class Event, and event handlers are objects which implement Consumer. Here’s a simple example, where we create an event handler that just writes events out to the console, create a proxy for the LightbulbEvents interface which passes events to this handler, then call methods on the proxy to generate events:

// Create an EventOutChannel that simply prints events to the command line
Consumer eventHandler = System.out::println;

// Create a proxy that sends events to the eventHandler.
LightbulbEvents proxy = EventEmittingProxy.proxying(eventHandler, LightbulbEvents.class);

// Send an event via the proxy.
proxy.created(StreamTimestamp.now(), "id1", 60);

This will output a String like the following, representing the event’s data:

lightbulb:id1 created_0
at 2016-04-215:24.324Z/
with lightbulb/created_0{wattage=60}

The same mapping in reverse can be used to dispatch an Event to a method on an object that implements the LightbulbEvents interface:

LightbulbEvents mockHandler = Mockito.mock(LightbulbEvents.class);
Consumer eventHandler = DispatchingEventOutChannel.toHandler(LightbulbEvents.class, mockHandler);

LightbulbEvents proxy = EventEmittingProxy.proxying(eventHandler, LightbulbEvents.class);
StreamTimestamp ts = StreamTimestamp.now();
proxy.created(ts, "id1", 60);

Mockito.verify(mockHandler).created(ts, "id1", 60);

In practice we will normally want to dispatch events to an event store of some kind, or to queue them up for later processing. Concursus provides mechanisms to store events in Redis or Cassandra, or to serialise them to JSON so they can be transmitted via HTTP or placed in an AMQP or Kafka topic. Given an event store, we can query for the event history associated with a particular entity type/id, and replay that history into a handler that extracts the information we need. Here is the previous example, rewritten to write events into an in-memory event store and then retrieve them by querying:

LightbulbEvents mockHandler = Mockito.mock(LightbulbEvents.class);
InMemoryEventStore eventStore = InMemoryEventStore.empty();
Consumer eventHandler = eventStore.toEventOutChannel()
.andThen(System.out::println);

LightbulbEvents proxy = EventEmittingProxy.proxying(
eventHandler, LightbulbEvents.class);

StreamTimestamp ts = StreamTimestamp.now();
proxy.created(ts, "id1", 60);

DispatchingEventSource eventSource = DispatchingEventSource.dispatching(
EventSource.retrievingWith(eventStore), LightbulbEvents.class);

eventSource.replaying("id1").inAscendingOrder().replayAll(mockHandler);
Mockito.verify(mockHandler).created(ts, "id1", 60);

Observe that we have composed two event handlers here – one which writes to the event store, and another which prints a representation of each event to the console. An event system built with Concursus will typically compose together multiple pieces of event-handling middleware, using the proxying and method-dispatching approach demonstrated above to emit and receive events. It is up to each piece of middleware to implement the scalability, fault-tolerance and reliability characteristics required by the system as a whole. For example, the Cassandra-backed event store implementation provided by Concursus leverages Cassandra’s scalability and high-availability, while a Kafka-based event publication mechanism can be used for durable event propagation between different service components.

In the next post, I discuss how Concursus uses “state classes” to support “rolling up” the event history of an object into a representation of that object’s state at a particular time.

 

This blog is written exclusively by the OpenCredo team. We do not accept external contributions.

RETURN TO BLOG

SHARE

Twitter LinkedIn Facebook Email

SIMILAR POSTS

Blog