Distributed Execution of Subqueries and CTEs in Citus

Written by Marco Slot
March 9, 2018

The latest release of the Citus database brings a number of exciting improvements for analytical queries across all the data and for real-time analytics applications. Citus already offered full SQL support on distributed tables for single-tenant queries and support for advanced subqueries that can be distributed ("pushed down") to the shards. With Citus 7.2, you can also use CTEs (common table expressions), set operations, and most subqueries thanks to a new technique we call "recursive planning".

Recursive planning looks for CTEs and subqueries that cannot be distributed along with the rest of the query because their results first need to be merged in one place. To generate a (distributed) plan for executing these subqueries, the internal APIs in Postgres allow us to do something mind-blowingly simple: We recursively call the Postgres planner on the subquery, and we can push the results back into the Citus database cluster. We then get a multi-stage plan that can be efficiently executed in a distributed way. As a result, Citus is now getting much closer to full SQL support for queries across all shards, in a way that’s fast and scalable.

In this post, we'll take a deeper dive into how the distributed query planner in Citus handles subqueries—both subqueries that can be distributed in a single round, and multi-stage queries that use the new recursive planning feature.

Pushing down subqueries that join by distribution column

Citus divides tables into shards, which are regular tables distributed over any number of worker nodes. When running an analytical query, Citus first checks if the query can be answered in a single round of executing a SQL query on all the shards in parallel, and then merging the results on the coordinator.

For example, a SELECT count(*) would be computed by taking the count(*) on each shard and then summing the results on the coordinator. Internally, the Citus planner builds a multi-relational algebra tree for the query and then optimises the query tree by "pushing down" computation to the data.
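
To make that concrete, here is a rough sketch of that decomposition (the shard names are made up for illustration; the actual per-shard queries are generated internally by Citus):

-- On the workers, in parallel, one query per shard (hypothetical shard names):
SELECT count(*) AS partial_count FROM my_table_102008;
SELECT count(*) AS partial_count FROM my_table_102009;
-- ...

-- On the coordinator, the partial results are merged,
-- conceptually: sum(partial_count) over all shard results.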

As it turns out, this method of planning can also be applied to very advanced joins and subqueries, as long as they join on the distribution column. For example, let’s take a slight variant of the funnel query in Heap’s blog post on lateral joins for finding the number of users who entered their credit card number within 2 weeks after first viewing the homepage.
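
The queries below assume an event table along the lines of the following sketch (the column types are inferred from the query itself, with time taken to be a Unix timestamp in milliseconds given the 1000*60*60*24*14 arithmetic):

-- Assumed schema, for illustration only:
CREATE TABLE event (
  user_id bigint,   -- the distribution column
  time bigint,      -- assumed: Unix timestamp in milliseconds
  data jsonb        -- event payload, e.g. {"type": "view_homepage"}
);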

SELECT create_distributed_table('event', 'user_id');

SELECT 
  sum(view_homepage) AS viewed_homepage,
  sum(enter_credit_card) AS entered_credit_card
FROM (
  -- Get the first time each user viewed the homepage.
  SELECT
    user_id,
    1 AS view_homepage,
    min(time) AS view_homepage_time
  FROM event
  WHERE
    data->>'type' = 'view_homepage'
  GROUP BY user_id
) e1 LEFT JOIN LATERAL (
  SELECT
    1 AS enter_credit_card
  FROM event
  WHERE
    user_id = e1.user_id AND
    data->>'type' = 'enter_credit_card' AND
    time BETWEEN view_homepage_time AND (view_homepage_time + 1000*60*60*24*14)
  GROUP BY user_id
) e2 ON true;

Even though this is a very complex SQL query, Citus can execute it in a single round because it recognises that a) each subquery returns tuples grouped by user ID (the distribution column) and b) the two subqueries join by the distribution column. The query can therefore be answered in a single round by "pushing down" the subqueries, the join, and the partial sums to each shard. This means that Postgres does all the heavy lifting, in parallel, and Citus merges the results (i.e. sums the sums) on the coordinator.
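
Conceptually, the query that runs on each shard looks something like the sketch below (the shard name event_102008 is made up; the actual per-shard queries are generated by the Citus planner):

-- Runs on every shard in parallel; the subqueries, the lateral join, and the
-- partial sums are all evaluated locally because everything is grouped and
-- joined by user_id:
SELECT
  sum(view_homepage) AS viewed_homepage,
  sum(enter_credit_card) AS entered_credit_card
FROM (
  SELECT user_id, 1 AS view_homepage, min(time) AS view_homepage_time
  FROM event_102008
  WHERE data->>'type' = 'view_homepage'
  GROUP BY user_id
) e1 LEFT JOIN LATERAL (
  SELECT 1 AS enter_credit_card
  FROM event_102008
  WHERE user_id = e1.user_id AND
    data->>'type' = 'enter_credit_card' AND
    time BETWEEN view_homepage_time AND (view_homepage_time + 1000*60*60*24*14)
  GROUP BY user_id
) e2 ON true;
-- The coordinator then sums the per-shard sums.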

This makes Citus one of the only distributed databases that supports distributed lateral joins, and it's one of the reasons Heap uses Citus.

Adding a touch of reference tables

Reference tables are tables that are replicated to all nodes in the Citus cluster, which means that any shard can be joined with a reference table locally. Most joins between a distributed table and a reference table can be safely pushed down, including joins that don't involve the distribution column and even non-equi joins: an inner join between a shard and the reference table returns a subset of the overall join result, and the union of those subsets across all shards is the complete result. There are some minor caveats around outer joins, but most types of joins, including spatial joins, are supported when joining with a reference table.
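
As a small illustration, here is a join that doesn't involve the distribution column at all, yet can still be pushed down because every node has a full copy of the reference table (the event_types table is a made-up example):

-- A reference table is replicated to all nodes:
CREATE TABLE event_types (type text PRIMARY KEY, category text);
SELECT create_reference_table('event_types');

-- The join is pushed down to each shard, which joins locally against its
-- copy of event_types; the coordinator only merges the per-shard counts:
SELECT t.category, count(*)
FROM event e JOIN event_types t ON e.data->>'type' = t.type
GROUP BY t.category;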

We recently made a lot of improvements for using reference tables in subqueries and then started wondering: Could we apply the same logic to SQL functions? And then: What if we had a function that could read the result of a CTE?

Recursively planning CTEs and subqueries

Not all subqueries can be supported through query pushdown, since their results may need to be merged in one place (e.g. a subquery that computes an aggregate across all the data). At some point, however, we realised that these unsupported subqueries and CTEs were usually queries that Citus could execute on their own.
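
For instance, a CTE like the one in this sketch needs a merge step because of the cross-shard ORDER BY ... LIMIT, yet taken on its own it is a query Citus already knows how to run:

-- The CTE cannot be pushed down along with the outer join, since finding the
-- top 10 users requires merging results from all shards in one place. It is,
-- however, a perfectly ordinary distributed query by itself.
WITH top_users AS (
  SELECT user_id, count(*) AS event_count
  FROM event
  GROUP BY user_id
  ORDER BY event_count DESC
  LIMIT 10
)
SELECT e.user_id, e.data->>'type' AS type
FROM event e JOIN top_users USING (user_id);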

Postgres has a very modular planner and executor, which makes it possible to take part of the query tree and run it through the planner function on its own. At execution time, we can run the resulting plan in its own execution context ("Portal") and send the result wherever we want. In this case, we send the results to a file on each worker (we use a file instead of a table so that this also works on read-only follower formations). During planning, we also replace any reference to the CTE or subquery that cannot be pushed down with a function that reads from that file.

For example, if you load the GitHub events data into a distributed table, you can now run queries such as the following, which gets the latest commit for each of the 5 most active Postgres committers:

WITH postgres_commits AS (
  SELECT
    created_at, jsonb_array_elements(payload->'commits') AS comm
  FROM
    github.events
  WHERE
    repo->>'name' = 'postgres/postgres' AND payload->>'ref' = 'refs/heads/master'
),
commits_by_author AS (
  SELECT
    created_at, comm->'author'->>'name' AS author, comm->>'message' AS message
  FROM
    postgres_commits
),
top_contributors AS (
  SELECT
    author, count(*)
  FROM
    commits_by_author
  GROUP BY 1 ORDER BY 2 DESC LIMIT 5
)
SELECT
  author, created_at, string_agg(message,'\n') AS latest_messages
FROM
  commits_by_author c JOIN top_contributors USING (author)
WHERE
  created_at = (SELECT max(created_at) FROM commits_by_author WHERE author = c.author)
GROUP BY
  author, created_at, top_contributors.count
ORDER BY
  top_contributors.count DESC;
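
For reference, github.events above is a distributed table holding the GitHub events data; the query only relies on the columns below (a sketch inferred from the expressions used, not the actual table definition):

CREATE TABLE github.events (
  created_at timestamptz,
  repo jsonb,      -- e.g. {"name": "postgres/postgres", ...}
  payload jsonb    -- e.g. {"ref": "refs/heads/master", "commits": [...]}
  -- other columns omitted
);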

To plan the query above, Citus recursively calls the Postgres planner for each CTE. The first one is:

-- postgres_commits CTE is planned as a normal distributed query
  SELECT
    created_at, jsonb_array_elements(payload->'commits') AS comm
  FROM
    github.events
  WHERE
    repo->>'name' = 'postgres/postgres' AND payload->>'ref' = 'refs/heads/master'

Interestingly, Citus itself intercepts this query in the recursively called planner hook and plans it in a distributed way.

The planner also replaces all references to the CTE with a call to the read_intermediate_result function, which will read the file that we place on the workers:

-- commits_by_author query before recursive planning
  SELECT
    created_at,
    comm->'author'->>'name' AS author, comm->>'message' AS message
  FROM
    postgres_commits

-- internally-generated commits_by_author query after recursive planning
-- CTE is replaced by a subquery on the read_intermediate_result function
  SELECT   
    created_at,
    comm->'author'->>'name' AS author, comm->>'message' AS message
  FROM
    (SELECT
       res.created_at, res.comm
     FROM
       read_intermediate_result('1_1') AS res(created_at timestamptz, comm jsonb)
    ) postgres_commits

An interesting thing to note is that after replacing the CTE reference, the (sub)query does not have any references to a distributed table. It therefore gets treated similarly to single-tenant queries, which already have full SQL support without any additional steps. Another interesting thing is that the subquery can be anything that postgres can plan, including queries on local tables or functions. This means you can now even query a local table in a CTE and then join it with a distributed table.
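
For example, something along these lines now works, where local_categories is a hypothetical plain (non-distributed) Postgres table on the coordinator and event is the distributed table from earlier:

-- The CTE only touches a local table, so Citus plans it with the regular
-- Postgres planner, runs it on the coordinator, ships the result to the
-- workers as an intermediate result, and then joins it like a reference table.
WITH interesting_types AS (
  SELECT type FROM local_categories WHERE category = 'conversion'
)
SELECT t.type, count(*)
FROM event e JOIN interesting_types t ON e.data->>'type' = t.type
GROUP BY t.type;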

At the end of planning, Citus has a list of plans, one for each CTE or subquery that requires a merge step. At execution time, Citus executes these plans one by one through the Postgres executor, pipes the results back to the worker nodes, and then uses those results in the next step(s) as if they were reference tables.

Dynamic replanning for intermediate results

The Postgres planner often has trouble estimating the size of intermediate results, such as the number of rows returned by a CTE. This can lead to suboptimal query plans that use too much CPU time (e.g. nested loop joins), but Citus has one last trick up its sleeve: "dynamic replanning". A worker node does not receive a query on an intermediate result until that result is actually available. The Citus planner hooks can therefore tell the Postgres planner exactly how many rows the result file contains, so that Postgres can choose a well-suited execution plan for the query on the shard.

Fully scalable SQL with Citus

As always, performance and scalability are our top priorities, often driven by the needs of Citus customers, some of whom have up to a petabyte of data. We could have quickly achieved full SQL support by pulling all the needed data to the coordinator, but doing so would have offered little improvement over single-node Postgres.

By leveraging Postgres internal APIs and distributing as much of the work as possible to the Citus worker nodes, we are able to deliver some impressive performance. With the Citus approach to scaling out Postgres, you get the SQL you always wanted, the rich feature set of Postgres, and the scale that typically comes along with NoSQL databases—and best of all it's still performant.

If you have any questions on how recursive planning or other internals of the Citus database work, join the conversation with our engineers in our Slack channel.

Written by Marco Slot

Former lead engineer for the Citus database engine at Microsoft. Speaker at Postgres Conf EU, PostgresOpen, pgDay Paris, Hello World, SIGMOD, & lots of meetups. Talk selection team member for Citus Con: An Event for Postgres. PhD in distributed systems. Loves mountain hiking.

@marcoslot marcocitus