Rollup Ingestion in Druid using SQL
Apache Druid 24 introduced SQL-based ingestion functionality. Rollup in Apache Druid during ingestion is a common technique used to reduce data size in order to improve query performance while still retaining enough detail to meet the end user’s ad hoc query needs. This article walks through the details of the SQL statements used to execute rollup ingestion.
Rollup Ingestion = Data Reduction for Faster Queries
The Basics
With SQL based ingestion, rollup is specified by using an aggregation query in the SELECT portion of the INSERT/REPLACE statement.
The basic structure of a rollup ingestion SQL statement is:
INSERT/REPLACE INTO <target table>
SELECT
FLOOR( TIME_PARSE(“timestamp”) TO MINUTE) __time,
<other grouping dimensions>,
<aggregations>
FROM TABLE(EXTERN ( … ) )
GROUP BY 1, <other grouping dimensions>
PARTITIONED BY MONTH;
The SELECT portion of the query indicates that rollup ingestion will be used by specifying aggregations and a GROUP BY clause grouping on __time and all other grouping dimensions.
The __time dimension: The expression that calculates __time controls the query granularity in the resulting table. This is the minimum time dimension unit that will be available on the target table for users to query. In order to optimize query performance, the coarser this is, the better. Event data usually arrives with high precision timestamps including fractions of a second. By aggregating on time at a coarser level during ingestion, you can accelerate query performance significantly. Use the FLOOR or CEILING functions to truncate timestamps to the desired level.
Grouping Dimensions: These are the set of dimensions, other than time, that users of the target table will be able to select, filter and group on. Reducing the number of dimensions used will further reduce storage size because it is the unique combinations of time and the grouping dimensions that will result in a target row. You should include all the dimensions that are important to your users, but also avoid dimensions that are unlikely to be used and just cause bloat.
Aggregations: This includes traditional SQL aggregate functions such as SUM, MIN, MAX as well as approximations. There are some special considerations for approximations and the average calculation that I’ll discuss further below.
INSERT vs REPLACE During Rollup
INSERT statements will cause new data to be appended to existing data. New segment files will be created from the SELECT query results. Existing segment files from prior ingestions are not changed. For rollup ingestion, this means that both old and new segment files could contain the same combination of time and dimension values with aggregate metrics calculated from different inputs. This means that INSERT based rollup ingestions do not guarantee perfect rollups. When querying a table loaded this way, it is important to aggregate further at query time in order to complete the rollup. Otherwise the results may contain more than one row for the same combination of dimensions each with only partially calculated metrics. Compaction can also be used to merge segments from multiple INSERTs such that the rollup is completed.
REPLACE statements, on the other hand, will overwrite existing segments with the new data for all the time intervals specified in the OVERWRITE WHERE clause. This has the advantage of resulting in perfect rollup. That’s one row for each unique combination of __time and the other grouping dimensions. It has the disadvantage of having to ingest and reaggregate all the data that corresponds to the target time interval.
Aggregation Considerations
The aggregation done during ingestion is not intended to be a final result to the user, but rather a pre-aggregation of data that is meant to be aggregated further at query time. While this does not affect MIN, MAX and SUM aggregates, it does affect the calculation of approximations and averages.
Average
If you use the AVG aggregate function at ingestion time and then query that data by aggregating further using AVG, you will essentially be doing an average of averages which does not produce an accurate result, so instead you can use SUM and COUNT aggregations at ingestion.
An example rollup ingestion for calculating averages:
REPLACE INTO wikipedia_avg OVERWRITE ALL
WITH ext AS (SELECT * FROM TABLE( EXTERN(... ) )) -- external input SELECT
TIME_FLOOR(TIME_PARSE("timestamp"), 'PT1M') AS __time,
channel,
page,
SUM(added) as sum_added,
COUNT(*) as event_count
FROM ext
GROUP BY 1,2,3
PARTITIONED BY DAY
24,389 rows inserted into 'wikipedia_avg'.
Insert query took 0:00:20
And then query this data using SUM and COUNT to calculate the average:
SELECT
channel,
SUM(sum_added) / SUM( event_count) as avg_added
FROM wikipedia_avg
GROUP BY 1
ORDER BY 2 DESC
Approximations
For approximations, something similar occurs. Approximations during rollup ingestion need to be stored using their internal representation, so that when they are queried they can be aggregated further across any of the dimensions. You’ll need to set the query context variable "finalizeAggregations": false
, so that the query engine does not finalize that approximate calculations before storing them. This query context variable is set correctly by default in the Query view of the Druid Console when executing an INSERT or REPLACE statements. If you use the API, you will need to specify this in the context property of the request payload.
An example of using approximation during rollup ingestion:
REPLACE INTO wikipedia_ds OVERWRITE ALL
WITH ext AS (SELECT * FROM TABLE( EXTERN(... ) )) -- external input
SELECT
TIME_FLOOR(TIME_PARSE("timestamp"), 'PT1M') AS __time,
channel,
page,
DS_THETA(user) theta_user,
DS_QUANTILES_SKETCH( added) theta_added
FROM ext
GROUP BY 1,2,3
PARTITIONED BY DAY
24,389 rows inserted into 'wikipedia_ds'.
Insert query took 0:01:02.
An example approximation query for distinct count and quantile calculations after rollup ingestion:
SELECT channel,
APPROX_COUNT_DISTINCT_DS_THETA(theta_user) as aprox_distinct_users,
APPROX_QUANTILE_DS( theta_added, 0.5) as median_added,
APPROX_QUANTILE_DS( theta_added, 0.95) as p95_added
FROM wikipedia_ds
GROUP BY 1
ORDER BY 2 DESC
Conclusion
Aggregation at ingestion with or without approximations is a very useful performance optimization technique which helps deliver the query speed that Apache Druid is known for. With Apache Druid 24.0 this functionality is very easy to use by just writing SQL. Another important aspect of SQL based ingestion is the CLUSTERED BY clause which can be used to further optimize storage and query performance in larger datasets. Stay tuned for that post…