Apache Spark : Use cases pour développeurs Java

Depuis que nous utilisons Apache Spark chez LesFurets.com, nous sommes étonnés par le manque d’articles et de conférences dédiés à l’utilisation de Spark avec Java. Pourtant, Java est l’un des languages supportés par Spark, tout comme Scala, Python et R, et la continuité de son support est officielle. La documentation Spark est complète et contient des exemples d’utilisation en Java, mais peu d’articles concernant des cas d’usages spécifiques existent.

Pourquoi utiliser Apache Spark en Java au lieu d’un autre langage ? Simplement parce que notre équipe de développement est à l’aise en Java, et ce langage correspond mieux à notre culture et à notre histoire. Il est à noter que depuis la version 2.0 d’Apache Spark, l’API de DataFrame uniformise l’utilisation de Spark à travers les langages (plus besoin de récupérer des JavaRDD difficiles à manipuler par exemple) et aussi au niveau de la performance (Scala était jusqu’à maintenant plus rapide de presque un ordre de grandeur).

De plus, depuis l’arrivée du DataFrame API, les plans d’exécution des Datasets et de SparkSQL sont optimisés par Catalyst, le moteur d’optimisation interne de Spark, et ce peu importe le langage. Grâce au projet Tungsten, le code généré est plus performant avec entre autre l’ajout du whole-stage codegen.

Le code de cet article est tiré d’exemples d’usage de Spark en Java sur notre github et est associé à une conférence que nous donnons sur le sujet, Apache Spark : Deep dive dans l’API Java pour développeur.

Import des dépendances

Pour utiliser Spark, il suffit d’ajouter les dépendances à votre projet Maven. Pour utiliser la version 2.1.0 de Spark, le code suivant suffit. Le 2.11 en fin de nom d’artefact Maven spécifie la version de Scala qui a été utilisée pour compiler Spark, soit 2.11 dans ce cas. Il est important d’avoir la même version sur les workers, sinon vous ne pourrez envoyer vos traitements sur ceux-ci.

<!-- Import de Spark core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

<!-- Import de Spark SQL, nécessaire pour utiliser l'API DataFrame -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

Utiliser le point d’entrée : SparkSession

Le point d’entrée pour l’API Dataframe est la classe SparkSession, qui vous permet de récupérer des DataFrames (avec SparkSession#read), d’en créer des nouveaux (avec SparkSession#createDataframe), de gérer vos fonctions utilisateurs (avec SparkSession#udf), etc.

Le code suivant crée une nouvelle session, avec comme configuration un master local[*], et le main s’exécute en imprimant un DataFrame vide fraichement créé.

private static final SparkSession spark = SparkSession.builder()
        .appName("LesFurets.com - Spark")
        .master("local[*]")
        .getOrCreate();

public static void main(String[] args) {
    spark.emptyDataFrame().show();
}

La machine qui instancie le SparkSession est ce qu’on appelle le driver, il contient le contexte et communique avec le cluster manager afin de lancer les exécutions sur les workers (ou exécuteurs).

 

Apache Spark est un moteur en cluster, et celui-ci s’utilise en 3 modes : local (celui de l’exemple, soit 1 driver et 1 worker sur la même jvm), standalone (1 driver et 1 worker sur la même machine), cluster (mode production, soit 1 driver et n workers sur des machines différentes). Cela veut dire que le jar contenant votre programme est envoyé par le cluster manager (Standalone, Apache Mesos, Hadoop YARN) aux workers, et les datas sont sérialisées entre les JVM. Les workers n’ont donc pas directement accès aux variables du driver.

Exécution d’un programme Spark

Vous pouvez lancer le driver associé au SparkSession de l’exemple précédent à partir de votre IDE, il suffit de lancer le code comme une application Java. En mode « local », il n’y a rien d’autre à faire, Spark s’occupe du reste, soit de démarrer un worker sur la même jvm pour l’exécution. Dans Intellij, il suffit de faire « Run > Run ‘Exemple' », avec le nom de votre fichier java.

Lire un csv de tarifs assureurs

Notre exemple d’aujourd’hui sera : Comment trouver la moyenne des prix par formule pour un assureur donné ?. Pour rappel, tout le code associé est disponible sur notre github

Soit un csv contenant un extrait de nos tarifs. On cherche à d’abord récupérer le DataFrame du csv avec la méthode SparkSession#read, en passant comme option l’activation de l’inférence de schéma et de l’entête.

Dataset<Row> tarifs = spark.read()
        .option("header", true)
        .option("inferSchema", true)
        .csv(PATH_TARIFS_CSV);
tarifs.printSchema();
tarifs.show(3);

Le schéma inféré par Spark nous donne 6 colonnes avec les bons types

root
 |-- uid: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- time: string (nullable = true)
 |-- module: string (nullable = true)
 |-- formule: string (nullable = true)
 |-- prime: double (nullable = true)
 |-- assureur: string (nullable = true)

Voici un court extrait du contenu

+--------------------+--------------------+------------+------+--------------------+-------+------------------+
|                 uid|                date|        time|module|             formule|  prime|          assureur|
+--------------------+--------------------+------------+------+--------------------+-------+------------------+
|0001474666949457b...|2016-09-24 00:00:...|00:03:23.189|  auto|        TOUS RISQUES| 369.11|Mon SUPER assureur|
|0001474666949457b...|2016-09-24 00:00:...|00:03:23.189|  auto|TOUS RISQUES TRAN...| 426.57|Mon SUPER assureur|
|0001474667515396b...|2016-09-24 00:00:...|00:03:04.136|  auto|               TIERS|  475.0|Mon SUPER assureur|
+--------------------+--------------------+------------+------+--------------------+-------+------------------+

Filtrer les tarifs par assureur

On cherche ensuite à filtrer par assureur avec la méthode Dataset#filter qui prend un type SAM (donc vous pouvez passer une lamdba). Attention si vous passez une lambda il faut la caster vers le bon type! Le problème est connu et vient de la compatibilité bytecode entre Scala et Java, qui est réglé par Scala 2.12. Le support Spark de cette version de Scala n’est pas triviale, voir les discussions sur le JIRA de Spark : SPARK-14220 et SPARK-14643.

Dataset<Row> assureurFiltre = tarifs
        .filter((FilterFunction<Row>) row ->
                row.<String>getAs("assureur").equals("Mon SUPER assureur"));

La fonction filter prend en paramètre le type FilterFunction<Row>, et notre argument row dans la lambda sera de type Row. Sur ce type, la méthode Row#getAs vous retourne la valeur de la colonne au type voulu (il faut passer en paramètre de type String par exemple). La méthode FilterFunction#call nous demande de retourner un boolean, donc on compare avec String#equals sur le nom de l’assureur voulu, soit « Mon SUPER assureur ».

Faire la moyenne des primes par formule

Ensuite, on peut faire l’agrégation par nom de formule, qui se fait avec 2 opérations : Dataset#groupBy puis agg (pour « aggregate »). La plupart du temps, lorsque vous faites un groupBy, vous le suivez par une fonction d’agrégation. D’ailleurs le type retourné par groupBy n’est pas Dataset, mais bien RelationalGroupedDataset, qui est un objet intermédiaire sur lequel seul des fonctions d’aggréation sont disponibles.

Dataset<Row> averagePrime = assureurFiltre
        .groupBy("formule")
        .agg(avg("prime").as("average"));
averagePrime.printSchema();
averagePrime.show();

La fonction RelationalGroupedDataset#agg prend en paramètre un argument de type Column, et nous utilisons la fonction org.apache.spark.sql.functions#avg pour nous retourner cette colonne. En règle générale, vous trouverez la majorité des fonctions dont vous aurez besoin dans org.apache.spark.sql.functions. En Spark, il faut penser colonne même si on parle d’une fonction, par exemple notre fonction avg retourne une colonne, dont le résultat sera la moyenne du contenu.

On remarque l’usage de Column#as pour renommer la colonne. On a donc un DataFrame à 2 colonnes :

root
 |-- formule: string (nullable = true)
 |-- average: double (nullable = true)

Et nos moyennes sont correctement agrégées

+--------------------+-----------------+
|             formule|          average|
+--------------------+-----------------+
|               TIERS|485.6201062874247|
|TIERS INTERMEDIAI...|553.3796044184278|
|   TIERS TRANQUILITE|543.2877653123106|
|TOUS RISQUES TRAN...|691.4981265264315|
|        TOUS RISQUES|639.7877950498043|
| TIERS INTERMEDIAIRE|498.2656585547094|
+--------------------+-----------------+

Ordonner les résultats

Il nous reste à ordonner les résultats, à noter l’usage de org.apache.spark.sql.functions#desc qui est aussi une fonction retournant une Column (comme avg).

Dataset<Row> averagePrimeOrder = averagePrime
        .orderBy(desc("average"));
averagePrimeOrder.show(1);

La formule tous risque tranquillité est donc, pour l’assureur « Mon SUPER assureur », la formule la plus chère en moyenne.

+--------------------+-----------------+
|             formule|          average|
+--------------------+-----------------+
|TOUS RISQUES TRAN...|691.4981265264315|
+--------------------+-----------------+

En conclusion

L’API des DataFrames rend l’usage en Java relativement facile. Outre la nécessité de caster les lambdas vers les bons types, en connaissant bien l’API il est possible d’écrire un code expressif, fortement typé, d’une longueur raisonnable comparé à du code identique en Scala.

N’oubliez pas que ce code, ainsi que d’autres exemples, est disponible sur notre github.

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.types.DataTypes.StringType;

public class TarifsRun {

    private static final SparkSession spark = SparkSession.builder()
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {
        Dataset<Row> tarifs = spark.read()
                .option("header", true)
                .option("inferSchema", true)
                .csv(PATH_TARIFS_CSV)
                .filter((FilterFunction<Row>) value ->
                        value.<String>getAs("assureur").equals("Mon SUPER assureur"))
                .groupBy("formule")
                .agg(avg("prime").as("average"))
                .orderBy(desc("average"));
        tarifs.show();
    }

}