自定义传输器
Nest 提供了多种开箱即用的传输器,以及一个允许开发者构建新的自定义传输策略的 API。传输器使你能够使用可插拔的通信层和非常简单的应用级消息协议通过网络连接组件(阅读完整文章)。
提示
使用 Nest 构建微服务并不一定意味着你必须使用 @nestjs/microservices 包。例如,如果你想与外部服务(比如用不同语言编写的其他微服务)通信,你可能不需要 @nestjs/microservice 库提供的所有功能。 事实上,如果你不需要让你声明式定义订阅者的装饰器(@EventPattern 或 @MessagePattern),运行一个独立应用并手动维护连接/订阅频道对大多数用例来说应该足够了,并且会给你更多的灵活性。
通过自定义传输器,你可以集成任何消息系统/协议(包括 Google Cloud Pub/Sub、Amazon Kinesis 等)或扩展现有的传输器,在其之上添加额外功能(例如 MQTT 的 QoS)。
提示
要更好地了解 Nest 微服务的工作方式以及如何扩展现有传输器的能力,我们推荐阅读 NestJS Microservices in Action 和 Advanced NestJS Microservices 系列文章。
创建策略
首先,让我们定义一个表示自定义传输器的类。
import { CustomTransportStrategy, Server } from '@nestjs/microservices';
class GoogleCloudPubSubServer
extends Server
implements CustomTransportStrategy
{
/**
* 当你运行 "app.listen()" 时触发。
*/
listen(callback: () => void) {
callback();
}
/**
* 在应用程序关闭时触发。
*/
close() {}
/**
* 如果你不希望传输器用户能够注册事件监听器,
* 可以忽略此方法。大多数自定义实现不需要此方法。
*/
on(event: string, callback: Function) {
throw new Error('Method not implemented.');
}
/**
* 如果你不希望传输器用户能够获取底层原生服务器,
* 可以忽略此方法。大多数自定义实现不需要此方法。
*/
unwrap<T = never>(): T {
throw new Error('Method not implemented.');
}
}警告
请注意,我们不会在本章中实现一个功能完善的 Google Cloud Pub/Sub 服务器,因为这需要深入研究传输器特定的技术细节。
在上面的示例中,我们声明了 GoogleCloudPubSubServer 类并提供了由 CustomTransportStrategy 接口强制要求的 listen() 和 close() 方法。此外,我们的类扩展了从 @nestjs/microservices 包导入的 Server 类,该类提供了一些有用的方法,例如 Nest 运行时用于注册消息处理器的方法。或者,如果你想扩展现有传输策略的能力,你可以扩展相应的服务器类,例如 ServerRedis。按照惯例,我们为类添加了 "Server" 后缀,因为它将负责订阅消息/事件(并在必要时响应它们)。
有了这个,我们现在可以使用自定义策略代替内置传输器,如下所示:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
strategy: new GoogleCloudPubSubServer(),
},
);基本上,我们传递了一个单一的 strategy 属性,其值是我们自定义传输器类的实例,而不是传递具有 transport 和 options 属性的普通传输器选项对象。
回到我们的 GoogleCloudPubSubServer 类,在实际应用中,我们会在 listen() 方法中建立与消息代理/外部服务的连接并注册订阅者/监听特定频道(然后在 close() 清理方法中移除订阅并关闭连接),但由于这需要充分理解 Nest 微服务如何相互通信,我们推荐阅读这个系列文章。在本章中,我们将重点关注 Server 类提供的功能以及如何利用它们来构建自定义策略。
例如,假设在我们的应用程序中某处定义了以下消息处理器:
@MessagePattern('echo')
echo(@Payload() data: object) {
return data;
}此消息处理器将由 Nest 运行时自动注册。通过 Server 类,你可以查看已注册的消息模式,还可以访问和执行分配给它们的实际方法。要测试这一点,让我们在 callback 函数调用之前在 listen() 方法内添加一个简单的 console.log:
listen(callback: () => void) {
console.log(this.messageHandlers);
callback();
}应用程序重启后,你会在终端中看到以下日志:
Map { 'echo' => [AsyncFunction] { isEventHandler: false } }提示
如果我们使用了 @EventPattern 装饰器,你会看到相同的输出,但 isEventHandler 属性设置为 true。
如你所见,messageHandlers 属性是一个 Map 集合,包含所有消息(和事件)处理器,其中模式被用作键。现在,你可以使用键(例如 "echo")来获取消息处理器的引用:
async listen(callback: () => void) {
const echoHandler = this.messageHandlers.get('echo');
console.log(await echoHandler('Hello world!'));
callback();
}一旦我们执行 echoHandler 并传递一个任意字符串作为参数(这里是 "Hello world!"),我们应该在控制台中看到它:
Hello world!这意味着我们的方法处理器被正确执行了。
当使用带有拦截器的 CustomTransportStrategy 时,处理器被包装在 RxJS 流中。这意味着你需要订阅它们以执行流的底层逻辑(例如,在拦截器执行后继续进入控制器逻辑)。
示例如下:
async listen(callback: () => void) {
const echoHandler = this.messageHandlers.get('echo');
const streamOrResult = await echoHandler('Hello World');
if (isObservable(streamOrResult)) {
streamOrResult.subscribe();
}
callback();
}客户端代理
正如我们在第一节中提到的,你不一定需要使用 @nestjs/microservices 包来创建微服务,但如果你决定这样做并且需要集成自定义策略,你也需要提供一个"客户端"类。
提示
同样,实现一个与所有 @nestjs/microservices 功能(如流式传输)完全兼容的客户端类需要充分理解框架使用的通信技术。要了解更多,请查看这篇文章。
要与外部服务通信/发送和发布消息(或事件),你可以使用库特定的 SDK 包,或者实现一个扩展 ClientProxy 的自定义客户端类,如下所示:
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {}
async close() {}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void,
): Function {}
unwrap<T = never>(): T {
throw new Error('Method not implemented.');
}
}警告
请注意,我们不会在本章中实现一个功能完善的 Google Cloud Pub/Sub 客户端,因为这需要深入研究传输器特定的技术细节。
如你所见,ClientProxy 类要求我们提供多个方法来建立和关闭连接以及发布消息(publish)和事件(dispatchEvent)。注意,如果你不需要请求-响应通信风格的支持,可以将 publish() 方法留空。同样,如果你不需要支持基于事件的通信,可以跳过 dispatchEvent() 方法。
要观察这些方法何时以及如何执行,让我们添加多个 console.log 调用,如下所示:
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {
console.log('connect');
}
async close() {
console.log('close');
}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {
return console.log('event to dispatch: ', packet);
}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void,
): Function {
console.log('message:', packet);
// 在实际应用中,应使用从响应者发回的负载来执行 "callback" 函数。
// 这里,我们将简单地模拟(5 秒延迟)
// 通过传递我们最初传入的相同 "data" 来表示响应已到达。
//
// WritePacket 上的 "isDisposed" 布尔值告诉响应不再期望更多数据。
// 如果未发送或为 false,这将简单地向 Observable 发出数据。
setTimeout(() => callback({
response: packet.data,
isDisposed: true,
}), 5000);
return () => console.log('teardown');
}
unwrap<T = never>(): T {
throw new Error('Method not implemented.');
}
}有了这个,让我们创建一个 GoogleCloudPubSubClient 类的实例并运行 send() 方法(你可能在之前的章节中见过),订阅返回的 observable 流。
const googlePubSubClient = new GoogleCloudPubSubClient();
googlePubSubClient
.send('pattern', 'Hello world!')
.subscribe((response) => console.log(response));现在,你应该在终端中看到以下输出:
connect
message: { pattern: 'pattern', data: 'Hello world!' }
Hello world! // <-- 5 秒后要测试我们的"teardown"方法(publish() 方法返回的)是否被正确执行,让我们对流应用一个 timeout 操作符,设置为 2 秒,使其比我们的 setTimeout 调用 callback 函数更早抛出。
const googlePubSubClient = new GoogleCloudPubSubClient();
googlePubSubClient
.send('pattern', 'Hello world!')
.pipe(timeout(2000))
.subscribe(
(response) => console.log(response),
(error) => console.error(error.message),
);提示
timeout 操作符从 rxjs/operators 包导入。
应用了 timeout 操作符后,你的终端输出应该如下所示:
connect
message: { pattern: 'pattern', data: 'Hello world!' }
teardown // <-- teardown
Timeout has occurred要分发事件(而不是发送消息),使用 emit() 方法:
googlePubSubClient.emit('event', 'Hello world!');你应该在控制台中看到以下内容:
connect
event to dispatch: { pattern: 'event', data: 'Hello world!' }消息序列化
如果你需要在客户端响应的序列化方面添加一些自定义逻辑,可以使用一个扩展 ClientProxy 类或其子类的自定义类。要修改成功的请求,你可以覆盖 serializeResponse 方法;要修改通过此客户端的任何错误,你可以覆盖 serializeError 方法。要使用此自定义类,你可以使用 customClass 属性将类本身传递给 ClientsModule.register() 方法。下面是一个将每个错误序列化为 RpcException 的自定义 ClientProxy 示例。
// error-handling.proxy
import { ClientTcp, RpcException } from '@nestjs/microservices';
class ErrorHandlingProxy extends ClientTCP {
serializeError(err: Error) {
return new RpcException(err);
}
}然后在 ClientsModule 中这样使用它:
// app.module
@Module({
imports: [
ClientsModule.register([{
name: 'CustomProxy',
customClass: ErrorHandlingProxy,
}]),
]
})
export class AppModule提示
传递给 customClass 的是类本身,而不是类的实例。Nest 会在底层为你创建实例,并将传递给 options 属性的任何选项传递给新的 ClientProxy。