50.043 Spark¶
Learning Outcomes¶
By the end of this lesson, you will be able to:
- Differentiate between Hadoop MapReduce and Spark
- Apply Spark based on application requirements
- Develop data engineering processes using Spark RDD
- Reason about the Spark execution model
- Develop machine learning applications using Spark MLLib
- Explain the Spark architecture
- Develop data processing applications using Spark DataFrame
- Develop machine learning applications using the Spark ML package
- Explain Spark Streaming
Spark vs Hadoop MapReduce¶
Hadoop MapReduce was created to batch-process data once or twice, e.g. building a web search index or processing crawled data.
When machine learning is applied to big data, terabytes or even zettabytes of data may need to be processed iteratively, for instance when gradient descent has to run for thousands of iterations.
On top of that, data visualization for big data imposes a further challenge: it requires data to be reprocessed based on user inputs such as sorting and filtering constraints.
Hadoop MapReduce is no longer suitable for these applications for the following reasons:
1. Each mapper (and reducer) task needs to transfer intermediate data to and from disk.
2. Its rigid computation model (i.e. one step of map followed by one step of reduce) makes most applications look unnecessarily complex.
3. It does not utilize much of the RAM. Many MapReduce applications are disk and network I/O bound rather than RAM and CPU bound.
4. It is hard to redistribute the workload.
Apache Spark is a project which started off as an academic research idea and became an enterprise-level success. It addresses the above-mentioned limitations of Hadoop MapReduce by introducing the following features:
- Resilient distributed datasets (RDDs), which act as the primary data structure for distributed MapReduce operations.
- It pools all the available RAM from the cluster (all data nodes) to form a large pool of virtual RAM. RDDs and their derivatives, such as DataFrames and Datasets, are in-memory parallel distributed data structures that live in this virtual RAM pool.
- It has a MapReduce-like programming interface (closer to our toy MapReduce library than Hadoop MapReduce's).
- It offers fault tolerance: RDDs, DataFrames and Datasets can always be re-computed in case of node failure.
- Machine learning libraries and graph computation libraries.
Spark supports many mainstream programming languages such as Scala, Python, R, Java and SQL. In this module we consider the Python interface.
Spark RDD API¶
The primary data structure of the Spark RDD API is the RDD.
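The original snippet is not reproduced here; below is a minimal sketch of creating an RDD from a local list, assuming a SparkContext named `sc` (e.g. the one provided by the `pyspark` shell).

```python
# distribute a local Python list as an RDD (assumes an existing SparkContext `sc`)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
```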
In the code snippet above, we initialize a list of integers and turn it into an RDD `distData`.
Alternatively, we can load the data from a file.
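A minimal sketch, where the HDFS path is an assumption:

```python
# load a text file from HDFS as an RDD of lines (the path is an assumption)
lines = sc.textFile("hdfs://127.0.0.1:9000/foo.txt")
```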
Given the RDD, we can now perform data manipulation. There are two kinds of RDD APIs, namely Transformations and Actions. Transformation APIs are lazy and action APIs are strict.
Lazy vs Strict Evaluation¶
In lazy evaluation, the argument of a function is not fully computed until it is needed. We can mimic this in Python using generators and iterators.
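A sketch of the lazy version referenced below; the `lazy_squares` generator and the body of `takeOne` are assumptions.

```python
# a generator: each square is computed only when it is requested
def lazy_squares(xs):
    for x in xs:
        print("computing", x)   # side effect to show which elements are computed
        yield x * x

def takeOne(l):
    # only pulls the first element from the (lazy) iterator
    return next(iter(l))

l = lazy_squares(range(10))
takeOne(l)   # prints "computing 0" only
```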
When `takeOne(l)` is called, only the first element of `l` is computed; the rest are discarded.
In strict evaluation, the argument of a function is always fully computed.
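A matching sketch of the strict version (again, the body is an assumption); it reuses the `lazy_squares` generator from above.

```python
def takeOneStrict(l):
    return list(l)[0]   # list() materializes the whole generator first

l = lazy_squares(range(10))
takeOneStrict(l)   # prints "computing 0" through "computing 9"
```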
When `takeOneStrict(l)` is called, even though only the first element of `l` is required, the rest are computed eagerly, because the `list()` call materializes the generator.
RDD Transformations¶
All transformations take the current RDD as (part of) the input and return a new RDD as output.
Let `l`, `l1` and `l2` be RDDs and `f` be a function.
RDD APIs | Description | Toy MapReduce equivalent |
---|---|---|
`l.map(f)` | applies `f` to every element | `map(f,l)` |
`l.flatMap(f)` | applies `f` to every element and flattens the results | `flatMap(f,l)` |
`l.filter(f)` | keeps the elements for which `f` returns true | `filter(f,l)` |
`l.reduceByKey(f)` | reduces the values of each key using `f` | `reduceByKey(f,l)` |
`l.mapPartition(f)` | similar to `map`, but `f` takes an iterator and produces an iterator | N.A. |
`l.distinct()` | all distinct elements | N.A. |
`l.sample(b,ratio,seed)` | samples the dataset. `b`: a boolean value to indicate with/without replacement. `ratio`: a value in the range [0,1] | N.A. |
`l.aggregateByKey(zv)(sop,cop)` | `zv`: the initial accumulated value. `sop`: intra-partition aggregation function. `cop`: inter-partition aggregation function | similar to `reduceByKey(f,l,acc)`, except that we don't have two versions of `f` |
`l1.union(l2)` | the union of `l1` and `l2` | `l1 + l2` |
`l1.intersection(l2)` | the intersection of elements from `l1` and `l2` | N.A. |
`l1.groupByKey()` | groups elements by keys | `shuffle(l1)` |
`l1.sortByKey()` | sorts by keys | N.A. |
`l1.join(l2)` | joins `l1` and `l2` by keys | we've done it in the lab |
`l1.cogroup(l2)` | similar to `join`; it returns an RDD of `(key, ([v1,...], [v2,...]))`, where `[v1,...]` are values from `l1` and `[v2,...]` are values from `l2` | N.A. |
Note that the RDD APIs follow the convention of the built-in Scala library, where `map`, `filter`, etc. are methods of the List class.
RDD Actions¶
All actions take the current RDD as (part of) the input and return some value that is not an RDD. An action forces the computation to happen.
RDD APIs | Description | Toy MapReduce equivalent |
---|---|---|
`l.reduce(f)` | reduces the elements using `f` | `reduce(f,l)` |
`l.collect()` | converts the RDD to a local array | |
`l.count()` | counts the elements | `len(l)` |
`l.first()` | returns the first element | `l[0]` |
`l.take(n)` | returns the first `n` elements as an array | `l[:n]` |
`l.saveAsTextFile(path)` | saves the RDD to a text file | N.A. |
`l.countByKey()` | returns a hash map of keys and counts | N.A. |
`l.foreach(f)` | runs a function for each element in the dataset, with side effects | `for x in l: ...` |
Special Transformation¶
Some transformations/operations, such as `reduceByKey`, `join`, `groupByKey` and `sortByKey`, will trigger a shuffle event, in which Spark redistributes the data across partitions. This means that intermediate results from lazy operations will be materialized.
Wordcount example in Spark¶
In the following code snippet, we find the word count application implemented using PySpark.
We can compare it with the version in the toy MapReduce library and find many similarities.
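The original snippet is not reproduced here; below is a hedged sketch of a PySpark word count, with assumed input and output HDFS paths.

```python
# word count in PySpark (a sketch; paths are assumptions)
lines = sc.textFile("hdfs://127.0.0.1:9000/input.txt")

counts = (lines
          .flatMap(lambda line: line.split(" "))   # split each line into words
          .map(lambda word: (word, 1))             # pair each word with a count of 1
          .reduceByKey(lambda x, y: x + y))        # sum the counts for each word

counts.saveAsTextFile("hdfs://127.0.0.1:9000/wordcount_output")
```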
We will see more examples of using PySpark in the lecture and the labs.
Spark Architecture¶
Next, we consider how Spark manages the execution model.
In the following, we find the Spark architecture (in simple standalone mode).
Like Hadoop, Spark follows a simple master-worker architecture. A machine is dedicated to managing the Spark cluster, known as the Spark master node (cf. the namenode in a Hadoop cluster). A set of machines is in charge of running the actual computation, namely the Spark worker nodes (cf. the data nodes in a Hadoop cluster).
A driver is a program that runs on the Spark master node and manages the interaction between the application and the client. Upon a job submission from the client, the Spark driver schedules the job by analyzing the sub-task dependencies and allocating the sub-tasks to the executors. A set of executors (running on some worker nodes) receives tasks from the driver and reports the results upon completion.
Spark Task Scheduling¶
In this section, we take a look at how Spark divides a given job into tasks and schedules the tasks to the workers.
As illustrated by the diagram above, Spark takes four steps to schedule a job.
- Given a job, Spark builds a directed acyclic graph (DAG) which represents the dependencies among the operations performed in the job.
- Given the DAG, Spark splits the graph into multiple stages of tasks.
- Tasks are scheduled according to their stages; a later stage must not be started until the earlier stages are completed.
- Tasks scheduled to the workers are then executed.
A simple example¶
Let's consider a simple example
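The original program is not shown here; below is a minimal sketch consistent with the DAG that follows (the path and map function are assumptions).

```python
# "Spark Job 1": a single read (r1) followed by a single map (r2)
r1 = sc.textFile("hdfs://127.0.0.1:9000/foo.txt")
r2 = r1.map(lambda line: line.upper())
r2.collect()
```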
A Spark driver takes the above program and constructs the DAG. Since the program contains just a read followed by a map operation, it has the following DAG.
graph TD
r1 --map--> r2
It is clear that there is only one stage (because there is only one operation, one set of inputs and one set of outputs).
The task scheduler allocates the tasks in parallel, as follows.
A less simple example¶
Let's consider another example
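The original program is not shown here; below is a hedged sketch ("Spark Job 2") consistent with the DAG that follows. The datasets and functions are assumptions.

```python
r1 = sc.parallelize([(1, 'A'), (2, 'B'), (3, 'C')])
r3 = sc.parallelize([(1, 10), (2, 20), (3, 30)])

r2 = r1.map(lambda kv: (kv[0], kv[1].lower()))
r4 = r3.map(lambda kv: (kv[0], kv[1] * 2))

r5 = r2.join(r4)                                # (key, (v1, v2)) pairs
r6 = r5.groupByKey()
r7 = r6.map(lambda kv: len(list(kv[1])))        # count values per key
r8 = r7.reduce(lambda x, y: x + y)
```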
The DAG is as follows
graph TD;
r1 --map--> r2
r3 --map--> r4
r2 --join--> r5
r4 --join--> r5
r5 --groupByKey--> r6
r6 --map--> r7
r7 --reduce--> r8
There are multiple ways of dividing the above DAG into stages. For instance, we could naively turn each operation (each arrow) into a stage (ending up with many stages and poor parallelization), or we could group the simple (straight-line) paths into a stage.
Spark takes a more intelligent approach by classifying different types of dependencies.
- Narrow dependencies: an input partition is used by at most one output partition.
- Wide dependencies: an input partition is used by more than one output partition.
Thus, for Spark Job 2, we further annotate the DAG with dependency types.
graph TD;
r1 --map/narrow--> r2
r3 --map/narrow--> r4
r2 --join/narrow--> r5
r4 --join/wide--> r5
r5 --groupByKey/wide--> r6
r6 --map/narrow--> r7
r7 --reduce--> r8
All the map operations are narrow. The join of `r2` and `r4` should be wide. However, since the result must be ordered either by `r2`'s partition order or by `r4`'s, only one side of the operation is wide. In this case, we assume `r5`'s partitioning follows `r2`'s, hence `r2` to `r5` is narrow. `groupByKey` operations are wide.
Next, we can decide how many stages we need by
- grouping narrow dependencies that precede the same wide dependency under the same stage, because they do not incur any network I/O (we apply task parallelism here), and
- starting a new stage at each wide dependency, as we need to wait for all the data operands to be fully computed before the operation can be executed.
graph TD;
r1 --map/narrow/1--> r2
r3 --map/narrow/1--> r4
r2 --join/narrow/2--> r5
r4 --join/wide/2--> r5
r5 --groupByKey/wide/3--> r6
r6 --map/narrow/3--> r7
r7 --reduce/3--> r8
In the above, the first two map operations form the first stage. The join operation is assigned to the second stage. The `groupByKey` initiates the third stage, which includes the following map and reduce.
Spark performance tuning¶
Knowing how Spark schedules a job into stages of sub-tasks, we can optimize the job by rewriting it into an equivalent one that applies one or both of the following:
- Pre-partition or re-partition of the data
- Cache the common intermediate data
Pre-partition or re-partition of the data¶
Recall that a Spark job is represented as a DAG of stages.
If we know the join operation in advance and pre- (or re-)arrange the data according to the key-to-partition mapping, we can reduce the shuffling.
Let's assume that `r1` is by default partitioned randomly into two partitions, and so is `r2`.
Next, we would like to join `r1` with `r2` by key, i.e. by the first component of the pairs.
The result will be stored in three partitions.
Note that the actual order of the collected result might not be the same as shown; Spark tries to reuse the partitions it has created whenever possible. (For brevity we omit an empty partition `p4`.)
As observed, there are tuples transferred from `p1` to `p2`, e.g. (2, 'B'), and from `p2` to `p1`, e.g. (1, 'E'). These transfers can be eliminated if we manually control the partitioning of the initial RDDs. When we perform the join, we then have no cross-partition transfer of tuples.
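A hedged sketch of this idea using `partitionBy`. The datasets are assumptions, chosen to include the tuples (2, 'B') and (1, 'E') mentioned above.

```python
num_parts = 2

# co-partition both RDDs with the same (default) key-to-partition mapping
r1 = sc.parallelize([(1, 'A'), (2, 'B'), (3, 'C')]).partitionBy(num_parts)
r2 = sc.parallelize([(1, 'E'), (2, 'F'), (3, 'G')]).partitionBy(num_parts)

# since r1 and r2 agree on the partitioning, the join can reuse it
# instead of reshuffling tuples across partitions
r3 = r1.join(r2)
print(r3.glom().collect())   # inspect the content of each partition
```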
- Sample code can be found here.
Cache the intermediate data¶
Recall that in Spark, transformations are lazy until a shuffle is required. Laziness is a double-edged sword.
In the figure above, the orange partition (data) is being used by three different "sinks". If all the operations above the last level are transformations (i.e. lazy), having three sink operations will require most of the intermediate partitions (all boxes except for the last) to be recomputed for each sink.
For example, consider the following.
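The original code is not reproduced here; below is a hedged sketch of such a job (the data source and the exact computations are assumptions).

```python
from statistics import pstdev

r1 = sc.textFile("hdfs://127.0.0.1:9000/scores.csv")
r2 = r1.map(lambda line: line.split(","))
r3 = r2.map(lambda fields: (fields[0], float(fields[1])))   # (key, value) pairs

# standard deviation by key
r4 = r3.groupByKey().mapValues(lambda vs: pstdev(list(vs)))
# minimum by key
r5 = r3.reduceByKey(min)

r4.collect()
r5.collect()
```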
There are two downstream operations of `r3`, namely `r4`, which computes the standard deviation by key, and `r5`, which computes the minimum by key.
Because `r3` is lazy, the `r4.collect()` action (sink) triggers the computation of `r1` to `r3` and `r4`. The `r5.collect()` action then triggers the recomputation of `r1` to `r3` and the computation of `r5`.
If we materialize `r3` and cache it, we avoid the recomputation of `r1` to `r3`.
- Sample code
Other optimization tricks¶
Besides the two approaches mentioned above, a trick that we've used for optimization in RDBMS is also applicable to Spark: apply filters as early as possible so as to reduce the size of the intermediate output.
Another Spark-specific optimization trick is to rewrite `groupByKey().map(...)` into `reduceByKey(...)`. However, in some cases such a rewrite might not exist.
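A small sketch contrasting the two forms (the data are assumptions).

```python
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

# shuffles all the values of each key before summing
sums_slow = pairs.groupByKey().map(lambda kv: (kv[0], sum(kv[1])))

# combines values within each partition first, shuffling only partial sums
sums_fast = pairs.reduceByKey(lambda x, y: x + y)
```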
Spark Failure Recovery¶
Recall that for any MapReduce implementation to produce deterministic results, the computation must be pure.
It turns out that the purity property makes failure recovery much easier. For each Spark job, a lineage of the sub-tasks is computed. In the event of a failure, the Spark driver refers to the lineage and recomputes the affected sub-tasks. Thanks to the purity property, partially completed and incomplete computations can always be re-computed without the need to restore the original state.
For instance, consider the following job, where all the square boxes denote the partitions of some RDDs. Suppose partition `C1` is faulty.
Based on the diagram (the lineage), we know that we can recompute `C1` elsewhere using `B1` and `B2`.
Spark DataFrame¶
Besides the Spark RDD API, Spark offers DataFrame as a higher-level API for programmers and data engineers. Its usage is influenced and inspired by Python's Pandas.
In a nutshell, a DataFrame can be seen as a schema plus a set of RDDs.
Since DataFrame was designed for machine learning applications, it adopts a column-based data layout instead of a row-based one.
Question: Why Columnar?¶
Hint: how is data used in an ML model?
Creating Spark Dataframe¶
We can convert a Spark RDD into a DataFrame.
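A minimal sketch, assuming a SparkSession named `spark` and using the student data shown later in this section.

```python
rdd = sc.parallelize([
    (100001, "Ace", 50043, 90),
    (100002, "Brandon", 50043, 95),
    (100003, "Cheryl", 50043, 80),
])
# attach a schema (column names) to the RDD of tuples
df = spark.createDataFrame(rdd, ["studentid", "name", "module", "score"])
df.show()
```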
Alternatively, we can create DataFrames directly from a CSV file.
Given file hdfs://127.0.0.1:9000/foo.csv
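A hedged sketch of loading it; the `header` and `inferSchema` options are assumptions about the original snippet.

```python
df = (spark.read
      .option("header", "true")        # use the first line as column names
      .option("inferSchema", "true")   # infer column types from the values
      .csv("hdfs://127.0.0.1:9000/foo.csv"))
df.printSchema()
```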
Note that in the above, Spark loads a text file (CSV) from HDFS and infers the schema based on the first line and the values.
DataFrame APIs¶
Let's have a tour of the Spark DataFrame APIs by going through some examples.
Here is the data we use in the examples
studentid | name | module | score |
---|---|---|---|
100001 | Ace | 50043 | 90 |
100002 | Brandon | 50043 | 95 |
100003 | Cheryl | 50043 | 80 |
Column Projection¶
To project (i.e. select columns), we use the `.select()` method.
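A sketch producing the result below, assuming the student table above is loaded into a DataFrame `df`.

```python
df.select("studentid", "score").show()
```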
studentid | score |
---|---|
100001 | 90 |
100002 | 95 |
100003 | 80 |
To compute a new column based on an existing one, we again use the `.select()` method, supplying the column operation together with the name of the new column.
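One way to express this in PySpark (a sketch): the alias `email` and the domain `@example.com` are hypothetical; the notes show a truncated `mymail.sut...` address.

```python
from pyspark.sql.functions import concat, col, lit

df.select(
    concat(col("studentid").cast("string"), lit("@example.com")).alias("email")
).show()
```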
(new column) |
---|
100001@mymail.sut... |
100002@mymail.sut... |
100003@mymail.sut... |
For the full set of built-in functions for column operations, see the PySpark SQL functions documentation.
There are times when we want to create a new column while keeping the old columns.
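A sketch using `.withColumn()`; the column name `email` and the domain are hypothetical.

```python
from pyspark.sql.functions import concat, col, lit

df.withColumn(
    "email",   # hypothetical name for the new column
    concat(col("studentid").cast("string"), lit("@example.com"))
).show()
```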
studentid | name | module | score | |
---|---|---|---|---|
100001 | Ace | 50043 | 90 | 100001@mymail.sut... |
100002 | Brandon | 50043 | 95 | 100002@mymail.sut... |
100003 | Cheryl | 50043 | 80 | 100003@mymail.sut... |
Row filtering¶
For row filtering, we use the `.filter()` method.
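A sketch of a filter that yields the row shown below; the predicate is an assumption.

```python
df.filter(df["name"] == "Cheryl").show()
```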
studentid | name | module | score |
---|---|---|---|
100003 | Cheryl | 50043 | 80 |
And similarly for a range filter:
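A sketch matching the result below; the threshold 90 is an assumption.

```python
from pyspark.sql.functions import lit

df.filter(df["score"] > lit(90)).show()
```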
studentid | name | module | score |
---|---|---|---|
100002 | Brandon | 50043 | 95 |
`lit()` is optional here; PySpark inserts it for us.
Group By and Aggregation¶
For aggregation, we use `.groupBy()`.
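A sketch producing the aggregation shown below.

```python
from pyspark.sql.functions import avg

df.groupBy("module").agg(avg("score")).show()
```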
module | avg(score) |
---|---|
50043 | 88.33333333333333 |
Join¶
For joins, we use `.join()`.
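A sketch, assuming a second DataFrame `modules` with columns (module, modname); the full module name is hypothetical and appears truncated in the output below.

```python
modules = spark.createDataFrame([(50043, "Database and Big Data Systems")],
                                ["module", "modname"])

df.join(modules, on="module").show()
```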
studentid | name | module | score | modname |
---|---|---|---|---|
100001 | Ace | 50043 | 90 | Database and Big ... |
100002 | Brandon | 50043 | 95 | Database and Big ... |
100003 | Cheryl | 50043 | 80 | Database and Big ... |
Spark SQL¶
Besides the RDD API, Spark also allows programs to use SQL queries to perform data transformations and actions.
For example,
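A sketch: register the DataFrame as a temporary view and query it with SQL (the view name is an assumption).

```python
df.createOrReplaceTempView("student")
spark.sql("SELECT * FROM student").show()
```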
studentid | name | module | score |
---|---|---|---|
100001 | Ace | 50043 | 90 |
100002 | Brandon | 50043 | 95 |
100003 | Cheryl | 50043 | 80 |
With some notebook support, we can even use SQL to perform data visualization.
Spark Machine Learning¶
Spark comes with two different machine learning libraries:
- `MLLib` package - for RDDs
- `ML` package - for DataFrames (and Datasets)
MLLib package¶
The MLLib package offers lower-level access to data types such as vectors and labeled points.
Vectors¶
In Spark, vectors are local (non-distributed) data collections. There are dense vectors and sparse vectors.
- For a dense vector, all values need to be specified when it is created.
- For a sparse vector, we don't need to specify all the values; instead, we specify the size of the vector as well as the non-zero values (and their indices).
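A minimal sketch of both kinds of vectors in the MLLib package; the values are assumptions.

```python
from pyspark.mllib.linalg import Vectors

dv = Vectors.dense([1.0, 0.0, 3.0])
# sparse vector of size 3, with non-zero values at indices 0 and 2
sv = Vectors.sparse(3, [0, 2], [1.0, 3.0])
```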
Labeled points¶
With features extracted as vectors, we need a way to label them.
Spark MLLib comes with its own labeled point data type.
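A sketch of labeled points; the convention of 1.0 for positive and 0.0 for negative samples is an assumption.

```python
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

pos = LabeledPoint(1.0, Vectors.dense([1.0, 0.0, 3.0]))
neg = LabeledPoint(0.0, Vectors.sparse(3, [0, 2], [1.0, 3.0]))
```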
Training and inference¶
With labeled points defined and extracted, and assuming the training data has been prepared, we train the model, which is a support vector machine trained with the SGD algorithm.
To perform inference, we need to feed a new sample as a vector to the model.
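A hedged sketch putting the training and inference steps together; the training data and parameters are assumptions.

```python
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

training_data = sc.parallelize([
    LabeledPoint(1.0, Vectors.dense([1.0, 2.0, 3.0])),
    LabeledPoint(0.0, Vectors.dense([0.0, 0.5, 0.1])),
])

model = SVMWithSGD.train(training_data, iterations=100)

# inference: feed a new sample as a vector to the model
print(model.predict(Vectors.dense([0.8, 1.5, 2.0])))
```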
ML Package¶
As the ML package targets the higher-level data structures (DataFrame and Dataset), machine learning models in the ML package are built using pipelines.
The Training Pipeline¶
One of the training pipelines is known as the estimator
The diagram above illustrates the pipeline for training a classifier using logistic regression.
The transformer pipeline¶
The inference pipeline, on the other hand, is known as the transformer.
Sample code¶
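The original sample is not reproduced here; below is a hedged sketch of a typical ML-package pipeline (the column names and data are assumptions).

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

training = spark.createDataFrame([
    (0, "spark is great", 1.0),
    (1, "hadoop mapreduce", 0.0),
], ["id", "text", "label"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10)

# the fitted pipeline (an estimator) becomes a transformer used for inference
pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
model = pipeline.fit(training)

test = spark.createDataFrame([(2, "spark streaming")], ["id", "text"])
model.transform(test).select("id", "prediction").show()
```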
Spark Streaming¶
Spark offers a Streaming API which handles real-time (unbounded) input data.
The real-time behaviour is approximated by chopping the data stream into small batches. These small batches are fed to the Spark application.
For example, the following is a simplified version of a data streaming application that computes page views by URL over time.
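The original code is not reproduced here; below is a hedged sketch of a DStream application counting page views by URL per micro-batch. The socket source and batch interval are assumptions.

```python
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=10)   # 10-second micro-batches

# each line of the stream is assumed to be a URL that was viewed
lines = ssc.socketTextStream("localhost", 9999)
page_views = lines.map(lambda url: (url, 1)).reduceByKey(lambda x, y: x + y)
page_views.pprint()

ssc.start()
ssc.awaitTermination()
```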
Additional References¶
- Spark Dataframe
- https://spark.apache.org/docs/latest/sql-getting-started.html
- Spark MLLib and ML package
- https://spark.apache.org/docs/latest/ml-guide.html
- Spark Streaming
- https://spark.apache.org/docs/latest/streaming-programming-guide.html