Window and Aggregate operators

Example 1: Aggregating Clickstream data

Goal: Use sample Clickstream data to answer the following questions about online sales activity.

  1. For every minute, how many unique customers are there for every event type?

  2. For every minute, what is the total price of items in a customer’s shopping cart for every event type?

Clickstream data is generated by a user’s interaction with a website. For example, the user might browse for an item, add the item to a shopping cart, remove an item from the cart, log out without making a purchase, or complete a purchase. Every click generates an event that can be captured in a streams flow.

Let’s design a simple streams flow in the canvas with three operators.

  1. Sample Data – This operator provides sample Clickstream data.

  2. Aggregate – This operator aggregates the data by customer ID and then applies specified functions on the data every minute.

  3. Debug – This operator acts as a target so that you can view tuples coming from a selected operator and check whether they represent the expected output.

    Aggregation streams flow

Now let’s define each operator in its Properties pane.

Define Sample Data operator

Select Clickstream. Click Edit Source Schema to look at the schema that’s coming into the streams flow.

Define Window operator

We define the parameters of this operator by setting the following properties.

Window operator

Define Aggregate operator

We define the parameters of this operator to give the metrics that we need to answer our two questions. Set the following properties.

Define the Aggregate operator with preset functions:

Aggregator preset

Define the Aggregate operator using the Custom Code function:

Aggregator custom

Aggregation Window Type is set to tumbling because we want the aggregation functions to run at a set time interval, regardless of how often tuples arrive or how many there are. For more information, see tumbling windows and sliding windows.

Aggregation Window size is determined by Time Unit and Number of Time Units. In our case, we’ll set it to minute and 1 because we want metrics every minute. Every minute, all tuples ‘tumble out’ and an Aggregation function is applied to each field.
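As a rough illustration of how a one-minute tumbling window assigns tuples, the following Python sketch buckets timestamped events into non-overlapping intervals. It is not how the Aggregate operator is implemented. The function names and the use of timestamps in seconds are assumptions made for this example.

```python
from collections import defaultdict

# Assumption: Time Unit = minute and Number of Time Units = 1, expressed in seconds.
WINDOW_SECONDS = 60


def window_start(timestamp, size=WINDOW_SECONDS):
    """Return the start time (in seconds) of the tumbling window that contains timestamp."""
    return int(timestamp // size) * size


def tumble(events, size=WINDOW_SECONDS):
    """Group (timestamp, payload) pairs into non-overlapping windows keyed by start time."""
    windows = defaultdict(list)
    for timestamp, payload in events:
        windows[window_start(timestamp, size)].append(payload)
    return dict(windows)


# Hypothetical events: (seconds since the flow started, payload)
events = [(0, "a"), (59, "b"), (60, "c"), (125, "d")]
windows = tumble(events)
print(windows)  # {0: ['a', 'b'], 60: ['c'], 120: ['d']}
```

Each tuple belongs to exactly one window, which is what distinguishes a tumbling window from a sliding one.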

The window is partitioned by customer_id into subwindows. This partition gives us metrics by customer.
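To make the idea of subwindows concrete, here is a minimal Python sketch that keys each tuple by both its window and its customer_id, and then sums a price per subwindow. Only customer_id comes from the text above. The field names time and price, and the sample values, are hypothetical and may not match the actual Clickstream schema.

```python
from collections import defaultdict


def aggregate_by_customer(events, size=60):
    """Sum prices per (window start, customer_id) subwindow."""
    totals = defaultdict(float)
    for event in events:
        start = int(event["time"] // size) * size  # tumbling window of `size` seconds
        totals[(start, event["customer_id"])] += event["price"]
    return dict(totals)


# Hypothetical tuples; "time" is in seconds.
events = [
    {"time": 5, "customer_id": 1, "price": 10.0},
    {"time": 20, "customer_id": 2, "price": 4.5},
    {"time": 40, "customer_id": 1, "price": 2.5},
    {"time": 70, "customer_id": 1, "price": 1.0},
]
totals = aggregate_by_customer(events)
print(totals)  # {(0, 1): 12.5, (0, 2): 4.5, (60, 1): 1.0}
```

Customer 1 appears in two results because the tuple at 70 seconds falls into the second one-minute window.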

Aggregation functions

We set up three Aggregation functions to define which metrics are computed over the tuples in every subwindow (that is, for every customer). These functions are applied to all subwindows every minute.

Aggregation output fields

Here’s how our streams flow looks now.

The following functions can be applied to specific schema attributes.

Aggregate function | Argument type        | Return type      | Return value
-------------------|----------------------|------------------|-------------
Average            | Integer, float       | Float            | Average of all input values. Null if no rows are selected.
Count              | Integer              | Same as argument | Number of all non-null input rows.
CountDistinct      | Integer              | Same as argument | Number of distinct expression values that are computed for the tuples in the group.
Maximum            | Integer, float       | Same as argument | Maximum element across all input values. Null if no rows are selected.
Minimum            | Integer, float       | Same as argument | Minimum element across all input values. Null if no rows are selected.
PassThrough        | Integer, float, text | Same as argument | Passes the schema attribute to the output as is.
Sum                | Integer, float       | Same as argument | Sum of all input values. Null if no rows are selected.
Standard Deviation | Integer, float       | Same as argument | Standard deviation of all input values. Null if no rows are selected.
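The return-value rules in the table above can be sketched in Python as follows. This is an illustration only, not the operator's implementation. Null is modeled as None, skipping null inputs in every function is an assumption, and the choice of population standard deviation (statistics.pstdev) is also an assumption because the text does not say which variant is used.

```python
import statistics


def _non_null(values):
    """Drop None entries (assumption: nulls are ignored by every function)."""
    return [v for v in values if v is not None]


def average(values):
    vals = _non_null(values)
    return sum(vals) / len(vals) if vals else None


def count(values):
    return len(_non_null(values))


def count_distinct(values):
    return len(set(_non_null(values)))


def maximum(values):
    vals = _non_null(values)
    return max(vals) if vals else None


def minimum(values):
    vals = _non_null(values)
    return min(vals) if vals else None


def total(values):
    """Corresponds to Sum; named total to avoid shadowing the built-in sum."""
    vals = _non_null(values)
    return sum(vals) if vals else None


def standard_deviation(values):
    vals = _non_null(values)
    # Assumption: population standard deviation.
    return statistics.pstdev(vals) if vals else None


prices = [2, 4, 4, 4, 5, 5, 7, 9]
print(average(prices), count(prices), count_distinct(prices), total(prices))
print(average([]), maximum([]), total([]))  # None None None
```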

Run the streams flow

All operations are now defined. We save the streams flow, and then click the Metrics icon to run the flow.

Metrics page

Example 2: Parallel Aggregation

Let’s say that our website sees irregular peaks in data volume, depending on the hour of the day and the season.

In this case, use parallel Aggregate operators. Both Aggregate operators are set up identically, except that each operator uses a different value for Time Unit and Number of Time Units to evict tuples.

In our Clickstream example, the first Aggregate operator has a tumbling window every minute. Let’s set the second Aggregate operator to have a tumbling window every 10 minutes.
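The effect of running two identically configured aggregations with different window sizes can be sketched in Python. The example below is hypothetical: it feeds the same (timestamp, price) pairs into a 1-minute and a 10-minute tumbling sum. The function name and the sample values are assumptions, and timestamps are in seconds.

```python
from collections import defaultdict


def tumbling_sum(events, size):
    """Sum prices per tumbling window of `size` seconds, keyed by window start."""
    totals = defaultdict(float)
    for timestamp, price in events:
        totals[int(timestamp // size) * size] += price
    return dict(totals)


# Hypothetical tuples: (seconds since the flow started, price)
events = [(30, 5.0), (90, 2.0), (610, 1.0)]

per_minute = tumbling_sum(events, 60)        # first Aggregate operator
per_ten_minutes = tumbling_sum(events, 600)  # second Aggregate operator

print(per_minute)       # {0: 5.0, 60: 2.0, 600: 1.0}
print(per_ten_minutes)  # {0: 7.0, 600: 1.0}
```

The shorter window shows short bursts of activity, while the longer window smooths them into a coarser trend.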

More examples

For more examples of the Aggregate operator, see the article Calculate moving averages on real-time data in a streams flow.

Learn more