POSETTE 2024 is a wrap! 💯 Thanks for joining the fun! Missed it? Watch all 42 talks online 🍿
Updates to this version:
Welcome to the release notes for Citus 11.3. This page dives deep into many of the changes in the Citus 11.3 open source extension to PostgreSQL, including these headline features:
citus_stat_tenants view to monitor your tenants' usage statistics
Read the 11.3 release blog post: If you’re looking for an overview of what’s new in Citus 11.3 for multi-tenant SaaS workloads (and more), Marco Slot's 11.3 blog post is a good place to start.
Watch the replay of the Citus 11.3 release party livestream: Our Citus database team held a “Release Party” on Mon May 15, 2023 to informally chat about multi-tenant SaaS apps and what’s new in Citus 11.3, and to demo 3 of the new capabilities in Citus.
CHANGELOG: If you’re looking for a more extensive list of changes in Citus 11.3, please refer to the Citus 11.3 ChangeLog on GitHub.
As always, we welcome issues & feedback on our GitHub repo: Stay connected with the Citus open source project on GitHub. And if you appreciate what Citus is doing for distributed PostgreSQL, our team always appreciates GitHub stars ⭐!
Monitoring and statistics are both important for optimising databases. You can easily find stats for the nodes your Citus cluster is built on, but sometimes you need a little more granularity, especially for your multi-tenant SaaS applications. So, in 11.3 we introduced a new tenant-level statistics view, citus_stat_tenants. With the citus_stat_tenants view you can track the usage data of the tenants in your Citus database.
In the citus_stat_tenants view we keep track of:
- read count (SELECT queries)
- query count (SELECT, INSERT, DELETE, and UPDATE queries)
- CPU usage
for each of the tenants within defined periods.
Let's say you have tables defined and distributed like:
CREATE TABLE companies (id BIGSERIAL, name TEXT);
SELECT create_distributed_table ('companies', 'id');
CREATE TABLE campaigns (id BIGSERIAL, company_id BIGINT, name TEXT);
SELECT create_distributed_table ('campaigns', 'company_id');
Each company will be a tenant, and the companies.id and campaigns.company_id columns are tenant ids, or tenant attributes.
Now let's create some tenants and run some queries for those tenants:
INSERT INTO companies(name) VALUES ('BestBooks Inc.');
INSERT INTO campaigns(company_id, name) VALUES (1, 'E-Book Tuesday'), (1, 'Sci-fi Romance');
INSERT INTO companies(name) VALUES ('Cafe Raspberry');
INSERT INTO campaigns(company_id, name) VALUES (2, 'Hot Shot Latte');
INSERT INTO companies(name) VALUES ('Bunburger Ventures');
INSERT INTO campaigns(company_id, name) VALUES (3, 'Invest in Bun'), (3, 'Max&Tax'), (3, 'Buntastic Profits');
INSERT INTO campaigns(company_id, name) VALUES (3, 'Milkshake as a Service');
SELECT count(*) FROM campaigns WHERE company_id = 3;
count
-------
4
(1 row)
UPDATE campaigns SET name = UPPER(name) WHERE company_id = 3;
SELECT name FROM campaigns WHERE company_id = 3 AND name LIKE '%BUN%';
name
-------------------
INVEST IN BUN
BUNTASTIC PROFITS
(2 rows)
Note: You need to set citus.stat_tenants_track to 'all' for citus_stat_tenants to track your tenants' statistics on all Citus nodes and for all backends. You can put the setting in the postgresql.conf file.
You can query citus_stat_tenants for the statistics:
SELECT tenant_attribute,
read_count_in_this_period,
query_count_in_this_period,
cpu_usage_in_this_period
FROM citus_stat_tenants
ORDER BY 1;
tenant_attribute | read_count_in_this_period | query_count_in_this_period | cpu_usage_in_this_period
------------------+---------------------------+----------------------------+--------------------------
1 | 0 | 2 | 0.000199
2 | 0 | 2 | 0.000196
3 | 2 | 6 | 0.000544
(3 rows)
Tenant-level monitoring is designed for tracking the tenants with the most usage. The view shows the top citus.stat_tenants_limit tenants. If a tenant is no longer active, its row might drop from the view.
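To make the "top tenants" behavior easier to picture, here is a small Python model of a bounded tenant table. It is a sketch under assumptions, not how Citus implements the view: BoundedTenantTable is a hypothetical name, limit merely plays the role of citus.stat_tenants_limit, and the eviction rule (drop the tenant with the lowest query count when the table is full) is a simplification chosen for illustration.

```python
class BoundedTenantTable:
    """Illustrative model: track query counts for at most `limit` tenants."""

    def __init__(self, limit: int):
        self.limit = limit  # plays the role of citus.stat_tenants_limit
        self.query_counts: dict[str, int] = {}

    def record_query(self, tenant: str) -> None:
        if tenant not in self.query_counts and len(self.query_counts) >= self.limit:
            # Table is full: evict the least active tenant to make room.
            # This eviction rule is a simplifying assumption of the sketch.
            coldest = min(self.query_counts, key=self.query_counts.get)
            del self.query_counts[coldest]
        self.query_counts[tenant] = self.query_counts.get(tenant, 0) + 1

    def top(self) -> list[tuple[str, int]]:
        # Most active tenants first.
        return sorted(self.query_counts.items(), key=lambda kv: kv[1], reverse=True)


table = BoundedTenantTable(limit=2)
for tenant in ["1", "1", "3", "3", "3", "2"]:
    table.record_query(tenant)
print(table.top())  # [('3', 3), ('2', 1)]
```

In this toy model tenant "1" is evicted once tenant "2" shows up, which shows how a tenant can disappear from a bounded list after others take its place.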
The query and CPU usage are counted in period buckets. You can see the current and the last period's query and CPU statistics. If you want to build a dashboard, use the previous period's numbers: the previous period has ended, so its numbers are settled. Tenants can still run new queries in the current period, so the current period's numbers can change. The current period's numbers can be used to watch changes in real time by querying citus_stat_tenants repeatedly. By default, a period lasts 60 seconds, after which the statistics roll over to a new period. You can set the period length with the citus.stat_tenants_period parameter.
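The period rollover can be sketched the same way. The Python model below is hypothetical (PeriodCounters is not a Citus API) and only tracks query counts; period_seconds stands in for citus.stat_tenants_period. It assumes that if a whole period passes with no activity, the "last period" counter reads as zero. The usage lines replay tenant 3 from this section: six queries in one period, then two more after the boundary.

```python
class PeriodCounters:
    """Toy model of 'this period' / 'last period' counters (not Citus internals)."""

    def __init__(self, period_seconds: float = 60.0, start: float = 0.0):
        self.period = period_seconds  # stands in for citus.stat_tenants_period
        self.period_start = start
        self.this_period = 0
        self.last_period = 0

    def _rotate(self, now: float) -> None:
        elapsed_periods = int((now - self.period_start) // self.period)
        if elapsed_periods >= 1:
            # One boundary crossed: current counts become "last period".
            # More than one: the last full period was idle, so it counts as 0.
            self.last_period = self.this_period if elapsed_periods == 1 else 0
            self.this_period = 0
            self.period_start += elapsed_periods * self.period

    def record_query(self, now: float) -> None:
        self._rotate(now)
        self.this_period += 1

    def snapshot(self, now: float) -> tuple[int, int]:
        """Return (query_count_in_this_period, query_count_in_last_period)."""
        self._rotate(now)
        return self.this_period, self.last_period


tenant3 = PeriodCounters(period_seconds=60)
for t in range(6):
    tenant3.record_query(now=t)   # six queries in the first period
for t in (61, 62):
    tenant3.record_query(now=t)   # two queries after the 60-second boundary
print(tenant3.snapshot(now=63))   # (2, 6)
```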
After running the above queries, if you wait 60 seconds (1 period) and run new queries like:
DELETE FROM campaigns WHERE company_id = 3 AND LENGTH(name) < 10;
SELECT count(*) FROM campaigns WHERE company_id = 3;
count
-------
3
(1 row)
you can see the statistics updated for both the current period and the last period:
SELECT * FROM citus_stat_tenants WHERE tenant_attribute = '3';
-[ RECORD 1 ]--------------+---------
nodeid | 2
colocation_id | 1
tenant_attribute | 3
read_count_in_this_period | 1
read_count_in_last_period | 2
query_count_in_this_period | 2
query_count_in_last_period | 6
cpu_usage_in_this_period | 0.000171
cpu_usage_in_last_period | 0.000544
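Because the last period's numbers are settled, a dashboard can turn them into rates. The sketch below does this for the record above, assuming the default 60-second period and treating the row as a plain Python dict. Fetching the row from citus_stat_tenants is left out, and last_period_rates is a hypothetical helper.

```python
PERIOD_SECONDS = 60  # default citus.stat_tenants_period; adjust if you changed it


def last_period_rates(row: dict) -> dict:
    """Derive simple dashboard metrics from the settled last-period columns."""
    queries = row["query_count_in_last_period"]
    reads = row["read_count_in_last_period"]
    return {
        "queries_per_second": queries / PERIOD_SECONDS,
        # Fraction of the tenant's queries that were reads (0 if it was idle).
        "read_share": reads / queries if queries else 0.0,
    }


row = {
    "tenant_attribute": "3",
    "read_count_in_last_period": 2,
    "query_count_in_last_period": 6,
}
print(last_period_rates(row))
```

For tenant 3 this gives 0.1 queries per second, with one third of them being reads.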
If at any point you want to clear the monitor, you can use the citus_stat_tenants_reset() function.
Citus 11.3 introduces improvements related to metadata sync. Citus database clusters that have thousands of distributed objects (such as distributed tables) in their metadata could run into memory errors during metadata sync. Because of these errors, some users were unable to add new nodes or upgrade beyond Citus 11.0, which introduced the ability to query from any node and scale clusters.
To address those issues, in Citus 11.3 we added an alternative non-transactional mode to the existing metadata sync, which runs inside a single transaction. The single-transaction mode remains the default, and it can fail because Postgres has a hard memory limit that is hit when a single transaction contains numerous DDL commands executed on workers during metadata sync. Now, in 11.3 or later, users who hit such a memory error can optionally switch to the non-transactional mode, which syncs the metadata via many transactions.
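The difference between the two modes can be pictured as one big batch versus many small ones. The toy model below is only an illustration: it assumes each DDL command adds a made-up number of bytes to its transaction's buffer, uses PostgreSQL's per-allocation ceiling (1 GB minus 1 byte, which is why the error in the example below reports 1073741824) as the limit, and picks an arbitrary batch size. sync_metadata is a hypothetical function and does not reflect how Citus actually groups commands.

```python
MAX_ALLOC_SIZE = 1024**3 - 1  # PostgreSQL's per-allocation ceiling (1 GB - 1 byte)


class MemoryAllocError(Exception):
    pass


def sync_metadata(command_sizes: list[int], mode: str, batch_size: int = 1000) -> int:
    """Toy model of metadata sync; returns the number of transactions used.

    Each DDL command adds its (hypothetical) size in bytes to the buffer of the
    transaction it runs in; the buffer is released when that transaction commits.
    """
    if mode == "transactional":
        batches = [command_sizes]  # everything in one transaction (the default)
    elif mode == "nontransactional":
        batches = [command_sizes[i:i + batch_size]
                   for i in range(0, len(command_sizes), batch_size)]
    else:
        raise ValueError(f"unknown mode: {mode}")

    for batch in batches:
        needed = sum(batch)
        if needed > MAX_ALLOC_SIZE:
            raise MemoryAllocError(f"invalid memory alloc size {needed}")
        # In a real system the batch would be executed and committed here.
    return len(batches)


commands = [20_000] * 100_000  # 100k commands of ~20 kB each (made-up numbers)
print(sync_metadata(commands, "nontransactional"))  # 100 transactions
try:
    sync_metadata(commands, "transactional")
except MemoryAllocError as err:
    print("ERROR:", err)
```

In this model, splitting the work across transactions removes the ceiling, at the cost that a failure partway through leaves earlier batches committed.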
Below is an example course of action when a user hits the hard memory limit while adding nodes or syncing the metadata to all nodes:
-- adding a node fails with the default (transactional) metadata sync mode
SELECT citus_add_node(<ip>, <port>);
ERROR: invalid memory alloc size 1073741824
-- switch to the non-transactional mode for this session and retry
SET citus.metadata_sync_mode TO 'nontransactional';
SELECT citus_add_node(<ip>, <port>);
-- the same approach works when syncing metadata to all nodes fails
SELECT start_metadata_sync_to_all_nodes();
ERROR: invalid memory alloc size 1073741824
SET citus.metadata_sync_mode TO 'nontransactional';
SELECT start_metadata_sync_to_all_nodes();
metadatasynced flag in pg_dist_node
For some use cases, scaling out the cluster is bottlenecked on the speed of shard moves. With this release, we are adding a capability to the rebalancer so that multiple shard moves can happen concurrently.
By default, the background shard rebalancer in Citus executes multiple shard moves sequentially. In 11.3, we introduced the capability of executing shard moves in parallel. The maximum number of parallel shard moves per node is configured using the new GUC citus.max_background_task_executors_per_node. For any node in the cluster, the total number of concurrent incoming/outgoing shard moves to/from that node cannot exceed this value.
There are also dependency requirements that need to be satisfied for a shard move before it becomes runnable. In 11.3, there are two types of dependencies:
Runnable shard moves start to execute when a concurrency spot in their path becomes available.
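The per-node limit can be sketched as a small scheduler. In this simplified Python model (not the Citus background-task implementation), a move needs a free slot on both its source and its target node, and limit stands in for citus.max_background_task_executors_per_node. Dependencies between moves are ignored, and start_runnable_moves is a hypothetical name. The usage lines replay the two moves from the example plan that follows (shards 102008 and 102013, both moving to port 9702).

```python
from collections import Counter


def start_runnable_moves(pending, limit):
    """Pick moves that can start now without exceeding `limit` moves per node.

    `pending` is a list of (shard_id, source_node, target_node) tuples in
    scheduling order; returns the moves that start immediately.
    """
    busy = Counter()  # node -> number of running moves touching that node
    started = []
    for shard_id, source, target in pending:
        # A move occupies one slot on each node in its path.
        if busy[source] < limit and busy[target] < limit:
            busy[source] += 1
            busy[target] += 1
            started.append((shard_id, source, target))
    return started


plan = [(102008, 9701, 9702), (102013, 9703, 9702)]
print(start_runnable_moves(plan, limit=2))  # both moves start
print(start_runnable_moves(plan, limit=1))  # only the first move starts
```

With a limit of 1, node 9702 is already busy after the first move, so the second one has to wait, which is the sequential behavior of earlier releases.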
Example:
-- create two tables that are not colocated with each other and insert some rows
CREATE TABLE table1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1', 'a', shard_count => 4, colocate_with => 'none');
INSERT INTO table1 SELECT generate_series(1,10000) AS a;
CREATE TABLE table2 (a int PRIMARY KEY);
SELECT create_distributed_table('table2', 'a', shard_count => 4, colocate_with => 'none');
INSERT INTO table2 SELECT generate_series(1,10000) AS a;
-- add/enable a new node, say node with port 9702, in the cluster
SELECT * from pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------
1 | 1 | localhost | 9701 | default | t | t | primary | default | t | t
3 | 3 | localhost | 9703 | default | t | t | primary | default | t | t
2 | 2 | localhost | 9702 | default | t | t | primary | default | t | f
(3 rows)
SELECT * FROM citus_set_node_property('localhost', 9702, 'shouldhaveshards', true);
-- see the rebalance plan. There are two shard moves planned to the new node
SELECT * FROM get_rebalance_table_shards_plan();
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
------------+---------+------------+------------+------------+------------+------------
table1 | 102008 | 0 | localhost | 9701 | localhost | 9702
table2 | 102013 | 0 | localhost | 9703 | localhost | 9702
-- set citus.max_background_task_executors_per_node to 2 enabling the parallel independent moves up to 2.
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
-- start the rebalancer in the background
SELECT citus_rebalance_start();
NOTICE: Scheduled 2 moves as job 1
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
citus_rebalance_start
-----------------------
1
(1 row)
-- Monitor the status. Note that there are two moves running concurrently
SELECT * FROM citus_rebalance_status() \gx
-[ RECORD 1 ]--------------------------------------------------
job_id | 1
state | scheduled
job_type | rebalance
description | Rebalance all colocation groups
started_at |
finished_at |
details | {"tasks": [], "task_state_counts": {"runnable": 2}}
SELECT * FROM citus_rebalance_status() \gx
-[ RECORD 1 ]----------------------------------------------------
job_id | 1
state | running
job_type | rebalance
description | Rebalance all colocation groups
started_at | 2023-04-28 15:01:38.157575+03
finished_at |
details | {"tasks": [{"LSN": {"lag": null, "source": "0/1CE46000", "target": null}, "size": {"source": "14 MB", "target": "0 bytes"}, "hosts": {"source": "localhost:9701", "target": "localhost:9702"}, "phase": "Setting Up",
"state": "running", "command": "SELECT pg_catalog.citus_move_shard_placement(102008,1,2,'auto')", "message": "", "retried": 0, "task_id": 1}, {"LSN": {"lag": null, "source": "0/1A865480", "target": null}, "size": {"source": "140 MB", "target": "3248 kB"}, "hosts": {"source": "localhost:9703", "target": "localhost:9702"}, "phase": "Copying Data",
"state": "running", "command": "SELECT pg_catalog.citus_move_shard_placement(102013,3,2,'auto')", "message": "", "retried": 0, "task_id": 2}], "task_state_counts": {"running": 2}}
SELECT * FROM citus_rebalance_status() \gx
-[ RECORD 1 ]------------------------------------------------
job_id | 1
state | finished
job_type | rebalance
description | Rebalance all colocation groups
started_at | 2023-04-28 15:01:38.157575+03
finished_at | 2023-04-28 15:01:49.856551+03
details | {"tasks": [], "task_state_counts": {"done": 2}}
Postgres 15 introduced support for the MERGE command, where a single command can conditionally insert, update, or delete rows of a table. When we added the first phase of MERGE support in Citus 11.2, we could only MERGE two tables if both of them were Citus local tables. As of 11.3, MERGE is now supported for Citus distributed tables. The merge modifications are done individually on each shard, so for the MATCHED conditions to be meaningful, source and target have to be colocated and joined on their respective distribution columns.
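Why colocation matters can be shown with a toy model. If both tables are split by the same function of the join key, every target row and every source row it could match land in the same shard, so running the MERGE shard by shard gives the same answer as running it once. The Python sketch below hard-codes the WHEN clauses of the example that follows, uses Python's hash() modulo 4 as a hypothetical stand-in for Citus's hash distribution, and assumes each target row matches at most one source row.

```python
from collections import defaultdict

SHARD_COUNT = 4  # hypothetical shard count for the sketch


def shard_of(key: int) -> int:
    # Stand-in for Citus hash distribution; what matters is that source and
    # target use the same function of the distribution column.
    return hash(key) % SHARD_COUNT


def merge_shard(target_rows, source_rows):
    """Apply the example MERGE to one shard; return (rows, rows_affected)."""
    result, affected, matched_sources = [], 0, set()
    for tid, val in target_rows:
        match = next(((sid, sval) for sid, sval in source_rows if sid == tid), None)
        if match is None:
            result.append((tid, val))      # no WHEN clause applies
            continue
        matched_sources.add(match[0])
        affected += 1
        if val == "target-delete":
            continue                       # WHEN MATCHED AND t.val = 'target-delete' THEN DELETE
        result.append((tid, match[1]))     # WHEN MATCHED THEN UPDATE SET val = s.val
    for sid, sval in source_rows:
        if sid not in matched_sources:
            result.append((sid, sval))     # WHEN NOT MATCHED THEN INSERT
            affected += 1
    return result, affected


def distributed_merge(target, source):
    t_shards, s_shards = defaultdict(list), defaultdict(list)
    for row in target:
        t_shards[shard_of(row[0])].append(row)
    for row in source:
        s_shards[shard_of(row[0])].append(row)
    merged, total = [], 0
    for shard in range(SHARD_COUNT):       # each shard is merged independently
        rows, n = merge_shard(t_shards[shard], s_shards[shard])
        merged.extend(rows)
        total += n
    return sorted(merged), total


rows, count = distributed_merge(
    target=[(1, "target"), (1, "target-delete")],
    source=[(1, "source"), (2, "source")],
)
print(rows, count)  # [(1, 'source'), (2, 'source')] 3
```

The result matches the MERGE 3 output and the final table contents in the SQL example.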
CREATE TABLE target_table(tid int, val varchar);
CREATE TABLE source_table(sid int, val varchar);
SELECT create_distributed_table('target_table', 'tid');
SELECT create_distributed_table('source_table', 'sid', colocate_with=>'target_table');
INSERT INTO target_table VALUES(1, 'target'); /* MATCHED clause true, row is UPDATED */
INSERT INTO target_table VALUES(1, 'target-delete'); /* MATCHED clause true, row is DELETED */
INSERT INTO source_table VALUES(1, 'source');
INSERT INTO source_table VALUES(2, 'source'); /* NOT MATCHED clause true, row is INSERTED */
MERGE INTO target_table t
USING source_table s
ON t.tid = s.sid
WHEN MATCHED AND t.val = 'target-delete' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = s.val
WHEN NOT MATCHED THEN
INSERT(tid, val) VALUES(sid, val);
MERGE 3
SELECT * FROM target_table ORDER BY 1,2;
┌─────┬────────┐
│ tid │ val │
├─────┼────────┤
│ 1 │ source │
│ 2 │ source │
└─────┴────────┘
(2 rows)
Identity columns allow users to automatically assign a unique value to a column. This release introduces improvements to the handling of identity columns in Citus. With this update, we have improved the logic used for propagating identity columns to worker nodes by passing the columns as-is, rather than creating dependent sequences. If you are using distributed identity columns introduced in Citus 11.2, please read the following carefully, as the user experience has slightly changed. This change improves compatibility with the DDL from any node functionality and also fixes a bug that prevented enforcing identity column restrictions on worker nodes.
Additionally, this release includes a fix for a bug related to INSERT .. SELECT queries with identity columns, which improves the reliability of such queries in Citus.
- The alter_distributed_table and undistribute_table UDFs currently do not support identity columns.
- ALTER TABLE statement is not supported.
- ALTER TABLE statements that involve changing the identity column are not supported.
Hitting these limitations results in errors with related error messages.
Change Data Capture (CDC) is the process of identifying and capturing changes made to data in a database. It allows you to track modifications made to a database, including inserts, updates, and deletions. CDC for Citus is offered via logical decoding and enables logical replication of distributed tables and reference tables to CDC clients. Citus CDC support is new in 11.3 and is currently in preview (beta).
Here is the high-level overview of the CDC implementation in Citus:
replication origin fields in WAL entries of events created during such cluster management operations.
pgoutput, wal2json) are added to the standard library path, which gets loaded when a new subscription is made to a logical replication slot.
There are 2 important types of CDC clients: PostgreSQL clients and Kafka clients (through the Debezium connector).
The sections below describe how to set up and consume CDC events for these 2 types of CDC clients.
The example below shows how to logically replicate a distributed table from a Citus cluster to a Postgres client. For this example, the Citus cluster consists of a coordinator and 2 worker nodes. The CDC client is a regular PostgreSQL server that replicates all changes to a distributed table into a local PostgreSQL table.
The CDC for distributed tables feature is in preview (beta) and must be enabled explicitly. Set this flag in postgresql.conf so that it is enabled for all backends:
citus.enable_change_data_capture='on'
Here is an example of a distributed table 'sensors', which will be logically replicated to a client using CDC.
CREATE TABLE sensors(
measureid integer,
eventdatetime timestamptz,
measure_data jsonb,
meaure_quantity decimal(15, 2),
measure_status char(1),
measure_comment varchar(44),
PRIMARY KEY (measureid, eventdatetime, measure_data)
);
SELECT create_distributed_table('sensors','measureid');
To replicate a distributed table in the CDC client, a table with the same name and schema as the distributed table must be created on the client.
Create a publication for the distributed table on the coordinator node.
CREATE PUBLICATION cdc_publication FOR TABLE sensors;
This will automatically propagate the publication to all worker nodes.
On the coordinator node, run this command to create a logical replication slot on all Citus nodes.
SELECT * FROM run_command_on_all_nodes
($$SELECT pg_create_logical_replication_slot('cdc_replication_slot', 'pgoutput', false);$$);
NOTE: Before this step, make sure that the coordinator is already in the metadata, using the citus_set_coordinator_host() function.
The following steps are required for subscribing to changes from a CDC client to the Citus cluster.
Create a subscription in the CDC client to the coordinator node to copy any existing data from the distributed table to the CDC client.
Replace these placeholders in the SQL statement below with appropriate values:
- dbname: name of the database where the distributed table to be replicated resides.
- username: database user name.
- coordinator host: hostname of the coordinator node.
- coordinator port: port of the coordinator node.
- cdc_publication_name: publication created on the coordinator for the distributed table.
- cdc_replication_slot: logical replication slot on the coordinator node for CDC.
CREATE SUBSCRIPTION cdc_subscription_1
CONNECTION 'dbname=<dbname> host=<coordinator host> user=<username> port=<coordinator port>'
PUBLICATION <cdc_publication_name>
WITH (copy_data=true,create_slot=false,slot_name='<cdc_replication_slot>');
NOTE: The 'copy_data' argument should be set to true to copy any existing data to the CDC client. Do this for only one subscription in the CDC client; otherwise it will result in duplicate data in the CDC client and cause replication errors.
Create subscriptions in the CDC client to every worker node to get changes from all worker nodes. For reference tables, this step is not required, since changes to reference tables are published only from the coordinator node. Replace these additional placeholders:
- worker host: hostname of the worker node
- worker port: port of the worker node
CREATE SUBSCRIPTION cdc_subscription_2
CONNECTION 'dbname=<dbname> host=<worker1 host> user=<user> port=<worker1 port>'
PUBLICATION cdc_publication
WITH (copy_data=false,create_slot=false,slot_name='cdc_replication_slot');
CREATE SUBSCRIPTION cdc_subscription_3
CONNECTION 'dbname=<dbname> host=<worker2 host> user=<user> port=<worker2 port>'
PUBLICATION cdc_publication
WITH (copy_data=false,create_slot=false,slot_name='cdc_replication_slot');
NOTE: The 'copy_data' argument should be set to false here, to avoid copying existing data into the CDC client from every worker node, which would result in duplicate data and cause replication errors.
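Because exactly one subscription (the one to the coordinator) should copy existing data, it can help to generate the statements rather than write them by hand. The hypothetical helper below only builds the SQL text from the steps above, using this example's publication and slot names. It does not connect to any server, and the host names in the usage lines are placeholders.

```python
def build_cdc_subscriptions(dbname, user, coordinator, workers,
                            publication="cdc_publication",
                            slot="cdc_replication_slot"):
    """Return one CREATE SUBSCRIPTION statement per Citus node.

    `coordinator` and each entry of `workers` are (host, port) tuples.
    Only the first (coordinator) subscription copies existing data.
    """
    nodes = [coordinator] + list(workers)
    statements = []
    for i, (host, port) in enumerate(nodes, start=1):
        copy_data = "true" if i == 1 else "false"
        statements.append(
            f"CREATE SUBSCRIPTION cdc_subscription_{i}\n"
            f"CONNECTION 'dbname={dbname} host={host} user={user} port={port}'\n"
            f"PUBLICATION {publication}\n"
            f"WITH (copy_data={copy_data},create_slot=false,slot_name='{slot}');"
        )
    return statements


stmts = build_cdc_subscriptions(
    "postgres", "postgres",
    coordinator=("coordinator.example", 5432),
    workers=[("worker1.example", 5432), ("worker2.example", 5432)],
)
for stmt in stmts:
    print(stmt, end="\n\n")
```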
Now the CDC client is ready to receive all changes happening to the distributed table in the Citus cluster. To verify this, you can, for example, run an INSERT statement on the coordinator node:
INSERT INTO sensors
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
FROM generate_series(11,20)i;
Now verify that the data in the 'sensors' table in the CDC client matches the data in the distributed table.
SELECT * FROM sensors;
Similarly, any UPDATE, DELETE, and TRUNCATE statements will be replicated.
The steps for publishing events to a Kafka client are similar to those for a PostgreSQL client, except that for Kafka the client side does not have to create a table to replicate the changes and instead needs some configuration files. The Debezium connector is used to connect Kafka with each node in the Citus cluster.
Complete these steps below from the CDC for PostgreSQL Client section
For distributed tables, a Debezium connector configuration file should be created for every worker node, with details of the publication and replication slot for the distributed table on that worker node. An example is provided below (dbz-test-connector.properties):
name=dbz-test-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=pgoutput
database.hostname=<worker host name>
database.port=<worker port>
database.user=<DB user name>
database.password=<password>
database.dbname=<database name>
database.server.name=<kafka topic prefix>
slot.name=<CDC replication slot name>
publication.name=<CDC publication name>
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
NOTE: To get changes to reference tables, the CDC client should connect only to the coordinator node, since the changes for reference tables are published only from the coordinator node.
If the Kafka cluster is not already running, start a ZooKeeper and Kafka server with commands like the example below:
$> cd $kafka_install_dir/bin
$> ./zookeeper-server-start.sh ../config/zookeeper.properties
$> ./kafka-server-start.sh ../config/server.properties
Run a Debezium connector for each worker node, as in the example below.
$> ./connect-standalone.sh ../config/connect-standalone.properties ../config/dbz-test-connector.properties
NOTE: The 'listeners' property in connect-standalone.properties must be set to a unique value for each Debezium connector, because it determines the REST API port on which the connector listens.
For example:
for Worker1: (listeners=HTTP://:8083)
for Worker2: (listeners=HTTP://:8084)
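With several workers, the per-worker connector files and unique listeners values can also be generated. The helper below is a hypothetical sketch: it fills in the dbz-test-connector.properties template once per worker, numbers the connector names to keep them distinct (an assumption, not a stated requirement), and assigns REST ports counting up from 8083 as in the example. It gives every worker the same database.server.name, which fits the single DBTestServer.public.sensors topic consumed in the next step.

```python
def debezium_configs(workers, user, password, dbname,
                     server_name="DBTestServer",
                     slot="cdc_replication_slot",
                     publication="cdc_publication",
                     first_rest_port=8083):
    """Build one (connector properties, listeners line) pair per worker."""
    configs = []
    for i, (host, port) in enumerate(workers):
        connector = "\n".join([
            f"name=dbz-test-connector-{i + 1}",
            "connector.class=io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max=1",
            "plugin.name=pgoutput",
            f"database.hostname={host}",
            f"database.port={port}",
            f"database.user={user}",
            f"database.password={password}",
            f"database.dbname={dbname}",
            f"database.server.name={server_name}",
            f"slot.name={slot}",
            f"publication.name={publication}",
            "key.converter=org.apache.kafka.connect.json.JsonConverter",
            "value.converter=org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable=false",
            "value.converter.schemas.enable=false",
        ])
        # Each standalone Connect process needs its own REST API port.
        listener = f"listeners=HTTP://:{first_rest_port + i}"
        configs.append((connector, listener))
    return configs


configs = debezium_configs([("worker1.example", 5432), ("worker2.example", 5433)],
                           user="postgres", password="secret", dbname="postgres")
for connector, listener in configs:
    print(listener)
```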
Run a Kafka console client to get the changes for the distributed table.
$> ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic DBTestServer.public.sensors | jq .
Once the Kafka setup is done, any insert into the distributed table will result in an event printed to the Kafka console client like this:
{
"before": null,
"after": {
"measureid": 0,
"eventdatetime": "2020-01-04T18:30:00Z",
"measure_data": "{}",
"meaure_quantity": "EM02",
"measure_status": "A",
"measure_comment": "I <3 Citus"
},
"source": {
"version": "1.9.5.Final",
"connector": "postgresql",
"name": "DBTestServer",
"ts_ms": 1683009443335,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"29230288\",\"29230344\"]",
"schema": "public",
"table": "sensors",
"txId": 767,
"lsn": 29230344,
"xmin": null
},
"op": "c",
"ts_ms": 1683009443573,
"transaction": null
}
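A consumer usually needs more than the pretty-printed JSON. The sketch below assumes events have the shape shown above (schemas disabled, so the envelope is top-level). It maps Debezium's op codes to operation names and picks the row image: "after" for inserts and updates, "before" for deletes. summarize_event is a hypothetical helper; in practice you would call it on each message read from the topic.

```python
import json

# Debezium op codes: c = create, u = update, d = delete, r = snapshot read, t = truncate
OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read", "t": "truncate"}


def summarize_event(raw: str) -> str:
    """Turn one Debezium change event (JSON text) into a one-line summary."""
    event = json.loads(raw)
    op = OPS.get(event["op"], event["op"])
    source = event["source"]
    # Deletes carry the old row in "before"; other operations carry "after".
    row = event["before"] if event["op"] == "d" else event["after"]
    return f'{op} on {source["schema"]}.{source["table"]} (lsn {source["lsn"]}): {row}'


sample = '''{"before": null,
 "after": {"measureid": 0, "measure_status": "A", "measure_comment": "I <3 Citus"},
 "source": {"schema": "public", "table": "sensors", "lsn": 29230344},
 "op": "c"}'''
print(summarize_event(sample))
```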
Similarly, UPDATE and DELETE events are also published to the Kafka clients.
The limitations of CDC support in Citus 11.3 are listed below:
Citus CDC uses logical replication, and hence any limitations of logical replication also apply to CDC. Some important limitations of logical replication are:
Events from the same worker node are published in the same order in which they happened on that node. However, there is no guarantee on the ordering of events happening across multiple worker nodes.
The initial snapshot taken for a distributed table using copy_data=true may result in events being published out of order, since the data may be changed across worker nodes while the initial data is being published to the CDC client. So it is advisable to take the initial CDC snapshot during a steady state.
CDC events are not published for tables with columnar storage methods.
A separate subscription from the CDC client to each worker node must be created to get events happening on all worker nodes for a distributed table.
If CREATE PUBLICATION with FOR ALL TABLES is used to create the publication, all Citus internal metadata tables (such as pg_dist_partition) are also replicated. In that case, if you are replicating from Citus to Postgres, you should make sure that all the metadata tables are created on the subscriber side. As of Citus 11.3, we discourage the use of the ALL TABLES syntax in the CREATE PUBLICATION command.
Reference table events are published from the coordinator node only, to avoid publishing duplicate events, since reference tables are replicated to all worker nodes. So the CDC client must be subscribed to the coordinator node to get changes for reference tables.
undistribute_table
alter_distributed_table
create_distributed_table
citus_shards view