v12.1 (Sep 2023)

What’s new in Citus 12.1?

Welcome to the release notes for Citus 12.1. The headline for 12.1 is the addition of PostgreSQL 16 GA support, which is packed with new features and enhancements. This page dives deep into many of the changes in the Citus 12.1 open source extension to PostgreSQL, including PostgreSQL 16 support, query-from-any-node with load balancing, distributed schema moves, GRANT/REVOKE propagation on databases, and more.

Read the 12.1 release blog post: Naisila Puka's 12.1 post gives you a good overview of the Postgres 16 support in Citus 12.1, as well as some of the schema-based sharding improvements.

CHANGELOG: If you’re looking for an extended list of changes in Citus 12.1, please refer to the Citus 12.1 Changelog on GitHub.

We welcome issues & feedback on our GitHub repo: Stay connected with the Citus open source project on GitHub. And if you appreciate what Citus is doing for distributed PostgreSQL, our team always appreciates GitHub stars ⭐. (Of course, we appreciate your input on Slack and issues on GitHub even more.)

The Citus 12.1 Release Party livestream will include demos and conversation with a few of our engineers about what's new and interesting in Citus. Watch the livestream on YouTube Wednesday October 18th @ 9:00am PDT. You can use this handy calendar invite to add the livestream to your schedule.

PostgreSQL 16 support

The Citus database is an extension to Postgres, and being an extension is great because it makes it easier to keep in sync with the latest releases of Postgres. Citus 12.1 does just that by adding support for PostgreSQL 16.

PostgreSQL 16 highlights include major improvements in query parallelization, logical replication, monitoring, load balancing, and more. You can read all about them in the official PG16 release notes.

Many of these changes just work with the scalability that Citus provides. Shards are regular Postgres tables, and queries are sent to shards as regular SQL commands, so any query performance improvements are directly reflected in distributed tables. Citus also does not interfere with replication, checkpointing, vacuum, logging, monitoring, psql, fdw, and other contrib modules, so improvements in these areas carry over as well. For example, the amazing I/O observability addition to PG16, pg_stat_io, can directly show you very useful I/O statistics for every node in a distributed Citus cluster.
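
For example, here is a minimal sketch that sums a few pg_stat_io counters on every node of the cluster, using the run_command_on_all_nodes() helper that also appears later in these notes; the selected columns are just an illustrative subset of the view:

-- a minimal sketch: sum a few pg_stat_io counters on every node in the cluster
SELECT result
FROM run_command_on_all_nodes($$
    SELECT format('reads=%s writes=%s extends=%s',
                  sum(reads), sum(writes), sum(extends))
    FROM pg_stat_io
    WHERE context = 'normal'
$$);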

PG16 work for this release was mostly focused on compatibility with Postgres query planner changes, as well as SQL & DDL changes that directly affect the user and developer experience. You can find more implementation details in the pull requests labelled with pg16_support. And if you want to learn about Citus support for all PG16 features in detail, you can also see this issue.

Citus query-from-any-node with load balancing

In PostgreSQL 16, there is a new feature to facilitate load balancing across multiple servers through libpq: a connection parameter called load_balance_hosts. When you set load_balance_hosts to random, libpq connects to the listed hosts (and their resolved IP addresses) in random order. This randomization helps distribute the workload, particularly in scenarios with many clients or frequent connection setups. The feature is especially valuable for Citus when executing queries from any node, which is why Citus engineer Jelte Fennema-Nio contributed this load balancing feature to PostgreSQL 16!

Citus 11.0 introduced "query-from-any-node", enabling distributed queries to be executed from any node within the cluster while automatically synchronizing schema and metadata. In the past, to leverage this capability in Citus, you had to maintain connection strings equal to the number of worker nodes and adapt your application to use all these strings. This approach was cumbersome, especially for applications that typically utilize a single database connection string.

However, with this new load balancing feature, you can continue using your application without modifications. Simply include all the host names in the connection string to utilize "query-from-any-node" without altering your application. This provides a straightforward method for load balancing across workers and any read replicas you may have. If your client is based on PostgreSQL 16, you can use load balancing by adding multiple comma-separated host names to your URL and adding the load_balance_hosts argument as below:

psql "postgresql://{worker0}.postgres.cosmos.azure.com,{worker1}.postgres.cosmos.azure.com /?load_balance_hosts=random"

JSON_ARRAYAGG and JSON_OBJECTAGG aggregates

SQL/JSON standard-conforming constructors for JSON types, namely the JSON_ARRAY() and JSON_OBJECT() functions and the JSON_ARRAYAGG() and JSON_OBJECTAGG() aggregates, have been added to PG16. Citus supports and parallelizes these aggregates; see below for an example:

-- let’s use 2 shards such that we have 1 shard per worker
SET citus.next_shard_id TO 14000;
SET citus.shard_count TO 2;

-- create, distribute and populate the table
CREATE TABLE agg_test(country text, city text);
SELECT create_distributed_table('agg_test', 'country');
INSERT INTO agg_test VALUES ('Albania', 'Tirana'), ('Albania', 'Shkodra'), ('Albania', 'Elbasan');
INSERT INTO agg_test VALUES ('Turkey', 'Ankara'), ('Turkey', 'Istanbul');

-- change this config to be able to see how the JSON_ARRAYAGG aggregate is executed
SET citus.log_remote_commands TO on;

-- query with the JSON_ARRAYAGG() aggregate grouped on the table’s distribution column: country
-- in this way you get maximum performance from your Citus distributed database
-- Citus can push down execution of the entire query to each worker node, with ports 9701 and 9702
SELECT country, JSON_ARRAYAGG(city),
       JSON_ARRAYAGG(city RETURNING jsonb) AS json_arrayagg_jsonb
FROM agg_test GROUP BY country;
NOTICE:  issuing SELECT country, JSON_ARRAYAGG(city RETURNING json) AS "json_arrayagg", JSON_ARRAYAGG(city RETURNING jsonb) AS json_arrayagg_jsonb FROM public.agg_test_14000 GROUP BY country
DETAIL:  on server postgres@localhost:9701 connectionId: 5
NOTICE:  issuing SELECT country, JSON_ARRAYAGG(city RETURNING json) AS "json_arrayagg", JSON_ARRAYAGG(city RETURNING jsonb) AS json_arrayagg_jsonb FROM public.agg_test_14001 GROUP BY country
DETAIL:  on server postgres@localhost:9702 connectionId: 6
country |          json_arrayagg           |       json_arrayagg_jsonb
---------+----------------------------------+----------------------------------
Albania | ["Tirana", "Shkodra", "Elbasan"] | ["Tirana", "Shkodra", "Elbasan"]
Turkey  | ["Ankara", "Istanbul"]           | ["Ankara", "Istanbul"]
(2 rows)
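
JSON_OBJECTAGG() works the same way; here is a brief sketch on the same agg_test table, using the city name as the key and its length as an illustrative value:

-- grouping on the distribution column lets Citus push this aggregate down as well
SELECT country,
       JSON_OBJECTAGG(city : length(city)) AS city_name_lengths
FROM agg_test GROUP BY country;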

DEFAULT option to COPY FROM

If a column of a distributed table has a default value defined, then with PG16 you can control which rows of a COPY FROM should use the default value and which rows have an explicitly provided value. This control is achieved by specifying a string that represents the default value: each time the string is found in the input, the default value of the corresponding column is used. See below for a simple example with distributed tables:

-- create a table with default value columns and distribute it
-- using a similar example as in Postgres tests, but with a distributed table
CREATE TABLE dist_table_copy_default(
    id int,
    text_value text NOT NULL DEFAULT 'default_value_for_text',
    ts_value timestamp without time zone NOT NULL DEFAULT '2023-09-21');
SELECT create_distributed_table('dist_table_copy_default', 'id');

-- perform COPY FROM operation with DEFAULT option specified
COPY dist_table_copy_default FROM stdin WITH (default '\D');
1   value   '2022-07-04'
2   \D  '2022-07-03'
3   \D  \D
\.

SELECT * FROM dist_table_copy_default ORDER BY id;
id |       text_value       |      ts_value
----+------------------------+---------------------
  1 | value                  | 2022-07-04 00:00:00
  2 | default_value_for_text | 2022-07-03 00:00:00
  3 | default_value_for_text | 2023-09-21 00:00:00
(3 rows)

Custom ICU collation rules

PG16 allows custom ICU collation rules to be created using the new “rules” option of the CREATE COLLATION command. Citus already supports distributed collations, so we added propagation of this new option as well:

CREATE COLLATION default_rule (provider = icu, locale = '');
-- this example is the same as in Postgres with the "&a < g" rule
-- "&a < g" places “g” after “a” and before “b”, and the “a” does not change place.
CREATE COLLATION special_rule (provider = icu, locale = '', rules = '&a < g');

-- Create and distribute a table
CREATE TABLE test_collation_rules (a text);
SELECT create_distributed_table('test_collation_rules', 'a');
-- insert values in the table that reflect sorting changes when two different collations are used
INSERT INTO test_collation_rules VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');

-- first, let’s see collation properties in the coordinator
SELECT collname, collprovider, collicurules FROM pg_collation WHERE collname like '%_rule%';
   collname   | collprovider | collicurules
--------------+--------------+--------------
 default_rule | i            |
 special_rule | i            | &a < g
(2 rows)

-- query the table using the default rule collation
SELECT * FROM test_collation_rules ORDER BY a COLLATE default_rule;
     a
-----------
Abernathy
apple
bird
Boston
Graham
green
(6 rows)

-- now query the table using the special rule collation with "&a < g"
SELECT * FROM test_collation_rules ORDER BY a COLLATE special_rule;
     a
-----------
Abernathy
apple
green
bird
Boston
Graham
(6 rows)

-- connect to the worker node to see that everything is consistent
\c - - - :worker_1_port

SELECT collname, collprovider, collicurules FROM pg_collation WHERE collname like '%_rule%';
   collname   | collprovider | collicurules
--------------+--------------+--------------
 default_rule | i            |
 special_rule | i            | &a < g
(2 rows)

SELECT * FROM test_collation_rules ORDER BY a COLLATE special_rule;
     a
-----------
Abernathy
apple
green
bird
Boston
Graham
(6 rows)

TRUNCATE triggers on Citus foreign tables

Previously, TRUNCATE triggers were not allowed on foreign tables. With PG16 they are, and they can be useful for audit logging and for preventing undesired truncation. This capability extends to Citus foreign tables as well:

-- prepare the environment for Citus managed local tables
SELECT citus_add_node('localhost', :master_port, groupid => 0);
SET citus.use_citus_managed_tables TO ON;

-- create a foreign table using the postgres_fdw extension
CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial);
INSERT INTO foreign_table_test VALUES (1, 'text_test');
CREATE EXTENSION postgres_fdw;
CREATE SERVER foreign_server
        FOREIGN DATA WRAPPER postgres_fdw
        OPTIONS (host 'coordinator_host', port :'coordinator_port', dbname 'dbname');
CREATE USER MAPPING FOR CURRENT_USER
        SERVER foreign_server
        OPTIONS (user 'postgres');
CREATE FOREIGN TABLE foreign_table (
        id integer NOT NULL,
        data text,
        a bigserial)
        SERVER foreign_server
        OPTIONS (table_name 'foreign_table_test');

-- verify it's a Citus foreign table
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
 partmethod | repmodel
------------+----------
 n          | s
(1 row)

-- insert more data
INSERT INTO foreign_table VALUES (2, 'test_2');
INSERT INTO foreign_table_test VALUES (3, 'test_3');

-- create a simple TRUNCATE trigger on the Citus foreign table
-- this trigger is the same as in Postgres tests for this feature
CREATE FUNCTION trigger_func() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
    RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %',
        TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL;
    RETURN NULL;
END;$$;

CREATE TRIGGER trig_stmt_before BEFORE TRUNCATE ON foreign_table
    FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func();

-- query the foreign table, then truncate it, then query it again
SELECT * FROM foreign_table ORDER BY 1;
 id |   data    | a
----+-----------+---
  1 | text_test | 1
  2 | test_2    | 1
  3 | test_3    | 2
(3 rows)

-- here the raised notice will let us know that the trigger has been activated
TRUNCATE foreign_table;
NOTICE:  trigger_func(<NULL>) called: action = TRUNCATE, when = BEFORE, level = STATEMENT

-- as expected, it’s empty
SELECT * FROM foreign_table ORDER BY 1;
 id | data | a
----+------+---
(0 rows)

More PG16 DDL propagation

Citus now propagates new CREATE TABLE, VACUUM and ANALYZE options to worker nodes:

STORAGE attribute in CREATE TABLE

Previously, the storage mode of a column could only be set with the ALTER COLUMN command, but with PG16 the STORAGE attribute can be specified when creating a new table, and Citus propagates this option to each of the shards. This attribute is important because it controls whether the column is held inline or in a secondary TOAST table, and whether the data should be compressed or not:

-- by default, storage of type text is extended, meaning that it is fully toastable
-- here we specify the storage of another text column to be plain, i.e. not prepared for TOASTing
CREATE TABLE test_storage (a text, c text STORAGE plain);

-- next, we distribute the table
-- and check that all the shards have the same storage in all the nodes
SELECT create_distributed_table('test_storage', 'a');
SELECT result FROM run_command_on_all_nodes
($$ SELECT array_agg(DISTINCT (attname, attstorage)) FROM pg_attribute
    WHERE attrelid::regclass::text ILIKE 'test_storage%' AND attnum > 0;$$) ORDER BY 1;
      result
------------------
{"(a,x)","(c,p)"}
{"(a,x)","(c,p)"}
{"(a,x)","(c,p)"}
(3 rows)

New options in VACUUM and ANALYZE

Citus now propagates the BUFFER_USAGE_LIMIT, PROCESS_MAIN, SKIP_DATABASE_STATS and ONLY_DATABASE_STATS options of the VACUUM and/or ANALYZE commands. PG16 allows control of the shared buffer usage by vacuum and analyze through the BUFFER_USAGE_LIMIT option. It also allows VACUUM to process only the TOAST table (by setting PROCESS_MAIN to false) and to skip or only update the database-level frozen-XID statistics (via SKIP_DATABASE_STATS and ONLY_DATABASE_STATS):

-- change these configs to create a hash-distributed table with one shard
SET citus.next_shard_id TO 12000;
SET citus.shard_count TO 1;

-- change this config to be able to see how the command propagates to the shard
SET citus.log_remote_commands TO on;

-- let’s create and distribute a table
CREATE TABLE table1 (a int);
SELECT create_distributed_table('table1', 'a');

-- see how the commands are propagated to the table’s shard on worker 1, port 9701
VACUUM (PROCESS_MAIN FALSE) table1;
NOTICE:  issuing VACUUM (PROCESS_MAIN FALSE) table1_12000
DETAIL: on server postgres@localhost:9701 connectionId: 1

ANALYZE (BUFFER_USAGE_LIMIT '512 kB') table1;
NOTICE:  issuing ANALYZE (BUFFER_USAGE_LIMIT 512) table1_12000
DETAIL:  on server postgres@localhost:9701 connectionId: 1

-- ONLY_DATABASE_STATS cannot be specified with a list of tables
-- So this command propagates to both workers, on ports 9701 and 9702
VACUUM (ONLY_DATABASE_STATS);
NOTICE:  issuing VACUUM (ONLY_DATABASE_STATS)
DETAIL:  on server postgres@localhost:9701 connectionId: 1
NOTICE:  issuing VACUUM (ONLY_DATABASE_STATS)
DETAIL:  on server postgres@localhost:9702 connectionId: 2
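
The SKIP_DATABASE_STATS option is propagated in the same way; a brief sketch (the NOTICE output follows the same pattern as the commands above):

-- SKIP_DATABASE_STATS skips updating the database-level frozen-XID statistics
-- and is propagated to the shard just like the options above
VACUUM (SKIP_DATABASE_STATS) table1;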

Distributed schema move

With Citus 12.1, we brought various schema-based sharding improvements, one of them being the citus_schema_move() function. You can now move a distributed schema to a different node by using citus_schema_move(). This is useful when you want to move all the tables within a distributed schema to a different node, for example to balance the load or to free up space on a node. The function takes the schema name, the target node (identified either by name and port, or by node id), and optionally the shard transfer mode as arguments. The function signature is:

citus_schema_move(schema_id regnamespace,
                  target_node_name text, target_node_port integer,
                  shard_transfer_mode citus.shard_transfer_mode DEFAULT 'auto'::citus.shard_transfer_mode)

Or equivalently, to specify the target node by its node id instead of its name and port:

citus_schema_move(schema_id regnamespace,
                  target_node_id integer,
                  shard_transfer_mode citus.shard_transfer_mode DEFAULT 'auto'::citus.shard_transfer_mode)

Here, shard_transfer_mode can optionally be specified just like in the citus_move_shard_placement() function.

Note that this function does not pin the schema to the target node. In other words, the rebalancer may move the schema to a different node as per the rebalancer strategy specified by the user.
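
For example, a minimal usage sketch (the schema name, node name, port, and node id below are hypothetical):

-- move all tables of the distributed schema tenant_a to worker-node-2:5432
SELECT citus_schema_move('tenant_a', 'worker-node-2', 5432);

-- or address the target by node id and request a non-blocking transfer
SELECT citus_schema_move('tenant_a', 3, shard_transfer_mode => 'force_logical');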

GRANT/REVOKE propagation on database

Another improvement for schema-based sharding in Citus 12.1 is to propagate GRANT/REVOKE ON DATABASE commands.

By default, only the administrative user can create schemas. To allow other database users to create schemas, you need to run GRANT CREATE ON DATABASE, and with Citus 12.1, grant and revoke statements on databases are automatically propagated to the workers.

Before Citus 12.1, grant and revoke statements on databases did not propagate to worker nodes. As a result, users had to manually manage privileges on each worker, which could become quite cumbersome, especially in scenarios with a large number of workers. The example below shows how a privilege on a database is automatically propagated to the workers:

Let's grant the CREATE privilege on the database citus_db to role r1:

CREATE ROLE r1;
GRANT CREATE ON DATABASE citus_db TO r1;

To check that the privilege is propagated to the worker node, which runs on port 9701, you can connect to it via psql:

psql -p 9701 -U postgres

Once connected, you can use the following query to verify that the privilege is propagated to the worker:

SELECT has_database_privilege('r1', 'citus_db', 'create');
 has_database_privilege
------------------------
 t
(1 row)
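
REVOKE is propagated in the same way; a quick sketch continuing the example above (run the REVOKE on the coordinator):

-- back on the coordinator: revoking the privilege propagates to the workers too
REVOKE CREATE ON DATABASE citus_db FROM r1;

-- repeating the check on the worker should now return f
SELECT has_database_privilege('r1', 'citus_db', 'create');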

To sum up, Citus 12.1 makes managing database privileges in distributed environments more efficient and user-friendly.

Community contributions

Citus greatly appreciates community contributions, and Citus 12.1 has received a substantial number of them.

In particular, Ivan Vyazmitinov has contributed valuable enhancements to 2PC recovery when employing Citus in multiple databases. Although it's not typically recommended to use Citus in multiple databases, we are steadily enhancing its support, and Ivan's patch has effectively addressed a notable operational challenge.

Additionally, zhjwpku has made enhancements to the metadata cache, improvements in code compatibility, and various other valuable contributions.

Deprecations

We removed the pg_send_cancellation tool. This tool was used to support query cancellations in a topology with multiple PgBouncers behind a load balancer. PgBouncer 1.19.0 and later releases handle query cancellations correctly without the pg_send_cancellation tool.

Notable Fixes

  • Pull request #7174: Fixes 2PC transaction recovery when there are multiple databases
  • Pull request #7093: Fixes an error that could appear while adding a null constraint to a table
  • Pull request #7077: Prevents unnecessarily pulling data into the coordinator for some INSERT .. SELECT queries that target a single-shard group
  • Pull request #7074: Makes sure that the rebalancer throws an error if the replication factor is greater than the number of nodes that can host shards. Also makes sure to avoid moving a shard to a node it already exists on
  • Pull request #7152: Fixes a bug that could cause the COPY logic to skip data in case of out-of-memory