Watch 👀 the Citus 12.0 Release Party livestream replay—with demos + Q&A! 📺
Welcome to the release notes for Citus 12.0. This page dives deep into many of the changes in the Citus 12.0 open source extension to PostgreSQL, including these headline features:
Rebalance by disk size by default: We changed the default rebalance strategy
Identity column changes: Improved handling of identity columns
Deprecated Features: Dropped PG13 support
Notable fixes: Fewer issues
Read the 12.0 release blog post: If you’re looking for an overview about what’s new in the 12.0 Citus database extension for multi-tenant SaaS workloads (and more), Marco Slot's 12.0 blog post is a good place to start.
CHANGELOG: If you’re looking for a more extended list of changes in Citus 12.0, please refer to the Citus 12.0 Changelog on GitHub.
As always, we welcome issues & feedback on our GitHub repo: Stay connected with the Citus open source project on GitHub. And if you appreciate what Citus is doing for distributed PostgreSQL, our team always appreciates GitHub stars ⭐. (Of course we appreciate your input on Slack and issues on GitHub even more!)
The Citus 12 Release Party livestream includes demos of schema-based sharding and tenant monitoring—plus an update to PgBouncer that supports schema-based sharding, and more. Watch the replay.
As of Citus 12.0, Citus allows you to shard your database based on schemas, in addition to traditional row-based sharding. This means that tables from the same schema are placed on the same node, while different schemas may be on different nodes. That way, queries and transactions that involve a single schema can always be evaluated efficiently by a single node.
You can use the citus.enable_schema_based_sharding GUC to enable this feature for newly created schemas:
SET citus.enable_schema_based_sharding TO ON;
CREATE SCHEMA tenant1;
When this setting is on, each newly created schema becomes a distributed schema. However, the setting does not apply to schemas that already exist.
You can also use the citus_schema_distribute and citus_schema_undistribute UDFs to convert regular schemas into distributed schemas and back.
SELECT citus_schema_undistribute('tenant1');
SELECT citus_schema_distribute('tenant1');
You can query your distributed schemas by using the citus_schemas view.
-- Make sure schema-based sharding is enabled
SET citus.enable_schema_based_sharding TO ON;
-- Create another distributed schema
CREATE SCHEMA tenant2;
-- See distributed schemas
SELECT * FROM citus_schemas;
┌─────────────┬───────────────┬─────────────┬──────────────┐
│ schema_name │ colocation_id │ schema_size │ schema_owner │
├─────────────┼───────────────┼─────────────┼──────────────┤
│ tenant1     │             2 │ 0 bytes     │ citus_user   │
│ tenant2     │             3 │ 0 bytes     │ citus_user   │
└─────────────┴───────────────┴─────────────┴──────────────┘
(2 rows)
You can create single-shard distributed tables in these schemas simply by creating tables in them. In the background, Citus creates colocated single-shard distributed tables that don’t have a shard key, on the node that the distributed schema lives on.
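For instance, the EXPLAIN examples below assume a small setup like the following; the table definitions are illustrative (any columns would work):
-- illustrative tables used in the EXPLAIN examples below
CREATE TABLE tenant1.table1(a int);
CREATE TABLE tenant1.table2(a int);
CREATE TABLE tenant2.table1(a int);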
Note that the tables that belong to the distributed schema tenant1 are located on the same node, while the table in tenant2 lives on another node (see the port info in the plans below).
EXPLAIN SELECT * FROM tenant1.table1;
┌──────────────────────────────────────────────────────────────────────────────────────┐
│                                      QUERY PLAN                                       │
├──────────────────────────────────────────────────────────────────────────────────────┤
│ Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)                        │
│   Task Count: 1                                                                       │
│   Tasks Shown: All                                                                    │
│   ->  Task                                                                            │
│         Node: host=localhost port=9702 dbname=citus_db                                │
│         ->  Seq Scan on table1_102008 table1  (cost=0.00..35.50 rows=2550 width=4)    │
└──────────────────────────────────────────────────────────────────────────────────────┘
(6 rows)
EXPLAIN SELECT * FROM tenant1.table2;
┌──────────────────────────────────────────────────────────────────────────────────────┐
│                                      QUERY PLAN                                       │
├──────────────────────────────────────────────────────────────────────────────────────┤
│ Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)                        │
│   Task Count: 1                                                                       │
│   Tasks Shown: All                                                                    │
│   ->  Task                                                                            │
│         Node: host=localhost port=9702 dbname=citus_db                                │
│         ->  Seq Scan on table2_102009 table2  (cost=0.00..35.50 rows=2550 width=4)    │
└──────────────────────────────────────────────────────────────────────────────────────┘
(6 rows)
EXPLAIN SELECT * FROM tenant2.table1;
┌──────────────────────────────────────────────────────────────────────────────────────┐
│                                      QUERY PLAN                                       │
├──────────────────────────────────────────────────────────────────────────────────────┤
│ Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)                        │
│   Task Count: 1                                                                       │
│   Tasks Shown: All                                                                    │
│   ->  Task                                                                            │
│         Node: host=localhost port=9701 dbname=citus_db                                │
│         ->  Seq Scan on table1_102010 table1  (cost=0.00..35.50 rows=2550 width=4)    │
└──────────────────────────────────────────────────────────────────────────────────────┘
(6 rows)
When using distributed schemas, queries, joins & foreign keys that refer to them should ideally target a single tenant (i.e., schema) at a time. That way, you can essentially scale your Postgres database across multiple nodes with almost no limitations.
SELECT * FROM tenant1.table1 first JOIN tenant2.table1 second ON first.a = second.a;
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
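If you occasionally do need such a cross-schema query and can accept the extra data shuffling, you can follow the hint and enable repartition joins for your session, for example:
SET citus.enable_repartition_joins TO ON;
SELECT * FROM tenant1.table1 first JOIN tenant2.table1 second ON first.a = second.a;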
Schemas created while citus.enable_schema_based_sharding is off remain regular Postgres schemas. Queries that stay within a single distributed schema, on the other hand, are handled efficiently by the node that hosts that tenant, for example:
UPDATE tenant1.notes SET data=my_random_text_generator();
This release introduces significant performance improvements to the tenant monitoring feature in Citus. Beyond raw performance work, it also optimizes query tracking by introducing a sampling rate option for tenant statistics. By reducing the number of queries tracked by the tenant monitor, the performance overhead of tenant monitoring is significantly decreased. This enhancement allows for better scalability and improved overall performance of the Citus cluster.
To clarify the sampling process, the sampling rate option only triggers the sampling of queries for currently untracked tenants. For tenants that are already being tracked, all queries are still tracked regardless of the sample rate. This means that the tenant monitoring feature ensures comprehensive query tracking for all previously tracked tenants, while selectively sampling queries for new or untracked tenants. This approach strikes a balance between thorough monitoring and reducing the performance impact on the system.
To enable sampling, we introduced a new GUC setting called citus.stat_tenants_untracked_sample_rate.
Example:
-- enable sampling with a 10% rate
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stat_tenants_untracked_sample_rate to 0.1;');
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
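To double-check the new value on every node after the reload, one option (reusing the same helper UDF) is:
-- verify the sampling rate on all nodes
SELECT result FROM run_command_on_all_nodes('SHOW citus.stat_tenants_untracked_sample_rate;');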
With this release, Citus enables monitoring and tracking of schema-based sharded tenants. Users can now gain insights into individual tenant behavior and resource utilization at the schema level, facilitating better resource allocation and optimization.
Example:
SET citus.enable_schema_based_sharding TO ON;
CREATE SCHEMA my_schema;
CREATE TABLE my_schema.users(id int);
SELECT id FROM my_schema.users WHERE id = 2;
INSERT INTO my_schema.users VALUES (1);
UPDATE my_schema.users SET id = 2 WHERE id = 1;
DELETE FROM my_schema.users WHERE id = 2;
SELECT * FROM citus_stat_tenants;
nodeid                     | 1
colocation_id              | 4
tenant_attribute           | my_schema
read_count_in_this_period  | 0
read_count_in_last_period  | 1
query_count_in_this_period | 0
query_count_in_last_period | 4
cpu_usage_in_this_period   | 0
cpu_usage_in_last_period   | 0.000345
(1 row)
The PostgreSQL community added support for the MERGE command starting from Postgres 15. Citus 11.2 and Citus 11.3 added support for the MERGE command on local tables and colocated tables, respectively.
As of Citus 12.0, we expanded MERGE support to a much wider set of distributed queries. In essence, we extended the MERGE command in a similar fashion to INSERT .. SELECT commands. Citus can now handle MERGE commands using two new strategies:
Repartitioning: Plan the source query independently, execute the results into intermediate files, and repartition the files to co-locate them with the merge-target table. Subsequently, compile a final merge query on the target table using the intermediate results as the data source.
Pull-to-coordinator: When the source query has to be evaluated at the coordinator, run it there and redistribute the resulting rows to ensure colocation with the target shards. Then direct the MERGE SQL operation to the target shards on the worker nodes, using the intermediate files colocated with the data as the data source.
The distributed planner picks the optimal plan based on the query, eliminating the need for end users to worry about these details. However, similar to INSERT .. SELECT command processing, the colocated MERGE command is the most performant option, as it pushes the whole computation down to the shards. The other two methods require shuffling data around the cluster in order to execute the command in the distributed Citus database.
As a result of these improvements, Citus 12.0 allows arbitrary queries as the source for the MERGE command. In other words, the source clause can include reference tables, non-colocated distributed tables, or subqueries/CTEs. Below, we'll show one example for each:
CREATE TABLE target_table(tid int, val varchar);
CREATE TABLE colocated_source_table(sid int, val varchar);
CREATE TABLE non_colocated_source_table(sid int, val varchar);
SELECT create_distributed_table('target_table', 'tid');
SELECT create_distributed_table('colocated_source_table', 'sid', colocate_with=>'target_table');
SELECT create_distributed_table('non_colocated_source_table', 'sid', colocate_with=>'none');
-- colocated MERGE command as supported in Citus 11.3+
EXPLAIN MERGE INTO target_table t
USING colocated_source_table s
ON t.tid = s.sid
WHEN MATCHED AND t.val = 'target-delete' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = s.val
WHEN NOT MATCHED THEN
INSERT(tid, val) VALUES(sid, val);
┌────────────────────────────────────────────────────────────────┐
│                           QUERY PLAN                           │
├────────────────────────────────────────────────────────────────┤
│ Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0) │
│   ... rest of the plan not interesting ...                     │
└────────────────────────────────────────────────────────────────┘
-- non-colocated MERGE via repartition
-- supported with Citus 12.0+
EXPLAIN MERGE INTO target_table t
USING non_colocated_source_table s
ON t.tid = s.sid
WHEN MATCHED AND t.val = 'target-delete' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = s.val
WHEN NOT MATCHED THEN
INSERT(tid, val) VALUES(sid, val);
┌──────────────────────────────────────────────────────────────────────┐
│                              QUERY PLAN                              │
├──────────────────────────────────────────────────────────────────────┤
│ Custom Scan (Citus MERGE INTO ...)  (cost=0.00..0.00 rows=0 width=0) │
│   MERGE INTO target_table method: repartition                        │
│   ... rest of the plan not interesting ...                           │
└──────────────────────────────────────────────────────────────────────┘
-- non-colocated MERGE via pull to coordinator
-- supported with Citus 12.0+
-- although the tables are colocated, the result of the
-- subquery `s` is not colocated with the target_table
EXPLAIN MERGE INTO target_table t
USING (SELECT max(sid) as sid, val FROM colocated_source_table GROUP BY val) s
ON t.tid = s.sid
WHEN MATCHED AND t.val = 'target-delete' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = s.val
WHEN NOT MATCHED THEN
INSERT(tid, val) VALUES(sid, val);
┌──────────────────────────────────────────────────────────────────────┐
│                              QUERY PLAN                              │
├──────────────────────────────────────────────────────────────────────┤
│ Custom Scan (Citus MERGE INTO ...)  (cost=0.00..0.00 rows=0 width=0) │
│   MERGE INTO target_table method: pull to coordinator                │
│   ... rest of the plan not interesting ...                           │
└──────────────────────────────────────────────────────────────────────┘
(19 rows)
There are still two notable limitations of the MERGE command on distributed tables, even when the repartitioning or pull-to-coordinator execution is used. These limitations mostly arise from the distributed nature of Citus combined with the semantics of the MERGE command. It is not feasible to repartition the target_table or pull it to the coordinator. Hence, Citus uses the target_table as the anchor in MERGE query processing, and only shuffles the source part of the query.
The two notable limitations that arise from the above are:
The ON clause should include the distribution key of the target_table.
The NOT MATCHED .. INSERT () clause should include the joining column of the source query that is used in the ON clause.
Since Citus 10.0 you have been able to rebalance your data across the cluster. So far, however, the default strategy for doing so was very simplistic: by default, the rebalancer only tried to make sure that the number of shards on each node was the same. In cases where you have a few hot shards, this was not a very good way of balancing your data. So we also allowed you to rebalance based on the size of each shard on disk, but that required manually choosing the by_disk_size rebalance strategy. Now in Citus 12.0 we made the by_disk_size rebalance strategy the default, because it is an improvement for most of our users:
-- Before Citus 12.0
SELECT citus_rebalance_start(rebalance_strategy := 'by_disk_size');
-- Since Citus 12.0
SELECT citus_rebalance_start();
If you still want to use the previous rebalance strategy, you can; you only need to specify it explicitly:
SELECT citus_rebalance_start(rebalance_strategy := 'by_shard_count');
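If you're curious which rebalance strategies exist on your cluster and which one is currently the default, you can inspect the pg_dist_rebalance_strategy metadata table (a minimal query; only a couple of its columns are shown):
-- list rebalance strategies and which one is the default
SELECT name, default_strategy FROM pg_dist_rebalance_strategy;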
We dropped support for PostgreSQL 13: Our soft policy for Postgres version compatibility is to support the latest Citus release with the 3 latest Postgres releases. Given that we are expecting Postgres 16 soon, and we don't want to drop support for a major PG version in a minor release, we dropped Postgres 13 support in this major release. In other words, Citus 12.0 currently supports Postgres 14 and 15, and stay tuned for Postgres 16 support in the next release!