Window and Aggregate operators
- The Window operator must be followed by an Aggregate operator, and an Aggregate operator must be preceded by a Window operator.
- The benefit is that a Window operator can be followed by multiple Aggregate operators.
- The Window operator also brings in the ability to customize the partition function.
- The Aggregate operator continues to provide built-in Aggregate functions, but it also allows you to write custom functionality in a Python code editor.
Example 1: Aggregating Clickstream data
Goal: Use sample Clickstream data to answer the following questions about online sales activity.
- For every minute, how many unique customers are there for every event type?
- For every minute, what is the total price of items in a customer’s shopping cart for every event type?
Clickstream data is data that is generated by a user’s interaction with an online website. For example, the user might browse for an item, add the item to a shopping cart, remove an item from the cart, log out without any purchase, or complete a purchase. Every click generates an event that can be captured in a streams flow.
Let’s design a simple streams flow in the canvas with three operators.
- Sample Data – This operator provides sample Clickstream data.
- Aggregate – This operator aggregates the data by customer ID and then applies specified functions on the data every minute.
- Debug – This operator acts as a target so that you can view tuples coming from a selected operator and see if they represent the expected output.
Now let’s define each operator in its Properties pane.
Define Sample Data operator
Select Clickstream. Click Edit Source Schema to look at the schema that’s coming into the streams flow.
Define Window operator
We define the parameters of this operator by setting the following properties.
Define Aggregate operator
We define the parameters of this operator to give the metrics that we need to answer our two questions. Set the following properties.
Define the Aggregate operator with preset functions:
Define the Aggregate operator using the Custom Code function:
Aggregation Window Type is set to tumbling because we want the aggregation functions done at a set time interval, regardless of how often or how many tuples arrive. For more information, see tumbling windows, sliding windows.
Aggregation Window size is determined by Time Unit and Number of Time Units. In our case, we’ll set it to minute and 1 because we want metrics every minute. Every minute, all tuples ‘tumble out’ and an Aggregation function is applied to each field.
The window is partitioned by customer_id into subwindows. This partition gives us metrics by customer.
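The tumbling window and its partitioning can be pictured with a small plain-Python sketch. This is only an illustration of the idea, not how the operator is implemented: the event field `ts` (a timestamp in seconds), the sample events, and the helper `tumbling_partitions` are all assumptions made for this example.

```python
from collections import defaultdict


def tumbling_partitions(events, window_seconds=60, key="customer_id"):
    """Group events into non-overlapping windows, then into subwindows by key.

    Hypothetical helper for illustration only.
    Returns {window_start: {key_value: [events, ...]}}.
    """
    windows = defaultdict(lambda: defaultdict(list))
    for event in events:
        # Integer division assigns each event to exactly one window,
        # which is what makes the window "tumbling" rather than sliding.
        window_start = (event["ts"] // window_seconds) * window_seconds
        windows[window_start][event[key]].append(event)
    return {start: dict(parts) for start, parts in windows.items()}


# Made-up sample events; "ts" is seconds since the start of the stream.
events = [
    {"ts": 5, "customer_id": "c1"},
    {"ts": 42, "customer_id": "c2"},
    {"ts": 59, "customer_id": "c1"},
    {"ts": 61, "customer_id": "c1"},
]
result = tumbling_partitions(events)
```

In this sketch, the first three events fall into the window that starts at second 0, split into one subwindow per customer, and the fourth event starts a new window at second 60.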
We set up three Aggregation functions to define what metrics are done on all tuples in every subwindow (meaning, every customer). The following Aggregation functions are done on all subwindows every minute.
- click_event_type – The function PassThrough passes the type of event that occurred to a field called “click_event_type”. This field tells us what action the user did, such as logging in, browsing, adding an item to the cart, and so on. PassThrough passes schema attributes through to the output without any computation or change.
- total_carts – The function Average computes the average price for a customer’s shopping cart and stores the value in a field called “total_carts”.
- unique_users – The function CountDistinct gives the number of unique users that are logged in, and stores the value in a field called “unique_users”. We do not use the function Count because it counts every entry, so a user who appears more than once would be counted more than once.
Here’s how our streams flow looks now.
The following functions can be applied to specific schema attributes.
Aggregate function | Argument type | Return type | Return value |
---|---|---|---|
Average | Integer, float | Float | Average of all input values. Null if no rows are selected. |
Count | Integer | Same as argument | Number of all non-null input rows. |
CountDistinct | Integer | Same as argument | Number of distinct expression values that are computed for the tuples in the group. |
Maximum | Integer, float | Same as argument | Maximum element across all input values. Null if no rows are selected. |
Minimum | Integer, float | Same as argument | Minimum element across all input values. Null if no rows are selected. |
PassThrough | Integer, float, text | Same as argument | Passes schema attribute to the output as is. |
Sum | Integer, float | Same as argument | Sum of all input values. Null if no rows are selected. |
Standard Deviation | Integer, float | Same as argument | Standard deviation of all input values. Null if no rows are selected. |
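To make the return values in the table concrete, here is a rough Python sketch of the same semantics, including the "Null if no rows are selected" behavior. It is not the operator's code. The `aggregate` helper and the `AGGREGATES` mapping are hypothetical names, and the choice of population standard deviation is an assumption, because the table does not say which formula is used. PassThrough is left out because it does no computation.

```python
import statistics


def _non_null(values):
    """Drop null (None) inputs before aggregating."""
    return [v for v in values if v is not None]


# Hypothetical mapping from function name to a plain-Python equivalent.
AGGREGATES = {
    "Average": lambda vs: statistics.fmean(vs) if vs else None,
    "Count": lambda vs: len(vs),
    "CountDistinct": lambda vs: len(set(vs)),
    "Maximum": lambda vs: max(vs) if vs else None,
    "Minimum": lambda vs: min(vs) if vs else None,
    "Sum": lambda vs: sum(vs) if vs else None,
    # Assumption: population standard deviation.
    "Standard Deviation": lambda vs: statistics.pstdev(vs) if vs else None,
}


def aggregate(name, values):
    """Apply the named aggregate to the non-null values of one subwindow."""
    return AGGREGATES[name](_non_null(values))


prices = [10, 10, 25, None]
average_price = aggregate("Average", prices)       # mean of 10, 10, 25
row_count = aggregate("Count", prices)             # None is not counted
distinct_prices = aggregate("CountDistinct", prices)
empty_sum = aggregate("Sum", [])                   # None stands in for Null
```

The difference between `row_count` and `distinct_prices` is the same distinction that leads the Clickstream example to use CountDistinct instead of Count.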
Run the streams flow
All operations are now defined. We save the streams flow, and then click to run the flow.
Example 2: Parallel Aggregation
Let’s say that our online website has irregular peaks of data according to the hour of the day and the season.
In this case, use parallel Aggregate operators. Both Aggregate operators are set up identically, except that each operator uses a different value for Time Unit and Number of Time Units to evict tuples.
In our Clickstream example, the first Aggregate operator has a tumbling window every minute. Let’s set the second Aggregate operator to have a tumbling window every 10 minutes.
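As a rough illustration of what two parallel aggregations produce, the sketch below runs the same distinct-customer count over the same events with two tumbling window sizes, 1 minute and 10 minutes. The function `distinct_customers_per_window`, the `ts` field (seconds), and the sample events are assumptions made for this example and are not part of the product.

```python
from collections import defaultdict


def distinct_customers_per_window(events, window_seconds):
    """Count distinct customer IDs in each tumbling window (illustrative only)."""
    seen = defaultdict(set)
    for event in events:
        # Each event belongs to exactly one window of the given size.
        start = (event["ts"] // window_seconds) * window_seconds
        seen[start].add(event["customer_id"])
    return {start: len(ids) for start, ids in sorted(seen.items())}


# Made-up sample events; "ts" is seconds since the start of the stream.
events = [
    {"ts": 10, "customer_id": "c1"},
    {"ts": 70, "customer_id": "c1"},
    {"ts": 75, "customer_id": "c2"},
    {"ts": 610, "customer_id": "c3"},
]

# Same input, same function, two different window sizes.
per_minute = distinct_customers_per_window(events, 60)
per_ten_minutes = distinct_customers_per_window(events, 600)
```

The 1-minute result shows short-lived peaks, while the 10-minute result smooths them out, which is why running both side by side is useful when traffic is irregular.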
More examples
For more examples of the Aggregate operator, see the article Calculate moving averages on real-time data in a streams flow.