Система отложенных задач celery.

Концепция AMQP

AMQP (Advanced Message Queuing Protocol) — открытый протокол для передачи сообщений между компонентами системы. Основная идея состоит в том, что отдельные подсистемы (или независимые приложения) могут обмениваться произвольным образом сообщениями через AMQP-брокер, который осуществляет маршрутизацию, возможно гарантирует доставку, распределение потоков данных, подписку на нужные типы сообщений.

ampq

Установка

pip install django-celery

Добавим в requirements.txt

django-celery==3.2.2

Добавим в settings.py

INSTALLED_APPS += ["djcelery"]

Загрузим библиотеку в settings.py.

import djcelery
djcelery.setup_loader()

Запуск демона задач.

./manage.py celeryd

Установим redis server.

sudo apt-get install redis-server

Подключим селери к редису.

BROKER_URL = "redis://localhost/0"

Установка redis пакета

pip install redis==2.10.6

Создадим команду для тестов.

В каталоге myapp/management/commands/name_command.py

Создадим задачу в новом файле tasks.py.

from celery import task

@task()
def order_process():
    print ('Process order task')

Импортируем и выполним эту задачу в команде.

from celery import task

def handle(self, *args, **options):
    print ('Test celery')
    order_process()

order_process() - выполняет синхронно

order_process.delay() - выполняет Асинхронно через селери

Имитируем долгое выполнение.

@task()
def order_process():
    print ('Process order task')
    for i in range(5):
        time.sleep(1)
        print(i)

Подвяжем задачу к созданию объекта модели.

def save(self,*args, **kwargs):
    super().save(*args, **kwargs)
    order_process()

Передадим объект задаче.

order_process.delay(self)

@task()

def order_process(order): print ('Process order %s task' % order.id)

Переодичные задачи.

Расписание берем из базы.

CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

Запуск демона периодичных задач.

./manage.py celerybeat

Заменим объект, передаваемый задаче числом в интерфейсе администратора.

@task()
def order_process(order):
    print ('Process order %s task' % order')

Именованные очереди

./manage.py celeryd --queue=order

Именуем задачу.

@task(name='shop.tasks.order_process')
def order_process(order):
    ...

Cоздание роутинга для задач селери в settings.py.

  class CeleryRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
    print "Task: %s" % task
        if task == 'shop.tasks.order_process':
            return {'queue': 'order'}
        else:
            return None

Присвоим этот класс глобально переменной.

CELERY_ROUTES = (CeleryRouter(), )

Теперь необходимо пересохранить запись задания в админ интерфейсе выбрав задачу с новым именем.