Written by Craig Kerstiens
December 27, 2017
Citus scales out Postgres for a number of different use cases, both as a system of record and as a system of engagement. One use case we're seeing implemented a lot these days: using the Citus database to power customer-facing real-time analytics dashboards, even when dealing with billions of events per day. Dashboards and pipelines are easy to handle when you’re at 10 GB of data, but as you grow, even basic operations like a count of unique users require non-trivial engineering work to perform well.
Citus is a good fit for these types of event dashboards because of its ability to ingest large amounts of data, to perform rollups concurrently, to mix raw, un-rolled-up data with pre-aggregated data, and finally to support a large number of concurrent users. Taken together, these capabilities mean the Citus extension to Postgres works well for end users in cases where a data warehouse may not work nearly as well. We've written here before about various parts of building a real-time customer-facing dashboard, but today we thought we'd go one step further and give you a guide for doing it end to end.
The first step is data ingestion. In any POC we generally start with single-row inserts. This is usually the quickest approach to set up, and it lets us start ingesting data at tens of thousands of events per second. Before production, though, we move to a micro-batching approach.
With the \copy Postgres bulk loading utility, we're able to ingest millions of events per second if needed. Copy is fully transactional, and with Citus the load is distributed across all nodes, which makes your Citus coordinator node less of a bottleneck than you would expect.
Most event pipelines we've observed have some upstream process already handling events, such as Kafka or Kinesis, which makes batching easy to put in place. As for the batching process itself, you can batch either on a time basis, say every minute (or every 5 minutes or even hourly, depending on your requirements), or every x records. Your batches don’t have to be hundreds of thousands of events. Even something like every 1,000 records can give you a nice performance boost, as you can have multiple \copy processes running in parallel.
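As a rough sketch of the two ingestion styles described above, the snippet below puts a single-row insert next to a micro-batch load with psql's \copy. The column list matches the events table defined in the next section; the sample values and the file name events_batch.csv are hypothetical, and the CSV file is assumed to be written by your upstream consumer.

```sql
-- Single-row insert: simple to wire up, fine for a proof of concept.
INSERT INTO events (id, event_time, customer_id, event_type, country, browser, device_id, session_id)
VALUES (1, now(), 42, 'click', 'US', 'Firefox', 1001, 5001);

-- Micro-batch: load a file of accumulated events in a single command.
-- events_batch.csv is a hypothetical file produced by the upstream consumer.
-- \copy is a psql meta-command, so it has to stay on one line.
\copy events (id, event_time, customer_id, event_type, country, browser, device_id, session_id) FROM 'events_batch.csv' WITH (FORMAT csv)
```

Several psql sessions can each run a \copy like this at the same time, one per batch, which is where the parallelism mentioned above comes from.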
Your raw event table will vary based on your use case, but there are some commonalities across most. In almost all cases, you have the time of the event and some customer identifier associated with it. Typically there will also be some categorization and details about the event. These details can be broken out as columns, or could be contained within a JSONB datatype. For this example dashboard, we’ll use the following schema:
CREATE TABLE events(
id bigint,
event_time timestamptz,
customer_id bigint,
event_type varchar,
country varchar,
browser varchar,
device_id bigint,
session_id bigint
);
SELECT create_distributed_table('events','customer_id');
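If the event details vary a lot between event types, the JSONB option mentioned above might look like the following sketch. The table name events_jsonb and the details column are hypothetical and are not used in the rest of this post.

```sql
-- Hypothetical variant of the raw table that keeps event details in JSONB.
CREATE TABLE events_jsonb (
    id bigint,
    event_time timestamptz,
    customer_id bigint,
    event_type varchar,
    details jsonb  -- e.g. {"country": "US", "browser": "Firefox"}
);
SELECT create_distributed_table('events_jsonb', 'customer_id');

-- Individual attributes are extracted with the ->> operator at query time.
SELECT details->>'browser' AS browser, count(*) AS event_count
FROM events_jsonb
WHERE customer_id = 1
GROUP BY details->>'browser';
```

Explicit columns, as in the schema above, are usually easier to roll up. JSONB is the more flexible choice when the set of attributes is not known up front.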
Once raw data is coming in, we can start rolling it up. To do this, we first create our rollup tables: one for 5-minute intervals and one for hourly intervals. A daily rollup table would follow the same pattern.
CREATE TABLE rollup_events_5min (
customer_id bigint,
event_type varchar,
country varchar,
browser varchar,
minute timestamptz,
event_count bigint,
device_distinct_count hll,
session_distinct_count hll
);
CREATE UNIQUE INDEX rollup_events_5min_unique_idx ON rollup_events_5min(customer_id,event_type,country,browser,minute);
SELECT create_distributed_table('rollup_events_5min','customer_id');
CREATE TABLE rollup_events_1hr (
customer_id bigint,
event_type varchar,
country varchar,
browser varchar,
hour timestamptz,
event_count bigint,
device_distinct_count hll,
session_distinct_count hll
);
CREATE UNIQUE INDEX rollup_events_1hr_unique_idx ON rollup_events_1hr(customer_id,event_type,country,browser,hour);
SELECT create_distributed_table('rollup_events_1hr','customer_id');
One thing you'll notice in our rollup tables is the use of the HLL (HyperLogLog) data type. HyperLogLog is a sketch algorithm that approximates the number of distinct elements in a set using a small, bounded amount of space. Sketches from different buckets can be combined, which makes it easy to compute unions (and to estimate intersections) across buckets, for example going from per-hour unique counts to a per-day unique count. That makes it incredibly useful for the types of reports your dashboard may generate.
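To make the difference between adding up counts and unioning sketches concrete, here is a minimal sketch using functions from the postgresql-hll extension. It assumes the extension is installed on every node; the sample values are made up for illustration.

```sql
-- Assumes the postgresql-hll extension is available on all nodes.
CREATE EXTENSION IF NOT EXISTS hll;

-- Two hypothetical time buckets; device 11 appears in both.
WITH sample(bucket, device_id) AS (
    VALUES (1, 10::bigint), (1, 11::bigint), (2, 11::bigint), (2, 12::bigint)
),
sketches AS (
    -- One HLL sketch per bucket, as a rollup table would store it.
    SELECT bucket, hll_add_agg(hll_hash_bigint(device_id)) AS devices
    FROM sample
    GROUP BY bucket
)
SELECT sum(hll_cardinality(devices))           AS sum_of_per_bucket_counts,
       hll_cardinality(hll_union_agg(devices)) AS distinct_across_buckets
FROM sketches;
```

The first column counts device 11 once per bucket, while the union counts it once overall. This is why the rollup tables store hll values rather than plain distinct counts.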
In the above example, we have chosen (customer_id, event_type, country, browser, minute/hour) as the dimensions on which we evaluate metrics such as event_count, device_distinct_count, session_distinct_count, etc. Based on your query workload and performance requirements, you can choose the dimensions that make sense for you. If needed, you can create multiple rollup tables to serve different query types (just don’t go too crazy with a table for every dimension). Ideally you should choose dimensions for which you get suitable compression (more than 5-10x) compared to the raw tables. Based on our customers’ experiences, we have at times seen orders of magnitude in compression after rollups, up to 100x or 1000x.
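One way to get a feel for the compression a set of dimensions would give you, before creating the rollup table, is to compare the number of raw rows with the number of distinct dimension combinations. The query below is a sketch under the assumption that the last day of data in events is representative; the one-day window is an arbitrary choice.

```sql
-- Average number of raw rows per (dimensions, hour) group over the last day.
-- A result of 50 would mean the hourly rollup holds roughly 50x fewer rows.
SELECT sum(n)::numeric / NULLIF(count(*), 0) AS estimated_compression
FROM (
    SELECT count(*) AS n
    FROM events
    WHERE event_time >= now() - interval '1 day'
    GROUP BY customer_id, event_type, country, browser, date_trunc('hour', event_time)
) AS groups;
```

Because the inner GROUP BY includes the distribution column customer_id, the grouping can run on the worker nodes in parallel.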
For our rollup query, we're going to do an INSERT INTO... SELECT, which will run in parallel across all the nodes in our cluster. Note here that both our raw and rollup tables are sharded on the same key. In this case the sharding key is the customer_id, which is more or less a proxy for customer/tenant id. Other granular columns can also be chosen as shard keys depending on the use case.
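If you want to confirm that the raw and rollup tables really are co-located, a check along these lines against the Citus metadata table pg_dist_partition should work. Treat it as a sketch: it assumes a Citus version that exposes the colocationid column.

```sql
-- Tables that share a colocationid keep matching shards on the same node,
-- which is what lets INSERT INTO ... SELECT run without moving data around.
SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('events'::regclass,
                       'rollup_events_5min'::regclass,
                       'rollup_events_1hr'::regclass);

-- When distributing a new table, co-location can also be requested explicitly:
-- SELECT create_distributed_table('rollup_events_5min', 'customer_id', colocate_with => 'events');
```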
To perform our rollup, we’ll create the function:
CREATE OR REPLACE FUNCTION compute_rollups_every_5min(start_time timestamptz, end_time timestamptz) RETURNS void LANGUAGE PLPGSQL AS $function$
BEGIN
RAISE NOTICE 'Computing 5min rollups from % to % (excluded)', start_time, end_time;
RAISE NOTICE 'Aggregating data into 5 min rollup table';
INSERT INTO rollup_events_5min
SELECT customer_id,
event_type,
country,
browser,
date_trunc('seconds', (event_time - timestamptz 'epoch') / 300) * 300 + timestamptz 'epoch' AS minute,
count(*) as event_count,
hll_add_agg(hll_hash_bigint(device_id)) as device_distinct_count,
hll_add_agg(hll_hash_bigint(session_id)) as session_distinct_count
FROM events WHERE event_time >= start_time AND event_time < end_time
GROUP BY
customer_id,
event_type,
country,
browser,
minute
ON CONFLICT (customer_id,event_type,country,browser,minute)
DO UPDATE
SET
event_count = rollup_events_5min.event_count + excluded.event_count,
device_distinct_count = rollup_events_5min.device_distinct_count || excluded.device_distinct_count,
session_distinct_count = rollup_events_5min.session_distinct_count || excluded.session_distinct_count;
RAISE NOTICE 'Aggregating/Upserting into 1 hr rollup table';
INSERT INTO rollup_events_1hr
SELECT customer_id,
event_type,
country,
browser,
date_trunc('hour', event_time) as hour,
count(*) as event_count,
hll_add_agg(hll_hash_bigint(device_id)) as device_distinct_count,
hll_add_agg(hll_hash_bigint(session_id)) as session_distinct_count
FROM events WHERE event_time >= start_time AND event_time < end_time
GROUP BY
customer_id,
event_type,
country,
browser,
hour
ON CONFLICT (customer_id,event_type,country,browser,hour)
DO UPDATE
SET
event_count = rollup_events_1hr.event_count+excluded.event_count,
device_distinct_count = rollup_events_1hr.device_distinct_count || excluded.device_distinct_count,
session_distinct_count = rollup_events_1hr.session_distinct_count || excluded.session_distinct_count;
END;
$function$;
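The 5-minute bucketing used in the function above can be checked in isolation. The sketch below shows two alternative ways of flooring a timestamp to a 5-minute boundary on a made-up sample value: plain epoch arithmetic, and date_bin, which is only available in Postgres 14 and later. Neither is the expression used in the function; they are shown for comparison.

```sql
-- Both expressions floor the sample timestamp to 2017-12-27 10:05:00 UTC.
SELECT ts,
       to_timestamp(floor(extract(epoch FROM ts) / 300) * 300) AS bucket_epoch_math,
       date_bin('5 minutes', ts, timestamptz 'epoch')          AS bucket_date_bin  -- Postgres 14+
FROM (VALUES (timestamptz '2017-12-27 10:07:42+00')) AS t(ts);
```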
For rolling up the last 5 minutes of data we can now trigger our function:
SELECT compute_rollups_every_5min(now()-interval '5 minutes', now());
One option is to schedule your own background job to run and perform the rollups. When using Citus, you can just as easily schedule a job directly in your database, since the database is doing all the heavy lifting. With pg_cron you can schedule your rollup and call the function compute_rollups_every_5min to run every 5 minutes on top of the raw data.
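A minimal pg_cron setup for this might look like the following. It assumes pg_cron is installed on the coordinator and listed in shared_preload_libraries, and it reuses the same now()-based window as the manual call above.

```sql
-- Assumes pg_cron is installed and preloaded on the coordinator.
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- Run the rollup every 5 minutes over the preceding 5 minutes of raw data.
SELECT cron.schedule(
    '*/5 * * * *',
    $$SELECT compute_rollups_every_5min(now() - interval '5 minutes', now())$$
);

-- List scheduled jobs; cron.unschedule(jobid) removes one.
SELECT jobid, schedule, command FROM cron.job;
```

Because each run derives its window from now(), a job that starts late can leave a small gap or overlap between windows, and events that arrive after their window has been processed are not picked up. A production setup would likely record the last rolled-up timestamp in a table and use that as the next start_time.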
With our system in place now ingesting data and performing rollups, we can get to the fun part of querying. Because all of our rollup tables are significantly smaller in size than the raw data you’ll see performant queries against them. And all the while Citus is now continually rolling up the raw data and storing it in an efficient data type (HyperLogLog) to let you compose all sorts of reports based on the data.
Let’s look at a few examples of queries you may want to run:
--Get me the total number of events and count of distinct devices in the last 5 minutes?
SELECT sum(event_count), hll_cardinality(hll_union_agg(device_distinct_count)) FROM rollup_events_5min where minute >=now()-interval '5 minutes' AND minute <=now() AND customer_id=1;
--Get me the total number of events and count of distinct sessions over the last week?
SELECT sum(event_count), hll_cardinality(hll_union_agg(session_distinct_count)) FROM rollup_events_1hr where hour >=date_trunc('day',now())-interval '7 days' AND hour <=now() AND customer_id=1;
-- Get me the trend of my app usage in the last 2 days broken by hour
SELECT hour, sum(event_count) event_count, hll_cardinality(hll_union_agg(device_distinct_count)) device_count, hll_cardinality(hll_union_agg(session_distinct_count)) session_count FROM rollup_events_1hr where hour >=date_trunc('day',now())-interval '2 days' AND hour <=now() AND customer_id=1 GROUP BY hour;
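Because HLL sketches can be unioned, coarser reports can be derived from the hourly rollup without a dedicated table. As a sketch, a daily trend for one customer could be computed like this (the 30-day window is an arbitrary choice):

```sql
-- Daily totals and distinct devices for the last 30 days, derived from the hourly rollup.
SELECT date_trunc('day', hour) AS day,
       sum(event_count) AS event_count,
       hll_cardinality(hll_union_agg(device_distinct_count)) AS device_count
FROM rollup_events_1hr
WHERE hour >= date_trunc('day', now()) - interval '30 days'
  AND customer_id = 1
GROUP BY day
ORDER BY day;
```

If queries like this become frequent, the same aggregation could instead be materialized into a daily rollup table that follows the pattern of the 5-minute and hourly ones.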
Data is everywhere in this day and age, and the key to being successful is being able to derive insights from your data. By making data available in real time, you’re able to create more value for your users. With the Citus extension to Postgres, you can have both real-time ingestion and real-time reporting with high concurrency without having to throw out SQL, which is the lingua franca when it comes to data.
If you want to dig in and learn more, you can read our use case guide on building real-time analytics dashboards. Or, if you have questions about how we can help, just let us know.