Tutorial originally published on Medium (Part 1, Part 2)
We’re going to create a simple cryptocoin price ticker page using Django 2.0, Celery, Redis, and Channels. You don’t need experience with any of the aforementioned technologies to follow the tutorial, though some familiarity will be helpful.
Cryptocoin prices will be fetched from the CryptoCompare API every ten seconds and then published to tickers on our Django-based website via WebSockets whenever they change.
The finished code for this tutorial is in this GitHub repo.
- First, we’ll set up some Celery workers to periodically fetch and store the prices of some cryptocoins into a local Redis cache.
- Then, we’ll flesh out a Django application that has a dashboard for cryptocoin prices.
Setting up Redis
You need to ensure that you have a local Redis instance running on your machine, exposed on port 6379. Redis is an in-memory data structure store — it’s great for holding data that changes often and doesn’t need to be saved to persistent storage. We’ll use it both as a general Django cache and to hold the canonical prices of the different cryptocoins. Since we’re updating the prices so frequently, Redis is a better fit than a regular database.
If you are using macOS, you can run `brew install redis` followed by `brew services start redis` to get a Redis instance up and running. On Linux, your package/service manager most likely can do something similar. Otherwise, or on Windows, the easiest solution is to use Docker to run a local Redis container with the command `docker run -d -p 6379:6379 redis`.
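Before moving on, it's worth a quick sanity check that the instance is reachable. A minimal probe from Python, assuming the `redis` client package is available (it is pulled in later as a dependency of django-redis, or you can `pip install redis` directly):

import redis

r = redis.Redis(host='127.0.0.1', port=6379)
print(r.ping())  # prints True if the local Redis instance is up and reachable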
Starting our codebase
We’re going to keep everything in one codebase for ease of use and use Git for version control. Create an empty directory and initialize a Git repository inside it with `git init`.
Now let's create a new pipenv environment for our codebase. We’ll be using Python 3.6, and you will need the `pipenv` package, which you can get by running `pip install pipenv`. Pipenv is a tool for defining Python environments programmatically and for managing your virtualenvs. The `Pipfile` contains details of all the required pip packages for our project, kind of like a `requirements.txt` file.
Run `pipenv install django` to:

- Create a new `Pipfile` for recording the dependencies of our project.
- Add Django as a dependency to our `Pipfile`.
- Create a new virtual environment and install our `Pipfile` dependencies into it.
All in one command! Now run `pipenv shell` to activate our newly created virtual environment, then run `python -m django startproject ccticker` to start our new Django project.
Setting up Redis as a cache for Django
Run `pipenv install django-redis` and add this to your `settings.py` file:
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': '127.0.0.1:6379',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        }
    },
}
Django will now use your local Redis instance as a backing cache when not in debug mode. For example, it will store compressed, rendered versions of your templates in Redis so that it can serve them up faster to visitors. We can also set and get arbitrary data in the cache from within our Django code like so:
from django.core.cache import cache


def example():
    cache.set('key', 'value', 30)  # cache 'value' under 'key' for 30 seconds
    cache.get('key')               # returns 'value' (or None once it has expired)
Integrating Celery with Django
Celery is a way to run code outside of Django’s HTTP request/response cycle, like processing uploads or periodic tasks that have nothing to do with web requests. In our case, we will be using Celery to fetch and store prices from CryptoCompare. Run `pipenv install celery` and create a `celery.py` file inside the `ccticker/` directory. Your project structure should now look like this:
ccticker/
├── ccticker/
│ ├── __init__.py
│ ├── celery.py
│ ├── settings.py
│ ├── urls.py
│ ├── wsgi.py
├── manage.py
├── Pipfile
├── Pipfile.lock
The contents of your `celery.py` file should look like this:
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ccticker.settings')

app = Celery('ccticker')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
Also add this to your `ccticker/__init__.py` file:
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
And finally, add this to your `settings.py` file — this tells Celery that we want it to communicate between processes using our local Redis instance. Redis does a lot of work in this project!
CELERY_BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
This code sets up Celery so that it will autodiscover any Celery tasks registered inside `tasks.py` files in your Django apps. Tasks are simple Python functions decorated with the `@shared_task` decorator.
Run `./manage.py startapp ccapp` and create a `tasks.py` file inside it. First, we’re going to write a simple example task so that we can test everything is set up correctly. Add the below to `tasks.py`:
import logging

from celery import shared_task


@shared_task
def example_task():
    logging.info("This is a test message!")
Don’t forget to register the newly created `ccapp` application in your `settings.py` like so:
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
] + [
    'ccapp.apps.CcappConfig',
]
Now we will confirm that Celery workers can run tasks registered this way. Celery workers run in separate processes from your Django server — to start up a Celery worker, run `celery -A ccticker worker -l info` from the same directory level as your `manage.py` file. In another `pipenv shell`, run `./manage.py shell` and execute the following code:
Python 3.6.5 (default, May 25 2018, 18:26:17)
[GCC 4.2.1 Compatible Apple LLVM 9.1.0 (clang-902.0.39.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> import ccapp.tasks
>>> ccapp.tasks.example_task.delay()
<AsyncResult: 97cf4530-60d0-49b3-afba-7764ca3ce1fb>
You call `ccapp.tasks.example_task.delay()` instead of `ccapp.tasks.example_task()` so that the task is sent to a Celery worker process, rather than being run directly in the shell. You should see "This is a test message!" printed to the stdout of your Celery worker process.
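Because we configured `CELERY_RESULT_BACKEND` above, you can also poll the returned `AsyncResult` from the shell to confirm the task ran. A quick sketch, continuing the same shell session:

>>> result = ccapp.tasks.example_task.delay()
>>> result.ready()       # becomes True once a worker has finished the task
True
>>> result.successful()  # True if the task finished without raising an exception
True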
Now we know that sending tasks to Celery from a Django shell process is working. We're going to be bold and assume it will work from the Django server process as well — and now we can write our first real task, which fetches prices from CryptoCompare. We'll be using the `cryptocompy` library to interact with the API, so install it using `pipenv install cryptocompy requests`, then add this to your `tasks.py`:
from celery import shared_task
from cryptocompy import price
from django.core.cache import cache


@shared_task
def update_cc_prices():
    cryptocoins = ['ETH', 'BTC']
    currencies = ['EUR', 'USD']
    response = price.get_current_price(cryptocoins, currencies)
    for cryptocoin in cryptocoins:
        for currency in currencies:
            # Store the latest price under its ticker code, e.g. 'ETHEUR'
            ticker_code = cryptocoin + currency
            cache.set(ticker_code, response[cryptocoin][currency])
If you want, you can test this task in the same way as with `example_task` to make sure it's working. If it works, you should see the prices of the cryptocoins appear in your Redis cache — you can look inside the cache using a GUI tool or the `redis-cli` command-line tool. Run `keys *` to see all the keys stored in your Redis instance, and you should see one for each ticker.
Mon May 28 11:14:42 - ~/src/github.com/jameshiew/cryptocoin-ticker-tutorial/ccticker [0]
[james@Jamess-MacBook-Pro.local] redis-cli
127.0.0.1:6379> keys *
1) ":1:ETHEUR"
2) ":1:ETHUSD"
3) ":1:BTCEUR"
4) ":1:BTCUSD"
127.0.0.1:6379> GET ":1:ETHEUR"
"\x80\x04\x95\n\x00\x00\x00\x00\x00\x00\x00G@|\xc5p\xa3\xd7\n=."
127.0.0.1:6379> exit
If you look at the value of a ticker, you will see it is not a plain number — django-redis serializes cached values using Python's pickle codec by default, which is why the bytes look opaque.
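You don't need to handle the pickled bytes yourself: reading the value back through Django's cache API unpickles it for you. A quick check from `./manage.py shell` (the number is simply whatever the latest fetched price happens to be):

>>> from django.core.cache import cache
>>> cache.get('ETHEUR')
460.34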
We want to run this task every ten seconds. We'll use Celery Beat, a task scheduler for Celery that has good Django integration. Run `pipenv install django-celery-beat` and add this to your `settings.py`:
# django-celery-beat
INSTALLED_APPS += [
    'django_celery_beat',
]
Now run `./manage.py migrate` and `./manage.py createsuperuser` to create an admin user. Run `./manage.py runserver`, log into the admin at http://localhost:8000/admin, and create a new periodic task that runs `ccapp.tasks.update_cc_prices` on an interval schedule of every 10 seconds.
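If you would rather not click through the admin, the same schedule can be created in code using django-celery-beat's models. A minimal sketch, to be run once from `./manage.py shell` (the task name must match the dotted path of our task):

from django_celery_beat.models import IntervalSchedule, PeriodicTask

# Run update_cc_prices every 10 seconds
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)
PeriodicTask.objects.get_or_create(
    interval=schedule,
    name='Update cryptocoin prices',
    task='ccapp.tasks.update_cc_prices',
)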
Once the periodic task is set up, it’s time to run:

- A Celery worker process that will actually run the `update_cc_prices` task as and when necessary.
- A Celery beat process to call the task as per the schedule.
You can do this with the following two commands, each in a separate `pipenv shell` environment:
celery -A ccticker worker -l info
celery -A ccticker beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
If everything has gone smoothly so far, your Redis cache should be getting the latest cryptocoin prices every ten seconds. In the next part of the tutorial, we’ll stream these prices (and changes to them) from our Redis cache to tickers on web pages served up by Django using WebSockets.
We’ve now got a Django project set up with cryptocoin prices periodically being pulled to our local Redis cache. Now we’ll create a very simple dashboard to let users subscribe to receive price updates for different cryptocoin/currency ticker codes, using WebSockets.
WebSockets are a way for a user’s browser to open a connection to our backend and then receive updates in the form of events, without having to poll our backend as it would with plain HTTP. WebSockets are a great match for our use case — once a user has subscribed to a particular ticker code (opened a WebSocket to a particular endpoint), every time the ticker price changes in our backend, we will push an updated price out to all connected WebSockets for that ticker code. We will be using the Channels plugin to handle WebSocket logic on our backend — install it using `pipenv install channels` and add the following to your `settings.py`:
# channels
INSTALLED_APPS += [
    'channels',
]

ASGI_APPLICATION = "ccticker.routing.application"
We also need to create a `ccticker/routing.py` file with the following:
from channels.routing import ProtocolTypeRouter

application = ProtocolTypeRouter({
    # Empty for now (http->django views is added by default)
})
Channels works by replacing the standard Django WSGI server, which only handles HTTP requests, with an ASGI server that can also handle other protocols, including WebSockets. For this reason, we need to provide a router object that routes the different protocols to different handlers. A plain `ProtocolTypeRouter` will suffice for now, since it already sends HTTP requests to Django views by default; we point the `ASGI_APPLICATION` setting at it, and we will update it to handle WebSockets later in the tutorial.
Now we will create our ticker dashboard as a standard Django view. First, add the following to `ccapp/views.py`:
from django.views.generic import TemplateView


class TickerView(TemplateView):
    template_name = 'ccapp/tickers.html'

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)
        context['ticker_codes'] = [
            'BTCUSD',
            'BTCEUR',
            'ETHUSD',
            'ETHEUR',
        ]
        return context
We’re going to have a template at `ccapp/templates/ccapp/tickers.html`, and in its template context we pass it a list of ticker codes — the same ones we are using in our Celery task. These will be used to construct the dashboard. Connect this up to `ccapp/urls.py` like so:
from django.urls import path

from .views import TickerView

urlpatterns = [
    path('', TickerView.as_view()),
]
… and then connect up the `ccapp` URLs to our main `ccticker/urls.py` like so:
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('tickers/', include('ccapp.urls')),
]
Now, create `ccapp/templates/ccapp/tickers.html` and fill it out with the following code:
<html>
<head>
<title>Tickers</title>
<link
rel="stylesheet"
href="https://maxcdn.bootstrapcdn.com/bootstrap/4.0.0/css/bootstrap.min.css"
integrity="sha384-Gn5384xqQ1aoWXA+058RXPxPg6fy4IWvTNh0E263XmFcJlSAwiGgFAW/dAiS6JXm"
crossorigin="anonymous"
/>
<style>
#container {
margin: 0 auto;
padding: 1em;
width: 300px;
}
.ticker {
width: 200px;
margin: 1em auto;
}
</style>
</head>
<body>
<div id="container">
{% for ticker_code in ticker_codes %}
<div class="ticker input-group">
<div class="input-group-prepend">
<button
type="button"
value="{{ ticker_code }}"
class="btn btn-info"
data-subscribed="false"
>
{{ ticker_code }}
</button>
</div>
<input
id="{{ ticker_code }}"
class="form-control"
placeholder="..."
type="text"
readonly="readonly"
/>
</div>
{% endfor %}
</div>
<script
src="https://code.jquery.com/jquery-3.2.1.slim.min.js"
integrity="sha384-KJ3o2DKtIkvYIK3UENzmM7KCkRr/rE9/Qpg6aAZGJwFDMVNA/GpGFF93hXpG5KkN"
crossorigin="anonymous"
></script>
<script
src="https://cdnjs.cloudflare.com/ajax/libs/popper.js/1.12.9/umd/popper.min.js"
integrity="sha384-ApNbgh9B+Y1QKtv3Rn7W3mgPxhU9K/ScQsAP7hUibX39j7fakFPskvXusvfa0b4Q"
crossorigin="anonymous"
></script>
<script
src="https://maxcdn.bootstrapcdn.com/bootstrap/4.0.0/js/bootstrap.min.js"
integrity="sha384-JZR6Spejh4U02d8jOt6vLEHfe/JQGiRRSQQxSfFWpi1MquVdAyjUar5+76PVCmYl"
crossorigin="anonymous"
></script>
<script>
const sockets = {}
{% for ticker_code in ticker_codes %}{
const tc = '{{ ticker_code }}'
const button = $(`button[value="${tc}"]`)
button.on('click', () => {
if (button.attr('data-subscribed') === 'false') {
button.attr('data-subscribed', 'true')
button.toggleClass('btn-info btn-success')
sockets[tc] = new WebSocket(`ws://${window.location.host}/ws/${tc.substring(0, 3)}/${tc.substring(3, 6)}`)
sockets[tc].onmessage = event => {
console.log(event)
const data = JSON.parse(event.data)
button.css({
'background-color': 'green',
'transition': 'background-color 0.2s linear',
})
if ('price' in data) {
$(`#${tc}`).val(data['price'])
}
setTimeout(() => {
button.css({
'background-color': '',
'transition': '',
})
}, 200)
}
} else {
button.attr('data-subscribed', 'false')
button.toggleClass('btn-success btn-info')
if (sockets[tc]) {
sockets[tc].close()
}
}
})
}{% endfor %}
</script>
</body>
</html>
In summary, the code of this dashboard is as follows:

- for each `ticker_code` from the template context, we create a `<button>` element to toggle a ticker subscription and an `<input readonly>` element to contain the price
- when the `<button>` is pressed, we open a new `WebSocket` connection to e.g. `ws://localhost:8000/ws/BTC/USD`
- we await messages on the WebSocket connection with a `price` key — when we receive a message with this key, we update the `<input readonly>` element accordingly and briefly flash the `<button>`’s background color to a different shade of green
- if the `<button>` is pressed again, we close the WebSocket connection
If you now run `./manage.py runserver` and go to http://localhost:8000/tickers, you should see a page with the four ticker buttons and empty input elements which will contain the prices. Pressing a button toggles a WebSocket connection for that ticker — currently, the dashboard will not work, as we have not yet set up the WebSocket handlers on our backend. If you toggle one of the buttons, you will see connection errors in your web console.
We’re now going to write the WebSocket handlers. In Channels, these are known as `Consumer`s, analogous to standard Django `View`s for HTTP requests. Create a new file `ccapp/consumers.py` and add the following code:
# ccapp/consumers.py
import json

from channels.generic.websocket import WebsocketConsumer
from django.core.cache import cache


class TickerConsumer(WebsocketConsumer):
    def connect(self):
        cryptocoin = self.scope['url_route']['kwargs']['cryptocoin']
        currency = self.scope['url_route']['kwargs']['currency']
        self.ticker_code = cryptocoin + currency
        super().connect()
        self.send(text_data=json.dumps({
            'message': 'connected'
        }))
        self.price_update({
            'price': cache.get(self.ticker_code)
        })

    def price_update(self, event):
        price = event['price']
        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'price': price,
        }))
When a WebSocket `connect()`s, we parse out the URL kwargs `cryptocoin` and `currency`, then send back a couple of events: first a message indicating that the connection was successful, then another containing the current price fetched from our Redis cache (the `price_update()` method). We send events as JSON strings using the `self.send(text_data=...)` method.
Now we need to connect up this `Consumer` so that it is reachable at an endpoint. Add the following to your `ccapp/urls.py`:
from .consumers import TickerConsumer

websocket_urlpatterns = [
    path('ws/<str:cryptocoin>/<str:currency>', TickerConsumer),
]
… and update `ccticker/routing.py` to the following:
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

import ccapp.urls

application = ProtocolTypeRouter({
    # http->django views is added by default
    'websocket': AuthMiddlewareStack(
        URLRouter(
            ccapp.urls.websocket_urlpatterns
        )
    ),
})
If you go to the dashboard now, you’ll see that you receive a price for a ticker when you press the associated button.
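If you want to exercise the consumer outside the browser, Channels ships a `WebsocketCommunicator` test helper. A rough sketch of an async test (not part of the original tutorial; it assumes an async-capable test runner such as pytest with pytest-asyncio and pytest-django, and that your local Redis instance is running):

# ccapp/tests.py
from channels.routing import URLRouter
from channels.testing import WebsocketCommunicator

import ccapp.urls


async def test_ticker_consumer_sends_connected_message():
    application = URLRouter(ccapp.urls.websocket_urlpatterns)
    communicator = WebsocketCommunicator(application, '/ws/BTC/USD')
    connected, _ = await communicator.connect()
    assert connected
    # The first event the consumer sends after accepting is the 'connected' message
    assert await communicator.receive_json_from() == {'message': 'connected'}
    await communicator.disconnect()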
Great! The final step is to update our `update_cc_prices` Celery task so that it pushes price updates down the WebSockets whenever a price changes on CryptoCompare.
To do this, we’re going to introduce a new concept called groups. Our backend can have multiple `Consumer` instances running — for example, we may have three users, each subscribed to different tickers on their dashboards. User #1 and User #2 may both be subscribed to the BTCUSD ticker, so on our backend we will have two `TickerConsumer` instances open at `ws://localhost:8000/ws/BTC/USD`, one for each user. When the BTCUSD price updates on our backend, we want to send `price_update` events down both of these WebSocket connections with the new price, similar to how we sent the initial price. The way we do this is by grouping `TickerConsumer` instances by ticker code, and then using a handle to the group in our `update_cc_prices` task to send the event to every member.
First, once again Redis is being used behind the scenes — in this case, it backs the channel layer that keeps track of `Consumer` groups. Run `pipenv install channels_redis` and add the following to `settings.py`:
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}
Replace `ccapp/consumers.py` with the following code:
# ccapp/consumers.py
import json

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from django.core.cache import cache


class TickerConsumer(WebsocketConsumer):
    def connect(self):
        cryptocoin = self.scope['url_route']['kwargs']['cryptocoin']
        currency = self.scope['url_route']['kwargs']['currency']
        self.ticker_code = cryptocoin + currency
        # Join the group for this ticker code, e.g. 'BTCUSD'
        async_to_sync(self.channel_layer.group_add)(
            self.ticker_code,
            self.channel_name
        )
        super().connect()
        self.send(text_data=json.dumps({
            'message': 'connected'
        }))
        self.price_update({
            'price': cache.get(self.ticker_code)
        })

    def disconnect(self, close_code):
        # Leave the ticker group
        async_to_sync(self.channel_layer.group_discard)(
            self.ticker_code,
            self.channel_name
        )
        super().disconnect(close_code)

    def price_update(self, event):
        price = event['price']
        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'price': price,
        }))
The new code is as follows:

- when we first `connect()`, we add the `TickerConsumer` instance to a group whose name is the ticker code, e.g. BTCUSD or ETHEUR
- when we `disconnect()`, we remove the `TickerConsumer` instance from the group
Now that our `TickerConsumer` instances are being grouped, let’s update the `update_cc_prices` task in `ccapp/tasks.py` to the following:
...
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync


@shared_task
def update_cc_prices():
    cryptocoins = ['ETH', 'BTC']
    currencies = ['EUR', 'USD']
    response = price.get_current_price(cryptocoins, currencies)
    channel_layer = get_channel_layer()
    for cryptocoin in cryptocoins:
        for currency in currencies:
            latest_price = response[cryptocoin][currency]
            ticker_code = cryptocoin + currency
            if cache.get(ticker_code) != latest_price:
                cache.set(ticker_code, latest_price)
                # Notify every consumer subscribed to this ticker's group
                async_to_sync(channel_layer.group_send)(
                    ticker_code,
                    {
                        'type': 'price_update',
                        'price': latest_price,
                    }
                )
The new code checks the latest price against the current price in the Redis cache and, if they differ, sends out a `price_update` event to the appropriate `ticker_code` group.
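You can also exercise this path by hand, without waiting for CryptoCompare, by pushing a made-up price to a group from `./manage.py shell` while the dashboard is open and subscribed to BTCUSD (the number below is obviously fake):

>>> from asgiref.sync import async_to_sync
>>> from channels.layers import get_channel_layer
>>> channel_layer = get_channel_layer()
>>> async_to_sync(channel_layer.group_send)('BTCUSD', {'type': 'price_update', 'price': 12345.67})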
Restart your Celery worker, since we have updated the task code, and your dashboard should now be updating with new prices! That’s it! A further improvement would be to keep the cryptocoin and currency codes in one place, rather than repeating them across several files as we did here — or even to store them in Django models so they can be edited in the admin.