How Citus works (a look at dynamic executors)

Written by Craig Kerstiens
September 15, 2017

Update: In 2020, our Citus open source team rolled out a single query executor that can handle different types of Postgres workloads and solves all the shortcomings of the real-time and router executor in a single unified code path: the Citus adaptive executor. The Citus adaptive executor uses a dynamic pool of connections to each worker node to execute Postgres queries on the shards (SQL tasks)—and replaces the previous real-time and router executors discussed in this blog post. You can learn all about the Citus adaptive executor in Marco Slot’s newer blog post: How the Citus distributed query executor adapts to your Postgres workload.

In the beginning there was Postgres

We love Postgres at Citus. And rather than create a newfangled database from scratch, we implemented Citus as an extension to Postgres. We've talked a lot on our blog here about you can leverage Citus, about key use cases, and different data models and sharding approaches. But we haven’t spent a lot of time explaining how Citus works. So if you want to dive deeper into how Citus works, here we're going to walk through how Citus shards the data all the way through to how the executors run queries.

Distributing data within Citus

Citus gets its benefits from sharding your data which allows us to split the data across multiple physical nodes. When your tables are significantly smaller due to sharding your indexes are smaller, vacuum runs faster, everything works like it did when your database was smaller and easier to manage.

To setup your initial shards you first create a table, then you tell Citus to shard it with a function:

CREATE TABLE events (id serial, data jsonb);
SELECT create_distributed_table('events', 'id');

When you run the create_distributed_table function we then create a number of other tables under the covers and spread those out across the other nodes within your Citus cluster:

events_102008
events_102009
events_102010
events_102011
...
events_102039

Most of the time you never need to know the names of these tables, look at them, or even be aware that they exist. Once you've done this initial setup Citus then starts to kick in for all new queries that come in. Let's first look at INSERT then we can move on to SELECT queries and see how they're handled.

Getting data in

When you insert data to Citus you’re inserting directly into Postgres. As an extension Citus on the fly determines how to route that insert. In order to do this we first get a hash value for the column you sharded on. We use the internal Postgres hash functions for this, so for example an event with id of 1 has a hash value of -1905060026.

The hash value is the first part of the equation, the second is that we keep metadata tables that know the ranges of hash values and where each is stored. To find this we can execute the following query:

SELECT *
FROM pg_dist_shard;

 logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
 events       |  102008 | t            | -2147483648   | -2013265921
 events       |  102009 | t            | -2013265920   | -1879048193
 events       |  102010 | t            | -1879048192   | -1744830465

And in the example of inserting with an event id of 1 the insert will get rewritten to insert to shardid 102009.

It's important to note that all this is done on the fly as data comes in. You don't have to think about it, you don't have to do extra work in your application, it just works.

When it comes to reading data things get a little more fun.

Querying your data

When you issue a query to Citus you could be doing any number of operations. You could be:

  • Querying data that is a standard Postgres table on the coordinator
  • Getting data that lives in a single shard
  • Aggregating across shards

Depending on which of the above you're looking to do Citus executes the appropriate action. If you're running a query and the table has not yet been distributed then that query runs against the coordinator. If the query does run against a table that has been distributed we check two things on the fly:

  1. Does it target a single node?
  2. Does it do an aggregation across nodes that we can parallelize?

Depending on which of the above paths the query follows we then invoke a different executor.

Router executor

The router executor is the one that runs when you're targeting a single node within Citus. With it we re-write the table that you're targeting from events in the above example to events_102009 and send it to the appropriate node. With this type of query we're able to pushdown the SQL so it operates like a single node Postgres instance, but one that you can keep scaling out horizontally.

The router executor is also capable of realizing when you're targeting a single node that involves multiple tables sharded on the same value, also known as co-located tables. When you’re querying say for the same tenant you can join on any tables sharded on that same value as long as you’re explicit on your join of tenant_id.

Real-time executor

Now onto the other executor type. The real-time executor kicks in when you have some query that intentionally spans across nodes. It can kick-in in a number of cases, but when you really want to use it is when you're trying to parallelize a query to get some better performance. Let's look at a real-world example to get a better idea of this.

SELECT count(*)
FROM events

In this pretty simple query we just want to get a count of how many events we have within our system. The real-time executor is going to realize that you're 1. querying all nodes and then 2. looking to do an aggregation.

The result is that Citus will build a list of all the shards it needs to query: events_102008, events_102009, events_102010 etc. It will then run 32 queries, one against each shard:

SELECT count(*)
FROM events_102008
SELECT count(*)
FROM events_102009
SELECT count(*)
FROM events_102010

When the coordinator gets the results from the pending 32 queries it then does the final aggregation on the coordinator. Citus can handle much more than simple count (*) as well. For aggregations we introspect what you’re looking to do and split up the query to the appropriate map reduce operation. For example for avg we take the 32 sums added together and then divide the sum of 32 counts–without you having to think about it.

Even though it feels magical, it can be understood

While we want your database to scale more simply than ever before, we don’t want you to get the impression that we’ve magically changed the logic of distributed systems. Instead with Citus we’ve followed a similar process to building it as how you might on your own if you manually sharded your database, you just don’t have to spend months or years of engineering to accomplish it now. We hope you’ve found this look under the hood useful, and if you’re curious to learn drop us a note and we’d be happy to setup a demo or answer any questions you may have.

If you are on Postgres and need to continue scaling consider giving Citus a try before you make a heavy investment in a new database or manually sharding yourself. You can get started with Citus open source, or quickly provision a Citus database cluster in the cloud. (Update in October 2022: Citus is available as a managed service in Azure Cosmos DB for PostgreSQL. If you want to try Citus on Azure, you can get started here.)

Craig Kerstiens

Written by Craig Kerstiens

Former Head of Cloud at Citus Data. Ran product at Heroku Postgres. Countless conference talks on Postgres & Citus. Loves bbq and football.