队列
队列是一种强大的设计模式,可以帮助你应对应用扩展性和性能方面的常见挑战。队列可以帮助你解决的一些问题包括:
- 平滑处理高峰流量。例如,如果用户可以在任意时间发起资源密集型任务,你可以将这些任务加入队列,而不是同步执行。随后,你可以让工作进程以可控的方式从队列中拉取任务。随着应用规模增长,你可以很容易地增加新的队列消费者,以扩展后端任务处理能力。
- 拆分那些可能会阻塞 Node.js 事件循环的单体任务。例如,如果某个用户请求需要进行音频转码这类 CPU 密集型工作,你可以将该任务委托给其他进程处理,从而让面向用户的进程保持响应。
- 在不同服务之间提供可靠的通信通道。例如,你可以在一个进程或服务中将任务(job)放入队列,并在另一个进程或服务中消费它们。你可以从任何进程或服务中通过监听状态事件,在 job 生命周期中获知其完成、错误或其他状态变化。当队列生产者或消费者发生故障时,它们的状态会被保留,节点重启后任务处理也可以自动恢复。
Nest 提供了 @nestjs/bullmq 包用于集成 BullMQ,也提供了 @nestjs/bull 包用于集成 Bull。这两个包都是各自底层库之上的抽象/封装,而这两个底层库都由同一团队开发。Bull 当前处于维护模式,团队主要专注于修复 bug;BullMQ 则在积极开发中,采用现代化的 TypeScript 实现,并提供了一组不同的特性。如果 Bull 已经满足你的需求,它依然是一个可靠且经过实战考验的选择。Nest 提供的这些包使你能够以友好的方式,在 Nest 应用中集成 BullMQ 或 Bull 队列。
BullMQ 和 Bull 都使用 Redis 来持久化 job 数据,因此你需要在系统中安装 Redis。由于它们以 Redis 为后端,你的队列架构可以是完全分布式且与平台无关的。例如,你可以让一些队列生产者、消费者以及监听器运行在一个(或多个)节点上的 Nest 中,同时让其他生产者、消费者和监听器运行在其他网络节点上的 Node.js 平台中。
本章会介绍 @nestjs/bullmq 和 @nestjs/bull 包。我们也建议你阅读 BullMQ 和 Bull 的文档,以获取更多背景知识和特定实现细节。
BullMQ 安装
要开始使用 BullMQ,我们先安装所需依赖。
$ npm install --save @nestjs/bullmq bullmq安装完成后,我们可以将 BullModule 导入根 AppModule。
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}forRoot() 方法用于注册一个 bullmq 包的配置对象,应用中注册的所有队列都会使用该配置对象(除非另有指定)。供参考,配置对象中包含的部分属性如下:
connection: ConnectionOptions- 用于配置 Redis 连接的选项。更多信息见 Connections。可选。prefix: string- 所有队列 key 的前缀。可选。defaultJobOptions: JobOpts- 用于控制新 job 默认设置的选项。更多信息见 JobOpts。可选。settings: AdvancedSettings- 高级队列配置项。通常不应该修改。更多信息见 AdvancedSettings。可选。extraOptions- 模块初始化的额外选项。参见 手动注册
所有选项都是可选的,它们提供了对队列行为的细粒度控制。这些配置会被直接传递给 BullMQ 的 Queue 构造函数。关于这些选项以及其他选项的更多内容,请查看这里。
要注册一个队列,导入 BullModule.registerQueue() 动态模块,如下所示:
BullModule.registerQueue({
name: 'audio',
});提示
通过向 registerQueue() 方法传入多个以逗号分隔的配置对象,可以创建多个队列。
registerQueue() 方法用于实例化和/或注册队列。连接到同一个底层 Redis 数据库且使用相同凭据的模块与进程之间会共享这些队列。每个队列通过其 name 属性保持唯一。队列名称既被用作注入令牌(用于在控制器/提供者中注入队列),也被用作装饰器参数,以便将消费者类和监听器与队列关联起来。
你也可以为特定队列覆盖部分预配置选项,如下所示:
BullModule.registerQueue({
name: 'audio',
connection: {
port: 6380,
},
});BullMQ 还支持 job 之间的父子关系。这项能力允许你创建流程(flow),其中 job 可以构成任意深度树结构中的节点。更多内容请查看这里。
要添加一个 flow,可以这样做:
BullModule.registerFlowProducer({
name: 'flowProducerName',
});由于 job 会持久化到 Redis 中,因此每当某个具名队列被实例化时(例如应用启动/重启时),它都会尝试处理上一轮未完成会话中可能遗留的旧 job。
每个队列可以拥有一个或多个生产者、消费者和监听器。消费者会按照特定顺序从队列中获取 job:FIFO(默认)、LIFO,或者按优先级处理。如何控制队列的处理顺序将在这里讨论。
命名配置
如果你的队列需要连接多个不同的 Redis 实例,可以使用一种称为命名配置的技术。该特性允许你在指定 key 下注册多个配置,之后你就可以在队列选项中引用这些配置。
例如,假设你有一个附加的 Redis 实例(除了默认实例外),被应用中注册的若干队列使用,你可以像下面这样注册它的配置:
BullModule.forRoot('alternative-config', {
connection: {
port: 6381,
},
});在上面的例子中,'alternative-config' 只是一个配置 key(它可以是任意字符串)。
完成后,你就可以在 registerQueue() 的选项对象中指向这份配置:
BullModule.registerQueue({
configKey: 'alternative-config',
name: 'video',
});生产者
Job 生产者负责向队列中添加 job。生产者通常是应用服务(Nest providers)。要向队列中添加 job,首先像下面这样将队列注入到服务中:
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';
import { InjectQueue } from '@nestjs/bullmq';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}提示
@InjectQueue() 装饰器通过队列名称来标识队列,这个名称与 registerQueue() 方法调用中提供的名称一致(例如 'audio')。
现在,通过调用队列的 add() 方法来添加一个 job,并传入一个用户定义的 job 对象。Job 使用可序列化的 JavaScript 对象表示(因为它们就是这样存储在 Redis 数据库中的)。你传入的 job 形状是任意的;请用它表达你的 job 语义。你还需要为它指定一个名称。这让你可以创建专门的消费者,只处理某个给定名称的 job。
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});Job 选项
Job 可以附带额外选项。在 Queue.add() 方法中,于 job 参数之后传入一个选项对象。部分 job 选项属性如下:
priority:number- 可选的优先级值。范围从 1(最高优先级)到 MAX_INT(最低优先级)。注意,使用优先级会对性能产生轻微影响,因此请谨慎使用。delay:number- 在该 job 可被处理之前需要等待的时间(毫秒)。注意,为了保证延迟精确,服务端和客户端的时钟应保持同步。attempts:number- 在 job 完成前允许重试的总次数。repeat:RepeatOpts- 按 cron 规则重复执行 job。参见 RepeatOpts。backoff:number | BackoffOpts- job 失败时自动重试所使用的退避设置。参见 BackoffOpts。lifo:boolean- 如果为 true,则将 job 添加到队列右侧而不是左侧(默认 false)。jobId:number|string- 覆盖 job ID。默认情况下,job ID 是一个唯一整数,但你可以通过此设置覆盖它。如果使用这个选项,你需要自行确保jobId的唯一性。如果尝试添加一个已存在 ID 的 job,它将不会被加入。removeOnComplete:boolean | number- 如果为 true,则在 job 成功完成后移除它。传入数字则表示要保留的 job 数量。默认行为是将 job 保留在 completed 集合中。removeOnFail:boolean | number- 如果为 true,则在 job 在所有重试都失败后移除它。传入数字则表示要保留的 job 数量。默认行为是将 job 保留在 failed 集合中。stackTraceLimit:number- 限制会被记录到 stacktrace 中的堆栈行数。
下面是一些使用 job 选项自定义 job 的示例。
要延迟 job 的启动时间,请使用 delay 配置属性。
const job = await this.audioQueue.add(
'transcode',
{
foo: 'bar',
},
{ delay: 3000 }, // 延迟 3 秒
);要将一个 job 添加到队列右侧(以 LIFO(后进先出)方式处理),请将配置对象中的 lifo 属性设置为 true。
const job = await this.audioQueue.add(
'transcode',
{
foo: 'bar',
},
{ lifo: true },
);要为 job 设置优先级,请使用 priority 属性。
const job = await this.audioQueue.add(
'transcode',
{
foo: 'bar',
},
{ priority: 2 },
);消费者
消费者是一个类,其方法要么处理添加到队列中的 job,要么监听队列上的事件,或者两者兼有。使用 @Processor() 装饰器来声明消费者类,如下所示:
import { Processor } from '@nestjs/bullmq';
@Processor('audio')
export class AudioConsumer {}提示
消费者必须注册为 providers,这样 @nestjs/bullmq 包才能识别它们。
其中,装饰器的字符串参数(例如 'audio')是与该类方法相关联的队列名称。
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('audio')
export class AudioConsumer extends WorkerHost {
async process(job: Job<any, any, string>): Promise<any> {
let progress = 0;
for (let i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 1;
await job.updateProgress(progress);
}
return {};
}
}当 worker 空闲且队列中有 job 待处理时,就会调用 process 方法。这个处理方法仅接收一个参数,即 job 对象。处理方法返回的值会被存储到 job 对象中,之后可以在别处访问,例如在 completed 事件的监听器中。
Job 对象有多个方法可用于与其状态交互。例如,上面的代码使用 updateProgress() 方法来更新 job 的进度。完整的 Job 对象 API 参考请见这里。
在旧版本 Bull 中,你可以通过将某个 name 传给 @Process() 装饰器,指定某个 job 处理方法仅处理某一类型的 job(即具有特定 name 的 job),如下所示。
警告
这在 BullMQ 中不起作用,继续往下看。
@Process('transcode')
async transcode(job: Job<unknown>) { ... }由于这种行为容易引起混淆,BullMQ 不再支持它。相反,你需要使用 switch 分支,根据不同的 job 名称调用不同的服务或逻辑:
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('audio')
export class AudioConsumer extends WorkerHost {
async process(job: Job<any, any, string>): Promise<any> {
switch (job.name) {
case 'transcode': {
let progress = 0;
for (i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 1;
await job.progress(progress);
}
return {};
}
case 'concatenate': {
await doSomeLogic2();
break;
}
}
}
}BullMQ 文档中的 named processor 一节对这一点做了说明。
请求作用域消费者
当一个消费者被标记为请求作用域时(关于注入作用域的更多内容请见这里),将会为每个 job 单独创建一个新的类实例。该实例会在 job 完成后被垃圾回收。
@Processor({
name: 'audio',
scope: Scope.REQUEST,
})由于请求作用域消费者类是动态实例化的,并且作用于单个 job,因此你可以通过标准方式在构造函数中注入一个 JOB_REF。
constructor(@Inject(JOB_REF) jobRef: Job) {
console.log(jobRef);
}提示
JOB_REF 令牌从 @nestjs/bullmq 包中导入。
事件监听器
当队列和/或 job 状态发生变化时,BullMQ 会生成一组有用的事件。这些事件既可以通过 @OnWorkerEvent(event) 装饰器在 Worker 级别订阅,也可以通过专门的监听器类与 @OnQueueEvent(event) 装饰器在 Queue 级别订阅。
Worker 事件必须声明在消费者类中(也就是使用 @Processor() 装饰的类中)。要监听某个事件,请使用 @OnWorkerEvent(event) 装饰器,并传入你想处理的事件。例如,要监听 audio 队列中某个 job 进入 active 状态时发出的事件,可以使用以下写法:
import { Processor, Process, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('audio')
export class AudioConsumer {
@OnWorkerEvent('active')
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
// ...
}你可以在 WorkerListener 中查看完整的事件列表及其参数。
QueueEvent 监听器必须使用 @QueueEventsListener(queue) 装饰器,并继承 @nestjs/bullmq 提供的 QueueEventsHost 类。要监听某个事件,请使用 @OnQueueEvent(event) 装饰器,并传入你想处理的事件。例如,要监听 audio 队列中某个 job 进入 active 状态时发出的事件,可以使用以下写法:
import {
QueueEventsHost,
QueueEventsListener,
OnQueueEvent,
} from '@nestjs/bullmq';
@QueueEventsListener('audio')
export class AudioEventsListener extends QueueEventsHost {
@OnQueueEvent('active')
onActive(job: { jobId: string; prev?: string }) {
console.log(`Processing job ${job.jobId}...`);
}
// ...
}提示
QueueEvent 监听器必须注册为 providers,这样 @nestjs/bullmq 包才能识别它们。
你可以在 QueueEventsListener 中查看完整的事件列表及其参数。
队列管理
队列提供了一组 API,可用于执行管理操作,例如暂停和恢复、获取不同状态下的 job 数量等。完整的队列 API 请见这里。你可以像下面 pause/resume 示例那样,直接在 Queue 对象上调用这些方法。
通过调用 pause() 方法暂停一个队列。被暂停的队列在恢复之前不会处理新的 job,但当前正在处理的 job 会继续执行直到结束。
await audioQueue.pause();要恢复一个已暂停的队列,请使用 resume() 方法,如下所示:
await audioQueue.resume();独立进程
Job 处理器也可以运行在单独的(fork 出来的)进程中(来源)。这样做有几个优点:
- 进程是隔离的,因此即使它崩溃,也不会影响 worker。
- 你可以运行阻塞代码而不影响队列(job 不会卡住)。
- 可以更充分地利用多核 CPU。
- 与 Redis 的连接更少。
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { join } from 'node:path';
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}警告
请注意,由于你的函数运行在一个 fork 出来的进程中,因此依赖注入(以及 IoC 容器)将不可用。这意味着你的处理器函数必须自行包含(或创建)它所需的所有外部依赖实例。
异步配置
你可能希望以异步方式而不是静态方式传入 bullmq 选项。在这种情况下,使用 forRootAsync() 方法,它提供了多种处理异步配置的方式。同样,如果你希望以异步方式传入队列选项,请使用 registerQueueAsync() 方法。
一种方式是使用工厂函数:
BullModule.forRootAsync({
useFactory: () => ({
connection: {
host: 'localhost',
port: 6379,
},
}),
});这个工厂函数的行为与任何其他异步提供者一样(例如,它可以是 async 的,并且可以通过 inject 注入依赖)。
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
connection: {
host: configService.get('QUEUE_HOST'),
port: configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});或者,你也可以使用 useClass 语法:
BullModule.forRootAsync({
useClass: BullConfigService,
});上面的写法会在 BullModule 内部实例化 BullConfigService,并通过调用 createSharedConfiguration() 来提供一个选项对象。注意,这意味着 BullConfigService 必须实现 SharedBullConfigurationFactory 接口,如下所示:
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
createSharedConfiguration(): BullModuleOptions {
return {
connection: {
host: 'localhost',
port: 6379,
},
};
}
}为了避免在 BullModule 内部创建 BullConfigService,并改为使用从其他模块导入的 provider,你可以使用 useExisting 语法。
BullModule.forRootAsync({
imports: [ConfigModule],
useExisting: ConfigService,
});这种写法与 useClass 的工作方式相同,但有一个关键区别:BullModule 会查找导入的模块,以复用现有的 ConfigService,而不是重新实例化一个新的。
同样,如果你希望以异步方式传入队列选项,请使用 registerQueueAsync() 方法,只需记住要在工厂函数外部指定 name 属性。
BullModule.registerQueueAsync({
name: 'audio',
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});手动注册
默认情况下,BullModule 会在 onModuleInit 生命周期函数中自动注册 BullMQ 组件(队列、处理器和事件监听服务)。但在某些场景下,这种行为可能并不理想。要阻止自动注册,可以像下面这样在 BullModule 中启用 manualRegistration:
BullModule.forRoot({
extraOptions: {
manualRegistration: true,
},
});要手动注册这些组件,请注入 BullRegistrar 并调用 register 函数,理想情况下应在 OnModuleInit 或 OnApplicationBootstrap 中完成。
import { Injectable, OnModuleInit } from '@nestjs/common';
import { BullRegistrar } from '@nestjs/bullmq';
@Injectable()
export class AudioService implements OnModuleInit {
constructor(private bullRegistrar: BullRegistrar) {}
onModuleInit() {
if (yourConditionHere) {
this.bullRegistrar.register();
}
}
}除非你调用 BullRegistrar#register 函数,否则任何 BullMQ 组件都不会生效,也就意味着不会有 job 被处理。
Bull 安装
注意
如果你决定使用 BullMQ,请跳过本节以及后续章节。
要开始使用 Bull,我们先安装所需依赖。
$ npm install --save @nestjs/bull bull安装完成后,我们可以将 BullModule 导入根 AppModule。
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}forRoot() 方法用于注册一个 bull 包的配置对象,应用中注册的所有队列都会使用该配置对象(除非另有指定)。配置对象由以下属性组成:
limiter: RateLimiter- 控制队列中 job 处理速率的选项。更多信息见 RateLimiter。可选。redis: RedisOpts- 配置 Redis 连接的选项。更多信息见 RedisOpts。可选。prefix: string- 所有队列 key 的前缀。可选。defaultJobOptions: JobOpts- 用于控制新 job 默认设置的选项。更多信息见 JobOpts。可选。注意:如果你通过FlowProducer调度 job,这些选项不会生效。原因见 bullmq#1034。settings: AdvancedSettings- 高级队列配置项。通常不应该修改。更多信息见 AdvancedSettings。可选。
所有选项都是可选的,它们提供了对队列行为的细粒度控制。这些配置会被直接传递给 Bull 的 Queue 构造函数。关于这些选项的更多内容请查看这里。
要注册一个队列,导入 BullModule.registerQueue() 动态模块,如下所示:
BullModule.registerQueue({
name: 'audio',
});提示
通过向 registerQueue() 方法传入多个以逗号分隔的配置对象,可以创建多个队列。
registerQueue() 方法用于实例化和/或注册队列。连接到同一个底层 Redis 数据库且使用相同凭据的模块与进程之间会共享这些队列。每个队列通过其 name 属性保持唯一。队列名称既被用作注入令牌(用于在控制器/提供者中注入队列),也被用作装饰器参数,以便将消费者类和监听器与队列关联起来。
你也可以为特定队列覆盖部分预配置选项,如下所示:
BullModule.registerQueue({
name: 'audio',
redis: {
port: 6380,
},
});由于 job 会持久化到 Redis 中,因此每当某个具名队列被实例化时(例如应用启动/重启时),它都会尝试处理上一轮未完成会话中可能遗留的旧 job。
每个队列可以拥有一个或多个生产者、消费者和监听器。消费者会按照特定顺序从队列中获取 job:FIFO(默认)、LIFO,或者按优先级处理。如何控制队列的处理顺序将在这里讨论。
命名配置
如果你的队列连接多个 Redis 实例,可以使用一种称为命名配置的技术。该特性允许你在指定 key 下注册多个配置,之后你就可以在队列选项中引用这些配置。
例如,假设你有一个附加的 Redis 实例(除了默认实例外),被应用中注册的若干队列使用,你可以像下面这样注册它的配置:
BullModule.forRoot('alternative-config', {
redis: {
port: 6381,
},
});在上面的例子中,'alternative-config' 只是一个配置 key(它可以是任意字符串)。
完成后,你就可以在 registerQueue() 的选项对象中指向这份配置:
BullModule.registerQueue({
configKey: 'alternative-config',
name: 'video',
});生产者
Job 生产者负责向队列中添加 job。生产者通常是应用服务(Nest providers)。要向队列中添加 job,首先像下面这样将队列注入到服务中:
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}提示
@InjectQueue() 装饰器通过队列名称来标识队列,这个名称与 registerQueue() 方法调用中提供的名称一致(例如 'audio')。
现在,通过调用队列的 add() 方法来添加一个 job,并传入一个用户定义的 job 对象。Job 使用可序列化的 JavaScript 对象表示(因为它们就是这样存储在 Redis 数据库中的)。你传入的 job 形状是任意的;请用它表达你的 job 语义。
const job = await this.audioQueue.add({
foo: 'bar',
});命名 job
Job 可以具有唯一名称。这让你可以创建专门的消费者,只处理某个给定名称的 job。
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});警告
使用命名 job 时,你必须为加入队列的每一个唯一名称创建处理器,否则队列会提示缺少该 job 对应的处理器。关于如何消费命名 job 的更多信息,参见这里。
Job 选项
Job 可以附带额外选项。在 Queue.add() 方法中,于 job 参数之后传入一个选项对象。Job 选项属性包括:
priority:number- 可选的优先级值。范围从 1(最高优先级)到 MAX_INT(最低优先级)。注意,使用优先级会对性能产生轻微影响,因此请谨慎使用。delay:number- 在该 job 可被处理之前需要等待的时间(毫秒)。注意,为了保证延迟精确,服务端和客户端的时钟应保持同步。attempts:number- 在 job 完成前允许重试的总次数。repeat:RepeatOpts- 按 cron 规则重复执行 job。参见 RepeatOpts。backoff:number | BackoffOpts- job 失败时自动重试所使用的退避设置。参见 BackoffOpts。lifo:boolean- 如果为 true,则将 job 添加到队列右侧而不是左侧(默认 false)。timeout:number- job 在超出该毫秒数后将因超时错误而失败。jobId:number|string- 覆盖 job ID。默认情况下,job ID 是一个唯一整数,但你可以通过此设置覆盖它。如果使用这个选项,你需要自行确保jobId的唯一性。如果尝试添加一个已存在 ID 的 job,它将不会被加入。removeOnComplete:boolean | number- 如果为 true,则在 job 成功完成后移除它。传入数字则表示要保留的 job 数量。默认行为是将 job 保留在 completed 集合中。removeOnFail:boolean | number- 如果为 true,则在 job 在所有重试都失败后移除它。传入数字则表示要保留的 job 数量。默认行为是将 job 保留在 failed 集合中。stackTraceLimit:number- 限制会被记录到 stacktrace 中的堆栈行数。
下面是一些使用 job 选项自定义 job 的示例。
要延迟 job 的启动时间,请使用 delay 配置属性。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ delay: 3000 }, // 延迟 3 秒
);要将一个 job 添加到队列右侧(以 LIFO(后进先出)方式处理),请将配置对象中的 lifo 属性设置为 true。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true },
);要为 job 设置优先级,请使用 priority 属性。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ priority: 2 },
);消费者
消费者是一个类,其方法要么处理添加到队列中的 job,要么监听队列上的事件,或者两者兼有。使用 @Processor() 装饰器来声明消费者类,如下所示:
import { Processor } from '@nestjs/bull';
@Processor('audio')
export class AudioConsumer {}提示
消费者必须注册为 providers,这样 @nestjs/bull 包才能识别它们。
其中,装饰器的字符串参数(例如 'audio')是与该类方法相关联的队列名称。
在消费者类内部,使用 @Process() 装饰器修饰处理方法,以声明 job 处理器。
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@Process()
async transcode(job: Job<unknown>) {
let progress = 0;
for (let i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 1;
await job.progress(progress);
}
return {};
}
}当 worker 空闲且队列中有 job 待处理时,就会调用被装饰的方法(例如 transcode())。这个处理方法仅接收一个参数,即 job 对象。处理方法返回的值会被存储到 job 对象中,之后可以在别处访问,例如在 completed 事件的监听器中。
Job 对象有多个方法可用于与其状态交互。例如,上面的代码使用 progress() 方法来更新 job 的进度。完整的 Job 对象 API 参考请见这里。
你可以通过向 @Process() 装饰器传入某个 name,指定某个 job 处理方法仅处理某一类型的 job(即具有特定 name 的 job),如下所示。你可以在同一个消费者类中拥有多个 @Process() 处理器,对应不同的 job 类型(name)。当你使用命名 job 时,请确保每个名称都存在对应的处理器。
@Process('transcode')
async transcode(job: Job<unknown>) { ... }警告
当为同一个队列定义多个消费者时,@Process({ concurrency: 1 }) 中的 concurrency 选项不会生效。最小 concurrency 将与定义的消费者数量一致。即便 @Process() 处理器使用了不同的 name 来处理命名 job,这一点同样成立。
请求作用域消费者
当一个消费者被标记为请求作用域时(关于注入作用域的更多内容请见这里),将会为每个 job 单独创建一个新的类实例。该实例会在 job 完成后被垃圾回收。
@Processor({
name: 'audio',
scope: Scope.REQUEST,
})由于请求作用域消费者类是动态实例化的,并且作用于单个 job,因此你可以通过标准方式在构造函数中注入一个 JOB_REF。
constructor(@Inject(JOB_REF) jobRef: Job) {
console.log(jobRef);
}提示
JOB_REF 令牌从 @nestjs/bull 包中导入。
事件监听器
当队列和/或 job 状态发生变化时,Bull 会生成一组有用的事件。Nest 提供了一组装饰器,用于订阅一组核心标准事件。这些装饰器由 @nestjs/bull 包导出。
事件监听器必须声明在消费者类中(也就是使用 @Processor() 装饰的类中)。要监听某个事件,请使用下表中的某个装饰器来声明该事件的处理器。例如,要监听 audio 队列中某个 job 进入 active 状态时发出的事件,可以使用以下写法:
import { Processor, Process, OnQueueActive } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
...由于 Bull 运行在分布式(多节点)环境中,因此它定义了事件局部性(event locality)的概念。这个概念认为,事件既可能完全在单个进程中触发,也可能在由不同进程共享的队列上触发。**本地(local)**事件,是指在本地进程中的某个队列上,由某个动作或状态变化触发而产生的事件。换句话说,当你的事件生产者和消费者都位于同一个进程中时,队列上发生的所有事件都是本地事件。
当一个队列在多个进程之间共享时,就会出现**全局(global)**事件的可能性。如果一个进程中的监听器要接收到另一个进程触发的事件通知,它就必须注册为全局事件监听器。
每当对应事件被发出时,事件处理器就会被调用。处理器会以如下表所示的签名被调用,从而让你获取与该事件相关的信息。下面我们还会讨论本地与全局事件处理器签名之间的一个关键差异。
| 本地事件监听器 | 全局事件监听器 | 处理器方法签名 / 触发时机 |
|---|---|---|
@OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - 发生错误。error 包含触发该事件的错误。 |
@OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number | string) - 某个 Job 正在等待处理,一旦 worker 空闲就会处理。jobId 包含进入该状态的 job 的 id。 |
@OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job) - Job job 已开始执行。 |
@OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job) - Job job 已被标记为 stalled。这对于调试崩溃或阻塞事件循环的 job worker 很有帮助。 |
@OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number) - Job job 的进度已更新为 progress。 |
@OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) Job job 已成功完成,并返回结果 result。 |
@OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error) Job job 失败,原因是 err。 |
@OnQueuePaused() | @OnGlobalQueuePaused() | handler() 队列已暂停。 |
@OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job) 队列已恢复。 |
@OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) 队列中的旧 job 已被清理。jobs 是被清理的 job 数组,type 是被清理的 job 类型。 |
@OnQueueDrained() | @OnGlobalQueueDrained() | handler() 每当队列处理完所有等待中的 job 时触发(即使仍可能存在尚未处理的延迟 job)。 |
@OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job) Job job 已成功移除。 |
监听全局事件时,方法签名可能与本地版本略有不同。具体来说,凡是在本地版本中接收 job 对象的方法签名,在全局版本中接收到的将是 jobId(number)。在这种情况下,如果要获取真实的 job 对象引用,请使用 Queue#getJob 方法。这个调用需要 await,因此处理器也应声明为 async。例如:
@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
const job = await this.immediateQueue.getJob(jobId);
console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}提示
要访问 Queue 对象(从而调用 getJob()),你当然需要先注入它。同时,这个 Queue 也必须在你进行注入的模块中完成注册。
除了这些特定的事件监听装饰器之外,你还可以将通用的 @OnQueueEvent() 装饰器与 BullQueueEvents 或 BullQueueGlobalEvents 枚举配合使用。关于事件的更多内容见这里。
队列管理
队列拥有一组 API,可用于执行管理操作,例如暂停和恢复、获取不同状态下的 job 数量等。完整的队列 API 请见这里。你可以像下面 pause/resume 示例那样,直接在 Queue 对象上调用这些方法。
通过调用 pause() 方法暂停一个队列。被暂停的队列在恢复之前不会处理新的 job,但当前正在处理的 job 会继续执行直到结束。
await audioQueue.pause();要恢复一个已暂停的队列,请使用 resume() 方法,如下所示:
await audioQueue.resume();独立进程
Job 处理器也可以运行在单独的(fork 出来的)进程中(来源)。这样做有几个优点:
- 进程是隔离的,因此即使它崩溃,也不会影响 worker。
- 你可以运行阻塞代码而不影响队列(job 不会卡住)。
- 可以更充分地利用多核 CPU。
- 与 Redis 的连接更少。
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { join } from 'path';
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}请注意,由于你的函数运行在一个 fork 出来的进程中,因此依赖注入(以及 IoC 容器)将不可用。这意味着你的处理器函数必须自行包含(或创建)它所需的所有外部依赖实例。
// processor.ts
import { Job, DoneCallback } from 'bull';
export default function (job: Job, cb: DoneCallback) {
console.log(`[${process.pid}] ${JSON.stringify(job.data)}`);
cb(null, 'It works');
}异步配置
你可能希望以异步方式而不是静态方式传入 bull 选项。在这种情况下,使用 forRootAsync() 方法,它提供了多种处理异步配置的方式。
一种方式是使用工厂函数:
BullModule.forRootAsync({
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});这个工厂函数的行为与任何其他异步提供者一样(例如,它可以是 async 的,并且可以通过 inject 注入依赖)。
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});或者,你也可以使用 useClass 语法:
BullModule.forRootAsync({
useClass: BullConfigService,
});上面的写法会在 BullModule 内部实例化 BullConfigService,并通过调用 createSharedConfiguration() 来提供一个选项对象。注意,这意味着 BullConfigService 必须实现 SharedBullConfigurationFactory 接口,如下所示:
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
createSharedConfiguration(): BullModuleOptions {
return {
redis: {
host: 'localhost',
port: 6379,
},
};
}
}为了避免在 BullModule 内部创建 BullConfigService,并改为使用从其他模块导入的 provider,你可以使用 useExisting 语法。
BullModule.forRootAsync({
imports: [ConfigModule],
useExisting: ConfigService,
});这种写法与 useClass 的工作方式相同,但有一个关键区别:BullModule 会查找导入的模块,以复用现有的 ConfigService,而不是重新实例化一个新的。
同样,如果你希望以异步方式传入队列选项,请使用 registerQueueAsync() 方法,只需记住要在工厂函数外部指定 name 属性。
BullModule.registerQueueAsync({
name: 'audio',
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});示例
一个可运行的示例见这里。