异步任务队列是后端系统的基础设施之一。本文对比 Celery、RQ 和 Dramatiq 三种方案,分享在实际项目中选型和优化的经验。
📧 send_email #1024
→
⚙️ gen_report #1023
→
🖼 resize_img #1022
→
✅ sync_data #1021
队列深度: 3 · 处理中: 2 · 已完成 (1h): 847
一、方案对比
Celery
A
功能最全,生态成熟,但配置复杂
RQ
B+
极简上手,Redis 原生,功能有限
Dramatiq
A-
设计优雅,默认可靠,社区较小
二、Celery 深度配置
# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
# 关键:限制任务执行时间
task_time_limit = 300 # 硬限制 5 分钟
task_soft_time_limit = 240 # 软限制 4 分钟(抛异常)
# 每个 worker 预取的任务数
worker_prefetch_multiplier = 1
# 任务结果保留时间
result_expires = 3600
# 队列路由
task_routes = {
'tasks.send_email': {'queue': 'email'},
'tasks.gen_report': {'queue': 'reports'},
'tasks.resize_image': {'queue': 'media'},
}
三、可靠投递保证
⚠️ 常见误区:Celery 默认不保证 at-least-once 投递。如果 worker 在执行任务时崩溃,任务可能丢失。需要开启
task_acks_late = True 和 task_reject_on_worker_lost = True。
# 可靠投递配置
task_acks_late = True
task_reject_on_worker_lost = True
# 任务幂等性装饰器
from celery import shared_task
from functools import wraps
def idempotent(ttl=3600):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
key = f"idempotent:{func.__name__}:{hash(str(args))}"
if redis.set(key, 1, nx=True, ex=ttl):
return func(*args, **kwargs)
return None # 重复执行,跳过
return wrapper
return decorator
@shared_task
@idempotent(ttl=7200)
def send_welcome_email(user_id):
# 即使重复投递也只执行一次
...
四、监控方案
推荐 Flower 作为 Celery 的实时监控工具,配合 Prometheus exporter 实现告警。
3
Workers 在线
email × 1 · reports × 1 · media × 1
12
队列积压
email: 3 · reports: 5 · media: 4
99.2%
成功率 (24h)
失败 23 / 总计 2,847
五、总结
- 小项目(< 10 任务类型)→ RQ 够用,零配置
- 中型项目 → Dramatiq,可靠且优雅
- 大型项目/复杂路由 → Celery,功能最全
不管选哪个,任务幂等性是必须实现的。异步系统天然可能重复投递,没有幂等性就是定时炸弹。