Aller au contenu

Création et déploiement d'une fonction AWS Lambda avec Docker

Mise en place pas à pas d'une lambda AWS tournant sur une image Docker custom pour utiliser le client kafka afin de mettre à jour les topics d'un Cluster AWS MSK.

Comment créer et déployer une fonction AWS Lambda avec Docker ?

En voulant mettre en place une fonction Lambda pour supprimer des topics de mon Cluster MSK (Kafka managé), je me suis rendu compte qu’il n’existait pas beaucoup d’exemples sur internet de création de Lambda à partir d’une image Docker.

Je me suis donc mis en tête de faire un retour d’expérience suite à cette mise en place. La voici 🙂

Contexte

Nous avons un Cluster Kafka (MSK) qui tourne sur AWS ainsi que des connecteurs Debezium.

À la création de ces connecteurs Debezium sur AWS MSK Connect, des topics sont créés automatiquement : __amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id

Lien de la documentation AWS

Ces topics internes ne sont jamais supprimés, même à la suppression des connecteurs.

Ils doivent être supprimés manuellement.

Dans ce contexte, j'ai créé une fonction lambda pour supprimer les topics après la destruction des connecteurs MSK par Terraform.

En effet, pour des raisons de finops (optimisation des coûts), nous détruisons les connecteurs MSK toutes les nuits et weekends sur nos environnements hors production, ce qui peut créer un reliquat de topics internes inutilisés conséquent sur le Cluster Kafka.

Choix technique

J’ai décidé de partir sur la création d’une image Docker afin d’utiliser la fonctionnalité d’exécution d’une lambda AWS à partir d’une image docker.

Grâce à ce choix, je suis agnostique de l’infra côté AWS car l’exécution se fait dans mon image Docker.

De plus, il me suffit de pousser une nouvelle image Docker dans mon ECR (la registry Docker d’AWS) pour mettre à jour le code de ma lambda.

Cette lambda va être invoquée par terraform lors d’une exécution par la ci (intégration continue).

Création de l’image Docker

Je me suis basé sur une image contenant une version de java (jre-17) puis j’ai installé les packages jq, kafka et aws-msk-iam-auth.

Ce qui donne ce Dockerfile :

FROM ****/jre-17:17.0.4-0-881

ARG JQ_VERSION=1.6-2.1
ARG KAFKA_VERSION=3.5.1
ARG MINOR_KAFKA_VERSION=2.13
ARG AWS_MSK_IAM_AUTH_VERSION=1.1.1

USER root

RUN apt-get update && \
    apt-get install -y --no-install-recommends jq=${JQ_VERSION} && \
    apt-get clean && \
    rm -rf /var/cache/apt /var/lib/apt/lists/*

USER debian

WORKDIR /home/debian

RUN wget --progress=dot:giga https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${MINOR_KAFKA_VERSION}-${KAFKA_VERSION}.tgz && \
    tar -xzf kafka_${MINOR_KAFKA_VERSION}-${KAFKA_VERSION}.tgz && rm -f kafka_${MINOR_KAFKA_VERSION}-${KAFKA_VERSION}.tgz

WORKDIR /home/debian/kafka_${MINOR_KAFKA_VERSION}-${KAFKA_VERSION}/libs

RUN wget --progress=dot:giga https://github.com/aws/aws-msk-iam-auth/releases/download/v${AWS_MSK_IAM_AUTH_VERSION}/aws-msk-iam-auth-${AWS_MSK_IAM_AUTH_VERSION}-all.jar

WORKDIR /home/debian/kafka_${MINOR_KAFKA_VERSION}-${KAFKA_VERSION}

COPY --chown=debian:debian resources/client.properties bin/client.properties

COPY --chown=debian:debian resources/entrypoint.sh /home/debian

RUN chmod +x /home/debian/entrypoint.sh

ENTRYPOINT [ "/home/debian/entrypoint.sh" ]

Ensuite, il faudra un fichier de properties afin de configurer la connexion IAM pour le client kafka :

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

Code logique de la lambda

J’ai commencé à écrire la logique de l’exécution de ma lambda dans l’entrypoint.sh.

Tout d’abord, il faut créer une boucle infinie pour traiter les invocations de la Lambda.

#!/bin/sh

# Arrête le script si une commande échoue
set -e

# Boucle infinie pour traiter les événements d'invocation
while true; do

// Code logique de la lambda

# Pause entre les invocations
sleep 10  # Pause de 10 secondes
done

Ensuite, il faut récupérer les informations de l’invocation de la lambda afin de pouvoir mettre à jour ses logs d’exécution.

## Récupérer l'événement d'invocation de l'API Runtime
# Récupérer la réponse complète incluant en-têtes et corps, avec gestion d'erreur
RESPONSE=$(curl -s -D - -f "${AWS_LAMBDA_RUNTIME_API}/2018-06-01/runtime/invocation/next" || echo "ERROR")
    
# Vérifier si une erreur s'est produite lors de l'appel à l'API
if [ "$RESPONSE" = "ERROR" ]; then
    echo "Erreur lors de la récupération de l'événement d'invocation."
    # Gestion de l'erreur (par exemple, réessayer, enregistrer, etc.)
        continue
fi

# Extraire l'ID de requête de l'en-tête
AWS_REQUEST_ID=$(echo "$RESPONSE" | grep -Fi Lambda-Runtime-Aws-Request-Id | awk '{print $2}' | tr -d '\r')

# Extraire les données d'événements du corps
EVENT_DATA=$(echo "$RESPONSE" | sed -e '1,/^\r$/d')

// Exécution des commandes via le client kafka

# Renvoyer la réponse ou l'erreur à l'API Runtime
RESPONSE_DATA="{\"result\": \"${COMMAND_RESULT}\"}"

curl -X POST "${AWS_LAMBDA_RUNTIME_API}/2018-06-01/runtime/invocation/${AWS_REQUEST_ID}/response" -H 'Content-Type: application/json' -d "$RESPONSE_DATA"

Une fois la partie purement AWS Lambda gérée, il ne reste qu’à faire la partie logique kafka.

Etant donné que ma lambda va être invoquée par terraform après un terraform destroy, je récupère les données de l’évènement et je vérifie qu'il s’agit bien d’un terraform destroy et non pas d'un create ou update.

# Vérifier si EVENT_DATA est non vide et est un JSON valide
if [ -z "$EVENT_DATA" ] || ! echo "$EVENT_DATA" | jq empty; then
    echo "Erreur : Données d'événements vides ou non valides"
    continue
fi

TF_ACTION=$(echo "$EVENT_DATA" | jq -r '.tf.action')
ACTION=$(echo "$EVENT_DATA" | jq -r '.action')

# Arrête la lambda si TF_ACTION est 'create' ou 'update'
if [ "$TF_ACTION" = "create" ] || [ "$TF_ACTION" = "update" ]; then
    echo "Action ignorée : En cas de tf apply la lambda ne fait rien."
    RESPONSE_DATA="{\"result\": \"Nothing to do on terraform apply.\"}"
    curl -X POST "${AWS_LAMBDA_RUNTIME_API}/2018-06-01/runtime/invocation/${AWS_REQUEST_ID}/response" -H 'Content-Type: application/json' -d "$RESPONSE_DATA"
    continue
fi

# Vérifier et remplacer ACTION si TF_ACTION est 'delete'
if [ "$TF_ACTION" = "delete" ]; then
    ACTION=$TF_ACTION
fi

Enfin, je termine par l'utilisation du client kafka pour supprimer les topics.

SERVER=$(echo "$EVENT_DATA" | jq -r '.brokers')
TOPIC_NAME=$(echo "$EVENT_DATA" | jq -r '.topicName')

# Construire les paramètres supplémentaires
EXTRA_PARAMS=""
if [ -n "$TOPIC_NAME" ] && [ "$TOPIC_NAME" != "null" ]; then
    EXTRA_PARAMS="--topic ${TOPIC_NAME}"
fi

COMMAND_TO_EXECUTE="bin/kafka-topics.sh --${ACTION} --bootstrap-server ${SERVER} --command-config ./bin/client.properties ${EXTRA_PARAMS} 2>&1"

# Exécuter la commande Kafka
COMMAND_RESULT=""
if [ -n "$ACTION" ] && [ -n "$SERVER" ]; then
    if ! COMMAND_RESULT=$($COMMAND_TO_EXECUTE); then
        echo "Erreur lors de l'exécution de la commande Kafka : ${COMMAND_RESULT}"
    fi
else
    COMMAND_RESULT="Erreur : Action ou serveur non spécifié"
fi

echo "Commande exécutée : ${COMMAND_RESULT}"

Terraform

Nous utilisons terraform comme outil de gestion de notre infrastructure as code (iac).

Dans mon dépôt git contenant le code terraform de ma lambda, j’ai créé deux ressources lambda_invocation :

locals {
  status_topics_regex  = "__amazon_msk_connect_status_${local.env_name}-.*"
  configs_topics_regex = "__amazon_msk_connect_configs_${local.env_name}-.*"
}

resource "aws_lambda_invocation" "destroy_status_topics" {
  function_name = local.lambda_function_name

  input = jsonencode({
    "brokers" : local.bootstrap_brokers,
    "topicName" : local.status_topics_regex
  })

  lifecycle_scope = "CRUD"

  depends_on = [
    aws_lambda_function.msk_lambda,
    aws_cloudwatch_log_group.msk_lambda,
  ]
}

resource "aws_lambda_invocation" "destroy_configs_topics" {
  depends_on    = [aws_lambda_invocation.destroy_status_topics]
  function_name = local.lambda_function_name

  input = jsonencode({
    "brokers" : local.bootstrap_brokers,
    "topicName" : local.configs_topics_regex
  })

  lifecycle_scope = "CRUD"
}

Points d’attention

Il faudra bien faire attention de construire votre image Docker en "linux/amd64" avant de la pousser dans votre registry, car il s’agit de l’environnement d’exécution par défaut des Lambda sur AWS.

Il sera nécessaire de configurer les règles de sortie de votre security group pour que la lambda puisse accéder au port 9098 de votre Cluster MSK, utilisé par la connexion AWS IAM.

Conclusion

En me lançant sur ce sujet je pensais que cela serait un jeu d’enfant mais finalement il n’existe pas tant que ça de ressources sur internet au sujet des lambdas tournant sur des images Docker et surtout ce n'est pas commun d'exécuter la logique en script bash.

Ce système de boucle while avec des curl pour discuter avec l’API runtime de la lambda est vraiment spécial et bien sur si vous utilisez des langages plus communs comme du python ou du Java (liste des langages pris en charge par AWS), vous avez déjà toutes les librairies qui font ça pour vous.

Mais ce fut très enrichissant et j’espère que cela pourra vous être utile.

Dernier