Distribute PostgreSQL 17 with Citus 13

Written by Naisila Puka
February 6, 2025

The Citus 13.0 release is out and includes PostgreSQL 17.2 support! We know you’ve been waiting, and we’ve been hard at work adding features we believe will take your experience to the next level, focusing on bringing the exciting Postgres 17 improvements to you at distributed scale.

The Citus database is an open-source extension of Postgres that brings the power of Postgres to any scale, from a single node to a distributed database cluster. Since Citus is an extension, using Citus means you're also using Postgres, giving you direct access to Postgres features. And the latest of these features came with the Postgres 17 release! In addition, Citus 13 will be made available on the elastic clusters (preview) feature on Azure Database for PostgreSQL - Flexible Server, along with PostgreSQL 17 support, in the near future.

PostgreSQL 17 highlights include performance improvements in query execution for indexes, a revamped memory management system for vacuum, new monitoring and analysis features, expanded functionality for managing data in partitions, optimizer improvements, and enhancements for high-concurrency workloads. PostgreSQL 17 also expands on SQL syntax that benefits both new workloads and mission-critical systems, such as the addition of the SQL/JSON JSON_TABLE() function for developers, and the expansion of the MERGE command. If you are interested in upgrading to Postgres 17 and scaling these new Postgres features, you can upgrade to Citus 13.0!

Along with Postgres 17 support, Citus 13.0 also fixes important bugs, and we are happy to say that we had many community contributions here as well. These bugfixes focus on data integrity and correctness, crash and fault tolerance, and cluster management, all of which are critical for ensuring reliable operations and user confidence in a distributed PostgreSQL environment.

Let's take a closer look at what's new in Citus 13.0:

Postgres 17 support in Citus 13.0

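Citus 13.0 introduces support for PostgreSQL 17. This means that just by enabling PG17.2 in Citus 13.0, all the query performance improvements carry over directly to Citus distributed queries, and several optimizer improvements benefit queries in Citus out of the box! Among the many new features in PG17, the capabilities enabled in Citus 13.0 that are especially noteworthy for Citus users are covered in the sections below: JSON_TABLE() support in distributed queries, propagation of the MERGE ... WHEN NOT MATCHED BY SOURCE syntax, expanded functionality on distributed partitioned tables, propagation of the new EXPLAIN options MEMORY and SERIALIZE, and optimizer improvements.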

To learn more about how you can use Citus 13.0 + PostgreSQL 17.2, as well as currently unsupported features and future work, you can consult the Citus 13.0 Updates page, which gives you detailed release notes.
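
Before trying the features below, it can help to confirm which versions a cluster is actually running. The following is a minimal sketch, assuming you are connected with psql to a database where the Citus extension is already created; the exact version strings will differ per installation.

```sql
-- PostgreSQL server version; expected to start with 17 for the features below
SHOW server_version;

-- version of the Citus extension installed in the current database
SELECT extversion FROM pg_extension WHERE extname = 'citus';

-- full Citus build string
SELECT citus_version();
```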

JSON_TABLE() support in distributed queries

One of the most discussed features in Postgres 17 is the enhanced management of JSON data. The key addition here is the JSON_TABLE() function, which presents JSON data as a standard PostgreSQL table. This means that developers can leverage the full capabilities of SQL on data that was originally in JSON format, by turning complicated JSON documents into a simpler relational view. The commit that adds basic JSON_TABLE() functionality explains that through the JSON_TABLE() function, JSON data can be used like other tabular data, with capabilities such as sorting, filtering, joining with regular Postgres tables, and usage in a FROM clause. Joining is especially interesting for Citus: how can we perform a join between the JSON_TABLE() function and a Citus distributed table?

As you may know, Citus uses the sharding technique to distribute the tables, where each table is divided into smaller chunks called shards. Citus 13.0 supports JSON_TABLE() in a distributed query by treating it as a recurring tuple, i.e. an expression that gives the same set of results across multiple shards. This is useful for executing a JOIN between a distributed table and a table coming from the JSON_TABLE() function. The main point is that recurring tuples "recur" for each shard in a multi-shard query. For more technical details on this, check out Citus Technical Documentation on recurring tuples.

Let’s look at some example distributed queries that combine the JSON_TABLE() function with Citus distributed tables:

-- create, distribute and populate the table with a jsonb column
-- we will use that column as a regular postgres table through JSON_TABLE()
CREATE TABLE my_favorite_books(book_collection_id bigserial, jsonb_column jsonb);
SELECT create_distributed_table('my_favorite_books', 'book_collection_id');

-- these books will be inserted with the automatic book_collection_id of 1
INSERT INTO my_favorite_books (jsonb_column) VALUES (
'{ "favorites" : [
   { "kind" : "mystery", "books" : [
     { "title" : "The Count of Monte Cristo", "author" : "Alexandre Dumas"},
     { "title" : "Crime and Punishment", "author" : "Fyodor Dostoevsky" } ] },
   { "kind" : "drama", "books" : [
     { "title" : "Anna Karenina", "author" : "Leo Tolstoy" } ] },
   { "kind" : "poetry", "books" : [
     { "title" : "Masnavi", "author" : "Jalal al-Din Muhammad Rumi" } ] },
   { "kind" : "autobiography", "books" : [
     { "title" : "The Autobiography of Malcolm X", "author" : "Alex Haley" } ] }
  ] }');

-- these books will be inserted with the automatic book_collection_id of 2
INSERT INTO my_favorite_books (jsonb_column) VALUES (
'{ "favorites" : [
   { "kind" : "mystery", "books" : [
     { "title" : "To Kill a Mockingbird", "author" : "Harper Lee"},
     { "title" : "Our Mutual Friend", "author" : "Charles Dickens" } ] },
   { "kind" : "drama", "books" : [
     { "title" : "Pride and Prejudice", "author" : "Jane Austen" } ] },
   { "kind" : "poetry", "books" : [
     { "title" : "The Odyssey", "author" : "Homer" } ] },
   { "kind" : "autobiography", "books" : [
     { "title" : "The Diary of a Young Girl", "author" : "Anne Frank" } ] }
  ] }');

-- a simple router query, that outputs all the books under book_collection_id = 1
SELECT json_table_output.* FROM
my_favorite_books,
JSON_TABLE ( jsonb_column, '$.favorites[*]' COLUMNS (
   key FOR ORDINALITY, kind text PATH '$.kind',
   NESTED PATH '$.books[*]' COLUMNS (
     title text PATH '$.title', author text PATH '$.author'))) AS json_table_output
WHERE my_favorite_books.book_collection_id = 1
ORDER BY 1, 2, 3, 4;

 key |     kind      |             title              |           author
-----+---------------+--------------------------------+----------------------------
   1 | mystery       | The Count of Monte Cristo      | Alexandre Dumas
   1 | mystery       | Crime and Punishment           | Fyodor Dostoevsky
   2 | drama         | Anna Karenina                  | Leo Tolstoy
   3 | poetry        | Masnavi                        | Jalal al-Din Muhammad Rumi
   4 | autobiography | The Autobiography of Malcolm X | Alex Haley
(5 rows)

-- a simple multi-shard query, where we want to see all the books
SELECT json_table_output.* FROM
my_favorite_books,
JSON_TABLE ( jsonb_column, '$.favorites[*]' COLUMNS (
   key FOR ORDINALITY, kind text PATH '$.kind',
   NESTED PATH '$.books[*]' COLUMNS (
     title text PATH '$.title', author text PATH '$.author'))) AS json_table_output
ORDER BY 1, 2, 3, 4;

 key |     kind      |             title              |           author
-----+---------------+--------------------------------+----------------------------
   1 | mystery       | The Count of Monte Cristo      | Alexandre Dumas
   1 | mystery       | Crime and Punishment           | Fyodor Dostoevsky
   1 | mystery       | Our Mutual Friend              | Charles Dickens
   1 | mystery       | To Kill a Mockingbird          | Harper Lee
   2 | drama         | Anna Karenina                  | Leo Tolstoy
   2 | drama         | Pride and Prejudice            | Jane Austen
   3 | poetry        | Masnavi                        | Jalal al-Din Muhammad Rumi
   3 | poetry        | The Odyssey                    | Homer
   4 | autobiography | The Autobiography of Malcolm X | Alex Haley
   4 | autobiography | The Diary of a Young Girl      | Anne Frank
(10 rows)

-- more complex router query involving LATERAL and LIMIT
-- select two books under book_collection_id = 2
SELECT sub.*
FROM my_favorite_books,
lateral(SELECT * FROM JSON_TABLE (jsonb_column, '$.favorites[*]'
    COLUMNS (key FOR ORDINALITY, kind text PATH '$.kind',
        NESTED PATH '$.books[*]' COLUMNS
            (title text PATH '$.title', author text PATH '$.author')))
    AS json_table_output ORDER BY key DESC LIMIT 2) as sub
WHERE my_favorite_books.book_collection_id = 2;

 key |     kind      |           title           |   author
-----+---------------+---------------------------+------------
   4 | autobiography | The Diary of a Young Girl | Anne Frank
   3 | poetry        | The Odyssey               | Homer
(2 rows)

Furthermore, JSON_TABLE() can appear on the inner side of an outer join, as well as on the outer side of a join, as long as the join involves at least one distributed table. The limitations of using JSON_TABLE() in distributed queries are the same as the general limitations of using recurring tuples in distributed queries. For more technical examples of JSON_TABLE() in distributed queries, as well as the limitations, you can check out the Updates page.
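
As a rough sketch of the first case, the query below puts JSON_TABLE() on the inner side of a LEFT JOIN whose outer side is the distributed table from the examples above. The inline JSON document and the id and label column names are made up for illustration, and the query has not been checked against a live cluster.

```sql
-- hypothetical labels keyed by book_collection_id; only collection 1 has one
SELECT b.book_collection_id, labels.label
FROM my_favorite_books b
LEFT JOIN JSON_TABLE (
    '[{"id": 1, "label": "classics"}, {"id": 3, "label": "unread"}]'::jsonb,
    '$[*]' COLUMNS (
        id bigint PATH '$.id',
        label text PATH '$.label')) AS labels
  ON labels.id = b.book_collection_id
ORDER BY 1;
```

With the two collections inserted earlier, collection 1 should come back labeled classics and collection 2 with a NULL label. The JSON_TABLE() rows recur for every shard and are matched against whichever collections that shard holds.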

Propagate "MERGE ... WHEN NOT MATCHED BY SOURCE" syntax

As you may know, the MERGE statement in SQL is used to perform INSERT, UPDATE, and DELETE operations on a target table based on the results of a join with a source table. This allows for efficient data synchronization between the target and source tables because it combines multiple operations into one.

PG15 added support for MERGE. Its WHEN NOT MATCHED clause originally allowed defining actions only for rows that exist in the data source but not in the target relation, which PG17 also lets you spell as WHEN NOT MATCHED BY TARGET.

As of PG17, one may use the MERGE command to operate on rows that exist in the target relation, but not in the data source, by using WHEN NOT MATCHED BY SOURCE. This is a fantastic addition and will greatly simplify various data loading and updating processes, because if a row in the target table being merged doesn’t exist in the source table, we can now perform any necessary actions on that row. Citus extended its already existing strategies employed for handling MERGE in a distributed environment to include this syntax as well. For more details, you can look at how Citus 12 supports MERGE.

Let’s see a simple example, similar to the tests in Postgres, on how to make use of MERGE ... WHEN NOT MATCHED BY SOURCE with Citus managed tables:

-- create and distribute the target and source tables
CREATE TABLE target_table (tid integer, balance float, val text);
CREATE TABLE source_table (sid integer, delta float);
SELECT create_distributed_table('target_table', 'tid');
SELECT create_distributed_table('source_table', 'sid');

-- populate the tables
INSERT INTO target_table SELECT id, id * 100, 'initial' FROM generate_series(1,5,2) AS id;
INSERT INTO source_table SELECT id, id * 10 FROM generate_series(1,4) AS id;

-- Use WHEN NOT MATCHED BY SOURCE
MERGE INTO target_table t
    USING source_table s
    ON t.tid = s.sid AND tid = 1
    WHEN MATCHED THEN
        UPDATE SET balance = balance + delta, val = val || ' updated by merge'
    WHEN NOT MATCHED BY TARGET THEN
        INSERT VALUES (sid, delta, 'inserted by merge')
    WHEN NOT MATCHED BY SOURCE THEN
        UPDATE SET val = val || ' not matched by source';

-- see the updated distributed target table
SELECT * FROM target_table ORDER BY tid, balance;

 tid | balance |              val
-----+---------+-------------------------------
   1 |     110 | initial updated by merge
   2 |      20 | inserted by merge
   3 |      30 | inserted by merge
   3 |     300 | initial not matched by source
   4 |      40 | inserted by merge
   5 |     500 | initial not matched by source
(6 rows)
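
A common data-synchronization use of the new clause is to delete target rows that no longer appear in the source. The statement below is a sketch of that pattern on the same two tables. It uses plain PG17 MERGE syntax; whether a particular table layout can run it in Citus depends on the distributed MERGE strategies mentioned above. The 'synced' marker text is only illustrative.

```sql
-- make target_table mirror source_table on the distribution column
MERGE INTO target_table t
    USING source_table s
    ON t.tid = s.sid
    WHEN MATCHED THEN
        UPDATE SET balance = s.delta, val = 'synced'
    WHEN NOT MATCHED BY TARGET THEN
        INSERT VALUES (s.sid, s.delta, 'synced')
    WHEN NOT MATCHED BY SOURCE THEN
        DELETE;
```

After this runs, every tid in target_table should also exist as a sid in source_table. Starting from the table contents shown above, the row with tid = 5 would be removed.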

Expanded functionality on distributed partitioned tables

PG17 has expanded functionality for managing data in partitions. Now you can specify an access method for partitioned tables. Also, you can add exclusion constraints on partitions. Another great addition is support for identity columns in partitioned tables. Citus has extended its distributed table capabilities to include these three functionalities for distributed partitioned tables as well! Let’s dive into a bit more detail below:

  1. Citus 13.0 allows specifying an access method for distributed partitioned tables: After specifying a table access method via CREATE TABLE ... USING for a partitioned table, you can then distribute it through the Citus signature function: create_distributed_table(). From that point forward, this table will be managed by Citus with the specified access method. Also, Citus propagates ALTER TABLE ... SET ACCESS METHOD to all the nodes in the cluster, so you can not only specify the access method for the distributed partitioned table, but also modify it.
  2. Adds support for identity columns in distributed partitioned tables: Citus on Postgres 17 allows specifying generated identity columns for Citus managed tables by maintaining generated identity logic while propagating distributed partitioned table DDL to all the cluster nodes. For more details, check out how Citus 11.2 introduced identity column support for Citus managed tables.
  3. Allows exclusion constraints on distributed partitioned tables: Similarly, Citus now allows adding an exclusion constraint because it seamlessly propagates the ALTER TABLE distributed_partitioned_table ADD CONSTRAINT ... SQL command to all the nodes in the cluster.

Let’s demonstrate all of the above with examples below:

-- let's say we are at node 0
-- create a partitioned table
-- specify access method as columnar, use generated identity column
CREATE TABLE dist_partitioned_table
( id_test bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10),
n int )
PARTITION BY RANGE (n)
USING columnar;

-- create a partition for the table
CREATE TABLE pt_1 PARTITION OF dist_partitioned_table FOR VALUES FROM (1) TO (50);

-- distribute the table, making it a distributed partitioned table
SELECT create_distributed_table('dist_partitioned_table', 'id_test');

-- create another partition of the table, it will be automatically distributed
CREATE TABLE pt_2 PARTITION OF dist_partitioned_table FOR VALUES FROM (50) TO (1000);

-- Altering an access method for a partitioned table lets the value be used
-- for all future partitions created under it.
-- Existing partitions are not modified.
ALTER TABLE dist_partitioned_table SET ACCESS METHOD heap;

-- Add an exclusion constraint, which will be part of current and future partitions as well
ALTER TABLE dist_partitioned_table ADD EXCLUDE USING btree (id_test WITH =, n WITH =);

-- Attaching a partition inherits the identity column from the parent table
CREATE TABLE pt_3 (id_test bigint not null, n int);
ALTER TABLE dist_partitioned_table ATTACH PARTITION pt_3 FOR VALUES FROM (1000) TO (2000);

-- verify that the identity column is inherited in all children
SELECT attrelid::regclass, attname, attidentity FROM pg_attribute
WHERE attname = 'id_test' AND attidentity = 'd' ORDER BY 1;

        attrelid        | attname | attidentity
------------------------+---------+-------------
 dist_partitioned_table | id_test | d
 pt_1                   | id_test | d
 pt_2                   | id_test | d
 pt_3                   | id_test | d
(4 rows)

-- the parent table and the new partition have the altered access method "heap"
-- whereas the old two partitions have the original access method "columnar"
SELECT relname, amname FROM pg_class c LEFT JOIN pg_am am ON (c.relam = am.oid)
WHERE relname IN ('dist_partitioned_table', 'pt_1', 'pt_2', 'pt_3') ORDER BY relname;

        relname         |  amname
------------------------+----------
 dist_partitioned_table | heap
 pt_1                   | columnar
 pt_2                   | columnar
 pt_3                   | heap
(4 rows)

-- verify that the distributed partitioned table and its distributed partitions
-- have exclude constraints
SELECT conname FROM pg_constraint
WHERE conname LIKE '%id_test%' ORDER BY 1;

                conname
---------------------------------------
 dist_partitioned_table_id_test_n_excl
 pt_1_id_test_n_excl
 pt_2_id_test_n_excl
 pt_3_id_test_n_excl
(4 rows)

-- now, verify correct propagation to all the nodes in the cluster
\c - - - :node_1_port
SELECT attrelid::regclass, attname, attidentity FROM pg_attribute
WHERE attname = 'id_test' AND attidentity = 'd' ORDER BY 1;

        attrelid        | attname | attidentity
------------------------+---------+-------------
 dist_partitioned_table | id_test | d
 pt_1                   | id_test | d
 pt_2                   | id_test | d
 pt_3                   | id_test | d
(4 rows)

SELECT relname, amname FROM pg_class c LEFT JOIN pg_am am ON (c.relam = am.oid)
WHERE relname IN ('dist_partitioned_table', 'pt_1', 'pt_2', 'pt_3') ORDER BY relname;

        relname         |  amname
------------------------+----------
 dist_partitioned_table | heap
 pt_1                   | columnar
 pt_2                   | columnar
 pt_3                   | heap
(4 rows)

-- this node is not a coordinator
-- so we can also see the exclusion constraints on the shards
SELECT conname FROM pg_constraint
WHERE conname LIKE '%id_test%' ORDER BY 1;

                   conname
----------------------------------------------
 dist_partitioned_table_id_test_n_excl
 dist_partitioned_table_id_test_n_excl_102008
 ...
 pt_1_id_test_n_excl
 pt_1_id_test_n_excl_102040
 ...
 pt_2_id_test_n_excl
 pt_2_id_test_n_excl_102072
 ...
 pt_3_id_test_n_excl
 pt_3_id_test_n_excl_102104
 ...
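
To see the exclusion constraint at work, one could insert the same (id_test, n) pair twice. This is a sketch only: the values are arbitrary, id_test is supplied explicitly (which GENERATED BY DEFAULT permits), and the exact error text, including the constraint name reported, will vary.

```sql
-- first insert succeeds; n = 5 falls in the range of partition pt_1
INSERT INTO dist_partitioned_table (id_test, n) VALUES (10, 5);

-- second insert repeats both columns, so the constraint
-- EXCLUDE USING btree (id_test WITH =, n WITH =) is expected to reject it
INSERT INTO dist_partitioned_table (id_test, n) VALUES (10, 5);

-- same id_test with a different n does not conflict
INSERT INTO dist_partitioned_table (id_test, n) VALUES (10, 6);
```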

Propagate new EXPLAIN options: MEMORY and SERIALIZE

EXPLAIN in PG17 now includes two new options: SERIALIZE and MEMORY. The SERIALIZE option reports the real cost of converting the query's output data into the form that is sent to the client, whereas the MEMORY option reports planner memory consumption. Citus 13.0 allows these options when explaining a distributed query.

As you may know, Citus distributes the query to the nodes that contain the shards the query refers to. Let’s refer to these as tasks sent to shards. Each of those shard tasks returns its explain output to the node that runs the EXPLAIN query. As a starting point in Citus, the MEMORY option is especially useful for queries parallelized across shards, to see the amount of memory used in a single task. The SERIALIZE option is useful on the collecting node, because the serialization time can be properly calculated only after all the data for the query has been retrieved. Let’s see a simple example below:

-- create, distribute and populate a simple table
SET citus.shard_count TO 32;
CREATE TABLE dist_table(a int, b int);
SELECT create_distributed_table('dist_table', 'a');
INSERT INTO dist_table SELECT c, c * 10000 FROM generate_series(0, 1000) AS c;
-- explain a simple multi-shard query on the table using memory and serialize options
EXPLAIN (costs off, analyze, serialize, memory) SELECT * FROM dist_table;

                                      QUERY PLAN
--------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive) (actual time=18.490..18.519 rows=1001 loops=1)
   Task Count: 32
   Tuple data received from nodes: 8008 bytes
   Tasks Shown: One of 32
   ->  Task
         Tuple data received from node: 272 bytes
         Node: host=localhost port=9702 dbname=Naisila
         ->  Seq Scan on dist_table_102141 dist_table (actual time=0.013..0.016 rows=34 loops=1)
             Planning:
               Memory: used=7kB  allocated=8kB
             Planning Time: 0.024 ms
             Serialization: time=0.000 ms  output=0kB  format=text
             Execution Time: 0.031 ms
 Planning:
   Memory: used=359kB  allocated=512kB
 Planning Time: 0.287 ms
 Serialization: time=0.097 ms  output=20kB  format=text
 Execution Time: 18.902 ms
(18 rows)

This EXPLAIN output shows one of 32 tasks, where each task corresponds to a shard. We can see the amount of memory consumed by a single task on the node where it was executed. In the example above, more memory is consumed on the coordinator node because the Custom Scan node is coalescing results from all the tasks. For now, the serialization value is shown for the collected results only.
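
The two options can also be tried separately. In PostgreSQL, MEMORY works with a plain EXPLAIN, while SERIALIZE is only accepted together with ANALYZE because the query has to actually run. The sketch below applies each to a single-shard (router) query on the same table, assuming Citus forwards the options the same way as in the multi-shard case. The filter value 42 is arbitrary, and the output is omitted since timings and memory figures depend on the environment.

```sql
-- planner memory only; the query is not executed
EXPLAIN (costs off, memory)
SELECT * FROM dist_table WHERE a = 42;

-- serialization cost requires the query to run, hence analyze
EXPLAIN (costs off, analyze, serialize)
SELECT * FROM dist_table WHERE a = 42;
```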

Leveraging optimizer improvements with Citus in Postgres 17

As soon as we enable PG17 in Citus, we can make use of query and optimizer improvements without any further action needed. That’s because such improvements are reflected directly in Citus table shards, which are essentially regular Postgres tables. Thanks to PG17 enabling correlated subqueries to be pulled to a join, Citus can make use of this feature in its distributed planning phase and run even more types of distributed queries, which were not supported with previous PG versions.

PG17 has several commits that bring significant optimizer improvements. One commit in particular, "Allow correlated IN subqueries to be transformed into joins", is worth calling out because it enables Citus 13.0 to plan and execute a query with a correlated IN subquery using query pushdown, whereas such a query was challenging for pre-PG17 Citus to plan. Let’s see what new types of queries we can run in Citus 13.0 with PG17:

-- create, distribute and populate two simple tables
CREATE TABLE customer ( id int, name text, contact text, category text);
CREATE TABLE orders ( customer_id int, category text);
SELECT create_distributed_table('customer', 'id');
SELECT create_distributed_table('orders', 'customer_id');
INSERT INTO customer VALUES (1, 'Beana', 'beana1234@gmail.com', 'books'),
                            (2, 'Erida', 'erida1234@gmail.com', 'notebooks'),
                            (3, 'Redi', 'redi1234@gmail.com', 'pens');
INSERT INTO orders VALUES (1, 'books'), (2, 'notebooks'), (3, 'hats');

-- with Citus 13.0 in PG17 we are able to run a query
-- on the customer table that has a correlated subquery!
SELECT c.name, c.contact
FROM customer c
WHERE c.id in (SELECT customer_id FROM orders o WHERE o.category = c.category);

 name  |       contact
-------+---------------------
 Beana | beana1234@gmail.com
 Erida | erida1234@gmail.com
(2 rows)

-- pre Citus 13 or pre PG17 this query would fail with the following
ERROR:  complex joins are only supported when all distributed tables
        are co-located and joined on their distribution columns

Let’s get into more details on how Citus 13.0 leverages PG17 optimizer improvements and is able to plan the query and execute it:

EXPLAIN (costs off)
SELECT c.name, c.contact
FROM customer c
WHERE c.id in (SELECT customer_id FROM orders o WHERE o.category = c.category);

With Citus 13.0 the plan for this query is:

                              QUERY PLAN
--------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 32
   Tasks Shown: One of 32
   ->  Task
         Node: host=localhost port=9701 dbname=citus
         ->  Hash Join
               Hash Cond: ((c.category = o.category) AND (c.id = o.customer_id))
               ->  Seq Scan on customer_105861 c
               ->  Hash
                     ->  HashAggregate
                           Group Key: o.category, o.customer_id
                           ->  Seq Scan on orders_105893 o

The Postgres 17 planner converts the IN subquery to a join between the customer and orders tables (technically it is a semi-join). Then, Citus can push down the query to all worker nodes because the join includes an equality on the distribution columns of the tables. In contrast, the same query hits a planning limitation with previous versions of Citus:

-- Pre-13.0 Citus:
EXPLAIN (costs off)
SELECT c.name, c.contact
FROM customer c
WHERE c.id in (SELECT customer_id FROM orders o WHERE o.category = c.category);
DEBUG:  skipping recursive planning for the subquery since it contains
        references to outer queries
ERROR:  complex joins are only supported when all distributed tables
        are co-located and joined on their distribution columns

Prior to version 17, Postgres planned the subquery as a correlated SubPlan and applied that as a filter on the customer table. Citus is not able to push down a correlated subquery of this kind. But with Postgres 17 the subquery is planned as a join, the query plan has no correlated SubPlans, and Citus can naturally push down this join!
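
For intuition, the semi-join that the PG17 planner derives corresponds to writing the filter as a correlated EXISTS with both equalities inside the subquery. The rewrite below is a sketch for illustration, not something Citus 13.0 requires you to do. In a WHERE clause it is logically equivalent to the IN form above.

```sql
-- same result as the IN query: customers with an order in their own category
SELECT c.name, c.contact
FROM customer c
WHERE EXISTS (
    SELECT 1
    FROM orders o
    WHERE o.customer_id = c.id        -- equality on the distribution columns
      AND o.category = c.category)    -- the condition that referenced the outer query
ORDER BY c.name;
```

On the sample data this should again return Beana and Erida.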

Important bugfixes, including community contributions into Citus 13.0

Citus 13.0 has bug fixes that address some crashes caused by unsafe catalog access, and segmentation faults in distributed procedures. Citus 13.0 also resolves issues related to role synchronization across nodes, server crashes in specific cluster configurations, and improves handling of shard placement when new nodes are introduced without required reference data.

Other than work from Citus engineers, we have seen significant community contributions to Citus, which we always love to see. We are really grateful for all the contributions to the Citus open-source repository on GitHub, both pull requests and issues, and we would like to thank everyone who contributed.

For more details on these community contributions, and more notable fixes, you can check the Citus 13.0 Updates page.

Diving deeper into Citus 13.0 and distributed Postgres

To learn more about Citus 13.0, you can stay connected on the Citus Slack and visit the Citus open source GitHub repo to see recent developments. If there’s something you’d like to see next in Citus, feel free to also open a feature request issue :)

Written by Naisila Puka

Software Engineer on the Postgres & Citus team at Microsoft. B.S. in Computer Engineering with Mathematics minor from Bilkent University. Conference speaker at PGCon and Citus Con: An Event for Postgres. Boy mom. Language buff.
