... modificare iterativamente le colonne del dataframe in Apache Spark

Modificare iterativamente le colonne di un dataframe, oppure aggiungere iterativamente delle colonne ad un dataframe, o altre operazioni iterative su un dataframe, non è semplicissimo da realizzare.

Vediamo come fare usando la funzione foldLeft.

Innanzitutto ci serve un dataframe, carichiamone uno a caso.

import org.apache.spark.storage.StorageLevel

val d_path = "D:/AnacondaProjects/Datasets/DatasetToCompleteTheSixSparkExercises"

val d_path_sales = d_path + "/sales_parquet"

val sales = spark.read.format("parquet").load(d_path_sales).persist(StorageLevel.MEMORY_AND_DISK)

sales.show(1)

+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
|       1|         0|        0|2020−07−10|             26|kyeibuumwlyhuwksx...|
+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
only showing top 1 row

import org.apache.spark.storage.StorageLevel
d_path: String = D:/AnacondaProjects/Datasets/DatasetToCompleteTheSixSparkExercises
d_path_sales: String = D:/AnacondaProjects/Datasets/DatasetToCompleteTheSixSparkExercises/sales_parquet
sales: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [order_id: string, product_id: string ... 4 more fields]

Nel codice sotto il Range agisce come una sorta di for loop.

Cosa fa il fold left?
Iniziamo con il semplificare il codice: Range(1,5).foldLeft( sales1 ){ (acc,x) => acc = func(acc,x) }

Vediamo gli step dell’iterazione:

  • step 1:
    • inizializa la variabile acc con sales1, e x e il valore corrente del range: 1 nel primo step
    • applica alla coppia (acc,x) la funzione specificata, e l’output della funzione è assegnato come nuovo valore di acc che verrà usato/propagato allo step 2
    • il nuovo valore di acc è il vecchio acc con l’aggiunta di una colonna
  • step 2:
    • ho 2 nuovi valori acc e x. acc è stato aggiornato allo step precedente e x è il valore corrente del Range (=2)
    • alla nuova coppia (acc,x) viene applicata la funzione specificata e l’aoutput della funzione viene assegnato ad acc, il cui nuovo valore verrà usato nello step 3

e così via.

Il risultato dell’intero comando è il valore di acc generato all’ultima iterazione.

val acc = sc.collectionAccumulator("prova")

val sales1 = sales.withColumnRenamed("bill_raw_text", "bill_raw_text_0")
sales1.show(5)

val sales2 = Range(1,5).foldLeft( sales1 ){ (acc, x) => acc.withColumn("bill_raw_text_"+x, md5( acc("bill_raw_text_"+(x-1)) ) )}
sales2.show(5)

+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
|order_id|product_id|seller_id|      date|num_pieces_sold|     bill_raw_text_0|
+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
|       1|         0|        0|2020−07−10|             26|kyeibuumwlyhuwksx...|
|       2|         0|        0|2020−07−08|             13|jfyuoyfkeyqkckwbu...|
|       3|         0|        0|2020−07−05|             38|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020−07−05|             56|umnxvoqbdzpbwjqmz...|
|       5|         0|        0|2020−07−05|             11|zmqexmaawmvdpqhih...|
+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
only showing top 5 rows

+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
|order_id|product_id|seller_id|      date|num_pieces_sold|     bill_raw_text_0|     bill_raw_text_1|     bill_raw_text_2|     bill_raw_text_3|     bill_raw_text_4|
+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
|       1|         0|        0|2020−07−10|             26|kyeibuumwlyhuwksx...|0e72113eda28e817b...|dd7776a030df0941b...|51960891d3640178d...|aa26467f07244138a...|
|       2|         0|        0|2020−07−08|             13|jfyuoyfkeyqkckwbu...|13af9a32eb8a31513...|4b4b6db3794449623...|1599682fcd0879b58...|afa71e43783564ce2...|
|       3|         0|        0|2020−07−05|             38|uyjihlzhzcswxcccx...|c2d269a5c9376e431...|fee9bf121605c6666...|41719f6c251ec32a7...|7946fd68fde656983...|
|       4|         0|        0|2020−07−05|             56|umnxvoqbdzpbwjqmz...|7a73a2250f416bb81...|fa00fab1238c5bf17...|82f89222fdc874bec...|260a666921f815980...|
|       5|         0|        0|2020−07−05|             11|zmqexmaawmvdpqhih...|2e66bd8b5b16526e5...|4f2c00af4df594bb9...|dc2fe70005ee80ed9...|f165c5bfcbb6b4e85...|
+−−−−−−−−+−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
only showing top 5 rows

acc: org.apache.spark.util.CollectionAccumulator[Nothing] = CollectionAccumulator(id: 189, name: Some(prova), value: [])
sales1: org.apache.spark.sql.DataFrame = [order_id: string, product_id: string ... 4 more fields]
sales2: org.apache.spark.sql.DataFrame = [order_id: string, product_id: string ... 8 more fields]

Si noti che l’ultimo numero di Range(1,5) è 4

Altro esempio

Per usare il foldLeft ho bisogno di un iterabile, nel caso sopra ho usato range, in questo esempio uso una lista di stringhe, per la precisione uso la lista dei nomi delle colonne.

Inizio dalla creazione di un dataframe a caso:

val df = spark.sparkContext.parallelize(List((1,4,5,7,9,240),(2,5,8,34,5,350))).toDF
df.show

+−−−+−−−+−−−+−−−+−−−+−−−+
| _1| _2| _3| _4| _5| _6|
+−−−+−−−+−−−+−−−+−−−+−−−+
|  1|  4|  5|  7|  9|240|
|  2|  5|  8| 34|  5|350|
+−−−+−−−+−−−+−−−+−−−+−−−+

df: org.apache.spark.sql.DataFrame = [_1: int, _2: int ... 4 more fields]

Come si vede sopra la lista dei nomi delle colonne non è molto intuitiva, quindi decido di cambiarla.

Inizio con il fare un mapping dal vecchio nome al nuovo.

val col_mapping = Map("_1" -> "user_ID",
      "_2" -> "numero_ordini_2016",
      "_3" -> "numero_ordini_2017",
      "_4" -> "numero_ordini_2018",
      "_5" -> "numero_ordini_2019",
      "_6" -> "totale_ordini_Euro")

col_mapping: scala.collection.immutable.Map[String,String] = Map(_3 −> numero_ordini_2017, _2 −> numero_ordini_2016, _6 −> totale_ordini_Euro, _1 −> user_ID, _4 −> numero_ordini_2018, _5 −> numero_ordini_2019)

Come nel caso precedente, inizializzo l’accumulatore con df.

Ad ogni step rinomina la colonna corrente (x) di df.columns, cambiando il nome in col_mapping(x).

val df2 = df.columns.foldLeft(df){ (acc,x) => acc.withColumnRenamed(x, col_mapping(x))}
df2.show

+−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+
|user_ID|numero_ordini_2016|numero_ordini_2017|numero_ordini_2018|numero_ordini_2019|totale_ordini_Euro|
+−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+
|      1|                 4|                 5|                 7|                 9|               240|
|      2|                 5|                 8|                34|                 5|               350|
+−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+

df2: org.apache.spark.sql.DataFrame = [user_ID: int, numero_ordini_2016: int ... 4 more fields]

Bang!

Ecco il dataframe con tutte i nome delle colonne modificate.

Se voglio modificare solo alcuni nomi di colonne posso definire un Map e poi usare il foldLeft sulle keys del Map… vediamo come.

Il punto di partenza è sempre un iterabile.

//voglio modificare solo i nomi di queste due colonne
val col_mapping = Map("_1" -> "user_ID",
      "_6" -> "totale_ordini_Euro")
 
val acc = sc.collectionAccumulator("dataFrameAcc")
     
val df2 = col_mapping.keys.foldLeft(df){ (acc,x) => acc.withColumnRenamed(x, col_mapping(x))}

df2.show

+−−−−−−−+−−−+−−−+−−−+−−−+−−−−−−−−−−−−−−−−−−+
|user_ID| _2| _3| _4| _5|totale_ordini_Euro|
+−−−−−−−+−−−+−−−+−−−+−−−+−−−−−−−−−−−−−−−−−−+
|      1|  4|  5|  7|  9|               240|
|      2|  5|  8| 34|  5|               350|
+−−−−−−−+−−−+−−−+−−−+−−−+−−−−−−−−−−−−−−−−−−+

col_mapping: scala.collection.immutable.Map[String,String] = Map(_1 −> user_ID, _6 −> totale_ordini_Euro)
acc: org.apache.spark.util.CollectionAccumulator[Nothing] = CollectionAccumulator(id: 110, name: Some(dataFrameAcc), value: [])
df2: org.apache.spark.sql.DataFrame = [user_ID: int, _2: int ... 4 more fields]

e ri-Bang!


					

Leave a Reply

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *

You may use these HTML tags and attributes:

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>