当前位置: 首页 > news >正文

Nestjs框架: nestjs-bull的使用与相关queue的规划

概述

  • 在使用定时任务时,发现其只能处理简单任务,且由于定时任务是在 NestJS 运行过程中创建的
  • 大量不同定时任务创建时可能会占用 NestJS 服务的内存
  • 针对大量高并发任务场景和秒级任务应用场景,需要使用队列来创建此类任务
  • NestJS 官网提供了一个队列的官方应用方案叫做 nestjs-bull
  • 它使用 Redis 作为存储方案来存储消息队列的数据,封装了生产者、消费者和监听者等好用的接口

规划的目录结构

  • conditional
    • queue
      • queue.module.ts
      • services
        • tomic.consumer.ts
        • scheduled-tasks.consumer.ts
        • index.ts

1 ) 安装

  • $ npm i --save @nestjs/bull bull

2 ) 配置

# BULL任务队列
QUEUE_ON=false
QUEUE_REDIS_HOST=localhost
QUEUE_REDIS_PORT=6379
QUEUE_REDIS_PASSWORD=exmaple
  • 注意,这里的 redis 只支持哨兵模式 和 单节点模式,
  • 没有 cluster 的配置方式
  • 但是 bull 本身对接的是 ioredis
  • 可以使用 Connector 实现 redis 的 cluster 模式
  • cluster 是分布式的应用场景,绝大多数用单机或哨兵这样的高可用即可
  • 参考官方配置文档
  • 看下面的 defaultJobOptions 配置任务的选项
    • 高级选项说明文档
  • 没有用到 就不必动这些配置项目
    interface JobOpts {priority: number; // 优先级  从 1 (最高优先级) to MAX_INT  (最低优先级).delay: number; // 任务的延迟执行时间attempts: number; // 任务的尝试次数repeat: RepeatOpts; // 重复选项backoff: number | BackoffOpts; // 任务失败的 自动重试策略lifo: boolean; // 如果设置为 true, 则是后进先出的队列timeout: number; // 超时时间jobId: number | string; // 给任务设置一个具体的 id 默认是一个唯一的整数removeOnComplete: boolean | number; // 如果设置 true 成功之后删除// completes. A number specified the amount of jobs to keep. Default behavior is to keep the job in the completed set.removeOnFail: boolean | number; // 如果设置 true 则失败之后 删除stackTraceLimit: number; // 限制 记录 在堆栈中追踪的行数
    }
    

3 )创建模块,这是一个可选模块,所以

  • $ nest g mo conditional/queue --no-spec

  • conditional/queue/queue.module.ts

    import { Module } from '@nestjs/common';
    import { BullModule } from '@nestjs/bull';
    import { QueueConsumers } from './services';
    import { ConfigService } from '@nestjs/config';
    import { getEnvs } from '@/utils/get-envs';
    import { MailModule } from '../mail/mail.module';@Module({})
    export class QueueModule {static register(): DynamicModule {const parsedConfig = getEnvs(); // 获取配置的方法const mailOn = parsedConfig['MAIL_ON'];const conditionalModuleImports = [];if (mailOn) {conditionalModuleImports.push(MailModule);}return {module: QueueModule,imports: [...conditionalModuleImports,BullModule.forRootAsync({inject: [ConfigService],useFactory: (configService: ConfigService) => {const redisHost = configService.get('QUEUE_REDIS_HOST');const redisPort = configService.get('QUEUE_REDIS_PORT');const redisPassword = configService.get('QUEUE_REDIS_PASSWORD');return {redis: {host: redisHost,port: redisPort,password: redisPassword,},}}}),// 注册的时候,使用BullModule.registerQueue({ name: 'tomic' }, // {  name: 'email' }, // 邮件相关// { name: 'data-processing' }, // 数据处理// { name: 'real-time-messages' }, // 实时消息// { name: 'image-processing' }, // 图片上传、压缩{ name: 'scheduled-tasks' }, // 计划任务// { name: 'order-processing' }, //  订单处理,比如信用卡账单经过一个月才会入账),],providers: [QueueConsumers],exports: [BullModule],}}
    }
    

4 ) 在一个测试模块中导入 QueueModule

	import { Module } from '@nestjs/common';import { BullModule } from '@nestjs/bull';@Module({imports: [QueueModule.register(),],controllers: [AuthController],})export class AuthModule {}
  • 之后在测试模块的控制器下使用,如下
    import { InjectQueue } from '@nestjs/bull';
    import { Queue } from 'bull';
    import { diffNow } from '../utils/time';@Controller('auth')
    export class AuthController {constructor(@InjectQueue('tomic') private queue: Queue,@InjectQueue('scheduled-tasks') private queue2: Queue,){}// 这里是 模拟 创建@Get('test')addQueue() {this.queue.add({foo: '123',});return 'ok';}@Get('test2')addQueue2() {// 选项版本this.queue2.add('sendMail', {}, {delay: diffNow('2025-07-01 00:00:05'), // 从当前时间来计算,需要延迟多久,单位 秒});return 'ok';}
    }
    

这里 utils/time.ts 需要安装dayjs, $ pnpm install dayjs

import dayjs, { UnitType } from 'dayjs';
import utc from 'dayjs/plugin/utc';
dayjs.extend(utc);export function diffNow (dateString: string,offset: number = 0,unit: UnitType = 'millisecond',
) {const now = dayjs().utc().add(offset, 'hour');const targetDate = dayjs(dateString).utc();const diff = targetDate.diff(now, unit);return diff;
}

5 ) 这里是模拟 消费 服务 tomic.consumer.ts

import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';@Processor('tomic')
export class TomicConsumer {@Process()async transcode(job: Job<unknown>) {let progress = 0;for (let i= 0; i < 10; i+) {console.log(`job data ${job.data} progress ${progress}`);progress += 1;await job.progress(progress);}return {};}
}
  • 更多 process 方法 查看
  • 除了 process 方法,官方文档中还有很多 job 实例暴露出来的方法
  • 如 log、getdate、update、remove、retry 等

7 )queue/services/scheduled-tasks.consumer.ts

import { ISendMailOptions, MailerService } from '@nestjs-modules/mailer';
import { Processor, Process } from '@nestjs/bull';
import { Optional } from '@nestjs/common';
import { Job } from 'bull';@Processor('scheduled-tasks')
export class ScheduledTasksCosumer{// 这里注入 邮件服务 和 短信服务 TODOconstructor(@Optional() private mailerService: MailerService) {}// 发送邮件单个任务@Process('sendMail')async sendMail(job: Job<ISendMailOptions>) {// 这里可以临时设置为 Job<unknown> 测试const { data } = job;const res = await this.mailerService.sendMail(data);// TODO写到数据库console. log('~ScheduledTasksCosumer ~ sendMail ~ res:', res);}// //发送短信单个任务@Process('sendSms')async sendSms(job: Job<unknown>) {console.log'~ScheduledTasksCosumer~ sendSms~job:';}
}

6 )queue/services/index.ts

import { Provider } from '@nestjs/common';
import { ToimcConsumer } from './tomic.consumer';
import { ScheduledTasksConsumer } from './scheduled-tasks.consumer';export const QueueConsumers: Provider[] = [ToimcConsumer,ScheduledTasksConsumer,
];

其他 queue 的 消费模块都注入到这里

TODO

相关文章:

  • 【RAG面试题】LLMs已经具备了较强能力,存在哪些不足点?
  • day49-硬件学习之I2C(续)
  • TTvideo免费开源PC录屏软件
  • UE--Slate 焦点、捕获,输入处理与玩家控制器的关系
  • 【 MyBatis-Plus | 精讲 】
  • 1 Studying《Is Parallel Programming Hard》1-5
  • 【网络安全】密码学知识普及
  • leetcode.2014 重复k次的最长子序列
  • Unity 脚本自动添加头部注释
  • 不同信创系统如何集中远程运维?贝锐向日葵提供稳定方案
  • 科技如何影响我们的生活?
  • word中如何保存高清图片,并保存为高质量的pdf文件(图像不失真)
  • uniappx 安卓app项目本地打包运行,腾讯地图报错:‘鉴权失败,请检查你的key‘
  • CSS3实现同心圆效果
  • 系统架构设计师论文分享-论单元测试方法及其应用
  • SolidWorks 镜像实体操作指南:解决镜像失败的常见问题
  • Oracle/PostgreSQL/MSSQL/MySQL函数实现对照表
  • SQL Server for Linux 如何实现高可用架构
  • 多模态融合相机L3CAM
  • 腾讯云TSE注册中心实战:Nacos高可用集群搭建与流量治理避坑指南