GCP Composer est un service d'orchestration de flux d'ingestion construit à partir d'Apache Airflow. Lorsque nous créons un environnement Composer, nous spécifions la version de Composer que nous souhaitons utiliser. Cependant, un an plus tard, nous pouvons nous rendre compte que la version que nous utilisons n'est plus prise en charge par Google, car les nouvelles versions de Composer ne sont prises en charge que pendant un an:
Lorsqu'un incident survient, et qu'un Case Google est ouvert auprès de l'équipe support GCP, il se peut qu'ils nous demandent de mettre à jour notre environnement, car la version actuelle n'est plus prise en charge. À ce stade, se pose la question de la manière de mettre à jour notre environnement sans perdre l'historique des tâches Airflow ni les DAGs.
Google propose une fonctionnalité de mise à jour des environnements Composer, mais il convient de noter que cette fonctionnalité est toujours en version Preview depuis mai 2022. Par conséquent, son utilisation comporte des risques potentiels de perturbation.
Dans cet article, nous explorerons les meilleures pratiques pour planifier, exécuter et réussir la mise à jour de votre environnement Composer tout en préservant précieusement votre historique Airflow et vos DAGs. Suivez attentivement les étapes pour garantir une transition en douceur et minimiser les interruptions dans votre flux de travail.
Quelles sont les procédures fiables pour mettre à jour vos environnement?
La documentation de Google ne propose actuellement aucune procédure fiable pour la mise à jour d'un environnement Composer. Si vous essayez de mettre à jour votre environnement à l'aide de Terraform par exemple, vous constaterez que, dans le plan Terraform, votre environnement sera détruit et remplacé par un autre. Cela signifie qu'actuellement, aucune méthode fiable n'est disponible via Terraform.
Avant de vous lancer dans la mise à jour de l'environnement Composer, il est fortement recommandé de tester la nouvelle version de Composer afin d'identifier les éventuelles conséquences de la mise à jour sur vos DAGs et vos traitements.
Pour commencer, nous vous conseillons de consulter les documents Google suivants :
Vérification de la nouvelle version Composer
Les étapes à suivre:
1- Créez un nouvel environnement de test sur un projet GCP Sandbox, identique à votre environnement de production.
2- Utilisez la commande "gsutil cp" pour importer tous les DAGs, les plugins et les fichiers Data présents dans le bucket GCS de votre environnement de production, et placez-les dans le bucket GCS du nouvel environnement.
Il est important de noter que l'utilisation de la fonctionnalité Composer Snapshot pour importer les DAGs de votre environnement de production peut activer ces DAGs sur l'environnement de test, ce qui pourrait déclencher de nouveaux jobs, c'est la raison pour laquelle nous évitons de l'utiliser ici.
3- Vérifiez le parsing de vos DAGs. Normalement, tous les DAGs importés devraient s'afficher dans l'interface Airflow.
4- Nous vous recommandons d'activer et de lancer quelques DAGs de test pour tester les opérateurs communs dans l'environnement de test.
5- Ajoutez le prochain Dag pour mettre en pause les Dags actifs, puis enregistrez la liste dans un fichier sur un bucket GCS:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import DagModel
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.exceptions import AirflowException
GCS_BUCKET = 'your-gcs-bucket' # Replace with your GCS bucket name
GCS_FILENAME = 'paused_dags.txt' # Name of the file to store the list of paused DAGs
def pause_all_dags_and_store_list():
try:
# Fetch all active DAGs
active_dags = DagModel.get_dags()
# Pause each active DAG and collect their names
paused_dags = []
for dag in active_dags:
dag.is_paused = True
dag.store()
paused_dags.append(dag.dag_id)
# Store the list of paused DAGs in a GCS file
gcs_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='gcs_default')
gcs_hook.upload(GCS_BUCKET, GCS_FILENAME, "\n".join(paused_dags), 'text/plain')
print("All active DAGs have been paused, and the list has been stored in GCS.")
except AirflowException as e:
print(f"An error occurred: {str(e)}")
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 15),
'retries': 1,
}
dag = DAG('pause_all_dags_and_store_list',
default_args=default_args,
schedule_interval=None, # This DAG won't be scheduled, run manually
catchup=False, # Prevent backfilling
is_paused_upon_creation=False,
description='Pause all active DAGs and store their list in GCS')
pause_all_dags_task = PythonOperator(
task_id='pause_all_dags_task',
python_callable=pause_all_dags_and_store_list,
dag=dag
)
pause_all_dags_task
6- Lancez le DAG et vérifiez que vos DAGs de test sont en pause.
7- Ajouter le prochain Dag pour réactiver les Dags mis en pause:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import DagModel
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.exceptions import AirflowException
GCS_BUCKET = 'your-gcs-bucket' # Replace with your GCS bucket name
GCS_FILENAME = 'paused_dags.txt' # Name of the file storing the list of paused DAGs
def activate_paused_dags():
try:
# Fetch the list of paused DAGs from GCS
gcs_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='gcs_default')
paused_dags_str = gcs_hook.download(GCS_BUCKET, GCS_FILENAME)
paused_dags = paused_dags_str.split('\n')
# Activate each paused DAG
for dag_id in paused_dags:
dag = DagModel.get_dagmodel(dag_id)
if dag:
dag.is_paused = False
dag.store()
print(f"Activated DAG: {dag_id}")
print("All paused DAGs from the GCS file have been activated.")
except AirflowException as e:
print(f"An error occurred: {str(e)}")
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 15),
'retries': 1,
}
dag = DAG('activate_paused_dags',
default_args=default_args,
schedule_interval=None, # This DAG won't be scheduled, run manually
catchup=False, # Prevent backfilling
is_paused_upon_creation=False,
description='Activate paused DAGs from the GCS file')
activate_paused_dags_task = PythonOperator(
task_id='activate_paused_dags_task',
python_callable=activate_paused_dags,
dag=dag
)
activate_paused_dags_task
8- Lancez le DAG et assurez-vous que vos DAGs de test sont à nouveau actifs.
9- Mettez à nouveau en pause les Dags de test en lancant le Dag "pause_all_dags_and_store_list".
10- Créer un Composer Snapshot de votre environnement de test et stockez-le dans un bucket GCS différent de celui de Composer.
11- Mettez à jour l'environnement Composer de test en le remplaçant par un nouvel environnement utilisant la version cible de Composer cible.
12- Dès que le nouvel environnement est créé, importez le Snapshot pris précédemment.
13- Lancez le Dag "activate_paused_dags" et vérifiez:
- Le parsing de tous les Dags est réussi.
- Vos Dags de tests sont bien actifs.
- Les nouveaux Jobs lancés dans les Dags de tests fonctionnent comme avant.
Si l'un ou plusieurs de vos DAGs ne sont pas parsés après la mise à jour, il est préférable d'identifier l'erreur de parsing et d'essayer d'adapter le DAG aux nouvelles version des Packages Python utilisées par le nouvel environnement Composer. Il est également essentiel d'informer les équipes gérant ces DAGs des changements intervenus.
Planification de la mise à jour
Après avoir vérifié la nouvelle version de Composer, vous pouvez désormais planifier la mise à jour de vos environnements Composer en production. Il est essentiel de minimiser les perturbations pour vos utilisateurs, et pour cela, nous vous recommandons d'utiliser les tableaux de bord Composer de votre environnement pour déterminer les heures de moindre d'utilisation:
Lorsque la date de la mise à jour est choisie, nous vous conseillons vivement d'effectuer une communication pour informer tous les utilisateurs de vos environnements. Il est important de noter que les environnements ne seront pas accessibles pendant la mise à jour, ce qui peut durer environ 2 heures.
Mise à jour de l'environnement Composer en Production
Pour mettre à jour votre environnement Composer de Production, deux options s'offrent à vous:
1- Remplacer l'environnement Composer
Cette méthode consiste à remplacer l'environnement Composer existant par un nouvel environnement qui utilise la nouvelle version de Composer.
Avantages:
- Moins de risques d'incidents.
- Plus rapide que la deuxième méthode.
Inconvénients:
- L'URL Airflow change, car l'environnement est remplacé.
- Le bucket GCS de l'environnement Composer change également.
Les étapes à suivre:
1- Ajoutez les dags "pause_all_dags_and_store_list" et "activate_paused_dags" à votre environnement de production.
2- Le jour J, lancez le DAG "pause_all_dags_and_store_list" et attendez 15 minutes pour permettre aux tâches en cours de s'achever, puis, Si certaines tâches sont toujours en cours, il est recommandé de les arrêter manuellement via l'interface Airflow. Demandez ensuite aux projets responsables de ces DAGs de les relancer après la fin de la mise à jour.
3- Créez un Composer Snapshot de l'environnement de production et stockez le Snapshot dans un bucket GCS différent de celui de Composer.
4- Mettez à jour l'environnement Composer en production en le remplaçant par un nouvel environnement utilisant la version de Composer testée.
5- Dès que le nouvel environnement est créé, importez le Snapshot pris précédemment.
6- Lancer le dag "activate_paused_dags" et vérifiez:
- Le parsing de tous les Dags est réussi.
- Les Dags de production sont actifs.
- Les nouveaux Jobs lancés par les Dags fonctionnent comme avant.
7- Informez les utilisateurs que la mise à jour est terminée.
2- Mise à jour de l'environnement Composer en utilisant la fonctionnalité Upgrade Composer
Dans certains cas, il n'est pas envisageable de changer l'URL Airflow ou le bucket GCS, car de tels changements pourraient perturber les pipelines CI/CD existantes. Dans cette deuxième procédure, nous vous montrons comment utiliser la fonctionnalité Upgrade Composer pour mettre à jour votre environnement.
Comme nous l'avons mentionné précédemment, la fonctionnalité Upgrade Composer est encore en review. Cependant, d'après nos tests, elle peut fonctionner à 95 %. En cas d'échec, vous devrez alors opter pour le remplacement complet de l'environnement (méthode 1)
Avantages:
- L'URL Airflow et le bucket GCS ne changeront pas.
Inconvénients:
- La fonctionnalité Upgrade Composer est en review, et en cas d'incident, le support Google ne pourra pas vous aider.
- Plus long en cas d'incident.
Les étapes à suivre:
1- Ajouter les Dags "pause_all_dags_and_store_list" et "activate_paused_dags" à votre environnement de production.
2- Le jour J, lancez le DAG "pause_all_dags_and_store_list" et attendez 15 minutes pour permettre aux tâches en cours de s'achever, puis, Si certaines tâches sont toujours en cours, il est recommandé de les arrêter manuellement via l'interface Airflow. Demandez ensuite aux projets responsables de ces DAGs de les relancer après la fin de la mise à jour.
3- Créez un Composer Snapshot de l'environnement de production et stockez le Snapshot dans un bucket GCS différent de celui de Composer.
4- Mettez à jour l'environnement Composer en utilisant la fonctionnalité Upgrade Composer comme expliqué dans la Documentation Google.
5- Lorsque la mise à jour est terminée, si votre environnement fonctionne correctement et que la mise à jour s'est déroulée sans problème, lancez le Dag "activate_paused_dags" et vérifiez :
- Le parsing de tous les Dags est réussi.
- Les Dags de production sont activés
- Les nouveaux Jobs lancés par les Dags fonctionnent comme avant.
En cas d'incident, nous vous conseillons de suivre la première méthode à partir de l'étape 4.
6- Informez les utilisateurs que la mise à jour est terminée.
Conclusion
La mise à jour d'un environnement GCP Composer V2 est une étape cruciale pour maintenir la performance, la sécurité et la compatibilité. Les méthodes que nous avons explorées, que ce soit le remplacement de l'environnement existant ou l'utilisation de la fonctionnalité Upgrade Composer, offrent des avantages et des inconvénients distincts.
La clé du succès réside dans une préparation minutieuse, des tests approfondis et une communication transparente avec les parties prenantes. Quelle que soit la méthode choisie, la préservation des DAGs, de leur historique et la réduction des interruptions pour les utilisateurs sont nos principaux objectifs.