High performance distributed DML in Citus

Written by Marco Slot
July 25, 2018

One of the unique abilities of SQL databases is transforming data using advanced SQL queries and joins in a transactional manner. Commands like UPDATE and DELETE are commonly used for manipulating individual rows, but they become truly powerful when you can use subqueries to determine which rows to modify and how to modify them. This allows you to implement batch processing operations in a safe, transactional, scalable manner.

Citus recently added support for UPDATE/DELETE commands with subqueries that span across all the data. Together with the CTE infrastructure that we’ve introduced over the past few releases, this gives you a new set of powerful distributed data transformation commands. As always, we’ve made sure that queries are executed as quickly and efficiently as possible by spreading out the work to where the data is stored.

Let’s look at an example of how you can use UPDATE/DELETE with subqueries.

Batching data transformations using DML with subqueries

Let’s say we have a website where people can review movies. The review score is always shown alongside the title of the movie, so we want looking up the score to be fast, and the fastest way is to store it alongside the movie itself.

CREATE TABLE movies (
    movie_id bigint primary key,
    movie_name text not null,
    release_date date,
    movie_details jsonb,
    review_score double precision,
    review_count bigint not null default 0
);

SELECT create_distributed_table('movies', 'movie_id');

INSERT INTO
    movies (movie_id, movie_name, release_date)
VALUES
    (1, 'Star Wars: Episode I – The Phantom Menace', '1999-05-19'),
    (4, 'Star Wars: Episode IV - A New Hope', '1977-12-15');

It’s important to realise that updating the review score whenever a review is added is often a bad idea. An UPDATE takes a row-level lock, which blocks concurrent UPDATEs on the same row. If many people review a movie at the same time (e.g. when it has just been released), then they will all have to wait for each other. Moreover, Postgres implements an update as a delete and an insert, which means Postgres rewrites the full details of the movie every time a review is added, and the table gets bloated with deleted tuples that need to be vacuumed.
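As a sketch of the per-review approach we advise against (the movie ID and score values are illustrative), every new review would immediately run something like:

    -- per-review approach (not recommended): every new review immediately
    -- updates the movie row, taking a row-level lock each time
    UPDATE movies
    SET review_score = (coalesce(review_score, 0) * review_count + 4.0)
                       / (review_count + 1),
        review_count = review_count + 1
    WHERE movie_id = 1;  -- concurrent reviewers of movie 1 wait here

Each such transaction holds the lock on the movie row until it commits, so a burst of reviews for a popular movie effectively executes one at a time.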

Fortunately, it’s usually not a requirement that the overall review score is always up-to-date. We can afford to wait at least a few minutes before updating the overall score. That allows us to process all new reviews in a bulk UPDATE, which avoids the problems mentioned above.

We’ll keep a separate table of reviews, which also allows us to store more details on the review. Inserting into a table is cheap and inserts never block each other.

CREATE TABLE movie_reviews (
    movie_id bigint not null,
    review_id bigserial,
    review_time timestamptz not null default now(),
    user_id bigint,
    score double precision not null,
    review_text text,
    PRIMARY KEY (review_id, movie_id)
);

SELECT create_distributed_table('movie_reviews', 'movie_id');

INSERT INTO
    movie_reviews (movie_id, user_id, score, review_text)
VALUES
    (4, 799, 5, 'Life-changing experience'),
    (1, 803, 2, 'My first Star Wars!'),
    (1, 799, 1, 'What did they do??');

Now we’ll update all the review scores in the movies table in bulk, using an UPDATE with a subquery on the reviews table. You typically want to join the tables in the subquery with the table being updated on the distribution columns of the tables. That way, Citus can parallelise the UPDATE across all cores in a single round, giving speedups proportional to the number of cores. Even if you can’t join on the distribution column, Citus will find the most efficient way to distribute and run your query.

UPDATE movies SET
    review_score = overall.score,
    review_count = overall.count
FROM (
    SELECT
        movie_id, avg(score) AS score, count(*)
    FROM
        movie_reviews
    GROUP BY movie_id
) overall
WHERE
    overall.movie_id = movies.movie_id AND review_count <> overall.count;

By aggregating the reviews before updating the scores, we avoid rewriting the row for each review, which avoids a lot of write overhead and table bloat. By doing it in bulk, we also avoid the concurrency issues that would occur when many transactions try to update the score separately.

Now when we query the movies table, we get the overall scores:

SELECT movie_name, release_date, review_score FROM movies;
                 movie_name                 | release_date | review_score
--------------------------------------------+--------------+--------------
 Star Wars: Episode I – The Phantom Menace  | 1999-05-19   |          1.5
 Star Wars: Episode IV - A New Hope         | 1977-12-15   |            5
(2 rows)

We can run the bulk UPDATE periodically to keep the score up-to-date with new reviews. That way, looking up a review score is practically free when you are already looking up a movie in the database, and all the compute-heavy work is done in batches.
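One way to run the bulk UPDATE periodically is a job scheduler such as pg_cron; this sketch assumes the pg_cron extension is installed, and the five-minute interval is an arbitrary choice:

    -- refresh review scores every 5 minutes (assumes pg_cron is installed)
    SELECT cron.schedule('*/5 * * * *', $$
      UPDATE movies SET
          review_score = overall.score,
          review_count = overall.count
      FROM (
          SELECT movie_id, avg(score) AS score, count(*)
          FROM movie_reviews
          GROUP BY movie_id
      ) overall
      WHERE overall.movie_id = movies.movie_id AND review_count <> overall.count;
    $$);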

A downside of the approach above is that every review is counted every time we run the UPDATE, meaning the UPDATE will slow down as the number of reviews grows. A more efficient approach would be to incrementally compute the average.

Incremental batch computation using UPDATE with subqueries

We recently introduced a new way of efficiently doing incremental aggregation in Postgres and Citus. We can use that in combination with UPDATEs as well.

The following command computes the change in overall review score since the last time the command ran and leaves rows without new reviews untouched.

INSERT INTO rollups (name, event_table_name, event_id_sequence_name)
VALUES ('reviews_rollup', 'movie_reviews', 'movie_reviews_review_id_seq');

CREATE OR REPLACE FUNCTION do_reviews_aggregation(OUT start_id bigint, OUT end_id bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    /* determine which reviews we can safely aggregate */
    SELECT window_start, window_end INTO start_id, end_id
    FROM incremental_rollup_window('reviews_rollup');

    /* exit early if there are no new reviews to aggregate */
    IF start_id > end_id THEN RETURN; END IF;

    /* add the new reviews to the current scores as a weighted average */
    UPDATE movies SET
      review_score = (coalesce(review_score, 0) * review_count + increment.score) / (review_count + increment.count),
      review_count = review_count + increment.count
    FROM (
      SELECT movie_id, sum(score) AS score, count(*)
      FROM movie_reviews
      WHERE review_id BETWEEN start_id AND end_id
      GROUP BY movie_id
    ) increment
    WHERE increment.movie_id = movies.movie_id;
END;
$function$;

SELECT * FROM do_reviews_aggregation();

The entire command runs as a single distributed transaction. If anything goes wrong, all the changes are rolled back, meaning the review scores remain unchanged and the pending reviews will be processed the next time the command runs, along with any new reviews. While the command is running, the application can continue adding new reviews to the movie_reviews table, and those reviews will be aggregated on a subsequent run.
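For instance, since do_reviews_aggregation runs within the caller’s transaction, wrapping the call in an explicit transaction and rolling it back should leave both the scores and the rollup bookkeeping untouched:

    BEGIN;
    SELECT * FROM do_reviews_aggregation();  -- aggregates pending reviews
    ROLLBACK;  -- scores revert; the same reviews are picked up on the next run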

Benchmarking UPDATE with subquery performance: Citus vs. RDS vs. Aurora

We compared the performance of the UPDATE in Citus to a single Postgres server, using the biggest available server on RDS and Aurora (r4.16xlarge) and a Citus Cloud formation with the same total number of cores and amount of memory spread across four r4.4xlarge nodes. To make it a bit more interesting, we used movie and review data from IMDB, expanded to have a separate row in movie_reviews for every review (around 850 million rows).

We first ran the UPDATE that aggregates across all review scores on each system. The Citus Cloud formation was 27x faster than RDS (Postgres 10) and 32x faster than Aurora (Postgres 9.6). Citus parallelises the UPDATE across all available cores and the system had 32 physical cores (64 vCPUs), hence these results are as expected. Still, even we were impressed that Citus managed to process over 100 million reviews per second.

[Graph: UPDATE all review scores]

We then generated an additional 10 million review scores and used the incremental UPDATE approach (do_reviews_aggregation) to incrementally update the scores. Unfortunately, we could not run this command on Aurora, since it relies on Postgres 10 features. We again saw a speedup of around 27x against RDS.

[Graph: Incrementally UPDATE review scores]

Long story short, distributed DML commands on Citus are very powerful and very, very fast.

Big Data Transformations using Citus

We took movie reviews as a playful example, but to be perfectly honest, computing the average review score across all movie reviews is probably not the most challenging problem. But what if you’re building a movie recommendation system? Maybe you want to automatically tag movies based on reviews, or score them for different user profiles. Your application is usually not done once the easy problems are solved, and once you start solving the hard problems, a relational database that scales can be your best friend. The ability to transform your data using SQL, in a safe, transactional manner and at enormous scale, is a superpower that might save you from having to learn and deploy many separate data processing systems.

Marco Slot

Former lead engineer for the Citus database engine at Microsoft. Speaker at Postgres Conf EU, PostgresOpen, pgDay Paris, Hello World, SIGMOD, & lots of meetups. Talk selection team member for Citus Con: An Event for Postgres. PhD in distributed systems. Loves mountain hiking.

@marcoslot marcocitus