Les tests unitaires sont parfois négligés dans le développement de projets, alors qu'ils sont primordiaux pour :
- Vérifier le bon fonctionnement d'un morceau de code ✅
- Assurer la non-régression ✅
- Détecter les erreurs au plus tôt ✅
- Améliorer la qualité globale du code ✅
- Optimiser la maintenance de l'application ✅
Aujourd'hui, voyons ensemble comment vous pouvez créer de bons tests unitaires durant le développement de Pipelines Data en Spark (en Scala).
Un premier test simple 🔍
Les exemples seront présentés à l'aide de l'utilisation du Framework de tests Funspec
mais les principes restent les mêmes que si vous utilisiez Funsuite
, FlatSpec
, JUnit
, etc.
import org.scalatest.{FunSpec, Matchers}
class MyTest extends FunSpec with Matchers with SparkSessionUtils {
it("Should return DataFrame with one row Given raw data") {
// GIVEN
val filePath = "src/test/resources/simple.csv"
// WHEN
val df = MySingleton.loadCsv(filePath)
// THEN
dataset.collect should have length 1
}
}
Le test est donc rédigé en trois parties afin de respecter les standards de clarté du code :
- GIVEN : représente le contexte dans lequel le test unitaire va s'exécuter. On va typiquement pouvoir préciser toutes les données nourrissant la fonction que l'on cherche à tester.
- WHEN : il s'agit de la fonction que l'on souhaite tester. Idéalement, cette section ne doit contenir qu'une seule ligne afin d'afficher clairement l'intention du test.
- THEN : il s'agit de l'assertion que l'on cherche à vérifier. Ici, on vérifie le nombre de lignes contenues dans le DataFrame. Dans cette section, n'ayez pas peur d'utiliser des méthodes qui réalisent des retours au Driver (
.collect
,count
,head
, etc.). Pour rappel, les données utilisées dans les tests ne doivent pas être très volumineuses et n'auront pas d'impact négatif sur le temps d'exécution des tests.
Vous vous demandez peut-être ce que signifie SparkSessionUtils
dans l'extrait de code précédent, c'est ce que nous allons découvrir tout de suite.
Mutualiser la session Spark ⏳
Je me suis rendu compte qu’au fur et à mesure du développement de mes tests unitaires (et de la croissance sans fin de leur nombre…) que certains prenaient un temps fou à s’exécuter.
Une session Spark par fichier 📖
Mon principal problème résidait finalement dans le fait que je déclarais une nouvelle session Spark dans chacun de mes fichiers de tests :
implicit val spark: SparkSession = SparkSession
.builder()
.config("spark.master", "local")
.config("spark.driver.allowMultipleContexts", value = true)
.appName("SparkScalaApplication")
.getOrCreate()
La réalité, c’est que ce petit bout de code va démarrer, pour chaque fichier de tests, une nouvelle session Spark. Même avec la fonction getOrCreate
, certaines actions de démarrage vont s’exécuter. Au lancement des tests, j’ai pu mesurer que le lancement de la SparkSession
me prend entre 15 et 30 secondes. En multipliant cela par le nombre de fichiers de tests que possède l’application, on arrive à des durées de plus en plus importantes (et frustrantes 😤) jusqu’à obtenir l’inacceptable alors qu’aucun test en lui-même ne s’est réellement exécuté.
Alors, comment on accélère tout cela ? 🚀
Ce que je propose, c’est d’encapsuler la session Spark dans un trait
Scala et d’étendre cet objet dans chacune de vos classes de tests :
import org.apache.spark.sql.SparkSession
trait SparkSessionUtils {
implicit val spark: SparkSession = SparkSession
.builder()
.config("spark.master", "local")
.config("spark.driver.allowMultipleContexts", value = true)
.appName("SparkScalaApplication")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
}
Maintenant que votre session Spark est étendue sur tous vos fichiers de tests (class MyClass extends SparkSessionUtils
), eelle ne démarrera qu’une seule fois puis sera partagée dans toutes vos classes ! C’est vraiment du temps gagné👌 !
Autre astuce permise par cette méthode : vous pouvez donner à votre session Spark partagée les configurations que vous souhaiter avoir, directement dans votre Trait. Au revoir la duplication de code 👼 ! Par exemple, dans le code ci-dessus, vous pouvez voir que j’ai spécifié le niveau de log de Spark à ERROR
, cela permet de n’afficher que très peu de messages lors de l’exécution des tests unitaires. C’est personnel, mais je préfère avoir le moins de messages de logs possibles, le moins de “bruit” et n’avoir à la fin que les informations essentielles à mon programme.
Préparer des Datasets 💾
Ne négligez pas l'importance de la préparation de jeux de données proches de vos données de production. En effet, dans le cadre des tests unitaires, on ne cherche pas à stresser le système en envoyant de grandes quantités de données (cela correspond plutôt à des tests de performances). Ainsi, il vaut mieux se concentrer sur des données pouvant provoquer des erreurs lors de la lecture/écriture des DataFrames (des types problématiques, des chaînes de caractères avec de mauvais encodages, etc.).
Vous pouvez même utiliser ChatGPT pour générer des données précises et anonymisées avec le bon prompt.
Sinon, vous pouvez créer des Case Class
spécifiques à vos tests et les appeler dans la partie GIVEN en tant que Seq
pour les convertir en DataFrames :
case class MyIntermediateClass(
id: String,
created_time_utc: Option[Timestamp] = None
)
it("...") {
// GIVEN
val df = Seq(
Order(id = "1", created_time_utc = ts(1)),
Order(id = "2", created_time_utc = ts(2)),
Order(id = "3", created_time_utc = ts(3))
).toDF()
...
}
Conclusion 🎯
Les tests unitaires sont essentiels pour assurer la qualité et la robustesse de votre code, notamment lors du développement de pipelines de données en Spark. En adoptant une méthodologie claire, comme l'utilisation des sections GIVEN, WHEN et THEN, vous pouvez structurer vos tests pour qu'ils soient à la fois efficaces et faciles à comprendre. En optimisant des aspects comme la gestion de la SparkSession
et en préparant soigneusement vos Datasets, vous réduisez non seulement les temps d'exécution, mais vous améliorez également la maintenabilité de votre code. Ces bonnes pratiques vous permettront de détecter rapidement les erreurs, d'éviter les régressions, et de maintenir un code de haute qualité sur le long terme.