RabbitMQ
RabbitMQ 是一个开源的轻量级消息代理,支持多种消息传递协议。它可以部署在分布式和联合配置中,以满足高规模、高可用性的要求。此外,它是全球最广泛部署的消息代理,被全球的小型创业公司和大型企业所使用。
安装
要开始构建基于 RabbitMQ 的微服务,首先安装所需的包:
$ npm i --save amqplib amqp-connection-manager概述
要使用 RabbitMQ 传输器,请将以下选项对象传递给 createMicroservice() 方法:
// main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
});提示
Transport 枚举从 @nestjs/microservices 包导入。
选项
options 属性特定于所选的传输器。RabbitMQ 传输器暴露以下属性。
urls | 按顺序尝试的连接 URL 数组 |
queue | 服务器将监听的队列名称 |
prefetchCount | 设置通道的预取计数 |
isGlobalPrefetchCount | 启用每通道预取 |
noAck | 如果为 false,则启用手动确认模式 |
consumerTag | 服务器用于区分消费者的消息传递的名称;不能在通道上已被使用。通常更容易省略此项,这种情况下服务器将创建一个随机名称并在回复中提供。消费者标签标识符(阅读更多请点这里) |
queueOptions | 额外的队列选项(阅读更多请点这里) |
socketOptions | 额外的 socket 选项(阅读更多请点这里) |
headers | 随每条消息发送的头 |
replyQueue | 生产者的回复队列。默认为 amq.rabbitmq.reply-to |
persistent | 如果为真,消息将在代理重启后存活,前提是它位于同样能存活重启的队列中 |
noAssert | 当为 false 时,在消费之前不会断言队列 |
wildcards | 设置为 true 仅当你想使用 Topic Exchange 来将消息路由到队列。启用此选项将允许你使用通配符(*、#)作为消息和事件模式 |
exchange | 交换器名称。当 "wildcards" 设置为 true 时,默认为队列名称 |
exchangeType | 交换器类型。默认为 topic。有效值为 direct、fanout、topic 和 headers |
routingKey | Topic Exchange 的额外路由键 |
maxConnectionAttempts | 最大连接尝试次数。仅适用于消费者配置。-1 === 无限 |
客户端
与其他微服务传输器一样,你有多种选择来创建 RabbitMQ ClientProxy 实例。
一种创建实例的方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,导入它并使用 register() 方法传递一个选项对象,该对象包含上面 createMicroservice() 方法中显示的相同属性,以及一个用作注入令牌的 name 属性。在这里阅读更多关于 ClientsModule 的信息。
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
},
]),
]
...
})也可以使用其他选项来创建客户端(ClientProxyFactory 或 @Client())。你可以在这里了解它们。
上下文
在更复杂的场景中,你可能需要访问传入请求的额外信息。使用 RabbitMQ 传输器时,你可以访问 RmqContext 对象。
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(`Pattern: ${context.getPattern()}`);
}提示
@Payload()、@Ctx() 和 RmqContext 从 @nestjs/microservices 包导入。
要访问原始 RabbitMQ 消息(包含 properties、fields 和 content),使用 RmqContext 对象的 getMessage() 方法,如下所示:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getMessage());
}要获取 RabbitMQ channel 的引用,使用 RmqContext 对象的 getChannelRef 方法,如下所示:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getChannelRef());
}消息确认
为确保消息永不丢失,RabbitMQ 支持消息确认。确认由消费者发回,告诉 RabbitMQ 某条特定消息已被接收、处理,RabbitMQ 可以自由删除它。如果消费者死亡(其通道被关闭、连接被关闭或 TCP 连接丢失)而没有发送确认,RabbitMQ 将理解消息未被完全处理并将其重新入队。
要启用手动确认模式,将 noAck 属性设置为 false:
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
noAck: false,
queueOptions: {
durable: false
},
},当启用手动消费者确认时,我们必须从工作者发送适当的确认来表明我们已完成任务。
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
}记录构建器
要配置消息选项,你可以使用 RmqRecordBuilder 类(注意:这也适用于基于事件的流程)。例如,要设置 headers 和 priority 属性,使用 setOptions 方法,如下所示:
const message = ':cat:';
const record = new RmqRecordBuilder(message)
.setOptions({
headers: {
['x-version']: '1.0.0',
},
priority: 3,
})
.build();
this.client.send('replace-emoji', record).subscribe(...);提示
RmqRecordBuilder 类从 @nestjs/microservices 包导出。
你也可以在服务器端通过访问 RmqContext 来读取这些值,如下所示:
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
const { properties: { headers } } = context.getMessage();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}实例状态更新
要获取连接和底层驱动实例状态的实时更新,你可以订阅 status 流。此流提供特定于所选驱动的状态更新。对于 RMQ 驱动,status 流发出 connected 和 disconnected 事件。
this.client.status.subscribe((status: RmqStatus) => {
console.log(status);
});提示
RmqStatus 类型从 @nestjs/microservices 包导入。
同样,你可以订阅服务器的 status 流以接收服务器状态的通知。
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: RmqStatus) => {
console.log(status);
});监听 RabbitMQ 事件
在某些情况下,你可能想监听微服务发出的内部事件。例如,你可以监听 error 事件,以在发生错误时触发额外操作。为此,使用 on() 方法,如下所示:
this.client.on('error', (err) => {
console.error(err);
});同样,你可以监听服务器的内部事件:
server.on<RmqEvents>('error', (err) => {
console.error(err);
});提示
RmqEvents 类型从 @nestjs/microservices 包导入。
底层驱动访问
对于更高级的用例,你可能需要访问底层驱动实例。这在需要手动关闭连接或使用驱动特定方法等场景中很有用。然而,请记住,在大多数情况下你不应该需要直接访问驱动。
为此,你可以使用 unwrap() 方法,它返回底层驱动实例。泛型类型参数应指定你期望的驱动实例类型。
const managerRef =
this.client.unwrap<import('amqp-connection-manager').AmqpConnectionManager>();同样,你可以访问服务器的底层驱动实例:
const managerRef =
server.unwrap<import('amqp-connection-manager').AmqpConnectionManager>();通配符
RabbitMQ 支持在路由键中使用通配符,以实现灵活的消息路由。# 通配符匹配零个或多个单词,而 * 通配符恰好匹配一个单词。
例如,路由键 cats.# 匹配 cats、cats.meow 和 cats.meow.purr。路由键 cats.* 匹配 cats.meow 但不匹配 cats.meow.purr。
要在 RabbitMQ 微服务中启用通配符支持,在选项对象中将 wildcards 配置选项设置为 true:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
wildcards: true,
},
},
);使用此配置,你可以在订阅事件/消息时使用通配符作为路由键。例如,要监听路由键为 cats.# 的消息,可以使用以下代码:
@MessagePattern('cats.#')
getCats(@Payload() data: { message: string }, @Ctx() context: RmqContext) {
console.log(`Received message with routing key: ${context.getPattern()}`);
return {
message: 'Hello from the cats service!',
}
}要发送带有特定路由键的消息,你可以使用 ClientProxy 实例的 send() 方法:
this.client.send('cats.meow', { message: 'Meow!' }).subscribe((response) => {
console.log(response);
});