Skip to content

使用 BullMQ 构建高可靠的 Node.js 异步任务队列

解决应用在处理耗时任务(如发送邮件、图像处理、数据同步)时导致的接口响应慢或进程阻塞问题,通过 BullMQ 实现可靠的异步队列管理。

为什么需要这个技能

在 Node.js 应用中,如果将耗时操作直接放在请求处理函数中,会导致用户等待时间过长,且一旦进程崩溃,正在运行的任务会直接丢失。

BullMQ 基于 Redis 构建,提供了强大的持久化和任务调度能力。通过将任务“生产者”与“消费者(Worker)”解耦,可以实现流量削峰、任务重试、延迟执行以及复杂的依赖工作流,确保即便在系统重启或网络波动时,任务依然能被可靠地执行。

适用场景

  • 定时与延迟任务:如发送 24 小时后的预约提醒或每日定时数据汇总。
  • 高并发削峰:将瞬间涌入的大量请求(如抢购、批量导入)放入队列,由 Worker 按照固定速率消费。
  • 复杂工作流:需要多个任务按顺序或依赖关系执行(例如:验证库存 扣款 通知仓库)。
  • 可靠性要求高的后台处理:需要支持指数退避重试机制,防止因下游 API 暂时不可用导致的任务失败。

核心工作流

  1. 环境配置:配置 Redis 连接,特别注意必须设置 maxRetriesPerRequest: null 才能兼容 BullMQ。
  2. 定义队列(Queue):设置任务默认选项(如重试次数 attempts、退避策略 backoff)及清理策略。
  3. 实现消费者(Worker):编写处理逻辑,并合理设置并发数 concurrency 和速率限制 limiter 以保护下游服务。
  4. 任务调度:使用 add 方法发送即时任务,或使用 repeat 参数定义 Cron 表达式实现定时任务。
  5. 流控与依赖(Flows):使用 FlowProducer 构建父子任务关系,确保依赖任务完成后再触发主任务。
  6. 优雅退出:监听 SIGTERM 信号,通过 worker.close() 确保当前处理中的任务在部署重启前能够正常完成。

示例代码:生产级队列配置

typescript
import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';

const connection = new IORedis(process.env.REDIS_URL, {
  maxRetriesPerRequest: null,
});

const emailQueue = new Queue('emails', {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: { count: 1000 },
  },
});

const worker = new Worker('emails', async (job) => {
  await sendEmail(job.data);
}, {
  connection,
  concurrency: 5,
});

下载和安装

下载 bullmq-specialist 中文版 Skill ZIP

解压后将目录放入你的 AI 工具 skills 文件夹,重启工具后即可使用。具体路径参考内附的 USAGE.zh.md

你可能还需要

暂无推荐