v11.3 (May 2023)

Updates to this version:

What’s new in Citus 11.3?

Welcome to the release notes for Citus 11.3. This page dives deep into many of the changes in the Citus 11.3 open source extension to PostgreSQL, including these headline features:

Read the 11.3 release blog post: If you’re looking for an overview of what’s new in Citus 11.3 for multi-tenant SaaS workloads (and more), Marco Slot's 11.3 blog post is a good place to start.

Watch the replay of the Citus 11.3 release party livestream: Our Citus database team held a “Release Party” on Mon May 15, 2023 to informally chat about multi-tenant SaaS apps and what’s new in Citus 11.3—also, to demo 3 of the new capabilities in Citus.

CHANGELOG: If you’re looking for a more extensive list of changes in Citus 11.3, please refer to the Citus 11.3 ChangeLog on GitHub.

As always, we welcome issues & feedback on our GitHub repo: Stay connected with the Citus open source project on GitHub. And if you appreciate what Citus is doing for distributed PostgreSQL, our team always appreciates GitHub stars ⭐!

Tenant monitoring with citus_stat_tenants

Monitoring and statistics are both important for optimising databases. You can easily find stats for the nodes your Citus cluster is built on, but sometimes you need a little more granularity, especially for multi-tenant SaaS applications. So, in 11.3 we introduced a new tenant-level statistics view, citus_stat_tenants, which lets you track the usage data of the tenants in your Citus database.

For each tenant, the citus_stat_tenants view keeps track of the following within defined periods:

  • Read query count (SELECT queries)
  • Total query count (SELECT, INSERT, DELETE, and UPDATE queries)
  • Total CPU usage in seconds

Let's say you have tables defined and distributed like:

CREATE TABLE companies (id BIGSERIAL, name TEXT);
SELECT create_distributed_table ('companies', 'id');

CREATE TABLE campaigns (id BIGSERIAL, company_id BIGINT, name TEXT);
SELECT create_distributed_table ('campaigns', 'company_id');

Each company will be a tenant, and the companies.id and campaigns.company_id columns are the tenant IDs, or tenant attributes.

Now let's create some tenants and run some queries for those tenants:

INSERT INTO companies(name) VALUES ('BestBooks Inc.');
INSERT INTO campaigns(company_id, name) VALUES (1, 'E-Book Tuesday'), (1, 'Sci-fi Romance');

INSERT INTO companies(name) VALUES ('Cafe Raspberry');
INSERT INTO campaigns(company_id, name) VALUES (2, 'Hot Shot Latte');

INSERT INTO companies(name) VALUES ('Bunburger Ventures');
INSERT INTO campaigns(company_id, name) VALUES (3, 'Invest in Bun'), (3, 'Max&Tax'), (3, 'Buntastic Profits');
INSERT INTO campaigns(company_id, name) VALUES (3, 'Milkshake as a Service');

SELECT count(*) FROM campaigns WHERE company_id = 3;
(1 row)

UPDATE campaigns SET name = UPPER(name) WHERE company_id = 3;

SELECT name FROM campaigns WHERE company_id = 3 AND name LIKE '%BUN%';
(2 rows)

Note: You need to set citus.stat_tenants_track to 'all' for citus_stat_tenants to track your tenants' statistics on all Citus nodes and for all backends. You can put the setting in the postgresql.conf file.

You can query citus_stat_tenants for the statistics:

SELECT tenant_attribute,
       read_count_in_this_period,
       query_count_in_this_period,
       cpu_usage_in_this_period
FROM citus_stat_tenants;

 tenant_attribute | read_count_in_this_period | query_count_in_this_period | cpu_usage_in_this_period
 1                |                         0 |                          2 |                 0.000199
 2                |                         0 |                          2 |                 0.000196
 3                |                         2 |                          6 |                 0.000544
(3 rows)

Tenant-level monitoring is designed for tracking the tenants with the most usage. The view shows the top citus.stat_tenants_limit tenants. If a tenant is no longer active, its row might drop out of the view.

Query and CPU usage are counted in period buckets, and the view shows the statistics for both the current period and the last period. If you want to build a dashboard, use the last period's numbers: that period has ended, so its numbers are settled. Tenants can still run new queries in the current period, so the current period's numbers can change; you can query citus_stat_tenants repeatedly to watch them change in real time. By default, a period lasts 60 seconds, after which the statistics are updated. You can set the period length with the citus.stat_tenants_period parameter.
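To make the period mechanics concrete, here is a minimal Python model of period-bucketed counters. It is a hypothetical sketch, not how Citus implements the view: `TenantMonitor`, `record` and `view` are invented for illustration, `period_seconds` and `limit` only mirror citus.stat_tenants_period and citus.stat_tenants_limit, and ranking tenants by this period's query count is a simplifying assumption about how the top tenants are chosen.

```python
from dataclasses import dataclass


@dataclass
class TenantStats:
    """Counters for one tenant (hypothetical model, not Citus internals)."""
    read_this: int = 0
    query_this: int = 0
    read_last: int = 0
    query_last: int = 0


class TenantMonitor:
    """Toy model of period-bucketed tenant statistics.

    period_seconds mirrors citus.stat_tenants_period (default 60) and
    limit mirrors citus.stat_tenants_limit; both are assumptions here.
    """

    def __init__(self, period_seconds: int = 60, limit: int = 10):
        self.period_seconds = period_seconds
        self.limit = limit
        self.period_start = 0
        self.tenants: dict[str, TenantStats] = {}

    def _rotate(self, now: int) -> None:
        # Once a period has ended, its counters become the settled "last period".
        passed = (now - self.period_start) // self.period_seconds
        if passed == 0:
            return
        for stats in self.tenants.values():
            if passed == 1:
                stats.read_last, stats.query_last = stats.read_this, stats.query_this
            else:
                # More than one period went by, so the last period was idle.
                stats.read_last = stats.query_last = 0
            stats.read_this = stats.query_this = 0
        self.period_start += passed * self.period_seconds

    def record(self, tenant: str, is_read: bool, now: int) -> None:
        # Count one query for the tenant in the current period.
        self._rotate(now)
        stats = self.tenants.setdefault(tenant, TenantStats())
        stats.query_this += 1
        if is_read:
            stats.read_this += 1

    def view(self, now: int) -> dict[str, TenantStats]:
        # Keep only the busiest tenants (assumed ranking: queries this period).
        self._rotate(now)
        ranked = sorted(self.tenants.items(),
                        key=lambda item: item[1].query_this, reverse=True)
        return dict(ranked[: self.limit])
```

Replaying tenant 3's six queries (two of them reads) in the first period, followed by one DELETE and one SELECT about 65 seconds later, reproduces the counters in the record shown below: 1 read and 2 queries in this period, 2 reads and 6 queries in the last period.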

After running the above queries, if you wait 60 seconds (1 period) and run new queries like:

DELETE FROM campaigns WHERE company_id = 3 AND LENGTH(name) < 10;

SELECT count(*) FROM campaigns WHERE company_id = 3;
(1 row)

you can see the statistics updated for both the current period and the last period:

SELECT * FROM citus_stat_tenants WHERE tenant_attribute = '3';

-[ RECORD 1 ]--------------+---------
nodeid                     | 2
colocation_id              | 1
tenant_attribute           | 3
read_count_in_this_period  | 1
read_count_in_last_period  | 2
query_count_in_this_period | 2
query_count_in_last_period | 6
cpu_usage_in_this_period   | 0.000171
cpu_usage_in_last_period   | 0.000544

If at any point you want to reset the statistics, you can use the citus_stat_tenants_reset() function.

Non-transactional metadata sync

Citus 11.3 introduces improvements related to metadata sync. Citus database clusters that have thousands of distributed objects (such as distributed tables) in their metadata could experience memory problems during metadata sync. Due to these errors, some users were unable to add new nodes or upgrade beyond Citus 11.0, which introduced the ability to query from any node and scale clusters.

Previously, metadata sync always ran inside a single transaction. To address those issues, Citus 11.3 adds an alternative non-transactional mode. The default is still the single-transaction mode, which may fail because Postgres has a hard memory limit that can be reached when a single transaction contains the numerous DDL commands executed on workers during metadata sync. In 11.3 or later, if you hit such a memory error, you can optionally switch to the non-transactional mode, which syncs the metadata via many transactions.

The example below shows a possible course of action when a user hits the hard memory limit while adding a node or syncing the metadata to all nodes:

-- Adding a node fails with a memory error
SELECT citus_add_node('<ip>', <port>);
ERROR:  invalid memory alloc size 1073741824

-- Switch to non-transactional mode and retry
SET citus.metadata_sync_mode TO 'nontransactional';
SELECT citus_add_node('<ip>', <port>);

-- Syncing metadata to all nodes fails with the same memory error
SELECT start_metadata_sync_to_all_nodes();
ERROR:  invalid memory alloc size 1073741824

-- Switch to non-transactional mode and retry
SET citus.metadata_sync_mode TO 'nontransactional';
SELECT start_metadata_sync_to_all_nodes();

Some corner cases and gotchas:

  • Default mode is transactional for metadata sync
  • If any error occurs in the middle, non-transactional sync could leave the cluster in an inconsistent state. To find nodes that have not been synced, check the metadatasynced flag in pg_dist_node
  • Non-transactional mode is designed to be idempotent, which means users can run it over and over again even if the operation fails in the middle for some reason
  • Non-transactional mode cannot be run inside a transaction block
  • Only superusers have permission to switch to the non-transactional mode
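The difference between the two modes, and why re-running the non-transactional mode after a failure is safe, can be sketched with a toy Python model. Everything here is hypothetical: the function names, the per-command memory cost, the batch size, and the `InvalidMemoryAlloc` exception (standing in for PostgreSQL's "invalid memory alloc size" error) are illustrative assumptions, not how Citus implements metadata sync.

```python
class InvalidMemoryAlloc(Exception):
    """Stands in for PostgreSQL's "invalid memory alloc size" error."""


def sync_transactional(synced: set, commands: list, memory_limit: int) -> None:
    """Default mode: every command runs inside ONE transaction.

    Memory grows with each command, and nothing is committed unless all succeed.
    """
    used = sum(size for _, size in commands)
    if used > memory_limit:
        raise InvalidMemoryAlloc(f"invalid memory alloc size {used}")
    synced.update(name for name, _ in commands)  # single commit at the end


def sync_nontransactional(synced: set, commands: list, memory_limit: int,
                          batch_size: int, fail_at_batch=None) -> None:
    """Non-transactional mode: commands are committed in many small batches.

    Objects that are already synced are skipped, so a re-run after a failure
    only does the remaining work (idempotent).
    """
    todo = [(name, size) for name, size in commands if name not in synced]
    for index, start in enumerate(range(0, len(todo), batch_size)):
        if index == fail_at_batch:
            # Earlier batches stay committed: the node is left partially synced.
            raise ConnectionError("simulated failure in the middle of the sync")
        batch = todo[start:start + batch_size]
        used = sum(size for _, size in batch)
        if used > memory_limit:
            raise InvalidMemoryAlloc(f"invalid memory alloc size {used}")
        synced.update(name for name, _ in batch)  # each batch commits on its own
```

With 1,000 objects at 10 units each and a limit of 1,000 units, the transactional sync fails and commits nothing, while the batched sync fits within the limit. If the batched sync is interrupted after three batches of 50, 150 objects are already synced, and a plain re-run finishes the remaining 850.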

Parallel shard rebalancing

For some use cases, scaling out the cluster is bottlenecked on the speed of shard moves. With this release, we are adding a capability to the rebalancer such that multiple shard moves can happen concurrently.

The background shard rebalancer in Citus executes shard moves sequentially by default. In 11.3, we introduced the capability of executing shard moves in parallel. The maximum number of parallel shard moves per node is configured using the new GUC citus.max_background_task_executors_per_node: for any node in the cluster, the total number of concurrent incoming and outgoing shard moves to or from that node cannot exceed this value.

A shard move also has dependencies that must be completed before it becomes runnable. In 11.3, there are two types of dependencies:

  • Shard moves within the same colocation group depend sequentially on each other. In other words, Citus does not move multiple shards of a distributed table—nor multiple shards of colocated distributed tables—in parallel.
  • An incoming shard move to a node requires the previous outgoing shard move from that node to have been completed.

Runnable shard moves start to execute when a concurrency spot in their path becomes available.
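The scheduling rules above can be modeled with a small Python simulation. It is a hedged sketch rather than the rebalancer's actual algorithm: it assumes every move takes exactly one round, starts runnable moves greedily in plan order, and uses `max_per_node` to stand in for citus.max_background_task_executors_per_node; the `Move` type and `schedule` function are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Move:
    shard_id: int
    colocation_id: int
    source: int  # node id the shard moves from
    target: int  # node id the shard moves to


def schedule(moves: list, max_per_node: int) -> list:
    """Group planned moves into rounds of moves that run concurrently.

    Assumes every move takes exactly one round and that runnable moves are
    started greedily in plan order.
    """
    if max_per_node < 1:
        raise ValueError("max_per_node must be at least 1")
    done = set()
    rounds = []
    while len(done) < len(moves):
        started = []
        load = {}  # node id -> number of moves touching it this round
        for i, move in enumerate(moves):
            if move.shard_id in done:
                continue
            earlier = [m for m in moves[:i] if m.shard_id not in done]
            # Rule 1: moves in the same colocation group run one after another.
            if any(m.colocation_id == move.colocation_id for m in earlier):
                continue
            # Rule 2: an incoming move waits for earlier outgoing moves from its target.
            if any(m.source == move.target for m in earlier):
                continue
            # Limit: incoming plus outgoing moves per node.
            if max(load.get(move.source, 0), load.get(move.target, 0)) >= max_per_node:
                continue
            started.append(move)
            for node in (move.source, move.target):
                load[node] = load.get(node, 0) + 1
        rounds.append([m.shard_id for m in started])
        done.update(m.shard_id for m in started)
    return rounds
```

For the two moves in the rebalance plan of the example that follows (shard 102008 from node 1 to node 2, shard 102013 from node 3 to node 2, in different colocation groups), a limit of 2 yields a single round containing both moves, while a limit of 1 yields two rounds because both moves touch node 2.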


-- create two tables that are not colocated with each other and insert some rows
CREATE TABLE table1 (a int);
SELECT create_distributed_table('table1', 'a', shard_count => 4, colocate_with => 'none');
INSERT INTO table1 SELECT generate_series(1,10000) AS a;

CREATE TABLE table2 (a int);
SELECT create_distributed_table('table2', 'a', shard_count => 4, colocate_with => 'none');
INSERT INTO table2 SELECT generate_series(1,10000) AS a;

-- add/enable a new node, say node with port 9702, in the cluster
SELECT * from pg_dist_node;
 nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
      1 |       1 | localhost |     9701 | default  | t           | t        | primary  | default     | t              | t
      3 |       3 | localhost |     9703 | default  | t           | t        | primary  | default     | t              | t
      2 |       2 | localhost |     9702 | default  | t           | t        | primary  | default     | t              | f
(3 rows)

SELECT * FROM citus_set_node_property('localhost', 9702, 'shouldhaveshards', true);

-- see the rebalance plan. There are two shard moves planned to the new node
SELECT * FROM get_rebalance_table_shards_plan();
 table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
 table1     |  102008 |          0 | localhost  |       9701 | localhost  |       9702
 table2     |  102013 |          0 | localhost  |       9703 | localhost  |       9702

-- set citus.max_background_task_executors_per_node to 2, allowing up to 2 independent moves to run in parallel
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();

-- start the rebalancer in the background
SELECT citus_rebalance_start();
NOTICE:  Scheduled 2 moves as job 1
DETAIL:  Rebalance scheduled as background job
HINT:  To monitor progress, run: SELECT * FROM citus_rebalance_status();
(1 row)

-- Monitor the status. The two moves are first runnable and then run concurrently
SELECT * FROM citus_rebalance_status() \gx
-[ RECORD 1 ]--------------------------------------------------
job_id      | 1
state       | scheduled
job_type    | rebalance
description | Rebalance all colocation groups
started_at  |
finished_at |
details     | {"tasks": [], "task_state_counts": {"runnable": 2}}

SELECT * FROM citus_rebalance_status() \gx
-[ RECORD 1 ]----------------------------------------------------
job_id      | 1
state       | running
job_type    | rebalance
description | Rebalance all colocation groups
started_at  | 2023-04-28 15:01:38.157575+03
finished_at |
details     | {"tasks": [{"LSN": {"lag": null, "source": "0/1CE46000", "target": null}, "size": {"source": "14 MB", "target": "0 bytes"}, "hosts": {"source": "localhost:9701", "target": "localhost:9702"}, "phase": "Setting Up", 
"state": "running", "command": "SELECT pg_catalog.citus_move_shard_placement(102008,1,2,'auto')", "message": "", "retried": 0, "task_id": 1}, {"LSN": {"lag": null, "source": "0/1A865480", "target": null}, "size": {"source": "140 MB", "target": "3248 kB"}, "hosts": {"source": "localhost:9703", "target": "localhost:9702"}, "phase": "Copying Data", 
"state": "running", "command": "SELECT pg_catalog.citus_move_shard_placement(102013,3,2,'auto')", "message": "", "retried": 0, "task_id": 2}], "task_state_counts": {"running": 2}}

SELECT * FROM citus_rebalance_status() \gx
-[ RECORD 1 ]------------------------------------------------
job_id      | 1
state       | finished
job_type    | rebalance
description | Rebalance all colocation groups
started_at  | 2023-04-28 15:01:38.157575+03
finished_at | 2023-04-28 15:01:49.856551+03
details     | {"tasks": [], "task_state_counts": {"done": 2}}

  • Pull request #6739: Rebalance shard groups with placement count less than worker count
  • Pull request #6756: Schedule parallel shard moves in background rebalancer by removing task dependencies between shard moves across colocation groups
  • Pull request #6771: Adds control for background task executors involving a node

MERGE support

Postgres 15 introduced support for the MERGE command, with which a single command can conditionally insert, update, or delete rows of a table. When we added the first phase of MERGE support in Citus 11.2, MERGE was supported only if both tables were Citus local tables. As of 11.3, MERGE is also supported for Citus distributed tables. The modifications are done individually on each shard, so for the MATCHED conditions to be meaningful, the source and target have to be colocated and joined on their respective distribution columns.

CREATE TABLE target_table(tid int, val varchar);
CREATE TABLE source_table(sid int, val varchar);
SELECT create_distributed_table('target_table', 'tid');
SELECT create_distributed_table('source_table', 'sid', colocate_with=>'target_table');

INSERT INTO target_table VALUES(1, 'target'); /* MATCHED clause true, row is UPDATED */
INSERT INTO target_table VALUES(1, 'target-delete'); /* MATCHED clause true, row is DELETED */

INSERT INTO source_table VALUES(1, 'source');
INSERT INTO source_table VALUES(2, 'source'); /* NOT MATCHED clause true, row is INSERTED */

MERGE INTO target_table t
USING source_table s
ON t.tid = s.sid
WHEN MATCHED AND t.val = 'target-delete' THEN
        DELETE
WHEN MATCHED THEN
        UPDATE SET val = s.val
WHEN NOT MATCHED THEN
        INSERT(tid, val) VALUES(sid, val);


SELECT * FROM target_table ORDER BY 1,2;
 tid |  val
-----+--------
   1 | source
   2 | source
(2 rows)
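Why the source and target must be colocated and joined on their distribution columns can be illustrated with a small Python model of the example above. It is a sketch under assumptions: rows are `(key, val)` tuples, `key % shard_count` stands in for Citus's hash-based shard assignment, source keys are assumed unique, and `merge_on_shard` hard-codes the delete, update, and insert behavior described in the comments of the example's INSERT statements. Because matching rows always share a key, they land on the same shard, so running the MERGE independently per shard gives the same result as one global MERGE.

```python
def merge_on_shard(target, source):
    """Apply the example MERGE to the (key, val) rows of ONE shard."""
    source_by_key = dict(source)  # assumes unique source keys
    matched = set()
    result = []
    for tid, tval in target:
        if tid not in source_by_key:
            result.append((tid, tval))  # no matching source row: unchanged
            continue
        matched.add(tid)
        if tval == "target-delete":
            continue  # WHEN MATCHED AND t.val = 'target-delete' THEN DELETE
        result.append((tid, source_by_key[tid]))  # WHEN MATCHED THEN UPDATE
    for sid, sval in source:
        if sid not in matched:
            result.append((sid, sval))  # WHEN NOT MATCHED THEN INSERT
    return result


def merge_distributed(target, source, shard_count=4):
    """Run the MERGE shard by shard on colocated tables and combine results."""
    result = []
    for shard in range(shard_count):
        # Colocation: both tables use the same key -> shard mapping.
        t = [row for row in target if row[0] % shard_count == shard]
        s = [row for row in source if row[0] % shard_count == shard]
        result.extend(merge_on_shard(t, s))
    return sorted(result)
```

Running `merge_distributed([(1, "target"), (1, "target-delete")], [(1, "source"), (2, "source")])` returns the same two rows as the query output above, for any shard count.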

Identity Column Changes

Identity columns allow users to automatically assign a unique value to a column. This release introduces improvements to the handling of identity columns in Citus. With this update, we have improved the logic used for propagating identity columns to worker nodes by passing the columns as-is, rather than creating dependent sequences. If you are using distributed identity columns introduced in Citus 11.2, please read the following carefully as the user experience has slightly changed. This change improves the compatibility with the DDL from any node functionality and also fixes a bug that prevented enforcing identity column restrictions on worker nodes.

Additionally, this release fixes a bug related to INSERT..SELECT queries with identity columns, improving the reliability of such queries in Citus.

Known limitations of identity column support in Citus

  • Identity columns can only be used with the bigint data type on distributed tables.
  • alter_distributed_table and undistribute_table UDFs currently do not support identity columns.
  • Adding an identity constraint to an existing column using an ALTER TABLE statement is not supported.
  • ALTER TABLE statements that involve changing the identity column are not supported.
  • Adding a new identity column to a table with existing data is not supported.
  • Metadata resync causes a loss of state for the sequence used by identity columns.

Running into one of these limitations results in an error, with a related error message.

Notable pull requests for Citus identity column support

  1. PR 6738: improves the logic used for propagating identity columns to worker nodes.
  2. PR 6802: fixes a bug with INSERT..SELECT queries with identity columns.

CDC support in Citus (Preview)

Change Data Capture (CDC) is the process of identifying and capturing changes made to data in a database. It allows you to track modifications made to a database, including inserts, updates, and deletions. CDC for Citus is offered via logical decoding and enables logical replication of distributed tables and reference tables to CDC clients. Citus CDC support is new in 11.3 and is currently in preview (which means beta).

CDC Implementation Overview:

Here is the high level overview of CDC implementation in Citus:

  • Events caused by cluster management operations such as shard splits, shard moves, and rebalancing do not result in already published events being published again.
  • This is achieved by setting replication origin fields in the WAL entries of events created during such cluster management operations.
  • CDC decoder plugins (pgoutput, wal2json) are added to the standard library path and are loaded when a new subscription is made to a logical replication slot.
  • The decoder ignores any event that has the replication origin set for the shard management operations described above.
  • The decoder translates any table name in an event from the name of a particular shard of a distributed table to the corresponding distributed table name.
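The two decoder rules in the overview (skip events tagged with the shard-management replication origin, and translate shard names to distributed table names) can be sketched in Python. This is not Citus's decoder: the `WalChange` type, the origin name `citus_shard_move`, and the `shard_to_table` mapping (which in a real cluster would come from Citus metadata) are all hypothetical.

```python
from dataclasses import dataclass
from typing import Iterator, Optional

# Hypothetical name of the replication origin set by shard moves and splits.
SHARD_MANAGEMENT_ORIGIN = "citus_shard_move"


@dataclass
class WalChange:
    relation: str                 # name of the shard as it appears in the WAL
    op: str                       # "insert", "update", "delete", ...
    origin: Optional[str] = None  # replication origin, None for normal writes


def decode(changes: list, shard_to_table: dict) -> Iterator[tuple]:
    """Yield (table, op) pairs that a CDC client should receive."""
    for change in changes:
        if change.origin == SHARD_MANAGEMENT_ORIGIN:
            # Data copied by a shard move was already published once: skip it.
            continue
        # Report the distributed table name instead of the shard name.
        yield shard_to_table.get(change.relation, change.relation), change.op
```

In this model, the copy of shard `sensors_102008` written during a shard move is dropped, while regular writes to any shard surface under the table name `sensors`.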

There are 2 important types of CDC clients:

  • PostgreSQL client to replicate data from a distributed table
  • Event stream consumers like Apache Kafka

The sections below describe how to set up and consume CDC events for these 2 types of CDC clients.

CDC for PostgreSQL Client:

The example below shows how to logically replicate a distributed table from a Citus cluster to a Postgres client. For this example, the Citus cluster consists of a coordinator and 2 worker nodes. The CDC client is a regular PostgreSQL server that replicates all changes to a distributed table to a local PostgreSQL table.

Step 1: Enable CDC feature:

CDC for distributed tables feature is in preview (beta) and should be enabled explicitly. This flag should be set in the postgresql.conf so that it is enabled for all backends.


Step 2: Create Distributed Tables:

Here is an example of a distributed table 'sensors', which will be logically replicated to a client using CDC.

    CREATE TABLE sensors(
        measureid               integer,
        eventdatetime           timestamptz,
        measure_data            jsonb,
        meaure_quantity         decimal(15, 2),
        measure_status          char(1),
        measure_comment         varchar(44),
        PRIMARY KEY (measureid, eventdatetime, measure_data)
    );
    SELECT create_distributed_table('sensors','measureid');

Step 3: Schema match in CDC client:

To replicate a distributed table to the CDC client, create a table on the client with the same name and the same schema as the distributed table.

Step 4: Create publications for CDC:

Create a publication for the distributed table on the coordinator node.

    CREATE PUBLICATION cdc_publication FOR TABLE sensors;

This will automatically propagate the publication to all worker nodes.

Step 5: Create logical replication slots for CDC:

On the coordinator node, run this command to create a logical replication slot on all Citus nodes.

    SELECT * FROM run_command_on_all_nodes
      ($$SELECT pg_create_logical_replication_slot('cdc_replication_slot', 'pgoutput', false);$$);

NOTE: Before this step, make sure that the coordinator is already in the metadata, using the citus_set_coordinator_host() function.

Step 6: Create subscriptions from CDC client to Citus Cluster:

The following steps are required for subscribing to changes from a CDC client to Citus cluster.

Step 6A: Create subscriptions to co-ordinator node:

Create a subscription in the CDC client with the co-ordinator node to copy any existing data from the distributed table to the CDC client.

Replace the variables in the SQL statement below with appropriate values, as described below:
- dbname: Name of the database where the distributed table to be replicated resides.
- username: Database user name.
- coordinator host: hostname of the co-ordinator node
- coordinator port: port of the co-ordinator node.      
- cdc_publication_name: publication created in the co-ordinator for the distributed table.
- cdc_replication_slot: logical replication slot in co-ordinator node for CDC.
    CREATE SUBSCRIPTION cdc_subscription_1 
        CONNECTION 'dbname=<dbname> host=<coordinator host> user=<username> port=<coordinator port>' 
        PUBLICATION <cdc_publication_name> 
        WITH (copy_data=true,create_slot=false,slot_name='<cdc_replication_slot>');
NOTE: The 'copy_data' argument should be set to true to copy any existing data to the CDC client.
This should be done for only one subscription in the CDC client; otherwise, it will result in duplicate data in the CDC client and cause replication errors.
Step 6B: Create subscriptions to worker nodes:

Create subscriptions in the CDC client to every worker node to get changes from all worker nodes. For reference tables, this step is not required, since changes to reference tables are published only from the coordinator node. Replace these additional variables:
- worker host: hostname of the worker node
- worker port: port of the worker node

    CREATE SUBSCRIPTION cdc_subscription_2 
        CONNECTION 'dbname=<dbname> host=<worker1 host> user=<user> port=<worker1 port>' 
        PUBLICATION cdc_publication 
        WITH (copy_data=false,create_slot=false,slot_name='cdc_replication_slot');

    CREATE SUBSCRIPTION cdc_subscription_3 
        CONNECTION 'dbname=<dbname> host=<worker2 host> user=<user> port=<worker2 port>' 
        PUBLICATION cdc_publication 
        WITH (copy_data=false,create_slot=false,slot_name='cdc_replication_slot');
NOTE: 'copy_data' argument should be set to false to avoid copying existing data into CDC client from every worker node because that will result in duplicate data and cause replication errors.
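Because exactly one subscription may copy existing data, it can help to generate the subscription statements rather than write them by hand. The Python sketch below is a hypothetical helper: it assumes the coordinator is the first entry in `nodes`, reuses the naming pattern of the statements above, and the host names and ports in the test are placeholders.

```python
def subscription_sql(nodes, dbname, user, publication, slot):
    """Return one CREATE SUBSCRIPTION statement per Citus node.

    nodes is a list of (host, port) pairs with the coordinator first; only
    that first subscription copies existing data (copy_data=true).
    """
    statements = []
    for number, (host, port) in enumerate(nodes, start=1):
        copy_data = "true" if number == 1 else "false"
        statements.append(
            f"CREATE SUBSCRIPTION cdc_subscription_{number}\n"
            f"    CONNECTION 'dbname={dbname} host={host} user={user} port={port}'\n"
            f"    PUBLICATION {publication}\n"
            f"    WITH (copy_data={copy_data},create_slot=false,"
            f"slot_name='{slot}');"
        )
    return statements


if __name__ == "__main__":
    nodes = [("coordinator", 5432), ("worker1", 5432), ("worker2", 5432)]
    for sql in subscription_sql(nodes, "postgres", "postgres",
                                "cdc_publication", "cdc_replication_slot"):
        print(sql, end="\n\n")
```

The generated statements are meant to be run on the CDC client; the replication slot must already exist on every node, as created in Step 5.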

Step 7: Verify the CDC subscriptions working correctly

Now the CDC client is ready to receive all changes happening to the distributed table in the Citus cluster. To verify this, you can, for example, run an INSERT statement on the coordinator node:

    INSERT INTO sensors
    SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
    FROM generate_series(11,20)i;

Then verify that the data in the 'sensors' table on the CDC client matches the data in the distributed table:

    SELECT * FROM sensors;

Similarly, any UPDATE, DELETE, and TRUNCATE statements will be replicated.

CDC setup for Apache Kafka Clients:

The steps for publishing events to a Kafka client are similar to those for a PostgreSQL client, except that for Kafka, the client side does not need to create a table to replicate the changes; instead, some configuration files need to be created. A Debezium connector is used to connect Kafka with each node in the Citus cluster.

Step 1: Complete prerequisites for CDC subscriptions from Kafka

Complete these steps below from the CDC for PostgreSQL Client section

Step 2: Create debezium connector configuration files:

For distributed tables, create a Debezium connector configuration file for every worker node, with details of the publication and replication slot for the distributed table on that worker node. An example is provided below (dbz-test-connector.properties):

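    name=<connector name>
    connector.class=io.debezium.connector.postgresql.PostgresConnector
    plugin.name=pgoutput
    database.hostname=<worker host name>
    database.port=<worker port>
    database.user=<DB user name>
    database.password=<DB password>
    database.dbname=<database name>
    database.server.name=<kafka topic prefix>
    slot.name=<CDC replication slot name>
    publication.name=<CDC publication name>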
NOTE: To get changes to reference tables, the CDC client should connect only to the coordinator node,
since changes to reference tables are published only from the coordinator node.

Step 3: Start ZooKeeper and Kafka broker services

If the Kafka cluster is not already running, start a ZooKeeper and Kafka server with commands like the example below:

    $> cd $kafka_install_dir/bin
    $> ./zookeeper-server-start.sh ../config/zookeeper.properties
    $> ./kafka-server-start.sh ../config/server.properties

Step 4: Start debezium kafka connectors

Run a Debezium connector for each worker node, as in the example below.

    $> ./connect-standalone.sh ../config/connect-standalone.properties ../config/dbz-test-connector.properties
NOTE: The 'listeners' property in connect-standalone.properties must be set to a unique value for each Debezium connector, because it determines
the REST API port on which the connector listens. For example:
    for Worker1: (listeners=HTTP://:8083) 
    for Worker2: (listeners=HTTP://:8084) 
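Generating one connector configuration per worker, each paired with its own `listeners` port, avoids copy-paste mistakes. The Python sketch below is a hypothetical helper: shared properties (user, database, slot, publication, topic prefix) are passed in once, per-worker `database.hostname` and `database.port` are filled in, and REST ports are assumed to be assigned sequentially from 8083. It only produces the `listeners` line; the remaining settings of connect-standalone.properties come from your existing file.

```python
def connector_configs(workers, common, first_port=8083):
    """Build per-worker Debezium connector properties and REST listeners.

    workers: list of (host, port) pairs, one per Citus worker node.
    common: properties shared by every connector (user, dbname, slot, ...).
    """
    configs = []
    for offset, (host, port) in enumerate(workers):
        props = {**common, "database.hostname": host, "database.port": str(port)}
        configs.append({
            # key=value lines for this worker's connector .properties file
            "connector": "\n".join(f"{key}={value}" for key, value in props.items()),
            # Each connector's REST API needs its own port.
            "listeners": f"listeners=HTTP://:{first_port + offset}",
        })
    return configs
```

Write each `connector` string to its own .properties file and put each `listeners` line into a separate copy of connect-standalone.properties before starting the connectors.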

Step 5: Start kafka console client

Run a kafka console client to get the changes for the distributed table.

    $> ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic DBTestServer.public.sensors | jq .

Step 6: Verify CDC changes published to Kafka

Once the Kafka setup is done, any insert into the distributed table will result in an event printed to the Kafka console client like this:

    {
        "before": null,
        "after": {
            "measureid": 0,
            "eventdatetime": "2020-01-04T18:30:00Z",
            "measure_data": "{}",
            "meaure_quantity": "EM02",
            "measure_status": "A",
            "measure_comment": "I <3 Citus"
        },
        "source": {
            "version": "1.9.5.Final",
            "connector": "postgresql",
            "name": "DBTestServer",
            "ts_ms": 1683009443335,
            "snapshot": "false",
            "db": "postgres",
            "sequence": "[\"29230288\",\"29230344\"]",
            "schema": "public",
            "table": "sensors",
            "txId": 767,
            "lsn": 29230344,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1683009443573,
        "transaction": null
    }

Similarly, UPDATE and DELETE events are also published to the Kafka clients.
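A consumer usually needs only the operation, the table, and the row image from each event. The Python sketch below extracts them from events shaped like the one above. The op codes it maps ('c', 'u', 'd', 'r') are Debezium's standard ones, while the `summarize` helper itself is hypothetical. It also accepts events wrapped in a `payload` envelope (as when schemas are included) and the `null` tombstone that may follow a delete.

```python
import json

# Debezium operation codes: create, update, delete, read (snapshot).
OPERATIONS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT READ"}


def summarize(raw: str) -> str:
    """Turn one Debezium change event (JSON text) into a one-line summary."""
    event = json.loads(raw)
    if event is None:
        return "tombstone"  # null value emitted after a delete
    payload = event.get("payload", event)  # with or without a schema envelope
    operation = OPERATIONS.get(payload["op"], payload["op"])
    source = payload["source"]
    # Deletes carry the old row in "before"; other operations use "after".
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return f"{operation} {source['schema']}.{source['table']} {row}"
```

For the insert event above, this yields a line starting with `INSERT public.sensors`, followed by the new row.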

Limitations of Citus CDC as of Citus 11.3

The limitations of CDC support in Citus 11.3 are listed below:

Limitation 1: Inherit all Postgres logical replication limitations

Citus CDC uses logical replication, so all limitations of logical replication also apply to CDC. Some important limitations of logical replication are:

  • DDL commands are not replicated through logical replication. So any changes to the distributed table schema must be manually applied to the CDC client.
  • A table must have a primary key, or its REPLICA IDENTITY must be set to FULL, for all changes to the table to be published to the CDC client.
  • The CDC client's replication target must be a regular PostgreSQL table and cannot be a distributed table.

Limitation 2: Ordering of events across nodes not guaranteed

Events from the same worker node are published in the same order in which they happened on that node. However, there is no guarantee on the ordering of events across multiple worker nodes.
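A client that combines streams from several nodes can rely only on per-node ordering. The Python sketch below is a hypothetical helper that assumes each event carries the node it came from and that node's LSN; it checks exactly the per-node guarantee and deliberately never compares LSNs across nodes, since LSNs from different nodes are unrelated.

```python
def per_node_order_preserved(events):
    """Return True if LSNs increase within each node's stream of events.

    events: iterable of (node, lsn) pairs in the order the client received them.
    LSNs are only compared with earlier LSNs from the same node.
    """
    last_lsn = {}
    for node, lsn in events:
        if node in last_lsn and lsn <= last_lsn[node]:
            return False
        last_lsn[node] = lsn
    return True
```

Two different interleavings of the same per-node streams both pass this check, which is why a CDC client should not infer a global order from the order in which events arrive.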

Limitation 3: Initial snapshot of CDC may not be consistent

The initial snapshot taken for a distributed table using copy_data=true may result in events being published out of order, since the data may change across worker nodes while the initial data is being published to the CDC client. So it is advisable to take the initial CDC snapshot during a steady state.

Limitation 4: CDC not available for tables based on columnar storage

CDC events are not published for tables with columnar storage methods.

Limitation 5: CDC subscription to each worker node needed

A separate subscription from the CDC client to each worker node must be created to get events for a distributed table from all worker nodes.

Limitation 6: ALL TABLES publication not supported for CDC

If a publication is created using FOR ALL TABLES, all Citus internal metadata tables (such as pg_dist_partition) are also replicated. In that case, if you are replicating from Citus to Postgres, you should make sure that all the metadata tables are created on the subscriber side. As of Citus 11.3, we discourage the use of the FOR ALL TABLES syntax in the CREATE PUBLICATION command.

Limitation 7: Reference table changes published from co-ordinator only

Reference table events are published from the coordinator node only, to avoid publishing duplicate events, since reference tables are replicated to all worker nodes. So the CDC client must be subscribed to the coordinator node to get changes for reference tables.

Notable Fixes