Apache Spark dispose d’une API très puissante intégrée pour extraire des données d’une base de données relationnelle. L’efficacité et la performance, selon l’approche habituelle de Spark, sont gérées de manière transparente.
Les deux concepts de base à connaître lorsque l’on travaille dans de tels scénarios sont :
- Dataset : une collection distribuée de données
- DataFrame : un Dataset organisé en colonnes nommées
Conceptuellement similaire à une table dans une base de données relationnelle, le Dataset est la structure qui va contenir nos données :
val dataset = sparkSession.read.jdbc(...);
La documentation de la méthode jdbc(…) est un peu cryptique, du moins pour moi, surtout en ce qui concerne la gestion des partitions. Voici les paramètres :
- url : l’URL JDBC de la base de données sous la forme
jdbc:subprotocol:subname. - table : le nom de la table dans la base de données externe.
- columnName : le nom d’une colonne (de la table ci-dessus) de type intégral qui sera utilisée pour la partition.
- lowerBound : la valeur minimale de columnName utilisée pour décider du pas de partition.
- upperBound : la valeur maximale de columnName utilisée pour décider du pas de partition.
- numPartitions : le nombre de partitions. Celui-ci, avec les bornes inférieure (inclusive) et supérieure (exclusive), forme les pas de partition pour les expressions WHERE générées utilisées pour diviser la colonne de manière égale. Quand l’entrée est inférieure à 1, le nombre est défini sur 1.
- connectionProperties : les arguments de connexion JDBC, une liste de paires clé/valeur arbitraires. Normalement, il devrait inclure au moins une propriété “user” et “password”. “fetchSize” peut être utilisé pour contrôler le nombre de lignes par extraction.
Parmi cette liste, les paramètres pertinents qui influencent le comportement des partitions sont : columnName, lowerBound, upperBound, et numPartitions. Alors que les premiers (columnName) et le dernier (numPartitions) devraient être intuitifs, la description de upperBound et lowerBound n’est pas si claire, même après avoir lu la documentation.
Qu’est-ce que ce “stride” ? Comment est-il calculé ?
Eh bien, bien que l’on pourrait dire “Pourquoi ne pas essayer vous-même ?”, en suivant une approche “pragmatique”, je préfère généralement aller plus en profondeur pour avoir une vue déterministe de ce que je fais.
Bien sûr, je n’ai rien contre l’approche pragmatique : je suis un grand fan de StackOverflow et des principales méthodes 🙂 mais souvent, il existe une autre façon, plus précise, d’obtenir l’information requise (même si elle est assez proche de cette approche pragmatique, comme vous le verrez bientôt).
En supposant que vous soyez familier avec Java et / ou Scala, il est assez facile de trouver dans le code Spark l’endroit où la logique de partitionnement réside et comment les conditions WHERE sont générées : cet endroit est le singleton JDBCRelation, plus précisément la fonction columnPartition(…) :
package org.apache.spark.sql.execution.datasources.jdbc
private object JDBCRelation extends Logging {
...
def columnPartition(partitioning: JDBCPartitioningInfo): Array[Partition] = {
...
val stride =
upperBound / numPartitions - lowerBound / numPartitions
...
}
Là, vous pouvez voir que le stride est une sorte de “pas” utilisé pour déterminer l’intervalle de chaque partition. Si nous passons les valeurs suivantes :
- column = “table_id”
- lowerBound = 0
- upperBound = 100000
- numPartitions = 10
Le stride aura une valeur de 10000.
Comment ce stride fonctionne-t-il en réalité ? Si je déplace le code de columnPartition dans une classe principale (et voici l’approche pragmatique), après avoir supprimé des éléments comme les logs et les types de retour (en gras), nous avons une méthode simple comme celle-ci :
def columnPartition(...): Unit = {
require(lowerBound <= upperBound)
val numPartitions =
if ((upperBound - lowerBound) >= requestedPartitions) {
requestedPartitions
} else {
upperBound - lowerBound
}
val stride: Long =
upperBound / numPartitions - lowerBound / numPartitions
var i: Int = 0
var currentValue: Long = lowerBound
var ans = new ArrayBuffer[String]()
while (i < numPartitions) {
val lBound = if (i != 0) s"$column >= $currentValue" else null
currentValue += stride
val uBound =
if (i != numPartitions - 1) s"$column < $currentValue" else null
val whereClause =
if (uBound == null) {
lBound
} else if (lBound == null) {
s"$uBound or $column is null"
} else {
s"$lBound AND $uBound"
}
ans += whereClause
i = i + 1
}
ans.foreach(println)
}
Ici, vous pouvez clairement voir que le stride est un “pas” utilisé pour déterminer la clause WHERE de chaque commande SELECT qui sera exécutée (une par partition). Spécifiquement, nous pouvons exécuter cette méthode avec les valeurs d’exemple ci-dessus :
def main(args: Array[String]): Unit = {
MyObject.columnPartition(0, 100000, 10, "table_id")
}
Et nous obtiendrons cette réponse :
table_id < 10000 or table_id is null
table_id >= 10000 AND table_id < 20000
table_id >= 20000 AND table_id < 30000
table_id >= 30000 AND table_id < 40000
table_id >= 40000 AND table_id < 50000
table_id >= 50000 AND table_id < 60000
table_id >= 60000 AND table_id < 70000
table_id >= 70000 AND table_id < 80000
table_id >= 80000 AND table_id < 90000
table_id >= 90000
La logique de partitionnement s’assure qu’aucune donnée ne soit laissée de côté, quels que soient les paramètres d’entrée ; ce que vous devez expérimenter et ajuster, ce qui dépend fortement de votre contexte concret, c’est le nombre de partitions et la taille correspondante. Une erreur courante est d’assigner une mauvaise taille à la dernière partition : si, dans l’exemple précédent, la cardinalité de la table est de 10 000 000, la dernière partition :
table_id >= 90000
devra récupérer 9 910 000 lignes, ce qui constitue une charge déséquilibrée par rapport aux autres partitions.