Scaling Out MySQL with PostgreSQL and Citus

Written by Eren Basak
June 10, 2016

PostgreSQL is known for its great extensibility and powerful plugins. One particular category of extensions is Foreign Data Wrappers, or FDWs. FDWs allow us to interact from within Postgres directly with other data stores such as HDFS, columnar stores, MySQL, and more. Combined with Citus’ scalability features, we can even leverage them to help scale out data stores that might otherwise be quite difficult to scale.

Imagine you have a very large and growing table in MySQL on which queries are taking longer and longer. Fortunately, Citus can solve your problems with the help of mysql_fdw. Before we get to the meat of it, we would like to thank Eugen Konkov for his interesting question on StackOverflow, which inspired this post. Note that this tutorial is experimental work. Feel free to try this at home, but use caution before advancing it to production.

In this blog post we will see how PostgreSQL and Citus can help us scale out existing MySQL data. For this we will do the following:

  1. Create a MySQL table and fill it with some data.
  2. Partition the MySQL table into smaller MySQL tables.
  3. Create a distributed table in PostgreSQL with Citus.
  4. Connect the distributed table shards to the corresponding MySQL tables.
  5. Run a query on the master Citus node and see that it correctly fetches the data from MySQL tables in parallel.

The architecture that we will work on will look like this:

[architecture diagram]

First we assume that you have MySQL, PostgreSQL, Citus, and mysql_fdw installed. For this demo, we will use the LINEITEM table from the standard TPC-H benchmark. Let’s create it in MySQL and fill it with some data:

mysql> CREATE TABLE LINEITEM ( L_ORDERKEY    INTEGER NOT NULL,
                             L_PARTKEY     INTEGER NOT NULL,
                             L_SUPPKEY     INTEGER NOT NULL,
                             L_LINENUMBER  INTEGER NOT NULL,
                             L_QUANTITY    DOUBLE PRECISION NOT NULL,
                             L_EXTENDEDPRICE  DOUBLE PRECISION NOT NULL,
                             L_DISCOUNT    DOUBLE PRECISION NOT NULL,
                             L_TAX         DOUBLE PRECISION NOT NULL,
                             L_RETURNFLAG  CHAR(1) NOT NULL,
                             L_LINESTATUS  CHAR(1) NOT NULL,
                             L_SHIPDATE    DATE NOT NULL,
                             L_COMMITDATE  DATE NOT NULL,
                             L_RECEIPTDATE DATE NOT NULL,
                             L_SHIPINSTRUCT CHAR(25) NOT NULL,
                             L_SHIPMODE     CHAR(10) NOT NULL,
                             L_COMMENT      VARCHAR(44) NOT NULL);

mysql> LOAD DATA LOCAL INFILE 'tpch_2_13_0/lineitem.tbl' INTO TABLE LINEITEM FIELDS TERMINATED BY '|';

Note: To create lineitem.tbl, download the TPC-H bundle and use the dbgen tool. For this demo, we have generated the data with scale factor 10:

./dbgen -f -s 10 -T L

Now let’s get some information about the data we have:

mysql> SELECT count(*) FROM LINEITEM;
+----------+
| count(*) |
+----------+
| 59986052 |
+----------+

We now have about 60 million tuples, and partitioning this data by shipdate ranges sounds reasonable. Let’s see which dates we should pick as range limits with the following simple cumulative counting query:

SET @running_total:=0;
SELECT
    totals.shipdate as shipdate,
    (@running_total := @running_total + totals.shipcount) as cumulative_count
FROM
    (SELECT
        l_shipdate AS shipdate,
        count(*) AS shipcount
    FROM LINEITEM
    GROUP BY
        shipdate
    ORDER BY shipdate) AS totals;
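The boundary-picking logic behind this query can also be sketched outside the database: walk the per-day counts in date order and cut a boundary every time the cumulative count crosses an equal-sized target. A minimal Python sketch (the sample data below is illustrative, not real TPC-H output):

```python
# Pick partition boundary dates so that each of n partitions holds
# roughly the same number of rows, given (shipdate, count) pairs
# sorted by date. Sample data is made up for illustration.

def pick_boundaries(daily_counts, n_partitions):
    total = sum(count for _, count in daily_counts)
    target = total / n_partitions
    boundaries = []
    running = 0
    next_cut = target
    for date, count in daily_counts:
        running += count
        # Cut after the day where the cumulative count crosses the next target
        if running >= next_cut and len(boundaries) < n_partitions - 1:
            boundaries.append(date)
            next_cut += target
    return boundaries

daily_counts = [("1992-01-0%d" % d, 100) for d in range(1, 7)]
print(pick_boundaries(daily_counts, 3))  # -> ['1992-01-02', '1992-01-04']
```

This mirrors what we do by eye on the cumulative_count column: the dates named below are simply where the running total crosses one sixth, two sixths, and so on, of 60 million.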

From the query output, we can pick 1993-04-07, 1994-05-13, 1995-06-18, 1996-07-23, and 1997-08-28 as boundaries. Let’s create our partitions:

$ mysql
CREATE TABLE LINEITEM_1 AS SELECT * FROM LINEITEM WHERE l_shipdate < '1993-04-07';
CREATE TABLE LINEITEM_2 AS SELECT * FROM LINEITEM WHERE l_shipdate >= '1993-04-07' AND l_shipdate < '1994-05-13';
CREATE TABLE LINEITEM_3 AS SELECT * FROM LINEITEM WHERE l_shipdate >= '1994-05-13' AND l_shipdate < '1995-06-18';
CREATE TABLE LINEITEM_4 AS SELECT * FROM LINEITEM WHERE l_shipdate >= '1995-06-18' AND l_shipdate < '1996-07-23';
CREATE TABLE LINEITEM_5 AS SELECT * FROM LINEITEM WHERE l_shipdate >= '1996-07-23' AND l_shipdate < '1997-08-28';
CREATE TABLE LINEITEM_6 AS SELECT * FROM LINEITEM WHERE l_shipdate >= '1997-08-28';

We have partitioned our data in MySQL. For this demonstration, we are using a single MySQL server, but we can place the partitions on other MySQL servers as well. Now, let’s head to Citus and create our distributed table.

First, let’s tell Postgres which foreign server we will use with mysql_fdw, and how the current role will connect to it:

$ psql
CREATE SERVER mysql_svr
    FOREIGN DATA WRAPPER mysql_fdw
    OPTIONS (host '127.0.0.1', port '3306');
CREATE USER MAPPING FOR eren
    SERVER mysql_svr
    OPTIONS (username 'root', password 'toor');

Then create our distributed foreign table for LINEITEM:

CREATE FOREIGN TABLE LINEITEM ( L_ORDERKEY    INTEGER NOT NULL,
                             L_PARTKEY     INTEGER NOT NULL,
                             L_SUPPKEY     INTEGER NOT NULL,
                             L_LINENUMBER  INTEGER NOT NULL,
                             L_QUANTITY    DOUBLE PRECISION NOT NULL,
                             L_EXTENDEDPRICE  DOUBLE PRECISION NOT NULL,
                             L_DISCOUNT    DOUBLE PRECISION NOT NULL,
                             L_TAX         DOUBLE PRECISION NOT NULL,
                             L_RETURNFLAG  CHAR(1) NOT NULL,
                             L_LINESTATUS  CHAR(1) NOT NULL,
                             L_SHIPDATE    DATE NOT NULL,
                             L_COMMITDATE  DATE NOT NULL,
                             L_RECEIPTDATE DATE NOT NULL,
                             L_SHIPINSTRUCT CHAR(25) NOT NULL,
                             L_SHIPMODE     CHAR(10) NOT NULL,
                             L_COMMENT      VARCHAR(44) NOT NULL)
SERVER mysql_svr
OPTIONS (dbname 'test', table_name 'LINEITEM');
SELECT master_create_distributed_table('LINEITEM', 'l_shipdate', 'range');

Now, let’s create 6 shards that will correspond to the 6 MySQL tables:

SET citus.shard_replication_factor TO 1;
CREATE OR REPLACE FUNCTION master_create_range_shard(
                                                        table_name text,
                                                        minvalue text, 
                                                        maxvalue text)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
        new_shard_id bigint;
BEGIN
        SELECT master_create_empty_shard(table_name)
        INTO new_shard_id;

        UPDATE pg_dist_shard
        SET shardminvalue = minvalue, shardmaxvalue = maxvalue
        WHERE shardid = new_shard_id;
END;
$function$;

SELECT master_create_range_shard('LINEITEM', '1992-01-02', '1993-04-06');
SELECT master_create_range_shard('LINEITEM', '1993-04-07', '1994-05-12');
SELECT master_create_range_shard('LINEITEM', '1994-05-13', '1995-06-17');
SELECT master_create_range_shard('LINEITEM', '1995-06-18', '1996-07-22');
SELECT master_create_range_shard('LINEITEM', '1996-07-23', '1997-08-27');
SELECT master_create_range_shard('LINEITEM', '1997-08-28', '1998-12-01');
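Citus uses the shardminvalue and shardmaxvalue we wrote into pg_dist_shard to prune shards at query time: a query whose l_shipdate predicate falls entirely inside one range only touches that shard. Conceptually, the pruning works like this hedged Python sketch (the shard ids mirror the ones in this demo):

```python
# Sketch of range-based shard pruning: given each shard's
# (shardminvalue, shardmaxvalue) interval, find which shards a
# date-range predicate can touch. ISO date strings compare correctly
# as plain strings, so no date parsing is needed.

shards = [
    (102008, "1992-01-02", "1993-04-06"),
    (102009, "1993-04-07", "1994-05-12"),
    (102010, "1994-05-13", "1995-06-17"),
    (102011, "1995-06-18", "1996-07-22"),
    (102012, "1996-07-23", "1997-08-27"),
    (102013, "1997-08-28", "1998-12-01"),
]

def prune(shards, lo, hi):
    # A shard overlaps [lo, hi] unless it ends before lo or starts after hi
    return [sid for sid, smin, smax in shards if not (smax < lo or smin > hi)]

print(prune(shards, "1994-01-01", "1995-01-01"))  # -> [102009, 102010]
```

A query without a l_shipdate filter, like the one we run at the end of this post, overlaps every range and therefore fans out to all six shards in parallel.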

Let’s see our shards:

SELECT * FROM pg_dist_shard WHERE logicalrelid='LINEITEM'::regclass;
 logicalrelid | shardid | shardstorage | shardalias | shardminvalue | shardmaxvalue 
--------------+---------+--------------+------------+---------------+---------------
        16468 |  102008 | f            |            | 1992-01-02    | 1993-04-06
        16468 |  102009 | f            |            | 1993-04-07    | 1994-05-12
        16468 |  102010 | f            |            | 1994-05-13    | 1995-06-17
        16468 |  102011 | f            |            | 1995-06-18    | 1996-07-22
        16468 |  102012 | f            |            | 1996-07-23    | 1997-08-27
        16468 |  102013 | f            |            | 1997-08-28    | 1998-12-01
(6 rows)

We have finished configuring the master. Next, we need to connect to the workers and set which shard is associated with which MySQL partition table. Let’s run these commands on the workers:

ALTER TABLE lineitem_102008 OPTIONS (SET table_name 'LINEITEM_1');
ALTER TABLE lineitem_102009 OPTIONS (SET table_name 'LINEITEM_2');
ALTER TABLE lineitem_102010 OPTIONS (SET table_name 'LINEITEM_3');
ALTER TABLE lineitem_102011 OPTIONS (SET table_name 'LINEITEM_4');
ALTER TABLE lineitem_102012 OPTIONS (SET table_name 'LINEITEM_5');
ALTER TABLE lineitem_102013 OPTIONS (SET table_name 'LINEITEM_6');

We also need to define the user mappings for the shard foreign servers. Let’s run the user mapping creation commands on the workers:

CREATE USER MAPPING FOR eren SERVER mysql_svr_102008 OPTIONS (username 'root', password 'toor');
CREATE USER MAPPING FOR eren SERVER mysql_svr_102009 OPTIONS (username 'root', password 'toor');
CREATE USER MAPPING FOR eren SERVER mysql_svr_102010 OPTIONS (username 'root', password 'toor');
CREATE USER MAPPING FOR eren SERVER mysql_svr_102011 OPTIONS (username 'root', password 'toor');
CREATE USER MAPPING FOR eren SERVER mysql_svr_102012 OPTIONS (username 'root', password 'toor');
CREATE USER MAPPING FOR eren SERVER mysql_svr_102013 OPTIONS (username 'root', password 'toor');
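With more shards, writing these repetitive per-shard statements by hand gets tedious. A small generator script is one way to produce them; this sketch simply reproduces the exact statements above from the shard id sequence (the credentials are the demo ones):

```python
# Generate the per-shard ALTER TABLE and CREATE USER MAPPING
# statements shown above from the shard id sequence, instead of
# writing them by hand. Shard ids 102008..102013 are the ones
# created in this demo.

shard_ids = range(102008, 102014)

statements = []
for i, shard_id in enumerate(shard_ids, start=1):
    statements.append(
        "ALTER TABLE lineitem_%d OPTIONS (SET table_name 'LINEITEM_%d');"
        % (shard_id, i))
    statements.append(
        "CREATE USER MAPPING FOR eren SERVER mysql_svr_%d "
        "OPTIONS (username 'root', password 'toor');" % shard_id)

print("\n".join(statements))
```

The output can be piped into psql on each worker.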

Now we’re ready to see things in action. For comparison, let’s first run our query directly against MySQL, then connect to the Citus master and run it as a distributed query:

mysql> SELECT L_SHIPMODE, COUNT(*) FROM LINEITEM WHERE L_PARTKEY > 100 AND L_PARTKEY < 150 GROUP BY L_SHIPMODE;
+------------+----------+
| L_SHIPMODE | COUNT(*) |
+------------+----------+
| AIR        |      197 |
| FOB        |      203 |
| MAIL       |      213 |
| RAIL       |      228 |
| REG AIR    |      222 |
| SHIP       |      197 |
| TRUCK      |      215 |
+------------+----------+
7 rows in set (1 min 27.44 sec)

postgres=# SELECT L_SHIPMODE, COUNT(*) FROM LINEITEM WHERE L_PARTKEY > 100 AND L_PARTKEY < 150 GROUP BY L_SHIPMODE;
 l_shipmode | count 
------------+-------
 REG AIR    |   222
 MAIL       |   213
 TRUCK      |   215
 SHIP       |   197
 AIR        |   197
 FOB        |   203
 RAIL       |   228
(7 rows)

Time: 28090.748 ms

At this point, we have connected our shards to the actual MySQL tables and can use Citus’ parallel query capabilities. We can now migrate the data into PostgreSQL tables for even better performance, by creating a regular table for each shard and swapping it in place of the foreign table shard. For example, for shard 102008, on the worker:

BEGIN; 
CREATE TABLE intermediate_102008 AS SELECT * FROM lineitem_102008; 
DROP TABLE lineitem_102008; 
ALTER TABLE intermediate_102008 RENAME TO lineitem_102008; 
END;
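The same transaction works for the other five shards with only the shard id changed, so it can be templated. A small sketch that builds the migration SQL shown above for any shard id:

```python
# Build the shard-by-shard migration transaction shown above for a
# given shard id, so the same pattern can be applied to all six shards.

def migration_sql(shard_id):
    t = "lineitem_%d" % shard_id
    return "\n".join([
        "BEGIN;",
        "CREATE TABLE intermediate_%d AS SELECT * FROM %s;" % (shard_id, t),
        "DROP TABLE %s;" % t,
        "ALTER TABLE intermediate_%d RENAME TO %s;" % (shard_id, t),
        "END;",
    ])

print(migration_sql(102008))
```

Because the copy, drop, and rename all happen in one transaction, queries see either the foreign table or the local table, never a missing shard.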

As we can see, it is possible to bring Citus’ parallel query planning and horizontal scaling capabilities to MySQL data with the help of mysql_fdw. This approach can potentially be extended to any data source for which Postgres has an FDW. Alternatively, we can migrate the data into regular PostgreSQL tables, as shown above, for even better performance.


Engineering lead of the Azure Cosmos DB for PostgreSQL team at Microsoft. Experience with Postgres, Ruby, Rails, Django, Node, React, C#, Java, & Unity. Loves motorcycles and StarCraft.

@aamederen