Open Credo

March 21, 2012 | Software Consultancy

Esper Extensions – Implementing Custom Aggregation Function

Event Processing Language (EPL) enables us to write complex queries that get the most out of our event stream in real time, using SQL-like syntax.

EPL allows us to use the full power of aggregation over a high-volume event stream to get the required results with minimal latency. In this blog we are going to explore some aspects of numerical aggregation of data with high-precision BigDecimal values. We will also demonstrate how you can add your own aggregation functions to the Esper engine and use them in EPL statements.

WRITTEN BY

Aleksa Vukotic

Let’s take a look at a simple POJO market data event:

public class MarketDataEvent {
    private String symbol;
    private BigDecimal price;
    // (symbol, price) constructor, getters and setters
}

 

All we need for the stock price ticker is the stock symbol and the price. As we already mentioned, in order to keep the precision of the price, the price property will be of type BigDecimal.

To send the MarketDataEvent objects to the Esper engine we are going to use EsperTemplate, a component of the OpenCredo Esper Extension open source project, whose goal is to simplify configuration and usage of the Esper CEP engine. You can explore and download OpenCredo Esper Extension here. First we need to make sure we have the Esper XML configuration file on the project classpath. The basic empty Esper configuration will look something like this:

<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns="http://www.espertech.com/schema/esper"
      xsi:schemaLocation="http://www.espertech.com/schema/esper
      http://www.espertech.com/schema/esper/esper-configuration-4.5.xsd">
</esper-configuration>

 

Once we have instantiated EsperTemplate, we add the EPL statement we want to process as well as an UpdateListener, which will be notified whenever the statement yields any results. Here is the code that we need to implement to achieve all that:

EsperTemplate esperTemplate = new EsperTemplate();                                 #1
esperTemplate.setConfiguration(new ClassPathResource("/esper-configuration.xml")); #2
EsperStatement statement = new EsperStatement(                                     #3
         "select symbol, avg(price) as avgPrice "
       + "from com.opencredo.sandbox.aleksav.esper.domain.MarketDataEvent.win:time(30 second) "
       + "group by symbol");
SimpleListener listener = new SimpleListener();
statement.addListener(listener);                                                   #4
esperTemplate.addStatement(statement);                                             #5
esperTemplate.initialize();                                                        #6

 

#1 Instantiate EsperTemplate
#2 Specify the Esper configuration XML file
#3 Create an EPL statement that calculates the average price per symbol over the last 30 seconds of the MarketDataEvent event stream
#4 Add an Esper listener which will be notified whenever the statement produces results
#5 Add the statement to the instantiated EsperTemplate
#6 Finally, initialize EsperTemplate and the underlying Esper runtime

The output of the EPL statement registered above will be a series of events, each carrying a stock symbol and the average price of that stock during the last 30 seconds of trading. Whenever a matching event enters or exits the window, the statement is re-evaluated and the configured listener is notified about the change. Let’s take a look at our listener code:

public class SimpleListener implements UpdateListener{                            #1
  private BigDecimal averagePrice;                                                #2
  @Override
  public void update(EventBean[] newEvents, EventBean[] oldEvents) {
    if (newEvents != null && newEvents.length > 0) {
      this.averagePrice =
        new BigDecimal(newEvents[newEvents.length-1].get("avgPrice").toString()); #3
    }
  }
  public BigDecimal getAveragePrice() {
    return averagePrice;
  }
}

 

#1 We implement Esper’s UpdateListener
#2 We store the last calculated average price in a field
#3 We set the average price to the value calculated by the most recent execution

Because of the asynchronous nature of event stream processing with Esper, the listener can be invoked with the results of several statement executions at once. Since we’re only interested in the latest average price, we only read the last event in the array of EventBeans passed as the method argument (#3).

So let’s now write a test to demonstrate the Esper engine working with the configured statement and its listener.

public class AllRoundIntegrationTest {
  EsperTemplate esperTemplate;
  SimpleListener listener = new SimpleListener();
  @Before
  public void setup() {                                                          #1
    esperTemplate = new EsperTemplate();
    esperTemplate.setConfiguration(
                     new ClassPathResource("/esper-configuration.xml"));
    EsperStatement statement =
         new EsperStatement(
                 "select symbol, avg(price) as avgPrice "
               + "from com.opencredo.sandbox.aleksav.esper.domain.MarketDataEvent.win:time(30 second) "
               + "group by symbol");
    statement.addListener(listener);
    esperTemplate.addStatement(statement);
    esperTemplate.initialize();
  }
  @Test
  public void testIndefinitePrecision() throws InterruptedException {            #2
    esperTemplate.sendEvent(new MarketDataEvent("ACME", new BigDecimal("5.0"))); #3
    esperTemplate.sendEvent(new MarketDataEvent("ACME", new BigDecimal("3.0")));
    esperTemplate.sendEvent(new MarketDataEvent("ACME", new BigDecimal("2.0")));
    Thread.sleep(7000);
    assertEquals("Must have correct average price",                              #4
         new BigDecimal("3.33"),
        listener.getAveragePrice());
  }
}

 

#1 – Setup Esper runtime and EsperTemplate
#2 – Test method
#3 – Push 3 POJO events to Esper engine
#4 – Assert that the average is calculated correctly

Using JUnit’s @Before annotation we set up the Esper environment and register the statement and listener, all in the setup() method (#1), before implementing the test case (#2). We send 3 events to Esper, all for the same stock symbol (ACME) but with different prices. With the help of a standard calculator, we determine that the expected average should be (5.0+3.0+2.0)/3 = 3.333333…, so the test asserts for that value rounded to two decimal places, 3.33 (#4).

We run the test now, and instead of a green pass, we get a test failure:
java.lang.ArithmeticException: Non-terminating decimal expansion; no exact representable decimal result.
at java.math.BigDecimal.divide(BigDecimal.java:1603)
at com.espertech.esper.epl.agg.BigDecimalAvgAggregator.getValue(BigDecimalAvgAggregator.java:80)
at com.espertech.esper.epl.agg.AggSvcGroupByRefcountedNoAccessImpl.getValue(AggSvcGroupByRefcountedNoAccessImpl.java:148)

How can we explain this? The BigDecimal.divide() overload that takes no scale or rounding mode must return an exact decimal result. In our case 3.33333… has an infinite number of digits after the decimal point and cannot be represented as a decimal number without rounding, so an ArithmeticException is thrown. Unfortunately, Esper’s avg aggregation function for BigDecimal values does not take this into account, so we cannot make Esper round the result of the average calculation.
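The failure can be reproduced without Esper at all. The following is a minimal sketch using only java.math; the class and method names (DivideDemo, exactAverage, roundedAverage) are made up for illustration and are not part of Esper or the OpenCredo extension.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DivideDemo {

    // Exact division: throws ArithmeticException when the quotient
    // has a non-terminating decimal expansion (for example 10.0 / 3).
    public static BigDecimal exactAverage(BigDecimal sum, long count) {
        return sum.divide(new BigDecimal(count));
    }

    // Rounded division: the result always has exactly two decimal places.
    public static BigDecimal roundedAverage(BigDecimal sum, long count) {
        return sum.divide(new BigDecimal(count), 2, RoundingMode.HALF_EVEN);
    }

    public static void main(String[] args) {
        BigDecimal sum = new BigDecimal("10.0"); // 5.0 + 3.0 + 2.0

        try {
            exactAverage(sum, 3);
        } catch (ArithmeticException e) {
            System.out.println("Exact division failed: " + e.getMessage());
        }

        System.out.println(roundedAverage(sum, 3)); // prints 3.33
    }
}
```

Supplying a scale and a rounding mode is what turns the exception into a usable result.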

What we can do is implement our own custom aggregation function and register it with the Esper engine. All we need to do is extend the abstract com.espertech.esper.epl.agg.AggregationSupport class and implement the missing methods. Here is what our own BigDecimalRoundingAverageAggregator class looks like:

public class BigDecimalRoundingAverageAggregator extends AggregationSupport {
  private BigDecimal sum = BigDecimal.ZERO;
  private long numDataPoints;
  private int scale = 2;                                                         #1
  private RoundingMode roundingMode = RoundingMode.HALF_EVEN;                    #2
  public void clear() {
    sum = BigDecimal.ZERO;
    numDataPoints = 0;
  }
  public void enter(Object object) {                                             #3
    if (object == null) {
      return;
    }
    numDataPoints++;
    if (object instanceof BigDecimal) {
      sum = sum.add((BigDecimal) object);
    } else if (object instanceof Number) {
      sum = sum.add(new BigDecimal(((Number) object).doubleValue()));
    }else{
      throw new RuntimeException("Must be a number");
    }
  }
  public void leave(Object object) {                                             #4
    if (object == null) {
      return;
    }
    numDataPoints--;
    if (object instanceof BigDecimal) {
      sum = sum.subtract((BigDecimal) object);
    } else if (object instanceof Number) {
      sum = sum.subtract(new BigDecimal(((Number) object).doubleValue()));
    }else{
      throw new RuntimeException("Must be a number");
    }
  }
  public Object getValue() {
    if (numDataPoints == 0) {
      return null;
    }
    return sum.divide(new BigDecimal(numDataPoints), scale, roundingMode);       #5
  }
  public Class getValueType() {
    return BigDecimal.class;
  }
  @Override
  public void validate(AggregationValidationContext validationContext) {         #6
    for (Class clazz : validationContext.getParameterTypes()) {
      if (!Number.class.isAssignableFrom(clazz)) { // BigDecimal is itself a Number
        throw new RuntimeException("Argument must be either BigDecimal or Number");
      }
    }
  }
}

 

#1 Set scale to use for result rounding
#2 Set rounding mode to use for result rounding
#3 enter() method is invoked whenever new event enters the time window
#4 leave() method is invoked whenever event leaves the time window (expires)
#5 Perform division using provided scale and rounding mode
#6 validate() method checks, at EPL statement compile time, whether the argument types match the types allowed by the aggregator
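The enter()/leave() contract is easiest to see in isolation: whatever enter() adds to the running state, leave() has to take away again once the event expires from the window. Below is a dependency-free sketch of that bookkeeping. RollingAverageSketch is a hypothetical class that does not extend Esper’s AggregationSupport; it only mirrors the sum-and-count logic.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class RollingAverageSketch {
    private BigDecimal sum = BigDecimal.ZERO;
    private long count;

    // Called when a value enters the window: add it to the running sum.
    public void enter(BigDecimal value) {
        sum = sum.add(value);
        count++;
    }

    // Called when a value leaves the window: undo what enter() did.
    public void leave(BigDecimal value) {
        sum = sum.subtract(value);
        count--;
    }

    // Average of the values currently in the window, rounded to 2 decimals.
    public BigDecimal getValue() {
        if (count == 0) {
            return null;
        }
        return sum.divide(new BigDecimal(count), 2, RoundingMode.HALF_EVEN);
    }

    public static void main(String[] args) {
        RollingAverageSketch avg = new RollingAverageSketch();
        avg.enter(new BigDecimal("5.0"));
        avg.enter(new BigDecimal("3.0"));
        avg.enter(new BigDecimal("2.0"));
        System.out.println(avg.getValue()); // prints 3.33

        avg.leave(new BigDecimal("5.0"));   // the oldest event expires
        System.out.println(avg.getValue()); // prints 2.50
    }
}
```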

The key improvement over Esper’s original BigDecimal aggregator is that the final division that calculates the average is rounded using the configured scale and rounding mode (#5). In addition, our implementation handles a mix of BigDecimal and other Number instances such as Long, Double and Integer (#3, #4).
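One caveat when mixing in Double values: new BigDecimal(double) captures the exact binary value of the double, which can bring a long tail of digits into the sum. The sketch below compares it with BigDecimal.valueOf(double), which goes through the double’s canonical string form. Using valueOf is a suggestion on our side, not something the aggregator above does, and the class and method names are hypothetical.

```java
import java.math.BigDecimal;

public class NumberConversionDemo {

    // Conversion as used in the aggregator: exact binary value of the double.
    public static BigDecimal viaConstructor(Number n) {
        return new BigDecimal(n.doubleValue());
    }

    // Alternative (an assumption, not the article's code):
    // converts via Double.toString, so 0.1 stays 0.1.
    public static BigDecimal viaValueOf(Number n) {
        return BigDecimal.valueOf(n.doubleValue());
    }

    public static void main(String[] args) {
        Number price = Double.valueOf(0.1);
        System.out.println(viaConstructor(price)); // 0.1 followed by a long tail of digits
        System.out.println(viaValueOf(price));     // prints 0.1
    }
}
```

For values that are exactly representable in binary, such as whole numbers, both conversions agree numerically, so the difference only shows up with fractional doubles.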

The final step is to map this aggregation function implementation to the function name used in EPL statements. To achieve that, all we have to do is add the mapping to the Esper configuration XML file:

<plugin-aggregation-function name="avgRound"
       function-class="com.opencredo.sandbox.aleksav.esper.agg.BigDecimalRoundingAverageAggregator" />

 

We cannot override existing function names, so we named our new function avgRound. Before we run the test we need to change the function name in the EPL statement:

EsperStatement statement =
    new EsperStatement(
        "select symbol, avgRound(price) as avgPrice "
      + "from com.opencredo.sandbox.aleksav.esper.domain.MarketDataEvent.win:time(30 second) "
      + "group by symbol");

 

If you rerun the test, you will see the calming green bar in your IDE, with the “All tests passed” message.

We made a quick fix for an annoying problem by adding a custom aggregation function to the Esper runtime in no time. An interesting thing to note is that EPL keywords used with the custom aggregation function are processed and applied automatically by Esper, so for example we can do the following:

select symbol, avgRound(distinct price) from MarketDataEvent

 

and the distinct keyword will make sure only unique prices are fed into the avgRound function.
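To see what difference distinct makes to the result, here is a plain-Java sketch that averages a list of prices with and without duplicates. It only illustrates the arithmetic: Esper does the de-duplication itself before calling the aggregator, and the sketch assumes uniqueness is decided by BigDecimal.equals(). DistinctAverageDemo and its methods are hypothetical names.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class DistinctAverageDemo {

    // Average of all given prices, rounded to two decimal places.
    public static BigDecimal average(Iterable<BigDecimal> prices) {
        BigDecimal sum = BigDecimal.ZERO;
        long count = 0;
        for (BigDecimal price : prices) {
            sum = sum.add(price);
            count++;
        }
        if (count == 0) {
            return null;
        }
        return sum.divide(new BigDecimal(count), 2, RoundingMode.HALF_EVEN);
    }

    // Average over unique prices only (uniqueness by BigDecimal.equals, an assumption).
    public static BigDecimal distinctAverage(List<BigDecimal> prices) {
        return average(new LinkedHashSet<BigDecimal>(prices));
    }

    public static void main(String[] args) {
        List<BigDecimal> prices = Arrays.asList(
                new BigDecimal("5.0"), new BigDecimal("5.0"), new BigDecimal("2.0"));

        System.out.println(average(prices));         // prints 4.00
        System.out.println(distinctAverage(prices)); // prints 3.50
    }
}
```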

Aggregation extensions are a very powerful feature of Esper. Used wisely, they enable you to implement even the most demanding complex event processing scenarios.

 

This blog is written exclusively by the OpenCredo team. We do not accept external contributions.
