I have a Dataset gathering information about French cities:
/**
 * Get a Dataset of the communes.
 * @param session Spark session.
 * @param anneeCOG Reference year of the Code Officiel Géographique (COG).
 * @param verifications Requested verifications.
 * @return Dataset of the communes.
 * @throws TechniqueException if an incident occurs.
 */
public Dataset<Row> rowCommunes(SparkSession session, int anneeCOG, Verification... verifications) throws TechniqueException {
   String nomStore = "communes_par_codeCommune";

   Dataset<Row> communes = loadFromStore(session, "{0}_{1,number,#0}", nomStore, anneeCOG, verifications);

   if (communes != null) {
      return communes;
   }

   LOGGER.info("Constitution du dataset des communes pour le Code Officiel Géographique (COG) de l'année {}...", anneeCOG);
   Dataset<Row> c = loadAndRenameCommmunesCSV(session, anneeCOG, false, verifications);
   LOGGER.info("X01");
   c.printSchema();
   c.show(false);

   Dataset<Row> s = this.datasetSirenCommunaux.rowSirenCommunes(session, anneeCOG, TriSirenCommunaux.CODE_COMMUNE);
   LOGGER.info("X02");
   s.printSchema();
   s.show(false);

   Column condition1 = c.col("codeCommune").equalTo(s.col("codeCommune"));
   Column condition2 = c.col("codeCommuneParente").equalTo(s.col("codeCommune"));

   verifications("jonction communes et siren par codeCommune", c, null, s, condition1, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);

   Dataset<Row> join1 = c.join(s, condition1)
      .drop(s.col("codeCommune"))
      .drop(s.col("nomCommune"))
      .drop(s.col("codeRegion"))
      .drop(s.col("codeDepartement"));

   verifications("jonction communes et siren par codeCommune, join1", c, null, null, null, verifications);
   verifications("jonction communes et siren par codeCommuneParente", c, null, s, condition2, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);

   Dataset<Row> join2 = c.join(s, condition2)
      .drop(s.col("codeCommune"))
      .drop(s.col("nomCommune"))
      .drop(s.col("codeRegion"))
      .drop(s.col("codeDepartement"));

   verifications("jonction communes et siren par codeCommuneParente, join2", c, null, null, null, verifications);

   communes = join1.union(join2);
   LOGGER.info("X03");
   communes.printSchema();
   communes.show(false);

   // The commune stratum must match the one used by the individual accounts of the communes.
   communes = communes.withColumn("strateCommune",
      when(s.col("populationTotale").between(0, 249), lit(1))         // communes under 250 inhabitants
      .when(s.col("populationTotale").between(250, 499), lit(2))      // communes from 250 to 500 inhabitants
      .when(s.col("populationTotale").between(500, 1999), lit(3))     // communes from 500 to 2,000 inhabitants
      .when(s.col("populationTotale").between(2000, 3499), lit(4))    // communes from 2,000 to 3,500 inhabitants
      .when(s.col("populationTotale").between(3500, 4999), lit(5))    // communes from 3,500 to 5,000 inhabitants
      .when(s.col("populationTotale").between(5000, 9999), lit(6))    // communes from 5,000 to 10,000 inhabitants
      .when(s.col("populationTotale").between(10000, 19999), lit(7))  // communes from 10,000 to 20,000 inhabitants
      .when(s.col("populationTotale").between(20000, 49999), lit(8))  // communes from 20,000 to 50,000 inhabitants
      .when(s.col("populationTotale").between(50000, 99999), lit(9))  // communes from 50,000 to 100,000 inhabitants
      .otherwise(lit(10)));                                           // communes over 100,000 inhabitants

   // Get the contours of the communes.
   // "(SQL query) contours" is the substitution form Spark expects,
   // cf. https://stackoverflow.com/questions/38376307/create-spark-dataframe-from-sql-query
   String format = "(select insee as codecommuneosm, nom as nomcommuneosm, surf_ha as surface2, st_x(st_centroid(geom)) as longitude, st_y(st_centroid(geom)) as latitude from communes_{0,number,#0}) contours";
   String sql = MessageFormat.format(format, anneeCOG);
   Dataset<Row> contours = sql(session, sql).load();

   contours = contours.withColumn("surface", col("surface2").cast(DoubleType)).drop(col("surface2"))
      .orderBy("codecommuneosm");

   Column conditionJoinContours = col("codeCommune").equalTo(col("codecommuneosm"));

   verifications("jonction communes et contours communaux OSM (centroïde, surface)", communes, null, contours, conditionJoinContours, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);

   communes = communes.join(contours, conditionJoinContours, "left_outer")
      .drop(col("codecommuneosm")).drop(col("nomcommuneosm"));

   verifications("jonction communes et contours communaux OSM (centroïde, surface)", communes, null, null, null, verifications);

   LOGGER.info("X04");
   communes.printSchema();
   communes.show(false);

   // Associate each commune with its intercommunality code, if it has one
   // (commune-communities may have none).
   Dataset<Row> perimetres = this.datasetPerimetres.rowPerimetres(session, anneeCOG, EPCIPerimetreDataset.TriPerimetresEPCI.CODE_COMMUNE_MEMBRE)
      .selectExpr("sirenCommuneMembre", "sirenGroupement as codeEPCI", "nomGroupement as nomEPCI");

   Column conditionJoinPerimetres = communes.col("sirenCommune").equalTo(perimetres.col("sirenCommuneMembre"));

   verifications("jonction communes et périmètres", communes, null, perimetres, conditionJoinPerimetres, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
   communes = communes.join(perimetres, conditionJoinPerimetres, "left");

   LOGGER.info("X05");
   communes.printSchema();
   communes.show(false);

   // Associate the departments with the communes.
   communes = this.datasetDepartements.withDepartement(session, "codeDepartementRetabli", communes, "codeDepartement", null, true, anneeCOG)
      .drop("codeRegionDepartement");

   LOGGER.info("X06");
   communes.printSchema();
   communes.show(false);

   //communes = communes.repartition(col("codeDepartement"));
   communes = communes.sortWithinPartitions(col("codeCommune"));
   communes = communes.persist(); // Important: improves performance.

   LOGGER.info("X07");
   communes.printSchema();
   communes.show(false);

   saveToStore(communes, new String[] {/*"codeDepartement"*/}, "{0}_{1,number,#0}", nomStore, anneeCOG);
   LOGGER.info("Le dataset des communes du Code Officiel Géographique de l'année {} est prêt et stocké.", anneeCOG);
   return communes;
}
and the field that is troubling me is the department one (codeDepartement).
When the Dataset isn't partitioned by this String field codeDepartement: everything works well.
When that function runs, if I don't attempt to partition the dataset (the statements required for partitioning are commented out above), everything goes fine. The console then displays the following content:
2022-01-18 08:47:50.995 INFO 25913 --- [nio-9090-exec-1] f.e.spark.dataset.cog.CogDataset : X07
root
 |-- typeCommune: string (nullable = true)
 |-- codeCommune: string (nullable = true)
 |-- codeRegion: string (nullable = true)
 |-- codeDepartement: string (nullable = true)
 |-- CTCD: string (nullable = true)
 |-- arrondissement: string (nullable = true)
 |-- typeNomEtCharniere: string (nullable = true)
 |-- nomMajuscules: string (nullable = true)
 |-- nomCommune: string (nullable = true)
 |-- LIBELLE: string (nullable = true)
 |-- codeCanton: string (nullable = true)
 |-- codeCommuneParente: string (nullable = true)
 |-- sirenCommune: string (nullable = true)
 |-- populationTotale: integer (nullable = true)
 |-- populationMunicipale: integer (nullable = true)
 |-- populationCompteApart: integer (nullable = true)
 |-- strateCommune: integer (nullable = false)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- surface: double (nullable = true)
 |-- sirenCommuneMembre: string (nullable = true)
 |-- codeEPCI: string (nullable = true)
 |-- nomEPCI: string (nullable = true)
 |-- codeDepartementRetabli: string (nullable = true)
 |-- codeCommuneChefLieuDepartement: string (nullable = true)
 |-- typeNomEtCharniereDepartement: string (nullable = true)
 |-- nomMajusculesDepartement: string (nullable = true)
 |-- nomDepartement: string (nullable = true)
 |-- libelleDepartement: string (nullable = true)

+-----------+-----------+----------+---------------+----+--------------+------------------+-------------------------+-------------------------+-------------------------+----------+------------------+------------+----------------+--------------------+---------------------+-------------+------------------+------------------+-------+------------------+---------+---------------------------------------+----------------------+------------------------------+-----------------------------+------------------------+-----------------------+-----------------------+
|typeCommune|codeCommune|codeRegion|codeDepartement|CTCD|arrondissement|typeNomEtCharniere|nomMajuscules |nomCommune |LIBELLE |codeCanton|codeCommuneParente|sirenCommune|populationTotale|populationMunicipale|populationCompteApart|strateCommune|longitude |latitude |surface|sirenCommuneMembre|codeEPCI |nomEPCI |codeDepartementRetabli|codeCommuneChefLieuDepartement|typeNomEtCharniereDepartement|nomMajusculesDepartement|nomDepartement |libelleDepartement |
+-----------+-----------+----------+---------------+----+--------------+------------------+-------------------------+-------------------------+-------------------------+----------+------------------+------------+----------------+--------------------+---------------------+-------------+------------------+------------------+-------+------------------+---------+---------------------------------------+----------------------+------------------------------+-----------------------------+------------------------+-----------------------+-----------------------+
|COM |02053 |32 |02 |02D |021 |0 |VALLEES EN CHAMPAGNE |Vallées en Champagne |Vallées en Champagne |0204 |null |200056307 |576 |570 |6 |3 |3.616431731194736 |49.007989478704616|4168.0 |200056307 |200072031|CA de la Région de Château-Thierry |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02057 |32 |02 |02D |023 |0 |BEAUREVOIR |Beaurevoir |Beaurevoir |0201 |null |210200564 |1433 |1415 |18 |3 |3.330083095945232 |49.996641228776475|2180.0 |210200564 |240200493|CC du Pays du Vermandois |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02070 |32 |02 |02D |025 |0 |BERNOT |Bernot |Bernot |0207 |null |210200697 |461 |448 |13 |2 |3.4866867514904327|49.87884521170864 |1672.0 |210200697 |200071983|CC Thiérache Sambre et Oise |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02090 |32 |02 |02D |024 |0 |BILLY SUR OURCQ |Billy-sur-Ourcq |Billy-sur-Ourcq |0221 |null |210200887 |220 |210 |10 |1 |3.2992043452730138|49.22146881666656 |1021.0 |210200887 |240200519|CC du Canton d'Oulchy le Château |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02291 |32 |02 |02D |023 |1 |ESTREES |Estrées |Estrées |0201 |null |210202743 |419 |411 |8 |2 |3.2807599426576335|49.97077968837569 |704.0 |210202743 |240200493|CC du Pays du Vermandois |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02527 |32 |02 |02D |024 |0 |MORSAIN |Morsain |Morsain |0220 |null |210205043 |464 |455 |9 |2 |3.1908402513250707|49.46152187457162 |1434.0 |210205043 |200071991|CC Retz en Valois |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |03098 |84 |03 |03D |031 |0 |DESERTINES |Désertines |Désertines |0310 |null |210300984 |4519 |4432 |87 |5 |2.627485866618113 |46.357035806459585|827.0 |210300984 |200071082|CA Montluçon Communauté |03 |03190 |5 |ALLIER |Allier |Allier |
|COM |04138 |93 |04 |04D |044 |0 |NIOZELLES |Niozelles |Niozelles |0406 |null |210401386 |284 |278 |6 |2 |5.847058475050684 |43.931981561717315|1061.0 |210401386 |240400440|CC Pays Forcalquier et Montagne de Lure|04 |04070 |4 |ALPES DE HAUTE PROVENCE |Alpes-de-Haute-Provence|Alpes-de-Haute-Provence|
|COM |05107 |93 |05 |05D |051 |0 |PUY SAINT ANDRE |Puy-Saint-André |Puy-Saint-André |0502 |null |210501078 |480 |457 |23 |2 |6.566597158877383 |44.891314199898126|1530.0 |210501078 |240500439|CC du Briançonnais |05 |05061 |4 |HAUTES ALPES |Hautes-Alpes |Hautes-Alpes |
|COM |05163 |93 |05 |05D |052 |2 |SAUZE DU LAC |Sauze-du-Lac |Le Sauze-du-Lac |0504 |null |210501631 |149 |145 |4 |1 |6.323608116359499 |44.48601132516764 |1223.0 |210501631 |200067742|CC Serre-Ponçon |05 |05061 |4 |HAUTES ALPES |Hautes-Alpes |Hautes-Alpes |
|COM |05176 |93 |05 |05D |052 |0 |VALSERRES |Valserres |Valserres |0514 |null |210501763 |271 |267 |4 |2 |6.1382720630480465|44.48971119562803 |1123.0 |210501763 |200067320|CC Serre-Ponçon Val d'Avance |05 |05061 |4 |HAUTES ALPES |Hautes-Alpes |Hautes-Alpes |
|COM |07038 |84 |07 |07D |071 |0 |BORNE |Borne |Borne |0713 |null |210700381 |49 |49 |0 |1 |4.041460041609979 |44.61694459940681 |3199.0 |210700381 |200072007|CC de la Montagne d'Ardèche |07 |07186 |5 |ARDECHE |Ardèche |Ardèche |
|COM |09120 |76 |09 |09D |093 |0 |FABAS |Fabas |Fabas |0911 |null |210901203 |356 |352 |4 |2 |1.1143034959106055|43.11978973617488 |2327.0 |210901203 |200067940|CC Couserans-Pyrénées |09 |09122 |5 |ARIEGE |Ariège |Ariège |
|COM |09231 |76 |09 |09D |093 |2 |PORT |Port |Le Port |0903 |null |210902318 |160 |159 |1 |1 |1.3862330079391603|42.83569938864616 |5016.0 |210902318 |200067940|CC Couserans-Pyrénées |09 |09122 |5 |ARIEGE |Ariège |Ariège |
|COM |09276 |76 |09 |09D |092 |0 |SAINT VICTOR ROUZAUD |Saint-Victor-Rouzaud |Saint-Victor-Rouzaud |0907 |null |210902763 |232 |222 |10 |1 |1.5478697013903504|43.087886607776255|1270.0 |210902763 |200066231|CC des Portes d'Ariège Pyrénées |09 |09122 |5 |ARIEGE |Ariège |Ariège |
+-----------+-----------+----------+---------------+----+--------------+------------------+-------------------------+-------------------------+-------------------------+----------+------------------+------------+----------------+--------------------+---------------------+-------------+------------------+------------------+-------+------------------+---------+---------------------------------------+----------------------+------------------------------+-----------------------------+------------------------+-----------------------+-----------------------+
The saveToStore sub-functions are called; they just do this:
protected boolean saveToStore(Dataset<Row> ds, String[] colonnesPartionnement, String format, Object... args) {
   return saveToStore(this.tempDir, isCacheUtilise(), ds, colonnesPartionnement, format, args);
}

public static boolean saveToStore(String tempDir, boolean useCache, Dataset<Row> ds, String[] colonnesPartionnement, String format, Object... args) {
   if (useCache && ds.isEmpty() == false) {
      String store = tempDir + "/" + MessageFormat.format(format, args);
      ds.write().partitionBy(colonnesPartionnement).parquet(store);
      LOGGER.info("Un dataset a été sauvegardé dans le fichier parquet {}.", store);
      return true;
   }

   if (useCache == false) {
      LOGGER.debug("Le cache est désactivé, le dataset n'a pas été sauvegardé dans un fichier parquet.");
   } else {
      LOGGER.warn("Le dataset est vide et n'a pas été sauvegardé dans un fichier parquet.");
   }

   return false;
}
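(A side note: as far as I can tell, when the colonnesPartionnement array is empty, partitionBy() receives no columns and the dataset is simply written as an ordinary, unpartitioned parquet folder; that is the situation in the run described next, where the partitioning statements are commented out.)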
A parquet file is created (a folder containing 200 files like part-00000-522c936f-7b72-4ed8-bab9-9d4acee6bc7c-c000.snappy.parquet, not partitioned), and if I load the resulting parquet file with Zeppelin, I get this content:
val communes = spark.read.parquet("/data/tmp/communes_par_codeCommune_2021")
communes.printSchema()

communes.select("codeDepartement").distinct()
   .orderBy("codeDepartement")
   .show(100)
root
 |-- typeCommune: string (nullable = true)
 |-- codeCommune: string (nullable = true)
 |-- codeRegion: string (nullable = true)
 |-- codeDepartement: string (nullable = true)
 |-- CTCD: string (nullable = true)
 |-- arrondissement: string (nullable = true)
 |-- typeNomEtCharniere: string (nullable = true)
 |-- nomMajuscules: string (nullable = true)
 |-- nomCommune: string (nullable = true)
 |-- LIBELLE: string (nullable = true)
 |-- codeCanton: string (nullable = true)
 |-- codeCommuneParente: string (nullable = true)
 |-- sirenCommune: string (nullable = true)
 |-- populationTotale: integer (nullable = true)
 |-- populationMunicipale: integer (nullable = true)
 |-- populationCompteApart: integer (nullable = true)
 |-- strateCommune: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- surface: double (nullable = true)
 |-- sirenCommuneMembre: string (nullable = true)
 |-- codeEPCI: string (nullable = true)
 |-- nomEPCI: string (nullable = true)
 |-- codeDepartementRetabli: string (nullable = true)
 |-- codeCommuneChefLieuDepartement: string (nullable = true)
 |-- typeNomEtCharniereDepartement: string (nullable = true)
 |-- nomMajusculesDepartement: string (nullable = true)
 |-- nomDepartement: string (nullable = true)
 |-- libelleDepartement: string (nullable = true)

+---------------+
|codeDepartement|
+---------------+
| 01|
| 02|
| 03|
| 04|
| 05|
| 06|
| 07|
| 08|
| 09|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 21|
| 22|
| 23|
| 24|
| 25|
| 26|
| 27|
| 28|
| 29|
| 2A|
| 2B|
| 30|
| 31|
| 32|
| 33|
| 34|
| 35|
| 36|
| 37|
| 38|
| 39|
| 40|
| 41|
| 42|
| 43|
| 44|
| 45|
| 46|
| 47|
| 48|
| 49|
| 50|
| 51|
| 52|
| 53|
| 54|
| 55|
| 56|
| 57|
| 58|
| 59|
| 60|
| 61|
| 62|
| 63|
| 64|
| 65|
| 66|
| 67|
| 68|
| 69|
| 70|
| 71|
| 72|
| 73|
| 74|
| 75|
| 76|
| 77|
| 78|
| 79|
| 80|
| 81|
| 82|
| 83|
| 84|
| 85|
| 86|
| 87|
| 88|
| 89|
| 90|
| 91|
| 92|
| 93|
| 94|
| 95|
| 971|
| 972|
| 973|
| 974|
+---------------+
only showing top 100 rows
The leading zeroes of the codeDepartement field are there, and that's normal: this field is a string, as proven by the 2A and 2B values, which are the codes of the two Corsica departments (Corse-du-Sud and Haute-Corse). I notice that in the schema shown, codeDepartement is in fourth position.
When the dataset is partitioned by codeDepartement: leading zeroes are lost when the parquet file is loaded.
If I enable the partitioning of the Dataset by codeDepartement in my CogDataset source file, by uncommenting the lines:
communes = communes.repartition(col("codeDepartement"));
[...]
saveToStore(communes, new String[] {"codeDepartement"}, "{0}_{1,number,#0}", nomStore, anneeCOG);
the content of the "X07" dump is the same, except that the codeDepartement values are now ordered and retain their leading zeroes. The parquet file now has subfolders like codeDepartement=02 (and 02 keeps its leading zero, so it's promising), but when I load that parquet file with Zeppelin, things go wrong:
root
 |-- typeCommune: string (nullable = true)
 |-- codeCommune: string (nullable = true)
 |-- codeRegion: string (nullable = true)
 |-- CTCD: string (nullable = true)
 |-- arrondissement: string (nullable = true)
 |-- typeNomEtCharniere: string (nullable = true)
 |-- nomMajuscules: string (nullable = true)
 |-- nomCommune: string (nullable = true)
 |-- LIBELLE: string (nullable = true)
 |-- codeCanton: string (nullable = true)
 |-- codeCommuneParente: string (nullable = true)
 |-- sirenCommune: string (nullable = true)
 |-- populationTotale: integer (nullable = true)
 |-- populationMunicipale: integer (nullable = true)
 |-- populationCompteApart: integer (nullable = true)
 |-- strateCommune: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- surface: double (nullable = true)
 |-- sirenCommuneMembre: string (nullable = true)
 |-- codeEPCI: string (nullable = true)
 |-- nomEPCI: string (nullable = true)
 |-- codeDepartementRetabli: string (nullable = true)
 |-- codeCommuneChefLieuDepartement: string (nullable = true)
 |-- typeNomEtCharniereDepartement: string (nullable = true)
 |-- nomMajusculesDepartement: string (nullable = true)
 |-- nomDepartement: string (nullable = true)
 |-- libelleDepartement: string (nullable = true)
 |-- codeDepartement: string (nullable = true)

+---------------+
|codeDepartement|
+---------------+
| 1|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 2|
| 21|
| 22|
| 23|
| 24|
| 25|
| 26|
| 27|
| 28|
| 29|
| 2A|
| 2B|
| 3|
| 30|
| 31|
| 32|
| 33|
| 34|
| 35|
| 36|
| 37|
| 38|
| 39|
| 4|
| 40|
| 41|
| 42|
| 43|
| 44|
| 45|
| 46|
| 47|
| 48|
| 49|
| 5|
| 50|
| 51|
| 52|
| 53|
| 54|
| 55|
| 56|
| 57|
| 58|
| 59|
| 6|
| 60|
| 61|
| 62|
| 63|
| 64|
| 65|
| 66|
| 67|
| 68|
| 69|
| 7|
| 70|
| 71|
| 72|
| 73|
| 74|
| 75|
| 76|
| 77|
| 78|
| 79|
| 8|
| 80|
| 81|
| 82|
| 83|
| 84|
| 85|
| 86|
| 87|
| 88|
| 89|
| 9|
| 90|
| 91|
| 92|
| 93|
| 94|
| 95|
| 971|
| 972|
| 973|
| 974|
+---------------+
only showing top 100 rows
The leading zeroes of codeDepartement are lost, while the 2A and 2B department codes are still there, showing that this field is still a string.
I notice that the codeDepartement returned by Parquet is now in the last position of the schema, as if Parquet had recreated that field itself (?).
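For what it's worth, both symptoms (the lost leading zeroes and the partition column moving to the end of the schema) can be reproduced outside of my project. Here is a minimal sketch I wrote to isolate it; the /tmp paths and the sample values are mine, not from the real dataset:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// A small dataset whose department codes are strings with leading zeroes.
val df = Seq(("01", "Ain"), ("02", "Aisne"), ("2A", "Corse-du-Sud"), ("971", "Guadeloupe"))
  .toDF("codeDepartement", "nomDepartement")

// Written without partitioning, the column round-trips unchanged.
df.write.mode("overwrite").parquet("/tmp/demo_plain")
spark.read.parquet("/tmp/demo_plain").orderBy("codeDepartement").show() // 01, 02, 2A, 971

// Written partitioned, the column survives only as directory names (codeDepartement=01, ...).
df.write.mode("overwrite").partitionBy("codeDepartement").parquet("/tmp/demo_part")
val back = spark.read.parquet("/tmp/demo_part")
back.printSchema()                     // codeDepartement is now the last field of the schema
back.orderBy("codeDepartement").show() // 1, 2, 2A, 971: the leading zeroes are gone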
Do you have an idea of what is affecting me? It looks like I'm missing some options that I should set before writing my content to Parquet, or before reloading it.
I'm using Spark 3.2.0.
Answer
I found the answer. The problem isn't the parquet file itself, but the fact that these statements:
val communes = spark.read.parquet("/data/tmp/communes_par_codeCommune_2021")
communes.printSchema()
even though they display the correct dataset schema, don't really take that schema into account and instead try, for as long as possible, to infer it from the data while reading (?!), therefore considering my codeDepartement field numeric until they stumble upon the values 2A and 2B, which force them to switch the field type back to string. I wonder why parquet file loading works that way, and whether my explanation is the right one.
But the correct way to ensure that the schema of the parquet file is taken into account is to explicitly extract it from the parquet file first, and then to ask the load function to use it:
val parquetFile = "/data/tmp/communes_par_codeCommune_2021"
val schema = spark.read.parquet(parquetFile).schema
val communes = spark.read.schema(schema).parquet(parquetFile)
and then the behavior of parquet file reading becomes normal:
+---------------+
|codeDepartement|
+---------------+
| 01|
| 02|
| 03|
| 04|
| 05|
| 06|
Maybe there's a simpler way to achieve that; if you know it, I'm happy to learn it, because this is a bit burdensome.
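If I read the Spark SQL documentation on partition discovery correctly, the type of a partition column is never taken from the parquet footers at all: it is inferred from the directory names, so codeDepartement=02 first parses as the number 2, and only the 2A and 2B directories later force the column back to string, once the zeroes are already gone. That documentation mentions a switch that disables this inference and keeps partition columns as plain strings, which may well be the simpler way; a sketch, not re-tested against my whole pipeline:

// Assumed simpler alternative, based on the partition discovery documentation:
// disable partition-column type inference, so that values coming from
// directory names such as codeDepartement=02 are kept as plain strings.
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

val communes = spark.read.parquet("/data/tmp/communes_par_codeCommune_2021")
communes.select("codeDepartement").distinct().orderBy("codeDepartement").show(100)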