Tuesday, July 28, 2015

PipelineDB: streaming Postgres

If you've been following the tech news, you might have noticed that we have a new open source PostgreSQL fork called "PipelineDB".  Since I've joined the advisory board of PipelineDB, I thought I'd go over what it is, what it does, and what you'd use it for.  If you're not interested in PipelineDB or Postgres forks, you can stop reading now.

PipelineDB is a streaming database version of PostgreSQL.  The idea of a streaming database, first introduced in the PostgreSQL fork TelegraphCQ back in 2003, is that queries are run against incoming data before it is stored, as a kind of stream processing with full query support.  If the idea of a standard database is "durable data, ephemeral queries", the idea of a streaming database is "durable queries, ephemeral data".  This was previously implemented in StreamBase, StreamSQL, and the PostgreSQL fork Truviso. In the Hadoop world, the concept is implemented in Apache Spark Streaming.

On a practical level, what streaming queries do is allow you to eliminate a lot of ETL and redundant or temporary storage.

PipelineDB 0.7.7 is 100% PostgreSQL 9.4, plus the ability to create Continuous Views: standing queries which produce different results each time you query them, depending on what has arrived on the incoming stream.  The idea is that you create the queries which filter and/or summarize the data you're looking for in the stream, and store only the data you want to keep, which can go in regular PostgreSQL tables.
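The whole flow looks something like this sketch (the stream name, view name, and JSON field here are invented for illustration, not part of any real schema):

    -- a stream of raw events; rows flow through it but aren't stored
    CREATE STREAM events ( payload json );

    -- a Continuous View: the query stands, summarizing rows as they arrive
    CREATE CONTINUOUS VIEW event_counts AS
        SELECT payload ->> 'type' AS event_type, count(*)
        FROM events
        GROUP BY event_type;

    -- clients just INSERT into the stream; querying the view at any
    -- moment returns whatever has been accumulated so far
    INSERT INTO events (payload) VALUES ('{"type": "click"}');
    SELECT * FROM event_counts;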

As an example of this, we're going to use PipelineDB to do tag popularity counts on Twitter.  Twitter has a nice streaming API, which gives us some interesting stream data to work with.  First I spun up a PipelineDB Docker container.  Connecting to it, I created the "twitter" database and a static stream called "tweets":

Creating a static stream isn't, strictly speaking, necessary; you can create a Continuous View without one.  As a career DBA, though, implied object names give me the heebie-jeebies.  Also, in some future release of PipelineDB, static streams will have performance optimizations, so it's a good idea to get used to creating them now.

    docker run pipelinedb/pipelinedb
josh@Radegast:~$ psql -h localhost -p 6543 -U pipeline
    Password for user pipeline:
    psql (9.4.1, server 9.4.4)
    Type "help" for help.
    pipeline=# create database twitter;
    pipeline=# \c twitter
    twitter=# create stream tweets ( content json );

Then I created a Continuous View which pulls out all identified hashtags from each tweet.  To do this, I have to reach deep inside the JSON of the tweet structure and use json_array_elements to expand that into a column.  Continuous Views also automatically add a timestamp column called "arrival_timestamp", which is the server timestamp when that particular streaming row showed up.  We can use this to create a 1-hour sliding window over the stream, by comparing it to clock_timestamp().  Unlike regular views, volatile expressions are allowed in Continuous Views.

    CREATE CONTINUOUS VIEW tagstream AS
    SELECT json_array_elements(content #>
      ARRAY['entities','hashtags']) ->> 'text' AS tag
    FROM tweets
    WHERE arrival_timestamp >
          ( clock_timestamp() - interval '1 hour' );
This pulls a continuous column of tags which appear in the San Francisco stream.

Then I created a linked Docker container with all of the Python tools I need to use TwitterAPI, and then wrote this little script based on a TwitterAPI example.  This pulls a stream of tweets with geo turned on and identified as being in the area of San Francisco.  Yes, INSERTing into the stream is all that's required for a client to deliver stream data.  If you have a high volume of data, it's better to use the COPY interface if your language supports it.
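The core of such a script is small.  Here's a minimal sketch: the "tweets" stream matches the one created above, but the helper function, the psycopg2 connection details, and the TwitterAPI parameters in the comments are my assumptions, not the actual script.

```python
import json


def stream_insert_sql(tweet):
    """Build an INSERT against the "tweets" stream.  Delivering data to
    PipelineDB really is just an ordinary INSERT into the stream."""
    # Escape single quotes for a SQL string literal; a real client
    # should use a parameterized query instead (shown below).
    payload = json.dumps(tweet).replace("'", "''")
    return "INSERT INTO tweets (content) VALUES ('{0}')".format(payload)


# With psycopg2 and TwitterAPI (connection details hypothetical),
# the delivery loop would look roughly like:
#
#   import psycopg2
#   from TwitterAPI import TwitterAPI
#   conn = psycopg2.connect("host=localhost port=6543 "
#                           "dbname=twitter user=pipeline")
#   api = TwitterAPI(consumer_key, consumer_secret,
#                    access_token, access_secret)
#   for tweet in api.request('statuses/filter',
#                            {'locations': '-122.75,36.8,-121.75,37.8'}):
#       with conn.cursor() as cur:
#           cur.execute("INSERT INTO tweets (content) VALUES (%s)",
#                       (json.dumps(tweet),))
#       conn.commit()

print(stream_insert_sql({"text": "hello"}))
```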

Then I started it up, and it started pushing tweets into my stream in PipelineDB.  After that, I waited an hour for the stream to populate with an hour's worth of data.

Now, let's do some querying:

    twitter=# select * from tagstream limit 5;

How about the 10 most popular tags in the last hour?

    twitter=# select tag, count(*) as tag_count from tagstream group
              by tag order by tag_count desc limit 10;
          tag       | tag_count
    ----------------+-----------
     Hiring         |       211
     Job            |       165
     CareerArc      |       163
     Jobs           |       154
     job            |       138
     SanFrancisco   |        69
     hiring         |        60
     FaceTimeMeNash |        42
     CiscoRocks     |        35
     IT             |        35
    (10 rows)

I detect a theme here.  Namely, one of sponsored tweets by recruiters.

Now, obviously you could do this by storing all of the tweets in an unlogged table, then summarizing them into another table, etc.  However, using continuous views avoids a disk- and time-wasting store, transform, store-again cycle when you're uninterested in the bulk of the data (and tweet metadata is bulky indeed).  Further, I can create more continuous views based on the same stream, and pull different summary information from it in parallel.
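For instance, alongside the tag view you could stand up a second Continuous View over the same stream.  A sketch (the view name is invented, and the JSON path mirrors Twitter's tweet structure):

    -- count tweets per user over the same incoming stream; both views
    -- consume the stream in parallel, with no extra storage
    CREATE CONTINUOUS VIEW userstream AS
    SELECT content #>> ARRAY['user','screen_name'] AS screen_name,
           count(*) AS tweet_count
    FROM tweets
    GROUP BY screen_name;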

So, there you have it: PipelineDB, the latest addition to the PostgreSQL family.  Download it or install it using the Docker container I built for it.


  1. Thanks for the docker container !
    Note for the Mac users: add port forwarding when you run docker image.

    docker run -p 6543:6543 pipelinedb/pipelinedb

    Also, any chance to include FDW into PipelineDB ? It would be very useful to populate remote databases from PipelineDB server.

    1. Did you mean pgsql_fdw? I could certainly add that to the Docker image ...

  2. Yes, exactly. That would allow running both PipelineDB as a stream processor and stable PostgreSQL 9.4 as a long-term data store. We're still debating in the office which server should originate the FDW connections, but for flexibility I assume both possibilities are required.

    1. I'll be adding it when we push PipelineDB 0.7.8.

    2. Version with cstore_fdw and postgres_fdw pushed (as well as citext, fuzzystrmatch, and hstore), as version 0.7.7a.

  3. This comment has been removed by the author.

  4. There seems to be a ')' missing before ->> 'text' in the example view.

    1. Thanks! As usual, paste issues when reformatting the query for the blog.