DataFrames: a classic intro

Let's check how counting words works with DataFrames.

scala> val rddTextFile=sc.textFile("LICENSE")

scala> rddTextFile.count
res1: Long = 953

OK, and now it's time for a DataFrame:

scala> val fileDF=rddTextFile.toDF("line")
fileDF: org.apache.spark.sql.DataFrame = [line: string]
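
(A side note: toDF works here only because the spark-shell imports sqlContext.implicits._ automatically. In a standalone Spark 1.x application the same conversion would need roughly this sketch:)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("dataframe-wordcount"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // brings toDF into scope for RDDs of supported types

val fileDF = sc.textFile("LICENSE").toDF("line")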

We can check that we have a very sophisticated schema

scala> fileDF.printSchema
root
 |-- line: string (nullable = true)

And display some content from the DataFrame:

scala> fileDF.show(10)
+--------------------+
|                line|
+--------------------+
|                    |
|                 ...|
|                 ...|
|                 ...|
|                    |
|   TERMS AND COND...|
|                    |
|     1. Definitions.|
|                    |
|      "License" s...|
+--------------------+
only showing top 10 rows

It's time to see how DataFrames are represented in the Web UI. We can see that the mapping between what we do in the REPL and what we see in the UI is not as obvious as in the case of RDDs. This is the price of the higher abstraction.

And let's see that, after all, everything is mapped to RDDs:

scala> val wordsDF=fileDF.explode("line","word"){(line:String) => line.split(" ")}
wordsDF: org.apache.spark.sql.DataFrame = [line: string, word: string]

//Or with some magic
scala> val wordsDF=fileDF.explode("line","word"){(_:String).split(" ")}
wordsDF: org.apache.spark.sql.DataFrame = [line: string, word: string]
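
(As an aside, the same split-and-explode can also be expressed with the built-in column functions split and explode from org.apache.spark.sql.functions, available in Spark 1.5; just a sketch, where wordsOnlyDF is an illustrative name:)

import org.apache.spark.sql.functions.{explode, split}

// one row per word, without keeping the original line column
val wordsOnlyDF = fileDF.select(explode(split($"line", " ")).as("word"))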

scala> wordsDF.printSchema
root
 |-- line: string (nullable = true)
 |-- word: string (nullable = true)

scala> wordsDF.show(10)
+--------------------+----+
|                line|word|
+--------------------+----+
|                    |    |
|                 ...|    |
|                 ...|    |
|                 ...|    |
|                 ...|    |
|                 ...|    |
|                 ...|    |
|                 ...|    |
|                 ...|    |
|                 ...|    |
+--------------------+----+
only showing top 10 rows

For now the results are not very exciting...

Column

This is actually an interesting moment, when we go beyond the simple "SQL table" analogy. Look, we can take a column from a DataFrame:

scala> val wordCol=wordsDF("word")
wordCol: org.apache.spark.sql.Column = word

Now we can use the column to carefully prepare a domain filtering condition:

scala> val condition=wordCol!==""
condition: org.apache.spark.sql.Column = NOT (word = )
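
Column expressions compose like ordinary values, so a more elaborate condition can be built from smaller pieces, for example (just a sketch; longWord is an illustrative name):

import org.apache.spark.sql.functions.length

// non-empty and longer than three characters
val longWord = (wordCol !== "") && (length(wordCol) > 3)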

scala> val notEmptyDF=wordsDF.select(wordCol).filter(condition)
notEmptyDF: org.apache.spark.sql.DataFrame = [word: string]

scala> notEmptyDF.show(10)
+--------------------+
|                word|
+--------------------+
|              Apache|
|             License|
|             Version|
|                2.0,|
|             January|
|                2004|
|http://www.apache...|
|               TERMS|
|                 AND|
|          CONDITIONS|
+--------------------+
only showing top 10 rows

That is a lot better!

Reduction

scala> val result=wordsDF.filter($"word"!=="").groupBy($"word").count

scala> result.show(10)
+--------------------+-----+
|                word|count|
+--------------------+-----+
|          beneficial|    1|
|              ACTION|    5|
|               1995,|    2|
|                (now|    1|
|(net.sourceforge....|    1|
|              rights|   13|
|          derivative|   14|
|http://www.string...|    2|
|          litigation|    2|
|         conditions:|    5|
+--------------------+-----+

result.orderBy(result("count")).show(10)
+--------------------+-----+
|                word|count|
+--------------------+-----+
|           Pervasive|    1|
|             thereof|    1|
|               scopt|    1|
|           "License"|    1|
|            3-clause|    1|
|          BEOPEN.COM|    1|
|                (now|    1|
|          beneficial|    1|
|                (all|    1|
|(net.sourceforge....|    1|
+--------------------+-----+


// And finally
scala> result.orderBy(result("count").desc).show(10)
+----+-----+
|word|count|
+----+-----+
|   #|  279|
| the|  279|
|  of|  154|
|  to|  127|
| and|  126|
|  or|  124|
|  OR|  124|
|  OF|  102|
|  in|   89|
| THE|   69|
+----+-----+
only showing top 10 rows
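
For comparison, the same reduction expressed directly at the RDD level would look roughly like this (a sketch):

rddTextFile
  .flatMap(_.split(" "))
  .filter(_ != "")
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(10)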

Let's check the UI to see what actually happened.

There are a lot of new things which we didn't see when working with RDDs. Let's take a closer look at the last phase.

So everything is mapped to RDDs, but some of them are quite interesting, like "MapPartitionsWithPreparationRDD". The Catalyst optimizer in action:

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

Ok, now we need to filter out stop words.

UDFs

scala> val stopWords=Set("#","the","of","to","and","or","in")

Let's now create a function to filter stop words

scala> val notStopWord:String=>Boolean = s=> !stopWords.contains(s)
notStopWord: String => Boolean = <function1>

scala> notStopWord("of")
res59: Boolean = false

scala> notStopWord("szczebrzeszyn")
res60: Boolean = true

Is capturing the stop-word set inside the function a good idea, or should we somehow send this set to the workers before calling the lambdas?
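
One option, especially if the set were large, would be to ship it explicitly as a broadcast variable, roughly like this (a sketch; the names are illustrative):

val stopWordsBC = sc.broadcast(stopWords)
val notStopWordBC: String => Boolean = s => !stopWordsBC.value.contains(s)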

Important note: it is a FUNCTION! Not a stored procedure, but a function which is easy to test and compose!!! End of important note.

Let's register (or lift, for FP fans) our function so we can use it at the column level.

scala> val notStopWordUDF=sqlContext.udf.register("notStopWord",notStopWord)
notStopWordUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,List())
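
Registering the UDF under a name has a nice side effect: it becomes available inside SQL queries as well (a sketch, assuming we register wordsDF as a temporary table called words):

wordsDF.registerTempTable("words")
sqlContext.sql("select word from words where notStopWord(word)").show(5)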

And now it's time for a combo:

notEmptyDF.filter(notStopWordUDF(wordCol)).groupBy(wordCol).count.orderBy($"count".desc).show(10)

During the workshops we will discuss why this line works. And the result is almost good:

+-------+-----+
|   word|count|
+-------+-----+
|     OR|  124|
|     OF|  102|
|    THE|   69|
|   this|   63|
|    ANY|   61|
|    any|   50|
|      a|   49|
|License|   45|
|   that|   44|
|      -|   44|
+-------+-----+

We can also use the predefined function "lower", which operates on a DataFrame column:

scala> notEmptyDF.select(lower(wordCol).alias("word")).filter(notStopWordUDF($"word")).groupBy($"word").count.orderBy($"count".desc).show(10)
+---------+-----+
|     word|count|
+---------+-----+
|      any|  111|
|     this|   94|
|      for|   86|
|copyright|   65|
|  license|   64|
|        a|   64|
| software|   56|
|      not|   50|
|       by|   49|
|       is|   48|
+---------+-----+
only showing top 10 rows

Composition

If we don't want to lowercase each word in the file in a separate select, we can compose the function "lower" with our UDF. But there is one problem: "lower" has this type:

scala> lower _
res84: org.apache.spark.sql.Column => org.apache.spark.sql.Column = <function1>

And our user-defined function is actually a case class which needs to be converted into a function:

scala> notStopWordUDF.apply _
res86: Seq[org.apache.spark.sql.Column] => org.apache.spark.sql.Column = <function1>

And we have another problem, because our function expects a sequence of Columns (varargs in apply). So to compose those two operations we need to create a "glue lambda" (I don't know what else to call it):

val removeStopWords=(lower _) andThen (c=>notStopWordUDF.apply(c))
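
An equivalent and arguably more direct way is to write the composition by hand as a plain lambda on Columns (a sketch; removeStopWordsAlt is just an illustrative name):

import org.apache.spark.sql.Column
val removeStopWordsAlt: Column => Column = c => notStopWordUDF(lower(c))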

scala> notEmptyDF.filter(removeStopWords($"word").alias("word")).groupBy($"word").count.orderBy($"count".desc).show(10)
+-------+-----+                                                                 
|   word|count|
+-------+-----+
|   this|   63|
|    ANY|   61|
|    any|   50|
|      a|   49|
|License|   45|
|   that|   44|
|      -|   44|
|    PSF|   38|
|     is|   37|
|     by|   36|
+-------+-----+

We can see that the results are slightly different, because this time the lowercasing happens only inside the filter, so the words keep their original case and counts for different cases are not merged.

Full Program

sc.textFile("LICENSE").toDF("line")
.explode("line","word"){(line:String) => line.split("")}
.filter($"word"!=="").filter(removeStopWords($"word").alias("word"))
.groupBy($"word").count.orderBy($"count".desc).show(10)
+-------+-----+
|   word|count|
+-------+-----+
|   this|   63|
|    ANY|   61|
|    any|   50|
|      a|   49|
|License|   45|
|   that|   44|
|      -|   44|
|    PSF|   38|
|     is|   37|
|     by|   36|
+-------+-----+

And we are working at a level of abstraction which doesn't use words like map or flatMap.

We can also check the SQL details in the Web UI,

and also go in detail through the optimization phases of our operations:

== Parsed Logical Plan ==
Limit 11
 Sort [count#96L DESC], true
  Aggregate [word#94], [word#94,count(1) AS count#96L]
   Filter UDF(lower(word#94))
    Filter NOT (word#94 = )
     Generate UserDefinedGenerator(line#92), true, false, None, [word#94]
      Project [_1#91 AS line#92]
       LogicalRDD [_1#91], MapPartitionsRDD[283] at stringRddToDataFrameHolder at <console>:30

== Analyzed Logical Plan ==
word: string, count: bigint
Limit 11
 Sort [count#96L DESC], true
  Aggregate [word#94], [word#94,count(1) AS count#96L]
   Filter UDF(lower(word#94))
    Filter NOT (word#94 = )
     Generate UserDefinedGenerator(line#92), true, false, None, [word#94]
      Project [_1#91 AS line#92]
       LogicalRDD [_1#91], MapPartitionsRDD[283] at stringRddToDataFrameHolder at <console>:30

== Optimized Logical Plan ==
Limit 11
 Sort [count#96L DESC], true
  Aggregate [word#94], [word#94,count(1) AS count#96L]
   Project [word#94]
    Filter (NOT (word#94 = ) && UDF(lower(word#94)))
     Generate UserDefinedGenerator(line#92), true, false, None, [word#94]
      Project [_1#91 AS line#92]
       LogicalRDD [_1#91], MapPartitionsRDD[283] at stringRddToDataFrameHolder at <console>:30

== Physical Plan ==
TakeOrderedAndProject(limit=11, orderBy=[count#96L DESC], output=[word#94,count#96L])
 ConvertToSafe
  TungstenAggregate(key=[word#94], functions=[(count(1),mode=Final,isDistinct=false)], output=[word#94,count#96L])
   TungstenExchange hashpartitioning(word#94)
    TungstenAggregate(key=[word#94], functions=[(count(1),mode=Partial,isDistinct=false)], output=[word#94,currentCount#99L])
     TungstenProject [word#94]
      Filter (NOT (word#94 = ) && UDF(lower(word#94)))
       !Generate UserDefinedGenerator(line#92), true, false, [line#92,word#94]
        ConvertToSafe
         TungstenProject [_1#91 AS line#92]
          Scan PhysicalRDD[_1#91]

Code Generation: true

Read CSV

We need to start Spark like this:

bin/spark-shell --packages com.databricks:spark-csv_2.10:1.3.0

so that it automatically downloads the CSV plugin.

val plDF=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("E0.csv")
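
(For reference, in Spark 2.x and later the CSV reader is built in, so no external package is needed; the equivalent load would look roughly like this, with spark being the SparkSession:)

val plDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("E0.csv")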

This is the dataset we will use: http://www.football-data.co.uk/mmz4281/1415/E0.csv

and here is the column description: http://www.football-data.co.uk/notes.txt

And we can see quite a big schema:

scala> plDF.printSchema
root
 |-- Div: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- HomeTeam: string (nullable = true)
 |-- AwayTeam: string (nullable = true)
 |-- FTHG: integer (nullable = true)
 |-- FTAG: integer (nullable = true)
 |-- FTR: string (nullable = true)
 |-- HTHG: integer (nullable = true)
 |-- HTAG: integer (nullable = true)
 |-- HTR: string (nullable = true)
 |-- Referee: string (nullable = true)
 |-- HS: integer (nullable = true)
 |-- AS: integer (nullable = true)
 |-- HST: integer (nullable = true)
(...and more...)

Full SQL

Now let's see something else. We can actually query a DataFrame with plain SQL by registering a temp table:

scala> plDF.registerTempTable("premierleague")

val footFrame=sqlContext.sql ("select HomeTeam as team,HS as shoots ,HST shoots_target from premierleague")
footFrame: org.apache.spark.sql.DataFrame = [team: string, shoots: int, shoots_target: int]

scala> footFrame.show(10)
+----------+------+-------------+
|      team|shoots|shoots_target|
+----------+------+-------------+
|   Arsenal|    14|            6|
| Leicester|    11|            3|
|Man United|    14|            5|
|       QPR|    19|            6|
|     Stoke|    12|            2|
| West Brom|    10|            5|
|  West Ham|    18|            4|
| Liverpool|    12|            5|
| Newcastle|    12|            0|
|   Burnley|     9|            2|
+----------+------+-------------+
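
The same projection could of course be written with the DataFrame API instead of SQL (a sketch; footFrame2 is just an illustrative name):

val footFrame2 = plDF.select(
  $"HomeTeam".alias("team"),
  $"HS".alias("shoots"),
  $"HST".alias("shoots_target"))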

Now let's create a UDF which computes the percentage of shots on target:

scala> val accuracy : (Int,Int) => Double = (shootsTarget,shoots) => (shootsTarget.toDouble/ shoots.toDouble)*100
accuracy: (Int, Int) => Double = <function2>

// We could easily use ScalaCheck here
scala> accuracy(5,10)
res9: Double = 50.0

scala> accuracy(3,20)
res10: Double = 15.0

scala> accuracy(7,20)
res11: Double = 35.0
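
One thing worth guarding against in real data is a team with zero shots, which would make the division return NaN or Infinity. A defensive variant could look like this (a sketch; safeAccuracy is just an illustrative name):

val safeAccuracy: (Int, Int) => Double =
  (shootsTarget, shoots) => if (shoots == 0) 0.0 else shootsTarget.toDouble / shoots * 100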

Lift the pure function to the column level:

scala> val accuracyUDF =udf(accuracy)
accuracyUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function2>,DoubleType,List(IntegerType, IntegerType))

Add the accuracy column:

scala> val accFrame =footFrame.withColumn("accuracy",accuracyUDF($"shoots_target",$"shoots"))
accFrame: org.apache.spark.sql.DataFrame = [team: string, shoots: int, shoots_target: int, accuracy: double]

scala> accFrame.show(10)
+----------+------+-------------+------------------+
|      team|shoots|shoots_target|          accuracy|
+----------+------+-------------+------------------+
|   Arsenal|    14|            6|42.857142857142854|
| Leicester|    11|            3| 27.27272727272727|
|Man United|    14|            5|35.714285714285715|
|       QPR|    19|            6| 31.57894736842105|
|     Stoke|    12|            2|16.666666666666664|
| West Brom|    10|            5|              50.0|
|  West Ham|    18|            4| 22.22222222222222|
| Liverpool|    12|            5| 41.66666666666667|
| Newcastle|    12|            0|               0.0|
|   Burnley|     9|            2| 22.22222222222222|
+----------+------+-------------+------------------+

Now some serious Data Science!!

The ten most accurate teams of the 2014/15 season:

scala> accFrame.groupBy($"team").agg(avg("accuracy").alias("avgAcc")).orderBy($"avgAcc".desc).show(10)
+--------------+------------------+
|          team|            avgAcc|
+--------------+------------------+
|       Chelsea| 41.98844127382444|
|       Arsenal| 39.17117668688947|
|    Man United|37.850357151513364|
|       Swansea| 36.67447241893226|
|      Man City|35.149588998836165|
|   Southampton|34.675709091582874|
|Crystal Palace|34.149016165881086|
|     West Brom| 33.04894986110493|
|          Hull|  32.4177945784594|
|       Everton| 31.75502043536099|
+--------------+------------------+
only showing top 10 rows

The ten least accurate teams of the 2014/15 season:

scala> accFrame.groupBy($"team").agg(avg("accuracy").alias("avgAcc")).orderBy($"avgAcc".asc).show(10)
+-----------+------------------+
|       team|            avgAcc|
+-----------+------------------+
|      Stoke| 25.31579664558531|
| Sunderland|26.755766163660898|
|Aston Villa| 28.98439280018227|
|  Leicester|30.022138526293656|
|  Newcastle| 30.67953658873489|
|        QPR| 30.79045800306188|
|  Liverpool|  31.4443402420427|
|  Tottenham|31.506379478246483|
|   West Ham|31.648911477832154|
|    Burnley|31.745868510574393|
+-----------+------------------+
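
The two queries above differ only in the sort direction, so the aggregation itself can be factored out and reused (a sketch; accByTeam is just an illustrative name):

val accByTeam = accFrame.groupBy($"team").agg(avg("accuracy").alias("avgAcc"))
accByTeam.orderBy($"avgAcc".desc).show(10) // most accurate
accByTeam.orderBy($"avgAcc".asc).show(10)  // least accurate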

Read from Hive

To show that Spark does not live outside the big data world, it is interesting to see how it cooperates with Hive.

To do it, let's use the recent Cloudera QuickStart VM 5.5, which ships with Spark 1.5.0:

http://www.cloudera.com/content/www/en-us/downloads/quickstart_vms/5-5.html

Then you need to perform the first exercise, during which you import data from MySQL to Hive with Sqoop.

Now let's copy the Hive config into the Spark config folder to allow Spark to read Hive tables:

[cloudera@quickstart spark]$ sudo cp /etc/hive/conf/hive-site.xml /etc/spark/conf/hive-site.xml
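
In the VM's spark-shell, sqlContext is already Hive-aware when Spark is built with Hive support; in a standalone Spark 1.x application you would create the context yourself, roughly like this (a sketch):

import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc) // picks up hive-site.xml from the Spark conf directory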

And we can play

val customersDF=sqlContext.sql("select customer_id,customer_fname,customer_lname from customers")

scala> customersDF.printSchema
root
 |-- customer_id: integer (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)


scala> val ordersDF=sqlContext.sql("select order_id,order_customer_id from orders where order_status = 'COMPLETE' ")
ordersDF: org.apache.spark.sql.DataFrame = [order_id: int, order_customer_id: int]

scala> ordersDF.printSchema
root
 |-- order_id: integer (nullable = true)
 |-- order_customer_id: integer (nullable = true)

Let's do a simple join

scala> val joined=customersDF.join(ordersDF,$"customer_id" === $"order_customer_id")
joined: org.apache.spark.sql.DataFrame = [customer_id: int, customer_fname: string, customer_lname: string, order_id: int, order_customer_id: int]

scala> joined.printSchema
root
 |-- customer_id: integer (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- order_customer_id: integer (nullable = true)
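
Since order_customer_id duplicates customer_id after the join, we could drop it to keep the result tidy (a sketch; tidyJoined is just an illustrative name):

val tidyJoined = joined.drop("order_customer_id")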

And DATASCIENCE!!!

Top 10 customers

scala> joined.groupBy($"customer_lname",$"customer_fname").count.orderBy($"count".desc).show(10)
+--------------+--------------+-----+                                           
|customer_lname|customer_fname|count|
+--------------+--------------+-----+
|         Smith|          Mary| 3153|
|         Smith|        Robert|   99|
|         Smith|         James|   95|
|         Smith|          John|   86|
|         Smith|         David|   82|
|         Smith|       William|   66|
|         Jones|          Mary|   64|
|         Smith|       Michael|   62|
|         Smith|       Matthew|   50|
|         Smith|   Christopher|   49|
+--------------+--------------+-----+
only showing top 10 rows

Indeed, there is a big gang of Smiths in the Hive.

SparkR

As the last exercise, we can take a closer look at SparkR and the R language, from which the DataFrame concept was borrowed.

First of all we need to have R installed, and then we can start SparkR:

~/bin/Spark$ bin/sparkR

R version 3.2.2 (2015-08-14) -- "Fire Safety"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)

R has an example data frame called mtcars, which we will use for this tutorial:

> mtcars
                     mpg cyl  disp  hp drat    wt  qsec vs am gear carb
Mazda RX4           21.0   6 160.0 110 3.90 2.620 16.46  0  1    4    4
Mazda RX4 Wag       21.0   6 160.0 110 3.90 2.875 17.02  0  1    4    4
Datsun 710          22.8   4 108.0  93 3.85 2.320 18.61  1  1    4    1
Hornet 4 Drive      21.4   6 258.0 110 3.08 3.215 19.44  1  0    3    1
Hornet Sportabout   18.7   8 360.0 175 3.15 3.440 17.02  0  0    3    2
Valiant             18.1   6 225.0 105 2.76 3.460 20.22  1  0    3    1
Duster 360          14.3   8 360.0 245 3.21 3.570 15.84  0  0    3    4
Merc 240D           24.4   4 146.7  62 3.69 3.190 20.00  1  0    4    2
Merc 230            22.8   4 140.8  95 3.92 3.150 22.90  1  0    4    2
Merc 280            19.2   6 167.6 123 3.92 3.440 18.30  1  0    4    4
Merc 280C           17.8   6 167.6 123 3.92 3.440 18.90  1  0    4    4

Let's create a SparkR DataFrame from the R data frame (first attempt):

> sparkCars <- createDataFrame(sqlContext, mtcars)
> sparkCars
DataFrame[mpg:double, cyl:double, disp:double, hp:double, drat:double, wt:double, qsec:double, vs:double, am:double, gear:double, carb:double]
> head(sparkCars)
   mpg cyl disp  hp drat    wt  qsec vs am gear carb
1 21.0   6  160 110 3.90 2.620 16.46  0  1    4    4
2 21.0   6  160 110 3.90 2.875 17.02  0  1    4    4
3 22.8   4  108  93 3.85 2.320 18.61  1  1    4    1
4 21.4   6  258 110 3.08 3.215 19.44  1  0    3    1
5 18.7   8  360 175 3.15 3.440 17.02  0  0    3    2
6 18.1   6  225 105 2.76 3.460 20.22  1  0    3    1

We have lost the car names (they are row names in R, not a column), so let's try again and add them as a column:

> carNames<-rownames(mtcars)
> carNames
 [1] "Mazda RX4"           "Mazda RX4 Wag"       "Datsun 710"         
 [4] "Hornet 4 Drive"      "Hornet Sportabout"   "Valiant"            
 [7] "Duster 360"          "Merc 240D"           "Merc 230"           
[10] "Merc 280"            "Merc 280C"           "Merc 450SE"         
[13] "Merc 450SL"          "Merc 450SLC"         "Cadillac Fleetwood" 
[16] "Lincoln Continental" "Chrysler Imperial"   "Fiat 128"           
[19] "Honda Civic"         "Toyota Corolla"      "Toyota Corona"      
[22] "Dodge Challenger"    "AMC Javelin"         "Camaro Z28"         
[25] "Pontiac Firebird"    "Fiat X1-9"           "Porsche 914-2"      
[28] "Lotus Europa"        "Ford Pantera L"      "Ferrari Dino"       
[31] "Maserati Bora"       "Volvo 142E"         
> frameWithNames<-cbind(carNames,mtcars)
> sparkCars<-createDataFrame(sqlContext,frameWithNames)
> head(sparkCars)
           carNames  mpg cyl disp  hp drat    wt  qsec vs am gear carb
1         Mazda RX4 21.0   6  160 110 3.90 2.620 16.46  0  1    4    4
2     Mazda RX4 Wag 21.0   6  160 110 3.90 2.875 17.02  0  1    4    4
3        Datsun 710 22.8   4  108  93 3.85 2.320 18.61  1  1    4    1
4    Hornet 4 Drive 21.4   6  258 110 3.08 3.215 19.44  1  0    3    1
5 Hornet Sportabout 18.7   8  360 175 3.15 3.440 17.02  0  0    3    2
6           Valiant 18.1   6  225 105 2.76 3.460 20.22  1  0    3    1

Now let's filter out all cars which have 6 or fewer cylinders:

> bigEngine<-sparkCars[sparkCars$cyl > 6]
> head(bigEngine)
            carNames  mpg cyl  disp  hp drat   wt  qsec vs am gear carb
1  Hornet Sportabout 18.7   8 360.0 175 3.15 3.44 17.02  0  0    3    2
2         Duster 360 14.3   8 360.0 245 3.21 3.57 15.84  0  0    3    4
3         Merc 450SE 16.4   8 275.8 180 3.07 4.07 17.40  0  0    3    3
4         Merc 450SL 17.3   8 275.8 180 3.07 3.73 17.60  0  0    3    3
5        Merc 450SLC 15.2   8 275.8 180 3.07 3.78 18.00  0  0    3    3
6 Cadillac Fleetwood 10.4   8 472.0 205 2.93 5.25 17.98  0  0    3    4

And finally we will take the hp column and calculate the average horsepower of those cars:

> mhp <- mean(bigEngine$hp)
> mhp
Column avg(hp) 
> showDF(select(bigEngine,mhp))
+------------------+
|           avg(hp)|
+------------------+
|209.21428571428572|
+------------------+

And this should be more than enough for an interesting introduction to DataFrames in Spark!

Links

https://databricks.com/blog/2015/08/12/from-pandas-to-apache-sparks-dataframe.html
https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/
https://www.mapr.com/ebooks/spark/chapter05-processing-tabular-data.html
http://www.nodalpoint.com/dataframes-from-csv-files-in-spark-1-5-automatic-schema-extraction-neat-summary-statistics-elementary-data-exploration/
http://www.nodalpoint.com/spark-data-frames-from-csv-files-handling-headers-column-types/
https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
https://dzone.com/articles/using-apache-spark-dataframes-for-processing-of-ta
http://blog.knoldus.com/2015/10/21/using-spark-dataframes-for-word-count/
