Citus 10 brings columnar compression to Postgres

Written by Jeff Davis
March 6, 2021

Citus 10 is out! Check out the Citus 10 blog post for all the details. Citus is an open source extension to Postgres (not a fork) that enables scale-out and offers other great features, too. See the Citus docs and the Citus GitHub repo and README.

This post will highlight Citus Columnar, one of the big new features in Citus 10. You can also take a look at the columnar documentation. Citus Columnar can be used with or without the scale-out features of Citus.

Postgres typically stores data using the heap access method, which is row-based storage. Row-based tables are good for transactional workloads, but can cause excessive IO for some analytic queries.

Columnar storage is a new way to store data in a Postgres table. Columnar groups data together by column instead of by row, and compresses the data, too. Data arranged by column tends to compress well, and queries can skip over columns they don't need. Columnar dramatically reduces the IO needed to answer a typical analytic query, often by 10X!

Let's check it out!

Citus Columnar Quick Start

Prerequisites:

  • PostgreSQL version 12 or later
  • Citus version 10 or later
  • The psql client is used for these examples to make use of the backslash commands (e.g. \d+, \timing on). If you use another client, you can omit the commands beginning with \, which are informational and not necessary for functionality.

--
-- Like all Postgres extensions, citus needs to be enabled
-- for this database.
--
CREATE EXTENSION IF NOT EXISTS citus;

--
-- Make an ordinary table, which is row-based storage, and a
-- columnar table.
--
CREATE TABLE simple_row(i INT8);
CREATE TABLE simple_columnar(i INT8) USING columnar;

--
-- Columnar tables act like row tables
--
INSERT INTO simple_row SELECT generate_series(1,100000);
INSERT INTO simple_columnar SELECT generate_series(1,100000);
SELECT AVG(i) FROM simple_row;
SELECT AVG(i) FROM simple_columnar;

Notice the "Access Method" when describing the table in psql:

\d+ simple_row
                                Table "public.simple_row"
 Column |  Type  | Collation | Nullable | Default | Storage | Stats target | Description
--------+--------+-----------+----------+---------+---------+--------------+-------------
 i      | bigint |           |          |         | plain   |              |
Access method: heap

\d+ simple_columnar
                             Table "public.simple_columnar"
 Column |  Type  | Collation | Nullable | Default | Storage | Stats target | Description
--------+--------+-----------+----------+---------+---------+--------------+-------------
 i      | bigint |           |          |         | plain   |              |
Access method: columnar

The default access method is "heap", which means a plain Postgres table with row-based storage. The columnar table has access method "columnar".
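
If you're not using psql, one way to check the same thing is to query the system catalogs directly. This is a minimal sketch that assumes PostgreSQL 12 or later (where pg_class.relam records a table's access method) and the two tables created above:

```sql
-- List the access method of each quick-start table.
-- pg_class.relam references pg_am, the catalog of access methods.
SELECT c.relname AS table_name,
       a.amname  AS access_method
FROM pg_class c
JOIN pg_am a ON a.oid = c.relam
WHERE c.relkind = 'r'
  AND c.relname IN ('simple_row', 'simple_columnar')
ORDER BY c.relname;
```

The result should match the "Access method" footer that \d+ prints for each table.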

Citus Columnar Video Demo

The video demo of Citus Columnar below shows:

  1. How the query on the row table was so slow that I needed to skip ahead, while the query on the columnar table finished quickly.
  2. Better visualizations of partitioning that make it easy to see how columnar and partitioning work together.
  3. The cool turtle in the background!

Video: Two-part demo of Citus Columnar. Watch Citus Columnar in action!

What are the Benefits of Columnar?

  • Compression reduces storage requirements
  • Compression reduces the IO needed to scan the table
  • Projection Pushdown means that queries can skip over the columns that they don’t need, further reducing IO
  • Chunk Group Filtering allows queries to skip over Chunk Groups of data if the metadata indicates that none of the data in the chunk group will match the predicate. In other words, for certain kinds of queries and data sets, it can skip past a lot of the data quickly, without even decompressing it!

All of these together mean faster queries and lower costs!

Let's See the Performance of Citus Columnar

Here's a microbenchmark to show off what columnar can do. This is a "columnar friendly" use case: a wide table, and a query that only reads a few of the columns.

This benchmark illustrates two benefits of columnar for PostgreSQL:

  • Reduced IO due to compression
  • Reduced IO because it skips over the columns not needed to answer the query

System

  • Azure VM: Standard D2s v3 (2 vcpus, 8 GiB memory)
  • Linux (ubuntu 18.04)
  • Data Drive: Standard HDD (512GB, 500 IOPS Max, 60 MB/s Max)
  • PostgreSQL 13.2 (--with-llvm)
  • Citus 10.0 extension to Postgres

Schema

CREATE TABLE perf_row(
  c00 int8, c01 int8, c02 int8, c03 int8, c04 int8, c05 int8, c06 int8, c07 int8, c08 int8, c09 int8,
  c10 int8, c11 int8, c12 int8, c13 int8, c14 int8, c15 int8, c16 int8, c17 int8, c18 int8, c19 int8,
  c20 int8, c21 int8, c22 int8, c23 int8, c24 int8, c25 int8, c26 int8, c27 int8, c28 int8, c29 int8,
  c30 int8, c31 int8, c32 int8, c33 int8, c34 int8, c35 int8, c36 int8, c37 int8, c38 int8, c39 int8,
  c40 int8, c41 int8, c42 int8, c43 int8, c44 int8, c45 int8, c46 int8, c47 int8, c48 int8, c49 int8,
  c50 int8, c51 int8, c52 int8, c53 int8, c54 int8, c55 int8, c56 int8, c57 int8, c58 int8, c59 int8,
  c60 int8, c61 int8, c62 int8, c63 int8, c64 int8, c65 int8, c66 int8, c67 int8, c68 int8, c69 int8,
  c70 int8, c71 int8, c72 int8, c73 int8, c74 int8, c75 int8, c76 int8, c77 int8, c78 int8, c79 int8,
  c80 int8, c81 int8, c82 int8, c83 int8, c84 int8, c85 int8, c86 int8, c87 int8, c88 int8, c89 int8,
  c90 int8, c91 int8, c92 int8, c93 int8, c94 int8, c95 int8, c96 int8, c97 int8, c98 int8, c99 int8
);

CREATE TABLE perf_columnar(LIKE perf_row) USING COLUMNAR;

Data Load

\timing on

INSERT INTO perf_row
  SELECT
    g % 00500, g % 01000, g % 01500, g % 02000, g % 02500, g % 03000, g % 03500, g % 04000, g % 04500, g % 05000,
    g % 05500, g % 06000, g % 06500, g % 07000, g % 07500, g % 08000, g % 08500, g % 09000, g % 09500, g % 10000,
    g % 10500, g % 11000, g % 11500, g % 12000, g % 12500, g % 13000, g % 13500, g % 14000, g % 14500, g % 15000,
    g % 15500, g % 16000, g % 16500, g % 17000, g % 17500, g % 18000, g % 18500, g % 19000, g % 19500, g % 20000,
    g % 20500, g % 21000, g % 21500, g % 22000, g % 22500, g % 23000, g % 23500, g % 24000, g % 24500, g % 25000,
    g % 25500, g % 26000, g % 26500, g % 27000, g % 27500, g % 28000, g % 28500, g % 29000, g % 29500, g % 30000,
    g % 30500, g % 31000, g % 31500, g % 32000, g % 32500, g % 33000, g % 33500, g % 34000, g % 34500, g % 35000,
    g % 35500, g % 36000, g % 36500, g % 37000, g % 37500, g % 38000, g % 38500, g % 39000, g % 39500, g % 40000,
    g % 40500, g % 41000, g % 41500, g % 42000, g % 42500, g % 43000, g % 43500, g % 44000, g % 44500, g % 45000,
    g % 45500, g % 46000, g % 46500, g % 47000, g % 47500, g % 48000, g % 48500, g % 49000, g % 49500, g % 50000
  FROM generate_series(1,50000000) g;

INSERT INTO perf_columnar
  SELECT
    g % 00500, g % 01000, g % 01500, g % 02000, g % 02500, g % 03000, g % 03500, g % 04000, g % 04500, g % 05000,
    g % 05500, g % 06000, g % 06500, g % 07000, g % 07500, g % 08000, g % 08500, g % 09000, g % 09500, g % 10000,
    g % 10500, g % 11000, g % 11500, g % 12000, g % 12500, g % 13000, g % 13500, g % 14000, g % 14500, g % 15000,
    g % 15500, g % 16000, g % 16500, g % 17000, g % 17500, g % 18000, g % 18500, g % 19000, g % 19500, g % 20000,
    g % 20500, g % 21000, g % 21500, g % 22000, g % 22500, g % 23000, g % 23500, g % 24000, g % 24500, g % 25000,
    g % 25500, g % 26000, g % 26500, g % 27000, g % 27500, g % 28000, g % 28500, g % 29000, g % 29500, g % 30000,
    g % 30500, g % 31000, g % 31500, g % 32000, g % 32500, g % 33000, g % 33500, g % 34000, g % 34500, g % 35000,
    g % 35500, g % 36000, g % 36500, g % 37000, g % 37500, g % 38000, g % 38500, g % 39000, g % 39500, g % 40000,
    g % 40500, g % 41000, g % 41500, g % 42000, g % 42500, g % 43000, g % 43500, g % 44000, g % 44500, g % 45000,
    g % 45500, g % 46000, g % 46500, g % 47000, g % 47500, g % 48000, g % 48500, g % 49000, g % 49500, g % 50000
  FROM generate_series(1,50000000) g;

VACUUM (FREEZE, ANALYZE) perf_row;
VACUUM (FREEZE, ANALYZE) perf_columnar;

-- checkpoint if superuser; otherwise wait for system to settle
CHECKPOINT; CHECKPOINT;

The row table loaded in 2158s, while the columnar table loaded in 1271s for a speedup of 1.7X. Load speed is not always better with columnar, but columnar does have the advantage when the system is IO-bound.

Compression Ratio

For this data, you can see a compression ratio of better than 8X when using Columnar.

SELECT pg_total_relation_size('perf_row')::numeric/
       pg_total_relation_size('perf_columnar') AS compression_ratio;
 compression_ratio
--------------------
 8.0196135873627944
(1 row)

We can also get some information from VACUUM VERBOSE:

VACUUM VERBOSE perf_columnar;
INFO:  statistics for "perf_columnar":
storage id: 10000000001
total file size: 5676548096, total data size: 5658583219
compression rate: 7.18x
total row count: 50000000, stripe count: 334, average rows per stripe: 149700
chunk count: 500000, containing data for dropped columns: 0, zstd compressed: 500000

VACUUM

Notice that there are 334 stripes. Stripes are the unit of a data load/write. By default, each stripe can hold up to 150000 tuples.

By default, data is compressed with zstd compression. The compression rate calculated by VACUUM VERBOSE is slightly different than what we saw above, because it considers only the average compression ratio of the data, and doesn't account for metadata (like visibility information).
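
As a sanity check, the counts reported by VACUUM VERBOSE can be reproduced with a little arithmetic. This sketch assumes the whole table was written by the single INSERT above, so every stripe except the last one is full, and it borrows the default chunk group size of 10000 rows that is described later in this post:

```sql
-- 50,000,000 rows, up to 150,000 rows per stripe, 10,000 rows per chunk group,
-- and one chunk per column (100 columns) in each chunk group.
SELECT ceil(50000000 / 150000.0) AS expected_stripes,     -- 334 stripes
       floor(50000000 / 334.0)   AS avg_rows_per_stripe,  -- 149700 rows
       (50000000 / 10000) * 100  AS expected_chunks;      -- 500000 chunks
```

These line up with the "stripe count", "average rows per stripe", and "chunk count" fields in the output above.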

Queries

Now let's run a couple of SQL queries. We will use EXPLAIN ANALYZE so that we can see the details in addition to the overall runtime.

Notice that only 3 out of 100 columns are necessary to answer this query.

--
-- Parallel query actually slows down the query on the row table in this example, so disable it.
-- Columnar doesn't support parallel query.
--
SET max_parallel_workers_per_gather = 0;

EXPLAIN (ANALYZE, BUFFERS) SELECT c00, SUM(c29), AVG(c71) FROM perf_row GROUP BY c00;
                                                            QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=6430556.07..6430563.57 rows=500 width=72) (actual time=444098.541..444098.823 rows=500 loops=1)
   Group Key: c00
   Batches: 1  Memory Usage: 169kB
   Buffers: shared hit=160 read=5555396
   ->  Seq Scan on perf_row  (cost=0.00..6055556.04 rows=50000004 width=24) (actual time=1.159..428887.294 rows=50000000 loops=1)
         Buffers: shared hit=160 read=5555396
 Planning Time: 0.146 ms
 JIT:
   Functions: 7
   Options: Inlining true, Optimization true, Expressions true, Deforming true
   Timing: Generation 1.702 ms, Inlining 5.199 ms, Optimization 96.937 ms, Emission 52.101 ms, Total 155.938 ms
 Execution Time: 444100.677 ms
(12 rows)

EXPLAIN (ANALYZE, BUFFERS) SELECT c00, SUM(c29), AVG(c71) FROM perf_columnar GROUP BY c00;
                                                                     QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=395722.35..395729.85 rows=500 width=72) (actual time=18313.813..18314.055 rows=500 loops=1)
   Group Key: c00
   Batches: 1  Memory Usage: 169kB
   Buffers: shared hit=63722 read=27429
   ->  Custom Scan (ColumnarScan) on perf_columnar  (cost=0.00..20722.35 rows=50000000 width=24) (actual time=9.053..9830.727 rows=50000000 loops=1)
         Columnar Chunk Groups Removed by Filter: 0
         Buffers: shared hit=63722 read=27429
 Planning:
   Buffers: shared hit=428 read=60
 Planning Time: 7.883 ms
 JIT:
   Functions: 6
   Options: Inlining false, Optimization false, Expressions true, Deforming true
   Timing: Generation 0.587 ms, Inlining 0.000 ms, Optimization 0.319 ms, Emission 4.586 ms, Total 5.491 ms
 Execution Time: 18336.907 ms
(15 rows)

We see a dramatic difference in the overall buffers read, which results in a dramatic difference in the runtime:

 Storage  | Runtime | Columnar Speedup
----------+---------+------------------
 Row      | 444s    |
 Columnar | 18s     | 25X
What are the Limitations?

These limitations are not set in stone, and we look forward to working on them in the future:

  • No UPDATE or DELETE support
  • No logical replication or logical decoding support
  • See more limitations in the columnar README

[UPDATE in Sep 2021]: Good news, as of Citus 10.2, Citus columnar now supports btree and hash indexes and the constraints requiring them (e.g. primary key). So with Citus 10.2 or later, you can create such indexes on columnar tables and Postgres can use plain index scans on them when that's beneficial. Index-only scans and bitmap scans are not yet supported. You can learn more in the Citus 10.2 blog post.
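
As a rough illustration of that update, here is what adding a primary key to the quick-start table might look like. This sketch assumes Citus 10.2 or later; on Citus 10.0 or 10.1 the ALTER TABLE statement is expected to fail. Whether the planner actually picks an index scan depends on its cost estimates:

```sql
-- Assumes Citus 10.2+ and the simple_columnar table from the quick start.
-- A primary key is backed by a btree index.
ALTER TABLE simple_columnar ADD PRIMARY KEY (i);

-- Check whether the planner chooses an index scan for a point lookup.
EXPLAIN SELECT i FROM simple_columnar WHERE i = 42;
```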

Because of the UPDATE/DELETE limitation, as of Citus 10, columnar should be used for append-only tables that are used for analytic queries. Even if UPDATE/DELETE are supported in the future, UPDATE & DELETE will not be as efficient as they are on row-based storage, so columnar is not a good fit for many transactional workloads. However, you can pick and choose columnar where it works best, and use row tables where they work best, to get the benefits of both.
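
One way to "pick and choose" is to keep recent, still-changing data in a row table and periodically move finished data into a columnar table in one batch. The sketch below is only an illustration: the table names (metrics_staging, metrics_history), the columns, and the cutoff date are all hypothetical.

```sql
-- Hypothetical tables: a row-based staging table that accepts small writes
-- (including UPDATE and DELETE), and a columnar table for append-only history.
CREATE TABLE metrics_staging(ts timestamptz, device_id int8, reading float8);
CREATE TABLE metrics_history(LIKE metrics_staging) USING columnar;

-- Move a finished time range into columnar storage in a single transaction.
-- The DELETE only touches the row table, so the columnar table stays append-only.
BEGIN;
INSERT INTO metrics_history
  SELECT * FROM metrics_staging WHERE ts < '2021-03-01';
DELETE FROM metrics_staging WHERE ts < '2021-03-01';
COMMIT;
```

Partitioning, covered next, gives you the same split without having to copy rows between two separate tables yourself.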

Hybrid Columnar and Row tables with Range Partitioning

A useful way to take advantage of Citus Columnar is to combine it with native range partitioning. Using columnar with partitioning helps to overcome the limitation on updates and deletes, by using a mix of row and columnar partitions within the same partitioned table.

Normally, range partitioning is used for time-based partitioning. Often, you have one or two recent "active" partitions that are still being updated, and then many older partitions that are rarely updated but still queried. In this case the one or two active partitions can be row-based storage to allow updates, and the older partitions can be converted to columnar storage to benefit from compression and scan speed.

CREATE TABLE events(ts timestamptz, i int, n numeric, s text)
  PARTITION BY RANGE (ts);

CREATE TABLE events_2021_jan PARTITION OF events
  FOR VALUES FROM ('2021-01-01') TO ('2021-02-01');

CREATE TABLE events_2021_feb PARTITION OF events
  FOR VALUES FROM ('2021-02-01') TO ('2021-03-01');

INSERT INTO events SELECT
    '2021-01-01'::timestamptz + '0.45 seconds'::interval * g,
    g,
    g*pi(),
    'number: ' || g::text
    FROM generate_series(1,10000000) g;

VACUUM (FREEZE, ANALYZE) events_2021_feb;

Later, when you're ready to "columnarize" the older January partition, you can use this Citus-provided function to convert from row to columnar storage:

SELECT alter_table_set_access_method('events_2021_jan', 'columnar');
VACUUM (FREEZE, ANALYZE) events_2021_jan;

-- checkpoint if superuser; otherwise wait for system to settle
CHECKPOINT; CHECKPOINT;

Now you can see that the January partition is columnar, and the February partition is row storage (“heap”).

\d+ events_2021_jan
                                       Table "public.events_2021_jan"
 Column |           Type           | Collation | Nullable | Default | Storage  | Stats target | Description
--------+--------------------------+-----------+----------+---------+----------+--------------+-------------
 ts     | timestamp with time zone |           |          |         | plain    |              |
 i      | integer                  |           |          |         | plain    |              |
 n      | numeric                  |           |          |         | main     |              |
 s      | text                     |           |          |         | extended |              |
Partition of: events FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2021-02-01 00:00:00+00')
Partition constraint: ((ts IS NOT NULL) AND (ts >= '2021-01-01 00:00:00+00'::timestamp with time zone) AND (ts < '2021-02-01 00:00:00+00'::timestamp with time zone))
Access method: columnar

\d+ events_2021_feb
                                      Table "public.events_2021_feb"
 Column |           Type           | Collation | Nullable | Default | Storage  | Stats target | Description
--------+--------------------------+-----------+----------+---------+----------+--------------+-------------
 ts     | timestamp with time zone |           |          |         | plain    |              |
 i      | integer                  |           |          |         | plain    |              |
 n      | numeric                  |           |          |         | main     |              |
 s      | text                     |           |          |         | extended |              |
Partition of: events FOR VALUES FROM ('2021-02-01 00:00:00+00') TO ('2021-03-01 00:00:00+00')
Partition constraint: ((ts IS NOT NULL) AND (ts >= '2021-02-01 00:00:00+00'::timestamp with time zone) AND (ts < '2021-03-01 00:00:00+00'::timestamp with time zone))
Access method: heap
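
With more than a couple of partitions, running \d+ on each one gets tedious. A catalog query along these lines can list every partition of events with its access method. It is a sketch that relies only on standard Postgres catalogs (pg_inherits, pg_class, pg_am):

```sql
-- One row per partition of "events", with the access method it uses.
SELECT i.inhrelid::regclass AS partition_name,
       a.amname             AS access_method
FROM pg_inherits i
JOIN pg_class c ON c.oid = i.inhrelid
JOIN pg_am a    ON a.oid = c.relam
WHERE i.inhparent = 'events'::regclass
ORDER BY c.relname;
```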

The rows are split roughly 60/40 between the two partitions. Even though the January partition has more rows, it's much smaller due to columnar compression:

SELECT COUNT(*) FROM events; -- parent table scans both partitions
  count
----------
 10000000
(1 row)

SELECT COUNT(*) FROM events_2021_jan;
  count
---------
 5951999
(1 row)

SELECT COUNT(*) FROM events_2021_feb;
  count
---------
 4048001
(1 row)

SELECT pg_size_pretty(pg_relation_size('events_2021_jan'));
 pg_size_pretty
----------------
 69 MB
(1 row)

SELECT pg_size_pretty(pg_relation_size('events_2021_feb'));
 pg_size_pretty
----------------
 264 MB
(1 row)
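
Dividing the sizes by the row counts makes the comparison fairer, since the two partitions don't hold the same number of rows. The inputs below are the rounded values printed above, so treat the results as approximate (roughly 12 bytes per row for the columnar partition versus roughly 68 for the row partition):

```sql
-- Sizes (in MB) and row counts copied from the outputs above.
SELECT round(69 * 1024 * 1024 / 5951999.0, 1)  AS jan_columnar_bytes_per_row,
       round(264 * 1024 * 1024 / 4048001.0, 1) AS feb_row_bytes_per_row;
```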

What is Chunk Group Filtering?

Let's run a query for a particular hour within the January partition of the events table.

EXPLAIN (ANALYZE,BUFFERS)
SELECT SUM(n)
FROM events_2021_jan
  WHERE ts >= '2021-01-11 01:00'::timestamptz AND
        ts < '2021-01-11 02:00'::timestamptz;
                                                                 QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=4438.09..4438.10 rows=1 width=32) (actual time=8.356..8.357 rows=1 loops=1)
   Buffers: shared hit=2922 read=2
   ->  Custom Scan (ColumnarScan) on events_2021_jan  (cost=0.00..4418.55 rows=7815 width=11) (actual time=2.998..7.703 rows=8000 loops=1)
         Filter: ((ts >= '2021-01-11 01:00:00+00'::timestamp with time zone) AND (ts < '2021-01-11 02:00:00+00'::timestamp with time zone))
         Rows Removed by Filter: 12000
         Columnar Chunk Groups Removed by Filter: 594
         Buffers: shared hit=2922 read=2
 Planning:
   Buffers: shared hit=27 dirtied=2
 Planning Time: 0.233 ms
 Execution Time: 8.380 ms
(11 rows)

Notice Columnar Chunk Groups Removed by Filter: 594.

First, we need some terminology to understand this:

  • Stripe: all loads into a columnar table are broken into stripes of 150000 rows (by default). The larger a stripe, the more sequential access when reading a given column.
  • Chunk Group: Stripes are broken down further into Chunk Groups of 10000 rows (by default).
  • Chunk: Each Chunk Group consists of one Chunk for each column. A Chunk is the unit of compression, and the min/max is tracked for each chunk to enable Chunk Group Filtering.
  • Chunk Group Filtering: When a query's WHERE clause can't possibly match any of the tuples in a Chunk, and we know that by the min/max values of the chunk, then Chunk Group Filtering will simply skip over the whole Chunk Group without decompressing any of the Chunks in the Chunk Group.
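
To build some intuition for the min/max idea, the bookkeeping can be emulated in plain SQL. This is an illustration only: columnar keeps this metadata internally and does not run a query like this. The sketch assumes the rows were written in ts order (true for the data loaded above) and uses the default Chunk Group size of 10000 rows:

```sql
-- Number the rows in load order, cut them into groups of 10000,
-- and record the min/max ts of each group.
WITH numbered AS (
  SELECT ts,
         (row_number() OVER (ORDER BY ts) - 1) / 10000 AS chunk_group
  FROM events_2021_jan
),
bounds AS (
  SELECT chunk_group, min(ts) AS min_ts, max(ts) AS max_ts
  FROM numbered
  GROUP BY chunk_group
)
-- A group can be skipped when its [min_ts, max_ts] range
-- cannot overlap the hour being queried.
SELECT count(*) FILTER (WHERE max_ts <  '2021-01-11 01:00'::timestamptz
                           OR min_ts >= '2021-01-11 02:00'::timestamptz) AS skippable_groups,
       count(*) FILTER (WHERE max_ts >= '2021-01-11 01:00'::timestamptz
                          AND min_ts <  '2021-01-11 02:00'::timestamptz) AS groups_to_read
FROM bounds;
```

With the data from this post, the two counts should correspond to the Chunk Groups removed by the filter and the ones that were actually read.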

You can see above that 594 Chunk Groups were filtered out, which means that 5931999 rows were filtered out without needing to fetch or decompress the data. Only 2 Chunk Groups (20000 rows) needed to be actually fetched and decompressed, which is why the query took only milliseconds.
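
The numbers in that paragraph can be double-checked with simple arithmetic. This sketch assumes the January partition was written in one pass by the conversion above, so its Chunk Groups are full except for the last one:

```sql
-- One row every 0.45 seconds, 5951999 rows in January, 10000 rows per Chunk Group.
SELECT 3600 / 0.45                   AS rows_in_one_hour,    -- 8000, the rows the scan returned
       ceil(5951999 / 10000.0)       AS total_chunk_groups,  -- 596
       ceil(5951999 / 10000.0) - 594 AS chunk_groups_read,   -- 2
       5951999 - 2 * 10000           AS rows_skipped;        -- 5931999
```

The 20000 rows that were read split into the 8000 matching rows plus the 12000 "Rows Removed by Filter" shown in the plan.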

How is Citus Columnar connected to Citus?

Citus Columnar is a new feature that's included in the Citus open source extension, and they share the same code repository. To use the new columnar feature with Postgres, you just need to create the Citus extension, create tables with the new USING columnar syntax, and you're ready to go (of course, read the Citus docs, too!).

Citus is known for its ability to scale Postgres. Importantly, you can use Citus Columnar with or without the Citus scale-out features. Columnar is a great complement to typical Citus use cases, but you can pick and choose whether to use Citus Columnar on a single node, or as part of a distributed Citus cluster.

You can mix and match:

  • columnar and row tables
  • columnar and row partitions of the same table
  • single-node and distributed tables

What About cstore_fdw?

If you've heard of the cstore_fdw extension that my teammates at Citus created a number of years ago, you can think of Citus Columnar as the next generation of cstore_fdw. If you are using cstore_fdw, consider migrating to Citus Columnar.

cstore_fdw achieved the core benefits of columnar in terms of performance, but Citus Columnar goes much further in terms of integration and feature compatibility.

Citus Columnar works with:

  • ROLLBACK
  • Write-ahead logging (WAL)
  • Physical replication
  • pg_upgrade

and also provides a more seamless user experience, similar to ordinary Postgres row tables.

Citus Columnar was able to accomplish this better experience by using the Table Access Method API, new in PostgreSQL version 12. Using this new extension API allows tighter integration into Postgres while still being a pure extension.
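
For the cstore_fdw migration suggested above, one possible approach is to copy the data into a new columnar table. This is a sketch under assumptions, not a tested migration procedure: old_cstore_table and new_columnar_table are hypothetical names, and it assumes the existing cstore_fdw foreign table is still readable in the same database where the Citus extension has been created.

```sql
-- Hypothetical names: old_cstore_table is an existing cstore_fdw foreign table.
-- LIKE copies the column definitions; USING columnar selects the new access method.
CREATE TABLE new_columnar_table (LIKE old_cstore_table) USING columnar;

-- Copy the data in one load, then refresh planner statistics.
INSERT INTO new_columnar_table SELECT * FROM old_cstore_table;
ANALYZE new_columnar_table;
```

Compare row counts between the two tables before dropping the old one.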

Try Out Citus Columnar for Your Analytic Workloads

The dramatic compression and scan speed improvements offered by Citus Columnar allow you to do more with less. Take advantage by identifying large, append-only tables and evaluating whether columnar will improve performance or reduce costs.

Columnar storage may also allow you to keep data in Postgres for longer, rather than forcing you to archive older data, where it can't be queried efficiently.

If you want to dive even deeper into learning about Citus Columnar, I recommend:

Get Citus 10 (with Postgres 12 or later)

Finally, special thanks to Hadi Moshayedi, who was the original author of cstore_fdw (the mature basis of Citus columnar), and is the co-author of Citus 10 columnar!

Written by Jeff Davis

PostgreSQL committer and community member. Former principal engineer at Microsoft. Streaming analytics at Truviso, Teradata Aster, & AWS Aurora. Papers for VLDB & SIGMOD. Talks at PGConf.EU & PGCon. Family. Camping.