... Data ingestion from a text file in Apache Spark

With Apache Spark I can load large amounts of data in several formats (CSV, TXT, JSON, etc.) into memory.

The data is stored in a Dataset or a DataFrame and then processed.

There are several strategies for ingesting one or more data files into Apache Spark:

  1. read the file as a text file: the data on each line is stored as a single string, from which I can extract the information I need and place it in the different columns of a DataFrame.
  2. read the file as a CSV file, either providing a schema or letting Spark infer the schema on its own
  3. define a class matching the data to be stored, read the file and cast the data to the defined class

Let's see how to proceed in each of the three cases.

Reading as a text file

val df = spark.read.textFile("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT").cache
df.show(truncate=false)

+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
|value                                                                                                                          |
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
|in24.inetnebr.com − − [01/Aug/1995:00:00:01 −0400] "GET /shuttle/missions/sts−68/news/sts−68−mcc−05.txt HTTP/1.0" 200 1839     |
|uplherc.upl.com − − [01/Aug/1995:00:00:07 −0400] "GET / HTTP/1.0" 304 0                                                        |
|uplherc.upl.com − − [01/Aug/1995:00:00:08 −0400] "GET /images/ksclogo−medium.gif HTTP/1.0" 304 0                               |
|uplherc.upl.com − − [01/Aug/1995:00:00:08 −0400] "GET /images/MOSAIC−logosmall.gif HTTP/1.0" 304 0                             |
|uplherc.upl.com − − [01/Aug/1995:00:00:08 −0400] "GET /images/USA−logosmall.gif HTTP/1.0" 304 0                                |
|ix−esc−ca2−07.ix.netcom.com − − [01/Aug/1995:00:00:09 −0400] "GET /images/launch−logo.gif HTTP/1.0" 200 1713                   |
|uplherc.upl.com − − [01/Aug/1995:00:00:10 −0400] "GET /images/WORLD−logosmall.gif HTTP/1.0" 304 0                              |
|slppp6.intermind.net − − [01/Aug/1995:00:00:10 −0400] "GET /history/skylab/skylab.html HTTP/1.0" 200 1687                      |
|piweba4y.prodigy.com − − [01/Aug/1995:00:00:10 −0400] "GET /images/launchmedium.gif HTTP/1.0" 200 11853                        |
|slppp6.intermind.net − − [01/Aug/1995:00:00:11 −0400] "GET /history/skylab/skylab−small.gif HTTP/1.0" 200 9202                 |
|slppp6.intermind.net − − [01/Aug/1995:00:00:12 −0400] "GET /images/ksclogosmall.gif HTTP/1.0" 200 3635                         |
|ix−esc−ca2−07.ix.netcom.com − − [01/Aug/1995:00:00:12 −0400] "GET /history/apollo/images/apollo−logo1.gif HTTP/1.0" 200 1173   |
|slppp6.intermind.net − − [01/Aug/1995:00:00:13 −0400] "GET /history/apollo/images/apollo−logo.gif HTTP/1.0" 200 3047           |
|uplherc.upl.com − − [01/Aug/1995:00:00:14 −0400] "GET /images/NASA−logosmall.gif HTTP/1.0" 304 0                               |
|133.43.96.45 − − [01/Aug/1995:00:00:16 −0400] "GET /shuttle/missions/sts−69/mission−sts−69.html HTTP/1.0" 200 10566            |
|kgtyk4.kj.yamagata−u.ac.jp − − [01/Aug/1995:00:00:17 −0400] "GET / HTTP/1.0" 200 7280                                          |
|kgtyk4.kj.yamagata−u.ac.jp − − [01/Aug/1995:00:00:18 −0400] "GET /images/ksclogo−medium.gif HTTP/1.0" 200 5866                 |
|d0ucr6.fnal.gov − − [01/Aug/1995:00:00:19 −0400] "GET /history/apollo/apollo−16/apollo−16.html HTTP/1.0" 200 2743              |
|ix−esc−ca2−07.ix.netcom.com − − [01/Aug/1995:00:00:19 −0400] "GET /shuttle/resources/orbiters/discovery.html HTTP/1.0" 200 6849|
|d0ucr6.fnal.gov − − [01/Aug/1995:00:00:20 −0400] "GET /history/apollo/apollo−16/apollo−16−patch−small.gif HTTP/1.0" 200 14897  |
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
only showing top 20 rows

df: org.apache.spark.sql.Dataset[String] = [value: string]

Among the Spark SQL functions there is regexp_extract, which applies a regular expression to each row of the DataFrame.

def regexp_extract(e: Column, exp: String, groupIdx: Int): Column

The function parameters are:
e: the DataFrame column to which the regular expression is applied
exp: the regular expression, written as a (typically triple-quoted) string: """<regular expression>"""
groupIdx: the index of the capture group to return; 0 is the entire match, 1 is the first parenthesized group, and so on
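For example, on a throwaway one-row DataFrame (a minimal sketch, not part of the log data), group 0 returns the whole match while group 1 returns only what the first pair of parentheses captures:

import spark.implicits._
import org.apache.spark.sql.functions._

// Hypothetical one-row DataFrame, only to illustrate the meaning of groupIdx
val demo = Seq("code 404 returned").toDF("value")

demo.select(
  regexp_extract($"value", """code\s([0-9]+)""", 0).alias("whole_match"), // "code 404"
  regexp_extract($"value", """code\s([0-9]+)""", 1).alias("group_1")      // "404"
).show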

import org.apache.spark.sql.functions._

val df_col = df.select(regexp_extract($"value", """[a-zA-Z0-9\-\.]+""", 0).alias("host"),
   regexp_extract($"value", """\[([0-9a-z-A-Z\:\/]+)\s""", 1).alias("datetime"),
   regexp_extract($"value", """GET\s([\/a-zA-Z0-9\.\-\_]+)\sHTTP""",1).alias("request"),
   regexp_extract($"value", """\"\s([0-9]+)\s""",1).alias("code"),
   regexp_extract($"value", """\s([0-9]+)$""", 1).alias("bytes")).cache
df_col.show

+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|                host|            datetime|             request|code|bytes|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|   in24.inetnebr.com|01/Aug/1995:00:00:01|/shuttle/missions...| 200| 1839|
|     uplherc.upl.com|01/Aug/1995:00:00:07|                   /| 304|    0|
|     uplherc.upl.com|01/Aug/1995:00:00:08|/images/ksclogo−m...| 304|    0|
|     uplherc.upl.com|01/Aug/1995:00:00:08|/images/MOSAIC−lo...| 304|    0|
|     uplherc.upl.com|01/Aug/1995:00:00:08|/images/USA−logos...| 304|    0|
|ix−esc−ca2−07.ix....|01/Aug/1995:00:00:09|/images/launch−lo...| 200| 1713|
|     uplherc.upl.com|01/Aug/1995:00:00:10|/images/WORLD−log...| 304|    0|
|slppp6.intermind.net|01/Aug/1995:00:00:10|/history/skylab/s...| 200| 1687|
|piweba4y.prodigy.com|01/Aug/1995:00:00:10|/images/launchmed...| 200|11853|
|slppp6.intermind.net|01/Aug/1995:00:00:11|/history/skylab/s...| 200| 9202|
|slppp6.intermind.net|01/Aug/1995:00:00:12|/images/ksclogosm...| 200| 3635|
|ix−esc−ca2−07.ix....|01/Aug/1995:00:00:12|/history/apollo/i...| 200| 1173|
|slppp6.intermind.net|01/Aug/1995:00:00:13|/history/apollo/i...| 200| 3047|
|     uplherc.upl.com|01/Aug/1995:00:00:14|/images/NASA−logo...| 304|    0|
|        133.43.96.45|01/Aug/1995:00:00:16|/shuttle/missions...| 200|10566|
|kgtyk4.kj.yamagat...|01/Aug/1995:00:00:17|                   /| 200| 7280|
|kgtyk4.kj.yamagat...|01/Aug/1995:00:00:18|/images/ksclogo−m...| 200| 5866|
|     d0ucr6.fnal.gov|01/Aug/1995:00:00:19|/history/apollo/a...| 200| 2743|
|ix−esc−ca2−07.ix....|01/Aug/1995:00:00:19|/shuttle/resource...| 200| 6849|
|     d0ucr6.fnal.gov|01/Aug/1995:00:00:20|/history/apollo/a...| 200|14897|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
only showing top 20 rows

import org.apache.spark.sql.functions._
df_col: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host: string, datetime: string ... 3 more fields]

Let's check, for each column, whether there are any null or NaN values.

df_col.agg(sum(when($"host".isNull, 1).otherwise(0)).alias("host-null"),
   sum(when($"datetime".isNull, 1).otherwise(0)).alias("datetime-null"),
   sum(when($"request".isNull, 1).otherwise(0)).alias("request-null"),
   sum(when($"code".isNull, 1).otherwise(0)).alias("code-null"),
   sum(when($"bytes".isNull, 1).otherwise(0)).alias("bytes-null")).show

+−−−−−−−−−+−−−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+
|host−null|datetime−null|request−null|code−null|bytes−null|
+−−−−−−−−−+−−−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+
|        0|            0|           0|        0|         0|
+−−−−−−−−−+−−−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+


df_col.agg(sum(when($"host".isNaN, 1).otherwise(0)).alias("host-nan"),
   sum(when($"datetime".isNaN, 1).otherwise(0)).alias("datetime-nan"),
   sum(when($"request".isNaN, 1).otherwise(0)).alias("request-nan"),
   sum(when($"code".isNaN, 1).otherwise(0)).alias("code-nan"),
   sum(when($"bytes".isNaN, 1).otherwise(0)).alias("bytes-nan")).show

+−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−+−−−−−−−−+−−−−−−−−−+
|host−nan|datetime−nan|request−nan|code−nan|bytes−nan|
+−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−+−−−−−−−−+−−−−−−−−−+
|       0|           0|          0|       0|        0|
+−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−−−+−−−−−−−−+−−−−−−−−−+


df_col.select($"datetime").count

res38: Long = 1043177

It seems this approach worked well and the values were extracted correctly on every row.
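One caveat: regexp_extract returns an empty string (rather than null) when the pattern does not match, so the null/NaN checks above cannot catch extraction failures. A quick extra check along the same lines (a sketch) would be to count empty strings per column:

// Count rows where the regex produced no match (empty string)
df_col.agg(sum(when($"host" === "", 1).otherwise(0)).alias("host-empty"),
   sum(when($"datetime" === "", 1).otherwise(0)).alias("datetime-empty"),
   sum(when($"request" === "", 1).otherwise(0)).alias("request-empty"),
   sum(when($"code" === "", 1).otherwise(0)).alias("code-empty"),
   sum(when($"bytes" === "", 1).otherwise(0)).alias("bytes-empty")).show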

Reading the file as CSV

First of all, let's see what a line looks like:

in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839

We can use the space as the separator and then drop the columns we don't need.

val df = spark.read.format("csv").option("header","false").option("sep"," ").load("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT").drop("_c1","_c2","_c4").toDF("host","datetime","request"
    ,"code","bytes").cache
df.show(5)

+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|             host|            datetime|             request|code|bytes|
+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839|
|  uplherc.upl.com|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 304|    0|
|  uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304|    0|
|  uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304|    0|
|  uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304|    0|
+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
only showing top 5 rows

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host: string, datetime: string ... 3 more fields]

df.select( sum(when($"host".isNull, 1).otherwise(0)).alias("host-null"),
   sum(when($"datetime".isNull, 1).otherwise(0)).alias("datetime-null"),
   sum(when($"request".isNull, 1).otherwise(0)).alias("request-null"),
   sum(when($"code".isNull, 1).otherwise(0)).alias("code-null"),
   sum(when($"bytes".isNull, 1).otherwise(0)).alias("bytes-null")).show

+−−−−−−−−−+−−−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+
|host−null|datetime−null|request−null|code−null|bytes−null|
+−−−−−−−−−+−−−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+
|        0|            0|           0|        0|         0|
+−−−−−−−−−+−−−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+


Let's look at the schema of the DataFrame:

df.printSchema

root
 |−− host: string (nullable = true)
 |−− datetime: string (nullable = true)
 |−− request: string (nullable = true)
 |−− code: string (nullable = true)
 |−− bytes: string (nullable = true)


Let's apply describe to every column of the DataFrame:

df.describe("host","datetime","request","code","bytes").show()

+−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+
|summary|                host|            datetime|             request|              code|             bytes|
+−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+
|  count|             1043177|             1043177|             1043177|           1043177|           1043177|
|   mean|                null|                null|                null|210.77027485611083|17679.736875431394|
| stddev|                null|                null|                null| 33.52356783510582| 68832.10308836344|
|    min|         ***.novo.dk|[01/Aug/1995:00:0...|               GET /|               200|                 −|
|    max|zztduffy.slip.cc....|[22/Aug/1995:23:5...|POST /shuttle/mis...|         HTTP/1.0"|             99981|
+−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+


The table above gives us three important pieces of information:
– the request column contains both GET and POST requests
– in the code column some values are wrong: they contain the string "HTTP/1.0" instead of a numeric code
– in the bytes column some values are non-numeric and are represented as "-", which is why they did not show up as Null or NaN

Let's filter the rows where code is malformed, i.e. contains non-numeric values:

df.filter($"code" contains "HTTP/1.0").show(truncate=false)

+−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−+−−−−−+
|host                     |datetime             |request                                              |code     |bytes|
+−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−+−−−−−+
|jhanley.doe.state.la.us  |[08/Aug/1995:11:24:31|GET /ksc.html                                        |HTTP/1.0"|404  |
|redx3.cac.washington.edu |[10/Aug/1995:18:45:48|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404  |
|redx3.cac.washington.edu |[10/Aug/1995:18:45:49|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404  |
|128.95.234.46            |[10/Aug/1995:19:25:40|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404  |
|redx3.cac.washington.edu |[10/Aug/1995:20:08:13|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404  |
|redx3.cac.washington.edu |[10/Aug/1995:20:08:13|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404  |
|redx3.cac.washington.edu |[10/Aug/1995:20:08:41|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404  |
|redx3.cac.washington.edu |[10/Aug/1995:20:08:41|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404  |
|redx3.cac.washington.edu |[10/Aug/1995:20:09:31|GET /shuttle/missions/sts−70/images/KSC−95EC−o667.gif|HTTP/1.0"|404  |
|redx3.cac.washington.edu |[10/Aug/1995:20:09:32|GET /shuttle/missions/sts−70/images/KSC−95EC−o667.gif|HTTP/1.0"|404  |
|macba305.aerov.jussieu.fr|[22/Aug/1995:11:03:15|GET /shuttle/missions/sts−9/sts−9−patch−small.gif    |HTTP/1.0"|404  |
|macba305.aerov.jussieu.fr|[22/Aug/1995:11:03:18|GET /shuttle/missions/sts−45/sts−45−patch−small.gif  |HTTP/1.0"|404  |
|macba305.aerov.jussieu.fr|[22/Aug/1995:11:03:19|GET /shuttle/missions/sts−57/sts−57−patch−small.gif" |HTTP/1.0"|404  |
+−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−+−−−−−+


We should ask ourselves why the ingestion of these lines went wrong. Let's try to filter these lines without splitting them into columns.

Here it is useful to read the file as a text file and filter the lines containing the requests returned by the filter above.

val df_text = spark.read.textFile("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT").filter(
 ($"value" contains "GET /ksc.html") && ($"value" contains "08/Aug/1995:11:24:31") ||
 ($"value" contains "/shuttle/missions/sts-70/images/ksc-95ec-o667.gif") && (($"value" contains "10/Aug/1995:18:45:48") ||
                    ($"value" contains "10/Aug/1995:18:45:49") ||
                    ($"value" contains "10/Aug/1995:19:25:40") ||
                    ($"value" contains "10/Aug/1995:20:08:13") ||
                    ($"value" contains "10/Aug/1995:20:08:41") ||
                    ($"value" contains "10/Aug/1995:20:09:31") ||
                    ($"value" contains "10/Aug/1995:20:09:32")) ||
 (($"value" contains "GET /shuttle/missions/sts-9/sts-9-patch-small.gif") && ($"value" contains "22/Aug/1995:11:03:15")) ||
 (($"value" contains "GET /shuttle/missions/sts-45/sts-45-patch-small.gif") && ($"value" contains "22/Aug/1995:11:03:18")) ||
 (($"value" contains "GET /shuttle/missions/sts-57/sts-57-patch-small.gif") && ($"value" contains "22/Aug/1995:11:03:19")))

df_text.show(5, truncate=false)


+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
|value                                                                                                                            |
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
|jhanley.doe.state.la.us − − [08/Aug/1995:11:24:31 −0400] "GET /ksc.html" HTTP/1.0" 404 −                                         |
|redx3.cac.washington.edu − − [10/Aug/1995:18:45:48 −0400] "GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif" HTTP/1.0" 404 −|
|redx3.cac.washington.edu − − [10/Aug/1995:18:45:49 −0400] "GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif" HTTP/1.0" 404 −|
|128.95.234.46 − − [10/Aug/1995:19:25:40 −0400] "GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif" HTTP/1.0" 404 −           |
|redx3.cac.washington.edu − − [10/Aug/1995:20:08:13 −0400] "GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif" HTTP/1.0" 404 −|
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
only showing top 5 rows

df_text: org.apache.spark.sql.Dataset[String] = [value: string]

As you can see, the lines in question are all malformed: instead of following the format "GET <url> HTTP/1.0" they contain an extra " character right after the URL.

Note that " is the default quote character of the CSV DataFrame reader: whatever sits between quotes is treated as a single string, even if it contains the separator character (a space in our case). This is why "GET <url> HTTP/1.0" is normally read as a single field. If there are extra quotes in the middle, however, reading the line and splitting it into fields does not work correctly.
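As an aside, the CSV reader lets us turn quote handling off entirely by setting the quote option to an empty string (per the documented CSV reader options). In that case every line is split purely on spaces, so "GET", the URL and "HTTP/1.0" would land in separate columns and would have to be recombined by hand; a sketch of such a read call:

// Sketch: disable the default " quote character so stray quotes cannot break the field splitting
val df_noquote = spark.read.format("csv")
   .option("header","false")
   .option("sep"," ")
   .option("quote","")
   .load("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT")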

At this point I have two options:
– fix these lines by hand
– drop these lines

Since there are not many of them, and they do not look particularly important (even though they are all 404, page not found, codes), I decide to drop these lines.

val df_clean = df.filter(not(col("code").contains("HTTP"))).cache
df_clean.show


+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|                host|            datetime|             request|code|bytes|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|   in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839|
|     uplherc.upl.com|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 304|    0|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304|    0|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304|    0|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304|    0|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /images/launc...| 200| 1713|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/WORLD...| 304|    0|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 1687|
|piweba4y.prodigy.com|[01/Aug/1995:00:0...|GET /images/launc...| 200|11853|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 9202|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 3635|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /history/apol...| 200| 1173|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/apol...| 200| 3047|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/NASA−...| 304|    0|
|        133.43.96.45|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200|10566|
|kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 200| 7280|
|kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 5866|
|     d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200| 2743|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /shuttle/reso...| 200| 6849|
|     d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200|14897|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
only showing top 20 rows

df_clean: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host: string, datetime: string ... 3 more fields]

It might be a good idea to convert the last two columns to integers.

Let's define a Scala function and then turn it into a UDF (User Defined Function) to use it in Spark.

The st_to_int function defined below converts a string to an integer. A try-catch block ensures that when the conversion to integer is not possible, because the row is malformed, the function returns -9999, or 0 when the value is "-" (as happens in the bytes column).

//val st_to_int : String => Int = x => x.toInt

def st_to_int (x: String): Int = {
 
 try {
  x.toInt
 } catch {
  case ex: NumberFormatException => 
   { if (x == "-") 0 else -9999 }
 }
 
}
val udf_st_to_int = udf(st_to_int _)

val df_conv = df_clean.withColumn("code", udf_st_to_int($"code")).withColumn("bytes", udf_st_to_int($"bytes"))
df_conv.show(5)
df_conv.printSchema

+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|             host|            datetime|             request|code|bytes|
+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839|
|  uplherc.upl.com|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 304|    0|
|  uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304|    0|
|  uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304|    0|
|  uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304|    0|
+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
only showing top 5 rows

root
 |−− host: string (nullable = true)
 |−− datetime: string (nullable = true)
 |−− request: string (nullable = true)
 |−− code: integer (nullable = false)
 |−− bytes: integer (nullable = false)

st_to_int: (x: String)Int
udf_st_to_int: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))
df_conv: org.apache.spark.sql.DataFrame = [host: string, datetime: string ... 3 more fields]
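As a side note, the same conversion could be written a bit more compactly with scala.util.Try; this is only a sketch, functionally equivalent to st_to_int above:

import scala.util.Try

// Same semantics as st_to_int: "-" becomes 0, any other unparsable value becomes -9999
val st_to_int_alt: String => Int = x => if (x == "-") 0 else Try(x.toInt).getOrElse(-9999)
val udf_st_to_int_alt = udf(st_to_int_alt)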

Let's verify that all the problems have been fixed by using the describe function on the DataFrames before and after the conversion.

df.describe("code","bytes").show

+−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+
|summary|              code|             bytes|
+−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+
|  count|           1043177|           1043177|
|   mean|210.77027485611083|17679.736875431394|
| stddev| 33.52356783510582| 68832.10308836344|
|    min|               200|                 −|
|    max|         HTTP/1.0"|             99981|
+−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+


df_conv.describe("code","bytes").show

+−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−+
|summary|              code|            bytes|
+−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−+
|  count|           1043164|          1043164|
|   mean|210.77027485611083|17531.77418219954|
| stddev| 33.52356783510582|68562.39834125541|
|    min|               200|                0|
|    max|               501|          3421948|
+−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−+


👌

Importing the data into a Dataset

When using a Dataset I have to define a type for each row of data. This way the data is strongly typed.

case class server_connection(host: String, datetime: String, request: String, code: Int, bytes: Int)

defined class server_connection

To map the individual fields of the server_connection class, the columns must have the same names as the fields, in this case: host, datetime, request, code, bytes.

To convert to a Dataset[server_connection] I have to use the .as[server_connection] method.

val df = spark.read.format("csv").option("header","false").option("sep"," ")
   .schema("host string, _c1 string, _c2 string, datetime string, _c4 string, request string, code int, bytes int")
   .load("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT")
   .drop("_c1","_c2","_c4")
   .as[server_connection]
   .cache
df.show

+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|                host|            datetime|             request|code|bytes|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|   in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839|
|     uplherc.upl.com|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 304|    0|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304|    0|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304|    0|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304|    0|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /images/launc...| 200| 1713|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/WORLD...| 304|    0|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 1687|
|piweba4y.prodigy.com|[01/Aug/1995:00:0...|GET /images/launc...| 200|11853|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 9202|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 3635|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /history/apol...| 200| 1173|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/apol...| 200| 3047|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/NASA−...| 304|    0|
|        133.43.96.45|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200|10566|
|kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 200| 7280|
|kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 5866|
|     d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200| 2743|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /shuttle/reso...| 200| 6849|
|     d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200|14897|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
only showing top 20 rows

df: org.apache.spark.sql.Dataset[server_connection] = [host: string, datetime: string ... 3 more fields]

df.printSchema

root
 |−− host: string (nullable = true)
 |−− datetime: string (nullable = true)
 |−− request: string (nullable = true)
 |−− code: integer (nullable = true)
 |−− bytes: integer (nullable = true)


We can also run .describe on the code and bytes columns and see that there are no malformed fields or rows.

Moreover, since describe is an operation that runs over all the columns, it can reveal whether there are problems with some records.

df.describe("host","datetime","request","code","bytes").show

+−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−+
|summary|                host|            datetime|             request|              code|            bytes|
+−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−+
|  count|             1034421|             1034421|             1034421|           1034421|          1034421|
|   mean|                null|                null|                null| 209.3868917974403|17679.95398681968|
| stddev|                null|                null|                null|29.757618057686102|68832.50836237728|
|    min|         ***.novo.dk|[01/Aug/1995:00:0...|               GET /|               200|                0|
|    max|zztduffy.slip.cc....|[22/Aug/1995:23:5...|POST /cgi−bin/new...|               500|          3421948|
+−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−+


Let's check whether there are any Null values.

df.select( sum(when($"host".isNull, 1).otherwise(0)).alias("host-null"),
   sum(when($"datetime".isNull, 1).otherwise(0)).alias("datetime-null"),
   sum(when($"request".isNull, 1).otherwise(0)).alias("request-null"),
   sum(when($"code".isNull, 1).otherwise(0)).alias("code-null"),
   sum(when($"bytes".isNull, 1).otherwise(0)).alias("bytes-null")).show

+−−−−−−−−−+−−−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+
|host−null|datetime−null|request−null|code−null|bytes−null|
+−−−−−−−−−+−−−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+
|     8756|         8756|        8756|     8756|      8756|
+−−−−−−−−−+−−−−−−−−−−−−−+−−−−−−−−−−−−+−−−−−−−−−+−−−−−−−−−−+


As you can see, for 8756 records the schema could not be applied, so the records were reported as null.

df.filter($"host".isNull).show

+−−−−+−−−−−−−−+−−−−−−−+−−−−+−−−−−+
|host|datetime|request|code|bytes|
+−−−−+−−−−−−−−+−−−−−−−+−−−−+−−−−−+
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
|null|    null|   null|null| null|
+−−−−+−−−−−−−−+−−−−−−−+−−−−+−−−−−+
only showing top 20 rows


After a bit of investigation we realize that the null records correspond to the cases where bytes == "-": since the schema declares bytes as an integer, the value "-" cannot be parsed, and the reader reports the whole record as null.

Let's run a regular expression on the records in text format to check that we indeed get the same number: 8756.

The regular expression is simply """-$""", where - is the character to look for and $ marks the end of the line.

val df_raw = spark.read.textFile("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT")

def find_pattern(x: String): Boolean = {
  val pattern = """-$""".r
  pattern.findFirstIn(x).isDefined
}

val find_pattern_udf = udf(find_pattern _)

df_raw.filter( find_pattern_udf($"value") ).show(5, truncate=false)

println(s"Ho questo numero di matching: ${df_raw.filter( find_pattern_udf($"value") ).count}")

+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
|value                                                                                                                      |
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
|gw1.att.com − − [01/Aug/1995:00:03:53 −0400] "GET /shuttle/missions/sts−73/news HTTP/1.0" 302 −                            |
|js002.cc.utsunomiya−u.ac.jp − − [01/Aug/1995:00:07:33 −0400] "GET /shuttle/resources/orbiters/discovery.gif HTTP/1.0" 404 −|
|tia1.eskimo.com − − [01/Aug/1995:00:28:41 −0400] "GET /pub/winvn/release.txt HTTP/1.0" 404 −                               |
|itws.info.eng.niigata−u.ac.jp − − [01/Aug/1995:00:38:01 −0400] "GET /ksc.html/facts/about_ksc.html HTTP/1.0" 403 −         |
|grimnet23.idirect.com − − [01/Aug/1995:00:50:12 −0400] "GET /www/software/winvn/winvn.html HTTP/1.0" 404 −                 |
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
only showing top 5 rows

Ho questo numero di matching: 8756
df_raw: org.apache.spark.sql.Dataset[String] = [value: string]
find_pattern: (x: String)Boolean
find_pattern_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(StringType)))

Verified.
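Incidentally, the same count can be obtained without a UDF, because Spark columns support regular expressions directly via rlike; a sketch that should give the same result:

// rlike applies the regular expression directly to the column
df_raw.filter($"value".rlike("-$")).count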

Now let's remove the null records.

val df_clean = df.filter($"bytes".isNotNull)

df_clean.show(5)

df_clean.count

+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|             host|            datetime|             request|code|bytes|
+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
|in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839|
|  uplherc.upl.com|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 304|    0|
|  uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304|    0|
|  uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304|    0|
|  uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304|    0|
+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+
only showing top 5 rows

df_clean: org.apache.spark.sql.Dataset[server_connection] = [host: string, datetime: string ... 3 more fields]
res55: Long = 1034421
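An alternative worth mentioning (only a sketch, based on the CSV reader's mode option): instead of filtering out the null records afterwards, the reader can be asked to discard rows that do not fit the declared schema at load time by setting mode to DROPMALFORMED; the default mode, PERMISSIVE, is what turned the offending rows into the all-null records seen above.

// Sketch: let the CSV reader silently drop records that do not match the schema
val df_dropped = spark.read.format("csv")
   .option("header","false")
   .option("sep"," ")
   .option("mode","DROPMALFORMED")
   .schema("host string, _c1 string, _c2 string, datetime string, _c4 string, request string, code int, bytes int")
   .load("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT")
   .drop("_c1","_c2","_c4")
   .as[server_connection]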

To clean up the datetime and request fields we can proceed as before.

For the datetime field we need to take all the characters from the second one to the end, dropping the leading "[".

val clean: String => String = x => x.slice(1,x.size)

val clean_udf = udf(clean)

val df_clean_datetime = df_clean.withColumn("clean_datetime", clean_udf($"datetime"))

df_clean_datetime.show

df_clean_datetime.describe("clean_datetime")

+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+
|                host|            datetime|             request|code|bytes|      clean_datetime|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+
|   in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839|01/Aug/1995:00:00:01|
|     uplherc.upl.com|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 304|    0|01/Aug/1995:00:00:07|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304|    0|01/Aug/1995:00:00:08|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304|    0|01/Aug/1995:00:00:08|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304|    0|01/Aug/1995:00:00:08|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /images/launc...| 200| 1713|01/Aug/1995:00:00:09|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/WORLD...| 304|    0|01/Aug/1995:00:00:10|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 1687|01/Aug/1995:00:00:10|
|piweba4y.prodigy.com|[01/Aug/1995:00:0...|GET /images/launc...| 200|11853|01/Aug/1995:00:00:10|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 9202|01/Aug/1995:00:00:11|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 3635|01/Aug/1995:00:00:12|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /history/apol...| 200| 1173|01/Aug/1995:00:00:12|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/apol...| 200| 3047|01/Aug/1995:00:00:13|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/NASA−...| 304|    0|01/Aug/1995:00:00:14|
|        133.43.96.45|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200|10566|01/Aug/1995:00:00:16|
|kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 200| 7280|01/Aug/1995:00:00:17|
|kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 5866|01/Aug/1995:00:00:18|
|     d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200| 2743|01/Aug/1995:00:00:19|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /shuttle/reso...| 200| 6849|01/Aug/1995:00:00:19|
|     d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200|14897|01/Aug/1995:00:00:20|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+
only showing top 20 rows

clean: String => String = <function1>
clean_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
df_clean_datetime: org.apache.spark.sql.DataFrame = [host: string, datetime: string ... 4 more fields]
res56: org.apache.spark.sql.DataFrame = [summary: string, clean_datetime: string]
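For a transformation this simple we could also avoid the UDF and use a built-in column function; a sketch with regexp_replace, which should produce the same clean_datetime column:

// Same result without a UDF: strip the leading "[" character
val df_clean_datetime_alt = df_clean.withColumn("clean_datetime", regexp_replace($"datetime", "^\\[", ""))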

For the request field I can use a regular expression:

val regexp = """\s*\/+([a-zA-Z0-9\/\-\_]*)\s*""".r

val func1: String => String = x => regexp.findFirstIn(x).getOrElse("")

val udf_func1 = udf(func1)

val df_clean_request = df_clean_datetime.withColumn("clean_request", udf_func1($"request"))
df_clean_request.show

+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
|                host|            datetime|             request|code|bytes|      clean_datetime|       clean_request|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
|   in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839|01/Aug/1995:00:00:01| /shuttle/mission...|
|     uplherc.upl.com|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 304|    0|01/Aug/1995:00:00:07|                  / |
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304|    0|01/Aug/1995:00:00:08| /images/ksclogo−...|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304|    0|01/Aug/1995:00:00:08| /images/MOSAIC−l...|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304|    0|01/Aug/1995:00:00:08| /images/USA−logo...|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /images/launc...| 200| 1713|01/Aug/1995:00:00:09| /images/launch−logo|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/WORLD...| 304|    0|01/Aug/1995:00:00:10| /images/WORLD−lo...|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 1687|01/Aug/1995:00:00:10| /history/skylab/...|
|piweba4y.prodigy.com|[01/Aug/1995:00:0...|GET /images/launc...| 200|11853|01/Aug/1995:00:00:10| /images/launchme...|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 9202|01/Aug/1995:00:00:11| /history/skylab/...|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 3635|01/Aug/1995:00:00:12| /images/ksclogos...|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /history/apol...| 200| 1173|01/Aug/1995:00:00:12| /history/apollo/...|
|slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/apol...| 200| 3047|01/Aug/1995:00:00:13| /history/apollo/...|
|     uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/NASA−...| 304|    0|01/Aug/1995:00:00:14| /images/NASA−log...|
|        133.43.96.45|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200|10566|01/Aug/1995:00:00:16| /shuttle/mission...|
|kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|      GET / HTTP/1.0| 200| 7280|01/Aug/1995:00:00:17|                  / |
|kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 5866|01/Aug/1995:00:00:18| /images/ksclogo−...|
|     d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200| 2743|01/Aug/1995:00:00:19| /history/apollo/...|
|ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /shuttle/reso...| 200| 6849|01/Aug/1995:00:00:19| /shuttle/resourc...|
|     d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200|14897|01/Aug/1995:00:00:20| /history/apollo/...|
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+
only showing top 20 rows

regexp: scala.util.matching.Regex = \s*\/+([a−zA−Z0−9\/\−\_]*)\s*
func1: String => String = <function1>
udf_func1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
df_clean_request: org.apache.spark.sql.DataFrame = [host: string, datetime: string ... 5 more fields]

Now I drop the columns that are no longer needed.

val new_df = df_clean_request.drop("datetime","request").cache

new_df.show(truncate=false)

+−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
|host                       |code|bytes|clean_datetime      |clean_request                                   |
+−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
|in24.inetnebr.com          |200 |1839 |01/Aug/1995:00:00:01| /shuttle/missions/sts−68/news/sts−68−mcc−05    |
|uplherc.upl.com            |304 |0    |01/Aug/1995:00:00:07| /                                              |
|uplherc.upl.com            |304 |0    |01/Aug/1995:00:00:08| /images/ksclogo−medium                         |
|uplherc.upl.com            |304 |0    |01/Aug/1995:00:00:08| /images/MOSAIC−logosmall                       |
|uplherc.upl.com            |304 |0    |01/Aug/1995:00:00:08| /images/USA−logosmall                          |
|ix−esc−ca2−07.ix.netcom.com|200 |1713 |01/Aug/1995:00:00:09| /images/launch−logo                            |
|uplherc.upl.com            |304 |0    |01/Aug/1995:00:00:10| /images/WORLD−logosmall                        |
|slppp6.intermind.net       |200 |1687 |01/Aug/1995:00:00:10| /history/skylab/skylab                         |
|piweba4y.prodigy.com       |200 |11853|01/Aug/1995:00:00:10| /images/launchmedium                           |
|slppp6.intermind.net       |200 |9202 |01/Aug/1995:00:00:11| /history/skylab/skylab−small                   |
|slppp6.intermind.net       |200 |3635 |01/Aug/1995:00:00:12| /images/ksclogosmall                           |
|ix−esc−ca2−07.ix.netcom.com|200 |1173 |01/Aug/1995:00:00:12| /history/apollo/images/apollo−logo1            |
|slppp6.intermind.net       |200 |3047 |01/Aug/1995:00:00:13| /history/apollo/images/apollo−logo             |
|uplherc.upl.com            |304 |0    |01/Aug/1995:00:00:14| /images/NASA−logosmall                         |
|133.43.96.45               |200 |10566|01/Aug/1995:00:00:16| /shuttle/missions/sts−69/mission−sts−69        |
|kgtyk4.kj.yamagata−u.ac.jp |200 |7280 |01/Aug/1995:00:00:17| /                                              |
|kgtyk4.kj.yamagata−u.ac.jp |200 |5866 |01/Aug/1995:00:00:18| /images/ksclogo−medium                         |
|d0ucr6.fnal.gov            |200 |2743 |01/Aug/1995:00:00:19| /history/apollo/apollo−16/apollo−16            |
|ix−esc−ca2−07.ix.netcom.com|200 |6849 |01/Aug/1995:00:00:19| /shuttle/resources/orbiters/discovery          |
|d0ucr6.fnal.gov            |200 |14897|01/Aug/1995:00:00:20| /history/apollo/apollo−16/apollo−16−patch−small|
+−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+
only showing top 20 rows

new_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host: string, code: int ... 3 more fields]
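A natural next step, only sketched here, would be to parse clean_datetime into a real timestamp column so that time-based filtering and aggregations become straightforward (the pattern assumes the usual Apache log date format and an English locale for the month abbreviation):

// Sketch: convert the cleaned Apache log date string into a Spark timestamp
val df_ts = new_df.withColumn("timestamp", to_timestamp($"clean_datetime", "dd/MMM/yyyy:HH:mm:ss"))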


