Aller au contenu

Kafka et NestJS : construire une librairie sur mesure pour la consommation de messages

Dans cet article, je partage mon retour d'expérience sur l'intégration de Kafka avec NestJS pour un leader du retail. En créant une bibliothèque sur mesure, nous avons surmonté les limitations du module natif pour optimiser la consommation de messages et répondre aux exigences de performance élevées

Intégration de Kafka avec NestJS pour une architecture microservices performante et flexible.

Dans le cadre de mon activité de consultant, j'ai eu l'opportunité de travailler avec un leader mondial dans le secteur du retail. Au sein d'une équipe développant une architecture microservices en Node.js et TypeScript, j'ai eu le lead sur la transition vers NestJS. Cela a impliqué la mise en place de microservices et le développement de librairies, dont celle-ci, permettant de simplifier à la fois la consommation de messages Kafka via un décorateur et la production de messages Kafka via l'injection d'un client Kafka.

Bien que la production de messages soit également une partie importante, elle n'a pas été incluse dans cet article pour des raisons de concision.

Kafka et NestJS : une alliance risquée ?

Utiliser Kafka avec NestJS peut sembler audacieux, car Kafka est souvent associé à des systèmes en Java, où il est particulièrement bien optimisé. Toutefois, dans notre équipe, nous avions besoin de microservices légers et rapides à déployer. Node.js s'est imposé pour sa légèreté et sa rapidité, offrant une alternative plus flexible et facile à scaler que les architectures Java.

Les librairies Kafka en JavaScript : forces et faiblesses

Voici un tour d'horizon des principales librairies Kafka pour JavaScript :

  • kafkajs : Librairie entièrement en JavaScript, simple à installer et bien documentée. Elle est idéale pour de nombreux cas d'usage, mais ses performances sont légèrement inférieures aux solutions avec bindings natifs.
  • node-rdkafka : Basée sur le client C/C++ librdkafka, elle offre de meilleures performances grâce à ses bindings natifs, mais sa configuration est complexe et peut poser des problèmes de compatibilité.
  • confluent-kafka-javascript : Lancée en 2024 par Confluent, cette librairie combine la performance de librdkafka avec une API compatible avec kafkajs. Actuellement en phase de tests, elle n'est pas encore prête pour la production.

Dans notre équipe, nous avons choisi kafkajs pour sa présence historique dans nos projets. Bien que nous ayons besoin de hautes performances, nous avons écarté node-rdkafka en raison de ses contraintes liées aux dépendances natives.

Kafka et NestJS : une intégration parfaite ?

NestJS est un framework progressif et modulaire pour construire des applications côté serveur avec Node.js et TypeScript. Il propose une architecture inspirée d'Angular, avec des principes comme l'injection de dépendances, les modules et les décorateurs, permettant de structurer l'application.

NestJS offre une intégration Kafka via sa librairie @nestjs/microservices. Plutôt que de proposer un module Kafka autonome, il s'agit d'une stratégie de transport configurable directement dans le framework, permettant d'utiliser Kafka comme moyen de communication entre microservices.

Fonctionnalités du transport Kafka dans NestJS

  • Consommation de messages : Les messages Kafka peuvent être consommés via des décorateurs spécifiques, simplifiant l'abstraction de la logique de consommation dans un microservice.
  • Production de messages : Un client dédié permet de produire des messages Kafka simplement, facilitant l'envoi de données vers d'autres microservices.
  • Support du RPC : NestJS permet également d'utiliser Kafka pour des appels de procédures distantes (RPC) en utilisant un modèle client-serveur pour une communication synchrone, avec RxJS.

Limites du transport Kafka dans NestJS

Bien que le module Kafka de NestJS soit efficace pour des cas simples, il montre certaines limites dans des scénarios plus complexes :

  • Traitement en batch : Par défaut, NestJS utilise la méthode eachMessage, qui traite les messages un par un. Pour des besoins de haute performance, comme ceux de notre client, il est préférable de traiter les messages en batch avec eachBatch, qui est beaucoup plus efficace.
  • Complexité du RPC : NestJS impose une certaine lourdeur dans la gestion des appels RPC, ce qui peut ralentir la consommation et la production de messages, en particulier dans des environnements où la communication asynchrone est prédominante.

Suite à ces limites, nous allons créer notre propre bibliothèque via un module dynamique de NestJS. Cela nous permettra de conserver l'usage des décorateurs pour consommer les messages Kafka, tout en simplifiant la gestion et en optimisant le traitement en batch.

Créer une lib Kafka

Étape 1 : démarrage de l'application et connexion à Kafka

Pour commencer, il est crucial de bien comprendre comment NestJS fonctionne avec les microservices. Lors du démarrage de l'application, nous devons indiquer à NestJS que nous souhaitons connecter un serveur Kafka. Voici un exemple de code qui illustre cette configuration :

import { NestFactory } from '@nestjs/core';
import { CustomTransportStrategy, MicroserviceOptions } from '@nestjs/microservices';
import { FastifyAdapter, NestFastifyApplication } from '@nestjs/platform-fastify';
import { KAFKA_SERVER } from '@my-libs/kafka';
import { AppModule } from './app.module';

async function bootstrap(): Promise<void> {
  const app = await NestFactory.create<NestFastifyApplication>(
    AppModule,
    new FastifyAdapter(),
    { bufferLogs: true },
  );

  const strategy = app.get<CustomTransportStrategy>(KAFKA_SERVER);

  app.enableShutdownHooks();
  app.connectMicroservice<MicroserviceOptions>(
    { strategy },
  );

  await app.startAllMicroservices();
  await app.listen(8081, '0.0.0.0');
}

bootstrap();

Dans cet extrait, nous créons une application NestJS avec Fastify et la connectons à un serveur Kafka via une stratégie personnalisée issue de notre librairie. Cette approche permet à l'application de consommer les messages Kafka dès son démarrage.

Étape 2 : configuration de Kafka avec des modules dynamiques

NestJS permet de créer des librairies pour des services comme Kafka en utilisant des modules dynamiques, centralisant ainsi la configuration tout en offrant la possibilité de personnaliser chaque module individuellement.

La méthode forRoot permet de configurer Kafka globalement dans l'application, comme le montre l'exemple suivant :

import { Module } from '@nestjs/common';
import { KafkaModule } from '@my-libs/kafka';

@Module({
  imports: [
    KafkaModule.forRoot({
      config: {
        brokers: ['localhost:9092'],
        sasl: {
          mechanism: 'plain',
          username: 'USERNAME',
          password: 'PASSWORD',
        },
        clientId: 'CLIENT-ID',
      },
      schemaRegistry: {
        config: {
          host: 'http://localhost:8081',
          auth: {
            username: 'USERNAME',
            password: 'PASSWORD',
          },
        },
      },
    }),
  ],
})
export class AppModule {}

Cette configuration centralisée définit les brokers Kafka et les détails d'authentification.

La méthode forFeature permet de définir des configurations spécifiques à certains modules de l'application :

import { Module } from '@nestjs/common';
import { KafkaModule, DeserializerEnum, SerializerEnum, TopicType } from '@my-libs/kafka';

@Module({
  imports: [
    KafkaModule.forFeature([
      {
        name: 'MY-INPUT-TOPIC',
        type: TopicType.INPUT,
        topic: {
          consumer: { groupId: 'MY-GROUP-ID' },
          deserializers: {
            keyDeserializer: DeserializerEnum.STRING,
            valueDeserializer: DeserializerEnum.AVRO,
          },
        },
      },
    ]),
  ],
})
export class MyFeatureModule {}

Dans cet exemple, nous configurons un topic Kafka avec des paramètres comme le groupe de consommateurs, ainsi que les méthodes de désérialisation des clés et des valeurs des messages.

Étape 3 : consommer des messages Kafka dans un contrôleur

Grâce aux décorateurs fournis par NestJS, consommer des messages Kafka devient extrêmement simple :

import { ConsumeMessage } from '@my-libs/kafka';
import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, KafkaContext, Payload } from '@nestjs/microservices';
import { Key, Value } from './__generated__/kafka/message-key';

@Controller()
export class MyAppController {
  @EventPattern('MY-INPUT-TOPIC')
  async execute(
    @Payload() message: ConsumeMessage<Key, Value>,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    console.log(message, context);
  }
}

Dans cet exemple, le décorateur @EventPattern écoute les messages Kafka provenant du topic "MY-INPUT-TOPIC" et exécute la logique associée. Le message et le contexte Kafka sont facilement accessibles via les décorateurs @Payload() et @Ctx().

Étape 4 : implémentation du module dynamique

La classe KafkaModule est le point d'entrée de notre librairie Kafka, fournissant les méthodes : forRoot et forFeature.

import { DynamicModule, Module } from '@nestjs/common';
import { KafkaCoreModule } from './kafka-core.module';
import { KafkaModuleOptions, TopicDefinition } from './kafka-options.types';

@Module({})
export class KafkaModule {
  static forRoot(forRootOptions: KafkaModuleOptions): DynamicModule {
    return {
      module: KafkaModule,
      imports: [KafkaCoreModule.forRoot(forRootOptions)],
    };
  }

  static forFeature(topicDefinitions: TopicDefinition[]): DynamicModule {
    const providers = createKafkaProviders(topicDefinitions);

    return {
      module: KafkaModule,
      providers: providers,
      exports: providers,
    };
  }
}

La méthode forRoot configure globalement la librairie en utilisant un sous-module dynamique KafkaCoreModule, une pratique courante dans NestJS pour gérer les configurations globales. À l'inverse, le forFeature définit des configurations spécifiques via une simple fonction, permettant une personnalisation locale plus légère.


import { DynamicModule, Module } from '@nestjs/common';
import { KafkaModuleOptions } from './kafka-options.types';
import { KafkaServer } from './kafka-server';
import { CORE_MODULE_OPTIONS, KAFKA_SERVER } from './kafka.constants';

@Module({})
export class KafkaCoreModule {
  constructor() {}

  static forRoot(forRootOptions: KafkaModuleOptions = {}): DynamicModule {
    const kafkaModuleOptionsProvider = {
      provide: CORE_MODULE_OPTIONS,
      useValue: forRootOptions,
    };

    const serverTemp = {
      provide: KAFKA_SERVER_TEMP,
      useFactory: async (): Promise<KafkaServer | null> => {
        if (!forRootOptions.server) {
          return null;
        }

        return new KafkaServer({
          kafka: forRootOptions.config,
          consumer: forRootOptions.server.consumer,
          run: forRootOptions.server.run,
          subscribe: forRootOptions.server.subscribe,
        });
      },
      inject: [],
    };

    return {
      module: KafkaCoreModule,
      providers: [kafkaModuleOptionsProvider, serverTemp],
      exports: [CORE_MODULE_OPTIONS, serverTemp],
    };
  }
  
    static forFeature(topicDefinitions: TopicDefinition[]): DynamicModule {
    const providers = createKafkaProviders(topicDefinitions);
    return {
      module: KafkaModule,
      providers: providers,
      exports: providers,
    };
  }
}

Ce KafkaCoreModule gère deux éléments principaux :

  1. CORE_MODULE_OPTIONS : Ce token stocke la configuration définie via forRoot dans l'AppModule. Il est exporté pour être accessible dans le forFeature.
  2. KAFKA_SERVER_TEMP : Ce provider représente une version temporaire du serveur Kafka, initialisé via la classe KafkaServer. À ce stade, le serveur n'est pas encore complètement configuré. C'est la méthode forFeature qui finalise la configuration en fournissant les paramètres spécifiques nécessaires pour permettre au serveur d'interagir avec le cluster Kafka de manière complète.

La fonction createKafkaProviders est utilisée dans la méthode forFeature pour créer les providers Kafka spécifiques aux topics définis dans chaque module. Elle injecte la configuration globale définie par forRoot (via CORE_MODULE_OPTIONS) et le serveur Kafka temporaire.

export function createKafkaProviders(
  forFeatureOptions: TopicDefinition[] = [],
): Provider {
  const inputTopics = forFeatureOptions.filter(
    factory => factory.type === TopicType.INPUT,
  );
  
  if (inputTopics.length > 1) {
      throw new Error('for this demo we accept only one input topic');
  }

  return {
    provide: KAFKA_SERVER,
    useFactory: async (
      moduleOptions: KafkaModuleOptions,
      kafkaServer: KafkaServer,
    ): Promise<KafkaServer> => {
      for (const topic of inputTopics) {
        const config = moduleOptions.schemaRegistry?.config;
        const { postfixId, consumer, run, subscribe, deserializers } =
          topic as InputTopic;
        const consumerOptions = { postfixId, consumer, run, subscribe };

        kafkaServer.setConsumerConfig(consumerOptions);
        kafkaServer.setDeserializers({
          keyDeserializer:
            deserializers?.keyDeserializer === DeserializerEnum.AVRO
              ? new KafkaAvroDeserializer(config)
              : new KafkaStringDeserializer(),
          valueDeserializer:
            deserializers?.valueDeserializer === DeserializerEnum.AVRO
              ? new KafkaAvroDeserializer(config)
              : new KafkaStringDeserializer(),
        });
      }

      return kafkaServer;
    },
    inject: [CORE_MODULE_OPTIONS, KAFKA_SERVER_TEMP],
  };
}

Étape 5 : implémentation du serveur Kafka

La classe KafkaServer hérite de la classe Server de NestJS, permettant de gérer les handlers décorés avec @EventPattern. Lors de l'appel à app.connectMicroservice(), NestJS associe chaque topic Kafka à son handler, permettant de lier automatiquement les messages Kafka aux méthodes qui doivent les traiter.

export class KafkaServer extends Server implements CustomTransportStrategy {
  public readonly transportId = Transport.KAFKA;

  private readonly kafkaConfig: KafkaConfig;
  private consumerConfig?: ConsumerConfig;
  private consumerRunConfig?: ConsumerRunConfig;
  private consumerSubscribeConfig?: ConsumerSubscribeConfig;

  private client: Kafka | null = null;
  private consumer: Consumer | null = null;

  private keyDeserializer?: Deserializer<Buffer, unknown>;
  private valueDeserializer?: Deserializer<Buffer, unknown>;

  constructor(config: KafkaServerConfig) {
    super();
    this.kafkaConfig = {
      ...config.kafka,
      brokers: config.kafka?.brokers ?? ["localhost:9092"],
      clientId: `${config.kafka?.clientId ?? "nestjs-consumer"}-${
        config.postfixId ?? "server"
      }`,
    };
  }

  setConsumerConfig(options: ConsumerOptions): void {
    const { consumer } = options;
    this.consumerConfig = {
      ...consumer,
      groupId: `${consumer?.groupId ?? "nestjs-group"}-server`,
    };
    this.consumerRunConfig = {
      ...options.run,
      eachBatch: this.handleBatch.bind(this),
    };
    this.consumerSubscribeConfig = options.subscribe;
  }

  setDeserializers(deserializers: Deserializers): void {
    this.keyDeserializer = deserializers.keyDeserializer;
    this.valueDeserializer = deserializers.valueDeserializer;
  }

  async listen(
    callback: (error?: unknown, ...optionalParams: unknown[]) => void
  ): Promise<void> {
    try {
      if (!this.consumerConfig) {
        throw new Error("Consumer configuration is not provided");
      }

      this.client = new Kafka(this.kafkaConfig);
      this.consumer = this.client.consumer(this.consumerConfig);

      await this.consumer.connect();
      await this.consumer.subscribe({
        ...this.consumerSubscribeConfig,
        topics: [...this.messageHandlers.keys()],
      });

      await this.consumer.run(this.consumerRunConfig);

      callback();
    } catch (error) {
      this.logger.error(error);
      callback(error);
    }
  }

  async close(): Promise<void> {
    if (this.consumer) {
      await this.consumer.disconnect();
      this.consumer = null;
    }
    this.client = null;
  }

  private async handleBatch(payload: EachBatchPayload): Promise<void> {
    if (!this.consumer || !this.keyDeserializer || !this.valueDeserializer) {
      throw new Error("Required components not set");
    }

    const { batch, isRunning, isStale, heartbeat } = payload;
    const handler = this.messageHandlers.get(batch.topic);

    if (!handler)
      throw new Error(`Handler not found for topic "${batch.topic}"`);

    for (const message of batch.messages) {
      if (!isRunning() || isStale()) {
        throw new Error("Consumer stopped or stale");
      }

      const context = new KafkaContext([
        message,
        batch.partition,
        batch.topic,
        this.consumer,
        heartbeat,
      ]);
      const deserializedMessage = {
        key: await this.keyDeserializer.deserialize(message.key),
        value: await this.valueDeserializer.deserialize(message.value),
        timestamp: message.timestamp,
        attributes: message.attributes,
        offset: message.offset,
        headers: message.headers,
        size: message.size,
      };

      const resultOrStream = await handler(deserializedMessage, context);

      if (isObservable(resultOrStream)) {
        await lastValueFrom(resultOrStream);
      }
    }
  }
}

Dans cet exemple simplifié, chaque message est désérialisé individuellement avant d'être envoyé au handler. Cependant, pour plus d'efficacité, il serait possible de désérialiser tout le batch en une seule fois et d'envoyer un tableau de messages désérialisés au handler, ce qui optimiserait les performances du traitement de gros volumes de données.

Notez que le handler pourrait être englobé par un interceptor de NestJS ou par autre chose qui modifierait le flux de retour, et donc il est probable que parfois le type de retour soit un observable RxJS. Il est essentiel de s'assurer de transformer un observable en promesse et de s'assurer que la valeur finale de l'observable est attendue avant que le traitement ne soit terminé.

La méthode close() est appelée automatiquement par NestJS lors de l'arrêt de l'application pour déconnecter proprement le consommateur Kafka et garantir que toutes les ressources sont libérées correctement.

Conclusion

En développant cette librairie Kafka sur mesure avec NestJS, nous avons réussi à dépasser les limitations du module Kafka natif en exploitant les modules dynamiques. Cela nous a permis de centraliser la configuration tout en offrant la flexibilité nécessaire pour répondre à des besoins de traitement plus complexes

Dernier