My django app needs to periodically fetch and process data from a 3rd party API. I need a task queue.
Celery is the big dog. It's well worth a look. It's great but I'm not going to use it. django-background-tasks is a simpler option. It uses a relational database to store the jobs and cron as the scheduler. It can get the job done but I'm not going to use it either.
I'm going to use a combination of redis, python-rq, and rq-scheduler. For me python-rq hits the sweet spot between complexity and flexibility.
Redis is a rock solid in-memory data structure store. It has data structures like lists and hashes, and commands like rpush, lpop, and blpop that make it an excellent choice for implementing queues. Python-rq implements a simple API to queue jobs and run them using workers. Jobs are python functions that are pickled, given an id, and stored in a redis hash. The id is then added to a redis list. Workers are python processes that run jobs by popping an id from the list, unpickling the function, fork()ing, and then executing the function.
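You can see this layout for yourself by enqueuing a job and poking at redis directly. Here is a minimal sketch, borrowing the fib function from the work.py module used in the queue example below; the rq:queue:* and rq:job:* key names are how rq lays out its data, but they are an internal detail and may differ between versions:

```python
from redis import Redis
from rq import Queue

from work import fib  # the work.py module from the queue example below

# Run this without a worker attached, so the job stays on the queue.
redis_conn = Redis()
q = Queue(connection=redis_conn)
job = q.enqueue(fib, 10)

# The job id is pushed onto a redis list named after the queue...
print(redis_conn.lrange('rq:queue:default', 0, -1))    # [b'<job id>', ...]

# ...and the job itself lives in a redis hash keyed by that id,
# with the pickled call stored in one of the hash fields.
print(redis_conn.hkeys('rq:job:{}'.format(job.id)))
```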
The API is small and clean. I create a queue instance with Queue(), passing it the redis connection. I then enqueue a job onto the queue with Queue.enqueue(). Finally I run the command line script rq worker, which pops jobs from queues, fork()s, and executes them. I can even create a custom worker class if I need different behavior from rq worker.
It is instructive to get this all going outside of django.
Queue Example (OS X)
1. Install redis with homebrew:

       % brew install redis

2. Start redis with the default settings in a new terminal window:

       % redis-server

3. Create a new pyenv environment for playing with python-rq and redis:

       % mkdir redis
       % cd redis
       % pyenv virtualenv redis
       % pyenv local redis
       % pip install rq
       % pip install rq-scheduler

4. Create a module called work.py containing the function that will be performed by the job:

    ```python
    # Get the n'th number in the Fibonacci sequence
    def fib(n):
        a, b = 1, 1
        for i in range(n - 1):
            a, b = b, a + b
        return a
    ```

5. Create a module called jobs.py containing code to enqueue a job:

    ```python
    import time

    from rq import Queue
    from redis import Redis

    from work import fib


    def queue_job():
        # Tell rq to connect to redis on the default port and use the default queue
        redis_conn = Redis()
        q = Queue(connection=redis_conn)

        # Enqueue a job to calculate the 100th Fibonacci number
        job = q.enqueue(fib, 100)

        # Print the job.result immediately after enqueuing. 'None' means the job hasn't finished.
        print(job.result)  # => None

        # Now, wait a few seconds, until the worker is finished
        time.sleep(3)
        print(job.result)  # => 354224848179261915075


    if __name__ == '__main__':
        queue_job()
    ```

6. Start rq's worker process in a new terminal window:

       % rq worker

7. Create a new terminal window and arrange it so you can see both it and the worker window from step 6. Run jobs.py to enqueue the job:

       % python jobs.py
In the step 7 terminal you should see:
% python jobs.py
None
354224848179261915075
In the step 6 (rq worker) terminal you should see something like:
21:18:14 RQ worker 'rq:worker:Suprise.90196' started, version 0.5.6
21:18:14
21:18:14 *** Listening on default...
23:08:21 default: work.fib(100) (25dddb0b-586f-43e3-8ba1-c107a6eaa9a6)
23:08:21 Job OK
23:08:21 Result is kept for 500 seconds
23:08:21
23:08:21 *** Listening on default...
The jobs.py module added a job to the default redis queue and the rq worker process popped the job and executed it. Very cool, but what about scheduling jobs in the future? That's where rq-scheduler comes in. rq-scheduler implements a simple API to schedule jobs, and a process that polls redis every minute and moves scheduled jobs to queues when they need to be executed.
Scheduling Example
1. In a new terminal window start the rq-scheduler process with the default redis settings, but change the poll interval to 5 seconds:

       % rqscheduler -i 5

2. Create a module called schedule.py containing code that schedules a couple of jobs:

    ```python
    from datetime import datetime, timedelta, timezone

    from redis import Redis
    from rq_scheduler import Scheduler

    from work import fib


    def schedule_job():
        # Get a scheduler for the "default" queue on the default redis server
        scheduler = Scheduler(connection=Redis())
        now = time_to_str(datetime.now())

        # Schedule a job, to find the 10th Fibonacci number, to be run in 10 seconds' time.
        # The scheduler expects utc dates.
        job1_at = datetime.utcnow() + timedelta(seconds=10)
        scheduler.enqueue_at(job1_at, fib, 10)
        print("scheduling job1 at {} to run at {}".format(now, time_to_str(utc_to_local(job1_at))))

        # Schedule a second job, to find the 20th Fibonacci number, to be run in 20 seconds' time
        job2_at = datetime.utcnow() + timedelta(seconds=20)
        scheduler.enqueue_at(job2_at, fib, 20)
        print("scheduling job2 at {} to run at {}".format(now, time_to_str(utc_to_local(job2_at))))


    def time_to_str(date):
        return date.strftime('%H:%M:%S')


    def utc_to_local(utc):
        return utc.replace(tzinfo=timezone.utc).astimezone(tz=None)


    if __name__ == '__main__':
        schedule_job()
    ```

3. Schedule the jobs with:

       % python schedule.py
The schedule.py terminal should just print out the times the jobs were scheduled:
% python schedule.py
scheduling job1 at 00:14:50 to run at 00:15:00
scheduling job2 at 00:14:50 to run at 00:15:10
The rqscheduler terminal should look something like this:
% rqscheduler -i 5
00:14:46 Running RQ scheduler...
00:14:46 Checking for scheduled jobs...
00:14:51 Checking for scheduled jobs...
00:14:56 Checking for scheduled jobs...
00:15:01 Checking for scheduled jobs...
00:15:07 Checking for scheduled jobs...
00:15:12 Checking for scheduled jobs...
The rq worker terminal (created in step 6 of the queue example) should look something like this:
% rq worker
00:14:44 RQ worker 'rq:worker:Suprise.95509' started, version 0.5.6
00:14:44
00:14:44 *** Listening on default...
00:15:02 default: work.fib(10) (9ecdad40-77a6-40ad-a689-36a147a5d465)
00:15:02 Job OK
00:15:02 Result is kept for 500 seconds
00:15:02
00:15:02 *** Listening on default...
00:15:12 default: work.fib(20) (a144d888-b630-4d47-93fb-2873ff47841d)
00:15:12 Job OK
00:15:12 Result is kept for 500 seconds
As you can see, rqscheduler polls redis every 5 seconds, 'checking for scheduled jobs...'. When it finds a job that was scheduled to run since the last time it polled redis, it moves the job to the 'default' queue. The worker process then pops the job from the queue and executes it.
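My original goal was to run a job periodically, and rq-scheduler can do that too: its schedule() method enqueues a job on a repeating interval. Here is a minimal sketch, reusing fib from work.py; the 10 minute interval is just an arbitrary example:

```python
from datetime import datetime

from redis import Redis
from rq_scheduler import Scheduler

from work import fib

scheduler = Scheduler(connection=Redis())

scheduler.schedule(
    scheduled_time=datetime.utcnow(),  # first run; the scheduler expects utc
    func=fib,
    args=[10],
    interval=600,   # seconds between runs
    repeat=None,    # None means repeat indefinitely
)
```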
I've only covered the very basics here but the docs and source are great.
Integration with django
The whole point of this was to add task queues to my django app. That is where django-rq comes in. It is a django app that makes it easy to set up queues in settings.py. It wraps some of the python-rq and rq-scheduler APIs to use the queues defined in settings.py. It includes a dashboard to monitor queues, and django management commands that run workers and the scheduler with the django context so you can access your models from your task code.
The documentation is the best place to get started but it boils down to:
1. Install django-rq:

       pip install django-rq

2. Add django-rq to INSTALLED_APPS:

    ```python
    INSTALLED_APPS = (
        # other apps
        "django_rq",
    )
    ```

3. Add the RQ_QUEUES setting to settings.py and define the queues you want to use:

    ```python
    RQ_QUEUES = {
        'default': {
            'HOST': 'localhost',
            'PORT': 6379,
            'DB': 0,
        },
        'low': {
            'HOST': 'localhost',
            'PORT': 6379,
            'DB': 0,
        },
    }
    ```

4. Add the django-rq views to your URLconf:

       url(r'^rq/', include('django_rq.urls')),

5. Start the workers for your queues:

       python manage.py rqworker default low

6. Start the scheduler:

       python manage.py rqscheduler
You can now add jobs to queues with django_rq.get_queue('low').enqueue() or use the @job decorator. You can add jobs to the scheduler with django_rq.get_scheduler('low').enqueue_at(). You can monitor the queues at /rq/.
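To make that concrete, here is a minimal sketch of what a task module might look like. The module name, the fetch_and_process function, and the API URL are made up for illustration, and the queue name assumes the 'low' queue from the RQ_QUEUES setting above:

```python
# myapp/tasks.py (hypothetical module)
from datetime import datetime, timedelta

import django_rq
from django_rq import job


@job('low')  # run this function on the 'low' queue defined in RQ_QUEUES
def fetch_and_process(url):
    # Fetch data from the 3rd party API and update your models here.
    # Because rqworker runs with the django context, the ORM is available.
    ...


def enqueue_examples():
    # Call these from a view, a management command, or ./manage.py shell.

    # Enqueue immediately via the decorator's helper
    fetch_and_process.delay('https://api.example.com/data')

    # Or enqueue explicitly on a named queue
    django_rq.get_queue('low').enqueue(fetch_and_process, 'https://api.example.com/data')

    # Or schedule it for later; like rq-scheduler, enqueue_at expects utc datetimes
    scheduler = django_rq.get_scheduler('low')
    scheduler.enqueue_at(datetime.utcnow() + timedelta(minutes=10),
                         fetch_and_process, 'https://api.example.com/data')
```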
Tips
- Structure job code well. Decompose functionality into helper modules and keep the job code simple and clean.
- Be careful what you put on a queue. Jobs and their arguments are pickled and then executed in a separate context.
- If you find that you need multiple instances of redis, remember that workers spun up with rqworker can only monitor queues in the same instance. The solution is to spin up additional workers for the other redis instances. For example, if I had started two instances of redis with one queue on each, default on port 6379 and low on 6380, I would need to start two workers: python manage.py rqworker default and python manage.py rqworker low.
- Put rqworker and rqscheduler under supervisord or some other process monitoring system.
- When python-rq jobs fail they get appended to the failed queue, but you should configure custom exception handling for your jobs. Django-rq supports configuring exception handlers for the queues you define in settings.py (a minimal handler sketch follows this list).
- Log like you mean it. You will need to piece things together after the inevitable failure, so prepare for it.
- Channel your inner Sting and monitor everything about your queues and logs.
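For the exception handling tip, an rq exception handler is just a function with the signature below; see the django-rq documentation for how to register it against the queues in settings.py. This is only a sketch, and the logging and return value reflect one possible policy:

```python
import logging

logger = logging.getLogger(__name__)


def log_job_failure(job, exc_type, exc_value, traceback):
    """Exception handler with rq's standard signature.

    Log enough context to piece things together later, then fall
    through to the next handler (and, ultimately, the failed queue).
    """
    logger.error("job %s (%s) failed: %s", job.id, job.func_name, exc_value)
    return True  # True/None continues the handler chain; False stops it
```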
Cover photo of a Female Corynura Bee