Skip to content

Kafka

Kafka 是一个开源的分布式流平台,具有三个关键能力:

  • 发布和订阅记录流,类似于消息队列或企业消息系统。
  • 以容错持久的方式存储记录流。
  • 在记录发生时处理记录流。

Kafka 项目旨在提供一个统一的高吞吐量、低延迟平台,用于处理实时数据源。它与 Apache Storm 和 Spark 集成得非常好,可用于实时流数据分析。

安装

要开始构建基于 Kafka 的微服务,首先安装所需的包:

bash
$ npm i --save kafkajs

概述

与其他 Nest 微服务传输层实现一样,你可以使用传递给 createMicroservice() 方法的选项对象的 transport 属性来选择 Kafka 传输器机制,以及一个可选的 options 属性,如下所示:

typescript
// main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
});

提示

Transport 枚举从 @nestjs/microservices 包导入。

选项

options 对象特定于所选的传输器。Kafka 传输器暴露以下属性。

client客户端配置选项(阅读更多请点这里
consumer消费者配置选项(阅读更多请点这里
run运行配置选项(阅读更多请点这里
subscribe订阅配置选项(阅读更多请点这里
producer生产者配置选项(阅读更多请点这里
send发送配置选项(阅读更多请点这里
producerOnlyMode跳过消费者组注册并仅作为生产者的功能标志(boolean
postfixId更改 clientId 值的后缀(string

客户端

与其他微服务传输器相比,Kafka 有一个小区别。我们使用 ClientKafkaProxy 类而不是 ClientProxy 类。

与其他微服务传输器一样,你有多种选择来创建 ClientKafkaProxy 实例。

一种创建实例的方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,导入它并使用 register() 方法传递一个选项对象,该对象包含上面 createMicroservice() 方法中显示的相同属性,以及一个用作注入令牌的 name 属性。在这里阅读更多关于 ClientsModule 的信息。

typescript
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'HERO_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  ...
})

也可以使用其他选项来创建客户端(ClientProxyFactory@Client())。你可以在这里了解它们。

使用 @Client() 装饰器如下:

typescript
@Client({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero',
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer'
    }
  }
})
client: ClientKafkaProxy;

消息模式

Kafka 微服务消息模式利用两个主题来处理请求和回复通道。ClientKafkaProxy.send() 方法通过将返回地址与请求消息关联来发送消息,关联内容包括关联 ID、回复主题和回复分区。这要求 ClientKafkaProxy 实例在发送消息之前已订阅回复主题并分配了至少一个分区。

因此,你需要为每个运行的 Nest 应用程序至少有一个回复主题分区。例如,如果你运行 4 个 Nest 应用程序但回复主题只有 3 个分区,那么其中 1 个 Nest 应用程序在尝试发送消息时将出错。

当新的 ClientKafkaProxy 实例启动时,它们加入消费者组并订阅各自的主题。这个过程会触发分配给消费者组中消费者的主题分区的重新平衡。

通常,主题分区使用轮询分区器分配,该分区器将主题分区分配给按消费者名称排序的消费者集合,消费者名称在应用程序启动时随机设置。然而,当新消费者加入消费者组时,新消费者可以被定位在消费者集合中的任何位置。这创建了一种条件,即当预先存在的消费者位于新消费者之后时,它们可能被分配到不同的分区。结果是,被分配到不同分区的消费者将丢失重新平衡之前发送的请求的响应消息。

为了防止 ClientKafkaProxy 消费者丢失响应消息,使用了 Nest 特有的内置自定义分区器。这个自定义分区器将分区分配给按高分辨率时间戳(process.hrtime())排序的消费者集合,时间戳在应用程序启动时设置。

消息响应订阅

注意

本节仅在你使用请求-响应消息风格(使用 @MessagePattern 装饰器和 ClientKafkaProxy.send 方法)时相关。订阅响应主题对于基于事件的通信(@EventPattern 装饰器和 ClientKafkaProxy.emit 方法)不是必需的。

ClientKafkaProxy 类提供了 subscribeToResponseOf() 方法。subscribeToResponseOf() 方法接受请求的主题名称作为参数,并将派生的回复主题名称添加到回复主题集合中。在实现消息模式时需要此方法。

typescript
// heroes.controller
onModuleInit() {
  this.client.subscribeToResponseOf('hero.kill.dragon');
}

如果 ClientKafkaProxy 实例是异步创建的,subscribeToResponseOf() 方法必须在调用 connect() 方法之前调用。

typescript
// heroes.controller
async onModuleInit() {
  this.client.subscribeToResponseOf('hero.kill.dragon');
  await this.client.connect();
}

入站

Nest 接收传入的 Kafka 消息作为具有 keyvalueheaders 属性的对象,这些属性的值类型为 Buffer。然后 Nest 通过将缓冲区转换为字符串来解析这些值。如果字符串"看起来像对象",Nest 会尝试将字符串解析为 JSON。然后将 value 传递给其关联的处理器。

出站

Nest 在发布事件或发送消息时,经过序列化过程后发送出站 Kafka 消息。这发生在传递给 ClientKafkaProxyemit()send() 方法的参数上,或从 @MessagePattern 方法返回的值上。此序列化通过使用 JSON.stringify()toString() 原型方法来"字符串化"非字符串或非缓冲区的对象。

typescript
// heroes.controller
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const dragonId = message.dragonId;
    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];
    return items;
  }
}

提示

@Payload()@nestjs/microservices 包导入。

出站消息也可以通过传递包含 keyvalue 属性的对象来添加键。为消息添加键对于满足共分区要求很重要。

typescript
// heroes.controller
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const realm = 'Nest';
    const heroId = message.heroId;
    const dragonId = message.dragonId;

    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];

    return {
      headers: {
        realm
      },
      key: heroId,
      value: items
    }
  }
}

此外,以此格式传递的消息也可以包含在 headers 哈希属性中设置的自定义头。头哈希属性值必须是 stringBuffer 类型。

typescript
// heroes.controller
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const realm = 'Nest';
    const heroId = message.heroId;
    const dragonId = message.dragonId;

    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];

    return {
      headers: {
        kafka_nestRealm: realm
      },
      key: heroId,
      value: items
    }
  }
}

基于事件

虽然请求-响应方法非常适合在服务之间交换消息,但当你的消息风格是基于事件的(这反过来对 Kafka 来说是理想的)——你只想发布事件而不等待响应时,它就不太适合了。在这种情况下,你不需要请求-响应所需的两个主题带来的开销。

查看这两节了解更多:概述:基于事件概述:发布事件

上下文

在更复杂的场景中,你可能需要访问传入请求的额外信息。使用 Kafka 传输器时,你可以访问 KafkaContext 对象。

typescript
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  console.log(`Topic: ${context.getTopic()}`);
}

提示

@Payload()@Ctx()KafkaContext@nestjs/microservices 包导入。

要访问原始 Kafka IncomingMessage 对象,使用 KafkaContext 对象的 getMessage() 方法,如下所示:

typescript
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const originalMessage = context.getMessage();
  const partition = context.getPartition();
  const { headers, timestamp } = originalMessage;
}

其中 IncomingMessage 实现以下接口:

typescript
interface IncomingMessage {
  topic: string;
  partition: number;
  timestamp: string;
  size: number;
  attributes: number;
  offset: string;
  key: any;
  value: any;
  headers: Record<string, any>;
}

如果你的处理器涉及每条接收消息的缓慢处理时间,应考虑使用 heartbeat 回调。要获取 heartbeat 函数,使用 KafkaContextgetHeartbeat() 方法,如下所示:

typescript
@MessagePattern('hero.kill.dragon')
async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const heartbeat = context.getHeartbeat();

  // 执行一些缓慢的处理
  await doWorkPart1();

  // 发送心跳以避免超过 sessionTimeout
  await heartbeat();

  // 再次执行一些缓慢的处理
  await doWorkPart2();
}

命名约定

Kafka 微服务组件会在 client.clientIdconsumer.groupId 选项后附加其各自角色的描述,以防止 Nest 微服务客户端和服务器组件之间的冲突。默认情况下,ClientKafkaProxy 组件附加 -clientServerKafka 组件附加 -server 到这两个选项。注意下面提供的值是如何以这种方式转换的(如注释所示)。

typescript
// main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-server
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-server
    },
  }
});

对于客户端:

typescript
// heroes.controller
@Client({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-client
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-client
    }
  }
})
client: ClientKafkaProxy;

提示

Kafka 客户端和消费者命名约定可以通过在自定义提供者中扩展 ClientKafkaProxyKafkaServer 并覆盖构造函数来自定义。

由于 Kafka 微服务消息模式利用两个主题来处理请求和回复通道,因此应从请求主题派生回复模式。默认情况下,回复主题的名称是请求主题名称加上 .reply 后缀的组合。

typescript
// heroes.controller
onModuleInit() {
  this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
}

提示

Kafka 回复主题命名约定可以通过在自定义提供者中扩展 ClientKafkaProxy 并覆盖 getResponsePatternName 方法来自定义。

可重试异常

与其他传输器类似,所有未处理的异常都会自动包装为 RpcException 并转换为"用户友好"的格式。然而,在某些边缘情况下,你可能希望绕过此机制,让异常被 kafkajs 驱动直接消费。在处理消息时抛出异常会指示 kafkajs 重试它(重新传递它),这意味着即使消息(或事件)处理器被触发了,偏移量也不会提交到 Kafka。

警告

对于事件处理器(基于事件的通信),所有未处理的异常默认被视为可重试异常

为此,你可以使用一个名为 KafkaRetriableException 的专用类,如下所示:

typescript
throw new KafkaRetriableException('...');

提示

KafkaRetriableException 类从 @nestjs/microservices 包导出。

自定义异常处理

除了默认的错误处理机制外,你可以为 Kafka 事件创建自定义异常过滤器来管理重试逻辑。例如,下面的示例演示了如何在可配置的重试次数后跳过有问题的事件:

typescript
import { Catch, ArgumentsHost, Logger } from '@nestjs/common';
import { BaseExceptionFilter } from '@nestjs/core';
import { KafkaContext } from '../ctx-host';

@Catch()
export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
  private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name);

  constructor(
    private readonly maxRetries: number,
    // 当超过最大重试次数时执行的可选自定义函数
    private readonly skipHandler?: (message: any) => Promise<void>,
  ) {
    super();
  }

  async catch(exception: unknown, host: ArgumentsHost) {
    const kafkaContext = host.switchToRpc().getContext<KafkaContext>();
    const message = kafkaContext.getMessage();
    const currentRetryCount = this.getRetryCountFromContext(kafkaContext);

    if (currentRetryCount >= this.maxRetries) {
      this.logger.warn(
        `Max retries (${
          this.maxRetries
        }) exceeded for message: ${JSON.stringify(message)}`,
      );

      if (this.skipHandler) {
        try {
          await this.skipHandler(message);
        } catch (err) {
          this.logger.error('Error in skipHandler:', err);
        }
      }

      try {
        await this.commitOffset(kafkaContext);
      } catch (commitError) {
        this.logger.error('Failed to commit offset:', commitError);
      }
      return; // 停止传播异常
    }

    // 如果重试次数低于最大值,继续使用默认异常过滤器逻辑
    super.catch(exception, host);
  }

  private getRetryCountFromContext(context: KafkaContext): number {
    const headers = context.getMessage().headers || {};
    const retryHeader = headers['retryCount'] || headers['retry-count'];
    return retryHeader ? Number(retryHeader) : 0;
  }

  private async commitOffset(context: KafkaContext): Promise<void> {
    const consumer = context.getConsumer && context.getConsumer();
    if (!consumer) {
      throw new Error('Consumer instance is not available from KafkaContext.');
    }

    const topic = context.getTopic && context.getTopic();
    const partition = context.getPartition && context.getPartition();
    const message = context.getMessage();
    const offset = message.offset;

    if (!topic || partition === undefined || offset === undefined) {
      throw new Error(
        'Incomplete Kafka message context for committing offset.',
      );
    }

    await consumer.commitOffsets([
      {
        topic,
        partition,
        // 提交偏移量时,提交下一个数字(即当前偏移量 + 1)
        offset: (Number(offset) + 1).toString(),
      },
    ]);
  }
}

此过滤器提供了一种将 Kafka 事件处理重试到可配置次数的方法。一旦达到最大重试次数,它会触发自定义 skipHandler(如果提供)并提交偏移量,有效地跳过有问题的事件。这允许后续事件无中断地继续处理。

你可以通过将此过滤器添加到事件处理器来集成它:

typescript
@UseFilters(new KafkaMaxRetryExceptionFilter(5))
export class MyEventHandler {
  @EventPattern('your-topic')
  async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
    // 你的事件处理逻辑...
  }
}

提交偏移量

在使用 Kafka 时,提交偏移量至关重要。默认情况下,消息会在特定时间后自动提交。更多信息请访问 KafkaJS 文档KafkaContext 提供了一种访问活动消费者以手动提交偏移量的方式。消费者是 KafkaJS 消费者,工作方式与原生 KafkaJS 实现相同。

typescript
@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
  // 业务逻辑

  const { offset } = context.getMessage();
  const partition = context.getPartition();
  const topic = context.getTopic();
  const consumer = context.getConsumer();
  await consumer.commitOffsets([{ topic, partition, offset }])
}

要禁用消息的自动提交,在 run 配置中设置 autoCommit: false,如下所示:

typescript
// main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    run: {
      autoCommit: false
    }
  }
});

实例状态更新

要获取连接和底层驱动实例状态的实时更新,你可以订阅 status 流。此流提供特定于所选驱动的状态更新。对于 Kafka 驱动,status 流发出 connecteddisconnectedrebalancingcrashedstopped 事件。

typescript
this.client.status.subscribe((status: KafkaStatus) => {
  console.log(status);
});

提示

KafkaStatus 类型从 @nestjs/microservices 包导入。

同样,你可以订阅服务器的 status 流以接收服务器状态的通知。

typescript
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: KafkaStatus) => {
  console.log(status);
});

底层生产者和消费者

对于更高级的用例,你可能需要访问底层生产者和消费者实例。这在需要手动关闭连接或使用驱动特定方法等场景中很有用。然而,请记住,在大多数情况下你不应该需要直接访问驱动。

为此,你可以使用 ClientKafkaProxy 实例暴露的 producerconsumer getter。

typescript
const producer = this.client.producer;
const consumer = this.client.consumer;

基于 NestJS 官方文档翻译