Foreign Data

Foreign data wrappers were introduced in PostgreSQL 9.1. They allow executing SQL queries on data that's external to the database. To use them, you need a foreign data wrapper that can convert the foreign data from its original form into PostgreSQL's internal tuple format.

For us, the really exciting part is enabling this functionality in CitusDB. We think having the ability to run SQL on distributed data, without having to do any data loads, provides substantial value. In the following, we describe examples where we run queries on distributed flat files and MongoDB. By writing additional foreign data wrappers, other data formats can be processed as well.

File Foreign Data Wrapper

One example foreign data wrapper that comes with PostgreSQL's contrib package is file_fdw. This wrapper handles flat files with tabular data, and several good tutorials for using it with PostgreSQL already exist.

CitusDB extends this functionality by creating a foreign table for each flat file, and distributing SQL queries across these foreign tables. The CitusDB master node can efficiently parallelize filters, groupings, sorts, limits, aggregations, most table joins, and certain subselects.

To demonstrate this with an example, we describe a setup that uses customer reviews data files from Amazon. We start by showing commands to install the foreign data wrapper package on the local node for 64-bit Fedora or Ubuntu systems. If you'd like to run foreign data wrappers on a multi-node cluster, you'll need to repeat these installation steps on all the nodes.

localhost# wget http://packages.citusdata.com/contrib/citusdb-contrib-2.0.0-1.x86_64.rpm
localhost# sudo rpm --install citusdb-contrib-2.0.0-1.x86_64.rpm

OR

localhost# wget http://packages.citusdata.com/contrib/citusdb-contrib-2.0.0-1.amd64.deb
localhost# sudo dpkg --install citusdb-contrib-2.0.0-1.amd64.deb

Then, you need to download customer reviews data for years 1998 and 1999. These data include 1.75M customer reviews, and more data from subsequent years (2000 to 2004) are also available.

localhost# wget http://examples.citusdata.com/customer_reviews_1998.csv.gz
localhost# wget http://examples.citusdata.com/customer_reviews_1999.csv.gz

localhost# gzip -d customer_reviews_1998.csv.gz
localhost# gzip -d customer_reviews_1999.csv.gz

Now, we need to create an extension and server for foreign files, and then create a distributed foreign table associated with the file server. Note that you need to connect to the master database to issue these commands.

localhost# /opt/citusdb/2.0/bin/psql -h localhost -p 5432 -d postgres

postgres=# CREATE EXTENSION file_fdw;
postgres=# CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;

postgres=# CREATE FOREIGN TABLE customer_reviews
(
    customer_id TEXT not null,
    review_date DATE not null,
    review_rating INTEGER not null,
    review_votes INTEGER,
    review_helpful_votes INTEGER,
    product_id CHAR(10) not null,
    product_title TEXT not null,
    product_sales_rank BIGINT,
    product_group TEXT,
    product_category TEXT,
    product_subcategory TEXT,
    similar_product_ids CHAR(10)[]
)
DISTRIBUTE BY APPEND (review_date)
SERVER file_server
OPTIONS (filename '', format 'csv');

The first two commands are standard in PostgreSQL. The last command creates a new distributed foreign table, and has almost the same syntax as PostgreSQL's CREATE FOREIGN TABLE command. The only difference is the additional DISTRIBUTE BY APPEND clause, which follows the column definitions. In this context, the clause tells the database to extract and keep the minimum and maximum review_date values for each foreign file.
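To make the per-file minimum and maximum values more concrete, here is a minimal Python sketch. It is not CitusDB code: the file names, the sample rows, and the helper functions date_range and may_contain are hypothetical. The overlap check illustrates one plausible use of such metadata, which is to skip files whose date range cannot match a filter; that use is an assumption of this sketch.

```python
import csv
import io
from datetime import date

def date_range(csv_text, date_column=1):
    """Return (min, max) of the ISO dates found in one column of a CSV file."""
    dates = [date.fromisoformat(row[date_column])
             for row in csv.reader(io.StringIO(csv_text))]
    return min(dates), max(dates)

def may_contain(file_range, lower, upper):
    """True if a file's [min, max] range overlaps the half-open filter [lower, upper)."""
    file_min, file_max = file_range
    return file_max >= lower and file_min < upper

# Two tiny made-up files standing in for the 1998 and 1999 review files.
files = {
    "reviews_1998.csv": "c1,1998-02-11,5\nc2,1998-11-30,3\n",
    "reviews_1999.csv": "c3,1999-01-05,4\nc4,1999-08-19,2\n",
}
ranges = {name: date_range(text) for name, text in files.items()}

# A filter on spring 1998 only needs to look at files whose range overlaps it.
candidates = [name for name, r in ranges.items()
              if may_contain(r, date(1998, 3, 1), date(1998, 6, 1))]
print(candidates)
```

In this toy data set, only the 1998 file overlaps the spring 1998 filter, so the 1999 file would not need to be read at all.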

Now, let's "upload" our customer reviews files to the database by specifying the correct file paths. In practice, the following commands simply change table-related metadata on the master and the worker node. They initiate file copy operations only if the replication factor is set to more than 1.

postgres=# \STAGE customer_reviews FROM '/home/user/customer_reviews_1998.csv'
postgres=# \STAGE customer_reviews FROM '/home/user/customer_reviews_1999.csv'

Once the \STAGE commands finish, you are ready to run analytic queries on these flat files. CitusDB will partition an incoming analytic query into one query per foreign file, run these queries in parallel, and merge their results.
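The partition, run, and merge steps can be pictured with a small Python sketch. This is only an illustration of the idea, not how CitusDB is implemented: the rows are made up, and partial_average and merge are hypothetical helpers. The point it shows is that an average cannot be merged from per-file averages, so each per-file query returns a sum and a count instead.

```python
from concurrent.futures import ThreadPoolExecutor

def partial_average(rows):
    """Per-file step: return (sum, count) of ratings instead of a finished average."""
    ratings = [rating for _, rating in rows]
    return sum(ratings), len(ratings)

def merge(partials):
    """Merge step: combine the per-file (sum, count) pairs into one average."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return round(total / count, 2), count

# Made-up (product_group, review_rating) rows, one list per foreign file.
file_1998 = [("Book", 5), ("Book", 4), ("Book", 3)]
file_1999 = [("Book", 2), ("Book", 5)]

# Run one partial query per file in parallel, then merge the results.
with ThreadPoolExecutor() as pool:
    partials = list(pool.map(partial_average, [file_1998, file_1999]))

print(merge(partials))
```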

-- Find all popular reviews made on the Dune series in the spring of 1998.

SELECT
    customer_id, review_date, review_rating, product_id, product_title
FROM
    customer_reviews
WHERE
    product_title LIKE '%Dune%' AND
    review_votes >= 10 AND
    review_date >= '1998-03-01' AND
    review_date < date '1998-03-01' + interval '3' month;

-- Do we have a correlation between a book's title's length and its review ratings?

SELECT
    width_bucket(length(product_title), 1, 50, 5) AS title_length_bucket,
    round(avg(review_rating), 2) AS review_average,
    count(*)
FROM
    customer_reviews
WHERE
    product_group = 'Book'
GROUP BY
    title_length_bucket
ORDER BY
    title_length_bucket;
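
The query above relies on PostgreSQL's width_bucket function to group titles by length. As a rough guide to what the bucket numbers mean, the following Python sketch mimics the function for an ascending range (low < high); it is an approximation written for this example, not PostgreSQL's implementation.

```python
import math

def width_bucket(operand, low, high, count):
    """Mimic PostgreSQL's width_bucket for the case low < high."""
    if operand < low:
        return 0              # below the range
    if operand >= high:
        return count + 1      # at or above the upper bound
    return math.floor((operand - low) * count / (high - low)) + 1

# Bucket numbers for a few title lengths, using the same bounds as the query.
for length in (0, 1, 10, 11, 49, 50, 120):
    print(length, width_bucket(length, 1, 50, 5))
```

With the bounds used in the query, titles of 1 to 49 characters fall into buckets 1 through 5, and titles of 50 or more characters all land in the overflow bucket 6.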

MongoDB Foreign Data Wrapper

Foreign data wrappers can also enable SQL queries on semi-structured data. For this, the wrapper needs to establish a convention that maps semi-structured data fields into table columns in PostgreSQL. To demonstrate such an example mapping, we implemented an open source wrapper named mongo_fdw.

    Note that we are in the process of upgrading mongo_fdw to use PostgreSQL 9.2's
    foreign data wrapper APIs. In the meantime, you will either need to install
    using the instructions on the GitHub page, or keep using CitusDB 1.0 packages.

In the following, we assume that you already have a MongoDB installation, and describe doing a fresh installation from CitusDB packages. You can also use regular PostgreSQL 9.1 packages; we chose our packages to ensure consistent use of directory paths.

Now, let's first download some example JSON files and import them into MongoDB. We use customer reviews data from Amazon for this example, and download 1.75M reviews for years 1998 and 1999. More data from subsequent years are also available.

localhost# wget http://examples.citusdata.com/customer_reviews_1998.json.gz
localhost# wget http://examples.citusdata.com/customer_reviews_1999.json.gz

localhost# gzip -d customer_reviews_1998.json.gz
localhost# gzip -d customer_reviews_1999.json.gz

localhost# mongoimport --port 27017 --db test --collection customer_reviews \
           --type json --file customer_reviews_1998.json
localhost# mongoimport --port 27017 --db test --collection customer_reviews \
           --type json --file customer_reviews_1999.json

Then, we need to install packages for CitusDB and foreign data wrappers. These packages are available for different platforms and platform versions. If you are using Fedora 12+ or Ubuntu 10.04+ on a 64-bit machine:

localhost# wget http://packages.citusdata.com/readline-6.0/citusdb-1.0.1-1.x86_64.rpm
localhost# wget http://packages.citusdata.com/contrib/citusdb-contrib-1.0.0-1.x86_64.rpm

localhost# sudo rpm --install citusdb-1.0.1-1.x86_64.rpm
localhost# sudo rpm --install citusdb-contrib-1.0.0-1.x86_64.rpm

OR

localhost# wget http://packages.citusdata.com/readline-6.0/citusdb-1.0.1-1.amd64.deb
localhost# wget http://packages.citusdata.com/contrib/citusdb-contrib-1.0.0-1.amd64.deb

localhost# sudo dpkg --install citusdb-1.0.1-1.amd64.deb
localhost# sudo dpkg --install citusdb-contrib-1.0.0-1.amd64.deb

After the installation, you are ready to start the database.

localhost# /opt/citusdb/1.0/bin/pg_ctl -D /opt/citusdb/1.0/data -l logfile start

Next, you can connect to the PostgreSQL database, and create a foreign table for the customer_reviews collection.

localhost# /opt/citusdb/1.0/bin/psql -h localhost -d postgres

postgres=# CREATE EXTENSION mongo_fdw;
postgres=# CREATE SERVER mongo_server FOREIGN DATA WRAPPER mongo_fdw
           OPTIONS (address '127.0.0.1', port '27017');

postgres=# CREATE FOREIGN TABLE customer_reviews
(
    customer_id TEXT,
    review_date TIMESTAMP,
    review_rating INTEGER,
    product_id CHAR(10),
    product_title TEXT,
    product_group TEXT,
    product_category TEXT,
    similar_product_ids CHAR(10)[]
)
SERVER mongo_server
OPTIONS (database 'test', collection 'customer_reviews');

The first command here loads the foreign data wrapper extension into the database; this needs to happen only once after a database install. The following commands create a new server and foreign table, and associate the foreign table with the customer_reviews collection. Note that the foreign table schema includes only some of the fields defined in the BSON documents. This is fine, as we don't impose any restrictions on the number or order of column definitions as long as the column and field names match.

Here, we also specify the default option values for the server and foreign table commands. In practice, if your option values are the same as the defaults, you don't need to specify them.

Another point worth mentioning is that mongo_fdw requires certain column names to be declared in double quotes. In particular, BSON document keys that contain upper-case letters or that occur within a nested document need to be quoted when declared as column names. For example, a nested field such as "review": { "Votes": 19 } should be declared as "review.Votes" INTEGER in the foreign table schema.
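
The naming convention for nested fields can be illustrated with a short Python sketch. It is not taken from mongo_fdw: column_value is a hypothetical helper, the sample document is made up, and returning None (NULL) for a field that is missing from a document is an assumption of this sketch.

```python
def column_value(document, column_name):
    """Follow a dotted column name such as "review.Votes" into nested documents."""
    value = document
    for key in column_name.split("."):
        if not isinstance(value, dict) or key not in value:
            return None  # missing field: treated as NULL in this sketch
        value = value[key]
    return value

# A made-up document and the column names declared for the foreign table.
document = {"customer_id": "A1", "review": {"Votes": 19}}
columns = ["customer_id", "review.Votes", "product_title"]

row = tuple(column_value(document, name) for name in columns)
print(row)
```

Keys are matched case-sensitively here, which is why a column such as "review.Votes" has to be quoted in SQL: PostgreSQL folds unquoted identifiers to lower case.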

Once you create the foreign table, you are ready to run SQL queries on it. In the following, we show a rather simple query, but complex queries that involve sub-selects, SQL window functions, and collection joins are also possible. In fact, you can even join a MongoDB collection with a PostgreSQL table.

-- Do we have a correlation between a book's title's length and its review ratings?

SELECT
    round(avg(review_rating), 2),
    width_bucket(length(product_title), 1, 50, 5) as title_length,
    count(*)
FROM
    customer_reviews
WHERE
    product_group = 'Book' AND
    review_date >= '1998-01-01' AND
    review_date < date '1998-01-01' + interval '1 year'
GROUP BY
    title_length
ORDER BY
    title_length;

Under the covers, mongo_fdw first takes all filtering expressions in the WHERE clause, and converts these expressions into their equivalents in the Mongo query language. The wrapper then sends this query to MongoDB, fetches all documents that fit the query criteria, and converts these documents into PostgreSQL's internal tuple format.

[conn2] query test.customer_reviews query: { product_group: "Book", review_date: { $gte:
new Date(883612800000), $lt: new Date(915148800000) } }
[conn2] getmore test.customer_reviews query: { product_group: "Book", review_date: { $gte: 
new Date(883612800000), $lt: new Date(915148800000) } } cursorid:8599789499140355936
...
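
The translation from WHERE clause filters to the query document seen in the output above can be sketched in Python as follows. This is a simplified illustration, not mongo_fdw's code: build_query and to_epoch_millis are hypothetical helpers, only a handful of comparison operators are handled, and dates are shown as plain epoch milliseconds rather than BSON Date values.

```python
from datetime import datetime, timezone

# SQL comparison operators and their MongoDB query operator equivalents.
OPERATORS = {">": "$gt", ">=": "$gte", "<": "$lt", "<=": "$lte"}

def to_epoch_millis(value):
    """BSON dates are milliseconds since the Unix epoch (UTC)."""
    return int(value.replace(tzinfo=timezone.utc).timestamp() * 1000)

def build_query(filters):
    """Turn (column, operator, constant) filters into a MongoDB query document."""
    query = {}
    for column, operator, constant in filters:
        if isinstance(constant, datetime):
            constant = to_epoch_millis(constant)
        if operator == "=":
            query[column] = constant
        else:
            query.setdefault(column, {})[OPERATORS[operator]] = constant
    return query

# The three filters from the WHERE clause of the example query.
filters = [
    ("product_group", "=", "Book"),
    ("review_date", ">=", datetime(1998, 1, 1)),
    ("review_date", "<", datetime(1999, 1, 1)),
]
print(build_query(filters))
```

The two date bounds come out as 883612800000 and 915148800000, the same values that appear inside new Date(...) in the output above.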

As the wrapper converts fetched documents into tuples, the PostgreSQL executor iterates over tuples and executes the query plan. To see this plan and estimated query execution costs, you can run EXPLAIN along with your queries. Currently, mongo_fdw only incorporates the document count and average document size when estimating costs for different query plans. A better cost estimate would also include data distribution statistics, and the newer PostgreSQL 9.2 APIs allow for making use of such statistics.