Open Credo

February 8, 2012 | Data Analysis, Data Engineering

A Simple Introduction to Complex Event Processing – Stock Ticker End-to-End Sample

Most of the important players in this space are large IT corporations like Oracle and IBM with their commercial (read expensive) offerings.

While most CEP products offer some great features, their licensing models and closed-source policies don’t allow developers to play with them on pet projects, which would drive adoption and usage of CEP in everyday programming.

WRITTEN BY

Aleksa Vukotic


Luckily, there is one product that comes with an open source license, allowing anyone to download it and use it in their Java or .NET applications – Esper (http://esper.codehaus.org/).

But even with a great open source product like Esper, CEP has still not become a mainstream tool in the developer’s sandbox.

Most of us think of CEP and ESP (Event Stream Processing) as something that should only be used for very complex projects, which are part of very few large systems in the world.

In this blog entry we are going to try to bring CEP and ESP with Esper closer to developers who haven’t had the chance to use them yet.

We are going to implement a simple stock data ticker, displayed in the browser and continuously updated using AJAX.

The solution will be simple, but complete – we will implement our own data generator, configure the Esper CEP engine, collect the aggregated data and display it in the browser.

The data will enter the system at the rate of 10,000 events per second, with each event holding information about the current price, as well as ask and bid sizes.

In order to make things more interesting, our ticker is going to display average values for stock price, ask and bid over the last 30 seconds, updated in real time.

The technology stack we are going to use will be standard Java EE with a Java webapp as the front end. We will use the Esper engine along with a few other open source frameworks and components that will make our development easier (namely Spring, Spring Integration and OpenCredo Esper Extension).

The data enters the system as events, so first we need to define what an event looks like. Listing 1 shows the MarketDataEvent class which we will use in our system:

Listing 1: MarketDataEvent Java Bean describing the stock update event

public class MarketDataEvent {
    private String symbol;
    private String name;

    private double price;
    private double ask;
    private double bid;

//getters and setters omitted for clarity

}

 

We are keeping things simple – MarketDataEvent is a standard Java bean with a few properties. In addition to the symbol and name of the stock, the MarketDataEvent holds information about the stock price, as well as the current bid and ask values.

But how is this data going to feed into our system? The next step is to generate some market data for our feed.
While there are a number of public REST APIs that can be used for this purpose (search for the Yahoo and Google finance APIs online), in order to keep things simple we are going to implement our own random data generator.

Listing 2 shows our implementation of the random market data generator.

Listing 2: Custom generator of random market data events

public class RandomMarketDataGenerator implements MarketDataGenerator {
    enum Symbol {
        ACME(1, "ACME", 100, 1), FGTS(2, "FGTS", 13, 5), JUYY(3, "JUYY", 5, 10), NYSR(4, "NYSR", 657, 7);       #1

        private int id;
        private String name;
        //rest omitted for clarity
    }

    private EventGateway eventGateway;
    
    public void generateEvents() {
        int eventBatchSize = 10000;
        for (int i = 0; i < eventBatchSize; i++) {
            MarketDataEvent event = new MarketDataEvent();
            Symbol symbol = Symbol.fromInt(getRandomAbsInt(4) + 1);//select stock symbol randomly
            event.setSymbol(symbol.name());
            event.setName(symbol.toString());
            event.setBid((getRandomAbsBigDecimal(100).doubleValue()));//random bid
            event.setAsk(getRandomAbsBigDecimal(100).doubleValue());//random ask
            event.setPrice(generateNewPrice(symbol).doubleValue());//random new price

            //each event is sent to the pipeline as soon as it is created, so nothing is collected or returned
            eventGateway.sendEvent(event);                                                                      #2
        }
    }
//random number generation methods omitted for clarity - see code on github for details
}

 

Our market data ticker will follow four hard-coded (imaginary) stocks (#1).
After the data for each event is randomly generated, the event is sent straight away to the Esper engine, using the EventGateway.sendEvent() method (#2).

EventGateway is a very simple interface, with only one method defined:

Listing 3: Interface for sending events to Esper engine

public interface EventGateway {
    void sendEvent(MarketDataEvent event);
}

 

The good thing is that we don’t need to write an implementation of this interface at all! Instead, we are going to use it as an inbound gateway to our system, using Spring Integration, which supplies the implementation at runtime.

If you’re not familiar with the Spring Integration (SI) inbound gateway, take a look at the SI reference manual.
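
To get a feel for how an interface can work without a hand-written implementation, here is a minimal sketch using a plain JDK dynamic proxy. It is not how Spring Integration is built internally, only an illustration of the idea. RecordingChannel and GatewayProxyFactory are hypothetical names, and MarketDataEvent is reduced to a single field.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Stand-in for the article's event class, reduced to one field for brevity.
class MarketDataEvent {
    final String symbol;
    MarketDataEvent(String symbol) { this.symbol = symbol; }
}

// Same shape as the EventGateway interface in Listing 3.
interface EventGateway {
    void sendEvent(MarketDataEvent event);
}

// Hypothetical stand-in for a message channel: it just records payloads.
class RecordingChannel {
    final List<Object> payloads = new ArrayList<>();
    void send(Object payload) { payloads.add(payload); }
}

class GatewayProxyFactory {
    // Builds an EventGateway whose calls are forwarded to the channel,
    // without any hand-written implementation of the interface.
    static EventGateway create(RecordingChannel channel) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered locally, not sent as messages.
                switch (method.getName()) {
                    case "toString": return "EventGatewayProxy";
                    case "hashCode": return System.identityHashCode(proxy);
                    default: return proxy == args[0];
                }
            }
            channel.send(args[0]); // the method argument becomes the message payload
            return null;           // sendEvent is void
        };
        return (EventGateway) Proxy.newProxyInstance(
                EventGateway.class.getClassLoader(),
                new Class<?>[] {EventGateway.class},
                handler);
    }
}
```

Calling GatewayProxyFactory.create(channel).sendEvent(event) leaves the event in channel.payloads, which is roughly what the gateway does for us with a real message channel.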

Listing 4 shows the Spring Integration configuration of our market data generator:

Listing 4: Spring Integration configuration for incoming pipeline of market data events

    
<!-- Entry point into the market data bar pipeline -->
<si:gateway service-interface="com.opencredo.sandbox.aleksav.esper.generator.EventGateway"
            id="eventGateway" default-request-channel="marketData"/>                                               #1

<bean id="marketDataGenerator" class="com.opencredo.sandbox.aleksav.esper.generator.RandomMarketDataGenerator">    #2
    <property name="eventGateway" ref="eventGateway"/>
</bean>

<task:scheduler id="scheduler" pool-size="10"/>
<task:scheduled-tasks scheduler="scheduler">                                                                       #3
    <task:scheduled ref="marketDataGenerator"
                    method="generateEvents"
                    fixed-rate="10"/>
</task:scheduled-tasks>

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">                  #4
    <property name="corePoolSize" value="5"/>
    <property name="maxPoolSize" value="10"/>
</bean>
    
<si:channel id="marketData">                                                                                       #5
    <si:dispatcher task-executor="taskExecutor"/>
</si:channel>

<si:service-activator ref="dataProcessor"
                      input-channel="marketData" method="process"/>                                                #6

 

#1 Definition of the eventGateway bean as an inbound gateway using the EventGateway interface. Every invocation of the EventGateway.sendEvent(MarketDataEvent event) method results in a new Spring Integration message, with the MarketDataEvent object as payload, on the marketData channel

#2 RandomMarketDataGenerator instance as a Spring bean, wired with the eventGateway bean

#3 Spring scheduler configuration, which runs the market data generator continuously

#4 TaskExecutor Spring bean, used for asynchronous message dispatching between channels in Spring Integration

#5 Definition of the marketData message channel, to which all messages from eventGateway will be sent. This channel is defined as an asynchronous ExecutorChannel.

#6 Data processor bean that consumes the messages from the marketData channel

Using the Spring Framework and Spring Integration, we have configured our data generator to be executed every 10 milliseconds, and each MarketDataEvent is sent to the marketData message channel via the inbound gateway.
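
For readers who would like to see the fixed-rate idea without any Spring configuration, the sketch below uses the JDK's ScheduledExecutorService as a rough stand-in for the scheduled task above. FixedRateDemo and its parameters are made up for illustration, and the task is just a Runnable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedRateDemo {
    // Runs `task` every `periodMillis` until it has completed at least `runs` times,
    // then stops the scheduler. Returns true if that happened within the timeout.
    static boolean runAtFixedRate(Runnable task, long periodMillis, int runs, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                task.run();
                remaining.countDown();
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

With a period of 10 milliseconds this mirrors the fixed-rate="10" setting. Note that scheduleAtFixedRate never runs the same task concurrently with itself, so a slow task delays the next run rather than overlapping with it.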

Messages on the marketData channel can be consumed concurrently, by multiple consumers at the same time.
This configuration should maximise the inbound throughput of messages in our system.
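
The effect of backing a channel with a task executor can be sketched in plain Java. ExecutorBackedChannel below is a hypothetical, heavily simplified stand-in and not the Spring Integration class: sending returns immediately, and a pool of worker threads invokes the handler.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for an executor-backed message channel.
class ExecutorBackedChannel<T> {
    private final ExecutorService executor;
    private final Consumer<T> handler;

    ExecutorBackedChannel(int threads, Consumer<T> handler) {
        this.executor = Executors.newFixedThreadPool(threads);
        this.handler = handler;
    }

    // Returns immediately; the handler runs later on one of the pool threads.
    void send(T payload) {
        executor.execute(() -> handler.accept(payload));
    }

    // Stops accepting messages and waits for the queued ones to be handled.
    boolean shutdownAndWait(long timeoutMillis) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Because several threads may call the handler at once, the handler has to be thread-safe, and the same applies to anything that consumes from our marketData channel.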

We have also defined a service activator, which is the consumer of all messages arriving on the marketData channel. This service activator could, for example, persist the messages for historical analysis and audit, or forward them to outbound adapters for further processing (for example to a JMS queue).

The end result of message processing is not of interest to us in this project, so our data processor will simply discard all messages that it receives.

Listing 5: Dummy data processor that simply discards any messages it receives

@Component
public class DummyDataProcessor {
    public void process(MarketDataEvent event){
        //NOOP
    }
}

 

We now have a full system with an incoming flow of messages that are eventually processed by the service activator (which in our case doesn’t do anything at all).

But what we are interested in is collecting some useful information about the messages flowing through the system. In other words, we want to perform Event Stream Processing on the data that passes through the Spring Integration pipeline.

This is where Esper comes into play.

In order to integrate Esper into the Spring Integration message flow, we are going to use another open source component which will enable us to do just that without too much work.

That component is OpenCredo Esper Extension.

OpenCredo Esper contains two modules which we will use in our sample market ticker application:

  • esper-template – provides a simple API for interacting with the Esper CEP engine, along with simplified Spring configuration, building on the existing Spring template-model implementations (JdbcTemplate, JmsTemplate…), so you can concentrate on implementing Esper functionality to solve specific problems without rewriting the boilerplate code all over again.
  • esper-si-support – provides Esper Spring Integration support, including the SI Esper wiretap, which we will demonstrate shortly.

To start using OpenCredo Esper Extensions, simply reference the Maven artifact, which is available from the Maven Central repository:

<dependency>
    <groupId>org.opencredo.esper</groupId>
    <artifactId>esper-si-support</artifactId>
    <version>${oc-esper.version}</version>
</dependency>

 

Once you have the dependency jars on the classpath, you can use OpenCredo Esper’s <si-esper> and <esper> XML namespaces to integrate the Esper engine with the existing Spring Integration pipeline:

Listing 6: Configuring Esper template and wiretap using OpenCredo Esper

<!-- Wiretap the market data for esper processing-->
<si-esper:wire-tap-channels default-wire-tap="esperWiretap">                                                  #1
    <si-esper:channel pattern="marketData"/>
</si-esper:wire-tap-channels>


<!-- Configure the wiretap with the appropriate Esper statements -->
<si-esper:wire-tap id="esperWiretap" sourceId="defaultWireTap" template-ref="template" post-send="true"/>     #2

<esper:template id="template" configuration="esper-configuration.xml">                                        #3
    <esper:statements>
        <esper:statement                                                                                      #4
                epl="select symbol,avg(ask),avg(bid),avg(price)
                         from com.opencredo.sandbox.aleksav.esper.domain.MarketDataEvent.win:time(30 second)
                         group by symbol">
            <esper:listeners>
                <!-- Pass Esper messages to the inPlaceCollectionGateway -->
                <esper:ref bean="inPlaceCollectionGateway"/>                                                  #5
            </esper:listeners>
        </esper:statement>
    </esper:statements>
</esper:template>

 

#1 Define the channels that will be intercepted by Spring Integration and sent to the Esper wiretap for event stream processing (using the OpenCredo Esper XML namespace)

#2 Define the Esper wiretap using the OpenCredo Esper XML namespace

#3 Configure the Esper engine using the EsperTemplate bean

#4 Define Esper statements using EPL

#5 Send any events that match the statement definition to another Spring Integration channel

The most important part of the Esper configuration is the EPL statement configuration (#4).

Using EPL (Event Processing Language), we have registered the statement against the MarketDataEvent event flow.

This statement calculates the average price, ask and bid of the MarketDataEvent stream, per stock symbol, over a 30-second time window, and publishes an event every time it is triggered.

As you can see, the EPL statement looks very similar to standard SQL (we used a select statement, the avg aggregation function and a group by clause, much as we would in SQL).

However, while SQL statements are used to query static data stored in a database, this EPL statement is continuously querying the event stream, and issues an event whenever it is executed.

Every time the statement is executed, all Esper listeners for that statement are notified. We have configured one Esper listener that will collect the averages so that they can eventually be displayed in the browser (#5).
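
To make the semantics of the statement more concrete, here is a rough plain-Java sketch of a per-symbol average over a sliding time window. It is not how Esper works internally and only covers avg(price); the TimeWindowAverage class is hypothetical, timestamps are passed in explicitly, and, unlike Esper, the sketch only expires old events when a new event arrives for the same symbol.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of "avg(price) ... win:time(30 second) group by symbol".
class TimeWindowAverage {
    private static final class Sample {
        final long timestampMillis;
        final double price;
        Sample(long timestampMillis, double price) {
            this.timestampMillis = timestampMillis;
            this.price = price;
        }
    }

    // One window per symbol: the samples still inside the window and their running sum.
    private static final class Window {
        final Deque<Sample> samples = new ArrayDeque<>();
        double sum;
    }

    private final long windowMillis;
    private final Map<String, Window> windows = new HashMap<>();

    TimeWindowAverage(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds one event and returns the updated average for its symbol.
    // Timestamps are assumed to be non-decreasing. Not thread-safe.
    double onEvent(String symbol, double price, long nowMillis) {
        Window w = windows.computeIfAbsent(symbol, s -> new Window());
        w.samples.addLast(new Sample(nowMillis, price));
        w.sum += price;
        // Evict samples that have fallen out of the time window. The sample just
        // added is always inside the window, so the deque never becomes empty.
        while (w.samples.peekFirst().timestampMillis <= nowMillis - windowMillis) {
            w.sum -= w.samples.removeFirst().price;
        }
        return w.sum / w.samples.size();
    }
}
```

For example, with a 30,000 ms window, ACME prices of 100 at t=0 and 110 at t=10,000 give an average of 105. A third price of 120 at t=31,000 pushes the first sample out of the window, so the average becomes 115.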

The Esper configuration file referenced in the Esper template bean definition (esper-configuration.xml) must be located on the classpath. In our example it contains only basic configuration:

Listing 7: Standard esper XML configuration

<?xml version="1.0" encoding="UTF-8"?>
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.espertech.com/schema/esper"
    xsi:schemaLocation="http://www.espertech.com/schema/esper
                        http://www.espertech.com/schema/esper/esper-configuration-3.0.xsd">
    <!-- A simple example Esper Configuration. -->
    <engine-settings>
        <defaults>
            <threading>
                <listener-dispatch preserve-order="true"
                    timeout-msec="1000" locking="spin" />
                <threadpool-inbound enabled="true" num-threads="2" />
                <threadpool-outbound enabled="true" num-threads="2"
                    capacity="1000" />
                <threadpool-timerexec enabled="true"
                    num-threads="2" />
                <threadpool-routeexec enabled="true"
                    num-threads="2" />
            </threading>
        </defaults>
    </engine-settings>

</esper-configuration>

 

The final step is to configure the channel adapter that will serve as the Esper listener (the inPlaceCollectionGateway bean referenced in Listing 6), which is notified whenever the configured EPL statement is executed.

Listing 8: Configuring esper statement listener flow using Spring Integration and OpenCredo Esper Extension

<si-esper:inbound-channel-adapter id="inPlaceCollectionGateway" channel="inboundRawEsperInPlaceCollectionMessages"/>   #1
<si:channel id="inboundRawEsperInPlaceCollectionMessages"/>                                                            #2
<si:service-activator ref="inPlaceEventCollector"                                                                      #3
                      input-channel="inboundRawEsperInPlaceCollectionMessages" method="collect"/>

 

#1 OpenCredo Esper inbound channel adapter that collects all events published by the Esper EPL statement
#2 Spring Integration message channel for processed Esper events
#3 Endpoint service activator that is notified for every matching Esper event

Our goal was to create the market data browser ticker, which will be continuously updated based on the incoming data.

To be able to do that, we need to store the last event for each stock symbol. To keep things simple, we will store the data in an in-memory hash map, with a configurable key.

Listing 9 shows the implementation of the InPlaceEventCollector which will be used to collect the results from Esper statement execution.

Listing 9: Simple in-memory storage of the EPL statement execution results

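import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// EventBean package as in Esper 3.x/4.x (assumed from the 3.0 schema used in Listing 7)
import com.espertech.esper.client.EventBean;
import com.opencredo.sandbox.aleksav.esper.domain.MarketDataEvent;

public class InPlaceEventCollector {
    // Latest aggregated values per key (the stock symbol in this example)
    private final Map<String, MarketDataEvent> store = Collections.synchronizedMap(new HashMap<String, MarketDataEvent>());

    // Name of the event property whose value is used as the map key
    private final String keyPropertyName;

    public InPlaceEventCollector(String keyPropertyName) {
        this.keyPropertyName = keyPropertyName;
    }

    // Called with the events published by the EPL statement
    public void collect(EventBean[] eventBeans) {
        for (EventBean eventBean : eventBeans) {
            addEvent(eventBean);
        }
    }

    private synchronized void addEvent(EventBean eventBean) {
        String key = eventBean.get(keyPropertyName).toString();
        MarketDataEvent marketDataEvent = new MarketDataEvent();
        marketDataEvent.setSymbol(key);
        // Aggregated values are looked up by the expression text used in the select clause
        marketDataEvent.setPrice(Double.parseDouble(eventBean.get("avg(price)").toString()));
        marketDataEvent.setAsk(Double.parseDouble(eventBean.get("avg(ask)").toString()));
        marketDataEvent.setBid(Double.parseDouble(eventBean.get("avg(bid)").toString()));

        store.put(key, marketDataEvent); // replaces the previous entry for this key
    }

    public Map<String, MarketDataEvent> getStore() {
        return store;
    }
}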

 

Spring configuration:

<bean id="inPlaceEventCollector" class="com.opencredo.sandbox.aleksav.esper.listener.collector.InPlaceEventCollector">
    <constructor-arg index="0" value="symbol"/>
</bean>

 

InPlaceEventCollector stores data in a synchronized hash map to allow safe concurrent access. The keyPropertyName property is the configurable name of the property used as the key for in-place updates.
As we will update data grouped by stock symbol, MarketDataEvent’s symbol property will be our keyPropertyName, as you can see from the Spring configuration.
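
One thing to keep in mind is that a map created with Collections.synchronizedMap still has to be locked manually while it is being iterated. A possible variation, sketched below, hands out a copy of the current contents instead of the live map. SnapshotStore is a hypothetical class for illustration and is not part of the sample project.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical variation on the collector's storage, keyed by stock symbol.
class SnapshotStore<V> {
    private final Map<String, V> store = Collections.synchronizedMap(new HashMap<String, V>());

    // Update-in-place: a newer value for the same key replaces the old one.
    void put(String key, V value) {
        store.put(key, value);
    }

    // Copies the current contents while holding the map's lock, so the caller
    // can iterate over the result without racing against concurrent writers.
    Map<String, V> snapshot() {
        synchronized (store) {
            return new HashMap<>(store);
        }
    }
}
```

The copy costs a little on every read, but with only four symbols in our ticker that cost is negligible.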

So we now have the aggregated data based on the incoming event stream stored in memory, and all we have to do is display it as an update-in-place table in the browser.

To feed that data to the browser we will implement continuous polling over HTTP using Ajax and jQuery. The data will be sent back in standard JSON format, and for that we will need a very simple Spring MVC controller:

Listing 10: Spring MVC controller used to render HTML and JSON data to the browser

@Controller
public class TestController {
    @Autowired
    private InPlaceEventCollector inPlaceEventCollector;

    @RequestMapping("/index")
    public String index() {                                             #1
        return "index";
    }
    @RequestMapping("/inplace.json")
    public @ResponseBody Map<String, MarketDataEvent> inPlace() {       #2
        return inPlaceEventCollector.getStore();
    }
}

 

#1 Handler method for rendering the HTML page where the ticker is located
#2 Handler method for AJAX requests for data updates

The controller uses the Spring-wired InPlaceEventCollector to send the up-to-date data feed to the browser. Thanks to Spring’s @ResponseBody annotation, the return value of the controller method will be automatically serialized to JSON, as long as the Jackson JSON library is available on the classpath at runtime (#2).
We also need an endpoint to send the HTML and JavaScript page to the browser (#1).

Listing 11 shows the HTML and JavaScript code we use to poll and render the data.

Listing 11: HTML page displayed in the browser, with Javascript and AJAX code

<html>
    <head>
        <title>Esper Market Data Ticker CEP Sample</title>
        <script src="http://ajax.googleapis.com/ajax/libs/jquery/1.2.6/jquery.min.js" type="text/javascript"
                charset="utf-8"></script>
      <script type="text/javascript" charset="utf-8">
            function renderMarketData(type, data) {
                var html = "<table border='1'>"
                $.each(data, function(key, val) {
                    html = html + "<tr>" + "<td class='head'>" + key + "</td>"
                    html = html + renderCell(key, "avg(price)", val.price);
                    html = html + renderCell(key, "avg(bid)", val.bid);
                    html = html + renderCell(key, "avg(ask)", val.ask);

                    html = html + "</tr>"
                });
                html = html + "</table>";
                $("#market_data").html(
                        html
                );
            }

            function renderCell(symbol, propertyName, propertyValue) {
                var rowHtml = "";
                var cellId = (symbol + propertyName).replace("=", "").replace("(", "").replace(")", "");
                var curVal = $("#" + cellId).text();
                var cellClass = "";
                if (curVal && curVal < propertyValue) {//if new value is higher, render in green
                    cellClass = cellClass + "higher";
                }
                if (curVal && curVal > propertyValue) {//if new value is lower, render in red
                    cellClass = cellClass + "lower";
                }
                rowHtml = rowHtml + "<td id='" + cellId + "' class='" + cellClass + "'>" + propertyValue + "</td>"
                return rowHtml;
            }

            function pollMarketData() {
                /* Request the latest data; when the request completes successfully, schedule the next poll */
                $.ajax({
                    type: "GET",
                    url: "inplace.json",
                    dataType:'json',
                    async: true, // If set to non-async, browser shows page as "Loading.."
                    cache: false,
                    timeout:50000, // Timeout in ms

                    success: function(data) {
                        renderMarketData("success", data);// Add response to a market_data div
                        setTimeout(
                                'pollMarketData()', // Request next message
                                1000 // ..after 1 seconds
                        );
                    }
                });
            };
            $(document).ready(function() {
                pollMarketData();// Start the inital request
            });
        </script>
    </head>
    <body>
        <div id="market_data">
            No Data
        </div>
    </body>
</html>

 

Once every second we issue an AJAX GET request to inplace.json to get the latest stock prices. After successful retrieval of the data (a set of key-value pairs, where the key is the stock symbol and the value is a JSON object representing a MarketDataEvent), we render it in the “market_data” div as an HTML table.

Each cell in the table has a generated HTML id, made up of the symbol name followed by the property name with the parentheses removed (for example ACMEavgprice or ACMEavgask).

Using some jQuery wizardry in the renderCell function, we compare the value currently displayed with the latest value for the property. If the value has increased we render it in green, and if it has decreased, we render it in red.

And that’s it! You can see the result of our market data ticker running on Cloud Foundry.

The code described in this blog entry is available on GitHub (http://github.com/opencredo/esper-stockticker-sample) so feel free to play with it.

We kept the solution simple enough that it can be completed in less than one hour, but it should serve as a good starting point for learning about CEP and event stream processing with Esper, Spring Integration and OpenCredo Esper extensions.

 

This blog is written exclusively by the OpenCredo team. We do not accept external contributions.
