适配器
WebSocket 模块是平台无关的,因此你可以通过使用 WebSocketAdapter 接口来引入自己的库(甚至是原生实现)。该接口要求实现下表中描述的几个方法:
create | 根据传入的参数创建 socket 实例 |
bindClientConnect | 绑定客户端连接事件 |
bindClientDisconnect | 绑定客户端断开连接事件(可选*) |
bindMessageHandlers | 将传入的消息绑定到相应的消息处理器 |
close | 终止服务器实例 |
扩展 socket.io
socket.io 包被封装在 IoAdapter 类中。如果你想增强适配器的基本功能该怎么办?例如,你的技术需求要求能够在 Web 服务的多个负载均衡实例之间广播事件。为此,你可以扩展 IoAdapter 并覆盖负责实例化新 socket.io 服务器的单个方法。但首先,让我们安装所需的包。
警告
要在多个负载均衡实例中使用 socket.io,你必须在客户端的 socket.io 配置中设置 transports: ['websocket'] 来禁用轮询,或者在负载均衡器中启用基于 cookie 的路由。仅使用 Redis 是不够的。更多信息请参见这里。
$ npm i --save redis socket.io @socket.io/redis-adapter安装完包后,我们可以创建一个 RedisIoAdapter 类。
import { IoAdapter } from '@nestjs/platform-socket.io';
import { ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
export class RedisIoAdapter extends IoAdapter {
private adapterConstructor: ReturnType<typeof createAdapter>;
async connectToRedis(): Promise<void> {
const pubClient = createClient({ url: `redis://localhost:6379` });
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
this.adapterConstructor = createAdapter(pubClient, subClient);
}
createIOServer(port: number, options?: ServerOptions): any {
const server = super.createIOServer(port, options);
server.adapter(this.adapterConstructor);
return server;
}
}之后,只需切换到新创建的 Redis 适配器。
const app = await NestFactory.create(AppModule);
const redisIoAdapter = new RedisIoAdapter(app);
await redisIoAdapter.connectToRedis();
app.useWebSocketAdapter(redisIoAdapter);Ws 库
另一个可用的适配器是 WsAdapter,它充当框架与集成了快速且经过全面测试的 ws 库之间的代理。该适配器与原生浏览器 WebSocket 完全兼容,并且比 socket.io 包快得多。遗憾的是,它开箱即用的功能要少得多。不过在某些情况下,你可能并不需要这些功能。
提示
ws 库不支持命名空间(socket.io 推广的通信通道概念)。然而,要在某种程度上模拟此功能,你可以在不同路径上挂载多个 ws 服务器(例如 @WebSocketGateway({ path: '/users' }))。
要使用 ws,我们首先需要安装所需的包:
$ npm i --save @nestjs/platform-ws安装完包后,我们可以切换适配器:
const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new WsAdapter(app));提示
WsAdapter 从 @nestjs/platform-ws 中导入。
WsAdapter 被设计为以 { event: string, data: any } 格式处理消息。如果你需要接收和处理不同格式的消息,你需要配置一个消息解析器将它们转换为此所需格式。
const wsAdapter = new WsAdapter(app, {
// 处理 [event, data] 格式的消息
messageParser: (data) => {
const [event, payload] = JSON.parse(data.toString());
return { event, data: payload };
},
});或者,你可以在创建适配器后使用 setMessageParser 方法配置消息解析器。
高级(自定义适配器)
出于演示目的,我们将手动集成 ws 库。如前所述,该库的适配器已经创建好了,并作为 WsAdapter 类从 @nestjs/platform-ws 包中导出。以下是简化版实现的大致样子:
// ws-adapter
import * as WebSocket from 'ws';
import { WebSocketAdapter, INestApplicationContext } from '@nestjs/common';
import { MessageMappingProperties } from '@nestjs/websockets';
import { Observable, fromEvent, EMPTY } from 'rxjs';
import { mergeMap, filter } from 'rxjs/operators';
export class WsAdapter implements WebSocketAdapter {
constructor(private app: INestApplicationContext) {}
create(port: number, options: any = {}): any {
return new WebSocket.Server({ port, ...options });
}
bindClientConnect(server, callback: Function) {
server.on('connection', callback);
}
bindMessageHandlers(
client: WebSocket,
handlers: MessageMappingProperties[],
process: (data: any) => Observable<any>,
) {
fromEvent(client, 'message')
.pipe(
mergeMap(data => this.bindMessageHandler(data, handlers, process)),
filter(result => result),
)
.subscribe(response => client.send(JSON.stringify(response)));
}
bindMessageHandler(
buffer,
handlers: MessageMappingProperties[],
process: (data: any) => Observable<any>,
): Observable<any> {
const message = JSON.parse(buffer.data);
const messageHandler = handlers.find(
handler => handler.message === message.event,
);
if (!messageHandler) {
return EMPTY;
}
return process(messageHandler.callback(message.data));
}
close(server) {
server.close();
}
}提示
当你想利用 ws 库时,请使用内置的 WsAdapter,而不是创建自己的适配器。
然后,我们可以使用 useWebSocketAdapter() 方法设置自定义适配器:
// main
const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new WsAdapter(app));示例
一个使用 WsAdapter 的可运行示例可在这里获取。