April 27, 2016 | Software Consultancy
Concursus is an open source Java 8 framework for building distributed systems using CQRS and event sourcing patterns. One of its major differences from other such frameworks (such as Jdon, Axon and ES4J) is that it eschews a programming model where each event type is represented by a separate Java class, instead mapping event types to methods on interfaces.
Here is an example event-emitting interface, which defines the events for a lightbulb:
@HandlesEventsFor("lightbulb")
public interface LightbulbEvents {
    @Initial
    void created(StreamTimestamp timestamp, String id, int wattage);
    void screwedIn(StreamTimestamp timestamp, String id, String location);
    void switchedOn(StreamTimestamp timestamp, String id);
    void switchedOff(StreamTimestamp timestamp, String id);
    void unscrewed(StreamTimestamp timestamp, String id);
    @Terminal
    void deleted(StreamTimestamp timestamp, String id);
}
The conventions used here are as follows:
The interface is annotated with @HandlesEventsFor, and the annotation’s value indicates the kind of entity it defines events for.
Each method defines one event type, named after the method (the name can be overridden with a @Name annotation).
Each method returns void: calling it will emit an event, which will be processed downstream by being written into an event log and/or propagated to event handlers.
The first parameter of each method is a StreamTimestamp, which is a combination of a millisecond-resolution timestamp and a “stream identifier” which can be used to distinguish between events from different sources which occur during the same millisecond.
The second parameter is a String identifier, which identifies the entity to which the event occurred.
Events which must be the first thing to happen to an entity are annotated with @Initial; events which must be the last thing to happen to an entity are annotated with @Terminal.
In order to use this interface to emit events, we need an implementation of the interface which translates each method call into a bundle of data representing an event, and passes this data on to something which can process it. We could write one manually, but all of the information we need to define an event type is already contained in the interface definition and each method’s signature. It is therefore easy to create a proxy object that “interprets” each method call, using the method signature and arguments to create the corresponding event.
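To make the proxying idea concrete, here is a minimal sketch built on the JDK’s java.lang.reflect.Proxy. It is not Concursus code: ProxySketch, LampEvents and SketchEvent are hypothetical names invented for illustration, a plain long stands in for StreamTimestamp, and event data is kept as a positional list rather than as named parameters.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch only: none of these types are part of Concursus.
final class ProxySketch {

    // Hypothetical event interface; a plain long stands in for StreamTimestamp.
    interface LampEvents {
        void created(long timestamp, String id, int wattage);
        void switchedOn(long timestamp, String id);
    }

    // Hypothetical bundle of event data built from a single method call.
    static final class SketchEvent {
        final String name;
        final long timestamp;
        final String entityId;
        final List<Object> arguments;

        SketchEvent(String name, long timestamp, String entityId, Object... arguments) {
            this.name = name;
            this.timestamp = timestamp;
            this.entityId = entityId;
            this.arguments = new ArrayList<>(Arrays.asList(arguments));
        }

        @Override
        public String toString() {
            return entityId + " " + name + " at " + timestamp + " with " + arguments;
        }
    }

    // Builds a dynamic proxy that turns each interface method call into a SketchEvent.
    static <T> T proxying(Consumer<SketchEvent> handler, Class<T> eventInterface) {
        Object proxy = Proxy.newProxyInstance(
            eventInterface.getClassLoader(),
            new Class<?>[] {eventInterface},
            (self, method, args) -> {
                if (method.getDeclaringClass() == Object.class) {
                    // toString/equals/hashCode are not events; unsupported in this sketch.
                    throw new UnsupportedOperationException(method.getName());
                }
                // Convention: args[0] is the timestamp, args[1] is the entity id,
                // and everything after that is event-specific data.
                Object[] rest = Arrays.copyOfRange(args, 2, args.length);
                handler.accept(
                    new SketchEvent(method.getName(), (Long) args[0], (String) args[1], rest));
                return null; // event methods return void
            });
        return eventInterface.cast(proxy);
    }

    public static void main(String[] args) {
        LampEvents proxy = proxying(System.out::println, LampEvents.class);
        proxy.created(System.currentTimeMillis(), "id1", 60);
        proxy.switchedOn(System.currentTimeMillis(), "id1");
    }
}
```

The invocation handler is the only place that knows the calling convention, so adding a new event type in this sketch means adding a method to the interface and nothing else.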
Concursus represents each event’s data as an instance of the class Event, and event handlers are objects which implement Consumer<Event>. Here’s a simple example, where we create an event handler that just writes events out to the console, create a proxy for the LightbulbEvents interface which passes events to this handler, then call methods on the proxy to generate events:
// Create an EventOutChannel that simply prints events to the command line
Consumer<Event> eventHandler = System.out::println;
// Create a proxy that sends events to the eventHandler.
LightbulbEvents proxy = EventEmittingProxy.proxying(eventHandler, LightbulbEvents.class);
// Send an event via the proxy.
proxy.created(StreamTimestamp.now(), "id1", 60);
This will output a String like the following, representing the event’s data:
lightbulb:id1 created_0 at 2016-04-215:24.324Z/ with lightbulb/created_0{wattage=60}
The same mapping in reverse can be used to dispatch an Event to a method on an object that implements the LightbulbEvents interface:
LightbulbEvents mockHandler = Mockito.mock(LightbulbEvents.class);
Consumer<Event> eventHandler = DispatchingEventOutChannel.toHandler(LightbulbEvents.class, mockHandler);
LightbulbEvents proxy = EventEmittingProxy.proxying(eventHandler, LightbulbEvents.class);
StreamTimestamp ts = StreamTimestamp.now();
proxy.created(ts, "id1", 60);
Mockito.verify(mockHandler).created(ts, "id1", 60);
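The reverse mapping can be sketched in plain Java with reflection: look up the interface method whose name matches the event, rebuild the argument list, and invoke it on the target. As before, this is an illustration under assumptions rather than the Concursus implementation; DispatchSketch, LampEvents and SketchEvent are hypothetical names, and a long stands in for StreamTimestamp.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative sketch only: none of these types are part of Concursus.
final class DispatchSketch {

    // Hypothetical event interface; a plain long stands in for StreamTimestamp.
    interface LampEvents {
        void created(long timestamp, String id, int wattage);
        void switchedOn(long timestamp, String id);
    }

    // Hypothetical bundle of event data.
    static final class SketchEvent {
        final String name;
        final long timestamp;
        final String entityId;
        final List<Object> arguments;

        SketchEvent(String name, long timestamp, String entityId, Object... arguments) {
            this.name = name;
            this.timestamp = timestamp;
            this.entityId = entityId;
            this.arguments = new ArrayList<>(Arrays.asList(arguments));
        }
    }

    // Returns a handler that routes each event to the same-named method on target.
    static <T> Consumer<SketchEvent> toHandler(Class<T> eventInterface, T target) {
        Map<String, Method> methodsByName = new HashMap<>();
        for (Method method : eventInterface.getMethods()) {
            methodsByName.put(method.getName(), method);
        }
        return event -> {
            Method method = methodsByName.get(event.name);
            if (method == null) {
                throw new IllegalArgumentException("No method for event: " + event.name);
            }
            // Rebuild the call: timestamp first, entity id second, then event data.
            List<Object> args = new ArrayList<>();
            args.add(event.timestamp);
            args.add(event.entityId);
            args.addAll(event.arguments);
            try {
                method.invoke(target, args.toArray());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not dispatch " + event.name, e);
            }
        };
    }

    public static void main(String[] args) {
        LampEvents printer = new LampEvents() {
            @Override
            public void created(long timestamp, String id, int wattage) {
                System.out.println("created " + id + " with wattage " + wattage);
            }

            @Override
            public void switchedOn(long timestamp, String id) {
                System.out.println("switched on " + id);
            }
        };
        Consumer<SketchEvent> handler = toHandler(LampEvents.class, printer);
        handler.accept(new SketchEvent("created", 1L, "id1", 60));
        handler.accept(new SketchEvent("switchedOn", 2L, "id1"));
    }
}
```

Keying the lookup on the method name alone keeps the sketch short; it would not cope with overloaded method names or versioned events.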
In practice we will normally want to dispatch events to an event store of some kind, or to queue them up for later processing. Concursus provides mechanisms to store events in Redis or Cassandra, or to serialise them to JSON so they can be transmitted via HTTP or published to an AMQP queue or a Kafka topic. Given an event store, we can query for the event history associated with a particular entity type and id, and replay that history into a handler that extracts the information we need. Here is the previous example, rewritten to write events into an in-memory event store and then retrieve them by querying:
LightbulbEvents mockHandler = Mockito.mock(LightbulbEvents.class);
InMemoryEventStore eventStore = InMemoryEventStore.empty();
Consumer<Event> eventHandler = eventStore.toEventOutChannel()
    .andThen(System.out::println);
LightbulbEvents proxy = EventEmittingProxy.proxying(
    eventHandler, LightbulbEvents.class);
StreamTimestamp ts = StreamTimestamp.now();
proxy.created(ts, "id1", 60);
DispatchingEventSource eventSource = DispatchingEventSource.dispatching(
    EventSource.retrievingWith(eventStore), LightbulbEvents.class);
eventSource.replaying("id1").inAscendingOrder().replayAll(mockHandler);
Mockito.verify(mockHandler).created(ts, "id1", 60);
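The store-then-replay pattern can also be illustrated without any framework. The sketch below assumes a hypothetical InMemoryStore that is itself a Consumer, so it composes with other handlers through Consumer.andThen, and that replays one entity’s history in ascending timestamp order. ReplaySketch, StoredEvent and InMemoryStore are invented names and say nothing about how Concursus’s InMemoryEventStore is actually implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative sketch only: none of these types are part of Concursus.
final class ReplaySketch {

    // Hypothetical stored event; a plain long stands in for StreamTimestamp.
    static final class StoredEvent {
        final long timestamp;
        final String entityId;
        final String name;

        StoredEvent(long timestamp, String entityId, String name) {
            this.timestamp = timestamp;
            this.entityId = entityId;
            this.name = name;
        }

        @Override
        public String toString() {
            return entityId + " " + name + " at " + timestamp;
        }
    }

    // Hypothetical in-memory store: a Consumer that appends to a per-entity history.
    static final class InMemoryStore implements Consumer<StoredEvent> {
        private final Map<String, List<StoredEvent>> historyByEntity = new HashMap<>();

        @Override
        public void accept(StoredEvent event) {
            historyByEntity.computeIfAbsent(event.entityId, id -> new ArrayList<>()).add(event);
        }

        // Replays one entity's events, oldest first, into the given handler.
        void replayAscending(String entityId, Consumer<StoredEvent> handler) {
            List<StoredEvent> history = new ArrayList<>(
                historyByEntity.getOrDefault(entityId, Collections.emptyList()));
            history.sort(Comparator.comparingLong((StoredEvent e) -> e.timestamp));
            history.forEach(handler);
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        // Compose two handlers: store each event, then print it.
        Consumer<StoredEvent> pipeline = store.andThen(System.out::println);
        pipeline.accept(new StoredEvent(2L, "id1", "switchedOn"));
        pipeline.accept(new StoredEvent(1L, "id1", "created"));
        // Replay arrives in timestamp order, regardless of arrival order.
        store.replayAscending("id1", e -> System.out.println("replayed " + e.name));
    }
}
```

Sorting at replay time rather than at insertion keeps writes cheap in this sketch; a real store would make that trade-off according to its own read and write patterns.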
Observe that we have composed two event handlers here: one which writes to the event store, and another which prints a representation of each event to the console. An event system built with Concursus will typically compose multiple pieces of event-handling middleware, using the proxying and method-dispatching approach demonstrated above to emit and receive events. It is up to each piece of middleware to implement the scalability, fault-tolerance and reliability characteristics required by the system as a whole. For example, the Cassandra-backed event store implementation provided by Concursus leverages Cassandra’s scalability and high availability, while a Kafka-based event publication mechanism can be used for durable event propagation between different service components.
In the next post, I discuss how Concursus uses “state classes” to support “rolling up” the event history of an object into a representation of that object’s state at a particular time.
This blog is written exclusively by the OpenCredo team. We do not accept external contributions.