本文由 简悦 SimpRead 转码, 原文地址 www.toutiao.com
我们在 tasks.py 所在目录下运行: 在 Windows 10 环境 运行 celery 可能会出现如下问题: 因为 windows 系统需要 even
Celery 是一个由 Python 实现的简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的任务队列,同时也支持任务调度。
Celery 常见应用场景
- 异步执行:Web 应用中,当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给 Celery 去异步执行,执行完再返回给用户。比如发送短信 / 邮件、推送消息、清理 / 设置缓存等。
- 定时 / 延时执行:周期性任务,比如 每天数据统计、对账。
Celery 基本特性
- 方便地查看定时任务的执行情况,比如执行是否成功、当前状态、执行任务花费的时间等。
- 可以使用功能齐备的管理后台或者命令行添加、更新、删除任务。
- 方便把任务和配置管理相关联。
- 可选多进程、Eventlet 和 Gevent 三种模式并发执行。
- 提供错误处理机制。
- 提供多种任务原语,方便实现任务分组、拆分和调用链。
- 支持多种消息代理和存储后端。
Celery 扮演生产者和消费者的角色,Celery 本身不提供消息服务,但是可以方便的与第三方提供的消息中间件集成,例如,RabbitMQ、Redis、MongoDB 等。
Celery 的架构主要由以下几部分组成:
- Producer:任务的生产者,调用 Celery 提供的 API、函数或者装饰器而产生任务,并交给任务队列处理的都是任务生产者。
- Brokers:任务代理,接受生产者发送过来的任务消息,存进队列再按序分发给任务消费者 (celery worker),常见的 Brokers 有 Redis、RabbitMQ、Zookeeper 等。
- Worker:任务的消费者,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。通常会在多台服务器运行多个消费者来提高执行效率。在分布式系统中,我们也可以在不同节点上分配执行不同任务的 Celery worker 来达到模块化的目的。
- Beat:任务调度器,以独立进程的形式存在,是 Celery 系统自带的任务生产者,Celery beat 进程会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。
- Result Backend:结果存储,任务处理完后保存状态信息和结果,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。
产生任务的两种方式:
- 发布者发布任务:如由 WEB 应用创建任务。
- 任务调度定时发布任务:如周期性或定时任务。
我们使用 Redis 作为 Brokers、Result Backend,首先需要部署 Redis 环境。
redis 安装
官网下载 redis 安装包,如下:
解压后,通过 CMD 切换到该 redis 解压包目录,使用 redis-server.exe 命令启动服务端。
重新打开一个 CMD,在该 redis 解压包目录下,使用 redis-cli.exe 命令启动客户端,连接服务端。
我们在上面虽然启动了 redis 服务,但只要关闭 cmd 窗口,redis 服务就关闭了。所以,把我们需要将 redis 设置为一个 windows 服务。使用 redis-server.exe –service-install redis.windows.conf 命令安装,如图所示:
安装成功之后,我们在 windows 服务列表可以看到 Redis 服务。如下:
常用的 redis 服务命令
1
2
3
4
5
6
7
|
卸载服务:redis-server --service-uninstall
开启服务:redis-server --service-start
停止服务:redis-server --service-stop
重命名服务:redis-server --service-name name
|
重命名服务,需要写在前三个参数之后,例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
The following would install and start three separate instances of Redis as a service:
以下将会安装并启动三个不同的Redis实例作服务:
redis-server --service-install --service-name redisService1 --port 10001
redis-server --service-start --service-name redisService1
redis-server --service-install --service-name redisService2 --port 10002
redis-server --service-start --service-name redisService2
redis-server --service-install --service-name redisService3 --port 10003
redis-server --service-start --service-name redisService3
|
测试
启动服务端
1
|
redis-server --service-start
|
启动客户客户端
1
2
3
4
|
精简模式:
redis-cli.exe
指定模式:(-h 服务器地址 -p 指定端口号 -a 连接数据库的密码[可以在redis.windows.conf中配置],默认无密码)
redis-cli.exe -h 127.0.0.1 -p 6379 -a requirepass
|
测试读写数据
我们通过写入和读取数据,进行测试。
读写测试成功,如下图。
接下来,安装 Celery 和 redis 以及 python 的 redis 支持。
安装 redis 库
安装过程,如果遇到下面错误,可以尝试关闭本机代理,如下图。
1
|
ValueError: check_hostname requires server_hostname
|
重新执行安装命令,则安装成功,如下。
1
2
3
4
5
6
|
PS E:\Gitee\knowledge-base> pip install redis
Collecting redis
Downloading redis-3.5.3-py2.py3-none-any.whl (72 kB)
|████████████████████████████████| 72 kB 60 kB/s
Installing collected packages: redis
Successfully installed redis-3.5.3
|
安装 celery 库
代码目录树如下:
1
2
3
4
5
6
|
─demo
│ │ app_test.py --celery 应用
│ │ celery_config.py -- celery 配置
│ │ producer.py -- 生产者 添加任务
│ │ tasks.py -- 任务定义
│ │ __init__.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# celery_config.py
# Broker 任务存储 Redis
broker_url = 'redis://localhost:6379/2'
# 任务结果存储 Redis
result_backend = 'redis://localhost:6379/1'
# 任务序列化和反序列化使用msgpack
task_serializer = 'msgpack'
# 任务结果序列化和反序列化使用json
result_serializer = 'json'
# 任务过期时间
result_expires = 60 * 60 * 24
# 指定接受的内容类型
accept_content = ['json', 'msgpack']
|
创建 celery 对象,指定任务集合和 celery 配置,如下:
1
2
3
4
5
6
7
8
|
# app_test.py
from celery import Celery
app = Celery('demo', include=['tasks'])
app.config_from_object('celery_config')
if __name__ == '__main__':
app.start()
|
定义任务,如下,其中 add 任务我们增加了重试机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# tasks.py
import json
import time
import traceback
from celery import Task
from app_test import app
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, body):
time.sleep(10)
try:
logger.info(self.request.__dict__)
n = body.get('x') / body.get('y')
return n
except ZeroDivisionError as e:
# 重试, 错误处理主要是为了重试一些由于网络抖动等原因导致的任务失败
self.retry(exc=e, countdown=5, max_retries=3)
@app.task(bind=True)
def add_beat(self, x, y):
time.sleep(1)
logger.info(self.request.__dict__)
return x + y
|
我们在 tasks.py 所在目录下运行:
1
|
celery -A app_test worker -l info
|
在 Windows 10 环境 运行 celery 可能会出现如下问题:
1
|
not enough values to unpack (expected 3, got 0)
|
因为 windows 系统需要 eventlet 支持,解决办法如下:
重新在 tasks.py 所在目录下运行如下命令:
1
|
celery -A app_test worker -l info -P eventlet
|
运行成功,如下所示:
我们在运行 tasks.py 后, 此时 Broker 中还没有任务,Worker 处于等待状态,接下来,就是发布任务。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# producer.py
from tasks import add
import time
from datetime import datetime, timedelta
from app_test import app
from celery.schedules import crontab
t1 = time.time()
# 不要直接 add({"task_type": "add", "x": 10, "y": 1}),这里需要用 celery 提供的接口 delay 进行发布
r1 = add.delay({"task_type": "add", "x": 10, "y": 1})
r2 = add.delay({"task_type": "add", "x": 30, "y": 2})
r3 = add.delay({"task_type": "add", "x": 100, "y": 0})
r_list = [r1, r2, r3]
for r in r_list:
while not r.ready():
pass
print("任务ID:{0} , 执行结果:{1}".format(r, r.result))
t2 = time.time()
print('共耗时:%s' % str(t2 - t1))
|
我们在上述代码中,发布了 3 个任务,执行该代码,进行任务发布,并判断任务状态,获取任务结果,如下:
1
2
3
4
|
任务ID:9246f992-176c-4f62-8859-e4b45a96c5b2 , 执行结果:10.0
任务ID:c87c32e7-e23b-4141-be0c-5c4c72bf20dd , 执行结果:15.0
任务ID:136e51d1-9875-4ad6-8ccd-1d6a7c1bc6b9 , 执行结果:division by zero
共耗时:33.814836502075195
|
这里,一个简单的 Celery 应用就完成了。
捕获成功 / 失败任务,进行额外操作
我们复写 task 的 on_failure、on_success 方法,在任务执行成功或失败时,进行一些额外日志操作。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# tasks.py
import json
import time
import traceback
from celery import Task
from app_test import app
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
class CallbackTask(Task):
"""
对失败的任务进行跟踪或者告警
"""
def __init__(self):
super(CallbackTask, self).__init__()
def on_success(self, retval, task_id, args, kwargs):
try:
logger.info(args)
if isinstance(args[0], str):
item_param= json.loads(args[0])
else:
item_param = args[0]
logger.info('[task_id] {0}, [task_type] {1}, finished successfully.'.format(task_id, item_param.get('task_type')))
except TypeError as error:
logger.error(traceback.format_exc())
def on_failure(self, exc, task_id, args, kwargs, einfo):
try:
if isinstance(args[0], str):
item_param = json.loads(args[0])
else:
item_param = args[0]
logger.error(('[task_id] {0}, [task_type] {1}, raised exception: {2!r}\n{3!r}'.format(
task_id, item_param.get('task_type'),exc, einfo.traceback)))
except TypeError as error:
logger.error(traceback.format_exc())
@app.task(base=CallbackTask, bind=True)
def add(self, body):
time.sleep(10)
try:
logger.info(self.request.__dict__)
n = body.get('x') / body.get('y')
return n
except ZeroDivisionError as e:
# 重试, 错误处理主要是为了重试一些由于网络抖动等原因导致的任务失败
self.retry(exc=e, countdown=5, max_retries=3)
@app.task(bind=True)
def add_beat(self, x, y):
time.sleep(1)
logger.info(self.request.__dict__)
return x + y
|
通过如上重载 task 的 on_failure、on_success 方法,当任务成功或失败时,会额外输出以下日志,
1
2
|
[task_id] c87c32e7-e23b-4141-be0c-5c4c72bf20dd, [task_type] add, finished successfully.
[task_id] 136e51d1-9875-4ad6-8ccd-1d6a7c1bc6b9, [task_type] add, raised exception: ZeroDivisionError('division by zero')
|
延时任务
通过 apply_async 发布延时执行任务,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from tasks import add
import time
from datetime import datetime, timedelta
from app_test import app
from celery.schedules import crontab
t1 = time.time()
r3 = add.delay({"task_type": "add", "x": 100, "y": 0})
# 延迟执行
eta = datetime.utcnow() + timedelta(seconds=10)
r7 = add.apply_async(args=(100, 50), eta=eta)
r_list = [r3, r7]
for r in r_list:
while not r.ready():
pass
print("任务ID:{0} , 执行结果:{1}".format(r, r.result))
t2 = time.time()
print('共耗时:%s' % str(t2 - t1))
|
定时任务
定时任务的添加需要重新运行 worker,接着再运行 beat(启动周期任务调度器)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# 定时任务执行, 定时任务的添加需要重新运行 worker,接着再运行 beat(启动周期任务调度器)
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
app.conf.beat_schedule = {
# 任务名称
'periodicity-task': {
# 真实执行的task路径
'task': 'tasks.add_beat',
# 间隔时间
'schedule': timedelta(seconds=3),
# 每周一早八点 'schedule': crontab(hour=8, day_of_week=1),
'args': (300, 150),
}
}
|
AsyncResult
delay 与 apply_async 创建的都是 AsyncResult 对象,AsyncResult 主要用来储存任务执行信息与执行结果,此外可以通过任务 ID 获取任务执行状态,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
from app_test import app
from celery.result import AsyncResult
# 任务对象的唯一标识:uuid
id = 'dc0daf83-e47d-40b4-9b45-5489d7576e5e'
if __name__ == '__main__':
async1 = AsyncResult(id=id, app=app)
if async1.successful():
result = async1.get()
print('任务成功')
elif async1.failed():
print('任务失败')
elif async1.status == 'PENDING':
print('任务等待中')
elif async1.status == 'RETRY':
print('任务异常后正在重试')
elif async1.status == 'STARTED':
print('任务已经开始')
|