Skip to content

队列

队列是一种强大的设计模式,可以帮助你应对应用扩展性和性能方面的常见挑战。队列可以帮助你解决的一些问题包括:

  • 平滑处理高峰流量。例如,如果用户可以在任意时间发起资源密集型任务,你可以将这些任务加入队列,而不是同步执行。随后,你可以让工作进程以可控的方式从队列中拉取任务。随着应用规模增长,你可以很容易地增加新的队列消费者,以扩展后端任务处理能力。
  • 拆分那些可能会阻塞 Node.js 事件循环的单体任务。例如,如果某个用户请求需要进行音频转码这类 CPU 密集型工作,你可以将该任务委托给其他进程处理,从而让面向用户的进程保持响应。
  • 在不同服务之间提供可靠的通信通道。例如,你可以在一个进程或服务中将任务(job)放入队列,并在另一个进程或服务中消费它们。你可以从任何进程或服务中通过监听状态事件,在 job 生命周期中获知其完成、错误或其他状态变化。当队列生产者或消费者发生故障时,它们的状态会被保留,节点重启后任务处理也可以自动恢复。

Nest 提供了 @nestjs/bullmq 包用于集成 BullMQ,也提供了 @nestjs/bull 包用于集成 Bull。这两个包都是各自底层库之上的抽象/封装,而这两个底层库都由同一团队开发。Bull 当前处于维护模式,团队主要专注于修复 bug;BullMQ 则在积极开发中,采用现代化的 TypeScript 实现,并提供了一组不同的特性。如果 Bull 已经满足你的需求,它依然是一个可靠且经过实战考验的选择。Nest 提供的这些包使你能够以友好的方式,在 Nest 应用中集成 BullMQ 或 Bull 队列。

BullMQ 和 Bull 都使用 Redis 来持久化 job 数据,因此你需要在系统中安装 Redis。由于它们以 Redis 为后端,你的队列架构可以是完全分布式且与平台无关的。例如,你可以让一些队列生产者消费者以及监听器运行在一个(或多个)节点上的 Nest 中,同时让其他生产者、消费者和监听器运行在其他网络节点上的 Node.js 平台中。

本章会介绍 @nestjs/bullmq@nestjs/bull 包。我们也建议你阅读 BullMQBull 的文档,以获取更多背景知识和特定实现细节。

BullMQ 安装

要开始使用 BullMQ,我们先安装所需依赖。

bash
$ npm install --save @nestjs/bullmq bullmq

安装完成后,我们可以将 BullModule 导入根 AppModule

typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class AppModule {}

forRoot() 方法用于注册一个 bullmq 包的配置对象,应用中注册的所有队列都会使用该配置对象(除非另有指定)。供参考,配置对象中包含的部分属性如下:

  • connection: ConnectionOptions - 用于配置 Redis 连接的选项。更多信息见 Connections。可选。
  • prefix: string - 所有队列 key 的前缀。可选。
  • defaultJobOptions: JobOpts - 用于控制新 job 默认设置的选项。更多信息见 JobOpts。可选。
  • settings: AdvancedSettings - 高级队列配置项。通常不应该修改。更多信息见 AdvancedSettings。可选。
  • extraOptions - 模块初始化的额外选项。参见 手动注册

所有选项都是可选的,它们提供了对队列行为的细粒度控制。这些配置会被直接传递给 BullMQ 的 Queue 构造函数。关于这些选项以及其他选项的更多内容,请查看这里

要注册一个队列,导入 BullModule.registerQueue() 动态模块,如下所示:

typescript
BullModule.registerQueue({
  name: 'audio',
});

提示

通过向 registerQueue() 方法传入多个以逗号分隔的配置对象,可以创建多个队列。

registerQueue() 方法用于实例化和/或注册队列。连接到同一个底层 Redis 数据库且使用相同凭据的模块与进程之间会共享这些队列。每个队列通过其 name 属性保持唯一。队列名称既被用作注入令牌(用于在控制器/提供者中注入队列),也被用作装饰器参数,以便将消费者类和监听器与队列关联起来。

你也可以为特定队列覆盖部分预配置选项,如下所示:

typescript
BullModule.registerQueue({
  name: 'audio',
  connection: {
    port: 6380,
  },
});

BullMQ 还支持 job 之间的父子关系。这项能力允许你创建流程(flow),其中 job 可以构成任意深度树结构中的节点。更多内容请查看这里

要添加一个 flow,可以这样做:

typescript
BullModule.registerFlowProducer({
  name: 'flowProducerName',
});

由于 job 会持久化到 Redis 中,因此每当某个具名队列被实例化时(例如应用启动/重启时),它都会尝试处理上一轮未完成会话中可能遗留的旧 job。

每个队列可以拥有一个或多个生产者、消费者和监听器。消费者会按照特定顺序从队列中获取 job:FIFO(默认)、LIFO,或者按优先级处理。如何控制队列的处理顺序将在这里讨论。

命名配置

如果你的队列需要连接多个不同的 Redis 实例,可以使用一种称为命名配置的技术。该特性允许你在指定 key 下注册多个配置,之后你就可以在队列选项中引用这些配置。

例如,假设你有一个附加的 Redis 实例(除了默认实例外),被应用中注册的若干队列使用,你可以像下面这样注册它的配置:

typescript
BullModule.forRoot('alternative-config', {
  connection: {
    port: 6381,
  },
});

在上面的例子中,'alternative-config' 只是一个配置 key(它可以是任意字符串)。

完成后,你就可以在 registerQueue() 的选项对象中指向这份配置:

typescript
BullModule.registerQueue({
  configKey: 'alternative-config',
  name: 'video',
});

生产者

Job 生产者负责向队列中添加 job。生产者通常是应用服务(Nest providers)。要向队列中添加 job,首先像下面这样将队列注入到服务中:

typescript
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';
import { InjectQueue } from '@nestjs/bullmq';

@Injectable()
export class AudioService {
  constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}

提示

@InjectQueue() 装饰器通过队列名称来标识队列,这个名称与 registerQueue() 方法调用中提供的名称一致(例如 'audio')。

现在,通过调用队列的 add() 方法来添加一个 job,并传入一个用户定义的 job 对象。Job 使用可序列化的 JavaScript 对象表示(因为它们就是这样存储在 Redis 数据库中的)。你传入的 job 形状是任意的;请用它表达你的 job 语义。你还需要为它指定一个名称。这让你可以创建专门的消费者,只处理某个给定名称的 job。

typescript
const job = await this.audioQueue.add('transcode', {
  foo: 'bar',
});

Job 选项

Job 可以附带额外选项。在 Queue.add() 方法中,于 job 参数之后传入一个选项对象。部分 job 选项属性如下:

  • priority: number - 可选的优先级值。范围从 1(最高优先级)到 MAX_INT(最低优先级)。注意,使用优先级会对性能产生轻微影响,因此请谨慎使用。
  • delay: number - 在该 job 可被处理之前需要等待的时间(毫秒)。注意,为了保证延迟精确,服务端和客户端的时钟应保持同步。
  • attempts: number - 在 job 完成前允许重试的总次数。
  • repeat: RepeatOpts - 按 cron 规则重复执行 job。参见 RepeatOpts
  • backoff: number | BackoffOpts - job 失败时自动重试所使用的退避设置。参见 BackoffOpts
  • lifo: boolean - 如果为 true,则将 job 添加到队列右侧而不是左侧(默认 false)。
  • jobId: number | string - 覆盖 job ID。默认情况下,job ID 是一个唯一整数,但你可以通过此设置覆盖它。如果使用这个选项,你需要自行确保 jobId 的唯一性。如果尝试添加一个已存在 ID 的 job,它将不会被加入。
  • removeOnComplete: boolean | number - 如果为 true,则在 job 成功完成后移除它。传入数字则表示要保留的 job 数量。默认行为是将 job 保留在 completed 集合中。
  • removeOnFail: boolean | number - 如果为 true,则在 job 在所有重试都失败后移除它。传入数字则表示要保留的 job 数量。默认行为是将 job 保留在 failed 集合中。
  • stackTraceLimit: number - 限制会被记录到 stacktrace 中的堆栈行数。

下面是一些使用 job 选项自定义 job 的示例。

要延迟 job 的启动时间,请使用 delay 配置属性。

typescript
const job = await this.audioQueue.add(
  'transcode',
  {
    foo: 'bar',
  },
  { delay: 3000 }, // 延迟 3 秒
);

要将一个 job 添加到队列右侧(以 LIFO(后进先出)方式处理),请将配置对象中的 lifo 属性设置为 true

typescript
const job = await this.audioQueue.add(
  'transcode',
  {
    foo: 'bar',
  },
  { lifo: true },
);

要为 job 设置优先级,请使用 priority 属性。

typescript
const job = await this.audioQueue.add(
  'transcode',
  {
    foo: 'bar',
  },
  { priority: 2 },
);

完整选项列表请查看 API 文档:这里这里

消费者

消费者是一个,其方法要么处理添加到队列中的 job,要么监听队列上的事件,或者两者兼有。使用 @Processor() 装饰器来声明消费者类,如下所示:

typescript
import { Processor } from '@nestjs/bullmq';

@Processor('audio')
export class AudioConsumer {}

提示

消费者必须注册为 providers,这样 @nestjs/bullmq 包才能识别它们。

其中,装饰器的字符串参数(例如 'audio')是与该类方法相关联的队列名称。

typescript
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('audio')
export class AudioConsumer extends WorkerHost {
  async process(job: Job<any, any, string>): Promise<any> {
    let progress = 0;
    for (let i = 0; i < 100; i++) {
      await doSomething(job.data);
      progress += 1;
      await job.updateProgress(progress);
    }
    return {};
  }
}

当 worker 空闲且队列中有 job 待处理时,就会调用 process 方法。这个处理方法仅接收一个参数,即 job 对象。处理方法返回的值会被存储到 job 对象中,之后可以在别处访问,例如在 completed 事件的监听器中。

Job 对象有多个方法可用于与其状态交互。例如,上面的代码使用 updateProgress() 方法来更新 job 的进度。完整的 Job 对象 API 参考请见这里

在旧版本 Bull 中,你可以通过将某个 name 传给 @Process() 装饰器,指定某个 job 处理方法处理某一类型的 job(即具有特定 name 的 job),如下所示。

警告

这在 BullMQ 中不起作用,继续往下看。

typescript
@Process('transcode')
async transcode(job: Job<unknown>) { ... }

由于这种行为容易引起混淆,BullMQ 不再支持它。相反,你需要使用 switch 分支,根据不同的 job 名称调用不同的服务或逻辑:

typescript
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('audio')
export class AudioConsumer extends WorkerHost {
  async process(job: Job<any, any, string>): Promise<any> {
    switch (job.name) {
      case 'transcode': {
        let progress = 0;
        for (i = 0; i < 100; i++) {
          await doSomething(job.data);
          progress += 1;
          await job.progress(progress);
        }
        return {};
      }
      case 'concatenate': {
        await doSomeLogic2();
        break;
      }
    }
  }
}

BullMQ 文档中的 named processor 一节对这一点做了说明。

请求作用域消费者

当一个消费者被标记为请求作用域时(关于注入作用域的更多内容请见这里),将会为每个 job 单独创建一个新的类实例。该实例会在 job 完成后被垃圾回收。

typescript
@Processor({
  name: 'audio',
  scope: Scope.REQUEST,
})

由于请求作用域消费者类是动态实例化的,并且作用于单个 job,因此你可以通过标准方式在构造函数中注入一个 JOB_REF

typescript
constructor(@Inject(JOB_REF) jobRef: Job) {
  console.log(jobRef);
}

提示

JOB_REF 令牌从 @nestjs/bullmq 包中导入。

事件监听器

当队列和/或 job 状态发生变化时,BullMQ 会生成一组有用的事件。这些事件既可以通过 @OnWorkerEvent(event) 装饰器在 Worker 级别订阅,也可以通过专门的监听器类与 @OnQueueEvent(event) 装饰器在 Queue 级别订阅。

Worker 事件必须声明在消费者类中(也就是使用 @Processor() 装饰的类中)。要监听某个事件,请使用 @OnWorkerEvent(event) 装饰器,并传入你想处理的事件。例如,要监听 audio 队列中某个 job 进入 active 状态时发出的事件,可以使用以下写法:

typescript
import { Processor, Process, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('audio')
export class AudioConsumer {
  @OnWorkerEvent('active')
  onActive(job: Job) {
    console.log(
      `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
    );
  }

  // ...
}

你可以在 WorkerListener 中查看完整的事件列表及其参数。

QueueEvent 监听器必须使用 @QueueEventsListener(queue) 装饰器,并继承 @nestjs/bullmq 提供的 QueueEventsHost 类。要监听某个事件,请使用 @OnQueueEvent(event) 装饰器,并传入你想处理的事件。例如,要监听 audio 队列中某个 job 进入 active 状态时发出的事件,可以使用以下写法:

typescript
import {
  QueueEventsHost,
  QueueEventsListener,
  OnQueueEvent,
} from '@nestjs/bullmq';

@QueueEventsListener('audio')
export class AudioEventsListener extends QueueEventsHost {
  @OnQueueEvent('active')
  onActive(job: { jobId: string; prev?: string }) {
    console.log(`Processing job ${job.jobId}...`);
  }

  // ...
}

提示

QueueEvent 监听器必须注册为 providers,这样 @nestjs/bullmq 包才能识别它们。

你可以在 QueueEventsListener 中查看完整的事件列表及其参数。

队列管理

队列提供了一组 API,可用于执行管理操作,例如暂停和恢复、获取不同状态下的 job 数量等。完整的队列 API 请见这里。你可以像下面 pause/resume 示例那样,直接在 Queue 对象上调用这些方法。

通过调用 pause() 方法暂停一个队列。被暂停的队列在恢复之前不会处理新的 job,但当前正在处理的 job 会继续执行直到结束。

typescript
await audioQueue.pause();

要恢复一个已暂停的队列,请使用 resume() 方法,如下所示:

typescript
await audioQueue.resume();

独立进程

Job 处理器也可以运行在单独的(fork 出来的)进程中(来源)。这样做有几个优点:

  • 进程是隔离的,因此即使它崩溃,也不会影响 worker。
  • 你可以运行阻塞代码而不影响队列(job 不会卡住)。
  • 可以更充分地利用多核 CPU。
  • 与 Redis 的连接更少。
typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { join } from 'node:path';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'audio',
      processors: [join(__dirname, 'processor.js')],
    }),
  ],
})
export class AppModule {}

警告

请注意,由于你的函数运行在一个 fork 出来的进程中,因此依赖注入(以及 IoC 容器)将不可用。这意味着你的处理器函数必须自行包含(或创建)它所需的所有外部依赖实例。

异步配置

你可能希望以异步方式而不是静态方式传入 bullmq 选项。在这种情况下,使用 forRootAsync() 方法,它提供了多种处理异步配置的方式。同样,如果你希望以异步方式传入队列选项,请使用 registerQueueAsync() 方法。

一种方式是使用工厂函数:

typescript
BullModule.forRootAsync({
  useFactory: () => ({
    connection: {
      host: 'localhost',
      port: 6379,
    },
  }),
});

这个工厂函数的行为与任何其他异步提供者一样(例如,它可以是 async 的,并且可以通过 inject 注入依赖)。

typescript
BullModule.forRootAsync({
  imports: [ConfigModule],
  useFactory: async (configService: ConfigService) => ({
    connection: {
      host: configService.get('QUEUE_HOST'),
      port: configService.get('QUEUE_PORT'),
    },
  }),
  inject: [ConfigService],
});

或者,你也可以使用 useClass 语法:

typescript
BullModule.forRootAsync({
  useClass: BullConfigService,
});

上面的写法会在 BullModule 内部实例化 BullConfigService,并通过调用 createSharedConfiguration() 来提供一个选项对象。注意,这意味着 BullConfigService 必须实现 SharedBullConfigurationFactory 接口,如下所示:

typescript
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
  createSharedConfiguration(): BullModuleOptions {
    return {
      connection: {
        host: 'localhost',
        port: 6379,
      },
    };
  }
}

为了避免在 BullModule 内部创建 BullConfigService,并改为使用从其他模块导入的 provider,你可以使用 useExisting 语法。

typescript
BullModule.forRootAsync({
  imports: [ConfigModule],
  useExisting: ConfigService,
});

这种写法与 useClass 的工作方式相同,但有一个关键区别:BullModule 会查找导入的模块,以复用现有的 ConfigService,而不是重新实例化一个新的。

同样,如果你希望以异步方式传入队列选项,请使用 registerQueueAsync() 方法,只需记住要在工厂函数外部指定 name 属性。

typescript
BullModule.registerQueueAsync({
  name: 'audio',
  useFactory: () => ({
    redis: {
      host: 'localhost',
      port: 6379,
    },
  }),
});

手动注册

默认情况下,BullModule 会在 onModuleInit 生命周期函数中自动注册 BullMQ 组件(队列、处理器和事件监听服务)。但在某些场景下,这种行为可能并不理想。要阻止自动注册,可以像下面这样在 BullModule 中启用 manualRegistration

typescript
BullModule.forRoot({
  extraOptions: {
    manualRegistration: true,
  },
});

要手动注册这些组件,请注入 BullRegistrar 并调用 register 函数,理想情况下应在 OnModuleInitOnApplicationBootstrap 中完成。

typescript
import { Injectable, OnModuleInit } from '@nestjs/common';
import { BullRegistrar } from '@nestjs/bullmq';

@Injectable()
export class AudioService implements OnModuleInit {
  constructor(private bullRegistrar: BullRegistrar) {}

  onModuleInit() {
    if (yourConditionHere) {
      this.bullRegistrar.register();
    }
  }
}

除非你调用 BullRegistrar#register 函数,否则任何 BullMQ 组件都不会生效,也就意味着不会有 job 被处理。

Bull 安装

注意

如果你决定使用 BullMQ,请跳过本节以及后续章节。

要开始使用 Bull,我们先安装所需依赖。

bash
$ npm install --save @nestjs/bull bull

安装完成后,我们可以将 BullModule 导入根 AppModule

typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class AppModule {}

forRoot() 方法用于注册一个 bull 包的配置对象,应用中注册的所有队列都会使用该配置对象(除非另有指定)。配置对象由以下属性组成:

  • limiter: RateLimiter - 控制队列中 job 处理速率的选项。更多信息见 RateLimiter。可选。
  • redis: RedisOpts - 配置 Redis 连接的选项。更多信息见 RedisOpts。可选。
  • prefix: string - 所有队列 key 的前缀。可选。
  • defaultJobOptions: JobOpts - 用于控制新 job 默认设置的选项。更多信息见 JobOpts。可选。注意:如果你通过 FlowProducer 调度 job,这些选项不会生效。原因见 bullmq#1034
  • settings: AdvancedSettings - 高级队列配置项。通常不应该修改。更多信息见 AdvancedSettings。可选。

所有选项都是可选的,它们提供了对队列行为的细粒度控制。这些配置会被直接传递给 Bull 的 Queue 构造函数。关于这些选项的更多内容请查看这里

要注册一个队列,导入 BullModule.registerQueue() 动态模块,如下所示:

typescript
BullModule.registerQueue({
  name: 'audio',
});

提示

通过向 registerQueue() 方法传入多个以逗号分隔的配置对象,可以创建多个队列。

registerQueue() 方法用于实例化和/或注册队列。连接到同一个底层 Redis 数据库且使用相同凭据的模块与进程之间会共享这些队列。每个队列通过其 name 属性保持唯一。队列名称既被用作注入令牌(用于在控制器/提供者中注入队列),也被用作装饰器参数,以便将消费者类和监听器与队列关联起来。

你也可以为特定队列覆盖部分预配置选项,如下所示:

typescript
BullModule.registerQueue({
  name: 'audio',
  redis: {
    port: 6380,
  },
});

由于 job 会持久化到 Redis 中,因此每当某个具名队列被实例化时(例如应用启动/重启时),它都会尝试处理上一轮未完成会话中可能遗留的旧 job。

每个队列可以拥有一个或多个生产者、消费者和监听器。消费者会按照特定顺序从队列中获取 job:FIFO(默认)、LIFO,或者按优先级处理。如何控制队列的处理顺序将在这里讨论。

命名配置

如果你的队列连接多个 Redis 实例,可以使用一种称为命名配置的技术。该特性允许你在指定 key 下注册多个配置,之后你就可以在队列选项中引用这些配置。

例如,假设你有一个附加的 Redis 实例(除了默认实例外),被应用中注册的若干队列使用,你可以像下面这样注册它的配置:

typescript
BullModule.forRoot('alternative-config', {
  redis: {
    port: 6381,
  },
});

在上面的例子中,'alternative-config' 只是一个配置 key(它可以是任意字符串)。

完成后,你就可以在 registerQueue() 的选项对象中指向这份配置:

typescript
BullModule.registerQueue({
  configKey: 'alternative-config',
  name: 'video',
});

生产者

Job 生产者负责向队列中添加 job。生产者通常是应用服务(Nest providers)。要向队列中添加 job,首先像下面这样将队列注入到服务中:

typescript
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';

@Injectable()
export class AudioService {
  constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}

提示

@InjectQueue() 装饰器通过队列名称来标识队列,这个名称与 registerQueue() 方法调用中提供的名称一致(例如 'audio')。

现在,通过调用队列的 add() 方法来添加一个 job,并传入一个用户定义的 job 对象。Job 使用可序列化的 JavaScript 对象表示(因为它们就是这样存储在 Redis 数据库中的)。你传入的 job 形状是任意的;请用它表达你的 job 语义。

typescript
const job = await this.audioQueue.add({
  foo: 'bar',
});

命名 job

Job 可以具有唯一名称。这让你可以创建专门的消费者,只处理某个给定名称的 job。

typescript
const job = await this.audioQueue.add('transcode', {
  foo: 'bar',
});

警告

使用命名 job 时,你必须为加入队列的每一个唯一名称创建处理器,否则队列会提示缺少该 job 对应的处理器。关于如何消费命名 job 的更多信息,参见这里

Job 选项

Job 可以附带额外选项。在 Queue.add() 方法中,于 job 参数之后传入一个选项对象。Job 选项属性包括:

  • priority: number - 可选的优先级值。范围从 1(最高优先级)到 MAX_INT(最低优先级)。注意,使用优先级会对性能产生轻微影响,因此请谨慎使用。
  • delay: number - 在该 job 可被处理之前需要等待的时间(毫秒)。注意,为了保证延迟精确,服务端和客户端的时钟应保持同步。
  • attempts: number - 在 job 完成前允许重试的总次数。
  • repeat: RepeatOpts - 按 cron 规则重复执行 job。参见 RepeatOpts
  • backoff: number | BackoffOpts - job 失败时自动重试所使用的退避设置。参见 BackoffOpts
  • lifo: boolean - 如果为 true,则将 job 添加到队列右侧而不是左侧(默认 false)。
  • timeout: number - job 在超出该毫秒数后将因超时错误而失败。
  • jobId: number | string - 覆盖 job ID。默认情况下,job ID 是一个唯一整数,但你可以通过此设置覆盖它。如果使用这个选项,你需要自行确保 jobId 的唯一性。如果尝试添加一个已存在 ID 的 job,它将不会被加入。
  • removeOnComplete: boolean | number - 如果为 true,则在 job 成功完成后移除它。传入数字则表示要保留的 job 数量。默认行为是将 job 保留在 completed 集合中。
  • removeOnFail: boolean | number - 如果为 true,则在 job 在所有重试都失败后移除它。传入数字则表示要保留的 job 数量。默认行为是将 job 保留在 failed 集合中。
  • stackTraceLimit: number - 限制会被记录到 stacktrace 中的堆栈行数。

下面是一些使用 job 选项自定义 job 的示例。

要延迟 job 的启动时间,请使用 delay 配置属性。

typescript
const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { delay: 3000 }, // 延迟 3 秒
);

要将一个 job 添加到队列右侧(以 LIFO(后进先出)方式处理),请将配置对象中的 lifo 属性设置为 true

typescript
const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { lifo: true },
);

要为 job 设置优先级,请使用 priority 属性。

typescript
const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { priority: 2 },
);

消费者

消费者是一个,其方法要么处理添加到队列中的 job,要么监听队列上的事件,或者两者兼有。使用 @Processor() 装饰器来声明消费者类,如下所示:

typescript
import { Processor } from '@nestjs/bull';

@Processor('audio')
export class AudioConsumer {}

提示

消费者必须注册为 providers,这样 @nestjs/bull 包才能识别它们。

其中,装饰器的字符串参数(例如 'audio')是与该类方法相关联的队列名称。

在消费者类内部,使用 @Process() 装饰器修饰处理方法,以声明 job 处理器。

typescript
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {
  @Process()
  async transcode(job: Job<unknown>) {
    let progress = 0;
    for (let i = 0; i < 100; i++) {
      await doSomething(job.data);
      progress += 1;
      await job.progress(progress);
    }
    return {};
  }
}

当 worker 空闲且队列中有 job 待处理时,就会调用被装饰的方法(例如 transcode())。这个处理方法仅接收一个参数,即 job 对象。处理方法返回的值会被存储到 job 对象中,之后可以在别处访问,例如在 completed 事件的监听器中。

Job 对象有多个方法可用于与其状态交互。例如,上面的代码使用 progress() 方法来更新 job 的进度。完整的 Job 对象 API 参考请见这里

你可以通过向 @Process() 装饰器传入某个 name,指定某个 job 处理方法处理某一类型的 job(即具有特定 name 的 job),如下所示。你可以在同一个消费者类中拥有多个 @Process() 处理器,对应不同的 job 类型(name)。当你使用命名 job 时,请确保每个名称都存在对应的处理器。

typescript
@Process('transcode')
async transcode(job: Job<unknown>) { ... }

警告

当为同一个队列定义多个消费者时,@Process({ concurrency: 1 }) 中的 concurrency 选项不会生效。最小 concurrency 将与定义的消费者数量一致。即便 @Process() 处理器使用了不同的 name 来处理命名 job,这一点同样成立。

请求作用域消费者

当一个消费者被标记为请求作用域时(关于注入作用域的更多内容请见这里),将会为每个 job 单独创建一个新的类实例。该实例会在 job 完成后被垃圾回收。

typescript
@Processor({
  name: 'audio',
  scope: Scope.REQUEST,
})

由于请求作用域消费者类是动态实例化的,并且作用于单个 job,因此你可以通过标准方式在构造函数中注入一个 JOB_REF

typescript
constructor(@Inject(JOB_REF) jobRef: Job) {
  console.log(jobRef);
}

提示

JOB_REF 令牌从 @nestjs/bull 包中导入。

事件监听器

当队列和/或 job 状态发生变化时,Bull 会生成一组有用的事件。Nest 提供了一组装饰器,用于订阅一组核心标准事件。这些装饰器由 @nestjs/bull 包导出。

事件监听器必须声明在消费者类中(也就是使用 @Processor() 装饰的类中)。要监听某个事件,请使用下表中的某个装饰器来声明该事件的处理器。例如,要监听 audio 队列中某个 job 进入 active 状态时发出的事件,可以使用以下写法:

typescript
import { Processor, Process, OnQueueActive } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {

  @OnQueueActive()
  onActive(job: Job) {
    console.log(
      `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
    );
  }
  ...

由于 Bull 运行在分布式(多节点)环境中,因此它定义了事件局部性(event locality)的概念。这个概念认为,事件既可能完全在单个进程中触发,也可能在由不同进程共享的队列上触发。**本地(local)**事件,是指在本地进程中的某个队列上,由某个动作或状态变化触发而产生的事件。换句话说,当你的事件生产者和消费者都位于同一个进程中时,队列上发生的所有事件都是本地事件。

当一个队列在多个进程之间共享时,就会出现**全局(global)**事件的可能性。如果一个进程中的监听器要接收到另一个进程触发的事件通知,它就必须注册为全局事件监听器。

每当对应事件被发出时,事件处理器就会被调用。处理器会以如下表所示的签名被调用,从而让你获取与该事件相关的信息。下面我们还会讨论本地与全局事件处理器签名之间的一个关键差异。

本地事件监听器全局事件监听器处理器方法签名 / 触发时机
@OnQueueError()@OnGlobalQueueError()handler(error: Error) - 发生错误。error 包含触发该事件的错误。
@OnQueueWaiting()@OnGlobalQueueWaiting()handler(jobId: number | string) - 某个 Job 正在等待处理,一旦 worker 空闲就会处理。jobId 包含进入该状态的 job 的 id。
@OnQueueActive()@OnGlobalQueueActive()handler(job: Job) - Job job 已开始执行。
@OnQueueStalled()@OnGlobalQueueStalled()handler(job: Job) - Job job 已被标记为 stalled。这对于调试崩溃或阻塞事件循环的 job worker 很有帮助。
@OnQueueProgress()@OnGlobalQueueProgress()handler(job: Job, progress: number) - Job job 的进度已更新为 progress
@OnQueueCompleted()@OnGlobalQueueCompleted()handler(job: Job, result: any) Job job 已成功完成,并返回结果 result
@OnQueueFailed()@OnGlobalQueueFailed()handler(job: Job, err: Error) Job job 失败,原因是 err
@OnQueuePaused()@OnGlobalQueuePaused()handler() 队列已暂停。
@OnQueueResumed()@OnGlobalQueueResumed()handler(job: Job) 队列已恢复。
@OnQueueCleaned()@OnGlobalQueueCleaned()handler(jobs: Job[], type: string) 队列中的旧 job 已被清理。jobs 是被清理的 job 数组,type 是被清理的 job 类型。
@OnQueueDrained()@OnGlobalQueueDrained()handler() 每当队列处理完所有等待中的 job 时触发(即使仍可能存在尚未处理的延迟 job)。
@OnQueueRemoved()@OnGlobalQueueRemoved()handler(job: Job) Job job 已成功移除。

监听全局事件时,方法签名可能与本地版本略有不同。具体来说,凡是在本地版本中接收 job 对象的方法签名,在全局版本中接收到的将是 jobIdnumber)。在这种情况下,如果要获取真实的 job 对象引用,请使用 Queue#getJob 方法。这个调用需要 await,因此处理器也应声明为 async。例如:

typescript
@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
  const job = await this.immediateQueue.getJob(jobId);
  console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}

提示

要访问 Queue 对象(从而调用 getJob()),你当然需要先注入它。同时,这个 Queue 也必须在你进行注入的模块中完成注册。

除了这些特定的事件监听装饰器之外,你还可以将通用的 @OnQueueEvent() 装饰器与 BullQueueEventsBullQueueGlobalEvents 枚举配合使用。关于事件的更多内容见这里

队列管理

队列拥有一组 API,可用于执行管理操作,例如暂停和恢复、获取不同状态下的 job 数量等。完整的队列 API 请见这里。你可以像下面 pause/resume 示例那样,直接在 Queue 对象上调用这些方法。

通过调用 pause() 方法暂停一个队列。被暂停的队列在恢复之前不会处理新的 job,但当前正在处理的 job 会继续执行直到结束。

typescript
await audioQueue.pause();

要恢复一个已暂停的队列,请使用 resume() 方法,如下所示:

typescript
await audioQueue.resume();

独立进程

Job 处理器也可以运行在单独的(fork 出来的)进程中(来源)。这样做有几个优点:

  • 进程是隔离的,因此即使它崩溃,也不会影响 worker。
  • 你可以运行阻塞代码而不影响队列(job 不会卡住)。
  • 可以更充分地利用多核 CPU。
  • 与 Redis 的连接更少。
ts
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { join } from 'path';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'audio',
      processors: [join(__dirname, 'processor.js')],
    }),
  ],
})
export class AppModule {}

请注意,由于你的函数运行在一个 fork 出来的进程中,因此依赖注入(以及 IoC 容器)将不可用。这意味着你的处理器函数必须自行包含(或创建)它所需的所有外部依赖实例。

ts
// processor.ts
import { Job, DoneCallback } from 'bull';

export default function (job: Job, cb: DoneCallback) {
  console.log(`[${process.pid}] ${JSON.stringify(job.data)}`);
  cb(null, 'It works');
}

异步配置

你可能希望以异步方式而不是静态方式传入 bull 选项。在这种情况下,使用 forRootAsync() 方法,它提供了多种处理异步配置的方式。

一种方式是使用工厂函数:

typescript
BullModule.forRootAsync({
  useFactory: () => ({
    redis: {
      host: 'localhost',
      port: 6379,
    },
  }),
});

这个工厂函数的行为与任何其他异步提供者一样(例如,它可以是 async 的,并且可以通过 inject 注入依赖)。

typescript
BullModule.forRootAsync({
  imports: [ConfigModule],
  useFactory: async (configService: ConfigService) => ({
    redis: {
      host: configService.get('QUEUE_HOST'),
      port: configService.get('QUEUE_PORT'),
    },
  }),
  inject: [ConfigService],
});

或者,你也可以使用 useClass 语法:

typescript
BullModule.forRootAsync({
  useClass: BullConfigService,
});

上面的写法会在 BullModule 内部实例化 BullConfigService,并通过调用 createSharedConfiguration() 来提供一个选项对象。注意,这意味着 BullConfigService 必须实现 SharedBullConfigurationFactory 接口,如下所示:

typescript
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
  createSharedConfiguration(): BullModuleOptions {
    return {
      redis: {
        host: 'localhost',
        port: 6379,
      },
    };
  }
}

为了避免在 BullModule 内部创建 BullConfigService,并改为使用从其他模块导入的 provider,你可以使用 useExisting 语法。

typescript
BullModule.forRootAsync({
  imports: [ConfigModule],
  useExisting: ConfigService,
});

这种写法与 useClass 的工作方式相同,但有一个关键区别:BullModule 会查找导入的模块,以复用现有的 ConfigService,而不是重新实例化一个新的。

同样,如果你希望以异步方式传入队列选项,请使用 registerQueueAsync() 方法,只需记住要在工厂函数外部指定 name 属性。

typescript
BullModule.registerQueueAsync({
  name: 'audio',
  useFactory: () => ({
    redis: {
      host: 'localhost',
      port: 6379,
    },
  }),
});

示例

一个可运行的示例见这里

基于 NestJS 官方文档翻译