Welcome to the release notes for Citus 13.0. The headline for 13.0 is the addition of PostgreSQL 17.2 support, which is packed with new features and enhancements. This page dives deep into many of the changes in the Citus 13.0 open source extension to PostgreSQL, including the following:
- JSON_TABLE() support in distributed queries
- MERGE ... WHEN NOT MATCHED BY SOURCE
- New EXPLAIN options
- FORCE_NULL and FORCE_NOT_NULL for all columns in a distributed COPY query
Because the Citus database is implemented as an extension to Postgres, staying up to date with the latest releases of Postgres is always a top priority for us. Therefore, Citus 13.0 is compatible with PG17.2!
PG17 delivers substantial performance improvements, including a revamped memory management system for vacuum, new monitoring and analysis features, optimized storage access, expanded functionality for managing data in partitions, optimizer improvements, and enhancements for high-concurrency workloads. It also brings faster bulk loading, quicker exports, and better query execution for indexes. PostgreSQL 17 introduces features that benefit both new workloads and mission-critical systems, such as the addition of the SQL/JSON JSON_TABLE() command for developers and more features for MERGE. Check out the official PG17 release notes for more details.
As soon as we enable PG17 in Citus, many improvements in query performance, vacuum, replication, monitoring, and more simply work, because Citus only needs to learn how to handle SQL interface changes. The PG17 work for this release was therefore mostly focused on updating Citus internal logic so that the current distributed features of Citus function properly with PG17 enabled, as well as on the SQL and DDL changes that directly affect the user and developer experience.
We documented our work on updating Citus internal logic, as well as Citus compatibility with PG17 SQL changes, in GitHub issues. You can also review the individual pull requests by searching with the pg17_support label.
Postgres 17 has several optimizer improvements, and one in particular, "Allow correlated IN subqueries to be transformed into joins", is worth calling out because it enables Citus 13.0 to plan and execute a query with a correlated IN subquery using query pushdown, where pre-13.0 Citus struggled to plan the query. To illustrate, we will use the following example: the customer table is distributed on the id column, the orders table is distributed on the customer_id column, and the subquery on the orders table has a correlation on the category column of the customer table:
EXPLAIN (costs off)
SELECT c.name, c.contact
FROM customer c
WHERE c.id in (SELECT customer_id FROM orders o WHERE o.category = c.category);
With Citus 13.0 the plan for this query is:
┌─────────────────────────────────────────────────────────────────────────────────┐
│ QUERY PLAN
├─────────────────────────────────────────────────────────────────────────────────┤
│ Custom Scan (Citus Adaptive)
│ Task Count: 32
│ Tasks Shown: One of 32
│ -> Task
│ Node: host=localhost port=9701 dbname=citus
│ -> Hash Join
│ Hash Cond: ((c.category = o.category) AND (c.id = o.customer_id))
│ -> Seq Scan on customer_105861 c
│ -> Hash
│ -> HashAggregate
│ Group Key: o.category, o.customer_id
│ -> Seq Scan on orders_105893 o
│
└─────────────────────────────────────────────────────────────────────────────────┘
The Postgres 17 planner converts the IN subquery to a join between the customer and orders tables (technically it is a semi-join, so a unique-ifying operation on the orders table, the HashAggregate, preserves at-most-one-match semantics), and because the join includes an equality on the distribution columns of the tables, Citus can push down the query to all worker nodes. In contrast, the same query hits a planning limitation with previous versions of Citus:
-- Pre-13.0 Citus:
EXPLAIN (costs off)
SELECT c.name, c.contact
FROM customer c
WHERE c.id in (SELECT customer_id FROM orders o WHERE o.category = c.category);
DEBUG: skipping recursive planning for the subquery since it contains references to outer queries
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
Prior to version 17, Postgres planned the subquery as a correlated SubPlan and applied it as a filter on the customer table. With Citus, if a correlated subquery cannot be pushed down, recursive planning may not be used either. But with Postgres 17 the subquery is planned as a join, the query plan has no correlated SubPlans, and Citus can naturally push down this join!
Being an extension enables Citus to leverage this improvement effectively for free; with Postgres 17 support it is available to Citus users. Other Postgres 17 optimizer improvements also benefit queries "out of the box" in Citus 13.0.
PG17 has added basic JSON_TABLE() functionality. JSON_TABLE() allows JSON data to be converted into a relational view and thus used, for example, in a FROM clause, like other tabular data. Other SQL capabilities, such as sorting, filtering, and joining with regular Postgres tables, can then be applied to it. Joining is specifically interesting for Citus: how can we perform a join between the JSON_TABLE() function and a Citus distributed table?
The answer is that we treat JSON_TABLE() the same as correlated functions (e.g., recurring tuples). In the end, for multi-shard JSON_TABLE() commands, we apply the same restrictions as for reference tables (e.g., we cannot perform a lateral outer join when a distributed subquery references a reference table, a JSON_TABLE, etc.). For more technical details, check out the Citus Technical Documentation on recurring tuples.
Let's look at example distributed queries showing how you can combine the JSON_TABLE() function with Citus distributed tables, as well as some limitations:
-- create, distribute and populate the table
-- we create a jsonb column here, to later expose its contents as relational data through JSON_TABLE()
CREATE TABLE my_favorite_books(book_collection_id bigserial, jsonb_column jsonb);
SELECT create_distributed_table('my_favorite_books', 'book_collection_id');
-- these books will be inserted with the automatic book_collection_id of 1
INSERT INTO my_favorite_books (jsonb_column) VALUES (
'{ "favorites" : [
{ "kind" : "mystery", "books" : [ { "title" : "The Count of Monte Cristo", "author" : "Alexandre Dumas"},
{ "title" : "Crime and Punishment", "author" : "Fyodor Dostoevsky" } ] },
{ "kind" : "drama", "books" : [{ "title" : "Anna Karenina", "author" : "Leo Tolstoy" } ] },
{ "kind" : "poetry", "books" : [{ "title" : "Masnavi", "author" : "Jalal al-Din Muhammad Rumi" } ] },
{ "kind" : "autobiography", "books" : [{ "title" : "The Autobiography of Malcolm X", "author" : "Alex Haley" } ] }
] }');
-- these books will be inserted with the automatic book_collection_id of 2
INSERT INTO my_favorite_books (jsonb_column) VALUES (
'{ "favorites" : [
{ "kind" : "mystery", "books" : [ { "title" : "To Kill a Mockingbird", "author" : "Harper Lee"},
{ "title" : "Our Mutual Friend", "author" : "Charles Dickens" } ] },
{ "kind" : "drama", "books" : [{ "title" : "Pride and Prejudice", "author" : "Jane Austen" } ] },
{ "kind" : "poetry", "books" : [{ "title" : "The Odyssey", "author" : "Homer" } ] },
{ "kind" : "autobiography", "books" : [{ "title" : "The Diary of a Young Girl", "author" : "Anne Frank" } ] }
] }');
-- let’s try a simple router query, where we want to see all the books under book_collection_id = 1
SELECT json_table_output.* FROM
my_favorite_books,
JSON_TABLE ( jsonb_column, '$.favorites[*]' COLUMNS (
key FOR ORDINALITY, kind text PATH '$.kind',
NESTED PATH '$.books[*]' COLUMNS (
title text PATH '$.title', author text PATH '$.author'))) AS json_table_output
WHERE my_favorite_books.book_collection_id = 1
ORDER BY 1, 2, 3, 4;
key | kind | title | author
-----+---------------+--------------------------------+----------------------------
1 | mystery | The Count of Monte Cristo | Alexandre Dumas
1 | mystery | Crime and Punishment | Fyodor Dostoevsky
2 | drama | Anna Karenina | Leo Tolstoy
3 | poetry | Masnavi | Jalal al-Din Muhammad Rumi
4 | autobiography | The Autobiography of Malcolm X | Alex Haley
(5 rows)
-- a simple multi-shard query, where we want to see all the books
SELECT json_table_output.* FROM
my_favorite_books,
JSON_TABLE ( jsonb_column, '$.favorites[*]' COLUMNS (
key FOR ORDINALITY, kind text PATH '$.kind',
NESTED PATH '$.books[*]' COLUMNS (
title text PATH '$.title', author text PATH '$.author'))) AS json_table_output
ORDER BY 1, 2, 3, 4;
key | kind | title | author
-----+---------------+--------------------------------+----------------------------
1 | mystery | The Count of Monte Cristo | Alexandre Dumas
1 | mystery | Crime and Punishment | Fyodor Dostoevsky
1 | mystery | Our Mutual Friend | Charles Dickens
1 | mystery | To Kill a Mockingbird | Harper Lee
2 | drama | Anna Karenina | Leo Tolstoy
2 | drama | Pride and Prejudice | Jane Austen
3 | poetry | Masnavi | Jalal al-Din Muhammad Rumi
3 | poetry | The Odyssey | Homer
4 | autobiography | The Autobiography of Malcolm X | Alex Haley
4 | autobiography | The Diary of a Young Girl | Anne Frank
(10 rows)
-- more complex router query involving LATERAL and LIMIT
-- select two books under book_collection_id = 2
SELECT sub.*
FROM my_favorite_books,
lateral(SELECT * FROM JSON_TABLE (jsonb_column, '$.favorites[*]'
COLUMNS (key FOR ORDINALITY, kind text PATH '$.kind',
NESTED PATH '$.books[*]' COLUMNS
(title text PATH '$.title', author text PATH '$.author')))
AS json_table_output ORDER BY key DESC LIMIT 2) as sub
WHERE my_favorite_books.book_collection_id = 2;
key | kind | title | author
-----+---------------+---------------------------+------------
4 | autobiography | The Diary of a Young Girl | Anne Frank
3 | poetry | The Odyssey | Homer
(2 rows)
-- JSON_TABLE can be on the inner part of an outer join
SELECT sub.*
FROM my_favorite_books LEFT JOIN
lateral
(SELECT *
FROM JSON_TABLE (jsonb_column, '$.favorites[*]' COLUMNS (key FOR ORDINALITY,
kind text PATH '$.kind', NESTED PATH '$.books[*]'
COLUMNS (title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output
LIMIT 1000) AS sub ON (true)
ORDER BY 1,2,3,4;
key | kind | title | author
-----+---------------+--------------------------------+----------------------------
1 | mystery | Crime and Punishment | Fyodor Dostoevsky
1 | mystery | Our Mutual Friend | Charles Dickens
1 | mystery | The Count of Monte Cristo | Alexandre Dumas
1 | mystery | To Kill a Mockingbird | Harper Lee
2 | drama | Anna Karenina | Leo Tolstoy
2 | drama | Pride and Prejudice | Jane Austen
3 | poetry | Masnavi | Jalal al-Din Muhammad Rumi
3 | poetry | The Odyssey | Homer
4 | autobiography | The Autobiography of Malcolm X | Alex Haley
4 | autobiography | The Diary of a Young Girl | Anne Frank
(10 rows)
-- we can pushdown this correlated subquery in WHERE clause
SELECT count(*)
FROM my_favorite_books WHERE
(SELECT count(*) > 0
FROM JSON_TABLE (jsonb_column, '$.favorites[*]' COLUMNS (key FOR ORDINALITY,
kind text PATH '$.kind', NESTED PATH '$.books[*]'
COLUMNS (title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output
LIMIT 1000);
count
-------
2
(1 row)
-- we can pushdown this correlated subquery in SELECT clause
SELECT (SELECT count(*) > 0
FROM JSON_TABLE (jsonb_column, '$.favorites[*]' COLUMNS (key FOR ORDINALITY,
kind text PATH '$.kind', NESTED PATH '$.books[*]'
COLUMNS (title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output)
FROM my_favorite_books;
?column?
----------
t
t
(2 rows)
-- Create another distributed table to run more complex queries involving it
CREATE TABLE test_table(id bigserial, value text);
SELECT create_distributed_table('test_table', 'id');
INSERT INTO test_table (value) SELECT i::text FROM generate_series(0,100)i;
-- recursively plan subqueries that has JSON_TABLE
SELECT count(*) FROM
(
SELECT json_table_output.* FROM
my_favorite_books,
JSON_TABLE ( jsonb_column, '$.favorites[*]' COLUMNS (
key FOR ORDINALITY,
kind text PATH '$.kind',
NESTED PATH '$.books[*]' COLUMNS (
title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output
LIMIT 1) as sub_with_json, test_table
WHERE test_table.id = sub_with_json.key;
count
-------
1
(1 row)
-- multi-shard query with an explicit LATERAL SUBQUERY
-- along with other tables
SELECT sub.*
FROM my_favorite_books JOIN
lateral
(SELECT *
FROM JSON_TABLE (jsonb_column, '$.favorites[*]' COLUMNS (key FOR ORDINALITY,
kind text PATH '$.kind', NESTED PATH '$.books[*]'
COLUMNS (title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output
LIMIT 1000) AS sub ON (true)
JOIN test_table ON (my_favorite_books.book_collection_id = test_table.id)
ORDER BY 1,2,3,4;
key | kind | title | author
-----+---------------+--------------------------------+----------------------------
1 | mystery | Crime and Punishment | Fyodor Dostoevsky
1 | mystery | Our Mutual Friend | Charles Dickens
1 | mystery | The Count of Monte Cristo | Alexandre Dumas
1 | mystery | To Kill a Mockingbird | Harper Lee
2 | drama | Anna Karenina | Leo Tolstoy
2 | drama | Pride and Prejudice | Jane Austen
3 | poetry | Masnavi | Jalal al-Din Muhammad Rumi
3 | poetry | The Odyssey | Homer
4 | autobiography | The Autobiography of Malcolm X | Alex Haley
4 | autobiography | The Diary of a Young Girl | Anne Frank
(10 rows)
-- JSON_TABLE can be in the outer part of the join
-- as long as there is a distributed table
SELECT sub.*
FROM my_favorite_books JOIN
lateral
(SELECT *
FROM JSON_TABLE (jsonb_column, '$.favorites[*]' COLUMNS (key FOR ORDINALITY,
kind text PATH '$.kind', NESTED PATH '$.books[*]'
COLUMNS (title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output
LIMIT 1000) AS sub ON (true)
LEFT JOIN test_table ON (my_favorite_books.book_collection_id = test_table.id)
ORDER BY 1,2,3,4;
key | kind | title | author
-----+---------------+--------------------------------+----------------------------
1 | mystery | Crime and Punishment | Fyodor Dostoevsky
1 | mystery | Our Mutual Friend | Charles Dickens
1 | mystery | The Count of Monte Cristo | Alexandre Dumas
1 | mystery | To Kill a Mockingbird | Harper Lee
2 | drama | Anna Karenina | Leo Tolstoy
2 | drama | Pride and Prejudice | Jane Austen
3 | poetry | Masnavi | Jalal al-Din Muhammad Rumi
3 | poetry | The Odyssey | Homer
4 | autobiography | The Autobiography of Malcolm X | Alex Haley
4 | autobiography | The Diary of a Young Girl | Anne Frank
(10 rows)
-- JSON_TABLE can be on the outer side of the join
-- We support outer joins where the outer rel is a recurring one
-- and the inner one is a non-recurring one if we don't reference the outer from the inner
-- https://github.com/citusdata/citus/pull/6512
SELECT *
FROM json_table('[{"a":10,"b":20},{"a":30,"b":40}]'::JSONB, '$[*]'
COLUMNS (json_id FOR ORDINALITY,
column_a int4 PATH '$.a',
column_b int4 PATH '$.b',
a int4, b int4, c text))
LEFT JOIN LATERAL
(SELECT *
FROM my_favorite_books) AS foo on (foo.book_collection_id = a);
json_id | column_a | column_b | a | b | c | book_collection_id | jsonb_column
---------+----------+----------+----+----+---+--------------------+--------------
1 | 10 | 20 | 10 | 20 | | |
2 | 30 | 40 | 30 | 40 | | |
(2 rows)
-- we can recursively plan json_tables on set operations
(SELECT *
FROM json_table('[{"a":10,"b":20},{"a":30,"b":40}]'::JSONB, '$[*]'
COLUMNS (id FOR ORDINALITY)) ORDER BY id ASC LIMIT 1)
UNION
(SELECT *
FROM json_table('[{"a":10,"b":20},{"a":30,"b":40}]'::JSONB, '$[*]'
COLUMNS (id FOR ORDINALITY)) ORDER BY id ASC LIMIT 1)
UNION
(SELECT id FROM test_table ORDER BY id ASC LIMIT 1);
id
----
1
(1 row)
-- we can use JSON_TABLE in modification queries as well
-- use log level such that we can see trace changes
SET client_min_messages TO DEBUG1;
--the JSON_TABLE subquery is recursively planned
UPDATE test_table SET VALUE = 'XXX' FROM(
SELECT json_table_output.* FROM
my_favorite_books,
JSON_TABLE ( jsonb_column, '$.favorites[*]' COLUMNS (
key FOR ORDINALITY,
kind text PATH '$.kind',
NESTED PATH '$.books[*]' COLUMNS (
title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output) as foo
WHERE foo.key = test_table.id;
DEBUG: generating subplan 20_1 for subquery SELECT json_table_output.key, json_table_output.kind,
json_table_output.title, json_table_output.author FROM public.my_favorite_books, LATERAL
JSON_TABLE(my_favorite_books.jsonb_column, '$."favorites"[*]' AS json_table_path_0 COLUMNS (key
FOR ORDINALITY, kind text PATH '$."kind"', NESTED PATH '$."books"[*]' AS json_table_path_1 COLUMNS
(title text PATH '$."title"', author text PATH '$."author"'))) json_table_output
DEBUG: Plan 20 query after replacing subqueries and CTEs: UPDATE public.test_table SET
value = 'XXX'::text FROM (SELECT intermediate_result.key, intermediate_result.kind,
intermediate_result.title, intermediate_result.author FROM read_intermediate_result('20_1'::text,
'binary'::citus_copy_format) intermediate_result(key integer, kind text, title text, author text))
foo WHERE (foo.key OPERATOR(pg_catalog.=) test_table.id)
UPDATE 4
-- Subquery with JSON table can be pushed down because two distributed tables
-- in the query are joined on distribution column
UPDATE test_table SET VALUE = 'XXX' FROM (
SELECT my_favorite_books.book_collection_id, json_table_output.* FROM
my_favorite_books,
JSON_TABLE ( jsonb_column, '$.favorites[*]' COLUMNS (
kind text PATH '$.kind',
NESTED PATH '$.books[*]' COLUMNS (
title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output) as foo
WHERE foo.book_collection_id = test_table.id;
UPDATE 2
-- we can pushdown with CTEs as well
WITH json_cte AS
(SELECT my_favorite_books.book_collection_id, json_table_output.* FROM
my_favorite_books,
JSON_TABLE ( jsonb_column, '$.favorites[*]' COLUMNS (
kind text PATH '$.kind',
NESTED PATH '$.books[*]' COLUMNS (
title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output)
UPDATE test_table SET VALUE = 'XYZ' FROM json_cte
WHERE json_cte.book_collection_id = test_table.id;
UPDATE 2
-- we can recursively plan with CTEs as well
WITH json_cte AS
(SELECT my_favorite_books.book_collection_id, json_table_output.* FROM
my_favorite_books,
JSON_TABLE ( jsonb_column, '$.favorites[*]' COLUMNS (
kind text PATH '$.kind',
NESTED PATH '$.books[*]' COLUMNS (
key FOR ORDINALITY,
title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output ORDER BY json_table_output.key LIMIT 1)
UPDATE test_table SET VALUE = 'XYZ' FROM json_cte
WHERE json_cte.book_collection_id = test_table.id;
DEBUG: generating subplan 24_1 for CTE json_cte: SELECT my_favorite_books.book_collection_id,
json_table_output.kind, json_table_output.key, json_table_output.title, json_table_output.author
FROM public.my_favorite_books, LATERAL JSON_TABLE(my_favorite_books.jsonb_column, '$."favorites"[*]'
AS json_table_path_0 COLUMNS (kind text PATH '$."kind"', NESTED PATH '$."books"[*]' AS
json_table_path_1 COLUMNS (key FOR ORDINALITY, title text PATH '$."title"', author text PATH
'$."author"'))) json_table_output ORDER BY json_table_output.key LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: Plan 24 query after replacing subqueries and CTEs: UPDATE public.test_table SET
value = 'XYZ'::text FROM (SELECT intermediate_result.book_collection_id, intermediate_result.kind,
intermediate_result.key, intermediate_result.title, intermediate_result.author FROM
read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(
book_collection_id bigint, kind text, key integer, title text, author text)) json_cte WHERE
(json_cte.book_collection_id OPERATOR(pg_catalog.=) test_table.id)
UPDATE 1
The limitations of using JSON_TABLE() in distributed queries are the same as the general limitations of the usage of recurring tuples in distributed queries. Here are some examples with the same setup of tables as above:
-- JSON_TABLE cannot be on the FROM clause alone
SELECT *
FROM json_table('[{"a":10,"b":20},{"a":30,"b":40}]'::JSONB, '$[*]'
COLUMNS (json_id FOR ORDINALITY,
column_a int4 PATH '$.a',
column_b int4 PATH '$.b',
a int4, b int4, c text)) as foo
WHERE b >
(SELECT count(*)
FROM my_favorite_books WHERE book_collection_id = foo.a);
ERROR: correlated subqueries are not supported when the FROM clause contains JSON_TABLE
-- non-colocated join fails (note that the join is an inequality)
SELECT sub.*
FROM my_favorite_books JOIN
lateral
(SELECT *
FROM JSON_TABLE (jsonb_column, '$.favorites[*]' COLUMNS (key FOR ORDINALITY,
kind text PATH '$.kind', NESTED PATH '$.books[*]'
COLUMNS (title text PATH '$.title',
author text PATH '$.author'))) AS json_table_output
LIMIT 1000) AS sub ON (true)
JOIN test_table ON (my_favorite_books.book_collection_id != test_table.id)
ORDER BY 1,2,3,4;
ERROR: complex joins are only supported when all distributed tables are co-located
and joined on their distribution columns
-- we don't support when we reference the JSON_TABLE from the non-recurring distributed
-- table subquery
SELECT *
FROM json_table('[{"a":10,"b":20},{"a":30,"b":40}]'::JSONB, '$[*]'
COLUMNS (json_id FOR ORDINALITY,
column_a int4 PATH '$.a',
column_b int4 PATH '$.b',
a int4, b int4, c text))
LEFT JOIN LATERAL
(SELECT *
FROM my_favorite_books WHERE book_collection_id::text LIKE c)
AS foo on(foo.book_collection_id = a);
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning the distributed subquery since it is part of a distributed join node
that is outer joined with a recurring rel
ERROR: cannot perform a lateral outer join when a distributed subquery references a JSON_TABLE
-- LIMIT in subquery not supported when json_table exists
SELECT *
FROM json_table('[{"a":10,"b":20},{"a":30,"b":40}]'::JSONB, '$[*]'
COLUMNS (id FOR ORDINALITY,
column_a int4 PATH '$.a',
column_b int4 PATH '$.b',
a int4, b int4, c text))
JOIN LATERAL
(SELECT *
FROM my_favorite_books WHERE json_table.id = a LIMIT 1) as foo ON (true);
ERROR: cannot push down this subquery
DETAIL: Limit clause is currently unsupported when a lateral subquery references a column
from a JSON_TABLE
As of PG17, one may use the MERGE command to operate on rows that exist in the target relation but not in the data source, by using WHEN NOT MATCHED BY SOURCE. That means that if a row in the target table being merged doesn't exist in the source table, we can now perform any necessary actions on that row. Citus extended its existing strategies for handling MERGE in a distributed environment to include this syntax as well. Let's see examples with a variety of Citus managed tables below:
-- Scenario:
-- Compare vanilla Postgres behaviour with Citus behaviour in a distributed environment
-- We want to see the same results!
-- Step 1
-- Create and populate some tables of different types
-- Regular Postgres tables
CREATE TABLE postgres_target (tid integer, balance float, val text);
CREATE TABLE postgres_source (sid integer, delta float);
INSERT INTO postgres_target SELECT id, id * 100, 'initial' FROM generate_series(1,5,2) AS id;
INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,4) AS id;
-- Citus local tables
CREATE TABLE citus_local_target (tid integer, balance float, val text);
CREATE TABLE citus_local_source (sid integer, delta float);
SELECT citus_add_local_table_to_metadata('citus_local_target');
SELECT citus_add_local_table_to_metadata('citus_local_source');
INSERT INTO citus_local_target SELECT id, id * 100, 'initial' FROM generate_series(1,5,2) AS id;
INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,4) AS id;
-- Citus distributed tables
CREATE TABLE citus_distributed_target (tid integer, balance float, val text);
CREATE TABLE citus_distributed_source (sid integer, delta float);
SELECT create_distributed_table('citus_distributed_target', 'tid');
SELECT create_distributed_table('citus_distributed_source', 'sid');
INSERT INTO citus_distributed_target SELECT id, id * 100, 'initial' FROM generate_series(1,5,2) AS id;
INSERT INTO citus_distributed_source SELECT id, id * 10 FROM generate_series(1,4) AS id;
-- Citus reference tables
CREATE TABLE citus_reference_target (tid integer, balance float, val text);
CREATE TABLE citus_reference_source (sid integer, delta float);
SELECT create_reference_table('citus_reference_target');
SELECT create_reference_table('citus_reference_source');
INSERT INTO citus_reference_target SELECT id, id * 100, 'initial' FROM generate_series(1,5,2) AS id;
INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,4) AS id;
-- Step 2
-- function to compare the output from Citus tables
-- with the expected output from Postgres tables
CREATE OR REPLACE FUNCTION compare_tables(table1 TEXT, table2 TEXT) RETURNS BOOLEAN AS $$
DECLARE ret BOOL;
BEGIN
EXECUTE 'select count(*) = 0 from ((
SELECT * FROM ' || table1 ||
' EXCEPT
SELECT * FROM ' || table2 || ' )
UNION ALL (
SELECT * FROM ' || table2 ||
' EXCEPT
SELECT * FROM ' || table1 || ' ))' INTO ret;
RETURN ret;
END
$$ LANGUAGE PLPGSQL;
-- Step 3
-- try simple MERGE with regular Postgres tables
MERGE INTO postgres_target t
USING postgres_source s
ON t.tid = s.sid
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge')
WHEN NOT MATCHED BY SOURCE THEN
UPDATE SET val = val || ' not matched by source';
SELECT * FROM postgres_target ORDER BY tid, val;
tid | balance | val
-----+---------+-------------------------------
1 | 110 | initial updated by merge
2 | 20 | inserted by merge
3 | 330 | initial updated by merge
4 | 40 | inserted by merge
5 | 500 | initial not matched by source
(5 rows)
-- Step 4
-- try different combinations of Citus tables with the same query
-- Local-Local
BEGIN;
MERGE INTO citus_local_target t
USING citus_local_source s
ON t.tid = s.sid
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge')
WHEN NOT MATCHED BY SOURCE THEN
UPDATE SET val = val || ' not matched by source';
SELECT compare_tables('citus_local_target', 'postgres_target');
compare_tables
----------------
t
(1 row)
ROLLBACK;
-- Local-Reference
BEGIN;
MERGE INTO citus_local_target t
USING citus_reference_source s
ON t.tid = s.sid
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED BY TARGET THEN
INSERT VALUES (sid, delta, 'inserted by merge')
WHEN NOT MATCHED BY SOURCE THEN
UPDATE SET val = val || ' not matched by source';
SELECT compare_tables('citus_local_target', 'postgres_target');
compare_tables
----------------
t
(1 row)
ROLLBACK;
-- Distributed-Distributed
BEGIN;
MERGE INTO citus_distributed_target t
USING citus_distributed_source s
ON t.tid = s.sid
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge')
WHEN NOT MATCHED BY SOURCE THEN
UPDATE SET val = val || ' not matched by source';
SELECT compare_tables('citus_distributed_target', 'postgres_target');
compare_tables
----------------
t
(1 row)
ROLLBACK;
Note: The MERGE ... WHEN NOT MATCHED BY SOURCE syntax is currently unsupported for the following Target-Source combinations: Local-Distributed, Distributed-Local, Distributed-Reference, and Reference as the target (with any source).
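To make that concrete, here is a rough sketch (not from the Citus test suite) of attempting one of the unsupported combinations above, a distributed target merged with a reference source; Citus is expected to reject it, and the exact error message may differ:
-- hedged sketch: Distributed-Reference is listed above as unsupported for
-- WHEN NOT MATCHED BY SOURCE, so this MERGE is expected to raise an error
BEGIN;
MERGE INTO citus_distributed_target t
USING citus_reference_source s
ON t.tid = s.sid
WHEN MATCHED THEN
    UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
    INSERT VALUES (sid, delta, 'inserted by merge')
WHEN NOT MATCHED BY SOURCE THEN
    UPDATE SET val = val || ' not matched by source';
ROLLBACK;  -- clean up the aborted transaction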
In PG17, the EXPLAIN command introduces two new options: SERIALIZE and MEMORY. The SERIALIZE option reports the actual cost of converting the query's output into a format suitable for sending to the client, and the cost of sending that data. The MEMORY option provides details on the memory used by the planner. Citus 13.0 supports these options when explaining a distributed query. See an example below:
-- create, distribute and populate a table
CREATE TABLE dist_table(a int, b int);
SELECT create_distributed_table('dist_table', 'a');
INSERT INTO dist_table SELECT c, c * 10000 FROM generate_series(0, 1000) AS c;
-- run an explain query with both new options specified
EXPLAIN (costs off, analyze, serialize, memory) SELECT * FROM dist_table;
QUERY PLAN
---------------------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual time=18.490..18.519 rows=1001 loops=1)
Task Count: 32
Tuple data received from nodes: 8008 bytes
Tasks Shown: One of 32
-> Task
Tuple data received from node: 272 bytes
Node: host=localhost port=9702 dbname=Naisila
-> Seq Scan on dist_table_102141 dist_table (actual time=0.013..0.016 rows=34 loops=1)
Planning:
Memory: used=7kB allocated=8kB
Planning Time: 0.024 ms
Serialization: time=0.000 ms output=0kB format=text
Execution Time: 0.031 ms
Planning:
Memory: used=359kB allocated=512kB
Planning Time: 0.287 ms
Serialization: time=0.097 ms output=20kB format=text
Execution Time: 18.902 ms
(18 rows)
PG17 has expanded functionality for managing data in partitions, and Citus 13.0 provides those options in a distributed environment as well. Let's look at each option in detail:
With Citus 13.0, one can create partitioned tables with a specified access method and then distribute them with the create_distributed_table() function. You can also switch the access method of a distributed partitioned table using the ALTER TABLE ... SET ACCESS METHOD command. Access method inheritance for old and new partitions behaves the same as for vanilla Postgres tables: future partitions are created with the specified access method, whereas existing partitions are not modified.
-- Step 1: Create a partitioned table with a specified access method
CREATE TABLE test_partitioned_alter (id INT PRIMARY KEY, value TEXT)
PARTITION BY RANGE (id)
USING columnar;
-- Step 2: Create partitions for the partitioned table
CREATE TABLE test_partition_1 PARTITION OF test_partitioned_alter
FOR VALUES FROM (0) TO (100);
CREATE TABLE test_partition_2 PARTITION OF test_partitioned_alter
FOR VALUES FROM (100) TO (200);
-- Step 3: Distribute the partitioned table
SELECT create_distributed_table('test_partitioned_alter', 'id');
-- Step 4 (on a Worker Node)
-- Verify the access methods
\c - - - :worker_1_port
SELECT relname, amname FROM pg_class c LEFT JOIN pg_am am ON (c.relam = am.oid)
WHERE relname IN ('test_partitioned_alter', 'test_partition_1', 'test_partition_2')
ORDER BY relname;
relname | amname
------------------------+----------
test_partition_1 | columnar
test_partition_2 | columnar
test_partitioned_alter | columnar
(3 rows)
\c - - - :master_port
-- Step 5: Test ALTER TABLE ... SET ACCESS METHOD to a different method
ALTER TABLE test_partitioned_alter SET ACCESS METHOD heap;
-- Step 6: Verify the change is applied to future partitions, but not existing ones
CREATE TABLE test_partition_3 PARTITION OF test_partitioned_alter
FOR VALUES FROM (200) TO (300);
\c - - - :worker_1_port
SELECT relname, amname FROM pg_class c LEFT JOIN pg_am am ON (c.relam = am.oid)
WHERE relname IN ('test_partitioned_alter', 'test_partition_1',
'test_partition_2', 'test_partition_3')
ORDER BY relname;
relname | amname
------------------------+----------
test_partition_1 | columnar
test_partition_2 | columnar
test_partition_3 | heap
test_partitioned_alter | heap
(4 rows)
In earlier versions of PostgreSQL, exclusion constraints were not allowed on partitioned tables. PostgreSQL 17 now allows them, and in fact this is one of the feature propagations that we got for free as soon as we enabled PG17 in Citus: Citus already had enough propagation logic to handle a new type of constraint without any extra implementation. If interested, you can check out the pull request, which simply adds some tests to verify correct behavior.
With that said, check out a simple example below on how you can have Citus managed partitioned tables with exclusion constraints:
-- Step 1: Create table
CREATE TABLE distributed_partitioned_table (
id serial NOT NULL,
partition_col int NOT NULL,
PRIMARY KEY (id, partition_col)
) PARTITION BY RANGE (partition_col);
-- Distribute the table
SELECT create_distributed_table('distributed_partitioned_table', 'id');
-- Add partitions to the distributed partitioned table
CREATE TABLE distributed_partitioned_table_p1 PARTITION OF distributed_partitioned_table
FOR VALUES FROM (1) TO (100);
CREATE TABLE distributed_partitioned_table_p2 PARTITION OF distributed_partitioned_table
FOR VALUES FROM (100) TO (200);
-- Step 2: Create a partitioned Citus local table
CREATE TABLE local_partitioned_table (
id serial NOT NULL,
partition_col int NOT NULL,
PRIMARY KEY (id, partition_col)
) PARTITION BY RANGE (partition_col);
-- Let Citus manage the local table
SELECT citus_add_local_table_to_metadata('local_partitioned_table');
-- Add partitions to the local partitioned table
CREATE TABLE local_partitioned_table_p1 PARTITION OF local_partitioned_table
FOR VALUES FROM (1) TO (100);
CREATE TABLE local_partitioned_table_p2 PARTITION OF local_partitioned_table
FOR VALUES FROM (100) TO (200);
-- Step 3: Add an exclusion constraint with a name to the distributed partitioned table
ALTER TABLE distributed_partitioned_table
ADD CONSTRAINT dist_exclude_named EXCLUDE USING btree (id WITH =, partition_col WITH =);
-- Step 4: Verify propagation of exclusion constraint to worker nodes
\c - - - :worker_1_port
SELECT conname FROM pg_constraint
WHERE conrelid = 'distributed_partitioned_table'::regclass AND conname = 'dist_exclude_named';
conname
---------------------------------------------------------------------
dist_exclude_named
(1 row)
-- Step 5: Add an exclusion constraint with a name to the Citus local partitioned table
\c - - - :master_port
ALTER TABLE local_partitioned_table
ADD CONSTRAINT local_exclude_named EXCLUDE USING btree (partition_col WITH =);
-- Step 6: Verify the exclusion constraint on the local partitioned table
SELECT conname, contype FROM pg_constraint
WHERE conname = 'local_exclude_named' AND contype = 'x';
conname | contype
---------------------------------------------------------------------
local_exclude_named | x
(1 row)
-- Step 7: Add exclusion constraints without names to both tables
ALTER TABLE distributed_partitioned_table ADD EXCLUDE USING btree (id WITH =, partition_col WITH =);
ALTER TABLE local_partitioned_table ADD EXCLUDE USING btree (partition_col WITH =);
-- Step 8: Verify the unnamed exclusion constraints were added
SELECT conname, contype FROM pg_constraint
WHERE conrelid = 'local_partitioned_table'::regclass AND contype = 'x';
conname | contype
---------------------------------------------------------------------
local_exclude_named | x
local_partitioned_table_partition_col_excl | x
(2 rows)
\c - - - :worker_1_port
SELECT conname, contype FROM pg_constraint
WHERE conrelid = 'distributed_partitioned_table'::regclass AND contype = 'x';
conname | contype
---------------------------------------------------------------------
dist_exclude_named | x
distributed_partitioned_table_id_partition_col_excl | x
(2 rows)
PG17 added support for identity columns in partitioned tables, and with Citus 13.0 you can have identity columns in distributed partitioned tables as well. In the PG17 implementation, partitions with their own identity columns are not allowed. Citus does not need to propagate identity columns to the partitions; the identity is inherited, per PG17 behavior. See below for an example:
-- (1) Create and distribute a partitioned table
CREATE TABLE partitioned_table (
a bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10),
c int
)
PARTITION BY RANGE (c);
CREATE TABLE pt_1 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (50);
SELECT create_distributed_table('partitioned_table', 'a');
CREATE TABLE pt_2 PARTITION OF partitioned_table FOR VALUES FROM (50) TO (1000);
-- (2) The partitions have the same identity column as the parent table;
-- This is PG17 behavior for support for identity in partitioned tables.
\d pt_1;
Table "pg17.pt_1"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
c | integer | | |
Partition of: partitioned_table FOR VALUES FROM (1) TO (50)
\d pt_2;
Table "pg17.pt_2"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
c | integer | | |
Partition of: partitioned_table FOR VALUES FROM (50) TO (1000)
-- Attaching a partition inherits the identity column from the parent table
CREATE TABLE pt_3 (a bigint not null, c int);
ALTER TABLE partitioned_table ATTACH PARTITION pt_3 FOR VALUES FROM (1000) TO (2000);
\d pt_3;
Table "pg17.pt_3"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
c | integer | | |
Partition of: partitioned_table FOR VALUES FROM (1000) TO (2000)
-- Partition pt_4 has its own identity column, which is not allowed in PG17
-- and will produce an error on attempting to attach it to the partitioned table
CREATE TABLE pt_4 (a bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10), c int);
ALTER TABLE partitioned_table ATTACH PARTITION pt_4 FOR VALUES FROM (2000) TO (3000);
ERROR: table "pt_4" being attached contains an identity column "a"
DETAIL: The new partition may not contain an identity column.
-- Show that DDL for partitioned_table has correctly propagated to the worker node;
\c - - - :worker_1_port
\d pt_1;
Table "pg17.pt_1"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
c | integer | | |
Partition of: partitioned_table FOR VALUES FROM (1) TO (50)
\d pt_2;
Table "pg17.pt_2"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
c | integer | | |
Partition of: partitioned_table FOR VALUES FROM (50) TO (1000)
\d pt_3;
Table "pg17.pt_3"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
c | integer | | |
Partition of: partitioned_table FOR VALUES FROM (1000) TO (2000)
By using sslnegotiation=direct in PG17, one can skip the SSLRequest message and therefore eliminate one round trip when establishing a TLS connection. You can configure this option in citus.node_conninfo.
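As a rough sketch (assuming TLS is already configured between the coordinator and the workers, and that the sslnegotiation keyword is accepted in citus.node_conninfo), the option could be applied cluster-wide like this:
-- hedged sketch: use direct SSL negotiation for Citus inter-node connections
-- sslnegotiation=direct is a PG17 libpq option; sslmode must still require TLS
ALTER SYSTEM SET citus.node_conninfo TO 'sslmode=require sslnegotiation=direct';
SELECT pg_reload_conf();  -- citus.node_conninfo changes apply to new connections after a reload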
Well, the title gives a good summary here. In PG17, the FORCE_NULL and FORCE_NOT_NULL options of COPY can be applied to ALL columns at once by specifying *. FORCE_NULL matches quoted empty strings against the null string and stores them as NULL, while FORCE_NOT_NULL prevents values from being matched against the null string, so empty fields are stored as empty strings rather than NULL. With Citus 13.0, you can freely use these options with columns of distributed tables. Look at this example from the Postgres tests, where we distribute the table:
CREATE TABLE forcetest (
a INT NOT NULL,
b TEXT NOT NULL,
c TEXT,
d TEXT,
e TEXT
);
\pset null NULL
SELECT create_distributed_table('forcetest', 'a');
-- should succeed with no effect ("b" remains an empty string, "c" remains NULL)
BEGIN;
COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL *, FORCE_NULL *);
4,,""
\.
COMMIT;
SELECT b, c FROM forcetest WHERE a = 4;
b | c
---------------------------------------------------------------------
| NULL
(1 row)
-- should succeed with effect ("b" remains an empty string)
BEGIN;
COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL *);
5,,""
\.
COMMIT;
SELECT b, c FROM forcetest WHERE a = 5;
b | c
---------------------------------------------------------------------
|
(1 row)
-- should succeed with effect ("c" remains NULL)
BEGIN;
COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NULL *);
6,"b",""
\.
COMMIT;
SELECT b, c FROM forcetest WHERE a = 6;
b | c
---------------------------------------------------------------------
b | NULL
(1 row)
\pset null ''
PG17 has added support for the AT LOCAL operator. It converts the given time type to a timestamp with the session's TimeZone value as the time zone. In Citus 13.0, we can use AT LOCAL when we insert into Citus managed tables. The way it works consistently is that we evaluate AT LOCAL at the coordinator and then perform the insert remotely. See below for an example:
-- create and distribute a table
CREATE TABLE test_at_local (id int, time_example timestamp with time zone);
SELECT create_distributed_table('test_at_local', 'id');
BEGIN;
SET LOCAL TimeZone TO 'Europe/Tirane';
SELECT timestamp '2001-02-16 20:38:40' AT LOCAL;
timezone
---------------------------------------------------------------------
Fri Feb 16 20:38:40 2001 CET
(1 row)
-- verify that we evaluate AT LOCAL at the coordinator and then perform the insert remotely
SET citus.log_remote_commands TO on;
INSERT INTO test_at_local VALUES (1, timestamp '2001-02-16 20:38:40' AT LOCAL);
NOTICE: issuing INSERT INTO pg17.test_at_local_27122024 (id, time_example)
VALUES (1, 'Fri Feb 16 20:38:40 2001 CET'::timestamp with time zone)
ROLLBACK;
Citus greatly appreciates community contributions, and Citus 13.0 has received a substantial number of them. Please see the linked pull requests below:
- citus_move_shard_placement() now fails early if there is a new node without reference tables yet, contributed by Filip Sedlák
- A fix for activate_node_snapshot() on a single-node cluster, contributed by Karina
We removed PG14 support, due to Citus's long-standing policy of maintaining support for the latest 3 Postgres releases. We highly encourage you to upgrade to Citus 13.0 and PostgreSQL 17.
This release also includes changes related to:
- the VALID UNTIL setting assumption made for roles when syncing them to new nodes
- the MERGE repartition command
- application_name changes
The following features of PG17 are currently not supported in Citus, and are part of future work for the next Citus releases:
- MERGE to modify updatable views
- MERGE to use the RETURNING clause
- ALTER TABLE to change a column's generation expression
- DEFAULT setting for ALTER TABLE .. SET ACCESS METHOD
- COPY option ON_ERROR
- COPY option LOG_VERBOSITY, which reports COPY FROM ignored error rows