SQL on Hadoop (Beta)

We describe here a setup that runs PostgreSQL instances on Hadoop's data nodes. Each database instance accesses data in colocated HDFS blocks through PostgreSQL foreign data wrappers, and a coordinator node distributes and executes incoming SQL queries. This way, users can immediately run real-time SQL queries on their Hadoop clusters without having to load any data into a database.

In this section, we first give an overview of PostgreSQL foreign data wrappers and their use in a distributed setting. We then talk about our approach in the context of a new generation of analytics databases inspired by Google’s Dremel. Next, we describe the installation steps needed to get CitusDB up and running on a Hadoop cluster.


Overview

The Hadoop Distributed File System (HDFS) is a distributed file system that partitions its data into blocks and then replicates these blocks across a number of nodes. Metadata on these blocks is kept on the HDFS NameNode, and the actual block data are stored on a cluster of DataNodes. One key property of Hadoop is that it colocates Map tasks with block data, thereby avoiding network transfers between DataNodes.

CitusDB extends this idea by running a PostgreSQL instance on each DataNode. No data needs to be loaded into Postgres; users simply write a foreign data wrapper that knows how to convert block data into PostgreSQL's tuple format. In a sense, these wrappers are similar to Hive's SerDes.
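
To make the wrapper idea concrete, the sketch below uses PostgreSQL's stock file_fdw to expose a single HDFS block, which is just a local file on the DataNode, as a queryable table. The block path and the short column list are hypothetical and chosen only for illustration; wrappers for other formats follow the same pattern.

    -- A minimal sketch of the wrapper idea, using PostgreSQL's stock file_fdw.
    -- The block path and columns are hypothetical; they assume a CSV block
    -- whose rows contain exactly these three fields.
    CREATE EXTENSION file_fdw;
    CREATE SERVER block_server FOREIGN DATA WRAPPER file_fdw;
    
    CREATE FOREIGN TABLE customer_reviews_block_0
    (
        customer_id TEXT,
        review_date DATE,
        review_rating INTEGER
    )
    SERVER block_server
    OPTIONS (filename '/opt/citusdb/hdfs/data/current/blk_0', format 'csv');
    
    -- At query time, the wrapper converts the block's raw bytes into tuples.
    SELECT count(*) FROM customer_reviews_block_0;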

The user then declares a table schema on the CitusDB master node, and associates this table with an HDFS directory path. Then, they run a Java application that synchronizes block metadata from the HDFS NameNode to the CitusDB master node. This application also creates a colocated foreign table for each new block in HDFS, and optionally collects statistics about the underlying blocks.

Since this application only synchronizes new block metadata, and creates foreign tables that are effectively views on block data, it runs quickly. Still, it is worth noting that we expect this application to evolve over time. We could extend it to pick up metadata changes from the HDFS NameNode's transaction logs in real time, or to integrate with the Hive metastore.

Once block metadata appear on the CitusDB master, users can start issuing real-time SQL queries against their data. The master node replans each incoming query for distributed execution, partitions it into smaller queries, and pushes these queries to PostgreSQL instances "holding" the relevant blocks. The master node also bypasses Map/Reduce entirely, and therefore removes any latency associated with running batch jobs.

At a high level, the architecture looks like the following.

[Figure: CitusDB architecture]

Several important benefits stem from this approach:

  • Avoiding network bottlenecks by pushing queries to the data nodes, and running them locally there.
  • Instantly running SQL queries on Hadoop clusters without having to load any data into a database.
  • Leveraging decades of performance and feature work done on PostgreSQL (see section below).


Google Dremel

Google Dremel and Apache Hive have popularized several ideas that we think now define a new generation of analytics databases: supporting semi-structured data, defining schemas at read time, and colocating queries with the nodes that hold the data.

We built CitusDB from the ground up with these ideas in mind. Citus pushes queries to the relevant data nodes, and uses PostgreSQL foreign data wrappers to read data in any format.

Further, CitusDB enables much more. Existing Hadoop-based systems start with a file system abstraction and set out to build everything a distributed database needs from scratch. CitusDB, on the other hand, builds on top of PostgreSQL and heavily leverages the optimizations and features that come out of the box with a mature database.

Thanks to the features we inherit from Postgres, the CitusDB master node can focus solely on planning SQL queries for fast parallel execution. For each incoming query, the master node creates a distributed query plan that is commutative and distributive, and then breaks the query into smaller queries, one per relevant data block. The master node then pushes these queries to the data nodes and automatically handles recovery from failures.
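
To make "commutative and distributive" concrete, here is an illustrative sketch, not CitusDB's literal internal query text, of how an average over a distributed customer_reviews table (the same example used in the walkthroughs below) can be assembled from per-block partial results:

    -- Query issued by the user on the master node.
    SELECT avg(review_rating) FROM customer_reviews;
    
    -- Conceptually, the master pushes one partial query per relevant block,
    -- since sums and counts distribute across blocks:
    --
    --   SELECT sum(review_rating), count(review_rating)
    --   FROM customer_reviews_block_N;
    --
    -- It then merges the partial results itself:
    --   final average = sum(partial sums) / sum(partial counts)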

Queries that can be efficiently parallelized by the CitusDB master include filters, groupings, sorts, limits, aggregations, most table joins, and certain subselects. We are actively working on improving our SQL functionality; features that we don't yet support can be found here.


Frequently Asked Questions

How does CitusDB's performance compare against Apache Hive?

The precise performance gains depend on the underlying data's format and the nature of the SQL queries. The following summarizes our initial findings; we are working on producing fair and repeatable benchmark results to share with the community in the coming months.

  • CitusDB has notable latency advantages when executing short queries. For example, it can execute a simple query on Hadoop in as little as 100ms, but we don't think such queries make fair performance comparisons.
  • When we compare CitusDB against Apache Hive using the industry standard TPC-H benchmark, we see performance gains of 3-5x for data in text format. When the data is in binary format, these gains increase up to 4-20x.
  • Hive's query performance depends on the join order you specify. CitusDB doesn't have that restriction; for fair comparisons, we are experimenting with different join orders for HiveQL queries.

Does CitusDB recover from failures?

Yes. The CitusDB master node intelligently re-routes work from any failed nodes to the remaining nodes in real time. Since the underlying data are kept in fixed-size blocks in HDFS, a failed node's work can be evenly distributed among the remaining nodes in the cluster.

What about CitusDB master node failures?

CitusDB handles master node failures through PostgreSQL's streaming replication feature. Users set up streaming replicas of the master node, and edit configuration to fail over to one of these replicas in case the master node fails. Further, users can issue read-only SELECT queries against these streaming replicas, but write queries that edit metadata can only go through the active master.
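
A quick way to tell the two roles apart before routing a query (a minimal sketch, assuming a streaming replica has already been set up with PostgreSQL's standard replication configuration):

    -- Returns true on a read-only streaming replica, false on the active master.
    SELECT pg_is_in_recovery();
    
    -- Read-only analytics queries can be served from the replica.
    SELECT count(*) FROM customer_reviews;
    
    -- Statements that change metadata, such as creating a new distributed
    -- foreign table, must be issued on the active master instead.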

As a side note, any CitusDB node can "act" as a master as long as it has access to table and block metadata. We currently don't cache this metadata on the worker nodes though, in order to avoid potential data consistency issues.

How different is CitusDB from stock PostgreSQL?

CitusDB isn’t a fork of PostgreSQL; it simply extends Postgres to support distributed SQL queries. Also, CitusDB version numbers are aligned with major version upgrades in Postgres; CitusDB v2.0 is based on PostgreSQL 9.2, and v3.0 is based on PostgreSQL 9.3.

In terms of distributed functionality, CitusDB in fact doesn't make a distinction between master and worker nodes. A database becomes the master node simply when the user creates a distributed table on it and uploads block metadata. This master node then executes built-in logic to plan and execute distributed SQL queries. On the worker nodes, such logic doesn't need to exist, and one can use stock PostgreSQL there if:

  • Queries don't involve joins. If they do, the worker nodes need to access user-defined functions that fetch block data from other nodes or that repartition table data across a distributed cluster. (CitusDB comes pre-bundled with these functions, but users can also dynamically load them into stock PostgreSQL.)
  • Foreign table create statements for HDFS blocks are issued manually. If foreign table names need to be automatically generated and associated with HDFS blocks, the worker nodes need access to CitusDB's user-defined functions.

What are CitusDB's technical limitations?

As previously mentioned, CitusDB doesn't yet support the entire spectrum of SQL queries. Features that are missing from the current release are outlined here; we find that many of these limitations can be overcome by modeling the data differently.

Further, CitusDB only supports file formats that have foreign data wrappers implemented for them. At the moment, this only includes text data in tabular format, but we are working on implementing foreign wrappers for many other data formats. Finally, we associate one HDFS block with one foreign table, and execute the entire SQL query locally on that block. If bytes for the last record(s) in one HDFS block spill over to the next block, we currently don't fetch those bytes and instead skip the last record. This again is a limitation we intend to remove in v3.1.

What other data formats are in the works?

We are currently implementing foreign data wrappers for JSON, Avro, and Sequence files. We also intend to support common compression formats such as Snappy and Gzip with these foreign wrappers. The nice thing about PostgreSQL's wrappers is that their APIs are publicly available, and one can start benchmarking SQL queries against an HDFS block (a local file) as soon as the wrapper is written.

What about running SQL on other distributed databases such as MongoDB and HBase?

CitusDB already provides SQL support for MongoDB through distributed foreign tables, and we are considering adding support for HBase soon. In practice, we find that supporting SQL on any distributed database requires the following:

  1. A mechanism to synchronize shard metadata from the distributed database into the CitusDB master node, and
  2. A foreign data wrapper that runs locally on each worker node and converts the external shard data into PostgreSQL's tuple format (a minimal sketch follows the list).
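
As a sketch of the wrapper side of requirement 2 for MongoDB, the statements below assume a wrapper named mongo_fdw with address/port and database/collection options; the extension name, option names, and table layout are assumptions, so adjust them to the wrapper you actually install.

    -- Assumed wrapper name and options; your MongoDB wrapper may differ.
    CREATE EXTENSION mongo_fdw;
    CREATE SERVER mongo_server FOREIGN DATA WRAPPER mongo_fdw
        OPTIONS (address '127.0.0.1', port '27017');
    
    -- Each worker-side foreign table converts documents in one shard into
    -- PostgreSQL's tuple format.
    CREATE FOREIGN TABLE customer_reviews_shard_0
    (
        customer_id TEXT,
        review_rating INTEGER
    )
    SERVER mongo_server
    OPTIONS (database 'reviews', collection 'customer_reviews');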


Pseudo-Distributed Cluster

Having covered the architecture's technical details, we now describe how to set up a single-node Hadoop cluster operating in pseudo-distributed mode. We then set up CitusDB databases on this node, and cover all the steps involved in querying the data in Hadoop. To simplify the installation instructions, we use an EC2 AMI that includes Hadoop 1.1, CitusDB 3.0, and our metadata syncing application. Alternatively, you can use the Hadoop and CitusDB setup guides to download these packages and install from there.

  1. Click on this link to launch an EC2 instance, and pick an instance type with reasonable I/O performance. Or, you can manually sign in to the AWS Management Console, and launch ami-975043fe yourself.
  2. Log in to the EC2 instance, and download some example data.
    localhost# ssh -i <private SSH key file> ec2-user@<ec2 nodename>
    
    ec2-node# wget http://examples.citusdata.com/customer_reviews_1998.csv.gz
    ec2-node# wget http://examples.citusdata.com/customer_reviews_1999.csv.gz
    
    ec2-node# gzip -d customer_reviews_1998.csv.gz
    ec2-node# gzip -d customer_reviews_1999.csv.gz
    
  3. Configure passwordless SSH using the following commands.
    ec2-node# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
    ec2-node# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
    
  4. Format a new distributed filesystem. Then, start up all Hadoop daemons, answering yes to add localhost to the list of known hosts. Next, create an HDFS directory, and stage data into this directory. After these steps, we have a Hadoop cluster with some example data loaded into it.
    ec2-node# cd hadoop-1.1.1
    ec2-node# bin/hadoop namenode -format
    ec2-node# bin/start-all.sh
    
    ec2-node# bin/hadoop fs -mkdir /user/data/reviews
    ec2-node# bin/hadoop fs -put ~/customer_reviews_1998.csv /user/data/reviews
    ec2-node# bin/hadoop fs -put ~/customer_reviews_1999.csv /user/data/reviews
    
  5. Initialize a CitusDB worker database (we assume that the already installed database acts as the master). Then, open the master database's membership file, and append the worker database's network location to it. Note that you need to specify localhost. with the trailing period here so that it matches the hostname used by Hadoop.
    ec2-node# /opt/citusdb/3.0/bin/initdb -D /opt/citusdb/3.0/data.9700
    ec2-node# emacs -nw /opt/citusdb/3.0/data/pg_worker_list.conf
    
    # HOSTNAME     [PORT]     [RACK]
    localhost. 9700
    
  6. Start up the CitusDB master and worker databases.
    localhost# /opt/citusdb/3.0/bin/pg_ctl -D /opt/citusdb/3.0/data -l logfile start
    localhost# /opt/citusdb/3.0/bin/pg_ctl -D /opt/citusdb/3.0/data.9700 -o "-p 9700" \
                 -l logfile.9700 start
    
  7. Connect to the CitusDB master database, and load the file foreign data wrapper. Then, create a distributed foreign table that specifies the file format and the HDFS directory path.
    ec2-node# /opt/citusdb/3.0/bin/psql -h localhost -p 5432 -d postgres
    
    postgres# CREATE EXTENSION file_fdw;
    postgres# CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
    
    postgres# CREATE FOREIGN TABLE customer_reviews
    (
        customer_id TEXT not null,
        review_date DATE not null,
        review_rating INTEGER not null,
        review_votes INTEGER,
        review_helpful_votes INTEGER,
        product_id CHAR(10) not null,
        product_title TEXT not null,
        product_sales_rank BIGINT,
        product_group TEXT,
        product_category TEXT,
        product_subcategory TEXT,
        similar_product_ids CHAR(10)[]
    )
    DISTRIBUTE BY APPEND (review_date)
    SERVER file_server
    OPTIONS (filename '', hdfs_directory_path '/user/data/reviews', format 'csv');
    
  8. Run the hadoop-sync application to synchronize new HDFS block metadata. Note that hadoop-sync is idempotent: you can run it multiple times, and it will only propagate new block metadata to the CitusDB master node. A query to verify the synced metadata is sketched after this list.
    ec2-node# cd ~/hadoop-sync
    ec2-node# java -jar target/hadoop-sync-0.1.jar customer_reviews --fetch-min-max
    
  9. Run SQL queries on HDFS data. For this, you can use /opt/citusdb/3.0/bin/psql or a graphical user interface.
    -- Find all reviews a particular customer made on the Dune series in 1998.
    
    SELECT
        customer_id, review_date, review_rating, product_id, product_title
    FROM
        customer_reviews
    WHERE
        customer_id = 'A27T7HVDXA3K2A' AND
        product_title LIKE '%Dune%' AND
        review_date >= '1998-01-01' AND
        review_date <= '1998-12-31';
    
    -- Do we have a correlation between a book's title's length and its review ratings?
    
    SELECT
        width_bucket(length(product_title), 1, 50, 5) title_length_bucket,
        round(avg(review_rating), 2) AS review_average,
        count(*)
    FROM
       customer_reviews
    WHERE
        product_group = 'Book'
    GROUP BY
        title_length_bucket
    ORDER BY
        title_length_bucket;
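
To confirm that hadoop-sync propagated the block metadata (see step 8 above), you can inspect the master node's metadata tables. The query below is a hypothetical check; it assumes this CitusDB version names its catalog tables pg_dist_shard and pg_dist_shard_placement, so verify the names against your installation.

    -- Hypothetical metadata check; the catalog table names are assumptions.
    SELECT count(*) FROM pg_dist_shard
    WHERE logicalrelid = 'customer_reviews'::regclass;
    
    SELECT * FROM pg_dist_shard_placement LIMIT 5;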
    

Distributed Cluster

We now describe the steps involved in setting up CitusDB on a multi-node Hadoop cluster. To reduce the number of steps involved, we use an EC2 machine image that includes Hadoop 1.1, CitusDB 3.0, and our metadata syncing application. Alternatively, you can use the Hadoop and CitusDB setup guides to download these packages and install from there.

For our example cluster, we assume a four-node setup. One of these nodes acts as the HDFS NameNode, and the others act as DataNodes. We use the hostname name-node-100 to refer to the NameNode; and for simplicity, we assume that the CitusDB master node also lives on name-node-100 in our setting. In production deployments, however, we advise against running the NameNode and the CitusDB master on the same machine.

  1. Click on this link to launch an EC2 instance, and pick an instance type with reasonable I/O performance. Or, you can manually sign in to the AWS Management Console, and launch ami-975043fe yourself.
  2. Choose one of the instances as the HDFS NameNode, and then log in to this instance to set up passwordless SSH.
    localhost# ssh -i <private SSH key file> ec2-user@<name-node-100>
    
    name-node-100# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
    name-node-100# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
    
  3. Copy this public key to all DataNodes in the cluster. To do this, you can scp the generated id_dsa.pub to each DataNode, and then append this key to each node's ~/.ssh/authorized_keys. Alternatively, you can manually log in to each DataNode, and use an editor such as vi or emacs to copy and paste the public key into the authorized_keys file. Once you make these changes, you can confirm they took effect by SSHing to each DataNode.
    name-node-100# ssh ec2-user@ip-data-node-100-goes-here.ec2.internal
    name-node-100# ssh ec2-user@ip-data-node-101-goes-here.ec2.internal
    name-node-100# ssh ec2-user@ip-data-node-102-goes-here.ec2.internal
    
  4. On all nodes, change the Hadoop configuration to enable distributed execution. First, edit conf/core-site.xml to configure the NameNode's EC2 hostname. Then, remove the first two properties from conf/hdfs-site.xml.
    all-nodes# emacs -nw ~/hadoop-1.1.1/conf/core-site.xml
    
    <configuration>
      <property>
        <name>fs.default.name</name>
        <value>hdfs://ip-namenode-name-goes-here.ec2.internal:9000</value>
      </property>
    </configuration>
    
    all-nodes# emacs -nw ~/hadoop-1.1.1/conf/hdfs-site.xml
    
    <configuration>
      <property>
        <name>dfs.name.dir</name>
        <value>/opt/citusdb/hdfs/name</value>
      </property>
      <property>
        <name>dfs.data.dir</name>
        <value>/opt/citusdb/hdfs/data</value>
      </property>
      <property>
        <name>dfs.block.local-path-access.user</name>
        <value>ec2-user</value>
      </property>
    </configuration>
    
  5. On the HDFS NameNode, change conf/slaves to include EC2 hostnames for all DataNodes.
    name-node-100# emacs -nw ~/hadoop-1.1.1/conf/slaves
    
    ip-data-node-100-goes-here.ec2.internal
    ip-data-node-101-goes-here.ec2.internal
    ip-data-node-102-goes-here.ec2.internal
    
  6. Download some example data to the NameNode. Alternatively, you can download this data to any one of the DataNodes. You can also download more example data for the years 2000 to 2004.
    name-node-100# wget http://examples.citusdata.com/customer_reviews_1998.csv.gz
    name-node-100# wget http://examples.citusdata.com/customer_reviews_1999.csv.gz
    
    name-node-100# gzip -d customer_reviews_1998.csv.gz
    name-node-100# gzip -d customer_reviews_1999.csv.gz
    
  7. Format a new distributed filesystem. Then start up the HDFS daemons from the NameNode, answering yes to all SSH-related questions. Next, create an HDFS directory, and stage data into this directory. After these steps, we have a running Hadoop cluster with some example data loaded into it.
    name-node-100# cd hadoop-1.1.1
    name-node-100# bin/hadoop namenode -format
    name-node-100# bin/start-dfs.sh
    
    name-node-100# bin/hadoop fs -mkdir /user/data/reviews
    name-node-100# bin/hadoop fs -put ~/customer_reviews_1998.csv /user/data/reviews
    name-node-100# bin/hadoop fs -put ~/customer_reviews_1999.csv /user/data/reviews
    
  8. Edit the CitusDB master node's membership file to include EC2 hostnames for all worker nodes in the cluster.
    name-node-100# emacs -nw /opt/citusdb/3.0/data/pg_worker_list.conf
    
    # HOSTNAME     [PORT]     [RACK]
    ip-data-node-100-goes-here.ec2.internal
    ip-data-node-101-goes-here.ec2.internal
    ip-data-node-102-goes-here.ec2.internal
    
  9. Start up CitusDB on all nodes in the cluster.
    all-nodes# /opt/citusdb/3.0/bin/pg_ctl -D /opt/citusdb/3.0/data -l logfile start
    
  10. Connect to the CitusDB master on the NameNode. Then, load the file foreign data wrapper, and create a distributed foreign table that specifies the HDFS directory path.
    name-node-100# /opt/citusdb/3.0/bin/psql -h localhost -d postgres
    
    postgres# CREATE EXTENSION file_fdw;
    postgres# CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
    
    postgres# CREATE FOREIGN TABLE customer_reviews
    (
        customer_id TEXT not null,
        review_date DATE not null,
        review_rating INTEGER not null,
        review_votes INTEGER,
        review_helpful_votes INTEGER,
        product_id CHAR(10) not null,
        product_title TEXT not null,
        product_sales_rank BIGINT,
        product_group TEXT,
        product_category TEXT,
        product_subcategory TEXT,
        similar_product_ids CHAR(10)[]
    )
    DISTRIBUTE BY APPEND (review_date)
    SERVER file_server
    OPTIONS (filename '', hdfs_directory_path '/user/data/reviews', format 'csv');
    
  11. Before running the hadoop-sync application, edit its properties file to configure the NameNode's hostname and the CitusDB worker nodes' port number.
    name-node-100# cd ~/hadoop-sync
    name-node-100# cp target/classes/sync.properties .
    name-node-100# emacs -nw sync.properties
    
    # HDFS related cluster configuration settings
    HdfsMasterNodeName = ip-namenode-name-goes-here.ec2.internal
    HdfsMasterNodePort = 9000
    HdfsWorkerNodePort = 50020
    
    # CitusDB related cluster configuration settings
    CitusMasterNodeName = localhost
    CitusMasterNodePort = 5432
    CitusWorkerNodePort = 5432
    
  12. Run the hadoop-sync application to synchronize new HDFS block metadata. Note that hadoop-sync is idempotent, so you can run it multiple times and it will only propagate new block metadata to the CitusDB master node.
    name-node-100# java -jar target/hadoop-sync-0.1.jar customer_reviews --fetch-min-max
    
  13. Run SQL queries on HDFS data. For this, you can use /opt/citusdb/3.0/bin/psql or a graphical user interface.
    -- Find all reviews a particular customer made on the Dune series in 1998.
    
    SELECT
        customer_id, review_date, review_rating, product_id, product_title
    FROM
        customer_reviews
    WHERE
        customer_id = 'A27T7HVDXA3K2A' AND
        product_title LIKE '%Dune%' AND
        review_date >= '1998-01-01' AND
        review_date <= '1998-12-31';
    
    -- Do we have a correlation between a book's title's length and its review ratings?
    
    SELECT
        width_bucket(length(product_title), 1, 50, 5) title_length_bucket,
        round(avg(review_rating), 2) AS review_average,
        count(*)
    FROM
       customer_reviews
    WHERE
        product_group = 'Book'
    GROUP BY
        title_length_bucket
    ORDER BY
        title_length_bucket;