CQRS
简单的 CRUD(创建、读取、更新、删除)应用,其流程通常可以描述为:
- 控制器层处理 HTTP 请求,并将任务委派给服务层。
- 服务层承载了大部分业务逻辑。
- 服务通过 repository / DAO 来修改或持久化实体。
- 实体作为值的容器,提供 setter 和 getter。
这种模式通常足以应对中小型应用,但对于更大、更复杂的应用来说,它未必是最佳选择。在这种情况下,CQRS(Command Query Responsibility Segregation,命令与查询职责分离)模型可能会更合适,也更具可扩展性(具体仍取决于应用需求)。这种模型的好处包括:
- 关注点分离。该模型将读操作和写操作拆分为不同模型。
- 可扩展性。读写操作可以独立扩容。
- 灵活性。读写操作可以使用不同的数据存储。
- 性能。可以为读操作和写操作分别选择更适合的存储方案。
为了支持这种模型,Nest 提供了一个轻量级的 CQRS 模块。本章将介绍如何使用它。
安装
首先安装所需包:
$ npm install --save @nestjs/cqrs安装完成后,打开应用根模块(通常是 AppModule),并导入 CqrsModule.forRoot():
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
@Module({
imports: [CqrsModule.forRoot()],
})
export class AppModule {}该模块接受一个可选配置对象。可用选项如下:
| 属性 | 描述 | 默认值 |
|---|---|---|
commandPublisher | 负责将命令分发到系统中的发布器。 | DefaultCommandPubSub |
eventPublisher | 用于发布事件的发布器,可让事件被广播或处理。 | DefaultPubSub |
queryPublisher | 用于发布查询的发布器,可触发数据读取操作。 | DefaultQueryPubSub |
unhandledExceptionPublisher | 负责处理未捕获异常的发布器,确保异常可被跟踪和上报。 | DefaultUnhandledExceptionPubSub |
eventIdProvider | 提供唯一事件 ID 的服务,可通过生成或从事件实例中提取来实现。 | DefaultEventIdProvider |
rethrowUnhandled | 决定未处理异常在被处理后是否重新抛出,便于调试和错误管理。 | false |
命令
命令用于改变应用状态。命令应当是面向任务的,而不是面向数据的。当一个命令被分发时,它会由对应的 Command Handler 来处理。处理器负责更新应用状态。
// heroes-game.service.ts
@Injectable()
export class HeroesGameService {
constructor(private commandBus: CommandBus) {}
async killDragon(heroId: string, killDragonDto: KillDragonDto) {
return this.commandBus.execute(
new KillDragonCommand(heroId, killDragonDto.dragonId)
);
}
}在上面的代码中,我们实例化了 KillDragonCommand 类,并将其传给 CommandBus 的 execute() 方法。下面是这个命令类:
// kill-dragon.command.ts
export class KillDragonCommand extends Command<{
actionId: string // This type represents the command execution result
}> {
constructor(
public readonly heroId: string,
public readonly dragonId: string,
) {
super();
}
}可以看到,KillDragonCommand 继承了 Command 类。Command 是 @nestjs/cqrs 包导出的一个简单工具类,用于定义命令的返回类型。这个例子里,返回类型是一个带有 actionId 属性的对象。这样,当 KillDragonCommand 被分发时,CommandBus#execute() 的返回类型就会被推导为 Promise<{ actionId: string }>。这在你希望从命令处理器返回某些数据时非常有用。
提示
继承 Command 类是可选的。只有当你想显式定义命令的返回类型时,才需要这样做。
CommandBus 表示一个命令流。它负责将命令分发给对应的处理器。execute() 方法会返回一个 promise,其值就是对应处理器返回的结果。
下面为 KillDragonCommand 创建一个处理器:
// kill-dragon.handler.ts
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
constructor(private repository: HeroesRepository) {}
async execute(command: KillDragonCommand) {
const { heroId, dragonId } = command;
const hero = this.repository.findOneById(+heroId);
hero.killEnemy(dragonId);
await this.repository.persist(hero);
// "ICommandHandler<KillDragonCommand>" forces you to return a value that matches the command's return type
return {
actionId: crypto.randomUUID(), // This value will be returned to the caller
}
}
}这个处理器会从 repository 中取出 Hero 实体,调用 killEnemy() 方法,然后持久化变更。KillDragonHandler 类实现了 ICommandHandler 接口,因此必须实现 execute() 方法。execute() 会接收命令对象作为参数。
请注意,ICommandHandler<KillDragonCommand> 会强制你返回与命令返回类型一致的值。当前例子中,该返回类型是一个带 actionId 属性的对象。只有继承自 Command 类的命令才会受到这个约束;否则你可以返回任意内容。
最后,别忘了将 KillDragonHandler 注册为某个模块中的 provider:
providers: [KillDragonHandler];查询
查询用于从应用状态中检索数据。查询应当是面向数据的,而不是面向任务的。当一个查询被分发时,它会由对应的 Query Handler 处理。处理器负责返回数据。
QueryBus 与 CommandBus 遵循相同模式。查询处理器应实现 IQueryHandler 接口,并使用 @QueryHandler() 装饰器标记。示例如下:
export class GetHeroQuery extends Query<Hero> {
constructor(public readonly heroId: string) {}
}和 Command 类类似,Query 也是 @nestjs/cqrs 导出的一个简单工具类,用于定义查询的返回类型。这里的返回类型是一个 Hero 对象。因此,当 GetHeroQuery 被分发时,QueryBus#execute() 的返回类型会被推导为 Promise<Hero>。
为了获取 hero,我们需要创建一个查询处理器:
// get-hero.handler.ts
@QueryHandler(GetHeroQuery)
export class GetHeroHandler implements IQueryHandler<GetHeroQuery> {
constructor(private repository: HeroesRepository) {}
async execute(query: GetHeroQuery) {
return this.repository.findOneById(query.heroId);
}
}GetHeroHandler 实现了 IQueryHandler 接口,因此必须实现 execute() 方法。execute() 接收查询对象作为参数,并返回与查询返回类型匹配的数据(本例中为 Hero 对象)。
最后,别忘了将 GetHeroHandler 注册为某个模块中的 provider:
providers: [GetHeroHandler];现在,你可以通过 QueryBus 分发查询:
const hero = await this.queryBus.execute(new GetHeroQuery(heroId)); // "hero" will be auto-inferred as "Hero" type事件
事件用于通知应用的其他部分,应用状态发生了变化。事件可以由模型发出,也可以直接通过 EventBus 发出。当一个事件被分发时,会由对应的 Event Handlers 来处理。处理器随后可以更新读模型,或者执行其他动作。
为了演示,先创建一个事件类:
// hero-killed-dragon.event.ts
export class HeroKilledDragonEvent {
constructor(
public readonly heroId: string,
public readonly dragonId: string,
) {}
}虽然事件可以直接通过 EventBus.publish() 方法发布,但我们也可以从模型内部发出事件。下面更新 Hero 模型,在调用 killEnemy() 时发出 HeroKilledDragonEvent:
// hero.model.ts
export class Hero extends AggregateRoot {
constructor(private id: string) {
super();
}
killEnemy(enemyId: string) {
// Business logic
this.apply(new HeroKilledDragonEvent(this.id, enemyId));
}
}apply() 方法用于分发事件,它接收一个事件对象作为参数。不过,由于我们的模型本身并不知道 EventBus,所以还需要把它与模型关联起来。这时可以使用 EventPublisher 类。
// kill-dragon.handler.ts
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
constructor(
private repository: HeroesRepository,
private publisher: EventPublisher,
) {}
async execute(command: KillDragonCommand) {
const { heroId, dragonId } = command;
const hero = this.publisher.mergeObjectContext(
await this.repository.findOneById(+heroId),
);
hero.killEnemy(dragonId);
hero.commit();
}
}EventPublisher#mergeObjectContext 方法会把事件发布能力合并进传入对象中,也就是说,这个对象之后就可以向事件流中发布事件了。
注意,在这个例子里我们还调用了模型上的 commit() 方法。这个方法会把所有待发出的事件真正分发出去。若想自动分发事件,可以把 autoCommit 属性设为 true:
export class Hero extends AggregateRoot {
constructor(private id: string) {
super();
this.autoCommit = true;
}
}如果我们不是要把事件发布能力合并进某个已有对象,而是合并进一个类,则可以使用 EventPublisher#mergeClassContext:
const HeroModel = this.publisher.mergeClassContext(Hero);
const hero = new HeroModel('id'); // <-- HeroModel is a class这样,HeroModel 类的每个实例都可以发布事件,而无需再调用 mergeObjectContext()。
此外,我们也可以直接通过 EventBus 手动发布事件:
this.eventBus.publish(new HeroKilledDragonEvent());提示
EventBus 是一个可注入类。
每个事件都可以有多个 Event Handlers。
// hero-killed-dragon.handler.ts
@EventsHandler(HeroKilledDragonEvent)
export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> {
constructor(private repository: HeroesRepository) {}
handle(event: HeroKilledDragonEvent) {
// Business logic
}
}提示
请注意,一旦你开始使用事件处理器,就已经脱离了传统的 HTTP Web 上下文。
CommandHandlers中的错误仍然可以被内建的 Exception filters 捕获。EventHandlers中的错误则不能被 Exception filters 捕获:你需要自行处理。可以使用简单的try/catch,使用 Sagas 触发补偿事件,或者采用其他你选择的方案。CommandHandlers中仍然可以把 HTTP 响应返回给客户端。EventHandlers中则不能。如果你希望把信息发送给客户端,可以使用 WebSocket、SSE 或其他方案。
与命令和查询一样,别忘了将 HeroKilledDragonHandler 注册为某个模块中的 provider:
providers: [HeroKilledDragonHandler];Sagas
Saga 是一种长生命周期流程,它会监听事件,并可能触发新的命令。它通常用于管理应用中的复杂工作流。例如,当用户注册后,某个 saga 可以监听 UserRegisteredEvent,然后向用户发送欢迎邮件。
Sagas 是一个极其强大的特性。单个 saga 可以监听 1..* 个事件。借助 RxJS,我们可以过滤、映射、分叉和合并事件流,从而构建复杂工作流。每个 saga 都会返回一个 Observable,该 Observable 产出一个命令实例;随后,这个命令会被 CommandBus 异步分发。
下面创建一个 saga,监听 HeroKilledDragonEvent,并分发 DropAncientItemCommand 命令。
// heroes-game.saga.ts
@Injectable()
export class HeroesGameSagas {
@Saga()
dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(HeroKilledDragonEvent),
map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)),
);
}
}提示
ofType 操作符和 @Saga() 装饰器都由 @nestjs/cqrs 包导出。
@Saga() 装饰器用于把该方法标记为 saga。events$ 参数是一个包含全部事件的 Observable 流。ofType 操作符会按指定事件类型过滤流,而 map 则把该事件映射为一个新命令实例。
在这个例子中,我们把 HeroKilledDragonEvent 映射为 DropAncientItemCommand。随后这个命令会被 CommandBus 自动分发。
与查询、命令和事件处理器一样,别忘了将 HeroesGameSagas 注册为某个模块中的 provider:
providers: [HeroesGameSagas];未处理异常
事件处理器是异步执行的,因此必须始终正确处理异常,避免应用进入不一致状态。如果异常没有被处理,EventBus 会创建一个 UnhandledExceptionInfo 对象,并将其推入 UnhandledExceptionBus 流中。这个流本身是一个 Observable,可用于处理未捕获异常。
private destroy$ = new Subject<void>();
constructor(private unhandledExceptionsBus: UnhandledExceptionBus) {
this.unhandledExceptionsBus
.pipe(takeUntil(this.destroy$))
.subscribe((exceptionInfo) => {
// Handle exception here
// e.g. send it to external service, terminate process, or publish a new event
});
}
onModuleDestroy() {
this.destroy$.next();
this.destroy$.complete();
}如果要筛选特定异常,可以配合 ofType 操作符:
this.unhandledExceptionsBus
.pipe(
takeUntil(this.destroy$),
UnhandledExceptionBus.ofType(TransactionNotAllowedException),
)
.subscribe((exceptionInfo) => {
// Handle exception here
});其中 TransactionNotAllowedException 就是你要筛选的异常类型。
UnhandledExceptionInfo 对象包含以下属性:
export interface UnhandledExceptionInfo<
Cause = IEvent | ICommand,
Exception = any,
> {
/**
* The exception that was thrown.
*/
exception: Exception;
/**
* The cause of the exception (event or command reference).
*/
cause: Cause;
}订阅所有事件
CommandBus、QueryBus 和 EventBus 都是 Observable。这意味着我们可以订阅整个流,例如处理全部事件。举例来说,我们可以把所有事件记录到控制台,或者保存到事件存储中。
private destroy$ = new Subject<void>();
constructor(private eventBus: EventBus) {
this.eventBus
.pipe(takeUntil(this.destroy$))
.subscribe((event) => {
// Save events to database
});
}
onModuleDestroy() {
this.destroy$.next();
this.destroy$.complete();
}请求作用域
对于来自其他编程语言背景的开发者来说,Nest 中大多数内容都在不同请求之间共享,可能会让人感到意外。这包括数据库连接池、带全局状态的单例服务等等。要记住,Node.js 并不遵循那种“每个请求由独立线程处理”的无状态请求/响应多线程模型。因此,使用单例实例对于我们的应用来说是安全的。
不过,在某些边界场景下,你可能仍然希望 handler 拥有按请求隔离的生命周期。例如 GraphQL 应用中的按请求缓存、请求追踪、多租户等。关于如何控制作用域的更多信息,请参见这里。
在 CQRS 中配合请求作用域 provider 会比较复杂,因为 CommandBus、QueryBus 和 EventBus 本身都是单例。幸运的是,@nestjs/cqrs 包会为每一个处理的命令、查询或事件自动创建新的请求作用域 handler 实例。
要让一个 handler 成为请求作用域,你可以:
- 依赖某个请求作用域 provider。
- 显式地在
@CommandHandler、@QueryHandler或@EventsHandler装饰器中将其作用域设置为REQUEST,如下:
@CommandHandler(KillDragonCommand, {
scope: Scope.REQUEST,
})
export class KillDragonHandler {
// Implementation here
}如果想把请求载荷注入任意请求作用域 provider 中,你可以使用 @Inject(REQUEST) 装饰器。但在 CQRS 中,请求载荷的实际含义取决于上下文,它可能是 HTTP 请求、定时任务,或任何触发命令的操作。
该载荷必须是一个继承自 AsyncContext(由 @nestjs/cqrs 提供)的类实例。它会作为请求上下文,在整个请求生命周期中承载可访问数据。
import { AsyncContext } from '@nestjs/cqrs';
export class MyRequest extends AsyncContext {
constructor(public readonly user: User) {
super();
}
}执行命令时,把自定义请求上下文作为第二个参数传给 CommandBus#execute:
const myRequest = new MyRequest(user);
await this.commandBus.execute(
new KillDragonCommand(heroId, killDragonDto.dragonId),
myRequest,
);这样,MyRequest 实例就会作为 REQUEST provider 提供给对应的 handler:
@CommandHandler(KillDragonCommand, {
scope: Scope.REQUEST,
})
export class KillDragonHandler {
constructor(
@Inject(REQUEST) private request: MyRequest, // Inject the request context
) {}
// Handler implementation here
}对查询也可以采用相同方式:
const myRequest = new MyRequest(user);
const hero = await this.queryBus.execute(new GetHeroQuery(heroId), myRequest);在查询处理器中:
@QueryHandler(GetHeroQuery, {
scope: Scope.REQUEST,
})
export class GetHeroHandler {
constructor(
@Inject(REQUEST) private request: MyRequest, // Inject the request context
) {}
// Handler implementation here
}对于事件来说,虽然你也可以把请求 provider 传给 EventBus#publish,但这并不常见。更常见的做法是使用 EventPublisher,把请求 provider 合并进模型中:
const hero = this.publisher.mergeObjectContext(
await this.repository.findOneById(+heroId),
this.request, // Inject the request context here
);订阅这些事件的请求作用域事件处理器,将能够访问这个请求 provider。
Saga 永远是单例实例,因为它们负责管理长生命周期流程。不过,你仍然可以从事件对象中取回请求 provider:
@Saga()
dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(HeroKilledDragonEvent),
map((event) => {
const request = AsyncContext.of(event); // Retrieve the request context
const command = new DropAncientItemCommand(event.heroId, fakeItemID);
AsyncContext.merge(request, command); // Merge the request context into the command
return command;
}),
);
}或者,你也可以使用 request.attachTo(command) 方法把请求上下文绑定到命令上。
示例
一个可运行的示例见这里。