... Certificazione Apache Spark Developer con Scala – Parte 1

In questo articolo vediamo un ripasso guidato dell’esame per la certificazione Databricks Spark Associate Developer con Scala.

Gli argomenti da conoscere per l’esame di certificazione CRT020 – Spark certified associated developerwith Scala sono quelli riportati alla pagina linkata.

In questo articolo vedremo soltanto le parti di programmazione dell’esame di certificazione.

Nei riquadri colorati ho riportato il testo come dal sito Databricks

SparkContext

Candidates are expected to know how to use the SparkContext to control basic configuration settings such as spark.sql.shuffle.partitions.

Innanzitutto verifichiamo che versione di Spark stiamo usando:

spark.version

res1: String = 2.4.4

Per settare i parametri di configurazione di Apache Spark posso usare il metodo spark.conf.set direttamente nella Spark-shell (o in Zeppelin)

spark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.set("spark.executor.memory", "6g")

per verificare che i parametri siano stati settati correttamente uso il metodo spark.conf.get

print(spark.conf.get("spark.sql.shuffle.partitions") + ", " + spark.conf.get("spark.executor.memory"))

6, 6g

Per visualizzare tutti i settaggi posso usare spark.conf.getAll

spark.conf.getAll.foreach(println)

(zeppelin.pyspark.python,C:\Users\home\Anaconda3\envs\conda2020\python)
(spark.driver.host,10.0.75.1)
(zeppelin.dep.localrepo,local−repo)
(zeppelin.spark.sql.stacktrace,false)
(spark.driver.port,51622)
(master,local[15])
(spark.repl.class.uri,spark://10.0.75.1:51622/classes)
(zeppelin.spark.useHiveContext,true)
(spark.repl.class.outputDir,C:\Users\home\AppData\Local\Temp\spark6251788337677085436)
(zeppelin.spark.sql.interpolation,false)
(zeppelin.spark.importImplicit,true)
(zeppelin.interpreter.output.limit,102400)
(spark.app.name,Zeppelin)
(zeppelin.R.cmd,R)
(zeppelin.spark.maxResult,1000)
(zeppelin.pyspark.useIPython,true)
(zeppelin.spark.concurrentSQL,false)
(zeppelin.spark.enableSupportedVersionCheck,true)
(zeppelin.spark.printREPLOutput,true)
(zeppelin.dep.additionalRemoteRepository,spark−packages,http://dl.bintray.com/spark−packages/maven,false;)
(org.apache.spark.storage.BlockManager,DEBUG)
(spark.executor.id,driver)
(log4j.logger.org.apache.spark.storage.BlockManager,TRACE)
(zeppelin.spark.useNew,true)
(spark.useHiveContext,true)
(spark.master,local)
(zeppelin.R.image.width,100%)
(zeppelin.spark.ui.hidden,false)
(zeppelin.interpreter.localRepo,C:\zeppelin−0.8.2−bin−all/local−repo/spark)
(spark.executor.memory,6g)
(spark.driver.allowMultipleContexts,false)
(zeppelin.R.render.options,out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, warning = F, fig.retina = 2)
(zeppelin.interpreter.max.poolsize,10)
(spark.app.id,local−1578911285841)
(zeppelin.R.knitr,true)
(spark.sql.shuffle.partitions,6)

Prima di definire un nuovo SparkContext devo cancellare quello vecchio

sc.stop()

Per definire i parametri di configurazione posso usare spark.conf (come fatto sopra) oppure l’oggetto SparkConf di org.apache.spark.SparkConf, i due sono equivalenti

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
val conf = new SparkConf().setMaster("local[*]").setAppName("provaApp")
val sc = new SparkContext(conf)

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@13115401
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@5a7f8768

Un altro modo per visualizzare i settaggi è usare toDebugString

conf.toDebugString

res4: String =
spark.app.name=provaApp
spark.driver.extraClassPath=C:\Users\home\Downloads\jar_files\mongo−spark−connector_2.12−2.4.1.jar
spark.driver.extraJavaOptions= −Dfile.encoding=UTF−8 −Dzeppelin.log.file='C:\zeppelin−0.8.2−bin−all\logs\zeppelin−interpreter−spark−home−DESKTOP−46GRV54.log'
spark.driver.memory=8g
spark.executor.extraClassPath=C:\Users\home\Downloads\jar_files\mongo−spark−connector_2.12−2.4.1.jar
spark.executor.memory=8g
spark.jars=file:///C:/zeppelin−0.8.2−bin−all/interpreter/spark/spark−interpreter−0.8.2.jar,file:/C:/zeppelin−0.8.2−bin−all/interpreter/spark/spark−interpreter−0.8.2.jar
spark.master=local[*]
spark.repl.local.jars=file:///C:/zeppelin−0.8.2−bin−all/interpreter/spark/spark−interpreter−0.8.2.jar
spark.submit.deployMode=client

SparkSession

Sopra abbiamo visto come configurare lo SparkContext, se invece vogliamo definire le proprietà della SparkSession, punto di accesso al cluster di default a partire da Spark 2.0, posso usare il modo che segue.

spark.stop()

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf().setMaster("local[*]").set("spark.driver.cores", "1").set("spark.executor.memory","4G")

val spark = SparkSession.builder.config(conf=conf).getOrCreate()

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@650f29a4
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7db780c

Nel definire la SparkSession (o lo SparkContext) ci sono due settaggi che sono obbligatori: spark.master e spark.app.name.

Alcuni parametri dello SparkConf (vedi link per la lista completa):

property name default meaning
spark.app.name none
spark.driver.cores 1 Number of cores to use for the driver process, #only in cluster mode.
spark.driver.memory 1g
spark.executor.memory 1g
spark.local.dir /tmp Directory to use for “scratch” space in Spark, including map output files and RDDs that get stored on disk.
spark.master none The deploy mode of Spark driver program, either “client” or “cluster”, Which means to launch driver program locally (“client”) or remotely (“cluster”) on one of the nodes inside the cluster.

DataFrames API

Candidates are expected to know how to:

  • Create a DataFrame/Dataset from a collection (e.g. list or set)

Per la creazione di un DataFrame si può vedere questo post.

I punti principali sono riportati sotto.

Creazione di un dataframe – Metodo 1

Per creare un dataframe da una lista di valori si può usare il metodo toDF.
In alcuni casi occorre importare spark.implicits._ per poter fare la conversione da lista a datafame.

import spark.implicits._

val lista_numeri = List(1,2,3,4,5,6,7)
val list_df = lista_numeri.toDF()

import spark.implicits._
lista_numeri: List[Int] = List(1, 2, 3, 4, 5, 6, 7)
list_df: org.apache.spark.sql.DataFrame = [value: int]

Creazione di un dataframe – Metodo 2

Uso il metodo di Sparksession createDataFrame() definito come:
def createDataFrame(rows: rdd[Row], schema: StructType): DataFrame
Il metodo necessita due parametri: un RDD di Row e uno schema definito con StructType.

Lo schema è definito così:

StructType( 
    List ( StructField("name1", TypeName1), StructField("name2", TypeName2) ...) 
)

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType, StringType}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[15]").appName("test").getOrCreate()

//creiamo un RDD da una sequenza
val someData = spark.sparkContext.parallelize( Seq(
         Row(10230, "Paolo"),
         Row(12025, "Pippo"),
         Row(25876, "Pedro")
         )
        )

//definiamo lo schema
val someSchema = StructType ( List(
         StructField("Identificativo", IntegerType, true),
         StructField("Nome", StringType, true)
         )
       )

//dobbiamo dare come parametri un RDD (creato con sc.parallelize) e lo schema
val someDF = spark.createDataFrame( someData, someSchema )

someDF.show()
+−−−−−−−−−−−−−−+−−−−−+
|Identificativo| Nome|
+−−−−−−−−−−−−−−+−−−−−+
|         10230|Paolo|
|         12025|Pippo|
|         25876|Pedro|
+−−−−−−−−−−−−−−+−−−−−+

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType, StringType}
someData: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:37
someSchema: org.apache.spark.sql.types.StructType = StructType(StructField(Identificativo,IntegerType,true), StructField(Nome,StringType,true))
someDF: org.apache.spark.sql.DataFrame = [Identificativo: int, Nome: string]

Un altro modo di usare StructType per definire lo schema del dataframe è il seguente:

import org.apache.spark.sql.types.StructType

val schema = new StructType()
        .add($"Identificativo".long.copy(nullable = false))
        .add($"Nome".string)
        .add($"Cognome".string)

schema.printTreeString

import org.apache.spark.sql.DataFrameReader

val r: DataFrameReader = spark.read.schema(schema)

root
 |−− Identificativo: long (nullable = false)
 |−− Nome: string (nullable = true)
 |−− Cognome: string (nullable = true)

import org.apache.spark.sql.types.StructType
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Identificativo,LongType,false), StructField(Nome,StringType,true), StructField(Cognome,StringType,true))
import org.apache.spark.sql.DataFrameReader
r: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@3e061032

Adesso ho un DataFrameReader che posso usare per leggere i miei file

val data_from_csv = r.csv("exampl1.csv")

//oppure

val data_from_load = r.load("exampl1.csv")

data_from_csv: org.apache.spark.sql.DataFrame = [Identificativo: bigint, Nome: string ... 1 more field]
data_from_load: org.apache.spark.sql.DataFrame = [Identificativo: bigint, Nome: string ... 1 more field]

  • creare un dataframe da un Map

In questo caso lo schema viene settato automaticamente da Spark.

Con un dato di partenza in formato Map lo schema è:
Map -> Seq -> RDD -> DataFrame

val df = spark.createDataFrame( sc.parallelize( Map(("x", 24), ("y", 25), ("z", 26)).toSeq ) )

df.withColumnRenamed("_1", "nome").withColumnRenamed("_2", "età").show()

+−−−−+−−−+
|nome|età|
+−−−−+−−−+
|   x| 24|
|   y| 25|
|   z| 26|
+−−−−+−−−+

df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

in alternativa si possono usare i metodi toSeq e toDF in cascata (si ricordi di importare spark.implicits._ prima)

import spark.implicits._

Map(("x", 24), ("y", 25), ("z", 26)).toSeq.toDF

import spark.implicits._
res8: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

  • Create a DataFrame for a range of numbers

Anche in questo caso ci sono diversi modi per creare un dataframe con un range di numeri.

Nel primo modo si può creare un range, convertirlo in RDD e successivamente usare .toDF()

sc.parallelize(1 to 10).toDF()

res16: org.apache.spark.sql.DataFrame = [value: int]

oppure senza passare da un rdd

import org.apache.spark.sql.DataFrame

val ar = 1 to 10
val df: DataFrame = ar.toDF()

import org.apache.spark.sql.DataFrame
ar: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
df: org.apache.spark.sql.DataFrame = [value: int]

infine si può usare la funzione spark.range()

spark.range(1, 1000).show(5)

+−−−+
| id|
+−−−+
|  1|
|  2|
|  3|
|  4|
|  5|
+−−−+
only showing top 5 rows


a rigore questo crea un dataset, ma posso usare semplicemente .toDF per trasformarlo in dataframe

spark.range(1,100).toDF("ID").show(5)

+−−−+
| ID|
+−−−+
|  1|
|  2|
|  3|
|  4|
|  5|
+−−−+
only showing top 5 rows


  • Access the DataFrameReaders

Il DataFrameReaders da accesso ad una serie di metodi che consentono di leggere i files contenenti i dati.

Posso creare il DataFrameReader esplicitamente con:

import org.apache.spark.sql.DataFrameReader

val r: DataFrameReader = spark.read

r.csv("exampl1.csv")

import org.apache.spark.sql.DataFrameReader
r: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@3f709ba
res24: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]

oppure implicitamente senza passare da un oggetto DataFrameReader, ma usando spark.read.csv()

val df = spark.read.csv("exampl1.csv")

df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]

Vediamo un esempio preso da The Internal of Spark

val csvLine = "0,Warsaw,Poland"

import org.apache.spark.sql.Dataset
val cities: Dataset[String] = Seq(csvLine).toDS

cities.show

// Define schema explicitly (as below)
// or
// option("header", true) + option("inferSchema", true)
import org.apache.spark.sql.types.StructType
val schema = new StructType()
  .add($"id".long.copy(nullable = false))
  .add($"city".string)
  .add($"country".string)

schema.printTreeString

import org.apache.spark.sql.DataFrame

val citiesDF: DataFrame = spark
  .read
  .schema(schema)
  .csv(cities)
 
citiesDF.show

+−−−−−−−−−−−−−−−+
|          value|
+−−−−−−−−−−−−−−−+
|0,Warsaw,Poland|
+−−−−−−−−−−−−−−−+

root
 |−− id: long (nullable = false)
 |−− city: string (nullable = true)
 |−− country: string (nullable = true)

+−−−+−−−−−−+−−−−−−−+
| id|  city|country|
+−−−+−−−−−−+−−−−−−−+
|  0|Warsaw| Poland|
+−−−+−−−−−−+−−−−−−−+

csvLine: String = 0,Warsaw,Poland
import org.apache.spark.sql.Dataset
cities: org.apache.spark.sql.Dataset[String] = [value: string]
import org.apache.spark.sql.types.StructType
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(city,StringType,true), StructField(country,StringType,true))
import org.apache.spark.sql.DataFrame
citiesDF: org.apache.spark.sql.DataFrame = [id: bigint, city: string ... 1 more field]

  • Register User Defined Functions (UDFs)

Dobbiamo registrare una funzione da noi definita (UDF), che potremo poi usare per operare sui dataframe o in una espressione SQL.

Usiamo il metodo udf(), con parametro la funzione da registrare. Nel nostro caso la funzione da registrare è:
x(Double) => {x*x*x}

In questo modo registriamo la UDF per l’utilizzo nella dataFrame API. Si badi che non possiamo ancora usare la funzione nelle espressioni SQL.

//importiamo udf
import org.apache.spark.sql.functions.udf

val power3udf = udf( (x:Double) => ( x*x*x ) )

val df = spark.sparkContext.parallelize(0 to 10 by 2).toDF()

df.show()

df.select(power3udf( $"value" )).show()

+−−−−−+
|value|
+−−−−−+
|    0|
|    2|
|    4|
|    6|
|    8|
|   10|
+−−−−−+

+−−−−−−−−−−+
|UDF(value)|
+−−−−−−−−−−+
|       0.0|
|       8.0|
|      64.0|
|     216.0|
|     512.0|
|    1000.0|
+−−−−−−−−−−+

import org.apache.spark.sql.functions.udf
power3udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(DoubleType)))
df: org.apache.spark.sql.DataFrame = [value: int]

Di seguito un altro esempio di registrazione di una UDF, l’esempio è preso dall’articolo di Medium Spark User Defined Functions (UDFs).
Nell’esempio ho una funzione che converte le stringhe di input in minuscolo e rimuove tutti gli spazi.

import org.apache.spark.sql.types.StringType

def lowerRemoveAllWhitespace(s: String): String = {
 s.toLowerCase().replaceAll("\\s", "")
}

val lowerRemoveAllWhitespaceUDF = udf[String, String](lowerRemoveAllWhitespace)

val sourceDF = List(
 ("  HI THERE     "),
 (" GivE mE PresenTS     ")).toDF()

sourceDF.show()

sourceDF.select(
 lowerRemoveAllWhitespaceUDF(col("value")).as("clean_value")
).show()

+−−−−−−−−−−−−−−−−−−−−+
|               value|
+−−−−−−−−−−−−−−−−−−−−+
|       HI THERE     |
| GivE mE PresenTS...|
+−−−−−−−−−−−−−−−−−−−−+

+−−−−−−−−−−−−−−+
|   clean_value|
+−−−−−−−−−−−−−−+
|       hithere|
|givemepresents|
+−−−−−−−−−−−−−−+

import org.apache.spark.sql.types.StringType
lowerRemoveAllWhitespace: (s: String)String
lowerRemoveAllWhitespaceUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
sourceDF: org.apache.spark.sql.DataFrame = [value: string]

La funzione è registrata in dataframe. Non ci resta che registrarla anche in SQL.

//creiamo un dataframe
val df = (1 to 100 by 3).toDF("num")
df.show(5)

//registrimo la funzione in SQL
spark.udf.register("power2", (x: Double) => x*x)

//adesso che la funzione è registrata posso usarla in una query sql, nel formato Scala
df.selectExpr("power2(num)").show(5)

//posso usare la funzione anche nel formato SQL
//prima creiamo una view temporanea
df.createOrReplaceTempView("udfExampleSQL3")
//e poi la query
spark.sql("SELECT power2(num) AS p2 FROM udfExampleSQL3").show(5)

+−−−+
|num|
+−−−+
|  1|
|  4|
|  7|
| 10|
| 13|
+−−−+
only showing top 5 rows

+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
|UDF:power2(cast(num as double))|
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
|                            1.0|
|                           16.0|
|                           49.0|
|                          100.0|
|                          169.0|
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
only showing top 5 rows

+−−−−−+
|   p2|
+−−−−−+
|  1.0|
| 16.0|
| 49.0|
|100.0|
|169.0|
+−−−−−+
only showing top 5 rows

df: org.apache.spark.sql.DataFrame = [num: int]

DataFrameReader

Candidates are expected to know how to:

  • Read data for the “core” data formats (CSV, JSON, JDBC, ORC, Parquet, text and tables)

In modo generico questo è il comando per leggere dati da una sorgente esterna in un dataframe:

spark.read.format("csv")
    .option("mode", "FAILFAST")
    .option("inferSchema", "true")
    .option("path", "path/to/file(s)")
    .schema(someSchema)
    .load()

Una delle opzioni è il read mode, che specifica cosa fare in caso di dati malformati. Queste sono le possibilità:

  • permissive (DEFAULT), Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column called _corrupt_record
  • dropMalformed, non importa i dati malformati
  • failfast, da un messaggio di errore ed interrompe la procedura di lettura

Questi sono i comandi per leggere i formati più comuni:

val dataFrame = spark.read.json("example.json")

val dataFrame = spark.read.csv("example.csv") 

val dataFrame = spark.read.parquet("example.parquet")

val dataFrame = spark.read.text("file.txt")

val dataFrame = spark.read.orc("file.orc")

val dataFrame = spark.read.jdbc(url,"person",properties)

Per i dettagli riguardo il connettere Spark ad un database SQL usando jdbc si può fare riferimento a questo link

Una sintassi equivalente per importare i file è la seguente:

spark.read.format("json").load("example.json")

spark.read.format("csv").load("example.csv")

spark.read.format("parquet").load("example.parquet")

spark.read.format("text").load("file.txt")

  • How to configure options for specific formats

Devo usare il metodo option(“parametro”, “valore”), mettendo anche più option in cascata.

val df = spark.read.option("header", "true").option("inferSchema", "true").csv("exampl1.csv")
//or 
val df2 = spark.read.options(Map(("header", "true"), ("inferSchema","true"))).csv("exampl1.csv")
//or
val df3 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "FAILFAST").load("exampl1.csv")

df: org.apache.spark.sql.DataFrame = [eta: int, amici: int]
df2: org.apache.spark.sql.DataFrame = [eta: int, amici: int]
df3: org.apache.spark.sql.DataFrame = [eta: int, amici: int]

Per vedere le opzioni disponibili per i metodi di DataFrameReader, ad esempio csv, si può fare riferimento alla documentazione della API.
Ad esempio per conoscere tutte le opzioni disponibili per csv si può clikkare sulla freccia per espandere il campo csv (vedere la figura sotto).
Apache Spark DataFrameReader options

Alcune delle opzioni sono per l’importazione dei file csv sono:
– header
– inferSchema
– mode
– …


  • How to read data from non-core formats using format() and load()

Supponiamo di avere un file in cui i dati sono salvati seguendo un formato particolare, per esempio:
– il separatore è il segno meno invece della virgola ,
– nel file ci sono linee di commenti che iniziano con <!–
– il file è indentato per cui ci sono spazi vuoti all’inizio delle righe
– etc.

per ognuno di questi casi particolari trovo una opzione di load.

val df = spark.read.format("txt").option("sep","-").option("inferSchema","true").option("header","true").option("ignoreLeadingWhiteSpace","true").option("comment", "<!--").load(data_file)

  • How to construct and specify a schema using the StructType classes

Come già visto in precedenza, dobbiamo importare StructType and StructField. E costruire lo schema come nel codice sottostante.

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType}

val my_schema = StructType( List(
        StructField("nome", StringType),
        StructField("cognome", StringType, nullable=true),
        StructField("altezza", IntegerType, nullable=true), 
        StructField("peso", FloatType, nullable=true),
        StructField("età", IntegerType)
        )
       )

val df = spark.read.format("csv").schema(my_schema).option("mode","PERMISSIVE").load("exampl2.csv")
df.show()

+−−−−−−−−+−−−−−−−−−+−−−−−−−+−−−−+−−−+
|    nome|  cognome|altezza|peso|età|
+−−−−−−−−+−−−−−−−−−+−−−−−−−+−−−−+−−−+
|   Pippo|  Paolini|    167|72.3| 44|
|   Luigi|    Russo|    178|89.2|  5|
|Giovanna|    Rosso|    175|80.0| 44|
|Giuseppe|  Bianchi|    165|75.8|  9|
|  Amedeo|    Verdi|    167|56.2| 18|
|   Luisa|Valentino|    182|64.9| 15|
+−−−−−−−−+−−−−−−−−−+−−−−−−−+−−−−+−−−+

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType}
my_schema: org.apache.spark.sql.types.StructType = StructType(StructField(nome,StringType,true), StructField(cognome,StringType,true), StructField(altezza,IntegerType,true), StructField(peso,FloatType,true), StructField(età,IntegerType,true))
df: org.apache.spark.sql.DataFrame = [nome: string, cognome: string ... 3 more fields]

  • How to specify a DDL-formatted schema

il metodo schema del DataFrameReader consente di specificare lo schema dei dati usando un formato DDL. Per esempio:

spark.read.schema("a INT, b STRING, c DOUBLE").csv("test.csv")

DataFrameWriter

  • Write data to the “core” data formats (csv, json, jdbc, orc, parquet, text and tables)

Il modo generico di usare il DataFrameWriter è:

DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

il formato di default è parquet.

Queste sono le varie possibilità:

df.write.parquet(folder-path)

df.write.orc(folder-path)

df.write.json(folder-path)

df.write.csv(folder-path)

df.write.text(folder-path)

Scrivere un file csv

Esempio scrittura file tsv (tab separated values):

df.write.format("csv").mode("overwrite").option("sep", "\t").save("/my-tsv-file.tsv")

Salvare una tabella

Con saveAsTable() il dataframe viene salvato nelle tabella specificata. La tabella sarà pertanto disponibile in catalog e verrà salvata nell’hive metastore (di default la cartella spark-warehouse).

df.write.saveAsTable("mytable")

Scrivere un json file

Si noti che il numero di file creati è pari al numero di partizioni del dataframe.

// df.write.json(folder-path)
// oppure 

val df = (1 to 10).toDF()

df.write.format("json").save("d:\\AnacondaProjects\\testSave")

df: org.apache.spark.sql.DataFrame = [value: int]

  • Overwriting existing files

Possiamo usare il metodo mode del DataFrameWriter, per indicare cosa fare in caso di file esistenti, per esempio

df.write.mode("append").csv("data.csv")

.mode() definisce cosa fare nel caso in cui dati con lo stesso nome son già presenti nella locazione specificata. Le opzioni disponibile sono:
overwrite sovrascrive i dati
append appende i dati
ignore ignora
error default, da un errore nel runtime

  • How to configure options for specific formats

Opzioni per la srittura di un csv file

Le opzioni per esportare un file in csv sono al link

Qui sotto riportate:

  • sep (default ,): sets a single character as a separator for each field and value.
  • quote (default “): sets a single character used for escaping quoted values where the separator can be part of the value. If an empty string is set, it uses u0000 (null character).
  • escape (default ): sets a single character used for escaping quotes inside an already quoted value.
  • charToEscapeQuoteEscaping (default escape or \0): sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise.
  • escapeQuotes (default true): a flag indicating whether values containing quotes should always be enclosed in quotes. Default is to escape all values containing a quote character.
  • quoteAll (default false): a flag indicating whether all values should always be enclosed in quotes. Default is to only escape values containing a quote character.-
  • header (default false): writes the names of columns as the first line.
  • nullValue (default empty string): sets the string representation of a null value.
  • emptyValue (default “”): sets the string representation of an empty value.
  • encoding (by default it is not set): specifies encoding (charset) of saved csv files. If it is not set, the UTF-8 charset will be used.
  • compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate).
  • dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type.
  • timestampFormat (default yyyy-MM-dd’T’HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.
  • ignoreLeadingWhiteSpace (default true): a flag indicating whether or not leading whitespaces from values being written should be skipped.
  • ignoreTrailingWhiteSpace (default true): a flag indicating defines whether or not trailing whitespaces from values being written should be skipped.
csvFile.write.format("csv").mode("overwrite").option("sep", "\t").save("my-tsv-file.tsv")

df.write.option("sep", ",")
     .option("quote", "U+005C")
     .option("escape", "U+005C")
     .option("charToEscapeQuoteEscaping", "\0")
     .option("escapeQuotes", "true")
     .option("quoteAll", false)
     ...


Opzioni per la scrittura di un file json

Posso settare i seguenti parametri:
compression (default null): definisce l’algoritmo di compressione usato per salvare il file. Si può scegliere tra: none, bzip2, gzip, lz4, snappy and deflate.
dateFormat (default yyyy-MM-dd): setta la stringa che definisce il tipo data. I formati per la data sono indicati in java.text.SimpleDateFormat.
timestampFormat (default yyyy-MM-dd’T’HH:mm:ss.SSSXXX): setta la stringa che indica il timestampFormat. I formati per il timestamp sono indicati in java.text.SimpleDateFormat.

  • How to write a data source to 1 single file or N separate files

Di default Spark salva un numero di files pari al numero di partizioni in cui i dati sono divisi. Se voglio cambiare il numero di files generati devo usare i metodi .coalesce() e .repartition() per modificare il numero di partizioni del dataframe originario.

df.coalesce(1).write.csv("D:\\Anacondaprojects\\prova")
df.repartition(10).write.csv("D:\\Anacondaprojects\\prova2")
  • How to write partitioned data

In alternativa, per dividere il file in files più piccoli posso usare il metodo partitionBy(“colonna”).
In questo modo per ogni valore (key) della colonna passata come parametro viene generata una cartella dentro la quale viene salvato il file contenente tutti i dati relativi solo alla key corrente, nel formato desiderato, per esempio csv.

df.write.partitionBy("country").csv("./folder1")

  • How to bucket data by a given set of columns

Il bucketing è una tecnica che usa i buckets (contenitori) per definire il partizionamento dei dati e ridurre lo shuffle dei dati stessi.

Si veda la pagina Bucketing su The internal of Spark SQL.

Nella pagina linkata è mostrato come facendo il join di due dataframe salvati con il metodo .bucketBy() ci si risparmia lo shuffle nell’operazione di join. Si noti che avendo salvato con bucketBy i dati sono pre-shuffled.

Per eseguire il bucketing devo indicare quanti file salvare e su quale colonna di dati fare il bucketing/partizionamento.

val df = spark.read.option("header", "true").option("inferSchema", "true").csv("Datasets/globalpowerplantdatabasev120/*.csv")

df.write
    .mode("overwrite")
    .bucketBy(14, "primary_fuel")
    .saveAsTable("orders_partitioned_bucketed")

df: org.apache.spark.sql.DataFrame = [country: string, country_long: string ... 22 more fields]

spark.catalog.listTables.show()

+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−+−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−−+
|                name|database|description|tableType|isTemporary|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−+−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−−+
|orders_partitione...| default|       null|  MANAGED|      false|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−+−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−−+


Differenza tra PartitionBy() e BucketBy()

I metodi partitioning e bucketing distribuiscono i dati tra diversi file.

Con .partitionBy() i dati vengono distribuiti secondo i valori della colonna indicata. Una query fatta su una colonna usata per il partitioning può trarre grosso beneficio dallo stesso. Di contro, se ci sono troppi file, alcune query possono diventare molto lente.

Con .bucketBy() invece i dati vengono suddivisi in un numero prefissato di file.

DataFrame (Dataset)

  • Have a working understanding of every action such as take(), collect(), and foreach()

con .take(n) ottengo le prime n righe del dataframe

df.take(1)

res13: Array[org.apache.spark.sql.Row] = Array([AFG,Afghanistan,Kajaki Hydroelectric Power Plant Afghanistan,GEODB0040538,33.0,32.322,65.119,Hydro,null,null,null,null,null,GEODB,http://globalenergyobservatory.org,GEODB,1009793,2017,null,null,null,null,null,null])

.collect() invece non prende alcun parametro. come risultato ho tutto il dataframe. Può essere un’operazione “costosa” in quanto tutti i dati dagli executor vengono mandati al driver.

df.collect()

res71: Array[org.apache.spark.sql.Row] = Array([1.0,a], [2.0,b], [3.0,c], [1.0,d], [2.0,e], [3.0,f])

  • Have a working understanding of the various transformations and how they work such as producing a distinct set, filtering data, repartitioning and coalescing, performing joins and unions as well as producing aggregates
  • … filtering data,…
val df = spark.read.option("header", "true").option("inferSchema", "true").csv("globalpowerplantdatabasev120/*.csv")

df: org.apache.spark.sql.DataFrame = [country: string, country_long: string ... 22 more fields]

Allo scopo di avere una visualizzazione più semplice del dataframe, riduco lo stesso ad un numero di colonne più maneggiabile. Seleziono le colonne contenenti i dati di stato, capacità(MGW), alimentazione, anno di costruzione, produzione di energia stimata (GWh).
Per farlo uso il metodo select(), che agisce sulle colonne.

val df_reduced = df.select("country_long", "capacity_mw", "primary_fuel", "commissioning_year", "estimated_generation_gwh")

df_reduced: org.apache.spark.sql.DataFrame = [country_long: string, capacity_mw: double ... 3 more fields]

con filter() seleziono solo le righe in cui la produzione stimata è superiore a 50000GWh

df_reduced.filter('estimated_generation_gwh > 50000).show()

+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−+
|        country_long|capacity_mw|primary_fuel|commissioning_year|estimated_generation_gwh|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−+
|               China|    13050.0|       Hydro|              null|       53622.49078855527|
|               China|    22500.0|       Hydro|            2003.0|        92452.5703250953|
|               China|    12600.0|       Hydro|            2013.0|      51773.439382053366|
|United States of ...|      454.3|        Coal|              null|       450562.6923495511|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−+


  • …producing a distinct set,…

uso distinct() per vedere quali sono tutti gli elementi distinti della colonna country_long

df_reduced.select('country_long).distinct().show

+−−−−−−−−−−−−−−+
|  country_long|
+−−−−−−−−−−−−−−+
|      Paraguay|
|        Russia|
|         Yemen|
|       Senegal|
|        Sweden|
|        Guyana|
|       Eritrea|
|   Philippines|
|      Djibouti|
|      Malaysia|
|     Singapore|
|          Fiji|
|        Turkey|
|        Malawi|
|Western Sahara|
|          Iraq|
|       Germany|
|   Afghanistan|
|      Cambodia|
|        Jordan|
+−−−−−−−−−−−−−−+
only showing top 20 rows


  • …performing joins…

Ciascun dataframe ha il metodo .join(), la cui sintassi più semplice è la seguente:
dataframe1.join(dataframe2, colonna-comune)

La colonna-comune è una colonna che ha lo stesso nome in tutti e due i dataframe.

Il default è un inner join, cioè vengono usate le chiavi comuni ai due dataframe nella colonna (comune) specificata.

Vediamo un esempio:

val df_1 = Seq(
 (0, "zero"),
 (1, "uno"),
 (4, "quattro")).toDF("number", "numberString")

val df_2 = Seq(
 (1, "UNO"),
 (2, "due"),
 (3, "tre")).toDF("number", "numberString")

//se ho una chiave comune posso semplicemente indicare la chiave sulla quale fare il join
df_2.join(df_1, "number").show()

+−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−+
|number|numberString|numberString|
+−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−+
|     1|         UNO|         uno|
+−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−+

df_1: org.apache.spark.sql.DataFrame = [number: int, numberString: string]
df_2: org.apache.spark.sql.DataFrame = [number: int, numberString: string]

se non abbiamo una chiave comune devo usare una espressione del tipo
$"key1"===$"key2"

val df_1 = Seq(
 (0, "zero"),
 (1, "uno"),
 (4, "quattro")).toDF("numberKey1", "numberString")

val df_2 = Seq(
 (1, "UNO"),
 (2, "due"),
 (3, "tre")).toDF("numberKey2", "numberString")

//se ho una chiave comune posso semplicemente indicare la chiave sulla quale fare il join
df_2.join(df_1, $"numberKey1"===$"numberKey2").show()

+−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−+
|numberKey2|numberString|numberKey1|numberString|
+−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−+
|         1|         UNO|         1|         uno|
+−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−+−−−−−−−−−−−−+

df_1: org.apache.spark.sql.DataFrame = [numberKey1: int, numberString: string]
df_2: org.apache.spark.sql.DataFrame = [numberKey2: int, numberString: string]

Non esiste solo l’inner join visto sopra, ma possiamo specificare altri tipi di unione. La definizione del metodo join() in questo caso è:
dataframe1.join(dataframe2, Seq(colonna/e), modo-di-join)

Si noti che devo:

  • raggruppare le colonne su cui voglio fare il join in una sequenza, anche se si tratta di una sola colonna,
  • definire la modalità dell’operazione di join, tra:
    • inner (o innerjoin),
    • cross,
    • outer,
    • full,
    • full_outer (o fullouter),
    • left,
    • left_outer,
    • right,
    • right_outer,
    • left_semi,
    • left_anti (o leftanti)

Con un join di tipo full outer vengono usate tutte le key,comuni e non, dei due dataframe, per le chiavi non comuni si usa null dove manca la chiave:

val df_1 = Seq(
 (0, "zero"),
 (1, "uno"),
 (4, "quattro")).toDF("number", "String1")

val df_2 = Seq(
 (1, "UNO"),
 (2, "due"),
 (3, "tre")).toDF("number", "String2")

//se ho una chiave comune posso semplicemente indicare la chiave sulla quale fare il join
df_2.join(df_1, Seq("number"), "fullouter").show()

+−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−+
|number|numberString|numberString|
+−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−+
|     1|         UNO|         uno|
|     3|         tre|        null|
|     4|        null|     quattro|
|     2|         due|        null|
|     0|        null|        zero|
+−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−+

df_1: org.apache.spark.sql.DataFrame = [number: int, numberString: string]
df_2: org.apache.spark.sql.DataFrame = [number: int, numberString: string]

In un leftanti join vengono usate le chiavi non comuni (da qui il nome anti) e vengono prese solo le colonne del dataframe di sinistra

df_2.join(df_1, Seq("number"), "leftanti").show()

+−−−−−−+−−−−−−−−−−−−+
|number|numberString|
+−−−−−−+−−−−−−−−−−−−+
|     2|         due|
|     3|         tre|
+−−−−−−+−−−−−−−−−−−−+


Con leftouter vengono usate tutte le key, comuni e non, del dataframe di sinistra:

df_2.join(df_1, Seq("number"), "leftouter").show()

+−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−+
|number|numberString|numberString|
+−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−+
|     0|        null|        zero|
|     1|         UNO|         uno|
|     4|        null|     quattro|
+−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−−+


con leftsemi si prendono le chiavi comuni ai due dataframe, il dataframe risultate avrà solo la colonna del dataframe left:

df_2.join(df_1, Seq("number"), "leftsemi").show()

+−−−−−−+−−−−−−−−−−−−+
|number|numberString|
+−−−−−−+−−−−−−−−−−−−+
|     1|         UNO|
+−−−−−−+−−−−−−−−−−−−+


  • …and unions…

L’unione è una sorta di join in verticale di due dataframe.

Si noti che, tutto quello che il comando fa è di unire in verticale i due dataframe senza occuparsi della “coerenza” tra i due dataframe, nell’esempio successivo i le colonne dei due dataframe hanno nomi (e probabilmente significati) diversi, ciononostante l’unione viene fatta senza alcun errore.

df_2.union(df_1).show()

+------+-------+
|number|String2|
+------+-------+
|     1|    UNO|
|     2|    due|
|     3|    tre|
|     0|   zero|
|     1|    uno|
|     4|quattro|
+------+-------+
  • …as well as producing aggregates

Con le funzioni di aggregazioni, come indica la denominazione stessa, posso fare una aggregazione della colonna (intera) in un valore, per esempio un valore massimo o una media etc. Le funzioni disponibile per l’aggregazione sono al link RelationalGroupedDataset.

df_2.agg(max($"number")).show

+-----------+
|max(number)|
+-----------+
|          3|
+-----------+
  • …repartitioning and coalescing,…

Con .coalesce() si può ridurre il numero di partizioni minimizzando lo shuffling dei dati.

Con .repartition() posso aumentare o ridurre il numero delle partizione, e ho un reshuffling completo dei dati.

Vediamo come usare questi due comandi.

Innanzitutto allo scopo di dimostrare l’uso di coalesce voglio creare un numero alto di partizioni pertanto setto la dimensione massima della partizione a 1MB

spark.conf.set("spark.sql.files.maxPartitionBytes", 1*1024*1024)

Costruisco un dataframe e controllo su quante partizioni è distribuito:

val df = spark.read.option("header", "true").option("inferSchema", "true").csv("D:\\AnacondaProjects\\Datasets\\globalpowerplantdatabasev120\\*.csv")

println("Numero di partizioni di df: " + df.rdd.getNumPartitions)

Numero di partizioni di df: 8
df: org.apache.spark.sql.DataFrame = [country: string, country_long: string ... 22 more fields]

riduco il numero di partizioni a 4, usando coalesce mi garantisco che sia fatto il minur numero di sposatmento dei dati

val df_coalesce = df.coalesce(4)

println("Numero di partizioni di df_coalesce: " + df_coalesce.rdd.getNumPartitions)

Numero di partizioni di df_coalesce: 4
df_coalesce: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string, country_long: string ... 22 more fields]

aumento il numero di partizioni a 12

val df_repartition = df.repartition(12)

println("Numero di partizioni di df_repartition: " + df_repartition.rdd.getNumPartitions)

Numero di partizioni di df_repartition: 12
df_repartition: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string, country_long: string ... 22 more fields]

Altro

Salvare e leggere una tabella Hive con la SparkSession

Esempio preso dal sito Databricks a questo link

//drop the table if exists to get around existing table error
spark.sql("DROP TABLE IF EXISTS zips_hive_table")
//save as a hive table
spark.table("zips_table").write.saveAsTable("zips_hive_table")
//make a similar query against the hive table 
val resultsHiveDF = spark.sql("SELECT city, pop, state, zip FROM zips_hive_table WHERE pop > 40000")
resultsHiveDF.show(10)

Seconda parte

Nella seconda parte ci occuperemo delle classi Row e Column e di Spark SQL.

Leave a Reply

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *

You may use these HTML tags and attributes:

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>