Python Celery Asynchronous Task Decorator
A while ago I was working on a project in Django that had many tasks that needed to be executed asynchronously such as sending emails, generating reports, and checking some database related issues.
Executing asynchronous tasks in Python can be made in multiple ways, one of them is using celery, a distributed task queue written in Python which has proved itself to be a great solid task queue system.
And if you’re using Django then there is a Django app to make your life easier, django-celery.
Now as always I like to make my own life easier also by implementing generic solutions for my needs, so I have written this simple piece of code to create a function decorator that declares the function as an asynchronous task, which means that by just adding this decorator to the function it becomes asynchronous.
So let’s get started!
Description
The following code will allow you to decorate Python functions with a decorator that will allow them to be executed asynchronously in a seamless easy way. You don’t have to change any thing in your code, just add the asynchronous_task
decorator to the functions you want them to be executed asynchronously and you’re done.
This code can be used with Django also, just uncomment the Django block in it and it’ll work.
I wrote this code in a file called celery_tasks.py
which probably the place you’ll have your all celery tasks, it adds a task called execute
that is responsible for executing the passed function and parameters.
import os, sys from celery.task import task sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) sys.path.insert(0, os.path.dirname(__file__)) ########################## # Uncomment the following code block to enable it to work with Django. # Either put this file next to the settings.py file, or change the block code to import your settings module from the correct path. ########################## #from django.core.management import setup_environ #import settings #setup_environ(settings) @task(name='execute') def execute(module_name, method_name, *args, **kwargs): try: m = __import__(module_name) if m.__name__ != module_name: modules = module_name.split('.')[1:] for module in modules: m = getattr(m, module) f = getattr(m, method_name) if hasattr(f, 'func'): f = f.func return f(*args, **kwargs) except Exception, e: raise e
The code looks like a mess I know, but simply what it does is that you pass this task the following parameters:
- Python module name to get the function from
- Method name to be executed
- Positional and named parameters
The second part is defining the decorator, lets assume that this file is called celery_decorator.py
:
Note that I’m assuming that celery_decorator.py
is in the same folder of your celery_tasks.py
file.
from django.utils.decorators import available_attrs from django.utils.functional import wraps def asynchronous_task(check_if_active = False): def wrapper(f): try: @wraps(f, assigned=available_attrs(f)) def _wrapper(*args, **kwargs): try: if check_if_active: from celery.task.control import inspect insp = inspect() d = insp.active() if not d: raise Exception from celery_tasks import execute execute.delay(f.__module__, f.func_name, *args, **kwargs) except Exception as e: f(*args, **kwargs) _wrapper.func = f return _wrapper except Exception: return f return wrapper
This decorator simply wraps the function in a call to the execute
task on celery, so that it’ll be executed asynchronously, it accept an optional parameter that defines whether the decorator should check if celery is running and if it’s not then execute the function synchronously, it is set to False
by default because it takes some time to check celery, so it makes the performance poor a little bit.
Now we can us it on any function inside our code:
from celery_decorator import asynchronous_task #Default not to check if celery is running @asynchronous_task def a_very_long_process(p1, p2): pass #TODO: Do something #Force check @asynchronous_task(check_if_active = True) def another_very_long_process(p1, p2): pass #TODO: Do something
Now in your code you just simply don’t change anything, all the calls to these two functions will be asynchronous calls.
Download
The source code is licensed under the GNU Public License (GPL).
You can download a copy of celery_tasks.py
and celery_decorators.py
from this Gist: https://gist.github.com/3874979
Hope this helps somebody to perform faster 🙂