Spark Fundamentals

Our First Spark Program

We use the third-party package findspark to locate the Spark installation on our system:

In [1]:
import findspark
findspark.init()

Afterwards, we are able to import the pyspark module:

In [2]:
import pyspark

Our first Spark program will be an approximation of the number $\pi$ - not the most efficient one, but our program is going to use several fundamental features of Spark.

First, we create a SparkContext to communicate with the Spark cluster, and give our program a name:

In [3]:
sc = pyspark.SparkContext(appName="Pi")

Now to the implementation of our algorithm: Here is the first function we need. It generates a random point in a square of size 1x1. (If you are wondering about the argument x: it is not used inside the function, but the function must accept one argument so that we can pass it to the map operation later, which calls it once per element.)

In [4]:
import random
In [5]:
def random_point(x):
    return (random.random(), random.random())

We need to define one more function: The following function is given a point $p = (x,y)$ and checks whether $x^2 + y^2$ is less than 1, i.e. whether the point lies inside the circle of radius 1 around the origin:

In [6]:
def inside(p):
    x, y = p
    return x*x + y*y < 1
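
Before involving a cluster, the two functions can be checked locally. The following is a minimal sketch in plain Python (no Spark involved); the test points and the names known_inside, known_outside, sample and all_in_square are chosen here for illustration only.

```python
import random

def random_point(x):
    # The argument is ignored; it exists only so the function fits map().
    return (random.random(), random.random())

def inside(p):
    x, y = p
    return x*x + y*y < 1

# Points with a known answer.
known_inside = inside((0.5, 0.5))    # 0.25 + 0.25 = 0.5, which is < 1
known_outside = inside((0.9, 0.9))   # 0.81 + 0.81 = 1.62, which is >= 1

# Every generated point should lie in the unit square [0, 1) x [0, 1).
sample = [random_point(i) for i in range(1000)]
all_in_square = all(0.0 <= x < 1.0 and 0.0 <= y < 1.0 for x, y in sample)

print(known_inside, known_outside, all_in_square)  # True False True
```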

The following code creates a large range of numbers distributed over the cluster, maps each number to a random point and keeps only the points for which our inside function returns True. It then counts the remaining points. The fraction of points that pass the filter is approximately equal to $\pi/4$, so multiplying it by 4 gives our estimate of $\pi$. Finally, the code calls the stop method of the Spark context to terminate the program - this is necessary before we can create a new Spark context.

In [7]:
%%time
num_samples = 100000000
count = sc.range(num_samples).map(random_point).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()
3.14078208
CPU times: user 19 ms, sys: 8.37 ms, total: 27.4 ms
Wall time: 45.3 s

(Curious why this works? Read more on how to calculate $\pi$ via Monte Carlo approximation)
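
The reasoning behind the estimate can be written down in a few lines. This is a sketch under the assumption that the points are independent and uniformly distributed in the unit square; the symbols $N$ (number of samples), $p$ and $\hat{\pi}$ (the estimate) are notation introduced here, not part of the program.

```latex
\begin{align}
p = P\left(x^2 + y^2 < 1\right)
  &= \frac{\text{area of the quarter disc of radius } 1}{\text{area of the unit square}}
   = \frac{\pi/4}{1} = \frac{\pi}{4} \\
\frac{\text{count}}{N} \approx p
  \quad&\Longrightarrow\quad
  \hat{\pi} = 4 \cdot \frac{\text{count}}{N} \approx \pi \\
\sigma_{\hat{\pi}}
  &= 4 \sqrt{\frac{p\,(1-p)}{N}}
   = \sqrt{\frac{\pi\,(4-\pi)}{N}}
   \approx \frac{1.64}{\sqrt{N}}
\end{align}
```

The last line is the standard error of the estimate under the independence assumption: the error shrinks only with the square root of the number of samples, which is why this method is not an efficient way to compute $\pi$.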

SparkContext and the Architecture of a Spark Program

Let us take the program apart and look at each step in turn. Our first step was to create a SparkContext.

In [8]:
sc = pyspark.SparkContext(appName="Pi")

The SparkContext represents the connection to a Spark cluster and allows your Spark driver application to access the cluster. Each driver application commands a number of executors. These are processes that run on the nodes of the cluster and perform the actual computation and storage operations. They remain running as long as the driver application has a SparkContext. A program called the cluster manager sits between driver and executors and manages the resources given to the Spark program - for example, fairly assigning the number of nodes given to each program when multiple users are working on the cluster.

Architecture of a Spark program

Source: MSDN

In our example program, we use the SparkContext to create a dataset in parallel and distribute it across the executors. We start by creating a range of numbers up to the number of samples.

In [9]:
%%time
rdd = sc.range(num_samples)
CPU times: user 2.74 ms, sys: 2.36 ms, total: 5.1 ms
Wall time: 7.94 ms

With a call to range, we have created our first Resilient Distributed Dataset or RDD.

Resilient Distributed Datasets (RDDs)

The RDD is Spark's core data type, and it is in essence a distributed collection of data objects. It is resilient because it can cope with failure of some nodes of the cluster - and in a large distributed system, things like hardware failure become something that needs to be anticipated.

Transformations and Actions

Now to generate random points, we want to map each number from our range to a new random point:

In [10]:
%%time
rdd = rdd.map(random_point)
CPU times: user 6 µs, sys: 2 µs, total: 8 µs
Wall time: 13.1 µs

In the next step, we want to keep only the points that pass our filter criterion defined in the inside function above:

In [11]:
%%time
rdd = rdd.filter(inside)
CPU times: user 29 µs, sys: 2 µs, total: 31 µs
Wall time: 36.7 µs

Understanding Lazy Evaluation

Take a closer look at the running time measurements: The calls to the map and filter methods returned within microseconds, even though the RDD describes 100 million random points. This is because no actual computation on the data has happened yet. At this point, we need to understand the concept of lazy evaluation and learn about the difference between Spark transformations and actions.

The map and filter methods are transformations. A transformation is called on an RDD and returns another RDD. However, calling the method does not compute the new RDD. When we call a transformation, we only add an operation to an operator graph (to be precise, a directed acyclic graph of RDDs and operations). This graph is an abstract description of the computation to be performed - basically, a roadmap of how one RDD is turned into another. When working with big data, not computing immediately (lazy evaluation) has several advantages. Our computation might take very long, and we can first build a sequence of operations before triggering the computation and waiting for the result. Also (without going too much into the technical details of Spark), building the operator graph first allows Spark to analyze it and come up with an optimized plan for the actual computation (e.g. by combining operations that can be performed together, or by copying or moving around as little data as possible).
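
Lazy evaluation can be observed without Spark, because Python's built-in map and filter are lazy as well. The following sketch is only an analogy and not how Spark works internally: Python iterators can be consumed once and are not optimized, whereas Spark keeps a reusable graph. The names traced_square, is_even and calls are made up for this illustration.

```python
calls = []

def traced_square(x):
    calls.append(x)      # record every real invocation
    return x * x

def is_even(x):
    return x % 2 == 0

# Building the pipeline: nothing is computed yet.
pipeline = filter(is_even, map(traced_square, range(10)))
calls_before = len(calls)

# Consuming the pipeline plays the role of an action.
result = sum(1 for _ in pipeline)
calls_after = len(calls)

print(calls_before, calls_after, result)  # 0 10 5
```

The function is not called a single time while the pipeline is being built. Only when the elements are counted does the work happen, for all ten inputs.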

The last Spark operation in our program is a call to the count method of the RDD, which simply counts the number of elements in it. Watch the running time:

In [12]:
%%time
count = rdd.count()
CPU times: user 19.1 ms, sys: 7.35 ms, total: 26.5 ms
Wall time: 55.4 s

count is not a transformation but an action. When we invoke an action to retrieve a result, the actual computation is triggered. That is, the operator graph of transformations we have built before is turned into an execution plan, and the executors perform the computation on the nodes of the cluster. Then, a result is returned to the driver process.

In [13]:
pi = 4 * count / num_samples
print(pi)
sc.stop()
3.14161168
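
The division of labour between driver and executors can be imitated on a single machine. The sketch below is a toy model in plain Python, not Spark code: a thread pool stands in for the executors, each hypothetical partition is described by a (seed, size) pair, and the main program acts as the driver that sums the partial counts. All names (count_inside, toy_samples, toy_partitions, ...) are assumptions made for this example. In CPython, threads do not speed up CPU-bound work like this, so the sketch shows the structure of the computation rather than a performance gain.

```python
import random
from concurrent.futures import ThreadPoolExecutor

def count_inside(partition):
    """Work done by one stand-in executor on its share of the samples."""
    seed, n = partition
    rng = random.Random(seed)          # separate generator per partition
    hits = 0
    for _ in range(n):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            hits += 1
    return hits

toy_samples = 200_000
toy_partitions = 4
per_partition = toy_samples // toy_partitions
partitions = [(seed, per_partition) for seed in range(toy_partitions)]

# The "driver" hands out the partitions and collects one count per partition.
with ThreadPoolExecutor(max_workers=toy_partitions) as pool:
    partial_counts = list(pool.map(count_inside, partitions))

# Only the small partial results travel back; the points themselves do not.
toy_count = sum(partial_counts)
toy_pi = 4 * toy_count / toy_samples
print(partial_counts, toy_pi)
```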

Common Transformations and Actions

Here we list some commonly used transformations and actions. Spark provides many more.

Transformations on a single RDD

method                                         purpose
rdd.map(f)                                     apply f to each element and return an RDD of the results
rdd.flatMap(f)                                 apply f to each element and return an RDD of the contents of the returned iterators
rdd.filter(f)                                  return an RDD consisting of only the elements for which f returns True
rdd.distinct()                                 return an RDD with duplicates removed
rdd.sample(withReplacement, fraction, [seed])  sample a fraction of the RDD, with or without replacement
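
The difference between map and flatMap is easiest to see on a small example. The following sketch mimics the semantics of map, flatMap and distinct with plain Python lists; it does not use Spark, and the input lines and the helper split_words are invented for illustration.

```python
from itertools import chain

lines = ["to be or", "not to be"]

def split_words(line):
    return line.split()

# map: one output element per input element (here: one list per line).
mapped = list(map(split_words, lines))

# flatMap: the per-element results are concatenated into one flat collection.
flat_mapped = list(chain.from_iterable(map(split_words, lines)))

# distinct: duplicates removed (sorted here only to get a stable order).
distinct_words = sorted(set(flat_mapped))

print(mapped)          # [['to', 'be', 'or'], ['not', 'to', 'be']]
print(flat_mapped)     # ['to', 'be', 'or', 'not', 'to', 'be']
print(distinct_words)  # ['be', 'not', 'or', 'to']
```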

Transformations on two RDDs

method                    purpose
rdd.union(other)          produce an RDD containing the elements of both RDDs
rdd.intersection(other)   produce an RDD containing only the elements found in both RDDs
rdd.subtract(other)       produce an RDD with the elements of the other RDD removed from the first one

Actions on an RDD

method          purpose
rdd.collect()   return all elements to the driver
rdd.count()     return the number of elements
rdd.take(num)   return the first num elements
rdd.reduce(f)   combine the elements in parallel using the function f and return the result

Exercise: Transformations and Actions

In [14]:
# EXERCISE: try the transformations and actions described above on our RDD of random points
sc = pyspark.SparkContext(appName="Exercise")
# write your code here
In [15]:
sc.stop()

Exercise: MapReduce

You may have heard of MapReduce in the context of big data. The name MapReduce originally referred to a proprietary technology pioneered at Google, but today it refers more generally to a programming model for processing big data sets with a parallel, distributed algorithm on a cluster. As the name says, it relies on two main operations, map and reduce. And as you have seen, both are available in Spark: Spark's programming model includes MapReduce as part of its much larger set of transformations and actions.
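
The two steps can be tried out on a small list with Python's built-in map and functools.reduce. This sketch runs locally and only illustrates the pattern; the word list is invented. When the same pattern is used with rdd.reduce, the combining function should be commutative and associative, because Spark combines partial results from different partitions in no fixed order.

```python
from functools import reduce
from operator import add

words = ["spark", "map", "reduce", "rdd"]

# Map step: transform every element independently of the others.
lengths = list(map(len, words))

# Reduce step: combine the mapped values pairwise into a single result.
total_length = reduce(add, lengths)

print(lengths, total_length)  # [5, 3, 6, 3] 17
```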

As an exercise, try implementing our $\pi$ approximation algorithm with map and reduce instead of filter and count.

In [16]:
# EXERCISE: approximation of pi using map and reduce

Caching

Intermediate RDDs in the operator graph of our program are computed on demand when asking for a result via an action. In our example program, the filtered version of the RDD (containing only the points for which the inside function has returned True) is recomputed every time we call count:

In [17]:
sc = pyspark.SparkContext(appName="PiCached")
In [18]:
%%time
count = sc.range(num_samples).map(random_point).filter(inside).count()
CPU times: user 17.8 ms, sys: 7.32 ms, total: 25.1 ms
Wall time: 51.8 s
In [19]:
%%time
count = sc.range(num_samples).map(random_point).filter(inside).count()
CPU times: user 20.4 ms, sys: 6.39 ms, total: 26.8 ms
Wall time: 49 s

Recomputing on demand is often efficient when working with big data, since intermediate results do not have to be kept in memory. However, in some cases (think of an iterative algorithm that needs to go over the intermediate RDD many times), it is more efficient to store an intermediate RDD rather than recomputing it. This can be done via Spark's functionality for caching. Here, we cache the RDD containing the points, then filter and count twice:

In [20]:
cached_rdd = sc.range(num_samples).map(random_point).cache()
In [21]:
%%time
count = cached_rdd.filter(inside).count()
CPU times: user 12.2 ms, sys: 5.14 ms, total: 17.3 ms
Wall time: 1min 11s
In [22]:
%%time
count = cached_rdd.filter(inside).count()
CPU times: user 13.7 ms, sys: 7.96 ms, total: 21.6 ms
Wall time: 1min 21s
In [23]:
pi = 4 * count / num_samples
print(pi)
sc.stop()
3.1418192
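
The trade-off between recomputing and storing can be made visible with a small local experiment. The sketch below is an analogy in plain Python, not Spark code: a lazy map stands in for an uncached RDD, a materialized list stands in for a cached one, and a counter records how often the point generator is really called. The names call_log and counted_random_point are hypothetical helpers for this illustration.

```python
import random

call_log = {"n": 0}

def counted_random_point(x):
    call_log["n"] += 1                 # count every real invocation
    return (random.random(), random.random())

def inside(p):
    x, y = p
    return x * x + y * y < 1

n = 10_000

# Without caching: each pass generates all points again.
call_log["n"] = 0
first = sum(1 for p in map(counted_random_point, range(n)) if inside(p))
second = sum(1 for p in map(counted_random_point, range(n)) if inside(p))
calls_recomputed = call_log["n"]

# With "caching": generate the points once, then reuse them.
call_log["n"] = 0
points = list(map(counted_random_point, range(n)))
third = sum(1 for p in points if inside(p))
fourth = sum(1 for p in points if inside(p))
calls_cached = call_log["n"]

print(calls_recomputed, calls_cached, third == fourth)  # 20000 10000 True
```

Storing the points halves the number of generator calls, at the price of keeping all points in memory. It also means that both passes see the same points, whereas two recomputed passes draw different random points and may give different counts. Whether caching pays off in Spark depends on how expensive the recomputation is compared to storing the data and on whether the cached data fits into the memory of the executors.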

This notebook is licensed under a Creative Commons Attribution 4.0 International License (CC BY 4.0). Copyright © 2018 Point 8 GmbH