gRPC
gRPC 是一个现代化的开源高性能 RPC 框架,可以在任何环境中运行。它可以通过可插拔的负载均衡、追踪、健康检查和身份验证支持,高效地连接数据中心内部和跨数据中心的服务。
与许多 RPC 系统一样,gRPC 基于定义服务的概念,即定义可以远程调用的函数(方法)。对于每个方法,你定义参数和返回类型。服务、参数和返回类型在 .proto 文件中使用 Google 的开源语言无关的 protocol buffers 机制定义。
通过 gRPC 传输器,Nest 使用 .proto 文件动态绑定客户端和服务器,使得实现远程过程调用变得简单,并自动序列化和反序列化结构化数据。
安装
要开始构建基于 gRPC 的微服务,首先安装所需的包:
$ npm i --save @grpc/grpc-js @grpc/proto-loader概述
与其他 Nest 微服务传输层实现一样,你可以使用传递给 createMicroservice() 方法的选项对象的 transport 属性来选择 gRPC 传输器机制。在以下示例中,我们将设置一个 hero 服务。options 属性提供了关于该服务的元数据;其属性描述在下方。
// main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
});提示
join() 函数从 path 包导入;Transport 枚举从 @nestjs/microservices 包导入。
在 nest-cli.json 文件中,我们添加 assets 属性来分发非 TypeScript 文件,以及 watchAssets 来启用对所有非 TypeScript 资源的监视。在我们的案例中,我们希望 .proto 文件被自动复制到 dist 文件夹。
{
"compilerOptions": {
"assets": ["**/*.proto"],
"watchAssets": true
}
}选项
gRPC 传输器选项对象暴露以下属性。
package | Protobuf 包名称(匹配 .proto 文件中的 package 设置)。必填 |
protoPath | .proto 文件的绝对路径(或相对于根目录的路径)。必填 |
url | 连接 URL。格式为 ip 地址/dns 名称:端口 的字符串(例如 '0.0.0.0:50051' 用于 Docker 服务器),定义传输器建立连接的地址/端口。可选。默认为 'localhost:5000' |
protoLoader | 用于加载 .proto 文件的工具 NPM 包名称。可选。默认为 '@grpc/proto-loader' |
loader | @grpc/proto-loader 选项。这些选项提供对 .proto 文件行为的详细控制。可选。详情请参阅 这里 |
credentials | 服务器凭证。可选。阅读更多 |
示例 gRPC 服务
让我们定义名为 HeroesService 的示例 gRPC 服务。在上面的 options 对象中,protoPath 属性设置了 .proto 定义文件 hero.proto 的路径。hero.proto 文件使用 protocol buffers 结构化。内容如下:
// hero/hero.proto
syntax = "proto3";
package hero;
service HeroesService {
rpc FindOne (HeroById) returns (Hero) {}
}
message HeroById {
int32 id = 1;
}
message Hero {
int32 id = 1;
string name = 2;
}我们的 HeroesService 暴露了一个 FindOne() 方法。该方法期望一个 HeroById 类型的输入参数并返回一个 Hero 消息(protocol buffers 使用 message 元素来定义参数类型和返回类型)。
接下来,我们需要实现该服务。要定义一个满足此定义的处理器,我们在控制器中使用 @GrpcMethod() 装饰器,如下所示。该装饰器提供了将方法声明为 gRPC 服务方法所需的元数据。
提示
在之前的微服务章节中介绍的 @MessagePattern() 装饰器(阅读更多)不适用于基于 gRPC 的微服务。@GrpcMethod() 装饰器实际上替代了它在基于 gRPC 微服务中的角色。
// heroes.controller
@Controller()
export class HeroesController {
@GrpcMethod('HeroesService', 'FindOne')
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
return items.find(({ id }) => id === data.id);
}
}提示
@GrpcMethod() 装饰器从 @nestjs/microservices 包导入,而 Metadata 和 ServerUnaryCall 从 grpc 包导入。
上面显示的装饰器接受两个参数。第一个是服务名称(例如 'HeroesService'),对应于 hero.proto 中的 HeroesService 服务定义。第二个(字符串 'FindOne')对应于 hero.proto 文件中 HeroesService 内定义的 FindOne() rpc 方法。
findOne() 处理器方法接受三个参数:从调用方传递的 data、存储 gRPC 请求元数据的 metadata,以及用于获取 GrpcCall 对象属性(如 sendMetadata 用于向客户端发送元数据)的 call。
@GrpcMethod() 装饰器的两个参数都是可选的。如果不提供第二个参数(例如 'FindOne'),Nest 将根据将处理器名称转换为大驼峰命名法来自动关联 .proto 文件的 rpc 方法(例如 findOne 处理器关联到 FindOne rpc 调用定义)。如下所示。
// heroes.controller
@Controller()
export class HeroesController {
@GrpcMethod('HeroesService')
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
return items.find(({ id }) => id === data.id);
}
}你也可以省略第一个 @GrpcMethod() 参数。在这种情况下,Nest 会根据定义处理器的类名自动将处理器与 proto 定义文件中的服务定义关联。例如,在以下代码中,类 HeroesService 通过名称 'HeroesService' 的匹配,将其处理器方法与 hero.proto 文件中的 HeroesService 服务定义关联。
// heroes.controller
@Controller()
export class HeroesService {
@GrpcMethod()
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
return items.find(({ id }) => id === data.id);
}
}客户端
Nest 应用程序可以充当 gRPC 客户端,消费 .proto 文件中定义的服务。你通过 ClientGrpc 对象访问远程服务。你可以通过多种方式获取 ClientGrpc 对象。
首选技术是导入 ClientsModule。使用 register() 方法将 .proto 文件中定义的一组服务绑定到注入令牌,并配置服务。name 属性是注入令牌。对于 gRPC 服务,使用 transport: Transport.GRPC。options 属性是一个具有上述相同属性的对象。
imports: [
ClientsModule.register([
{
name: 'HERO_PACKAGE',
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
},
]),
];提示
register() 方法接受一个对象数组。通过提供以逗号分隔的注册对象列表来注册多个包。
注册后,我们可以使用 @Inject() 注入配置的 ClientGrpc 对象。然后使用 ClientGrpc 对象的 getService() 方法来获取服务实例,如下所示。
@Injectable()
export class AppService implements OnModuleInit {
private heroesService: HeroesService;
constructor(@Inject('HERO_PACKAGE') private client: ClientGrpc) {}
onModuleInit() {
this.heroesService = this.client.getService<HeroesService>('HeroesService');
}
getHero(): Observable<string> {
return this.heroesService.findOne({ id: 1 });
}
}警告
gRPC 客户端不会发送名称中包含下划线 _ 的字段,除非在 proto loader 配置中将 keepCase 选项设置为 true(微服务传输器配置中的 options.loader.keepcase)。
注意,与其他微服务传输方法使用的技术相比,有一个小区别。我们使用 ClientGrpc 类而不是 ClientProxy 类,ClientGrpc 提供了 getService() 方法。getService() 泛型方法接受服务名称作为参数并返回其实例(如果可用)。
或者,你可以使用 @Client() 装饰器实例化一个 ClientGrpc 对象,如下所示:
@Injectable()
export class AppService implements OnModuleInit {
@Client({
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
})
client: ClientGrpc;
private heroesService: HeroesService;
onModuleInit() {
this.heroesService = this.client.getService<HeroesService>('HeroesService');
}
getHero(): Observable<string> {
return this.heroesService.findOne({ id: 1 });
}
}最后,对于更复杂的场景,我们可以使用如此处描述的 ClientProxyFactory 类注入一个动态配置的客户端。
无论哪种情况,我们最终都会获得 HeroesService 代理对象的引用,该对象暴露了与 .proto 文件中定义的相同方法集。现在,当我们访问此代理对象(即 heroesService)时,gRPC 系统会自动序列化请求、转发到远程系统、返回响应并反序列化响应。因为 gRPC 为我们屏蔽了这些网络通信细节,heroesService 看起来和表现得就像一个本地提供者。
注意,所有服务方法都是小驼峰命名的(以遵循语言的自然约定)。所以,例如,虽然我们的 .proto 文件 HeroesService 定义包含 FindOne() 函数,但 heroesService 实例将提供 findOne() 方法。
interface HeroesService {
findOne(data: { id: number }): Observable<any>;
}消息处理器也能够返回 Observable,在这种情况下,结果值将被发出直到流完成。
// heroes.controller
@Get()
call(): Observable<any> {
return this.heroesService.findOne({ id: 1 });
}要发送 gRPC 元数据(随请求一起),你可以传递第二个参数,如下所示:
call(): Observable<any> {
const metadata = new Metadata();
metadata.add('Set-Cookie', 'yummy_cookie=choco');
return this.heroesService.findOne({ id: 1 }, metadata);
}提示
Metadata 类从 grpc 包导入。
请注意,这需要更新我们在前面几步中定义的 HeroesService 接口。
示例
一个可运行的示例可在这里获取。
gRPC 反射
gRPC 服务器反射规范是一个标准,允许 gRPC 客户端请求服务器暴露的 API 的详细信息,类似于为 REST API 暴露 OpenAPI 文档。这可以使使用开发者调试工具(如 grpc-ui 或 postman)变得更加容易。
要为服务器添加 gRPC 反射支持,首先安装所需的实现包:
$ npm i --save @grpc/reflection然后可以使用 gRPC 服务器选项中的 onLoadPackageDefinition 钩子将其挂接到 gRPC 服务器,如下所示:
// main
import { ReflectionService } from '@grpc/reflection';
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
options: {
onLoadPackageDefinition: (pkg, server) => {
new ReflectionService(pkg).addToServer(server);
},
},
});现在你的服务器将使用反射规范响应请求 API 详情的消息。
gRPC 流
gRPC 本身支持长期活动的连接,通常称为 streams(流)。流适用于聊天、观察或分块数据传输等用例。在官方文档中查找更多详情这里。
Nest 以两种可能的方式支持 GRPC 流处理器:
- RxJS
Subject+Observable处理器:可用于直接在控制器方法内编写响应,或传递给Subject/Observable消费者 - 纯 GRPC 调用流处理器:可用于传递给某个执行器,由其处理 Node 标准
Duplex流处理器的其余分发
流示例
让我们定义一个名为 HelloService 的新示例 gRPC 服务。hello.proto 文件使用 protocol buffers 结构化。内容如下:
// hello/hello.proto
syntax = "proto3";
package hello;
service HelloService {
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
}
message HelloRequest {
string greeting = 1;
}
message HelloResponse {
string reply = 1;
}提示
LotsOfGreetings 方法可以简单地使用 @GrpcMethod 装饰器实现(如上面的示例),因为返回的流可以发出多个值。
基于这个 .proto 文件,让我们定义 HelloService 接口:
interface HelloService {
bidiHello(upstream: Observable<HelloRequest>): Observable<HelloResponse>;
lotsOfGreetings(
upstream: Observable<HelloRequest>,
): Observable<HelloResponse>;
}
interface HelloRequest {
greeting: string;
}
interface HelloResponse {
reply: string;
}Subject 策略
@GrpcStreamMethod() 装饰器将函数参数作为 RxJS Observable 提供。因此,我们可以接收和处理多条消息。
@GrpcStreamMethod()
bidiHello(messages: Observable<any>, metadata: Metadata, call: ServerDuplexStream<any, any>): Observable<any> {
const subject = new Subject();
const onNext = message => {
console.log(message);
subject.next({
reply: 'Hello, world!'
});
};
const onComplete = () => subject.complete();
messages.subscribe({
next: onNext,
complete: onComplete,
});
return subject.asObservable();
}警告
要使用 @GrpcStreamMethod() 装饰器支持全双工交互,控制器方法必须返回 RxJS Observable。
提示
Metadata 和 ServerUnaryCall 类/接口从 grpc 包导入。
根据服务定义(在 .proto 文件中),BidiHello 方法应该向服务流式传输请求。要从客户端向流发送多条异步消息,我们利用 RxJS ReplaySubject 类。
const helloService = this.client.getService<HelloService>('HelloService');
const helloRequest$ = new ReplaySubject<HelloRequest>();
helloRequest$.next({ greeting: 'Hello (1)!' });
helloRequest$.next({ greeting: 'Hello (2)!' });
helloRequest$.complete();
return helloService.bidiHello(helloRequest$);在上面的示例中,我们向流写入了两条消息(next() 调用),并通知服务我们已完成数据发送(complete() 调用)。
调用流处理器
当方法返回值定义为 stream 时,@GrpcStreamCall() 装饰器将函数参数作为 grpc.ServerDuplexStream 提供,它支持标准方法如 .on('data', callback)、.write(message) 或 .cancel()。可用方法的完整文档可在这里找到。
另外,当方法返回值不是 stream 时,@GrpcStreamCall() 装饰器提供两个函数参数,分别是 grpc.ServerReadableStream(阅读更多请点这里)和 callback。
让我们从实现应该支持全双工交互的 BidiHello 开始。
@GrpcStreamCall()
bidiHello(requestStream: any) {
requestStream.on('data', message => {
console.log(message);
requestStream.write({
reply: 'Hello, world!'
});
});
}提示
此装饰器不需要提供任何特定的返回参数。流的处理方式与任何其他标准流类型类似。
在上面的示例中,我们使用 write() 方法向响应流写入对象。传入 .on() 方法作为第二个参数的回调将在服务每次接收到新的数据块时被调用。
让我们实现 LotsOfGreetings 方法。
@GrpcStreamCall()
lotsOfGreetings(requestStream: any, callback: (err: unknown, value: HelloResponse) => void) {
requestStream.on('data', message => {
console.log(message);
});
requestStream.on('end', () => callback(null, { reply: 'Hello, world!' }));
}这里我们使用 callback 函数在 requestStream 处理完成后发送响应。
健康检查
当在 Kubernetes 等编排器中运行 gRPC 应用程序时,你可能需要知道它是否正在运行且处于健康状态。gRPC 健康检查规范是一个标准,允许 gRPC 客户端暴露其健康状态,以便编排器可以相应地采取行动。
要添加 gRPC 健康检查支持,首先安装 grpc-node 包:
$ npm i --save grpc-health-check然后可以使用 gRPC 服务器选项中的 onLoadPackageDefinition 钩子将其挂接到 gRPC 服务。注意 protoPath 需要同时包含健康检查和 hero 包。
// main
import { HealthImplementation, protoPath as healthCheckProtoPath } from 'grpc-health-check';
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
options: {
protoPath: [
healthCheckProtoPath,
protoPath: join(__dirname, 'hero/hero.proto'),
],
onLoadPackageDefinition: (pkg, server) => {
const healthImpl = new HealthImplementation({
'': 'UNKNOWN',
});
healthImpl.addToServer(server);
healthImpl.setStatus('', 'SERVING');
},
},
});提示
gRPC health probe 是一个有用的 CLI 工具,用于在容器化环境中测试 gRPC 健康检查。
gRPC 元数据
元数据是关于特定 RPC 调用的信息,以键值对列表的形式存在,其中键是字符串,值通常是字符串但也可以是二进制数据。元数据对 gRPC 本身是不透明的——它允许客户端提供与调用关联的信息给服务器,反之亦然。元数据可能包括身份验证令牌、用于监控目的的请求标识符和标签,以及数据信息如数据集中的记录数。
要在 @GrpcMethod() 处理器中读取元数据,使用第二个参数(metadata),类型为 Metadata(从 grpc 包导入)。
要从处理器发送回元数据,使用 ServerUnaryCall#sendMetadata() 方法(第三个处理器参数)。
// heroes.controller
@Controller()
export class HeroesService {
@GrpcMethod()
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const serverMetadata = new Metadata();
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
serverMetadata.add('Set-Cookie', 'yummy_cookie=choco');
call.sendMetadata(serverMetadata);
return items.find(({ id }) => id === data.id);
}
}同样,要在使用 @GrpcStreamMethod() 处理器(subject 策略)注解的处理器中读取元数据,使用第二个参数(metadata),类型为 Metadata(从 grpc 包导入)。
要从处理器发送回元数据,使用 ServerDuplexStream#sendMetadata() 方法(第三个处理器参数)。
要从调用流处理器(使用 @GrpcStreamCall() 装饰器注解的处理器)中读取元数据,监听 requestStream 引用上的 metadata 事件,如下所示:
requestStream.on('metadata', (metadata: Metadata) => {
const meta = metadata.get('X-Meta');
});