Build a real-time data push engine using Python and RethinkDB


Namaste everyone. Today we are going to talk about building real-time data push engines. The focus of this article is how to design models for the modern real-time web. We are going to build a cool push engine that notifies “Super Heroes” in the Justice League (DC) in real time. The same principles make it very easy to build real-time chat applications too.

What actually is a Data Push Engine?

A push engine is nothing but a piece of software that pushes notifications from the server to all the clients who subscribed to receive those events. When your app polls for data instead, it becomes slow, unscalable, and cumbersome to maintain. To overcome this burden, two technologies were proposed:

  1. WebSockets
  2. Server-Sent Events (SSE)
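To make the definition concrete, here is a minimal in-process sketch of the pattern: clients subscribe once, and the server pushes each new event to every subscriber without being asked. The `PushEngine` and `InboxClient` classes are hypothetical stand-ins, not part of Tornado or RethinkDB; the method name `write_message` simply mirrors Tornado's `WebSocketHandler.write_message`, which the real subscribers (WebSocket connections) provide.

```python
class PushEngine:
    """Registry of subscribed clients; every published event goes to all of them."""

    def __init__(self):
        self.subscribers = set()

    def subscribe(self, client):
        # A client registers once instead of polling the server repeatedly.
        self.subscribers.add(client)

    def unsubscribe(self, client):
        self.subscribers.discard(client)

    def publish(self, event):
        # The server decides when to send; clients never ask.
        # Iterate over a snapshot so the set may change during delivery.
        for client in list(self.subscribers):
            client.write_message(event)


class InboxClient:
    """Stand-in for a WebSocket connection: it just records what it receives."""

    def __init__(self):
        self.inbox = []

    def write_message(self, message):
        self.inbox.append(message)


engine = PushEngine()
flash, wonder_woman = InboxClient(), InboxClient()
engine.subscribe(flash)
engine.subscribe(wonder_woman)
engine.publish({"name": "Batman"})
engine.unsubscribe(flash)
engine.publish({"name": "Aquaman"})
print(flash.inbox)         # [{'name': 'Batman'}]
print(wonder_woman.inbox)  # [{'name': 'Batman'}, {'name': 'Aquaman'}]
```

Polling inverts this relationship: each client would keep asking the server whether anything changed, even when nothing did.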

But using just one of the above technologies is not sufficient for the modern real-time web. Think of it this way. The query-response database access model works well on the web because it maps directly to HTTP’s request-response. However, modern marketplaces, streaming analytics apps, multiplayer games, and collaborative web and mobile apps require sending data directly to the client in real time. For example, when a user changes the position of a button in a collaborative design app, the server has to notify the other users who are working on the same project at the same time. Web browsers support these use cases via WebSockets and long-lived HTTP connections, but the server still has to find out when the data changed. Letting the database itself notify us about updates is much simpler than polling it.

Seeing is believing

I am going to run my project first so you can see what it does. The project is just a website that does the following. The code for this project is available at

  • I am going to start a Justice League website (like the one Superman runs).
  • The website collects the nickname and email of a superhero.
  • It notifies all existing heroes about new members in real time.

So I am going to tell you a small story through screenshots of the app. Click the first image and move through them one by one, and don’t forget to read the description below each image!

I think you got the idea from the story above. If you didn’t, let me explain. Here we ask clients for their information and navigate them to their dashboard. From then on, all clients on the dashboard are notified instantly about newly joined people. No refresh, no AJAX polling, thanks to our push engine.

Are you kidding? I can implement that using WebSockets!

Yes, you are right. You can implement the above notification system purely with WebSockets. So why did I use a few more things? Here is the answer.

“Writing push logic with WebSockets alone is cumbersome. The WebSocket code must push from the server and receive in the client, but traditional databases know nothing about WebSockets or Server-Sent Events. So we need to poll the database for changes, push them to an intermediate queue, and from there to the clients. I say remove that headache from our server: just exploit the database’s ability to push changes in real time whenever its data changes. That is why I chose RethinkDB plus WebSockets.”
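For contrast, here is roughly what the polling approach described in the quote looks like: a loop repeatedly reads the table, works out which rows are new, and pushes them onto an intermediate queue for a separate sender to deliver. This is a hypothetical sketch, with an in-memory list standing in for the database table and `queue.Queue` as the intermediate queue; `poll_once` is an illustrative name, not part of any library.

```python
import queue


def poll_once(table, seen_ids, outbox):
    """One polling round: diff the table against what was already forwarded."""
    new_rows = [row for row in table if row["id"] not in seen_ids]
    for row in new_rows:
        seen_ids.add(row["id"])
        outbox.put(row)  # hand off to the intermediate queue
    return len(new_rows)


# In-memory stand-in for the users table.
users = [{"id": 1, "name": "Superman"}]
seen = set()
outbox = queue.Queue()

poll_once(users, seen, outbox)             # finds Superman
users.append({"id": 2, "name": "Batman"})
poll_once(users, seen, outbox)             # finds only Batman
poll_once(users, seen, outbox)             # nothing new, yet the whole table was read again

# A separate sender would drain the queue and push to the clients.
while not outbox.empty():
    print(outbox.get()["name"])
```

Every round rereads the whole table, the server has to remember what it already forwarded, and a new row waits up to one polling interval before anyone hears about it. A changefeed moves that diffing into the database, which is the burden the quote wants to remove.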

How I built that push engine

I used two main ingredients to create the data push engine shown above.

  1. Python Tornado web server (for handling WebSocket requests and responses)
  2. RethinkDB (for storing data and also for pushing real-time changes to the server)

What is RethinkDB?

According to the official RethinkDB website:

RethinkDB is the first open-source, scalable JSON database built from the ground up for the realtime web. It inverts the traditional database architecture by exposing an exciting new access model – instead of polling for changes, the developer can tell RethinkDB to continuously push updated query results to applications in realtime. RethinkDB’s realtime push architecture dramatically reduces the time and effort necessary to build scalable realtime apps.

When is RethinkDB a good choice?

RethinkDB is a great choice when your applications could benefit from realtime feeds to your data.

The query-response database access model works well on the web because it maps directly to HTTP’s request-response. However, modern applications require sending data directly to the client in realtime. Use cases where companies benefited from RethinkDB’s realtime push architecture include:

  • Collaborative web and mobile apps
  • Streaming analytics apps
  • Multiplayer games
  • Realtime marketplaces
  • Connected devices

Modern web demands typically fall into one of the above categories, so RethinkDB is extremely useful for people who want to exploit its real power for building real-time apps.

RethinkDB has a dedicated Python driver. In our project we only insert documents and read the changes on the users table. To get familiar with the RethinkDB Python client, visit these links.

Setup for our data push engine

Install RethinkDB on Ubuntu 14.04 as follows:

$ source /etc/lsb-release && echo "deb $DISTRIB_CODENAME main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list
$ wget -qO- | sudo apt-key add -
$ sudo apt-get update && sudo apt-get install rethinkdb
$ sudo cp /etc/rethinkdb/default.conf.sample /etc/rethinkdb/instances.d/instance1.conf
$ sudo /etc/init.d/rethinkdb restart
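When the driver reports that it could not connect to localhost:28015, it helps to check whether anything is listening on the RethinkDB client port at all before digging into the Python code. Here is a minimal standard-library sketch; `rethinkdb_is_listening` is a hypothetical helper name, and 28015 is the default port used by the configuration later in this article. It only proves that some process accepts TCP connections on that port, not that the process is RethinkDB.

```python
import socket


def rethinkdb_is_listening(host="localhost", port=28015, timeout=2.0):
    """Return True if some process accepts TCP connections on host:port."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out).
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling `rethinkdb_is_listening()` after the `restart` command above should return `True`; if it returns `False`, the RethinkDB service is either not running or not bound to that address.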

Create a virtualenv for the project and install the required libraries:

$ virtualenv rethink
$ source rethink/bin/activate
$ pip install tornado rethinkdb jinja2

Now everything is set up. Besides the main application file, the project has templates and static files. The project structure looks like this:

|-- requirements.txt
|-- static
|   `-- js
|       `-- sockhand.js
`-- templates
    |-- detail.html
    `-- home.html

Now let us write our main application file.

import os #For building the static folder path

#For tornado server stuff

import tornado.ioloop
import tornado.web
import tornado.gen
import tornado.websocket
import tornado.httpserver
from tornado.concurrent import Future

from jinja2 import Environment, FileSystemLoader #For templating stuff

import rethinkdb as r #For db stuff

from rethinkdb.errors import RqlRuntimeError, RqlDriverError

from conf import * #Fetching db and table details here

#Load the template environment

template_env = Environment(loader=FileSystemLoader("templates"))

db_connection = r.connect(RDB_HOST,RDB_PORT) #Connecting to RethinkDB server

#Our superheroes (WebSocket clients) connected to the server
subscribers = set() 

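#This is just for cross-checking that the database and table exist
def dbSetup():
    print('Setting up database %s' % PROJECT_DB)
    try:
        r.db_create(PROJECT_DB).run(db_connection)
        print('Database setup completed.')
    except RqlRuntimeError:
        print('App database already exists. Nothing to do')
    try:
        r.db(PROJECT_DB).table_create(PROJECT_TABLE).run(db_connection)
        print('Table creation completed')
    except RqlRuntimeError:
        print('Table already exists. Nothing to do')
    finally:
        db_connection.close() #The setup connection is no longer needed

#There is a loop type in the python RethinkDB client; set it to tornado
#so that r.connect() and run() return Futures we can yield in coroutines
r.set_loop_type("tornado")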

class MainHandler(tornado.web.RequestHandler): #Class that renders details page and Dashboard
    def get(self):
        detail_template = template_env.get_template("detail.html") #Loads template
        self.write(detail_template.render())

    @tornado.gen.coroutine
    def post(self):
        home_template = template_env.get_template("home.html")
        email = self.get_argument("email")
        name = self.get_argument("nickname")
        #With the tornado loop type, r.connect() returns a Future
        threaded_conn = yield r.connect(RDB_HOST, RDB_PORT, PROJECT_DB)
        try:
            #run() also returns a Future, so yield it to get the result
            result = yield r.table(PROJECT_TABLE).insert({"name": name, "email": email}, conflict="error").run(threaded_conn)
            print('log: %s inserted successfully' % result)
        finally:
            #Close the per-request connection so connections don't pile up
            threaded_conn.close()
        self.write(home_template.render(name=name))

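#Sends the new user joined alerts to all subscribers who subscribed
@tornado.gen.coroutine
def send_user_alert():
    while True:
        try:
            temp_conn = yield r.connect(RDB_HOST, RDB_PORT, PROJECT_DB)
            feed = yield r.table("users").changes().run(temp_conn)
            while (yield feed.fetch_next()):
                new_user_alert = yield feed.next()
                #Iterate over a copy: clients may disconnect while we send
                for subscriber in list(subscribers):
                    try:
                        subscriber.write_message(new_user_alert) #A dict is sent as JSON
                    except tornado.websocket.WebSocketClosedError:
                        subscribers.discard(subscriber)
        except (RqlRuntimeError, RqlDriverError) as e:
            #The feed or connection broke: log it, wait a moment, resubscribe
            print('Change feed error: %s' % e)
            yield tornado.gen.sleep(1)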

class WSocketHandler(tornado.websocket.WebSocketHandler): #Tornado Websocket Handler
    def check_origin(self, origin):
        return True

    def open(self):
        subscribers.add(self) #Join client to our league

    def on_close(self):
        if self in subscribers:
            subscribers.remove(self) #Remove client

if __name__ == "__main__":
    dbSetup() #Check DB and Tables were pre created
    #Define tornado application
    current_dir = os.path.dirname(os.path.abspath(__file__))
    static_folder = os.path.join(current_dir, 'static')
    tornado_app = tornado.web.Application([
        ('/', MainHandler), #For Landing Page
        (r'/ws', WSocketHandler), #For Sockets
        (r'/static/(.*)', tornado.web.StaticFileHandler, {'path': static_folder}) #Define static folder
    ])

    #Start the server
    server = tornado.httpserver.HTTPServer(tornado_app)
    server.listen(8000) #Bind port 8000 to server
    #Start the change feed listener, then run the event loop
    tornado.ioloop.IOLoop.current().add_callback(send_user_alert)
    tornado.ioloop.IOLoop.current().start()

I am going to define database configuration parameters like the database name, table name, etc. in a separate file, conf.py.

import os

RDB_HOST = os.environ.get('RDB_HOST') or 'localhost'
RDB_PORT = os.environ.get('RDB_PORT') or 28015
PROJECT_DB = 'userfeed'
PROJECT_TABLE = 'users'
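Environment variables always arrive as strings, so `RDB_PORT` above is a string when it is set and an integer when it falls back to the default. A slightly stricter variant could normalise the types in one place. This is a hypothetical sketch: the `load_settings` name and its injectable `env` mapping are illustrative and not part of the project, while `'users'` is the table name the application code queries.

```python
import os


def load_settings(env=None):
    """Read RethinkDB settings from a mapping, falling back to local defaults."""
    env = os.environ if env is None else env
    return {
        "RDB_HOST": env.get("RDB_HOST") or "localhost",
        # Environment values are strings; coerce so the port is always an int.
        "RDB_PORT": int(env.get("RDB_PORT") or 28015),
        "PROJECT_DB": "userfeed",
        "PROJECT_TABLE": "users",
    }


print(load_settings({"RDB_PORT": "29015"}))
```

Passing the mapping explicitly keeps the function easy to test; in the app it would be called with no arguments so it reads `os.environ`.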

That’s it. We have our application file and conf.py ready. Here is a point-by-point explanation of what I did above.

  • Importing the Tornado tools and the RethinkDB client driver.
  • Writing a function called dbSetup that checks whether the required database and table exist, creating them if needed.
  • Using the MainHandler class to handle HTTP requests: a GET request displays the enter-details page, and a POST request shows the dashboard.
  • WSocketHandler is the Tornado WebSocket handler that adds and removes subscribers.
  • One function, send_user_alert, is the actual pusher of changes to the clients. It does only two things: “subscribing to changes on the database table” and “sending those changes to the clients”.

RethinkDB has a concept called changefeeds. It is similar to Redis Pub/Sub. We can subscribe to a particular changefeed, and RethinkDB returns a cursor of infinite length. Whenever the database receives a change to that table, it pushes an event to the subscribed cursor containing the old and new values of the data. For example:

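import rethinkdb as r

connection = r.connect("localhost", 28015, "userfeed")

#cursor is returned when we subscribe to changes on users table
cursor = r.table("users").changes().run(connection)

#just loop through it infinitely to grab changes that RethinkDB pushes to the cursor
for document in cursor:
    print(document) #each change carries 'old_val' and 'new_val'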
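To see the shape of what the cursor delivers without a running server, here is a toy in-memory model of a changefeed. It is not the RethinkDB driver: the `ToyTable` class is hypothetical and uses one `queue.Queue` per subscriber as its "cursor". It only mimics the change format RethinkDB documents for changefeeds, where every change has `old_val` and `new_val`, `old_val` is null for an insert, and `new_val` is null for a delete.

```python
import queue


class ToyTable:
    """In-memory table that pushes {'old_val', 'new_val'} changes to every feed."""

    def __init__(self):
        self.rows = {}
        self.feeds = []

    def changes(self):
        # Each subscriber gets its own queue, standing in for a changefeed cursor.
        feed = queue.Queue()
        self.feeds.append(feed)
        return feed

    def _emit(self, old_val, new_val):
        for feed in self.feeds:
            feed.put({"old_val": old_val, "new_val": new_val})

    def insert(self, doc):
        self.rows[doc["id"]] = dict(doc)
        self._emit(None, dict(doc))

    def update(self, doc_id, **fields):
        old = dict(self.rows[doc_id])
        self.rows[doc_id].update(fields)
        self._emit(old, dict(self.rows[doc_id]))

    def delete(self, doc_id):
        self._emit(self.rows.pop(doc_id), None)


users = ToyTable()
cursor = users.changes()
users.insert({"id": 1, "name": "batman"})
users.update(1, name="dark knight")
users.delete(1)

while not cursor.empty():
    print(cursor.get())
```

Like a real changefeed cursor, `queue.Queue.get()` blocks until the next change arrives; the loop checks `empty()` only so that the example terminates.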

By now you should have the idea. The other files in our project are templates and static files:

  • detail.html
  • home.html
  • sockhand.js

The template code is quite straightforward. You can find the templates here.

But we do need to look at the JavaScript file, sockhand.js:

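//function that listens to the Socket and does something when a notification comes
function listen() {
    var source = new WebSocket('ws://' + window.location.host + '/ws');
    var parent = document.getElementById("mycol");
    source.onmessage = function(msg) {
        var message = JSON.parse(msg.data);
        //Deletes carry no new_val, so there is nobody to announce
        if (!message['new_val']) { return false; }
        //Build a red message box announcing the new superhero
        var child = document.createElement("DIV");
        child.className = 'ui red message';
        var text = message['new_val']['name'].toUpperCase() + ' joined the league on ' + Date();
        var content = document.createTextNode(text);
        child.appendChild(content);
        parent.appendChild(child);
        return false;
    };
}

//Start listening once the page has loaded
window.addEventListener('load', function() {
    console.log('I am ready');
    listen();
});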

Here we define a listen function that runs when the web page loads. It creates a variable called source of type WebSocket and connects it to the /ws URL that we defined in the Tornado application. It also sets a callback that runs whenever a message is received; that callback updates the DOM and adds information about the new user.
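Judging from the JavaScript above, each WebSocket message is a changefeed document encoded as JSON (Tornado's `write_message` does this encoding when it is given a dict), and the alert reads `new_val.name`. The same parsing and formatting can be sketched in Python, which is handy for unit-testing the message shape on the server side. `format_join_alert` is a hypothetical helper, and it takes an explicit `datetime` instead of calling the equivalent of JavaScript's `Date()` so that its output is predictable.

```python
import json
from datetime import datetime


def format_join_alert(raw_message, joined_at):
    """Turn one WebSocket message (a JSON changefeed document) into alert text."""
    change = json.loads(raw_message)
    new_val = change.get("new_val")
    if new_val is None:
        # Deletes arrive with new_val set to null: nobody new to announce.
        return None
    when = joined_at.strftime("%Y-%m-%d %H:%M")
    return "%s joined the league on %s" % (new_val["name"].upper(), when)


message = json.dumps({"old_val": None,
                      "new_val": {"name": "Batman", "email": "bruce@example.com"}})
print(format_join_alert(message, datetime(2016, 1, 1, 9, 30)))
# BATMAN joined the league on 2016-01-01 09:30
```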

If you are still confused, run the application yourself and see it in action. The app we wrote above is a data push engine that routes changes directly from the database to the client. Go to this project link, clone it, and install the packages listed in requirements.txt. Then run the application and visit localhost:8000. If you still have any questions about how it works, feel free to comment below or contact me.

I thought about introducing RethinkDB for absolute beginners, but the article would have become very long. I will come up with an article dedicated to RethinkDB in the near future.

In this way, we can build a real-time data push engine using Python and RethinkDB.

Points to ponder

  • Use RethinkDB for building real-time applications. It is scalable too.
  • Use Tornado because it can easily handle many concurrent connections without any fuss.
  • Remove queuing from your architectural design.
  • Use WebSockets for bidirectional communication.
  • Try out new things frequently



7 thoughts on “Build a real time data push engine using Python and Rethinkdb”

  1. I’m getting this error:
    Traceback (most recent call last):
    File “”, line 3, in
    import tornado.ioloop
    ImportError: No module named tornado.ioloop
    Please help me out.
    I’m using Linux mint operating system.

      1. Now i’m getting this error:
        Traceback (most recent call last):
        File “”, line 23, in
        db_connection = r.connect(RDB_HOST,RDB_PORT) #Connecting to RethinkDB server
        File “/usr/local/lib/python2.7/dist-packages/rethinkdb/”, line 542, in connect
        return conn.reconnect(timeout)
        File “/usr/local/lib/python2.7/dist-packages/rethinkdb/”, line 475, in reconnect
        return self._instance.connect(timeout)
        File “/usr/local/lib/python2.7/dist-packages/rethinkdb/”, line 360, in connect
        self._socket = SocketWrapper(self, timeout)
        File “/usr/local/lib/python2.7/dist-packages/rethinkdb/”, line 268, in __init__
        (, self.port, ex))
        rethinkdb.errors.RqlDriverError: Could not connect to localhost:28015. Error: Operation timed out.

        When I tried installing rethinkDB on Linux Mint by following your commands I’m getting error. So i followed instructions from this page I think mistake is in proper installing of rethinkDB. It would be great if you could help me out.

  2. I see you code and during post for a new superHero signup you connect to rethinkdb, but in the end you don’t close the connection. In setup method you close it in finally step.

    Do you leave it by purpose and the connection is closed by itself after run(), or you just forgotten?
    I want to know, because it means that in every post there is a new zombie connection created to rethinkdb.

      1. In fact, I just wanted to ask you if you did anything like that in production and how you handled it, because I’m getting “[Errorno 105] No buffer space available” after some while running this?

        After further investigation, I found out that that error means there is no network port available to handle the new connections. Windows is rising the error and crash the application, but Linux just block the entire server before you notice it and don’t let you in anymore, so a restart or a physical access can release it.
