Skip to content

CQRS

简单的 CRUD(创建、读取、更新、删除)应用,其流程通常可以描述为:

  1. 控制器层处理 HTTP 请求,并将任务委派给服务层。
  2. 服务层承载了大部分业务逻辑。
  3. 服务通过 repository / DAO 来修改或持久化实体。
  4. 实体作为值的容器,提供 setter 和 getter。

这种模式通常足以应对中小型应用,但对于更大、更复杂的应用来说,它未必是最佳选择。在这种情况下,CQRS(Command Query Responsibility Segregation,命令与查询职责分离)模型可能会更合适,也更具可扩展性(具体仍取决于应用需求)。这种模型的好处包括:

  • 关注点分离。该模型将读操作和写操作拆分为不同模型。
  • 可扩展性。读写操作可以独立扩容。
  • 灵活性。读写操作可以使用不同的数据存储。
  • 性能。可以为读操作和写操作分别选择更适合的存储方案。

为了支持这种模型,Nest 提供了一个轻量级的 CQRS 模块。本章将介绍如何使用它。

安装

首先安装所需包:

bash
$ npm install --save @nestjs/cqrs

安装完成后,打开应用根模块(通常是 AppModule),并导入 CqrsModule.forRoot()

typescript
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';

@Module({
  imports: [CqrsModule.forRoot()],
})
export class AppModule {}

该模块接受一个可选配置对象。可用选项如下:

属性描述默认值
commandPublisher负责将命令分发到系统中的发布器。DefaultCommandPubSub
eventPublisher用于发布事件的发布器,可让事件被广播或处理。DefaultPubSub
queryPublisher用于发布查询的发布器,可触发数据读取操作。DefaultQueryPubSub
unhandledExceptionPublisher负责处理未捕获异常的发布器,确保异常可被跟踪和上报。DefaultUnhandledExceptionPubSub
eventIdProvider提供唯一事件 ID 的服务,可通过生成或从事件实例中提取来实现。DefaultEventIdProvider
rethrowUnhandled决定未处理异常在被处理后是否重新抛出,便于调试和错误管理。false

命令

命令用于改变应用状态。命令应当是面向任务的,而不是面向数据的。当一个命令被分发时,它会由对应的 Command Handler 来处理。处理器负责更新应用状态。

typescript
// heroes-game.service.ts
@Injectable()
export class HeroesGameService {
  constructor(private commandBus: CommandBus) {}

  async killDragon(heroId: string, killDragonDto: KillDragonDto) {
    return this.commandBus.execute(
      new KillDragonCommand(heroId, killDragonDto.dragonId)
    );
  }
}

在上面的代码中,我们实例化了 KillDragonCommand 类,并将其传给 CommandBusexecute() 方法。下面是这个命令类:

typescript
// kill-dragon.command.ts
export class KillDragonCommand extends Command<{
  actionId: string // This type represents the command execution result
}> {
  constructor(
    public readonly heroId: string,
    public readonly dragonId: string,
  ) {
    super();
  }
}

可以看到,KillDragonCommand 继承了 Command 类。Command@nestjs/cqrs 包导出的一个简单工具类,用于定义命令的返回类型。这个例子里,返回类型是一个带有 actionId 属性的对象。这样,当 KillDragonCommand 被分发时,CommandBus#execute() 的返回类型就会被推导为 Promise<{ actionId: string }>。这在你希望从命令处理器返回某些数据时非常有用。

提示

继承 Command 类是可选的。只有当你想显式定义命令的返回类型时,才需要这样做。

CommandBus 表示一个命令。它负责将命令分发给对应的处理器。execute() 方法会返回一个 promise,其值就是对应处理器返回的结果。

下面为 KillDragonCommand 创建一个处理器:

typescript
// kill-dragon.handler.ts
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
  constructor(private repository: HeroesRepository) {}

  async execute(command: KillDragonCommand) {
    const { heroId, dragonId } = command;
    const hero = this.repository.findOneById(+heroId);

    hero.killEnemy(dragonId);
    await this.repository.persist(hero);

    // "ICommandHandler<KillDragonCommand>" forces you to return a value that matches the command's return type
    return {
      actionId: crypto.randomUUID(), // This value will be returned to the caller
    }
  }
}

这个处理器会从 repository 中取出 Hero 实体,调用 killEnemy() 方法,然后持久化变更。KillDragonHandler 类实现了 ICommandHandler 接口,因此必须实现 execute() 方法。execute() 会接收命令对象作为参数。

请注意,ICommandHandler<KillDragonCommand> 会强制你返回与命令返回类型一致的值。当前例子中,该返回类型是一个带 actionId 属性的对象。只有继承自 Command 类的命令才会受到这个约束;否则你可以返回任意内容。

最后,别忘了将 KillDragonHandler 注册为某个模块中的 provider:

typescript
providers: [KillDragonHandler];

查询

查询用于从应用状态中检索数据。查询应当是面向数据的,而不是面向任务的。当一个查询被分发时,它会由对应的 Query Handler 处理。处理器负责返回数据。

QueryBusCommandBus 遵循相同模式。查询处理器应实现 IQueryHandler 接口,并使用 @QueryHandler() 装饰器标记。示例如下:

typescript
export class GetHeroQuery extends Query<Hero> {
  constructor(public readonly heroId: string) {}
}

Command 类类似,Query 也是 @nestjs/cqrs 导出的一个简单工具类,用于定义查询的返回类型。这里的返回类型是一个 Hero 对象。因此,当 GetHeroQuery 被分发时,QueryBus#execute() 的返回类型会被推导为 Promise<Hero>

为了获取 hero,我们需要创建一个查询处理器:

typescript
// get-hero.handler.ts
@QueryHandler(GetHeroQuery)
export class GetHeroHandler implements IQueryHandler<GetHeroQuery> {
  constructor(private repository: HeroesRepository) {}

  async execute(query: GetHeroQuery) {
    return this.repository.findOneById(query.heroId);
  }
}

GetHeroHandler 实现了 IQueryHandler 接口,因此必须实现 execute() 方法。execute() 接收查询对象作为参数,并返回与查询返回类型匹配的数据(本例中为 Hero 对象)。

最后,别忘了将 GetHeroHandler 注册为某个模块中的 provider:

typescript
providers: [GetHeroHandler];

现在,你可以通过 QueryBus 分发查询:

typescript
const hero = await this.queryBus.execute(new GetHeroQuery(heroId)); // "hero" will be auto-inferred as "Hero" type

事件

事件用于通知应用的其他部分,应用状态发生了变化。事件可以由模型发出,也可以直接通过 EventBus 发出。当一个事件被分发时,会由对应的 Event Handlers 来处理。处理器随后可以更新读模型,或者执行其他动作。

为了演示,先创建一个事件类:

typescript
// hero-killed-dragon.event.ts
export class HeroKilledDragonEvent {
  constructor(
    public readonly heroId: string,
    public readonly dragonId: string,
  ) {}
}

虽然事件可以直接通过 EventBus.publish() 方法发布,但我们也可以从模型内部发出事件。下面更新 Hero 模型,在调用 killEnemy() 时发出 HeroKilledDragonEvent

typescript
// hero.model.ts
export class Hero extends AggregateRoot {
  constructor(private id: string) {
    super();
  }

  killEnemy(enemyId: string) {
    // Business logic
    this.apply(new HeroKilledDragonEvent(this.id, enemyId));
  }
}

apply() 方法用于分发事件,它接收一个事件对象作为参数。不过,由于我们的模型本身并不知道 EventBus,所以还需要把它与模型关联起来。这时可以使用 EventPublisher 类。

typescript
// kill-dragon.handler.ts
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
  constructor(
    private repository: HeroesRepository,
    private publisher: EventPublisher,
  ) {}

  async execute(command: KillDragonCommand) {
    const { heroId, dragonId } = command;
    const hero = this.publisher.mergeObjectContext(
      await this.repository.findOneById(+heroId),
    );
    hero.killEnemy(dragonId);
    hero.commit();
  }
}

EventPublisher#mergeObjectContext 方法会把事件发布能力合并进传入对象中,也就是说,这个对象之后就可以向事件流中发布事件了。

注意,在这个例子里我们还调用了模型上的 commit() 方法。这个方法会把所有待发出的事件真正分发出去。若想自动分发事件,可以把 autoCommit 属性设为 true

typescript
export class Hero extends AggregateRoot {
  constructor(private id: string) {
    super();
    this.autoCommit = true;
  }
}

如果我们不是要把事件发布能力合并进某个已有对象,而是合并进一个类,则可以使用 EventPublisher#mergeClassContext

typescript
const HeroModel = this.publisher.mergeClassContext(Hero);
const hero = new HeroModel('id'); // <-- HeroModel is a class

这样,HeroModel 类的每个实例都可以发布事件,而无需再调用 mergeObjectContext()

此外,我们也可以直接通过 EventBus 手动发布事件:

typescript
this.eventBus.publish(new HeroKilledDragonEvent());

提示

EventBus 是一个可注入类。

每个事件都可以有多个 Event Handlers

typescript
// hero-killed-dragon.handler.ts
@EventsHandler(HeroKilledDragonEvent)
export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> {
  constructor(private repository: HeroesRepository) {}

  handle(event: HeroKilledDragonEvent) {
    // Business logic
  }
}

提示

请注意,一旦你开始使用事件处理器,就已经脱离了传统的 HTTP Web 上下文。

  • CommandHandlers 中的错误仍然可以被内建的 Exception filters 捕获。
  • EventHandlers 中的错误则不能被 Exception filters 捕获:你需要自行处理。可以使用简单的 try/catch,使用 Sagas 触发补偿事件,或者采用其他你选择的方案。
  • CommandHandlers 中仍然可以把 HTTP 响应返回给客户端。
  • EventHandlers 中则不能。如果你希望把信息发送给客户端,可以使用 WebSocketSSE 或其他方案。

与命令和查询一样,别忘了将 HeroKilledDragonHandler 注册为某个模块中的 provider:

typescript
providers: [HeroKilledDragonHandler];

Sagas

Saga 是一种长生命周期流程,它会监听事件,并可能触发新的命令。它通常用于管理应用中的复杂工作流。例如,当用户注册后,某个 saga 可以监听 UserRegisteredEvent,然后向用户发送欢迎邮件。

Sagas 是一个极其强大的特性。单个 saga 可以监听 1..* 个事件。借助 RxJS,我们可以过滤、映射、分叉和合并事件流,从而构建复杂工作流。每个 saga 都会返回一个 Observable,该 Observable 产出一个命令实例;随后,这个命令会被 CommandBus 异步分发。

下面创建一个 saga,监听 HeroKilledDragonEvent,并分发 DropAncientItemCommand 命令。

typescript
// heroes-game.saga.ts
@Injectable()
export class HeroesGameSagas {
  @Saga()
  dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(HeroKilledDragonEvent),
      map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)),
    );
  }
}

提示

ofType 操作符和 @Saga() 装饰器都由 @nestjs/cqrs 包导出。

@Saga() 装饰器用于把该方法标记为 saga。events$ 参数是一个包含全部事件的 Observable 流。ofType 操作符会按指定事件类型过滤流,而 map 则把该事件映射为一个新命令实例。

在这个例子中,我们把 HeroKilledDragonEvent 映射为 DropAncientItemCommand。随后这个命令会被 CommandBus 自动分发。

与查询、命令和事件处理器一样,别忘了将 HeroesGameSagas 注册为某个模块中的 provider:

typescript
providers: [HeroesGameSagas];

未处理异常

事件处理器是异步执行的,因此必须始终正确处理异常,避免应用进入不一致状态。如果异常没有被处理,EventBus 会创建一个 UnhandledExceptionInfo 对象,并将其推入 UnhandledExceptionBus 流中。这个流本身是一个 Observable,可用于处理未捕获异常。

typescript
private destroy$ = new Subject<void>();

constructor(private unhandledExceptionsBus: UnhandledExceptionBus) {
  this.unhandledExceptionsBus
    .pipe(takeUntil(this.destroy$))
    .subscribe((exceptionInfo) => {
      // Handle exception here
      // e.g. send it to external service, terminate process, or publish a new event
    });
}

onModuleDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

如果要筛选特定异常,可以配合 ofType 操作符:

typescript
this.unhandledExceptionsBus
  .pipe(
    takeUntil(this.destroy$),
    UnhandledExceptionBus.ofType(TransactionNotAllowedException),
  )
  .subscribe((exceptionInfo) => {
    // Handle exception here
  });

其中 TransactionNotAllowedException 就是你要筛选的异常类型。

UnhandledExceptionInfo 对象包含以下属性:

typescript
export interface UnhandledExceptionInfo<
  Cause = IEvent | ICommand,
  Exception = any,
> {
  /**
   * The exception that was thrown.
   */
  exception: Exception;
  /**
   * The cause of the exception (event or command reference).
   */
  cause: Cause;
}

订阅所有事件

CommandBusQueryBusEventBus 都是 Observable。这意味着我们可以订阅整个流,例如处理全部事件。举例来说,我们可以把所有事件记录到控制台,或者保存到事件存储中。

typescript
private destroy$ = new Subject<void>();

constructor(private eventBus: EventBus) {
  this.eventBus
    .pipe(takeUntil(this.destroy$))
    .subscribe((event) => {
      // Save events to database
    });
}

onModuleDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

请求作用域

对于来自其他编程语言背景的开发者来说,Nest 中大多数内容都在不同请求之间共享,可能会让人感到意外。这包括数据库连接池、带全局状态的单例服务等等。要记住,Node.js 并不遵循那种“每个请求由独立线程处理”的无状态请求/响应多线程模型。因此,使用单例实例对于我们的应用来说是安全的。

不过,在某些边界场景下,你可能仍然希望 handler 拥有按请求隔离的生命周期。例如 GraphQL 应用中的按请求缓存、请求追踪、多租户等。关于如何控制作用域的更多信息,请参见这里

在 CQRS 中配合请求作用域 provider 会比较复杂,因为 CommandBusQueryBusEventBus 本身都是单例。幸运的是,@nestjs/cqrs 包会为每一个处理的命令、查询或事件自动创建新的请求作用域 handler 实例。

要让一个 handler 成为请求作用域,你可以:

  1. 依赖某个请求作用域 provider。
  2. 显式地在 @CommandHandler@QueryHandler@EventsHandler 装饰器中将其作用域设置为 REQUEST,如下:
typescript
@CommandHandler(KillDragonCommand, {
  scope: Scope.REQUEST,
})
export class KillDragonHandler {
  // Implementation here
}

如果想把请求载荷注入任意请求作用域 provider 中,你可以使用 @Inject(REQUEST) 装饰器。但在 CQRS 中,请求载荷的实际含义取决于上下文,它可能是 HTTP 请求、定时任务,或任何触发命令的操作。

该载荷必须是一个继承自 AsyncContext(由 @nestjs/cqrs 提供)的类实例。它会作为请求上下文,在整个请求生命周期中承载可访问数据。

typescript
import { AsyncContext } from '@nestjs/cqrs';

export class MyRequest extends AsyncContext {
  constructor(public readonly user: User) {
    super();
  }
}

执行命令时,把自定义请求上下文作为第二个参数传给 CommandBus#execute

typescript
const myRequest = new MyRequest(user);
await this.commandBus.execute(
  new KillDragonCommand(heroId, killDragonDto.dragonId),
  myRequest,
);

这样,MyRequest 实例就会作为 REQUEST provider 提供给对应的 handler:

typescript
@CommandHandler(KillDragonCommand, {
  scope: Scope.REQUEST,
})
export class KillDragonHandler {
  constructor(
    @Inject(REQUEST) private request: MyRequest, // Inject the request context
  ) {}

  // Handler implementation here
}

对查询也可以采用相同方式:

typescript
const myRequest = new MyRequest(user);
const hero = await this.queryBus.execute(new GetHeroQuery(heroId), myRequest);

在查询处理器中:

typescript
@QueryHandler(GetHeroQuery, {
  scope: Scope.REQUEST,
})
export class GetHeroHandler {
  constructor(
    @Inject(REQUEST) private request: MyRequest, // Inject the request context
  ) {}

  // Handler implementation here
}

对于事件来说,虽然你也可以把请求 provider 传给 EventBus#publish,但这并不常见。更常见的做法是使用 EventPublisher,把请求 provider 合并进模型中:

typescript
const hero = this.publisher.mergeObjectContext(
  await this.repository.findOneById(+heroId),
  this.request, // Inject the request context here
);

订阅这些事件的请求作用域事件处理器,将能够访问这个请求 provider。

Saga 永远是单例实例,因为它们负责管理长生命周期流程。不过,你仍然可以从事件对象中取回请求 provider:

typescript
@Saga()
dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
  return events$.pipe(
    ofType(HeroKilledDragonEvent),
    map((event) => {
      const request = AsyncContext.of(event); // Retrieve the request context
      const command = new DropAncientItemCommand(event.heroId, fakeItemID);

      AsyncContext.merge(request, command); // Merge the request context into the command
      return command;
    }),
  );
}

或者,你也可以使用 request.attachTo(command) 方法把请求上下文绑定到命令上。

示例

一个可运行的示例见这里

基于 NestJS 官方文档翻译