Django Celery 任务异步化

django celery 2017年01月11日 星期三

一个项目其中多多少少会遇到需异步完成的任务,通俗点将就是,在一个请求正常结束之后,需额外执行其他的任务作业。这样的例子,如 缓存的更新,log的记录,短信发送,定时任务 等待一系列只需要后台任务完成即可,不需要前端一直等待。

对于django框架,celery是一个很好的解决方案,其中 django-celery 这个库做了很好的封装,接下来介绍下如何优雅的在django项目中使用celery以及如何让celery工作起来。

1. 使用pip安装 django-celery 库

2. settings设置相关变量 

目前大致有两种方式设置  BROKER_URL   和   CELERY_RESULT_BACKEND  ,redis + redis 和 rabbitmq  + redis ,前者相对简单,两个大同小异,差别不是很大。

# redis 设置
REDIS_HOST = 'redis.host'
REDIS_PORT = 6379
REDIS_CELERY_RESULT_DB = 6
REDIS_CELERY_BROKER_DB = 7

# celery 设置
import djcelery
djcelery.setup_loader()

# BROKER_URL = 'amqp://username:password@host:port/vhost'
BROKER_URL = 'redis://%s:%s/%s' % (REDIS_HOST, REDIS_PORT, REDIS_CELERY_BROKER_DB)
CELERY_RESULT_BACKEND = 'redis://%s:%s/%s' % (REDIS_HOST, REDIS_PORT, REDIS_CELERY_RESULT_DB)

我比较习惯自定义celery tasks路径,如工程名称为 DEMO,统一所有的celery任务都在DEMO/tasks 目录下, 有个log.py 处理日志上传,cron.py 定时任务。

# Celery Tasks Path
CELERY_IMPORTS = (
    'DEMO.tasks.log',
    'DEMO.tasks.cron',
)

若需要定时任务的时候可自行设置间隔时间, 如在cron文件中有个cron_task任务。

# Celery 定时任务
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
    'cron_task': {
        'task': 'DEMO.tasks.cron.cron_task',
        'schedule': timedelta(seconds=60),
        'args': (60, )
    },
}

这样设置就可以在命令行中运行,若没有报错则恭喜现在已经成功了。

python manage.py celeryd -B -l info

3. 使用对应queue进行路由各个任务,实现任务分离高效率处理任务,解决有些任务花费时间太久阻塞其他任务,在settiing设置相关路由。

class TasksRouter(object):
    @classmethod
    def route_for_task(cls, task, args=None, kwargs=None):
        if task.startswith('DEMO.tasks.cron'):
            return {'queue': 'cron_queue'}
        elif task.startswith('DEMO.tasks.log'):
            return {'queue': 'log_queue'}
        else:
            return {'queue': 'extra_queue'}

CELERY_ROUTES = (TasksRouter(), )

这样就会根据各个任务的名称进行分配独立的queue去处理,也好单独记录log。

在运行上面也有一点差异,就是需要使用Q函数。

python manage.py celeryd -Q cron_queue -B -l info
python manage.py celeryd -Q log_queue,extra_queue -l info

其中选项-Q知道队列,-B定时任务开启beat,-l 设置log等级,具体其它参数可运行python manage.py celeryd -h获取其含义。

若是使用了rabbitmq为BROKER_URL,这个有一定的差别,可在在settings里设置exchange & queue ,如:

from kombu import Queue, Exchange

# 设置交换机
cron_exchange = Exchange('cron', type='topic')
log_exchange = Exchange('log', type='topic')
extra_exchange = Exchange('extra', type='topic')

# 设置队列
CELERY_QUEUES = (
    Queue('cron_queue', cron_exchange, routing_key='cron.#'),
    Queue('extra_queue', extra_exchange, routing_key='extra.#'),
    Queue('log_queue', log_exchange, routing_key='log.#'),
)

以上内容可参看官方文档,http://docs.celeryproject.org/en/latest/index.html