异步任务队列是后端系统的基础设施之一。本文对比 Celery、RQ 和 Dramatiq 三种方案,分享在实际项目中选型和优化的经验。

任务队列可视化 — 实时状态
📧 send_email #1024
⚙️ gen_report #1023
🖼 resize_img #1022
✅ sync_data #1021
队列深度: 3 · 处理中: 2 · 已完成 (1h): 847

一、方案对比

Celery

A

功能最全,生态成熟,但配置复杂

RQ

B+

极简上手,Redis 原生,功能有限

Dramatiq

A-

设计优雅,默认可靠,社区较小

二、Celery 深度配置

# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

# 关键:限制任务执行时间
task_time_limit = 300        # 硬限制 5 分钟
task_soft_time_limit = 240   # 软限制 4 分钟(抛异常)

# 每个 worker 预取的任务数
worker_prefetch_multiplier = 1

# 任务结果保留时间
result_expires = 3600

# 队列路由
task_routes = {
    'tasks.send_email': {'queue': 'email'},
    'tasks.gen_report': {'queue': 'reports'},
    'tasks.resize_image': {'queue': 'media'},
}

三、可靠投递保证

⚠️ 常见误区:Celery 默认不保证 at-least-once 投递。如果 worker 在执行任务时崩溃,任务可能丢失。需要开启 task_acks_late = Truetask_reject_on_worker_lost = True
# 可靠投递配置
task_acks_late = True
task_reject_on_worker_lost = True

# 任务幂等性装饰器
from celery import shared_task
from functools import wraps

def idempotent(ttl=3600):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = f"idempotent:{func.__name__}:{hash(str(args))}"
            if redis.set(key, 1, nx=True, ex=ttl):
                return func(*args, **kwargs)
            return None  # 重复执行,跳过
        return wrapper
    return decorator

@shared_task
@idempotent(ttl=7200)
def send_welcome_email(user_id):
    # 即使重复投递也只执行一次
    ...

四、监控方案

推荐 Flower 作为 Celery 的实时监控工具,配合 Prometheus exporter 实现告警。

Flower 监控面板
3

Workers 在线

email × 1 · reports × 1 · media × 1

12

队列积压

email: 3 · reports: 5 · media: 4

99.2%

成功率 (24h)

失败 23 / 总计 2,847

五、总结

不管选哪个,任务幂等性是必须实现的。异步系统天然可能重复投递,没有幂等性就是定时炸弹。