Many companies generate large volumes of time series data from events happening in their application. It’s often useful to have a real-time analytics dashboard to spot trends and changes as they happen. You can build a real-time analytics dashboard on Postgres by constructing a simple pipeline:
For large data streams, Citus (an open source extension to Postgres that scales out Postgres horizontally) can scale out each of these steps across all the cores in a cluster of Postgres nodes.
One of the challenges of maintaining a rollup table is tracking which events have already been aggregated—so you can make sure that each event is aggregated exactly once. A common technique to ensure exactly-once aggregation is to run the aggregation for a particular time period after that time period is over. We often recommend aggregating at the end of the time period for its simplicity, but you cannot provide any results before the time period is over and backfilling is complicated.
In this blog post, we’ll introduce a new approach to building rollup tables which addresses the limitations of using time windows. With the new approach, you can easily backfill (add older events and see them reflected in the rollup table) and start aggregating data for the current time period before the time period is over, giving a more real time view of the data.
We assume all events have an identifier which is drawn from a sequence and provide a simple SQL function that enables you to incrementally aggregate ranges of sequence values in a safe, transactional manner.
We tested this approach for a CDN use case and found that a 4-node Citus database cluster can simultaneously:
To do incremental aggregation, you need a way to distinguish which events have been aggregated. We assume each event has an identifier that is drawn from a Postgres sequence, and events have been aggregated up to a certain sequence number. To track this, you can create a rollups table that also contains the name of the events table and the sequence name.
CREATE TABLE rollups (
    name text primary key,
    event_table_name text not null,
    event_id_sequence_name text not null,
    last_aggregated_id bigint default 0
);
We wrote a PL/pgSQL function that uses the rollups table to track which events have been aggregated, and returns the range of sequence numbers that are ready to be aggregated next. This PL/pgSQL function can be used in a transaction with an INSERT..SELECT, where the SELECT part filters out sequence numbers that fall outside the range.
In Postgres 10 and later, you can use the pg_sequence_last_value function to check the most recently issued sequence number. However, it would not be safe to simply aggregate all events up to the most recent sequence value. There might still be in-progress writes to the events table that were assigned lower sequence values, but are not yet visible when the aggregation runs. To wait for in-progress writes to finish, we use an explicit table lock as discussed in our recent Postgres locking tips blog post. New writes will briefly block from the moment the LOCK command is executed. Once existing writes are finished, we have the lock, and then we immediately release it to allow new writes to continue. We can do that because we know that all new writes will have higher sequence numbers, and we can allow those writes to continue as long as we don’t include them in the current aggregation. As a result, we can obtain a range of new events that are ready to be aggregated with minimal interruption on the write side.
CREATE OR REPLACE FUNCTION incremental_rollup_window(rollup_name text, OUT window_start bigint, OUT window_end bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
DECLARE
    table_to_lock regclass;
BEGIN
    /*
     * Perform aggregation from the last aggregated ID + 1 up to the last committed ID.
     * We do a SELECT .. FOR UPDATE on the row in the rollup table to prevent
     * aggregations from running concurrently.
     */
    SELECT event_table_name, last_aggregated_id+1, pg_sequence_last_value(event_id_sequence_name)
    INTO table_to_lock, window_start, window_end
    FROM rollups
    WHERE name = rollup_name FOR UPDATE;

    IF NOT FOUND THEN
        RAISE 'rollup ''%'' is not in the rollups table', rollup_name;
    END IF;

    IF window_end IS NULL THEN
        /* sequence was never used */
        window_end := 0;
        RETURN;
    END IF;

    /*
     * Play a little trick: We very briefly lock the table for writes in order to
     * wait for all pending writes to finish. That way, we are sure that there are
     * no more uncommitted writes with an identifier lower or equal to window_end.
     * By throwing an exception, we release the lock immediately after obtaining it
     * such that writes can resume.
     */
    BEGIN
        EXECUTE format('LOCK %s IN EXCLUSIVE MODE', table_to_lock);
        RAISE 'release table lock';
    EXCEPTION WHEN OTHERS THEN
    END;

    /*
     * Remember the end of the window to continue from there next time.
     */
    UPDATE rollups SET last_aggregated_id = window_end WHERE name = rollup_name;
END;
$function$;
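Because the rollups table records how far each rollup has progressed, it can also give a rough idea of how much work is pending. The query below is a minimal sketch rather than part of the function above. The pending_ids alias is made up for illustration, and the number is only an upper bound, since sequence values can be skipped (for example by rolled-back inserts).

```sql
-- Sketch: how many sequence values were issued but not yet aggregated?
-- pg_sequence_last_value returns NULL for a sequence that was never used,
-- so coalesce falls back to last_aggregated_id (nothing pending).
SELECT name,
       last_aggregated_id,
       coalesce(pg_sequence_last_value(event_id_sequence_name::regclass),
                last_aggregated_id) - last_aggregated_id AS pending_ids
FROM rollups;
```

If pending_ids keeps growing between runs, the aggregation is probably not keeping up with the write rate.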
Now let’s look at an example of using this function for a typical rollup use case.
A simple example of a real-time analytics dashboard I like to use is for monitoring page views on a website. In a past life I worked for a Content Delivery Network (CDN), where such a dashboard is essential both to operators and customers.
Back when I worked for the CDN, it would have been almost unthinkable to store full page view logs in a SQL database like Postgres. The logs would go into a big distributed storage system and were difficult to process. But with the Citus extension to Postgres, you can now scale Postgres as well as any distributed storage system, while supporting distributed queries, indexes, and rollups. Storing raw events in the database suddenly makes a lot of sense.
The table definition for my page views table is given below. To deal with large write volumes, it is still helpful to minimise index maintenance overhead. We recommend using a BRIN index for looking up ranges of sequence IDs during aggregation. A BRIN index takes very little storage space and is cheap to maintain in this case.
CREATE TABLE page_views (
    site_id int,
    path text,
    client_ip inet,
    view_time timestamptz default now(),
    view_id bigserial
);

-- Allow fast lookups of ranges of sequence IDs
CREATE INDEX view_id_idx ON page_views USING BRIN (view_id);

-- Citus only: distribute the table by site ID
SELECT create_distributed_table('page_views', 'site_id');
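One way to check whether the BRIN index helps with range lookups is to look at the plan for the kind of filter the aggregation uses. This is only a sketch: the ID range is arbitrary, and on a small or freshly created table the planner may well prefer a sequential scan.

```sql
-- Sketch: inspect the plan for a range lookup on view_id.
-- The range 1000001..2000000 is an arbitrary example window.
EXPLAIN
SELECT count(*)
FROM page_views
WHERE view_id BETWEEN 1000001 AND 2000000;
```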
I also created a rollup table to keep track of the number of views per page for a particular minute. Once populated, I can run SQL queries on the table to aggregate further.
CREATE TABLE page_views_1min (
    site_id int,
    path text,
    period_start timestamptz,
    view_count bigint,
    primary key (site_id, path, period_start)
);

-- Citus only: distribute the table by site ID
SELECT create_distributed_table('page_views_1min', 'site_id');

-- Add the 1-minute rollup to the rollups table
INSERT INTO rollups (name, event_table_name, event_id_sequence_name)
VALUES ('page_views_1min_rollup', 'page_views', 'page_views_view_id_seq');
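As an illustration of aggregating further on top of the 1-minute rollup, the sketch below ranks the most viewed pages of one site over the past hour. The site ID 2, the one-hour range, and the limit of 10 are arbitrary choices for the example.

```sql
-- Sketch: top 10 pages for one site over the past hour, computed from
-- the 1-minute rollup rather than from the raw page_views table.
SELECT path, sum(view_count) AS views
FROM page_views_1min
WHERE site_id = 2
  AND period_start >= date_trunc('minute', now()) - interval '1 hour'
GROUP BY path
ORDER BY views DESC
LIMIT 10;
```

Since the filter includes site_id, which is the distribution column, a query like this only needs to touch the rows for a single site.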
The final step is to define a function for incrementally aggregating the page views using INSERT..SELECT..ON CONFLICT.. and the incremental_rollup_window function to select a batch of new, unaggregated events:
CREATE OR REPLACE FUNCTION do_page_view_aggregation(OUT start_id bigint, OUT end_id bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    /* determine which page views we can safely aggregate */
    SELECT window_start, window_end INTO start_id, end_id
    FROM incremental_rollup_window('page_views_1min_rollup');

    /* exit early if there are no new page views to aggregate */
    IF start_id > end_id THEN RETURN; END IF;

    /* aggregate the page views, merge results if the entry already exists */
    INSERT INTO page_views_1min (site_id, path, period_start, view_count)
      SELECT site_id, path, date_trunc('minute', view_time), count(*) AS view_count
      FROM page_views
      WHERE view_id BETWEEN start_id AND end_id
      GROUP BY site_id, path, date_trunc('minute', view_time)
      ON CONFLICT (site_id, path, period_start) DO UPDATE
      SET view_count = page_views_1min.view_count + EXCLUDED.view_count;
END;
$function$;
After inserting into the page_views table, the aggregation can be updated by periodically running:
SELECT * FROM do_page_view_aggregation();
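One possible way to run the aggregation periodically from inside the database is the pg_cron extension. The sketch below assumes pg_cron is installed and preloaded on the server, which this post does not otherwise require; a once-a-minute schedule is used purely as an example, and a client-side loop works just as well when you want shorter intervals.

```sql
-- Assumption: pg_cron is available (it must be listed in
-- shared_preload_libraries before the extension can be created).
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- Run the aggregation once a minute (standard cron syntax).
SELECT cron.schedule('* * * * *', $$SELECT * FROM do_page_view_aggregation()$$);
```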
By running the do_page_view_aggregation function frequently, you can keep the rollup table up-to-date within seconds of the raw events table. You can also safely load older page view data with a view_time in the past, because these records will still have higher sequence numbers.
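To make the backfilling case concrete, here is a small sketch that inserts a page view with an old view_time and then aggregates it. The path '/backfill-example', the client IP, and the timestamp are made-up values for illustration.

```sql
-- Sketch: backfill a page view that happened in the past.
-- view_id still comes from the sequence, so the row falls into the next window.
INSERT INTO page_views (site_id, path, client_ip, view_time)
VALUES (2, '/backfill-example', '203.0.113.7', '2018-06-01 12:34:56+00');

-- Aggregate everything that has not been aggregated yet, including the row above.
SELECT * FROM do_page_view_aggregation();

-- The old minute is now present in the rollup (or its count was incremented).
SELECT period_start, view_count
FROM page_views_1min
WHERE site_id = 2
  AND path = '/backfill-example'
  AND period_start = '2018-06-01 12:34:00+00';
```

The ON CONFLICT clause in do_page_view_aggregation is what makes this work: if the minute already had a row, the backfilled views are added to the existing count instead of causing a primary key violation.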
If you’re building a dashboard that provides real-time insights into page views, you could actually run queries directly on the raw event data. Citus will parallelise the query at different levels to achieve high performance.
The following query gets the number of page views for an entire site per minute for the last 30 minutes, which takes between 800 and 900 ms on Citus with 1 billion rows loaded.
SELECT date_trunc('minute', view_time) period_start, count(*)
FROM page_views
WHERE site_id = 2 AND view_time >= '2018-06-07 08:54:00'
GROUP BY period_start
ORDER BY period_start;

      period_start      | count
------------------------+-------
 2018-06-07 08:54:00+00 | 36482
 2018-06-07 08:55:00+00 | 51272
 2018-06-07 08:56:00+00 | 55216
 2018-06-07 08:57:00+00 | 74936
 2018-06-07 08:58:00+00 | 15776
 …
(30 rows)

Time: 869.478 ms
This may actually be fast enough to power a single-user dashboard, but if there are multiple users then the query uses way too much raw CPU time. Fortunately, the equivalent query on the rollup table is more than 100x faster because the table is smaller and indexed:
citus=# SELECT period_start, sum(view_count)
FROM page_views_1min
WHERE site_id = 2 AND period_start >= '2018-06-07 08:54:00'
GROUP BY period_start
ORDER BY period_start;

      period_start      |  sum
------------------------+-------
 2018-06-07 08:54:00+00 | 36482
 2018-06-07 08:55:00+00 | 51272
 2018-06-07 08:56:00+00 | 55216
 2018-06-07 08:57:00+00 | 74936
 2018-06-07 08:58:00+00 | 15776
 ...
(30 rows)

Time: 5.473 ms
It’s clear that when we want to support a larger number of users, rollup tables are the way to go.
To test the performance of the data pipeline, we randomly generated page view data for 1,000 sites and 100,000 pages. We compared the performance of a distributed Citus Cloud formation (4*r4.4xlarge) against a single Postgres node with equivalent hardware (r4.16xlarge RDS).
We also tried using Aurora, but since it runs an older version of Postgres, the pg_sequence_last_value function was unavailable.
We loaded 1 billion rows into the page_views table using the COPY command over 4 connections in batches of 1 million rows. Below are the average data loading speeds.
To actually process 1 million rows per second, the aggregation process must keep up with the stream of data while it is being loaded.
During the data load, we ran SELECT * FROM do_page_view_aggregation() in a loop to update the page_views_1min table. Citus parallelises the aggregation across all the cores in the cluster and is easily able to keep up with the COPY stream. In contrast, single-node Postgres does not parallelise INSERT...SELECT commands and could not keep up with its own ingest speed.
On Citus, every individual run took around 10 seconds, which means that the page_views_1min table was never more than 10 seconds behind the page_views table. With a single Postgres node, the aggregation could not keep up, so each run started taking arbitrarily long (>10 minutes).
Being able to express your aggregations in SQL and using indexes that keep your aggregations performant are both invaluable in doing real-time analytics on large data streams. While a single Postgres server cannot always keep up with large data streams, Citus transforms Postgres into a distributed database and enables you to scale out across multiple cores, and process over a million rows per second.