Citus Warp: Database migrations without the pain

We rolled out a new database migration feature for the Citus fully-managed database as a service, the Warp migration feature, as part of our Citus Cloud 2 announcement. Today I want to walk through Citus Cloud’s Warp migration feature in more detail. Before we drill in, we should take a step back and look at what typically (and sometimes painfully) goes on in a large database migration.

Pain and heartache of database migrations in the past

For many companies, migrating from one database to another is a decision that gets pushed out as long as possible. After all, migrating your underlying infrastructure isn’t a shiny new feature you can wave around to customers. And we all know that premature optimization can cause a business to die before it even has a chance to become a business. But pushing the database migration out to the last possible moment has, in the past, brought feature development to a standstill once the large migration finally arrived.

When you’re smaller, migrating from one database to another is fast and simple. If you have 100 GB of data, it takes roughly 45 minutes to dump that database and restore it to a new system. And if the 100 GB is your total data size including indexes, your raw data might be even smaller than 100 GB, so your dump/restore would go even faster. So while 45 minutes of downtime is not ideal under any circumstance, it is likely manageable within a maintenance window. However, with a database that is 1 TB in size, the same dump and restore will probably take around 7.5 hours, which is much harder to justify. At that point, the downtime makes the dump/restore method of database migration a blocker.
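The arithmetic behind these numbers is a straight-line extrapolation. As a rough sketch, assuming throughput stays constant at the 100 GB in 45 minutes quoted above (real throughput depends on hardware, indexes, and parallelism, and the helper name is made up for illustration):

```python
# Back-of-the-envelope downtime estimate for a dump/restore migration.
# Assumption: throughput is constant at the rate implied by the article's
# figure of 100 GB in 45 minutes; real systems will vary.
GB_PER_MINUTE = 100 / 45

def dump_restore_hours(size_gb):
    """Estimated hours of downtime to dump and restore size_gb of data."""
    return size_gb / GB_PER_MINUTE / 60

for size_gb in (100, 1000):
    print(f"{size_gb:>5} GB -> {dump_restore_hours(size_gb):.2f} hours")
```

For 100 GB and 1 TB this works out to 0.75 and 7.50 hours, the same 45 minutes and 7.5 hours as above.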

With a dump/restore method of migration a non-starter, you probably turn to other alternatives.

The simplest migration alternative is the brute-force method of adding application logic that writes to two databases in parallel, so that you can preserve uptime while doing the database migration. At the start, adding application logic to write to two data stores doesn’t necessarily seem hard; rather, it seems like a lot of boring manual labor.

As you start to drill in, though, you’re suddenly handling all sorts of error cases in your application. What happens when a transaction does a write and then an update, and the write worked but the update errored? Now you need to ensure the other database ends up in the same state. You’re left re-implementing database transaction logic in your application: transaction logic you had for free with a database. There is likely no end in sight to getting this approach perfect. Eventually, you’ll get things mostly in place, let double writing run as a best effort, then cut over and hope there aren’t too many data integrity issues.
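To make that failure mode concrete, here is a small sketch that uses two in-memory SQLite databases as stand-ins for the old and new databases. The table, the CHECK constraint, and the naive_dual_write helper are all invented for illustration; the point is only that mirroring statements one by one gives you no shared transaction boundary.

```python
import sqlite3

SCHEMA = "CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER CHECK (balance >= 0))"

def make_db():
    # isolation_level=None puts sqlite3 in autocommit mode; we issue BEGIN/COMMIT ourselves.
    db = sqlite3.connect(":memory:", isolation_level=None)
    db.execute(SCHEMA)
    return db

primary, secondary = make_db(), make_db()

def naive_dual_write(statements):
    """Run statements in one transaction on primary, mirroring each one to secondary."""
    primary.execute("BEGIN")
    try:
        for sql, params in statements:
            primary.execute(sql, params)
            secondary.execute(sql, params)  # mirrored immediately, outside any shared transaction
        primary.execute("COMMIT")
    except sqlite3.Error:
        primary.execute("ROLLBACK")  # primary undoes the INSERT; secondary has already kept it

def rows(db):
    return db.execute("SELECT id, balance FROM accounts ORDER BY id").fetchall()

naive_dual_write([
    ("INSERT INTO accounts (id, balance) VALUES (?, ?)", (1, 100)),
    ("UPDATE accounts SET balance = balance - ? WHERE id = ?", (500, 1)),  # violates the CHECK
])

print("primary:  ", rows(primary))
print("secondary:", rows(secondary))
```

The primary rolls back the whole transaction and ends up empty, while the secondary still holds the inserted row. Closing that gap means tracking and compensating for every such case in application code.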

Enter Warp migration for Citus Cloud

All of the above was the state of the art for database migration up until the launch of Citus Cloud 2 in November. With the new Warp migration feature in Citus Cloud, database migrations in the cloud became a whole lot less painful. Citus Cloud’s Warp migration reads data continuously from the single-node Postgres database that you’re currently writing to and streams data to your new Citus database cluster. We do this without requiring you to pause any writes or reads against your existing cloud database.

During the database migration, you can almost think of your sharded Citus Cloud database cluster as a follower or read-replica of your existing single-node Postgres database. Sweet.

In building the Warp migration feature for Citus Cloud, we made a few explicit design decisions that cause it to stand apart from other database migration tools:

  1. Did not introduce heavy ETL processes, which inevitably add complexity, tend to break over time, and limit how fast you can replicate data
  2. Leveraged as much of core Postgres as we could, so as not to re-invent the wheel
  3. Minimized impact on your existing production database

How Warp migration works in Citus Cloud

Underneath the covers in Citus Cloud, Warp migration uses two important building blocks:

  • Logical stream of modifications to your database
  • Making a database backup at a specific snapshot (point in the replication stream)

To understand how we can get a stream of modifications, we first need to remember that PostgreSQL has a concept called the WAL, or write-ahead log. Typically writes to your database, be they UPDATEs, DELETEs, or INSERTs, will first land in the WAL before they get written to the actual data files. The WAL is also aware of transaction boundaries, and can be seen as a single, consistent stream of everything that has happened in your database.

Most of the time the WAL is used for backup purposes (so you can take a daily base backup and then copy just the WAL files for newer changes), point-in-time recovery, as well as replication and high availability. But because the WAL contains a single stream of all changes, we can also use it as a source of all modifications, in their correct order.

To make this accessible, PostgreSQL added Logical Decoding in version 9.4 and Logical Replication in version 10. The Warp migration feature in Citus Cloud relies on Postgres’s Logical Decoding to get a stream of changes from your source database.

In order to set up a new logical decoding stream, we call something like the following on your source database to create what is called a logical replication slot:

SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
slot_name |    lsn
----------+-----------
test_slot | 0/16B1970
(1 row)

As soon as the logical replication slot has been created, all new WAL data is retained until it has been consumed by every replication slot. That means you need to be careful with open replication slots, as you could run out of disk space if you don’t consume them.
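To get a feel for how much WAL a slot is holding back, you can compare LSNs. An LSN is printed as two hexadecimal numbers separated by a slash: the high and low 32 bits of a 64-bit byte position in the WAL. The sketch below does that subtraction by hand; the function names are hypothetical, and the second LSN is just an example of a later WAL position. On the server, PostgreSQL 10’s pg_wal_lsn_diff performs the same calculation.

```python
def lsn_to_int(lsn):
    """Convert a textual LSN such as '0/16B1970' to a byte position in the WAL."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)

def retained_bytes(current_lsn, slot_lsn):
    """Bytes of WAL between the slot's position and the current WAL position."""
    return lsn_to_int(current_lsn) - lsn_to_int(slot_lsn)

# The slot LSN from the example above, compared with a later example position.
print(retained_bytes("0/16D3398", "0/16B1970"), "bytes of WAL not yet consumed")
```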

In order to consume a slot, you can run the following:

SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL);

   lsn     | xid |                       data
-----------+-----+--------------------------------------------------
 0/16D30F8 | 691 | BEGIN
 0/16D32A0 | 691 | table public.data: INSERT: id[int4]:2 data[text]:'arg'
 0/16D32A0 | 691 | table public.data: INSERT: id[int4]:3 data[text]:'demo'
 0/16D32A0 | 691 | COMMIT
 0/16D32D8 | 692 | BEGIN
 0/16D3398 | 692 | table public.data: DELETE: id[int4]:2
 0/16D3398 | 692 | table public.data: DELETE: id[int4]:3
 0/16D3398 | 692 | COMMIT
(8 rows)

As you can see, we don’t get the original modification statements that ran, but instead their logical equivalent, as it was actually applied to the database. That also means that if you were to run an INSERT … ON CONFLICT DO UPDATE statement, you might see either INSERT or UPDATE in the logical decoding stream.
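A consumer of this stream has to turn the text in the data column back into structured changes. The following is only a minimal sketch of that step, written against the sample rows shown above. It is not how Warp itself decodes changes, the function name is made up, and it ignores cases such as NULLs or UPDATEs that carry old key values.

```python
import re

# Patterns modeled on the sample output above, e.g.
#   table public.data: INSERT: id[int4]:2 data[text]:'arg'
CHANGE_RE = re.compile(
    r"^table (?P<schema>[^.]+)\.(?P<table>[^:]+): (?P<op>INSERT|UPDATE|DELETE): (?P<cols>.*)$"
)
# A column is name[type]:value, where value is either a quoted string or a bare token.
COLUMN_RE = re.compile(r"(?P<name>\w+)\[(?P<type>[^\]]+)\]:(?P<value>'(?:[^']|'')*'|\S+)")

def parse_change(data):
    """Parse one row of the data column into a dict; values are kept as raw text."""
    match = CHANGE_RE.match(data)
    if match is None:
        return {"op": data}  # transaction markers such as BEGIN and COMMIT
    columns = {c["name"]: c["value"] for c in COLUMN_RE.finditer(match["cols"])}
    return {
        "op": match["op"],
        "table": f'{match["schema"]}.{match["table"]}',
        "columns": columns,
    }

for line in ["BEGIN",
             "table public.data: INSERT: id[int4]:2 data[text]:'arg'",
             "table public.data: DELETE: id[int4]:2",
             "COMMIT"]:
    print(parse_change(line))
```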

Before we continue, make sure to clean up the slot again, to avoid retaining WAL for too long:

SELECT pg_drop_replication_slot('test_slot');

Based on our start point in the logical replication stream, Citus Cloud’s Warp first does a full backup of the database using pg_dump’s --snapshot option. That gives us a consistent copy of the database as it was at that point in the replication stream, so we can then apply all modifications in the stream to move forward.
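The reason this combination is safe can be shown with a toy model. In the sketch below, a list of changes stands in for the WAL, a dictionary stands in for a table, and LSNs are simplified to small integers. All names are hypothetical and this is not Warp’s implementation. It only illustrates that a copy taken at a known point, plus every change after that point, equals the current state of the source.

```python
# Toy change log: (lsn, operation, key, value). LSNs are simplified to integers.
source_log = [
    (10, "INSERT", 1, "a"),
    (20, "INSERT", 2, "b"),
    (30, "UPDATE", 1, "a2"),
    (40, "DELETE", 2, None),
    (50, "INSERT", 3, "c"),
]

def apply_change(table, op, key, value):
    """Apply a single logical change to a dict that stands in for a table."""
    if op == "DELETE":
        table.pop(key, None)
    else:  # INSERT or UPDATE
        table[key] = value

def state_at(log, lsn):
    """State of the source as of the given LSN (the role the snapshot dump plays)."""
    table = {}
    for entry_lsn, op, key, value in log:
        if entry_lsn <= lsn:
            apply_change(table, op, key, value)
    return table

snapshot_lsn = 20
target = state_at(source_log, snapshot_lsn)  # the consistent base copy

# Stream everything that happened after the snapshot point.
for entry_lsn, op, key, value in source_log:
    if entry_lsn > snapshot_lsn:
        apply_change(target, op, key, value)

print(target == state_at(source_log, 50))
```

If the base copy and the start of the stream did not line up exactly, changes would be either missed or applied twice, which is why the snapshot is tied to the start point of the replication slot.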

Recovering from Crashes

Another important property of replication tools like Warp is the ability to recover from a crash of any of the servers involved. Whilst of course this should never happen, the reality is that we want to be able to consistently recover from both database crashes and hardware failures.

On the source system this functionality is already provided by the logical replication slot we’ve looked at. As long as we remember the snapshot LSN (log sequence number) that we have applied to the target, we are able to ask the source system for all modifications starting at that LSN. You can retrieve the LSN that each slot has confirmed on the source from pg_replication_slots:

SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots;

Now, the other interesting part is the target system: how do we remember how far we have applied the replication stream from the source?

The simplest possible method would be to update a table on the target system that keeps track of replication status, every time before we COMMIT any changes. In case the target system fails, we could rely on the consistency guarantees of transactions to safely know how far we have applied the stream, and resume from there.
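Here is a minimal sketch of that status-table approach, using an in-memory SQLite database as a stand-in for the target. The table names, the function names, and the integer LSNs are assumptions made for illustration. What matters is that the applied rows and the progress marker are written in the same transaction, so a crash loses both or neither.

```python
import sqlite3

# In-memory SQLite stands in for the target database (illustration only).
target = sqlite3.connect(":memory:", isolation_level=None)  # we issue BEGIN/COMMIT ourselves
target.execute("CREATE TABLE data (id INTEGER PRIMARY KEY, data TEXT)")
target.execute("CREATE TABLE replication_status (applied_lsn INTEGER NOT NULL)")
target.execute("INSERT INTO replication_status VALUES (0)")

def applied_lsn():
    return target.execute("SELECT applied_lsn FROM replication_status").fetchone()[0]

def apply_transaction(commit_lsn, rows, crash=False):
    """Apply one source transaction and record its LSN in the same target transaction."""
    if commit_lsn <= applied_lsn():
        return  # already applied before the crash, so skip it on replay
    target.execute("BEGIN")
    target.executemany("INSERT INTO data VALUES (?, ?)", rows)
    target.execute("UPDATE replication_status SET applied_lsn = ?", (commit_lsn,))
    # ROLLBACK stands in for a crash before COMMIT: rows and progress vanish together.
    target.execute("ROLLBACK" if crash else "COMMIT")

# (commit LSN, rows) pairs; LSNs are simplified to integers.
stream = [(100, [(1, "arg")]), (200, [(2, "demo")])]
apply_transaction(*stream[0])
apply_transaction(*stream[1], crash=True)
lsn_after_crash = applied_lsn()

# After a restart, replaying the whole stream is safe.
for commit_lsn, rows in stream:
    apply_transaction(commit_lsn, rows)

print(lsn_after_crash, applied_lsn())
print(target.execute("SELECT id, data FROM data ORDER BY id").fetchall())
```

The cost of this design is one extra UPDATE per applied transaction, which is the overhead and bloat that a built-in mechanism can avoid.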

The actual method used by Citus Warp is similar, but uses a little-known Postgres feature called Replication Origins. Replication origins essentially act like our replication status table, but avoid the UPDATE overhead and bloat problems.

Warp migration to Citus Cloud—in Practice

In order to migrate your database to Citus Cloud using Warp migration, you first provision a regular Citus Cloud formation using the Citus Cloud dashboard. Next, we usually set up a VPC peering connection to your current database environment, to ensure the replication runs over a secure, private connection.

Then you provide us with credentials for a user on the source database that has replication privileges, in order for us to set up a replication slot and stream the changes.

With this new formation, you will also need to set up the schema on the distributed Citus target database. That means you first create the same schema as on the source, and then run create_distributed_table for each distributed table to create the shards in the background.
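If you have many tables, it can help to generate those calls rather than type them. The sketch below prints one create_distributed_table statement per table, which you would then run on the coordinator after creating the schema. The table names and distribution columns are placeholders rather than anything from a real schema, and the helper names are made up for illustration.

```python
# Hypothetical tables and their distribution columns; substitute your own schema.
DISTRIBUTED_TABLES = {
    "events": "tenant_id",
    "users": "tenant_id",
}

def quote_literal(value):
    """Quote a string as a SQL literal by doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"

def distribute_statements(tables):
    """Build one create_distributed_table call per (table, distribution column) pair."""
    return [
        f"SELECT create_distributed_table({quote_literal(table)}, {quote_literal(column)});"
        for table, column in tables.items()
    ]

for statement in distribute_statements(DISTRIBUTED_TABLES):
    print(statement)
```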

That’s all you need to do on your end. In coordination with you, we will then run Citus Warp on the coordinator node of your Citus Cloud formation, first taking a base backup, and then streaming changes continuously. At no point do you need to stop writes or take downtime on the source database.

Whenever you are ready, you can simply switch your application’s connection details to your new, distributed Citus Cloud database.