Wednesday, August 12, 2015

Two Great Tastes That Taste Great Together: cstore + Pipeline

cstore_fdw, the column-store extension for PostgreSQL by CitusData, is a really good way to add compressed storage to your application, both for archival data and for analytic data intended to be aggregated.  Because it's a column store, though, cstore wants new data added in batches, the bigger the better.  This means that you need some way to batch up incoming data, preferably one able to accept a lot of new data rapidly.

This is where PipelineDB comes in.  Since PipelineDB is open source now, and based on PostgreSQL 9.4.4, I can add extensions to it, including cstore_fdw.  I've done so with the PipelineDB 0.7.7a Docker container, so if you use Docker it's readily available.

As a demonstration of this, I set up some fake clickstream data, archived to a cstore table hourly.  First, I wrote a quick Python script to generate it continuously and push it to a Pipeline stream.
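
The generator script itself isn't shown here, so what follows is only a minimal sketch of what such a script might look like, not the one actually used.  It assumes the stream is the seenstream defined in the next step, that the connection is a DB-API connection such as one from psycopg2, and that there are 100 users and 100 pages; the names `generate_events`, `push_events`, `NUM_USERS` and `NUM_PAGES` are made up for illustration.

```python
import random
from datetime import datetime, timezone

# Hypothetical sizes for the fake data set; the post does not state them.
NUM_USERS = 100
NUM_PAGES = 100

# PipelineDB stream inserts need an explicit column list.
INSERT_SQL = "INSERT INTO seenstream (user_id, page_id, ts) VALUES (%s, %s, %s)"


def generate_events(count, rng=random):
    """Return `count` fake (user_id, page_id, ts) events stamped with the current UTC time."""
    events = []
    for _ in range(count):
        events.append((
            rng.randint(1, NUM_USERS),
            rng.randint(1, NUM_PAGES),
            datetime.now(timezone.utc),
        ))
    return events


def push_events(conn, events):
    """Write a batch of events to the stream through a DB-API connection."""
    cur = conn.cursor()
    cur.executemany(INSERT_SQL, events)
    conn.commit()
    cur.close()
```

To feed the stream continuously, one could call `push_events(conn, generate_events(100))` in a loop with a short sleep between batches, where `conn` comes from `psycopg2.connect()` pointed at the PipelineDB instance.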

Then I created the stream and continuous view in PipelineDB:

    CREATE STREAM seenstream ( user_id int, page_id int, ts timestamptz );

    CREATE CONTINUOUS VIEW seenfeed AS SELECT user_id, page_id, ts
    FROM seenstream
    WHERE arrival_timestamp > ( clock_timestamp() - interval '90 minutes' );


Next, I created the cstore table:

    CREATE EXTENSION cstore_fdw;

    CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;

    CREATE FOREIGN TABLE seen (
        user_id int,
        page_id int,
        ts timestamptz
    )
    SERVER cstore_server
    OPTIONS (compression 'pglz');


Finally, I added a simple script which ran the following query once per hour:

    INSERT INTO seen
    SELECT user_id, page_id, ts
    FROM seenfeed
    WHERE ts >= ( now() - interval '1 hour' )
    ORDER BY user_id, page_id, ts;
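
The script that runs this query isn't shown either.  As a rough sketch, and as a variation rather than the script actually used, the job below archives the last complete clock hour instead of "the last 60 minutes from whenever it happens to run", so that a late or early run neither duplicates nor skips rows.  It assumes a DB-API connection with psycopg2-style `%s` placeholders; `previous_hour_window` and `archive_previous_hour` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone

# Same query as above, but with explicit window bounds passed as parameters.
ARCHIVE_SQL = """
INSERT INTO seen
SELECT user_id, page_id, ts
FROM seenfeed
WHERE ts >= %s AND ts < %s
ORDER BY user_id, page_id, ts
"""


def previous_hour_window(now):
    """Return (start, end) of the last complete clock hour before `now`."""
    end = now.replace(minute=0, second=0, microsecond=0)
    start = end - timedelta(hours=1)
    return start, end


def archive_previous_hour(conn, now=None):
    """Copy one complete hour from the continuous view into the cstore table."""
    if now is None:
        now = datetime.now(timezone.utc)
    start, end = previous_hour_window(now)
    cur = conn.cursor()
    cur.execute(ARCHIVE_SQL, (start, end))
    conn.commit()
    cur.close()
    return start, end
```

Because the continuous view only keeps 90 minutes of data, a job like this would need to run within about 30 minutes after the top of the hour, assuming ts is close to the arrival time; a cron entry a few minutes past each hour would do.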


... and then I started everything in motion.

Now, for cstore the ORDER BY is vitally important; it determines how the blocks you create for the column store are organized.  In this particular case, I knew that I would be doing more analysis by user.  But most users would do ORDER BY ts instead.
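
To see why the ordering matters, here is a toy model in Python.  It is not cstore code.  cstore_fdw keeps min/max "skip index" values for each block of rows and uses them to skip blocks that can't match a filter; the sketch imitates that idea with a made-up block size of 1000 rows and random data, and counts how many blocks a lookup for one user would have to read with and without sorting.  All names and sizes in it are assumptions for illustration.

```python
import random

BLOCK_ROWS = 1000  # hypothetical block size chosen for the illustration


def block_ranges(rows, block_rows=BLOCK_ROWS):
    """Split rows into consecutive blocks and return (min, max) user_id per block."""
    ranges = []
    for i in range(0, len(rows), block_rows):
        user_ids = [row[0] for row in rows[i:i + block_rows]]
        ranges.append((min(user_ids), max(user_ids)))
    return ranges


def blocks_to_read(ranges, user_id):
    """Count blocks whose min/max range could contain user_id."""
    return sum(1 for lo, hi in ranges if lo <= user_id <= hi)


def compare_orderings(num_rows=10000, num_users=100, user_id=42, seed=0):
    """Return (blocks read in arrival order, blocks read when sorted by user_id)."""
    rng = random.Random(seed)
    # Rows are (user_id, page_id, sequence number standing in for ts).
    rows = [(rng.randint(1, num_users), rng.randint(1, 100), n)
            for n in range(num_rows)]
    unsorted_hits = blocks_to_read(block_ranges(rows), user_id)
    sorted_hits = blocks_to_read(block_ranges(sorted(rows)), user_id)
    return unsorted_hits, sorted_hits
```

Calling `compare_orderings()` should show the sorted layout touching only one or two blocks for a single user, while in arrival order nearly every block has a min/max range that includes that user and so has to be read.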

After a few hours, I checked back, and now I can run some analytical queries on the cstore table.  For example, user activity:

    select user_id, count(distinct page_id), max(ts)
    from seen group by user_id;

    user_id | count |              max             
    --------+-------+-------------------------------
          1 |    92 | 2015-08-11 22:59:51.504777+00
          2 |    86 | 2015-08-11 22:54:09.77318+00
          3 |    89 | 2015-08-11 22:59:14.574697+00


page activity:

    select page_id, count(distinct user_id) as duv, max(ts) as last
    from seen group by page_id order by duv desc;

    page_id | duv |             last             
    --------+-----+-------------------------------
         71 |  96 | 2015-08-11 22:59:38.690743+00
         99 |  96 | 2015-08-11 22:58:43.004618+00
          4 |  96 | 2015-08-11 22:57:45.95007+00


... and more. 

Now, in a production environment, you'll want to do more than that.  Depending on traffic, you might batch inserts daily instead of hourly.  You might want to use several continuous views to store different summaries of the data instead of raw data.  But the above should be enough to show you how well these two tools go together.

4 comments:

  1. Reframed as: this is how you do lambda architecture. ;-)

    1. Huh! You know the thing where it turns out that there's a name for something you've been doing for 10 years, and you never knew there was?

  2. Interesting idea! Can you give more performance insights ? (or maybe release the container :])

    1. The container for pipeline+cstore is on Docker Hub, just follow the link.
