Skip to content

概述

除了传统的(有时称为单体的)应用架构外,Nest 原生支持微服务架构开发风格。本文档其他地方讨论的大多数概念,如依赖注入、装饰器、异常过滤器、管道、守卫和拦截器,同样适用于微服务。在可能的情况下,Nest 会抽象实现细节,以便相同的组件可以跨 HTTP 平台、WebSocket 和微服务运行。本节介绍 Nest 中特定于微服务的方面。

在 Nest 中,微服务本质上是使用与 HTTP 不同的传输层的应用程序。

Nest 支持多种内置的传输层实现,称为传输器,负责在不同微服务实例之间传输消息。大多数传输器原生支持请求-响应基于事件两种消息风格。Nest 在规范接口背后抽象了每种传输器的实现细节,同时支持请求-响应和基于事件的消息传递。这使得从一种传输层切换到另一种变得容易——例如,利用特定传输层的可靠性或性能特性——而不会影响你的应用程序代码。

安装

要开始构建微服务,首先安装所需的包:

bash
$ npm i --save @nestjs/microservices

入门

要实例化一个微服务,使用 NestFactory 类的 createMicroservice() 方法:

typescript
// main
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.TCP,
    },
  );
  await app.listen();
}
bootstrap();

提示

微服务默认使用 TCP 传输层。

createMicroservice() 方法的第二个参数是一个 options 对象。该对象可能包含两个成员:

transport指定传输器(例如 Transport.NATS
options传输器特定的选项对象,决定传输器行为

options 对象特定于所选的传输器。TCP 传输器暴露以下属性。对于其他传输器(如 Redis、MQTT 等),请参阅相关章节了解可用选项。

host连接主机名
port连接端口
retryAttempts消息重试次数(默认:0
retryDelay消息重试间隔时间(毫秒)(默认:0
serializer自定义序列化器,用于出站消息
deserializer自定义反序列化器,用于入站消息
socketClass扩展 TcpSocket 的自定义 Socket(默认:JsonSocket
tlsOptions用于配置 TLS 协议的选项

提示

以上属性特定于 TCP 传输器。有关其他传输器的可用选项信息,请参阅相关章节。

消息和事件模式

微服务通过模式识别消息和事件。模式是一个纯值,例如字面量对象或字符串。模式会被自动序列化并随消息的数据部分一起通过网络发送。通过这种方式,消息发送方和消费方可以协调哪些请求由哪些处理器消费。

请求-响应

当你需要在各种外部服务之间交换消息时,请求-响应消息风格非常有用。这种范式确保服务实际接收到了消息(无需你手动实现确认协议)。然而,请求-响应方式并不总是最佳选择。例如,使用基于日志持久化的流式传输器,如 KafkaNATS streaming,它们针对不同的挑战进行了优化,更适合事件消息范式(更多详情请参阅基于事件的消息传递)。

要启用请求-响应消息类型,Nest 创建两个逻辑通道:一个用于传输数据,另一个用于等待传入响应。对于一些底层传输,如 NATS,这种双通道支持是开箱即用的。对于其他传输,Nest 通过手动创建单独的通道来补偿。虽然这种方式有效,但可能会引入一些开销。因此,如果你不需要请求-响应消息风格,可以考虑使用基于事件的方法。

要创建基于请求-响应范式的消息处理器,使用 @MessagePattern() 装饰器,它从 @nestjs/microservices 包中导入。此装饰器只应在控制器类中使用,因为它们是应用程序的入口点。在提供者中使用它不会有效果,因为它们会被 Nest 运行时忽略。

typescript
// math.controller
import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class MathController {
  @MessagePattern({ cmd: 'sum' })
  accumulate(data: number[]): number {
    return (data || []).reduce((a, b) => a + b);
  }
}

在上面的代码中,accumulate() 消息处理器监听匹配 { cmd: 'sum' } 消息模式的消息。消息处理器接受一个参数,即客户端传递的 data。在本例中,数据是一个需要累加的数字数组。

异步响应

消息处理器可以同步或异步响应,因此支持 async 方法。

typescript
@MessagePattern({ cmd: 'sum' })
async accumulate(data: number[]): Promise<number> {
  return (data || []).reduce((a, b) => a + b);
}

消息处理器也可以返回一个 Observable,在这种情况下,结果值将被发出,直到流完成。

typescript
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): Observable<number> {
  return from([1, 2, 3]);
}

在上面的示例中,消息处理器将响应 3 次(数组中的每个元素各一次)。

基于事件

虽然请求-响应方法非常适合在服务之间交换消息,但当你只想发布事件而不等待响应时,它就不太适合了——基于事件的消息传递更适用于这种情况。在这种情况下,维护请求-响应所需的两个通道的开销是不必要的。

例如,如果你想通知另一个服务在系统的这部分发生了特定条件,基于事件的消息风格是理想的。

要创建事件处理器,使用从 @nestjs/microservices 包导入的 @EventPattern() 装饰器。

typescript
@EventPattern('user_created')
async handleUserCreated(data: Record<string, unknown>) {
  // 业务逻辑
}

提示

你可以为单个事件模式注册多个事件处理器,所有处理器将自动并行触发。

handleUserCreated() 事件处理器监听 'user_created' 事件。事件处理器接受一个参数,即客户端传递的 data(在本例中,是通过网络发送的事件负载)。

额外的请求详情

在更高级的场景中,你可能需要访问传入请求的额外详情。例如,当使用 NATS 通配符订阅时,你可能想获取生产者发送消息的原始主题。同样,使用 Kafka 时,你可能需要访问消息头。为此,你可以利用内置装饰器,如下所示:

typescript
@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`); // 例如 "time.us.east"
  return new Date().toLocaleTimeString(...);
}

提示

@Payload()@Ctx()NatsContext@nestjs/microservices 导入。

提示

你也可以向 @Payload() 装饰器传递属性键,以从传入的负载对象中提取特定属性,例如 @Payload('id')

客户端(生产者类)

客户端 Nest 应用程序可以使用 ClientProxy 类与 Nest 微服务交换消息或发布事件。该类提供了多个方法,如 send()(用于请求-响应消息传递)和 emit()(用于事件驱动消息传递),使得与远程微服务的通信成为可能。你可以通过以下方式获取该类的实例:

一种方法是导入 ClientsModule,它暴露了静态 register() 方法。该方法接受一个表示微服务传输器的对象数组。每个对象必须包含一个 name 属性,以及可选的 transport 属性(默认为 Transport.TCP)和 options 属性。

name 属性充当注入令牌,你可以在需要的地方使用它来注入 ClientProxy 实例。name 属性的值可以是任意字符串或 JavaScript symbol,如此处所述。

options 属性是一个对象,包含我们之前在 createMicroservice() 方法中看到的相同属性。

typescript
@Module({
  imports: [
    ClientsModule.register([
      { name: 'MATH_SERVICE', transport: Transport.TCP },
    ]),
  ],
})

或者,如果你需要在设置过程中提供配置或执行任何其他异步过程,可以使用 registerAsync() 方法。

typescript
@Module({
  imports: [
    ClientsModule.registerAsync([
      {
        imports: [ConfigModule],
        name: 'MATH_SERVICE',
        useFactory: async (configService: ConfigService) => ({
          transport: Transport.TCP,
          options: {
            url: configService.get('URL'),
          },
        }),
        inject: [ConfigService],
      },
    ]),
  ],
})

模块导入后,你可以使用 @Inject() 装饰器注入为 'MATH_SERVICE' 传输器配置的 ClientProxy 实例。

typescript
constructor(
  @Inject('MATH_SERVICE') private client: ClientProxy,
) {}

提示

ClientsModuleClientProxy 类从 @nestjs/microservices 包导入。

有时,你可能需要从另一个服务(如 ConfigService)获取传输器配置,而不是在客户端应用程序中硬编码。为此,你可以使用 ClientProxyFactory 类注册一个自定义提供者。该类提供了一个静态 create() 方法,接受传输器选项对象并返回一个自定义的 ClientProxy 实例。

typescript
@Module({
  providers: [
    {
      provide: 'MATH_SERVICE',
      useFactory: (configService: ConfigService) => {
        const mathSvcOptions = configService.getMathSvcOptions();
        return ClientProxyFactory.create(mathSvcOptions);
      },
      inject: [ConfigService],
    }
  ]
  ...
})

提示

ClientProxyFactory@nestjs/microservices 包导入。

另一个选项是使用 @Client() 属性装饰器。

typescript
@Client({ transport: Transport.TCP })
client: ClientProxy;

提示

@Client() 装饰器从 @nestjs/microservices 包导入。

使用 @Client() 装饰器不是首选技术,因为它更难测试且更难共享客户端实例。

ClientProxy惰性的。它不会立即建立连接。相反,连接将在第一次微服务调用之前建立,然后在后续每次调用中重用。然而,如果你想延迟应用程序引导过程直到连接建立,可以在 OnApplicationBootstrap 生命周期钩子中使用 ClientProxy 对象的 connect() 方法手动建立连接。

typescript
async onApplicationBootstrap() {
  await this.client.connect();
}

如果无法创建连接,connect() 方法将使用相应的错误对象拒绝。

发送消息

ClientProxy 暴露了一个 send() 方法。该方法用于调用微服务并返回一个带有其响应的 Observable。因此,我们可以轻松地订阅发出的值。

typescript
accumulate(): Observable<number> {
  const pattern = { cmd: 'sum' };
  const payload = [1, 2, 3];
  return this.client.send<number>(pattern, payload);
}

send() 方法接受两个参数:patternpayloadpattern 应匹配 @MessagePattern() 装饰器中定义的模式。payload 是我们想要传输到远程微服务的消息。该方法返回一个Observable,这意味着你必须显式订阅它才会发送消息。

发布事件

要发送事件,使用 ClientProxy 对象的 emit() 方法。该方法向消息代理发布事件。

typescript
async publish() {
  this.client.emit<number>('user_created', new UserCreatedEvent());
}

emit() 方法接受两个参数:patternpayloadpattern 应匹配 @EventPattern() 装饰器中定义的模式,而 payload 表示你想传输到远程微服务的事件数据。该方法返回一个Observable(与 send() 返回的冷 Observable 不同),这意味着无论你是否显式订阅该 observable,代理都会立即尝试传递事件。

请求作用域

对于来自不同编程语言背景的开发者来说,可能会惊讶地发现在 Nest 中,大多数东西在传入请求之间是共享的。这包括数据库连接池、具有全局状态的单例服务等。请记住,Node.js 不遵循请求/响应多线程无状态模型,其中每个请求由单独的线程处理。因此,使用单例实例对我们的应用程序是安全的

然而,在一些边缘情况下,基于请求的生命周期可能是处理器所需要的。这可能包括 GraphQL 应用中的每请求缓存、请求追踪或多租户等场景。你可以在这里了解更多关于如何控制作用域的信息。

请求作用域的处理器和提供者可以使用 @Inject() 装饰器结合 CONTEXT 令牌注入 RequestContext

typescript
import { Injectable, Scope, Inject } from '@nestjs/common';
import { CONTEXT, RequestContext } from '@nestjs/microservices';

@Injectable({ scope: Scope.REQUEST })
export class CatsService {
  constructor(@Inject(CONTEXT) private ctx: RequestContext) {}
}

这提供了对 RequestContext 对象的访问,该对象有两个属性:

typescript
export interface RequestContext<T = any> {
  pattern: string | Record<string, any>;
  data: T;
}

data 属性是消息生产者发送的消息负载。pattern 属性是用于标识合适处理器来处理传入消息的模式。

实例状态更新

要获取连接和底层驱动实例状态的实时更新,你可以订阅 status 流。此流提供特定于所选驱动的状态更新。例如,如果你使用 TCP 传输器(默认),status 流会发出 connecteddisconnected 事件。

typescript
this.client.status.subscribe((status: TcpStatus) => {
  console.log(status);
});

提示

TcpStatus 类型从 @nestjs/microservices 包导入。

同样,你可以订阅服务器的 status 流以接收服务器状态的通知。

typescript
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: TcpStatus) => {
  console.log(status);
});

监听内部事件

在某些情况下,你可能想监听微服务发出的内部事件。例如,你可以监听 error 事件,以在发生错误时触发额外操作。为此,使用 on() 方法,如下所示:

typescript
this.client.on('error', (err) => {
  console.error(err);
});

同样,你可以监听服务器的内部事件:

typescript
server.on<TcpEvents>('error', (err) => {
  console.error(err);
});

提示

TcpEvents 类型从 @nestjs/microservices 包导入。

底层驱动访问

对于更高级的用例,你可能需要访问底层驱动实例。这在需要手动关闭连接或使用驱动特定方法等场景中很有用。然而,请记住,在大多数情况下你不应该需要直接访问驱动。

为此,你可以使用 unwrap() 方法,它返回底层驱动实例。泛型类型参数应指定你期望的驱动实例类型。

typescript
const netServer = this.client.unwrap<Server>();

这里 Server 是从 net 模块导入的类型。

同样,你可以访问服务器的底层驱动实例:

typescript
const netServer = server.unwrap<Server>();

处理超时

在分布式系统中,微服务有时可能会宕机或不可用。为了防止无限期等待,你可以使用超时。超时是与其他服务通信时非常有用的模式。要为微服务调用应用超时,你可以使用 RxJStimeout 操作符。如果微服务在指定时间内没有响应,将抛出一个异常,你可以捕获并适当处理。

要实现这一点,你需要使用 rxjs 包。只需在管道中使用 timeout 操作符:

typescript
this.client
  .send<TResult, TInput>(pattern, data)
  .pipe(timeout(5000));

提示

timeout 操作符从 rxjs/operators 包导入。

5 秒后,如果微服务没有响应,它将抛出一个错误。

TLS 支持

在私有网络之外通信时,加密流量以确保安全很重要。在 NestJS 中,可以使用 Node 内置的 TLS 模块通过 TCP 上的 TLS 来实现。Nest 在其 TCP 传输中提供了对 TLS 的内置支持,允许我们加密微服务或客户端之间的通信。

要为 TCP 服务器启用 TLS,你需要 PEM 格式的私钥和证书。通过设置 tlsOptions 并指定密钥和证书文件将它们添加到服务器选项中,如下所示:

typescript
import * as fs from 'fs';
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const key = fs.readFileSync('<pathToKeyFile>', 'utf8').toString();
  const cert = fs.readFileSync('<pathToCertFile>', 'utf8').toString();

  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.TCP,
      options: {
        tlsOptions: {
          key,
          cert,
        },
      },
    },
  );

  await app.listen();
}
bootstrap();

要使客户端通过 TLS 安全通信,我们也定义 tlsOptions 对象,但这次使用 CA 证书。这是签署服务器证书的机构的证书。这确保客户端信任服务器的证书并可以建立安全连接。

typescript
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.TCP,
        options: {
          tlsOptions: {
            ca: [fs.readFileSync('<pathToCaFile>', 'utf-8').toString()],
          },
        },
      },
    ]),
  ],
})
export class AppModule {}

如果你的配置涉及多个受信任的机构,也可以传递一个 CA 数组。

一切设置完成后,你可以像往常一样使用 @Inject() 装饰器注入 ClientProxy,在服务中使用客户端。这确保了 NestJS 微服务之间的加密通信,Node 的 TLS 模块处理加密细节。

更多信息请参考 Node 的 TLS 文档

动态配置

当微服务需要使用 ConfigService(来自 @nestjs/config 包)进行配置,但注入上下文仅在微服务实例创建后才可用时,AsyncMicroserviceOptions 提供了解决方案。这种方法允许动态配置,确保与 ConfigService 的顺畅集成。

typescript
import { ConfigService } from '@nestjs/config';
import { AsyncMicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<AsyncMicroserviceOptions>(
    AppModule,
    {
      useFactory: (configService: ConfigService) => ({
        transport: Transport.TCP,
        options: {
          host: configService.get<string>('HOST'),
          port: configService.get<number>('PORT'),
        },
      }),
      inject: [ConfigService],
    },
  );

  await app.listen();
}
bootstrap();

基于 NestJS 官方文档翻译