一、关于celery

celery是异步任务框架

image

使用前提,需要下载第三方模块

pip install celery

如果是Windows系统还需要额外模块支撑

pip install evetlet

更多详细内容请看官方文档资源
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

二、celery架构的构成

1 任务中间件 Broker,

其他服务提交的异步任务,放在里面排队
需要借助于第三方 redis rabbitmq

2 任务执行单元 worker

真正执行异步任务的进程
celery提供的

3 结果存储 backend

结果存储,函数的返回结果,存到 backend中
需要借助于第三方:redis,mysql

三、celery的应用场景

1. 异步执行:解决耗时任务

2. 延迟执行:解决延迟任务

3. 定时执行:解决周期任务

四、使用方法

第一步:搞清楚包的目录结构

image

第二步:在celery.py文件中写一下代码

from celery import Celery  # 导入celery模块
'''broker和backend对应的数据库可以是同一个,指定不同的两张表'''
broker = 'redis://127.0.0.1:6379/1'  # broker对应一个数据库 执行任务列表在这里
backend = 'redis://127.0.0.1:6379/2'  # backend对应一个数据库 执行结果列表在这里
'''写的各种任务在include=[]列表中注册'''
app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])

第三步:在包内部,写task,任务异步任务

order_task.py文件

from .celery import app
import time
@app.task
def add(a, b):
    print('-----', a + b)
    time.sleep(2)
    return a + b

user_task.py文件

from .celery import app
import time
@app.task
def send_sms(phone, code):
    print("给%s发送短信成功,验证码为:%s" % (phone, code))
    time.sleep(2)
    return True

第四步:启动worker ,包所在目录下

celery  -A celery_task  worker -l info -P eventlet

image

第五步:其他程序 提交任务,被提交到中间件中,等待worker执行

因为worker启动了,就会被worker执行
在Django框架的其他任务,这就是两个框架互不相干,而关键时刻还拉一把,帮帮忙

from celery_task.user_task import send_sms
# 同步操作
# res = send_sms('999999',88888)
# print(res)


# 异步步操作
res = send_sms.delay('999999',88888)
print(res)  # a45ced2a-c285-41e1-9e40-643863d8467f

第五步:worker执行完,结果存到backend中

第六步:咱们要看执行结果,拿到执行的结果

from celery_task import app
from celery.result import AsyncResult
id = '7d39033c-4cc7-4af2-8d78-e62c277db183'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  # 执行完了
        result = a.get()  #
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')
#1 异步任务   
	任务.delay(参数) 
# 延迟任务
	任务.apply_async(args=[参数],eta=时间对象) # 如果没有修改时区,需要使用utc事件
# 定时任务
	-需要启动beat和启动worker
    	-beat    定时提交任务的进程---》配置在app.conf.beat_schedule的任务
        -worker  执行任务的
        
    ### 使用步骤
    	# 第一步:在celery的py文件中写入
        app.conf.timezone = 'Asia/Shanghai'
        # 是否使用UTC
        app.conf.enable_utc = False
        # celery的配置文件#####
        # 任务的定时配置
        app.conf.beat_schedule = {
            'send_sms': {
                'task': 'celery_task.user_task.send_sms',
                # 'schedule': timedelta(seconds=3),  # 时间对象
                # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
                'schedule': crontab(hour=9, minute=43),  # 每天9点43
                'args': ('18888888', '6666'),
            },
        }
        
        ## 第二步:启动beat
        celery -A celery_task beat -l info
        
        ## 第三步:启动worker
        celery -A celery_task worker -l info -P eventlet
        
        
        
        
# 注意点:
	1 启动命令的执行位置,如果是包结构,一定在包这一层
    2 include=['celery_task.order_task'],路径从包名下开始导入,因为我们在包这层执行的命令