Python Celery 快速实现分布式的任务队列管理 - 今日头条

本文由 简悦 SimpRead 转码, 原文地址 www.toutiao.com

我们在 tasks.py 所在目录下运行: 在 Windows 10 环境 运行 celery 可能会出现如下问题: 因为 windows 系统需要 even

Celery 是一个由 Python 实现的简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的任务队列,同时也支持任务调度。

Celery 常见应用场景

  • 异步执行:Web 应用中,当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给 Celery 去异步执行,执行完再返回给用户。比如发送短信 / 邮件、推送消息、清理 / 设置缓存等。
  • 定时 / 延时执行:周期性任务,比如 每天数据统计、对账。

Celery 基本特性

  • 方便地查看定时任务的执行情况,比如执行是否成功、当前状态、执行任务花费的时间等。
  • 可以使用功能齐备的管理后台或者命令行添加、更新、删除任务。
  • 方便把任务和配置管理相关联。
  • 可选多进程、Eventlet 和 Gevent 三种模式并发执行。
  • 提供错误处理机制。
  • 提供多种任务原语,方便实现任务分组、拆分和调用链。
  • 支持多种消息代理和存储后端。

https://p26.toutiaoimg.com/origin/pgc-image/1b52f2594e8742b89467bbd28a78155b?from=pc

Celery 扮演生产者和消费者的角色,Celery 本身不提供消息服务,但是可以方便的与第三方提供的消息中间件集成,例如,RabbitMQ、Redis、MongoDB 等。

Celery 的架构主要由以下几部分组成:

  • Producer:任务的生产者,调用 Celery 提供的 API、函数或者装饰器而产生任务,并交给任务队列处理的都是任务生产者。
  • Brokers:任务代理,接受生产者发送过来的任务消息,存进队列再按序分发给任务消费者 (celery worker),常见的 Brokers 有 Redis、RabbitMQ、Zookeeper 等。
  • Worker:任务的消费者,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。通常会在多台服务器运行多个消费者来提高执行效率。在分布式系统中,我们也可以在不同节点上分配执行不同任务的 Celery worker 来达到模块化的目的。
  • Beat:任务调度器,以独立进程的形式存在,是 Celery 系统自带的任务生产者,Celery beat 进程会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。
  • Result Backend:结果存储,任务处理完后保存状态信息和结果,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。

产生任务的两种方式:

  • 发布者发布任务:如由 WEB 应用创建任务。
  • 任务调度定时发布任务:如周期性或定时任务。

我们使用 Redis 作为 Brokers、Result Backend,首先需要部署 Redis 环境。

redis 安装

官网下载 redis 安装包,如下:

https://p26.toutiaoimg.com/origin/pgc-image/ffcbc4a4afa94b598c7244b5643a1f29?from=pc

解压后,通过 CMD 切换到该 redis 解压包目录,使用 redis-server.exe 命令启动服务端。

https://p26.toutiaoimg.com/origin/pgc-image/a271d5d571734e5ea682e20831698a85?from=pc

重新打开一个 CMD,在该 redis 解压包目录下,使用 redis-cli.exe 命令启动客户端,连接服务端。

https://p26.toutiaoimg.com/origin/pgc-image/29aaf89b5b3d490b921481df80135dae?from=pc

我们在上面虽然启动了 redis 服务,但只要关闭 cmd 窗口,redis 服务就关闭了。所以,把我们需要将 redis 设置为一个 windows 服务。使用 redis-server.exe –service-install redis.windows.conf 命令安装,如图所示:

https://p26.toutiaoimg.com/origin/pgc-image/875435c1f7904159aabd4151ec116b68?from=pc

安装成功之后,我们在 windows 服务列表可以看到 Redis 服务。如下:

https://p26.toutiaoimg.com/origin/pgc-image/8b29f04c720f455881a3e1a23f730970?from=pc

常用的 redis 服务命令

1
2
3
4
5
6
7
卸载服务:redis-server --service-uninstall
 
 开启服务:redis-server --service-start
 
 停止服务:redis-server --service-stop
 
 重命名服务:redis-server --service-name name

重命名服务,需要写在前三个参数之后,例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
The following would install and start three separate instances of Redis as a service:   
 以下将会安装并启动三个不同的Redis实例作服务:
 
 redis-server --service-install --service-name redisService1 --port 10001
 
 redis-server --service-start --service-name redisService1
 
 redis-server --service-install --service-name redisService2 --port 10002
 
 redis-server --service-start --service-name redisService2
 
 redis-server --service-install --service-name redisService3 --port 10003
 
 redis-server --service-start --service-name redisService3

测试

启动服务端

1
redis-server --service-start

启动客户客户端

1
2
3
4
精简模式:
 redis-cli.exe
 指定模式:(-h 服务器地址  -p 指定端口号 -a 连接数据库的密码[可以在redis.windows.conf中配置],默认无密码)
 redis-cli.exe -h 127.0.0.1 -p 6379 -a requirepass

测试读写数据

我们通过写入和读取数据,进行测试。

https://p26.toutiaoimg.com/origin/pgc-image/917048976aeb4ec9b5ed5d41f6b15174?from=pc

读写测试成功,如下图。

https://p26.toutiaoimg.com/origin/pgc-image/92dc7a4d98454362b44def30fdb0bbca?from=pc

接下来,安装 Celery 和 redis 以及 python 的 redis 支持。

安装 redis 库

1
pip install redis

安装过程,如果遇到下面错误,可以尝试关闭本机代理,如下图。

1
ValueError: check_hostname requires server_hostname

https://p26.toutiaoimg.com/origin/pgc-image/0deb856f07d34253ab0e7bdcb32ee9e3?from=pc

重新执行安装命令,则安装成功,如下。

1
2
3
4
5
6
PS E:\Gitee\knowledge-base> pip install redis
 Collecting redis
   Downloading redis-3.5.3-py2.py3-none-any.whl (72 kB)
      |████████████████████████████████| 72 kB 60 kB/s
 Installing collected packages: redis
 Successfully installed redis-3.5.3

安装 celery 库

1
pip install celery

代码目录树如下:

1
2
3
4
5
6
─demo
 │  │  app_test.py  --celery 应用
 │  │  celery_config.py -- celery 配置
 │  │  producer.py -- 生产者 添加任务
 │  │  tasks.py -- 任务定义
 │  │  __init__.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# celery_config.py
 
 # Broker 任务存储 Redis
 broker_url = 'redis://localhost:6379/2'
 # 任务结果存储 Redis
 result_backend = 'redis://localhost:6379/1'
 # 任务序列化和反序列化使用msgpack
 task_serializer = 'msgpack'
 # 任务结果序列化和反序列化使用json
 result_serializer = 'json'
 # 任务过期时间
 result_expires = 60 * 60 * 24
 # 指定接受的内容类型
 accept_content = ['json', 'msgpack']

创建 celery 对象,指定任务集合和 celery 配置,如下:

1
2
3
4
5
6
7
8
#  app_test.py
 from celery import Celery
 
 app = Celery('demo', include=['tasks'])
 app.config_from_object('celery_config')
 
 if __name__ == '__main__':
     app.start()

定义任务,如下,其中 add 任务我们增加了重试机制。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# tasks.py
 import json
 import time
 import traceback
 
 from celery import Task
 
 from app_test import app
 from celery.utils.log import get_task_logger
 
 logger = get_task_logger(__name__)
 
 
 @app.task(bind=True)
 def add(self, body):
     time.sleep(10)
     try:
         logger.info(self.request.__dict__)
         n = body.get('x') / body.get('y')
         return n
     except ZeroDivisionError as e:
         # 重试, 错误处理主要是为了重试一些由于网络抖动等原因导致的任务失败
         self.retry(exc=e, countdown=5, max_retries=3)
 
 
 @app.task(bind=True)
 def add_beat(self, x, y):
     time.sleep(1)
     logger.info(self.request.__dict__)
     return x + y

我们在 tasks.py 所在目录下运行:

1
celery -A app_test worker -l info

在 Windows 10 环境 运行 celery 可能会出现如下问题:

1
not enough values to unpack (expected 3, got 0)

因为 windows 系统需要 eventlet 支持,解决办法如下:

1
pip install eventlet

重新在 tasks.py 所在目录下运行如下命令:

1
celery -A app_test worker -l info -P eventlet

运行成功,如下所示:

https://p26.toutiaoimg.com/origin/pgc-image/e74cdec8cf724ad49977562eaa534ded?from=pc

我们在运行 tasks.py 后, 此时 Broker 中还没有任务,Worker 处于等待状态,接下来,就是发布任务。如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# producer.py

 from tasks import add
 import time
 from datetime import datetime, timedelta
 from app_test import app
 from celery.schedules import crontab
 
 t1 = time.time()
 # 不要直接 add({"task_type": "add", "x": 10, "y": 1}),这里需要用 celery 提供的接口 delay 进行发布
 r1 = add.delay({"task_type": "add", "x": 10, "y": 1})
 r2 = add.delay({"task_type": "add", "x": 30, "y": 2})
 r3 = add.delay({"task_type": "add", "x": 100, "y": 0})
 
 r_list = [r1, r2, r3]
 for r in r_list:
     while not r.ready():
         pass
     print("任务ID:{0} , 执行结果:{1}".format(r, r.result))
 
 t2 = time.time()
 
 print('共耗时:%s' % str(t2 - t1))

我们在上述代码中,发布了 3 个任务,执行该代码,进行任务发布,并判断任务状态,获取任务结果,如下:

1
2
3
4
任务ID:9246f992-176c-4f62-8859-e4b45a96c5b2 , 执行结果:10.0
 任务ID:c87c32e7-e23b-4141-be0c-5c4c72bf20dd , 执行结果:15.0
 任务ID:136e51d1-9875-4ad6-8ccd-1d6a7c1bc6b9 , 执行结果:division by zero
 共耗时:33.814836502075195

这里,一个简单的 Celery 应用就完成了。

捕获成功 / 失败任务,进行额外操作

我们复写 task 的 on_failure、on_success 方法,在任务执行成功或失败时,进行一些额外日志操作。如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# tasks.py
  
 import json
 import time
 import traceback
 
 from celery import Task
 
 from app_test import app
 from celery.utils.log import get_task_logger
 
 logger = get_task_logger(__name__)
 
 
 class CallbackTask(Task):
     """
     对失败的任务进行跟踪或者告警
     """
     def __init__(self):
         super(CallbackTask, self).__init__()
 
     def on_success(self, retval, task_id, args, kwargs):
         try:
             logger.info(args)
             if isinstance(args[0], str):
                 item_param= json.loads(args[0])
             else:
                 item_param = args[0]
             logger.info('[task_id] {0}, [task_type] {1}, finished successfully.'.format(task_id, item_param.get('task_type')))
         except TypeError as error:
             logger.error(traceback.format_exc())
 
     def on_failure(self, exc, task_id, args, kwargs, einfo):
         try:
             if isinstance(args[0], str):
                 item_param = json.loads(args[0])
             else:
                 item_param = args[0]
 
             logger.error(('[task_id] {0}, [task_type] {1}, raised exception: {2!r}\n{3!r}'.format(
                     task_id, item_param.get('task_type'),exc, einfo.traceback)))
         except TypeError as error:
             logger.error(traceback.format_exc())
 
 
 @app.task(base=CallbackTask, bind=True)
 def add(self, body):
     time.sleep(10)
     try:
         logger.info(self.request.__dict__)
         n = body.get('x') / body.get('y')
         return n
     except ZeroDivisionError as e:
         # 重试, 错误处理主要是为了重试一些由于网络抖动等原因导致的任务失败
         self.retry(exc=e, countdown=5, max_retries=3)
 
 
 @app.task(bind=True)
 def add_beat(self, x, y):
     time.sleep(1)
     logger.info(self.request.__dict__)
     return x + y

通过如上重载 task 的 on_failure、on_success 方法,当任务成功或失败时,会额外输出以下日志,

1
2
[task_id] c87c32e7-e23b-4141-be0c-5c4c72bf20dd, [task_type] add, finished successfully.
 [task_id] 136e51d1-9875-4ad6-8ccd-1d6a7c1bc6b9, [task_type] add, raised exception: ZeroDivisionError('division by zero')

延时任务

通过 apply_async 发布延时执行任务,如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from tasks import add
 import time
 from datetime import datetime, timedelta
 from app_test import app
 from celery.schedules import crontab
 
 t1 = time.time()
 r3 = add.delay({"task_type": "add", "x": 100, "y": 0})
 
 # 延迟执行
 eta = datetime.utcnow() + timedelta(seconds=10)
 r7 = add.apply_async(args=(100, 50), eta=eta)
 
 r_list = [r3, r7]
 for r in r_list:
     while not r.ready():
         pass
     print("任务ID:{0} , 执行结果:{1}".format(r, r.result))
 
 t2 = time.time()
 
 print('共耗时:%s' % str(t2 - t1))

定时任务

定时任务的添加需要重新运行 worker,接着再运行 beat(启动周期任务调度器)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# 定时任务执行, 定时任务的添加需要重新运行 worker,接着再运行 beat(启动周期任务调度器)
 # 时区
 app.conf.timezone = 'Asia/Shanghai'
 # 是否使用UTC
 app.conf.enable_utc = False
 # 任务的定时配置
 app.conf.beat_schedule = {
     # 任务名称
     'periodicity-task': {
         # 真实执行的task路径
         'task': 'tasks.add_beat',
         # 间隔时间
         'schedule': timedelta(seconds=3),
         # 每周一早八点 'schedule': crontab(hour=8, day_of_week=1),
         'args': (300, 150),
     }
 }

AsyncResult

delay 与 apply_async 创建的都是 AsyncResult 对象,AsyncResult 主要用来储存任务执行信息与执行结果,此外可以通过任务 ID 获取任务执行状态,如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from app_test import app
 
 from celery.result import AsyncResult
 # 任务对象的唯一标识:uuid
 id = 'dc0daf83-e47d-40b4-9b45-5489d7576e5e'
 
 if __name__ == '__main__':
     async1 = AsyncResult(id=id, app=app)
 
     if async1.successful():
         result = async1.get()
         print('任务成功')
     elif async1.failed():
         print('任务失败')
     elif async1.status == 'PENDING':
         print('任务等待中')
     elif async1.status == 'RETRY':
         print('任务异常后正在重试')
     elif async1.status == 'STARTED':
         print('任务已经开始')