Skip to content

NATS

NATS 是一个简单、安全、高性能的开源消息系统,适用于云原生应用、物联网消息传递和微服务架构。NATS 服务器使用 Go 编程语言编写,但与服务器交互的客户端库可用于数十种主要编程语言。NATS 支持最多一次至少一次两种交付方式。它可以在任何地方运行,从大型服务器和云实例,到边缘网关甚至物联网设备。

安装

要开始构建基于 NATS 的微服务,首先安装所需的包:

bash
$ npm i --save nats

概述

要使用 NATS 传输器,请将以下选项对象传递给 createMicroservice() 方法:

typescript
// main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.NATS,
  options: {
    servers: ['nats://localhost:4222'],
  },
});

提示

Transport 枚举从 @nestjs/microservices 包导入。

选项

options 对象特定于所选的传输器。NATS 传输器暴露这里描述的属性以及以下属性:

queue服务器应订阅的队列(留空 undefined 以忽略此设置)。在下方阅读更多关于 NATS 队列组的信息。
gracefulShutdown启用优雅关闭。启用后,服务器在关闭连接之前会先取消订阅所有频道。默认为 false
gracePeriod取消订阅所有频道后等待服务器的时间(毫秒)。默认为 10000 毫秒。

客户端

与其他微服务传输器一样,你有多种选择来创建 NATS ClientProxy 实例。

一种创建实例的方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,导入它并使用 register() 方法传递一个选项对象,该对象包含上面 createMicroservice() 方法中显示的相同属性,以及一个用作注入令牌的 name 属性。在这里阅读更多关于 ClientsModule 的信息。

typescript
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.NATS,
        options: {
          servers: ['nats://localhost:4222'],
        }
      },
    ]),
  ]
  ...
})

也可以使用其他选项来创建客户端(ClientProxyFactory@Client())。你可以在这里了解它们。

请求-响应

对于请求-响应消息风格(阅读更多),NATS 传输器不使用 NATS 内置的 Request-Reply 机制。相反,"请求"使用 publish() 方法发布到指定主题,并带有唯一的回复主题名称,响应者监听该主题并将响应发送到回复主题。回复主题会动态定向回请求者,无论双方位于何处。

基于事件

对于基于事件的消息风格(阅读更多),NATS 传输器使用 NATS 内置的 Publish-Subscribe 机制。发布者在某个主题上发送消息,任何在该主题上监听的活动订阅者都会接收到该消息。订阅者还可以注册通配符主题的兴趣,其工作方式类似于正则表达式。这种一对多模式有时称为扇出。

队列组

NATS 提供了一个内置的负载均衡功能,称为分布式队列。要创建队列订阅,使用 queue 属性,如下所示:

typescript
// main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.NATS,
  options: {
    servers: ['nats://localhost:4222'],
    queue: 'cats_queue',
  },
});

上下文

在更复杂的场景中,你可能需要访问传入请求的额外信息。使用 NATS 传输器时,你可以访问 NatsContext 对象。

typescript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`);
}

提示

@Payload()@Ctx()NatsContext@nestjs/microservices 包导入。

通配符

订阅可以是一个明确的主题,也可以包含通配符。

typescript
@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`); // 例如 "time.us.east"
  return new Date().toLocaleTimeString(...);
}

记录构建器

要配置消息选项,你可以使用 NatsRecordBuilder 类(注意:这也适用于基于事件的流程)。例如,要添加 x-version 头,使用 setHeaders 方法,如下所示:

typescript
import * as nats from 'nats';

// 代码中的某处
const headers = nats.headers();
headers.set('x-version', '1.0.0');

const record = new NatsRecordBuilder(':cat:').setHeaders(headers).build();
this.client.send('replace-emoji', record).subscribe(...);

提示

NatsRecordBuilder 类从 @nestjs/microservices 包导出。

你也可以在服务器端通过访问 NatsContext 来读取这些头,如下所示:

typescript
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: NatsContext): string {
  const headers = context.getHeaders();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}

在某些情况下,你可能想为多个请求配置头,可以将这些作为选项传递给 ClientProxyFactory

typescript
import { Module } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';

@Module({
  providers: [
    {
      provide: 'API_v1',
      useFactory: () =>
        ClientProxyFactory.create({
          transport: Transport.NATS,
          options: {
            servers: ['nats://localhost:4222'],
            headers: { 'x-version': '1.0.0' },
          },
        }),
    },
  ],
})
export class ApiModule {}

实例状态更新

要获取连接和底层驱动实例状态的实时更新,你可以订阅 status 流。此流提供特定于所选驱动的状态更新。对于 NATS 驱动,status 流发出 connecteddisconnectedreconnecting 事件。

typescript
this.client.status.subscribe((status: NatsStatus) => {
  console.log(status);
});

提示

NatsStatus 类型从 @nestjs/microservices 包导入。

同样,你可以订阅服务器的 status 流以接收服务器状态的通知。

typescript
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: NatsStatus) => {
  console.log(status);
});

监听 NATS 事件

在某些情况下,你可能想监听微服务发出的内部事件。例如,你可以监听 error 事件,以在发生错误时触发额外操作。为此,使用 on() 方法,如下所示:

typescript
this.client.on('error', (err) => {
  console.error(err);
});

同样,你可以监听服务器的内部事件:

typescript
server.on<NatsEvents>('error', (err) => {
  console.error(err);
});

提示

NatsEvents 类型从 @nestjs/microservices 包导入。

底层驱动访问

对于更高级的用例,你可能需要访问底层驱动实例。这在需要手动关闭连接或使用驱动特定方法等场景中很有用。然而,请记住,在大多数情况下你不应该需要直接访问驱动。

为此,你可以使用 unwrap() 方法,它返回底层驱动实例。泛型类型参数应指定你期望的驱动实例类型。

typescript
const natsConnection = this.client.unwrap<import('nats').NatsConnection>();

同样,你可以访问服务器的底层驱动实例:

typescript
const natsConnection = server.unwrap<import('nats').NatsConnection>();

基于 NestJS 官方文档翻译