Interactive Analytics on GitHub Data using PostgreSQL with Citus

Written by Marco Slot
March 23, 2016

With the Citus extension, you can use PostgreSQL to build applications, such as analytical dashboards, that interactively query large data sets. At the same time, you can keep adding new data at high rates and use PostgreSQL's powerful indexing features. This blog post gives an example of how to use Citus to load ~400GB of data from the GitHub Archive and query tens of millions of events in milliseconds. For a full demonstration, check out the video at the end of this post.

We set up a cluster with 21 m3.2xlarge instances (20 workers and 1 master) on EC2 using a CloudFormation template. On the master node, we created a table for the GitHub data and added several indexes to allow fast look-ups by type, user, and repository. Since the repo and actor fields are JSONB objects, we index them with GIN indexes.

CREATE TABLE github_events (
  event_id bigint,
  event_type text,
  event_public boolean,
  repo_id bigint,
  payload jsonb,
  repo jsonb,
  actor jsonb,
  org jsonb,
  created_at timestamp
);

CREATE INDEX ON github_events (event_type);
CREATE INDEX ON github_events USING GIN (actor jsonb_path_ops);
CREATE INDEX ON github_events USING GIN (repo jsonb_path_ops);
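
The jsonb_path_ops GIN indexes support containment (@>) queries, which is how the example queries later in this post filter by repository. As a small illustration of a per-user look-up using the actor index (the login value below is just a placeholder):

-- Count all events by a given user; the @> containment operator can use
-- the GIN index on the actor column (the login shown is a placeholder).
SELECT count(*)
FROM   github_events
WHERE  actor @> '{"login":"some-github-user"}';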

To turn a regular PostgreSQL table into a distributed table, Citus provides a master_create_distributed_table function, which lets you specify a distribution column and distribution method. Once a table is distributed, data that is added to the table goes into shards, which are parts of the data stored and replicated as regular tables on the worker nodes. For the distribution method, we chose 'append', which lets you append data directly to a new or existing shard. Citus keeps track of the minimum and maximum value of the distribution column for each shard to optimize distributed queries.

SELECT master_create_distributed_table('github_events', 'created_at', 'append');
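
Once data has been loaded (see below), the per-shard minimum and maximum values that Citus uses for pruning can be inspected in its metadata. A minimal sketch, assuming the pg_dist_shard catalog table that Citus maintains on the master:

-- Show the created_at range covered by each shard of github_events;
-- for append-distributed tables the min/max values are stored as text.
SELECT shardid, shardminvalue, shardmaxvalue
FROM   pg_dist_shard
WHERE  logicalrelid = 'github_events'::regclass
ORDER BY shardminvalue;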

A common way to load data into an append-distributed table is to first load data into a staging table and then append the staging table to a shard. This gives you full control over the way the data is sharded. For the GitHub data, we used a single shard per day, which we control using a get_date_shard function on the master. The function looks for an existing shard for a date in the Citus metadata and otherwise creates a new one. A benefit of this approach is that it keeps the number of shards small, which lowers the overhead when querying longer time-periods. A drawback is that a single query can never use more cores than the number of days that are queried. 
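
A simplified sketch of such a function, using the pg_dist_shard metadata shown above and Citus's master_create_empty_shard function (the actual implementation may differ):

-- Sketch of get_date_shard: return the shard whose range starts on the given
-- day, or create a new empty shard if none exists yet.
CREATE OR REPLACE FUNCTION get_date_shard(day date)
RETURNS bigint AS $$
DECLARE
    day_shard bigint;
BEGIN
    SELECT shardid INTO day_shard
    FROM   pg_dist_shard
    WHERE  logicalrelid = 'github_events'::regclass
      AND  shardminvalue::timestamp >= day
      AND  shardminvalue::timestamp < day + 1
    LIMIT 1;

    IF day_shard IS NULL THEN
        -- A new shard starts out empty; its min/max metadata is filled in
        -- when data is first appended to it.
        SELECT master_create_empty_shard('github_events') INTO day_shard;
    END IF;

    RETURN day_shard;
END;
$$ LANGUAGE plpgsql;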

To load data into the distributed table, we first need to download and pre-process the GitHub data, which is in compressed JSON format. Fortunately, we can do the pre-processing in PostgreSQL itself, in parallel across all the worker nodes. To achieve this, we define a load_github_events function, which downloads a day of data directly from data.githubarchive.org, decompresses it, filters out rows that cannot be parsed, copies the data into a temporary table with a single JSONB column, converts it into the format of the distributed table, and puts the result in a staging table by running commands like the following:

CREATE TEMPORARY TABLE input (data jsonb);
COPY input FROM PROGRAM 'curl -s http://data.githubarchive.org/2016-01-01-{0..23}.json.gz | zcat | grep -v \\u0000' CSV QUOTE e'\x01' DELIMITER e'\x02';
CREATE TABLE stage_1 AS SELECT
    (data->>'id')::bigint AS event_id,
    (data->>'type')::text AS event_type,
    (data->>'public')::boolean AS event_public,
    (data->'repo'->>'id')::bigint AS repo_id,
    (data->'payload') AS payload,
    (data->'repo') AS repo,
    (data->'actor') AS actor,
    (data->'org') AS org,
    (data->>'created_at')::timestamp AS created_at FROM input;
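
The staging table produced this way is later appended to the day's shard (the step the load_github_events script performs, described below). That append is a single Citus call; a minimal sketch with a placeholder shard id and worker host name:

-- Run on the master: append the contents of stage_1 (which lives on worker-1)
-- to shard 102008. Citus copies the rows into every replica of the shard and
-- updates the shard's min/max metadata. Shard id and host are placeholders.
SELECT master_append_table_to_shard(102008, 'stage_1', 'worker-1', 5432);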

The load_github_events function needs to be created on the worker nodes. My favourite way of running commands on all worker nodes is using xargs, which even lets you parallelize the commands using the -P argument:

psql -c "SELECT * FROM master_get_active_worker_nodes()" -tA -F" " | \
xargs -n 2 -P 20 sh -c 'psql -h $0 -p $1 -f load_github_events.sql'

The load_github_events script puts the different pieces together. For a given date, it selects a shard, loads the data into a staging table on one of the worker nodes that holds a replica of the shard, and then appends the staging table to the shard (the worker that holds the staging table can copy it locally). To run it for a range of dates in parallel, we can again use xargs:

psql -c "SELECT d::date FROM generate_series(timestamp '2015-01-01 00:00:00', timestamp '2016-03-07 00:00:00', '1 day') d" -tA | \
xargs -n 1 -P 80 sh -c 'load_github_events $0 0 23'

When we ran this command, it took 50 minutes to load over a year of data. As new data becomes available, we can also keep calling the load_github_events function for the current hour and it will be appended to the right shard. 
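
For example, a periodic job (e.g., from cron) could call the script for a single hour; the invocation below is hypothetical and assumes the same date / start-hour / end-hour arguments as above:

# Hypothetical: load only hour 14 of 2016-03-08 into the distributed table;
# a cron job would compute the current date and hour instead.
load_github_events 2016-03-08 14 14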

At this point, the distributed table contains around 400GB of data and over 290 million rows. When querying the distributed table, Citus queries each shard (day) in parallel. An example query is given below. It sums the number of commits per month from 27.5 million push events (in JSON format) in ~1.8 seconds using 67 cores (1 per day). Compared to a regular PostgreSQL server running on i2.8xlarge, this query runs over 50x faster on the cluster at only ~60% higher cost.

SELECT date_trunc('month', created_at) AS month,
       sum((payload->>'distinct_size')::int) AS num_commits
FROM   github_events
WHERE  event_type = 'PushEvent' AND created_at >= date '2016-01-01'
GROUP BY month
ORDER BY month;
        month        | num_commits
---------------------+-------------
 2016-01-01 00:00:00 |    40185719
 2016-02-01 00:00:00 |    41140176
 2016-03-01 00:00:00 |     9963565
(3 rows)

Time: 1862.905 ms

When specifying selective filters that can use the indexes, queries can be much faster still. An additional advantage of the Citus cluster is that the data always fits in memory, since the 20 workers together have 20 x 30 GB = 600 GB of memory.

SELECT created_at::date AS date,
       sum((payload->>'distinct_size')::int) AS num_commits
FROM   github_events
WHERE  event_type = 'PushEvent' AND
       created_at >= date '2016-03-01' AND
       repo @> '{"name":"postgres/postgres"}' AND
       payload @> '{"ref":"refs/heads/master"}'
GROUP BY date
ORDER BY date;
    date    | num_commits
------------+-------------
 2016-03-01 |          10
 2016-03-02 |           8
 2016-03-03 |           8
 2016-03-04 |          16
 2016-03-05 |           2
 2016-03-06 |           5
 2016-03-07 |           8
(7 rows)

Time: 35.118 ms

What's significant about these results is that the query times are low enough to build interactive applications on large-scale, real-time data, while maintaining a lot of the flexibility and powerful features of PostgreSQL.

A full demonstration of this set-up is available as a video, including a comparison to a large PostgreSQL server without Citus:

[Video: Interactive Analytics on GitHub Data using PostgreSQL with Citus]

Written by Marco Slot

Former lead engineer for the Citus database engine at Microsoft. Speaker at Postgres Conf EU, PostgresOpen, pgDay Paris, Hello World, SIGMOD, & lots of meetups. Talk selection team member for Citus Con: An Event for Postgres. PhD in distributed systems. Loves mountain hiking.
