Nestjs框架: nestjs-bull的使用与相关queue的规划
概述
- 在使用定时任务时,发现其只能处理简单任务,且由于定时任务是在 NestJS 运行过程中创建的
- 大量不同定时任务创建时可能会占用 NestJS 服务的内存
- 针对大量高并发任务场景和秒级任务应用场景,需要使用队列来创建此类任务
- NestJS 官网提供了一个队列的官方应用方案叫做 nestjs-bull
- 它使用 Redis 作为存储方案来存储消息队列的数据,封装了生产者、消费者和监听者等好用的接口
规划的目录结构
- conditional
- queue
- queue.module.ts
- services
- tomic.consumer.ts
- scheduled-tasks.consumer.ts
- index.ts
- queue
1 ) 安装
- $
npm i --save @nestjs/bull bull
2 ) 配置
# BULL任务队列
QUEUE_ON=false
QUEUE_REDIS_HOST=localhost
QUEUE_REDIS_PORT=6379
QUEUE_REDIS_PASSWORD=exmaple
- 注意,这里的 redis 只支持哨兵模式 和 单节点模式,
- 没有 cluster 的配置方式
- 但是 bull 本身对接的是 ioredis
- 可以使用 Connector 实现 redis 的 cluster 模式
- cluster 是分布式的应用场景,绝大多数用单机或哨兵这样的高可用即可
- 参考官方配置文档
- 看下面的
defaultJobOptions
配置任务的选项- 高级选项说明文档
- 没有用到 就不必动这些配置项目
interface JobOpts {priority: number; // 优先级 从 1 (最高优先级) to MAX_INT (最低优先级).delay: number; // 任务的延迟执行时间attempts: number; // 任务的尝试次数repeat: RepeatOpts; // 重复选项backoff: number | BackoffOpts; // 任务失败的 自动重试策略lifo: boolean; // 如果设置为 true, 则是后进先出的队列timeout: number; // 超时时间jobId: number | string; // 给任务设置一个具体的 id 默认是一个唯一的整数removeOnComplete: boolean | number; // 如果设置 true 成功之后删除// completes. A number specified the amount of jobs to keep. Default behavior is to keep the job in the completed set.removeOnFail: boolean | number; // 如果设置 true 则失败之后 删除stackTraceLimit: number; // 限制 记录 在堆栈中追踪的行数 }
3 )创建模块,这是一个可选模块,所以
-
$
nest g mo conditional/queue --no-spec
-
conditional/queue/queue.module.ts
import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bull'; import { QueueConsumers } from './services'; import { ConfigService } from '@nestjs/config'; import { getEnvs } from '@/utils/get-envs'; import { MailModule } from '../mail/mail.module';@Module({}) export class QueueModule {static register(): DynamicModule {const parsedConfig = getEnvs(); // 获取配置的方法const mailOn = parsedConfig['MAIL_ON'];const conditionalModuleImports = [];if (mailOn) {conditionalModuleImports.push(MailModule);}return {module: QueueModule,imports: [...conditionalModuleImports,BullModule.forRootAsync({inject: [ConfigService],useFactory: (configService: ConfigService) => {const redisHost = configService.get('QUEUE_REDIS_HOST');const redisPort = configService.get('QUEUE_REDIS_PORT');const redisPassword = configService.get('QUEUE_REDIS_PASSWORD');return {redis: {host: redisHost,port: redisPort,password: redisPassword,},}}}),// 注册的时候,使用BullModule.registerQueue({ name: 'tomic' }, // { name: 'email' }, // 邮件相关// { name: 'data-processing' }, // 数据处理// { name: 'real-time-messages' }, // 实时消息// { name: 'image-processing' }, // 图片上传、压缩{ name: 'scheduled-tasks' }, // 计划任务// { name: 'order-processing' }, // 订单处理,比如信用卡账单经过一个月才会入账),],providers: [QueueConsumers],exports: [BullModule],}} }
4 ) 在一个测试模块中导入 QueueModule
import { Module } from '@nestjs/common';import { BullModule } from '@nestjs/bull';@Module({imports: [QueueModule.register(),],controllers: [AuthController],})export class AuthModule {}
- 之后在测试模块的控制器下使用,如下
import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; import { diffNow } from '../utils/time';@Controller('auth') export class AuthController {constructor(@InjectQueue('tomic') private queue: Queue,@InjectQueue('scheduled-tasks') private queue2: Queue,){}// 这里是 模拟 创建@Get('test')addQueue() {this.queue.add({foo: '123',});return 'ok';}@Get('test2')addQueue2() {// 选项版本this.queue2.add('sendMail', {}, {delay: diffNow('2025-07-01 00:00:05'), // 从当前时间来计算,需要延迟多久,单位 秒});return 'ok';} }
这里 utils/time.ts 需要安装dayjs, $ pnpm install dayjs
import dayjs, { UnitType } from 'dayjs';
import utc from 'dayjs/plugin/utc';
dayjs.extend(utc);export function diffNow (dateString: string,offset: number = 0,unit: UnitType = 'millisecond',
) {const now = dayjs().utc().add(offset, 'hour');const targetDate = dayjs(dateString).utc();const diff = targetDate.diff(now, unit);return diff;
}
5 ) 这里是模拟 消费 服务 tomic.consumer.ts
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';@Processor('tomic')
export class TomicConsumer {@Process()async transcode(job: Job<unknown>) {let progress = 0;for (let i= 0; i < 10; i+) {console.log(`job data ${job.data} progress ${progress}`);progress += 1;await job.progress(progress);}return {};}
}
- 更多 process 方法 查看
- 除了 process 方法,官方文档中还有很多 job 实例暴露出来的方法
- 如 log、getdate、update、remove、retry 等
7 )queue/services/scheduled-tasks.consumer.ts
import { ISendMailOptions, MailerService } from '@nestjs-modules/mailer';
import { Processor, Process } from '@nestjs/bull';
import { Optional } from '@nestjs/common';
import { Job } from 'bull';@Processor('scheduled-tasks')
export class ScheduledTasksCosumer{// 这里注入 邮件服务 和 短信服务 TODOconstructor(@Optional() private mailerService: MailerService) {}// 发送邮件单个任务@Process('sendMail')async sendMail(job: Job<ISendMailOptions>) {// 这里可以临时设置为 Job<unknown> 测试const { data } = job;const res = await this.mailerService.sendMail(data);// TODO写到数据库console. log('~ScheduledTasksCosumer ~ sendMail ~ res:', res);}// //发送短信单个任务@Process('sendSms')async sendSms(job: Job<unknown>) {console.log'~ScheduledTasksCosumer~ sendSms~job:';}
}
6 )queue/services/index.ts
import { Provider } from '@nestjs/common';
import { ToimcConsumer } from './tomic.consumer';
import { ScheduledTasksConsumer } from './scheduled-tasks.consumer';export const QueueConsumers: Provider[] = [ToimcConsumer,ScheduledTasksConsumer,
];
其他 queue 的 消费模块都注入到这里
TODO