Examples with Sample Data

In the following, we describe two examples that relate to big data analytics. The first example contains customer reviews for different products listed at Amazon.com. This public dataset only includes 7.5M reviews, but it provides a nice example of modeling and analyzing events data. This event-based data model can also be generalized to other use cases; for example, each line in a website's log file or each user action in a mobile application can also be modeled as an event.
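To make this concrete, a table for website page view events could be declared in much the same way as the reviews table we create below; the table and column names here are purely illustrative, and the DISTRIBUTE BY APPEND clause on the timestamp column follows the same pattern we use for the reviews table later in this section.

postgres=# CREATE TABLE page_view_events
(
    event_time TIMESTAMP not null,    -- time dimension used for shard pruning
    session_id TEXT not null,
    page_url TEXT not null,
    referrer_url TEXT,
    user_agent TEXT,
    response_status INTEGER
)
DISTRIBUTE BY APPEND (event_time);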

The second example covers the TPC-H data warehouse benchmark; this benchmark has several tables of varying sizes and includes a group of ad-hoc queries that are more complex than those in the first example. The benchmark also provides a tool to generate data for different scale factors (1 GB to 100 TB), and as a result offers a nice way to evaluate how CitusDB scales as the size of the dataset increases.

In both of these examples, we describe a setup with one master and two worker databases. We also note that PostgreSQL defaults to conservative resource settings at setup time, so its performance in these examples will be lower than what a tuned installation would achieve. We describe these performance settings in more detail later on.
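As a rough illustration, these are the kinds of postgresql.conf settings you would typically raise on a dedicated analytics node; the values below are placeholders for a machine with 16 GB of memory and should be adjusted to your hardware.

shared_buffers = 4GB             # default is only tens of megabytes
work_mem = 64MB                  # per-operation memory for sorts and hashes
maintenance_work_mem = 1GB       # speeds up index builds during data loads
effective_cache_size = 12GB      # hint about available OS page cache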


Amazon Customer Reviews

Amazon customer reviews data was originally collected by crawling 500K product pages on the Amazon website and fetching the 7.5M customer reviews associated with these products. This dataset and the original data's format are described in more detail here. For our example, we converted these customer reviews from their original form and denormalized them to also include details about the reviewed product itself.

With these denormalized reviews, we are ready to start working on our example. We first download customer reviews data for years 1998 and 1999. These data represent about one fourth of all reviews, and more data from subsequent years (2000 to 2004) are also available.

localhost# wget http://examples.citusdata.com/customer_reviews_1998.csv.gz
localhost# wget http://examples.citusdata.com/customer_reviews_1999.csv.gz

localhost# gzip -d customer_reviews_1998.csv.gz
localhost# gzip -d customer_reviews_1999.csv.gz

Then, let's connect to the master node running at the default PostgreSQL port, and issue a DDL command to create a distributed table.

localhost# /opt/citusdb/3.0/bin/psql -h localhost -p 5432 -d postgres

postgres=# CREATE TABLE customer_reviews
(
    customer_id TEXT not null,
    review_date DATE not null,
    review_rating INTEGER not null,
    review_votes INTEGER,
    review_helpful_votes INTEGER,
    product_id CHAR(10) not null,
    product_title TEXT not null,
    product_sales_rank BIGINT,
    product_group TEXT,
    product_category TEXT,
    product_subcategory TEXT,
    similar_product_ids CHAR(10)[]
)
DISTRIBUTE BY APPEND (review_date);

This command creates a new distributed table, and its syntax is almost identical to that of PostgreSQL's CREATE TABLE command. The only difference is the DISTRIBUTE BY APPEND clause at the end, which tells the database which column to use for distribution.

Note that we specified append in the distribution clause; this is currently the only distribution method available. Further, this method doesn't in fact enforce a particular distribution; it merely tells the database to keep minimum and maximum values for the review_date column in each shard. The database then uses these values to prune away unrelated shards and optimize queries.
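Once data has been staged into the table (as we do below), you can inspect these per-shard ranges on the master node. The query below is only a sketch: it assumes the shard metadata lives in a pg_dist_shard catalog with shardminvalue and shardmaxvalue columns, which may differ across versions.

postgres=# SELECT shardid, shardminvalue, shardmaxvalue
           FROM pg_dist_shard
           WHERE logicalrelid = 'customer_reviews'::regclass;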

This approach works nicely for event or fact data, as these data already have an inherent time dimension. Still, you may at times want to query data on secondary dimensions, and that's when creating an index comes in handy. Note that indexes noticeably increase data loading times in PostgreSQL, so you may want to skip this step if you don't need them:

postgres=# CREATE INDEX customer_id_index ON customer_reviews (customer_id);

Now, let's load these customer reviews into the database by specifying the correct file paths. Note that you can also load these files in parallel through separate database connections or from different worker nodes, as sketched after the commands below.

postgres=# \STAGE customer_reviews FROM '/home/user/customer_reviews_1998.csv' (FORMAT CSV)
postgres=# \STAGE customer_reviews FROM '/home/user/customer_reviews_1999.csv' (FORMAT CSV)
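Alternatively, for larger datasets you could run these two commands over separate connections so that the files load in parallel. The shell sketch below assumes that the \stage meta-command can be passed to psql through the -c flag:

localhost# /opt/citusdb/3.0/bin/psql -h localhost -p 5432 -d postgres -c "\STAGE customer_reviews FROM '/home/user/customer_reviews_1998.csv' (FORMAT CSV)" &
localhost# /opt/citusdb/3.0/bin/psql -h localhost -p 5432 -d postgres -c "\STAGE customer_reviews FROM '/home/user/customer_reviews_1999.csv' (FORMAT CSV)" &
localhost# wait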

This \stage command borrows its syntax from the client-side \copy command in PostgreSQL. Under the covers, the staging command opens a connection to the local worker node, creates at least one shard there, and uploads the customer reviews to this shard. The command then replicates this shard on another worker node, and finalizes the shard's metadata with the master node.

By default, the \stage command depends on two configuration entries for its behavior. These two entries are called shard_max_size and shard_replication_factor, and they live in postgresql.conf. The first entry controls when an uploaded file gets split into a new shard, and defaults to 1 GB. The second entry determines the number of nodes each shard gets replicated to, and defaults to two nodes. You may want to increase this replication factor if you run large clusters and observe node failures more frequently.
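You can change these entries in postgresql.conf on the master node, or override them per session with SET as we do in the TPC-H example below. The snippet here simply restates the defaults described above:

# postgresql.conf on the master node
shard_max_size = '1GB'           # split staged data into a new shard past this size
shard_replication_factor = 2     # number of worker nodes that hold each shard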

After creating a reviews table and staging data into it, you are now ready to run analytics queries. We use the psql command prompt to issue queries, but you could also use a graphical user interface. Note that you can also measure query response times in psql by typing \timing at the command prompt.

-- Find all reviews a particular customer made on the Dune series in 1998.

SELECT
    customer_id, review_date, review_rating, product_id, product_title
FROM
    customer_reviews
WHERE
    customer_id = 'A27T7HVDXA3K2A' AND
    product_title LIKE '%Dune%' AND
    review_date >= '1998-01-01' AND
    review_date <= '1998-12-31';

-- Do we have a correlation between a book's title's length and its review ratings?

SELECT
    width_bucket(length(product_title), 1, 50, 5) title_length_bucket,
    round(avg(review_rating), 2) AS review_average,
    count(*)
FROM
    customer_reviews
WHERE
    product_group = 'Book'
GROUP BY
    title_length_bucket
ORDER BY
    title_length_bucket;

TPC-H Data Warehouse Benchmark

The TPC-H benchmark measures query run time performance for data warehousing workloads. The benchmark simulates activities of an industry that sells or distributes products worldwide (car rental, online retail, etc.). For these simulations, the benchmark defines eight database tables and 22 queries, and includes a tool to generate data at different scale factors.

We configured this tool to generate data for PostgreSQL installations, and altered the table DDL commands to create distributed tables. To download and build the modified benchmark tool, run the following commands.

localhost# wget http://examples.citusdata.com/tpch_2_13_0.tar.gz
localhost# tar xvfz tpch_2_13_0.tar.gz
localhost# cd tpch_2_13_0
localhost# gmake

The compiled benchmark tool provides several options for data generation. For example, the tool can generate a defined subset of the data, allowing large datasets to be generated in parallel. In the following, you generate and load the entire dataset serially, as our example only uses a scale factor of 1 (1 GB). Larger datasets benefit greatly from parallel data generation and loads; a sketch of the parallel approach follows the command below.

localhost# ./dbgen -f -s 1
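For larger scale factors, you could instead generate the data in chunks and run those chunks concurrently. The sketch below assumes the standard dbgen chunking flags (-C for the total number of chunks, -S for the chunk to produce); check your dbgen build's help output before relying on them.

localhost# ./dbgen -f -s 100 -C 4 -S 1 &
localhost# ./dbgen -f -s 100 -C 4 -S 2 &
localhost# ./dbgen -f -s 100 -C 4 -S 3 &
localhost# ./dbgen -f -s 100 -C 4 -S 4 &
localhost# wait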

Next, you need to create distributed tables and load data into them. Below, you create the tables using the DDL file that comes with the benchmark tool. You then load data into the three largest tables in the benchmark; the commands for the other tables are similar.

localhost# /opt/citusdb/3.0/bin/psql -h localhost -p 5432 -d postgres -f dss_distributed.ddl
localhost# /opt/citusdb/3.0/bin/psql -h localhost -p 5432 -d postgres

postgres=# SET shard_max_size TO '128MB';
postgres=# \STAGE lineitem FROM '/home/user/tpch_2_13_0/lineitem.tbl' WITH DELIMITER '|'
postgres=# \STAGE orders FROM '/home/user/tpch_2_13_0/orders.tbl' WITH DELIMITER '|'

postgres=# SET shard_replication_factor TO '1';
postgres=# \STAGE part FROM '/home/user/tpch_2_13_0/part.tbl' WITH DELIMITER '|'

Note that you changed two run-time configuration parameters in this example. First, you reduced the shard maximum size; this change breaks up staged data into multiple shards and enables parallel execution of analytics queries across these shards. Second, you set the replication factor to 1 for the part table; this ensures that only one worker node has this table's shard(s) and helps us demonstrate how shards are moved around during joins.
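If you would like to verify the effect of these settings, the master node's metadata shows how many shards each distributed table ended up with. As before, this is only a sketch that assumes a pg_dist_shard catalog; the exact layout may differ in your version.

postgres=# SELECT logicalrelid::regclass AS table_name, count(*) AS shard_count
           FROM pg_dist_shard
           GROUP BY logicalrelid;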

Now, let's start with a simple query. As in the previous example, you can type \timing first to measure query response times. You can also notably increase query performance with additional PostgreSQL tuning; we talk about these optimizations later on.

SELECT
    sum(l_extendedprice * l_discount) as revenue
FROM
    lineitem
WHERE
    l_shipdate >= date '1994-01-01' AND
    l_shipdate < date '1994-01-01' + interval '1' year AND
    l_discount between 0.06 - 0.01 AND 0.06 + 0.01 AND
    l_quantity < 24;

This simple query computes an aggregate across one year's worth of data. The query still needs to scan through the entire table though, as the table is distributed on the order key and doesn't yet have any indexes defined on the shipment date.
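If you plan to filter on the shipment date repeatedly, you could define an index on it before staging the data, just as we did for customer_id in the previous example (for instance by editing the DDL file); the index name below is illustrative, and, as noted earlier, an index slows down data loading.

postgres=# CREATE INDEX l_shipdate_index ON lineitem (l_shipdate);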

Our next query is slightly more complicated and involves a table join. Also note that these two TPC-H queries, along with the six others that we currently support, are included in the queries_distributed file that comes with the benchmark tool.

SELECT
    100.00 *
    sum(case when p_type like 'PROMO%'
             then l_extendedprice * (1 - l_discount)
             else 0 end) /
    sum(l_extendedprice * (1 - l_discount)) AS promo_revenue
FROM
    lineitem,
    part
WHERE
    l_partkey = p_partkey AND
    l_shipdate >= date '1995-09-01' AND
    l_shipdate < date '1995-09-01' + interval '1' month;

This query includes a join between the large lineitem table and the small part table. The distinction between large and small tables is determined by the configuration entry large_table_shard_count; tables whose shard count exceeds this value are considered large.
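Like the other settings, this threshold can be overridden per session; the value below is only illustrative.

postgres=# SET large_table_shard_count TO 4;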

CitusDB treats large and small tables differently, and replicates a small table's shards to where the large table's shards are in order to perform the query. In this example, the part table's shard(s) are cached on all worker nodes the first time the user issues a query that involves the table. Subsequent queries that involve the part table then use these cached shards.

This model works nicely for queries that join one large table with multiple small tables. The model also supports joins between large tables, provided that the tables are already partitioned on the join key. CitusDB also supports dynamic repartitioning of tables to handle joins between any number of tables, independent of their size and partitioning method. The query planner determines the ideal join strategy using statistics gathered about the partitioned tables.