Welcome to the release notes for Citus 13.2. The headline features in 13.2 are performance optimizations, along with a behavioral change in how citus_columnar is created. This page dives deep into many of the changes in the Citus 13.2 open source extension to PostgreSQL, including the following:
The Snapshot Based Node-Addition feature enables faster scale-out operations by adding a new worker node from a clone of an existing worker node. This significantly reduces rebalance times when adding new nodes to the cluster, because a worker added this way already contains a copy of the data.
A clone node is a streaming replica that has been registered with Citus as a candidate for a new worker node. A clone node can be created from a backup of a worker node and then configured as a PostgreSQL streaming replica of the source worker node.
The Snapshot Based Node-Addition feature differs significantly from the traditional add_node approach:
- Traditional Node Addition (add_node): the new worker starts out empty, and shard data must be copied to it over the network during rebalancing.
- Snapshot Based Node-Addition: the new worker is a promoted clone that already contains a copy of the source worker's data, so shards only need to be split between the source and the clone instead of being copied.
This approach is much more efficient because it leverages the fact that the replica already has all the data, so instead of moving data around, the system simply removes the appropriate shards from each node to achieve the desired distribution.
Important Note: The rebalancing in this case only affects shards between the source worker node and its clone. Shards on other worker nodes in the cluster remain completely unaffected during this process.
The feature introduces several new functions to manage clone nodes and perform snapshot-based node additions:
citus_add_clone_node(clone_host, clone_port, primary_host, primary_port)
Registers a streaming replica as a clone of an existing source worker node.
Parameters:
- clone_host (text): Hostname of the clone node
- clone_port (integer): Port of the clone node
- primary_host (text): Hostname of the source worker node
- primary_port (integer): Port of the source worker node
Returns: Node ID of the registered clone
citus_add_clone_node_with_nodeid(clone_host, clone_port, primary_node_id)
Alternative function to register a clone using the source worker node's ID.
Parameters:
- clone_host (text): Hostname of the clone node
- clone_port (integer): Port of the clone node
- primary_node_id (integer): Node ID of the source worker node
Returns: Node ID of the registered clone
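A hedged sketch of the nodeid-based variant, first looking up the source worker's node ID in the pg_dist_node catalog (the host names and the node ID below are illustrative, not from the release notes):
-- Find the node ID of the source worker in the Citus metadata.
SELECT nodeid FROM pg_dist_node WHERE nodename = 'primary-node' AND nodeport = 5432;
-- Register the clone against that node ID (assuming the query above returned 2).
SELECT citus_add_clone_node_with_nodeid('clone-node', 5432, 2);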
citus_remove_clone_node(clone_host, clone_port)
Removes a clone node from the cluster.
Parameters:
- clone_host (text): Hostname of the clone node
- clone_port (integer): Port of the clone node
citus_remove_clone_node_with_nodeid(clone_node_id)
Removes a clone node using its node ID.
Parameters:
- clone_node_id (integer): Node ID of the clone to remove
citus_promote_clone_and_rebalance(clone_node_id, rebalance_strategy, catchup_timeout_seconds)
Promotes a clone to a worker node and rebalances shards between the source and newly promoted worker nodes.
Parameters:
- clone_node_id (integer): Node ID of the clone to promote
- rebalance_strategy (name, optional): Rebalancing strategy to use (default: 'bydisksize')
- catchup_timeout_seconds (integer, optional): Timeout for replica catch-up (default: 300)
Returns: Success status
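A hedged usage sketch with the optional arguments spelled out; the node ID is assumed to be the value returned when the clone was registered:
-- Promote clone node 5 (illustrative ID), keeping the default 'bydisksize'
-- strategy but allowing up to 600 seconds for replica catch-up.
SELECT citus_promote_clone_and_rebalance(5, 'bydisksize', 600);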
get_snapshot_based_node_split_plan(primary_host, primary_port, clone_host, clone_port, rebalance_strategy)
Provides a preview of the shard distribution after node addition without executing the promotion.
Parameters:
- primary_host (text): Hostname of the source worker node
- primary_port (integer): Port of the source worker node
- clone_host (text): Hostname of the clone node
- clone_port (integer): Port of the clone node
- rebalance_strategy (name, optional): Rebalancing strategy to use
Returns: Table showing planned shard distribution
Here's a typical workflow for using the Snapshot Based Node-Addition feature:
Step 1: Create and Start Replica
# Create physical backup of existing worker node (example using pg_basebackup)
pg_basebackup -h primary-node -p 5432 -D /path/to/backup -U postgres -v -P -W
# Start PostgreSQL on the backup directory as a streaming replica of the original worker
# (e.g., create standby.signal and set primary_conninfo, or pass -R to pg_basebackup above)
pg_ctl -D /path/to/backup start
Step 2: Register Clone and Preview Plan
-- Register the replica as a clone
SELECT citus_add_clone_node('clone-node', 5432, 'primary-node', 5432);
-- Preview the planned shard distribution (optional)
SELECT * FROM get_snapshot_based_node_split_plan('primary-node', 5432, 'clone-node', 5432);
Step 3: Promote and Rebalance
-- Promote clone to primary and rebalance shards
SELECT citus_promote_clone_and_rebalance(clone_node_id);
Step 4: Clean Up
-- Clean up unused replication slots after promotion
SELECT pg_drop_replication_slot('slot_name');
The citus_promote_clone_and_rebalance function waits for the clone to catch up with the source worker (bounded by catchup_timeout_seconds), promotes the replica via pg_promote(), and then rebalances shards between the source worker and the newly promoted worker.
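After promotion completes, the clone should show up as a regular active worker in the Citus metadata. A minimal sanity check against the standard pg_dist_node catalog might look like this:
-- The promoted clone should now be listed as an active primary worker.
SELECT nodeid, nodename, nodeport, noderole, isactive
FROM pg_dist_node
ORDER BY nodeid;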
Citus has significantly improved shard rebalancing performance through enhanced parallelization and optimized locking strategies. These improvements address key bottlenecks that previously limited rebalancing speed, particularly for clusters with large reference tables and multiple colocated shards.
The citus_rebalance_start() function now includes two new optional parameters that preserve the old behavior by default:
- parallel_transfer_colocated_shards (default: false): When true, colocated shards can be moved in parallel while maintaining dependency order
- parallel_transfer_reference_tables (default: false): When true, each reference table shard is copied in its own background task
Both parameters default to false to maintain backward compatibility, and can be enabled as needed for improved performance.
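For example, a rebalance that opts in to both new behaviors could be started as sketched below (parameter names as listed above; all other arguments keep their defaults):
-- Opt in to parallel transfer of colocated shards and reference table shards.
-- Omitting both parameters keeps the pre-13.2 behavior.
SELECT citus_rebalance_start(
    parallel_transfer_colocated_shards := true,
    parallel_transfer_reference_tables := true
);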
To control the number of concurrent background tasks and leverage these improvements, adjust these GUC parameters:
- max_worker_processes: Increase to allow more parallel background tasks
- citus.max_background_task_executors: Configure the number of background task executors
- citus.max_background_task_executors_per_node: Set the maximum executors per node for optimal parallelization

In general, when the outer side of an outer join is a recurring tuple (e.g., a reference table, intermediate results, or set-returning functions), it is not safe to push down the join. In these situations, Citus recursively plans the "distributed" part of the join.
"... ref_table LEFT JOIN distributed_table ..."
"... distributed_table RIGHT JOIN ref_table ..."
As of Citus 13.2, under certain conditions, Citus can push down these types of LEFT and RIGHT outer joins by injecting constraints derived from the shard intervals of distributed tables into the shard queries for the reference table. The eligibility rules for pushdown are defined in CanPushdownRecurringOuterJoin(), while the logic for computing and injecting the constraints is implemented in UpdateWhereClauseToPushdownRecurringOuterJoin().
The example below shows a LEFT JOIN between a reference table (product_categories) and a distributed table (products_table). The pushdown effect is observed in the EXPLAIN output. This feature can be disabled by setting the citus.enable_recurring_outer_join_pushdown GUC to false.
-- Distributed Table: Products Table
CREATE TABLE products_table (
product_id bigserial PRIMARY KEY,
product_name VARCHAR(100),
category_id INT,
price NUMERIC(10, 2)
);
SELECT create_distributed_table('products_table', 'product_id');
-- Reference Table: Product Categories
CREATE TABLE product_categories (
category_id INT PRIMARY KEY,
category_name VARCHAR(50)
);
SELECT create_reference_table('product_categories');
Prior to Citus 13.2, this query could not be safely pushed down. Instead, the plan uses Subplan 19_1 to scan the distributed table and store the data in an intermediate result.
EXPLAIN (COSTS OFF) SELECT * FROM product_categories pc LEFT JOIN products_table pt ON pc.category_id = pt.product_id;
QUERY PLAN
---------------------------------------------------------------------------------
Custom Scan (Citus Adaptive)
-> Distributed Subplan 19_1
-> Custom Scan (Citus Adaptive)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> Seq Scan on products_table_102072 pt
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=9700 dbname=postgres
-> Hash Right Join
Hash Cond: (intermediate_result.product_id = pc.category_id)
-> Function Scan on read_intermediate_result intermediate_result
-> Hash
-> Seq Scan on product_categories_102106 pc
As of Citus 13.2, the same query is pushed down by injecting interval constraints on the reference table.
EXPLAIN (COSTS OFF) SELECT * FROM product_categories pc LEFT JOIN products_table pt ON pc.category_id = pt.product_id;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> Hash Right Join
Hash Cond: (pt.product_id = pc.category_id)
-> Seq Scan on products_table_102072 pt
-> Hash
-> Seq Scan on product_categories_102106 pc
Filter: ((category_id IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint8((category_id)::bigint)) < 0) AND (btint4cmp(hashint8((category_id)::bigint), '-2013265921'::integer) <= 0)))
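If the pre-13.2 plan is preferred for a given workload, the pushdown can be turned off with the GUC mentioned above, for example at the session level:
-- Disable recurring outer join pushdown for this session.
SET citus.enable_recurring_outer_join_pushdown TO false;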
For some time now Citus has had the fast path router planner, which avoids the overhead of the Postgres standard_planner() and instead creates a placeholder plan that has the necessary detail. For more details, please refer to the Citus technical README.
In Citus 13.2 we enhanced the fast path router planner so that instead of always creating the placeholder plan, it delays doing so until the shard for the query is identified. If that shard is local to the Citus node processing the client's query, then a shortcut can be taken so that deparse, parse and plan of the shard query are avoided. For this to take effect, it must be possible to determine the shard during planning: Citus must be in MX mode and the shard must be local to the Citus node processing the query. If so, the OID of the distributed table is replaced by the OID of the shard in the query parse tree. The parse tree is then given to the Postgres planner, which returns a plan that is stored in the distributed plan's task. That plan can be repeatedly used by the local executor, thus avoiding the need to deparse and plan the shard query on every execution.
We call this delayed fast path planning because if a query is eligible for fast path planning then the fast path router planner delays creating the placeholder plan until the distributed job and task have been created.
Delayed fast path planning can be done when the query is eligible for fast path planning and is a SELECT or UPDATE on a distributed table (schema or column sharded) or a Citus managed local table. If so, the function FastPathRouterQuery() sets a flag indicating that making the placeholder plan should be delayed until after the worker job and its associated task have been created (note: there will be only one task for a fast path query). At that point the router planner uses CheckAndBuildDelayedFastPathPlan() to check whether the task's shard placement is local (and not a dummy placement) and whether the metadata of the shard table and the distributed table are consistent (no DDL in progress on the distributed table). If so, the parse tree, with the OID of the distributed table replaced by the OID of the shard table, is fed to standard_planner() and the resulting plan is saved in the task. Otherwise, if the worker job has been marked for deferred pruning, the shard is not local, or the shard is local but it is not safe to swap OIDs, then CheckAndBuildDelayedFastPathPlan() calls FastPathPlanner() to create a placeholder plan and thus ensure a complete plan context.
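As a rough illustration (the table and column names below are invented for this sketch), a single-shard point query that filters a distributed table on its distribution column is the kind of statement that can take this shortcut when its shard is local to the node handling the query:
-- Hypothetical table `orders`, distributed on `order_id`.
-- This point query is fast-path eligible; if the target shard is local,
-- Citus 13.2 can plan the shard query directly and reuse that plan.
SELECT status FROM orders WHERE order_id = 42;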
The optimization may be extended to more SELECT statements in the future. The feature is controlled by the citus.enable_local_fast_path_query_optimization GUC; it is on by default.

To get a sense of the potential improvement from this feature, we ran pgbench against a Citus instance using Azure Postgres Flexible Server on D8ds_v5 virtual machines at scale factor 2000, with the shards all on one node so that all queries are executed locally; therefore every query passes through the delayed fast path planning optimization described here. We ran pgbench against this instance as follows:
$ pgbench -p 5432 -T 900 -c 256 -j 16 -f test_query.sql
where test_query.sql is the following point query on the pgbench_accounts table, which is distributed on the aid column:
\set scale_factor 2000
\set aid :scale_factor * 1000000
SELECT abalance FROM pgbench_accounts WHERE aid=:aid
| Clients | TPS   | TPS DFPP | % Improvement |
|---------|-------|----------|---------------|
| 32      | 46471 | 53744    | 15.6          |
| 128     | 50574 | 64875    | 28.3          |
| 256     | 46177 | 60022    | 30            |
| 512     | 40229 | 50628    | 26            |
The table shows the TPS observed with citus.enable_local_fast_path_query_optimization disabled (the TPS column) and enabled (the TPS DFPP column, where DFPP stands for Delayed Fast Path Planning), along with the % increase given by DFPP, for a range of client counts, i.e. the number of clients that pgbench opens for the benchmark run.
The key observation is that the feature gives a 28% to 30% increase in TPS, particularly when CPU usage is maximised. Therefore, with the shards distributed evenly across a 2-node instance, and assuming uniform data and connection distribution, it is reasonable to expect a 15% increase in TPS with this feature; more generally, with shards distributed evenly across an N-node instance it is reasonable to expect DFPP to give a (30 / N)% increase in TPS.
As with all performance enhancements, the caveat is that your mileage may vary. The query we benchmarked here runs in 1 millisecond on a Postgres instance that is not IO-bound, with planning time accounting for half of that time. A query whose time is dominated by execution may not experience such a gain in throughput. But we believe that for OLTP workloads as modeled by pgbench, this feature can provide a significant throughput increase, particularly at high query volumes.
citus_stats is a distributed statistics view implemented in Citus, designed to aggregate and present unified Citus table statistics. It extends the functionality of PostgreSQL's built-in pg_stats, which provides per-table, per-column statistics such as the fraction of NULL values or the most common values in a column. While pg_stats is limited to local node visibility, citus_stats collects and merges this information from each worker node, because Citus tables are made up of shards that reside on different nodes of the cluster. In Citus 13.2, citus_stats exposes these statistical columns for each Citus table (distributed, reference, or Citus local) in your cluster:
- null_frac: Fraction of the column entries of a Citus table that are NULL. Ranges from 0.0 to 1.0. Helps the user understand how many rows will be excluded by IS NOT NULL.
- most_common_vals: An array of the most common values in the column of a Citus table (up to a limit). Helps the user understand data skew.
- most_common_freqs: An array of frequencies corresponding to most_common_vals. Each value is an estimate and represents the fraction of total rows.

Let's explore an example:
SET citus.shard_count = 2;
-- create and populate 3 types of Citus tables
CREATE TABLE dist_user_payloads (id int, payload text, user_name text);
CREATE TABLE ref_user_payloads (id int, payload text, user_name text);
CREATE TABLE citus_local_user_payloads (id int, payload text, user_name text);
SELECT create_distributed_table('dist_user_payloads', 'id');
SELECT create_reference_table('ref_user_payloads');
SELECT citus_add_local_table_to_metadata('citus_local_user_payloads');
-- populate the tables
INSERT INTO dist_user_payloads VALUES (1, 'abc', 'user1'), (3, 'cde', 'user1'),
(4, 'def', 'user1'), (4, 'def', 'user1'),
(3, 'cde', 'user2'), (5, NULL, NULL),
(4, 'def', 'user1');
INSERT INTO ref_user_payloads VALUES (1, 'abc', 'user1'), (3, 'cde', 'user1'),
(4, 'def', 'user1'), (4, 'def', 'user1'),
(3, 'cde', 'user2'), (5, NULL, NULL),
(4, 'def', 'user1');
INSERT INTO citus_local_user_payloads VALUES (1, 'abc', 'user1'), (3, 'cde', 'user1'),
(4, 'def', 'user1'), (4, 'def', 'user1'),
(3, 'cde', 'user2'), (5, NULL, NULL),
(4, 'def', 'user1');
-- analyze the tables in order to populate pg_stats in the nodes
ANALYZE dist_user_payloads;
ANALYZE ref_user_payloads;
ANALYZE citus_local_user_payloads;
-- query citus_stats
SELECT tablename, attname, null_frac, most_common_vals, most_common_freqs
FROM citus_stats;
tablename | attname | null_frac | most_common_vals | most_common_freqs
---------------------------+-----------+------------+------------------+------------------------
dist_user_payloads | id | 0 | {4,3} | {0.42857143,0.2857143}
dist_user_payloads | payload | 0.14285715 | {def,cde} | {0.42857143,0.2857143}
dist_user_payloads | user_name | 0.14285715 | {user1} | {0.71428573}
ref_user_payloads | id | 0 | {4,3} | {0.42857143,0.2857143}
ref_user_payloads | payload | 0.14285715 | {def,cde} | {0.42857143,0.2857143}
ref_user_payloads | user_name | 0.14285715 | {user1} | {0.71428573}
citus_local_user_payloads | id | 0 | {4,3} | {0.42857143,0.2857143}
citus_local_user_payloads | payload | 0.14285715 | {def,cde} | {0.42857143,0.2857143}
citus_local_user_payloads | user_name | 0.14285715 | {user1} | {0.71428573}
(9 rows)
This is the initial implementation of this aggregated view in Citus. The view could be extended in future releases to include the following:
- Adding WITH (security_barrier) to citus_stats to mimic the behavior of pg_stats
- Extending citus_stats by aggregating the inherited and avg_width columns of pg_stats
Before Citus 13.2, when you created the citus extension, the citus_columnar extension was also created automatically, even if you did not provide the CASCADE option when creating the citus extension. As of Citus 13.2, this behavior has changed.
Note that once you install the Citus binaries, you can still create the citus_columnar extension whenever you want; this part remains unchanged. However, the citus_columnar extension will now only be created if you explicitly specify CASCADE when creating the citus extension, or if you explicitly create the citus_columnar extension yourself, as in:
CREATE EXTENSION citus_columnar;
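or, equivalently per the new behavior described above, by creating citus with the CASCADE option:
CREATE EXTENSION citus CASCADE;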
This section describes bug fixes that require user attention because they fix wrong query results or change query behavior and thus may impact services and applications that use Citus.
Citus 13.2 includes a fix, through PR #7675, for UPDATE statements that previously mixed up the order of the columns to be updated, resulting in the impacted columns getting the wrong values. This could happen with UPDATE statements that list the columns to be updated in a different order than the table definition and use a subquery as the source of the values. For example, given these table definitions:
CREATE TABLE tref (id int primary key,col_0 int,col_1 int,col_2 int,col_3 int);
select create_reference_table('tref'); -- could also be a distributed table
insert into tref values (1, 0, 0, 0, 0);
-- The UPDATE statement incorrectly changes `col_3` and `col_2`:
update tref SET (col_0, col_1, col_3, col_2) = (SELECT 10, 11, 33, 22);
select * from tref;
│ id │ col_0 │ col_1 │ col_2 │ col_3 │
│ 1 │ 10 │ 11 │ 33 │ 22 │ <-- wrong values in col_2, col_3
The fix involved changing how Citus generates shard queries, and now the correct values are written to the columns regardless of their order. We are grateful to Cedric Villemain, a frequent contributor to Citus, for providing the patch. We are highlighting this fix here because the problem involves potential silent data corruption, and users should check their Citus deployments for UPDATE statements that use a SELECT and list the updated columns in a different order than the table definition.
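For reference, with the fix applied the same UPDATE assigns each value to the column it is paired with in the SET clause, so the row now reads:
│ id │ col_0 │ col_1 │ col_2 │ col_3 │
│ 1 │ 10 │ 11 │ 22 │ 33 │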
During the development of Citus 13 we were alerted to a category of queries that were producing incorrect query results. Here is an example, taken from #7698:
select t3.vkey
from (t1 right outer join t3 on (t1.c10 = t3.vkey ))
where exists (select * from t3);
Here t1 is a distributed table and t3 is a Postgres table. The query has an outer join and a pseudoconstant qualifier (exists (select * from t3)), which means its result is independent of the rest of the query and is the same for the entire query execution. Incorrect query results were also reported in issues #7697 and #7696. Investigation showed that the queries all involved an outer join and a pseudoconstant qualifier, and that the problem was caused by commit 695f5deb79 in Postgres 16, which prevents the set_join_pathlist_hook from being called if a pseudoconstant qualifier is involved. Citus depends on this hook to get information about the joins in the query, and without it Citus is not aware of outer joins. Postgres 17 re-enabled unconditional calling of set_join_pathlist_hook, so these issues are not present with Citus on Postgres 17.
So the recommended solution is to upgrade to Postgres 17. However, we are left with the spectre of potentially incorrect results for Citus running on Postgres 16 and Postgres 15 (because 695f5deb79 was back-ported to PG 15), so in Citus 13.2, through PR #7937, we take the extra measure of throwing an error on Postgres versions earlier than 17 for this category of queries:
select * from
(t0 full outer join t3
on (t0.c3 = t3.c26 ))
where (exists (select * from t4)) order by 1, 2, 3;
ERROR: Distributed queries with outer joins and pseudoconstant quals are not supported in PG15 and PG16.
DETAIL: PG15 and PG16 disallow replacing joins with scans when the query has pseudoconstant quals
HINT: Consider upgrading your PG version to PG17
We understand that this fix may inconvenience users, but we believe that alerting the user to the presence of an incorrect query plan is preferable to silently producing incorrect query results. In most cases it should be possible to rewrite the query to avoid the check. For example, check out the following query:
select * from
users left outer join accounts using (account_id)
It may return sensitive information that certain users should not see, so an application may inject a qualifier to ensure no data is shown to such users:
select * from
users u left outer join accounts a on (u.user_id = a.user_id)
where false
But now the query will error out with Citus 13.2 on Postgres 16 and Postgres 15. If upgrading to Postgres 17 is not an immediate option, the query can be rewritten so that the injected qualifier is not pseudoconstant:
select * from
users u left outer join accounts a on (u.user_id = a.user_id)
where a.account_id != a.account_id
Or the injected qualifier can be put in the ON clause, which bypasses the error check and also ensures that 0 rows are returned:
select * from
users u left outer join accounts a on (u.user_id = a.user_id AND false)