Celery

author::牛哄哄的celery


一、Celery

1 定义

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

Celery的架构由三部分组成:

  • 消息中间件(message broker):Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括RabbitMQ,Redis等等任务执行单元。
  • 人物执行单元(worker):Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
  • 人物执行结果存储(task result store):Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis等

消息处理顺序:

  1. User:生产者,产生消息的一方。如:Django。
  2. message broker:消息中间件,管理消息的分发。如:RabbitMQ、Redis。
  3. worker:执行单元,异步任务的执行者。如:Celery。
  4. task result store:存储结果单元,存储处理后的结果,如:Redis。

Celery特点:

  • Simple(简单):Celery 使用和维护都非常简单,并且不需要配置文件。

  • Highly Available(高可用):woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。

  • Fast(快速):单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)

  • Flexible(灵活):Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。

2 安装

安装:pip install -U Celery

使用Redis作为Broker时需要额外安装:celery-with-redis

3 简单使用

异步任务不是直接运行,而是用命令执行

启动:celery -A celery_task worker -l info -P eventlet

  • -A:对应的py文件
  • -l:指定日志模式
  • -P:指定模块,此处是告知celery用eventlet开启协程。
  • -c:并发数量(-c 10

下述案例执行后可以观察到:两个任务几乎同时执行,同时过了5秒后两个任务几乎同时结束,说明任务是异步执行的。

# 生产者:celery_task.py

# 从消费者中导入异步函数
from celery_task import send_email, send_msg

# 发送数据
result = send_email.delay("yuan")
print(result.id)

# 发送数据
result = send_msg.delay("msg")
print(result.id)
# 消费者:produce_task.py
import celery
import time

# 定义broker和task result store指向的库(Redis)
broker='redis://127.0.0.1:6379/1'
backend='redis://127.0.0.1:6379/2'

# 使用RabbitMQ则是如下代码
# broker = 'amqp://guest:guest@localhost:5672//'
# backend= 'rpc://',

# 创建任务(celery名称,存储库,消息库)
app=celery.Celery('test', broker=broker, backend=backend)

# 定义异步任务
@app.task
def send_email(name):
print("向%s发送邮件..."%name)
time.sleep(5)
print("向%s发送邮件完成"%name)
return "ok"

@app.task
def send_msg(name):
print("向%s发送消息..."%name)
time.sleep(5)
print("向%s发送消息完成"%name)
return "ok"
# 获取任务结果:result.py

from celery.result import AsyncResult
# 从消费者中导入cel
from celery_task import cel

# 新建异步结果(id是指向对应的异步请求的id值,需要哪个传入哪个)
async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)

# 结果判断
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后正在重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')

4 模块化

为了更好的使用celery,我们可以将celery的生产者解耦开来。先创建一个celery_tasks文件夹,然后在文件夹下创建如下文件:

  • celery.py:celery配置文件
  • taskxx.py:任务文件
  • check_result.py:结果检测文件

创建好后用命令启动,同时执行生产者代码即可。

# celery文件
from celery import Celery

app = Celery('celery_demo',
broker='redis://127.0.0.1:6379/1',
backend='redis://127.0.0.1:6379/2',
# 包含以下多个任务文件,去相应的py文件中找任务,对多个任务做分类
include=['celery_tasks.task01',
'celery_tasks.task02'
])

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# task01.py
import time
from celery_tasks.celery import app

# 定义异步任务
@cel.task
def send_email(name):
print("向%s发送邮件..."%name)
time.sleep(5)
print("向%s发送邮件完成"%name)
return "ok"
# task02.py
import time
from celery_tasks.celery import app

@cel.task
def send_msg(name):
print("向%s发送消息..."%name)
time.sleep(5)
print("向%s发送消息完成"%name)
return "ok"
# check_result.py
from celery.result import AsyncResult
from celery_task import app

'''验证任务的执行状态的'''


def check_task_status(task_id):
'''
任务的执行状态:
PENDING :等待执行
STARTED :开始执行
RETRY :重新尝试执行
SUCCESS :执行成功
FAILURE :执行失败
:param task_id:
:return:
'''
result = AsyncResult(id=task_id, app=app)
dic = {
'code': 400,
'type': result.status,
'message': '',
'data': '',

}
if result.status == 'PENDING':
dic['message'] = '任务等待中'
elif result.status == 'STARTED':
dic['message'] = '任务开始执行'
elif result.status == 'RETRY':
dic['message'] = '任务重新尝试执行'
elif result.status == 'FAILURE':
dic['message'] = '任务执行失败了'
elif result.status == 'SUCCESS':
result = result.get()
dic['message'] = '任务执行成功'
dic['data'] = result
dic['code'] = 200
# result.forget() # 将结果删除
# async.revoke(terminate=True) # 无论现在是什么时候,都要终止
# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
return dic

5 简单定时任务

定义生产者任务为定时任务,此时调用生产者任务的函数不再是delay,而是apply_async。此时该任务也不是个异步任务,而是一个定时任务。

# 生产者
from celery_task import send_email
from datetime import datetime, timedelta

# 获取当前时间并增加15s
current_time = datetime.now()
delayed_time = current_time + timedelta(seconds=15)
time_utc = datetime.utcfromtimestamp(delayed_time.timestamp())
print(time_utc)

# 使用延迟15s后的时间作为任务的预期执行时间
# eta是任务开始执行的事件,没有eta该任务等同于delay()
result = send_email.apply_async(args=["alex",], eta=time_utc)
# 打印任务ID
print(result.id)

6 模块化定时任务

celery_tasks文件夹:

  • celery.py:需要新增配置
  • taskxx.py:不变
  • check_result.py:不变

配置好了以后,需要通过celery beat命令插入任务,从而定时性执行add-every-10-seconds

启动定时任务:celery -A celery_task beat。启动后每隔设定的时间会向数据库中加入任务。

之后再使用celery -A celery_task worker -l info -P eventlet查看结果。使用前,可能数据库中会有之前的残余任务,如果确定是不需要的任务就先删除之前的任务。

注意:一定要带-P eventlet。不开协程会一直卡主。

# celery.py
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

# 删除历史遗留任务防止里面的任务直接被运行
import redis
r = redis.Redis(host="127.0.0.1", port=6379, db=10)

# 清空当前数据库
r.flushdb()

# 查看任务(可能查看不到)
# for i in r.lrange("celery",0,-1):
# print(i)

app = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
'celery_tasks.task01',
'celery_tasks.task02',
])

app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

app.conf.beat_schedule = {
# 名字随意命名
'add-every-10-seconds': {
# 执行tasks1下的test_celery函数
'task': 'celery_tasks.task01.send_email',
# 每隔2秒执行一次
# 'schedule': 1.0,
# 'schedule': crontab(minute="*/1"),
# 延时6秒后只执行一次
'schedule': timedelta(seconds=6),
# 传递参数
'args': ('张三',)
},
# 'add-every-12-seconds': {
# 'task': 'celery_tasks.task01.send_email',
# 每年4月11号,8点42分执行
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
# 'args': ('张三',)
# },
}

二、Django使用Celery

1 基础配置

Django使用celery

在Django中,可以新建一个文件夹专门处理异步任务,和app同级,命名如下:

my_celery(所有celery任务):

  • config.py(配置文件)
  • main.py(基本文件)
  • email(处理邮箱):
    • tasks.py(必须命名为tasks.py)
  • sms(处理信息):
    • tasks.py(必须命名为tasks.py)
# config.py(里面的变量名不要修改)
broker_url = "redis://127.0.0.1:6379/14"
result_backend = "redis://127.0.0.1:6379/15"
# main.py
# 主程序
import os
from celery import Celery

# 把celery和django进行组合,识别和加载django的配置文件(和manage.py的配置一致)
# 注意:配置一定要放在创建实例前,不然就会报错
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xxxx.settings')

# 创建celery实例对象
app = Celery("sms")

# 通过app对象加载配置
app.config_from_object("my_celery.config")

# 加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称(因此名称必须是tasks.py)
# app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["my_celery.sms",])

# 初始化执行某个任务
app.send_task('my_celery.orders.tasks.start_check_charging_status')

# 启动Celery的命令
# 强烈建议切换目录到my_celery的根目录下启动(不是my_celery里面)
# celery -A my_celery.main worker --loglevel=info
# tasks.py
# celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
from my_celery.main import app
import time

import logging
log = logging.getLogger("django")

# name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
@app.task(name="send_sms")
def send_sms(mobile):
"""发送短信"""
print("向手机号%s发送短信成功!"%mobile)
time.sleep(5)

return "send_sms OK"

@app.task
def send_sms2(mobile):
print("向手机号%s发送短信成功2!" % mobile)
time.sleep(5)

return "send_sms2 OK"

2 使用

# django的视图
from django.shortcuts import render,HttpResponse
from mycelery.sms.tasks import send_sms,send_sms2
from datetime import timedelta

from datetime import datetime
def test(request):

################################# 异步任务

# 1. 声明一个和celery一模一样的任务函数,但是我们可以导包来解决

# send_sms.delay("110")
# send_sms2.delay("119")
# send_sms.delay() 如果调用的任务函数没有参数,则不需要填写任何内容


################################# 定时任务

# ctime = datetime.now()
# # 默认用utc时间
# utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
# time_delay = timedelta(seconds=10)
# task_time = utc_ctime + time_delay
# result = send_sms.apply_async(["911", ], eta=task_time)
# print(result.id)

return HttpResponse('ok')