Fun with SQL: Relocating shards on a Citus database cluster

The Citus extension to Postgres allows you to shard your Postgres database across multiple nodes without having to make major changes to your SaaS application. Citus then provides performance improvements (as compared to single-node Postgres) by transforming SQL queries and distributing them across multiple nodes, thereby parallelizing the workload. This means that a 2-node, 4-core Citus database cluster could perform 4x faster than single-node Postgres.

With the Citus shard rebalancer, you can easily scale your database cluster from 2 nodes to 3 nodes or 4 nodes, with no downtime. You simply run the move shard function on the co-location group you want to move shards for, and Citus takes care of the rest. When Citus moves shards, it ensures tables that are co-located stay together. This means all of your joins, say, from orders to order_items still work, just as you’d expect.
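
As a rough sketch of what scaling out can look like, assuming a distributed table named orders and a new worker reachable at the hypothetical host worker-3.example.com, you register the node and then ask the rebalancer to spread shards onto it. The host name, port, and table name here are placeholders for illustration, not values from the cluster discussed in this post.

```sql
-- Register a new worker with the coordinator (host and port are hypothetical).
SELECT master_add_node('worker-3.example.com', 5432);

-- Rebalance the shards of orders; tables co-located with orders
-- (for example order_items) move along with it.
SELECT rebalance_table_shards('orders');
```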

Because of how Citus shards your data, data usually follows an even distribution. We hash each shard key as it comes in and then bucket the resulting value. We recently encountered an interesting case, though: a customer had one node in their 8-node database cluster that was taking on more load than the others. Load average on this particular node was about 60%, whereas most other nodes were seeing around 15%. One option would have been to add nodes and rebalance, but we thought we could come up with a solution to take better advantage of those underused nodes.
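
To make the hashing and bucketing concrete, the Citus metadata can be queried directly. The sketch below assumes a hash-distributed table named orders (a placeholder) and an example distribution value of 42; it lists the hash range each shard covers and then asks which shard a given key lands in.

```sql
-- Each shard of a hash-distributed table owns a range of hash values.
SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'orders'::regclass
ORDER BY shardminvalue::integer;

-- Look up the shard that holds a given distribution key (42 is an example value).
SELECT get_shard_id_for_distribution_column('orders', 42);
```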

Custom tuning and relocating shards within Citus

In addition to the zero-downtime shard rebalancer feature in Citus, we also give you the ability to manually move shards between nodes. Just like the shard rebalancer, the ability to manually move shards is an online operation. But the question remains, how do you know which shards to move to which nodes? Of course we turned to SQL to figure this out.

Before we dig into SQL, some of our thought process might help.

We reasoned that if one node was at a 60% load average and the 7 other nodes were closer to 15%, then having each of those 7 nodes take on 5% more of the load would bring them up to 20% and bring our problematic node down to 25%.

Our math: (60% orig load - (5% increased load per node * 7 nodes)) = 25% new load
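
The same arithmetic can be checked in plain Postgres, with no Citus functions involved. This is only a sanity check on the numbers above; it also confirms that the total load across the 8 nodes is unchanged by the shift.

```sql
SELECT
  60 - 5 * 7                  AS hot_node_after,    -- 25
  15 + 5                      AS other_node_after,  -- 20
  60 + 15 * 7                 AS total_before,      -- 165
  (60 - 5 * 7) + (15 + 5) * 7 AS total_after;       -- 165
```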

Within SQL our plan was:

  1. Get the largest node within the cluster
  2. Get the smallest node within the cluster and save for later
  3. Go back and capture a shard with an appropriate size to help balance the cluster
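
Before the real query, the three steps above can be tried on made-up numbers. The sketch below runs on plain Postgres and uses hypothetical node names, shard ids, and sizes in place of the Citus metadata; it picks the shard group on the most crowded node that would bring the least crowded node closest to the average node size.

```sql
WITH node_sizes(nodename, node_size) AS (
  VALUES ('node-a', 100), ('node-b', 40), ('node-c', 70)
),
shard_groups(nodename, shardid, group_size) AS (
  VALUES ('node-a', 1, 50), ('node-a', 2, 30), ('node-a', 3, 20),
         ('node-b', 4, 40), ('node-c', 5, 70)
),
-- The average node size is the target every node should approach
avg_size AS (
  SELECT avg(node_size) AS target FROM node_sizes
),
-- Step 1: the largest node
most_crowded AS (
  SELECT nodename FROM node_sizes ORDER BY node_size DESC LIMIT 1
),
-- Step 2: the smallest node, saved for later
least_crowded AS (
  SELECT nodename, node_size FROM node_sizes ORDER BY node_size LIMIT 1
)
-- Step 3: the shard group that lands the destination closest to the target
SELECT sg.nodename AS source_node, lc.nodename AS dest_node, sg.shardid, sg.group_size
FROM shard_groups sg
JOIN most_crowded mc ON sg.nodename = mc.nodename
CROSS JOIN least_crowded lc
CROSS JOIN avg_size a
ORDER BY abs(a.target - (lc.node_size + sg.group_size))
LIMIT 1;
-- Returns: node-a | node-b | 2 | 30
```

With these numbers the average is 70 and the destination holds 40, so the 30-unit shard group is the one that lands it exactly on target.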

Here is what we ended up with:

--- Calculate the total size of the cluster; later we could adapt this to act only when an individual node is a certain percentage larger than the average
WITH total AS (
  SELECT sum(result::bigint) as size, count(nodename) as node_count FROM (
    SELECT nodename,nodeport,result
    FROM run_command_on_workers($cmd$SELECT pg_database_size('citus');$cmd$)
  ) a
),

--- Get the size of each node
ind_size AS (
  SELECT nodename,result::bigint as node_size
  FROM run_command_on_workers($cmd$SELECT pg_database_size('citus')::bigint;$cmd$)
),

--- Save our most crowded node
most_crowded_node AS (
  SELECT nodename
  FROM ind_size, total
  WHERE ind_size.node_size::bigint > ((cast(total.size as bigint) / cast(node_count as bigint))) ORDER BY ind_size.node_size::bigint DESC LIMIT 1
),

--- Save our least crowded node
least_crowded_node AS (
  SELECT nodename
  FROM ind_size, total
  WHERE ind_size.node_size::bigint < ((cast(total.size as bigint) / cast(node_count as bigint))) ORDER BY ind_size.node_size::bigint LIMIT 1
),

--- Calculate the shard sizes, which includes data related by colocation groups
shard_sizes AS (
  SELECT shardid, result::bigint size FROM
  (SELECT (run_command_on_shards(logicalrelid::text,$cmd$SELECT pg_total_relation_size('%s')$cmd$)).*
   FROM pg_dist_partition pp where pp.partmethod='h') a
),

--- Group co-located shards per node and sum their sizes; the first shard id stands in for the group
colocated_shard_sizes AS (
  SELECT colocationid, nodename, CASE WHEN shard_group IS NULL THEN NULL ELSE shard_group[1] END shardid, group_size
  FROM (
    SELECT colocationid,nodename,array_agg(ps.shardid) shard_group,sum(size) group_size
    FROM shard_sizes ss, pg_dist_shard ps, pg_dist_shard_placement psp,
         pg_dist_partition pp
    WHERE (ss.shardid=ps.shardid AND pp.logicalrelid=ps.logicalrelid AND psp.shardid=ps.shardid AND pp.partmethod='h')
    GROUP BY shardmaxvalue,shardminvalue,nodename,colocationid
  ) a
),

--- Get a shard with appropriate size to balance the cluster
optimal_move AS (
  SELECT src.nodename source_node,src.shardid shardid,abs((size/node_count)::bigint-(node_size+group_size)) optimal_size,least_crowded_node.nodename dest_node, group_size
  FROM (
    SELECT mc.nodename, group_size, shardid FROM colocated_shard_sizes cs, most_crowded_node mc WHERE cs.nodename=mc.nodename
  ) src,
  ind_size, least_crowded_node, total
  WHERE ind_size.nodename=least_crowded_node.nodename ORDER BY optimal_size LIMIT 1
)

--- Pull out our information so we can then move our shard
SELECT source_node,dest_node,shardid,pg_size_pretty(group_size) from optimal_move;

Running this on an example cluster we receive:

               source_node                |                 dest_node                  | shardid | pg_size_pretty
------------------------------------------+--------------------------------------------+---------+----------------
                                          |                                            |  106448 | 57 GB
(1 row)

We can now take that result and relocate our shard:

SELECT master_move_shard_placement(106448,'', 5432,'', 5432);
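
Once the move completes, one way to confirm where the shard now lives is to look it up in the placement metadata. This is a minimal check using the shard id from the example above; the node name it returns depends on your own cluster.

```sql
-- Show which worker currently holds the relocated shard.
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid = 106448;
```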

Taking the Citus shard rebalancer even further

Because the Citus database is a pure extension to Postgres, all of the metadata we use to coordinate and optimize queries and transactions in Citus is available for you to access and query. Here we found that helping a customer optimize and tune their shard locations was a great exercise in practicing our SQL skills (hence: fun with SQL!). And if you find this helpful in fine-tuning your own Citus cluster, let us know.