Modificare iterativamente le colonne di un dataframe, oppure aggiungere iterativamente delle colonne ad un dataframe, o altre operazioni iterative su un dataframe, non è semplicissimo da realizzare.
Vediamo come fare usando la funzione foldLeft
.
Innanzitutto ci serve un dataframe, carichiamone uno a caso.
import org.apache.spark.storage.StorageLevel val d_path = "D:/AnacondaProjects/Datasets/DatasetToCompleteTheSixSparkExercises" val d_path_sales = d_path + "/sales_parquet" val sales = spark.read.format("parquet").load(d_path_sales).persist(StorageLevel.MEMORY_AND_DISK) sales.show(1)
+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ |order_id|product_id|seller_id| date|num_pieces_sold| bill_raw_text| +−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ | 1| 0| 0|2020−07−10| 26|kyeibuumwlyhuwksx...| +−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ only showing top 1 row import org.apache.spark.storage.StorageLevel d_path: String = D:/AnacondaProjects/Datasets/DatasetToCompleteTheSixSparkExercises d_path_sales: String = D:/AnacondaProjects/Datasets/DatasetToCompleteTheSixSparkExercises/sales_parquet sales: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [order_id: string, product_id: string ... 4 more fields]
Nel codice sotto il Range
agisce come una sorta di for loop.
Cosa fa il fold left?
Iniziamo con il semplificare il codice: Range(1,5).foldLeft( sales1 ){ (acc,x) => acc = func(acc,x) }
Vediamo gli step dell’iterazione:
- step 1:
- inizializa la variabile
acc
consales1
, ex
e il valore corrente del range: 1 nel primo step - applica alla coppia (acc,x) la funzione specificata, e l’output della funzione è assegnato come nuovo valore di
acc
che verrà usato/propagato allo step 2 - il nuovo valore di
acc
è il vecchioacc
con l’aggiunta di una colonna
- inizializa la variabile
- step 2:
- ho 2 nuovi valori acc e x. acc è stato aggiornato allo step precedente e x è il valore corrente del Range (=2)
- alla nuova coppia (acc,x) viene applicata la funzione specificata e l’aoutput della funzione viene assegnato ad
acc
, il cui nuovo valore verrà usato nello step 3
e così via.
Il risultato dell’intero comando è il valore di acc
generato all’ultima iterazione.
val acc = sc.collectionAccumulator("prova") val sales1 = sales.withColumnRenamed("bill_raw_text", "bill_raw_text_0") sales1.show(5) val sales2 = Range(1,5).foldLeft( sales1 ){ (acc, x) => acc.withColumn("bill_raw_text_"+x, md5( acc("bill_raw_text_"+(x-1)) ) )} sales2.show(5)
+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ |order_id|product_id|seller_id| date|num_pieces_sold| bill_raw_text_0| +−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ | 1| 0| 0|2020−07−10| 26|kyeibuumwlyhuwksx...| | 2| 0| 0|2020−07−08| 13|jfyuoyfkeyqkckwbu...| | 3| 0| 0|2020−07−05| 38|uyjihlzhzcswxcccx...| | 4| 0| 0|2020−07−05| 56|umnxvoqbdzpbwjqmz...| | 5| 0| 0|2020−07−05| 11|zmqexmaawmvdpqhih...| +−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ only showing top 5 rows +−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ |order_id|product_id|seller_id| date|num_pieces_sold| bill_raw_text_0| bill_raw_text_1| bill_raw_text_2| bill_raw_text_3| bill_raw_text_4| +−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ | 1| 0| 0|2020−07−10| 26|kyeibuumwlyhuwksx...|0e72113eda28e817b...|dd7776a030df0941b...|51960891d3640178d...|aa26467f07244138a...| | 2| 0| 0|2020−07−08| 13|jfyuoyfkeyqkckwbu...|13af9a32eb8a31513...|4b4b6db3794449623...|1599682fcd0879b58...|afa71e43783564ce2...| | 3| 0| 0|2020−07−05| 38|uyjihlzhzcswxcccx...|c2d269a5c9376e431...|fee9bf121605c6666...|41719f6c251ec32a7...|7946fd68fde656983...| | 4| 0| 0|2020−07−05| 56|umnxvoqbdzpbwjqmz...|7a73a2250f416bb81...|fa00fab1238c5bf17...|82f89222fdc874bec...|260a666921f815980...| | 5| 0| 0|2020−07−05| 11|zmqexmaawmvdpqhih...|2e66bd8b5b16526e5...|4f2c00af4df594bb9...|dc2fe70005ee80ed9...|f165c5bfcbb6b4e85...| +−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ only showing top 5 rows acc: org.apache.spark.util.CollectionAccumulator[Nothing] = CollectionAccumulator(id: 189, name: Some(prova), value: []) sales1: org.apache.spark.sql.DataFrame = [order_id: string, product_id: string ... 4 more fields] sales2: org.apache.spark.sql.DataFrame = [order_id: string, product_id: string ... 8 more fields]
Si noti che l’ultimo numero di Range(1,5)
è 4
Altro esempio
Per usare il foldLeft ho bisogno di un iterabile, nel caso sopra ho usato range, in questo esempio uso una lista di stringhe, per la precisione uso la lista dei nomi delle colonne.
Inizio dalla creazione di un dataframe a caso:
val df = spark.sparkContext.parallelize(List((1,4,5,7,9,240),(2,5,8,34,5,350))).toDF df.show
+−−−+−−−+−−−+−−−+−−−+−−−+ | _1| _2| _3| _4| _5| _6| +−−−+−−−+−−−+−−−+−−−+−−−+ | 1| 4| 5| 7| 9|240| | 2| 5| 8| 34| 5|350| +−−−+−−−+−−−+−−−+−−−+−−−+ df: org.apache.spark.sql.DataFrame = [_1: int, _2: int ... 4 more fields]
Come si vede sopra la lista dei nomi delle colonne non è molto intuitiva, quindi decido di cambiarla.
Inizio con il fare un mapping dal vecchio nome al nuovo.
val col_mapping = Map("_1" -> "user_ID", "_2" -> "numero_ordini_2016", "_3" -> "numero_ordini_2017", "_4" -> "numero_ordini_2018", "_5" -> "numero_ordini_2019", "_6" -> "totale_ordini_Euro")
col_mapping: scala.collection.immutable.Map[String,String] = Map(_3 −> numero_ordini_2017, _2 −> numero_ordini_2016, _6 −> totale_ordini_Euro, _1 −> user_ID, _4 −> numero_ordini_2018, _5 −> numero_ordini_2019)
Come nel caso precedente, inizializzo l’accumulatore con df.
Ad ogni step rinomina la colonna corrente (x) di df.columns, cambiando il nome in col_mapping(x).
val df2 = df.columns.foldLeft(df){ (acc,x) => acc.withColumnRenamed(x, col_mapping(x))} df2.show
+−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+ |user_ID|numero_ordini_2016|numero_ordini_2017|numero_ordini_2018|numero_ordini_2019|totale_ordini_Euro| +−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+ | 1| 4| 5| 7| 9| 240| | 2| 5| 8| 34| 5| 350| +−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+ df2: org.apache.spark.sql.DataFrame = [user_ID: int, numero_ordini_2016: int ... 4 more fields]
Bang!
Ecco il dataframe con tutte i nome delle colonne modificate.
Se voglio modificare solo alcuni nomi di colonne posso definire un Map e poi usare il foldLeft sulle keys del Map… vediamo come.
Il punto di partenza è sempre un iterabile.
//voglio modificare solo i nomi di queste due colonne val col_mapping = Map("_1" -> "user_ID", "_6" -> "totale_ordini_Euro") val acc = sc.collectionAccumulator("dataFrameAcc") val df2 = col_mapping.keys.foldLeft(df){ (acc,x) => acc.withColumnRenamed(x, col_mapping(x))} df2.show
+−−−−−−−+−−−+−−−+−−−+−−−+−−−−−−−−−−−−−−−−−−+ |user_ID| _2| _3| _4| _5|totale_ordini_Euro| +−−−−−−−+−−−+−−−+−−−+−−−+−−−−−−−−−−−−−−−−−−+ | 1| 4| 5| 7| 9| 240| | 2| 5| 8| 34| 5| 350| +−−−−−−−+−−−+−−−+−−−+−−−+−−−−−−−−−−−−−−−−−−+ col_mapping: scala.collection.immutable.Map[String,String] = Map(_1 −> user_ID, _6 −> totale_ordini_Euro) acc: org.apache.spark.util.CollectionAccumulator[Nothing] = CollectionAccumulator(id: 110, name: Some(dataFrameAcc), value: []) df2: org.apache.spark.sql.DataFrame = [user_ID: int, _2: int ... 4 more fields]
e ri-Bang!