Abdullah Diab’s Blog

Python Celery Asynchronous Task Decorator

A while ago I was working on a Django project that had many tasks that needed to be executed asynchronously, such as sending emails, generating reports, and running database-related checks.

There are multiple ways to execute tasks asynchronously in Python. One of them is Celery, a distributed task queue written in Python that has proven itself to be a solid task queue system.

And if you’re using Django, there is a Django app that makes your life easier: django-celery.

Now, as always, I like to make my own life easier by implementing generic solutions for my needs, so I wrote this simple piece of code: a function decorator that declares the function as an asynchronous task. Just add the decorator to a function and it becomes asynchronous.

So let’s get started!

Description

The following code will allow you to decorate Python functions so that they are executed asynchronously in a seamless, easy way. You don’t have to change anything in your code: just add the asynchronous_task decorator to the functions you want executed asynchronously and you’re done.

This code can also be used with Django; just uncomment the Django block in it and it’ll work.

I wrote this code in a file called celery_tasks.py, which is probably where you’ll keep all your Celery tasks. It adds a task called execute that is responsible for executing the passed function with the given parameters.

import os, sys
from celery.task import task

sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
sys.path.insert(0, os.path.dirname(__file__))

##########################
# Uncomment the following code block to enable it to work with Django.
# Either put this file next to the settings.py file, or change the block code to import your settings module from the correct path.
##########################

#from django.core.management import setup_environ
#import settings
#setup_environ(settings)

@task(name='execute')
def execute(module_name, method_name, *args, **kwargs):
	# __import__('a.b.c') returns the top-level package 'a',
	# so walk down the dotted path to reach the target module.
	m = __import__(module_name)
	if m.__name__ != module_name:
		for module in module_name.split('.')[1:]:
			m = getattr(m, module)
	f = getattr(m, method_name)
	# If the function is decorated, call the original undecorated
	# function so we don't queue the task all over again.
	if hasattr(f, 'func'):
		f = f.func
	return f(*args, **kwargs)

The code looks like a mess, I know, but simply put, you pass this task the following parameters:

  1. Python module name to get the function from
  2. Method name to be executed
  3. Positional and named parameters
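The dynamic lookup that execute performs can be sketched on its own with just the standard library. This is a minimal sketch, not the task itself, and resolve_function is a hypothetical helper name; importlib.import_module handles dotted paths directly, which is why the walking loop above isn’t needed here:

```python
import importlib

def resolve_function(module_name, method_name):
    # import_module resolves dotted module paths in one call,
    # unlike __import__, which returns only the top-level package.
    module = importlib.import_module(module_name)
    return getattr(module, method_name)

# Example with a standard-library function:
join = resolve_function("os.path", "join")
print(join("a", "b"))  # "a/b" on POSIX systems
```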

The second part is defining the decorator; let’s assume that this file is called celery_decorator.py:

Note that I’m assuming that celery_decorator.py is in the same folder as your celery_tasks.py file.

from functools import wraps

def asynchronous_task(check_if_active=False):
	def wrapper(f):
		@wraps(f)
		def _wrapper(*args, **kwargs):
			try:
				if check_if_active:
					# Ask celery whether any worker is up; if not,
					# fall back to running the function synchronously.
					from celery.task.control import inspect
					if not inspect().active():
						raise Exception('no active celery workers')
				from celery_tasks import execute
				execute.delay(f.__module__, f.__name__, *args, **kwargs)
			except Exception:
				f(*args, **kwargs)
		# Expose the original function so the execute task can
		# call it directly without queueing it again.
		_wrapper.func = f
		return _wrapper
	return wrapper

This decorator simply wraps the function in a call to the execute task on Celery, so that it’ll be executed asynchronously. It accepts an optional parameter that defines whether the decorator should check if Celery is running and, if it isn’t, execute the function synchronously. It is set to False by default because checking Celery takes some time, which hurts performance a little.
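The fall-back behaviour can be demonstrated without a broker. In this minimal sketch, fallback_task and broken_delay are hypothetical stand-ins for the decorator and for execute.delay, not part of Celery:

```python
from functools import wraps

def fallback_task(delay_func):
    # Decorator factory: try the asynchronous path first and run
    # the function synchronously if queueing raises an exception.
    def wrapper(f):
        @wraps(f)
        def _wrapper(*args, **kwargs):
            try:
                delay_func(f.__module__, f.__name__, *args, **kwargs)
            except Exception:
                return f(*args, **kwargs)
        _wrapper.func = f  # expose the original, as above
        return _wrapper
    return wrapper

def broken_delay(*args, **kwargs):
    # Simulates an unreachable broker.
    raise RuntimeError("broker down")

@fallback_task(broken_delay)
def add(a, b):
    return a + b

print(add(2, 3))  # → 5, via the synchronous fall-back path
```

Note that when the asynchronous path succeeds, the wrapper returns nothing: this is a fire-and-forget pattern, so callers cannot rely on the return value.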

Now we can use it on any function in our code:

from celery_decorator import asynchronous_task

# Default: don't check if celery is running
@asynchronous_task()
def a_very_long_process(p1, p2):
    pass #TODO: Do something

# Force the check
@asynchronous_task(check_if_active=True)
def another_very_long_process(p1, p2):
    pass #TODO: Do something

Now you simply don’t change anything in the rest of your code: all calls to these two functions will be asynchronous. (Keep in mind that the decorated call discards the function’s return value, since the result of execute.delay isn’t kept.)

Download

The source code is licensed under the GNU General Public License (GPL).

You can download a copy of celery_tasks.py and celery_decorators.py from this Gist: https://gist.github.com/3874979

Hope this helps somebody to perform faster 🙂