Appearance
使用 BullMQ 构建高可靠的 Node.js 异步任务队列
解决应用在处理耗时任务(如发送邮件、图像处理、数据同步)时导致的接口响应慢或进程阻塞问题,通过 BullMQ 实现可靠的异步队列管理。
为什么需要这个技能
在 Node.js 应用中,如果将耗时操作直接放在请求处理函数中,会导致用户等待时间过长,且一旦进程崩溃,正在运行的任务会直接丢失。
BullMQ 基于 Redis 构建,提供了强大的持久化和任务调度能力。通过将任务“生产者”与“消费者(Worker)”解耦,可以实现流量削峰、任务重试、延迟执行以及复杂的依赖工作流,确保即便在系统重启或网络波动时,任务依然能被可靠地执行。
适用场景
- 定时与延迟任务:如发送 24 小时后的预约提醒或每日定时数据汇总。
- 高并发削峰:将瞬间涌入的大量请求(如抢购、批量导入)放入队列,由 Worker 按照固定速率消费。
- 复杂工作流:需要多个任务按顺序或依赖关系执行(例如:验证库存 扣款 通知仓库)。
- 可靠性要求高的后台处理:需要支持指数退避重试机制,防止因下游 API 暂时不可用导致的任务失败。
核心工作流
- 环境配置:配置 Redis 连接,特别注意必须设置
maxRetriesPerRequest: null才能兼容 BullMQ。 - 定义队列(Queue):设置任务默认选项(如重试次数
attempts、退避策略backoff)及清理策略。 - 实现消费者(Worker):编写处理逻辑,并合理设置并发数
concurrency和速率限制limiter以保护下游服务。 - 任务调度:使用
add方法发送即时任务,或使用repeat参数定义 Cron 表达式实现定时任务。 - 流控与依赖(Flows):使用
FlowProducer构建父子任务关系,确保依赖任务完成后再触发主任务。 - 优雅退出:监听
SIGTERM信号,通过worker.close()确保当前处理中的任务在部署重启前能够正常完成。
示例代码:生产级队列配置
typescript
import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';
const connection = new IORedis(process.env.REDIS_URL, {
maxRetriesPerRequest: null,
});
const emailQueue = new Queue('emails', {
connection,
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: { count: 1000 },
},
});
const worker = new Worker('emails', async (job) => {
await sendEmail(job.data);
}, {
connection,
concurrency: 5,
});下载和安装
下载 bullmq-specialist 中文版 Skill ZIP
解压后将目录放入你的 AI 工具 skills 文件夹,重启工具后即可使用。具体路径参考内附的 USAGE.zh.md。
你可能还需要
暂无推荐