In the following, we use the third-party package `findspark` to locate the Spark installation on our system:

```python
import findspark
findspark.init()
```
Afterwards, we can import the `pyspark` module:

```python
import pyspark
```
Our first Spark program approximates the number $\pi$. It is not the most efficient way to do so, but the program uses several fundamental features of Spark.
First, we create a `SparkContext` to communicate with the Spark cluster, and give our program a name:

```python
sc = pyspark.SparkContext(appName="Pi")
```
Now to the implementation of our algorithm. The first function we need generates a random point in the unit square $[0, 1) \times [0, 1)$. (If you are wondering about the argument `x`: the function ignores it, but it must accept exactly one argument because we will later pass it to the `map` operation, which calls it once per element.)

```python
import random

def random_point(x):
    # x is ignored; it only exists so that the function can be used with map
    return (random.random(), random.random())
```
We need one more function. Given a point $p = (x, y)$, it checks whether $x^2 + y^2 < 1$, i.e. whether the point lies inside the quarter circle of radius 1 centered at the origin:

```python
def inside(p):
    x, y = p
    return x*x + y*y < 1
```
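Before involving Spark at all, the two functions can be tried out locally. The following is a minimal sketch that repeats their definitions (so the snippet runs on its own) and checks `inside` on a few hand-picked points whose squared distances are easy to verify by hand:

```python
import random

def random_point(x):
    # x is ignored; it only exists so the function can be passed to map
    return (random.random(), random.random())

def inside(p):
    x, y = p
    return x*x + y*y < 1

# hand-picked points: 0.25 + 0.25 = 0.5 < 1, and 0.64 + 0.49 = 1.13 >= 1
print(inside((0.5, 0.5)))  # True
print(inside((0.8, 0.7)))  # False

# every generated point lies in the unit square [0, 1) x [0, 1)
p = random_point(0)
print(0.0 <= p[0] < 1.0 and 0.0 <= p[1] < 1.0)  # True
```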
The following code creates a distributed range of `num_samples` numbers, maps each number to a random point, and calls `inside` once for each point. It then counts the points for which `inside` returned `True`. This count divided by `num_samples` is approximately $\pi/4$, so multiplying the fraction by 4 yields an approximation of $\pi$. Finally, the code calls the `stop` method of the Spark context to terminate the program. This is necessary before we can create a new Spark context, because only one `SparkContext` can be active at a time.
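```python
%%time
num_samples = 100000000  # 10^8 random points
# distributed range -> random points -> keep points inside the quarter circle -> count them
count = sc.range(num_samples).map(random_point).filter(inside).count()
pi = 4 * count / num_samples  # the fraction of points inside approximates pi/4
print(pi)
sc.stop()  # only one SparkContext can be active at a time
```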
(Curious why this works? Read more on how to calculate $\pi$ via Monte Carlo approximation)
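In short: a point drawn uniformly from the unit square falls inside the quarter circle $x^2 + y^2 < 1$ with probability equal to the quarter circle's area, $\pi/4$ (the square has area 1). The fraction `count / num_samples` therefore estimates $\pi/4$, and the standard error of $4 \cdot$ `count / num_samples` is $4\sqrt{p(1-p)/n} \approx 1.64/\sqrt{n}$ for $p = \pi/4$ and $n$ samples. As a point of comparison, here is a minimal single-machine sketch of the same estimator without Spark; the function name `estimate_pi` and the optional `seed` parameter are our own choices for illustration:

```python
import random

def estimate_pi(num_samples, seed=None):
    """Monte Carlo estimate of pi on a single machine (no Spark involved)."""
    rng = random.Random(seed)  # private generator, so a seed makes runs repeatable
    count = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x*x + y*y < 1:  # same test as inside()
            count += 1
    # count / num_samples approximates pi/4
    return 4 * count / num_samples

print(estimate_pi(100_000, seed=1))
```

By the formula above, the standard error is roughly 0.005 for $n = 100{,}000$ and roughly 0.0002 for the $10^8$ samples of the Spark version, which is why the larger sample size is worth distributing over a cluster.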
Let us take the program apart and look at each step. Our first step was to create a `SparkContext`:

```python
sc = pyspark.SparkContext(appName="Pi")
```
The `SparkContext` represents the connection to a Spark cluster and allows your Spark driver application to access the cluster. Each driver application commands a number of executors. These are processes that run on the nodes of the cluster and perform the actual computation and storage operations. They keep running as long as the driver application holds an active `SparkContext`. Between the driver and the executors sits the cluster manager, a program that manages the resources given to each Spark application, for example by fairly distributing the cluster's nodes among applications when multiple users are working on the cluster.
Source: MSDN
In our example program, we use the `SparkContext` to create a dataset of points in parallel and distribute it to the executors. We start by creating a range of numbers from 0 to `num_samples - 1`:

```python
%%time
rdd = sc.range(num_samples)
```
With the call to `range`, we have created our first Resilient Distributed Dataset, or RDD.

The RDD is Spark's core data type: in essence, a distributed collection of data objects, split into partitions that are spread across the executors. It is resilient because it can cope with the failure of some nodes of the cluster: Spark remembers the sequence of operations (the lineage) that produced each partition, so a lost partition can be recomputed. In a large distributed system, hardware failure is something that needs to be anticipated.
Now, to generate random points, we map each number from our range to a new random point:

```python
%%time
rdd = rdd.map(random_point)
```
In the next step, we keep only the points that pass the filter criterion defined in the `inside` function above:

```python
%%time
rdd = rdd.filter(inside)
```
Take a closer look at the running time measurements: the calls to `map` and `filter` finished in a fraction of a second, even though the RDD is supposed to hold 100 million random points. This is because no actual computation on the data has happened yet. To understand why, we need to look at the concept of lazy evaluation and at the difference between Spark transformations and actions.
The `filter` method is a transformation, and so is `map`: it is called on an RDD and returns another RDD. Calling a transformation does not compute the new RDD. Instead, it only adds an operation to an operator graph (to be precise, a directed acyclic graph of RDDs and operations). This graph is an abstract description of the computation to be performed: basically, a roadmap of how one RDD is turned into another. When working with big data, not computing immediately (lazy evaluation) has several advantages. Our computation might take very long, so we can first build up a sequence of operations before triggering the computation and waiting for the result. Also (without going too much into the technical details of Spark), building the operator graph first allows Spark to analyze it and come up with an optimized plan for the actual computation, e.g. by combining operations that can be performed together, or by copying and moving around as little data as possible.
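Python 3's built-in `map` and `filter` are lazy as well, which makes them a handy single-machine analogy. It is only an analogy: they build a chain of iterators, not an operator graph that anyone optimizes. In this sketch, a counter we added to `random_point` shows that no point is generated until something consumes the result:

```python
import random

calls = 0  # how many points have actually been generated

def random_point(x):
    global calls
    calls += 1
    return (random.random(), random.random())

def inside(p):
    x, y = p
    return x*x + y*y < 1

# "transformations": these only set up lazy iterators
points = map(random_point, range(1000))
kept = filter(inside, points)
calls_before = calls  # still 0, nothing has run yet

# "action": consuming the iterator triggers the actual work
count = sum(1 for _ in kept)
print(calls_before, calls)  # 0 1000
```

One difference to keep in mind: a Python iterator can be consumed only once, whereas an RDD can be used by any number of actions, each of which re-runs the recorded operations.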
The last Spark operation in our program is a call to the `count` method of the RDD, which simply counts the number of elements in it. Watch the running time:

```python
%%time
count = rdd.count()
```
`count` is not a transformation but an action. When we invoke an action to retrieve a result, the actual computation is triggered: the operator graph of transformations we have built before is turned into an execution plan, the executors perform the computation on the nodes of the cluster, and the result is returned to the driver process.
```python
pi = 4 * count / num_samples
print(pi)
sc.stop()
```
Here we list some commonly used transformations and actions. Spark provides many more.
Transformations on a single RDD
method | purpose |
---|---|
`rdd.map(f)` | apply `f` to each element in the RDD and return an RDD of the results |
`rdd.flatMap(f)` | apply `f` to each element in the RDD and return an RDD of the contents of the iterables returned |
`rdd.filter(f)` | return an RDD containing only the elements for which `f` returns `True` |
`rdd.distinct()` | return an RDD with duplicates removed |
`rdd.sample(withReplacement, fraction, [seed])` | sample an RDD, with or without replacement |
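The difference between `map` and `flatMap` is easiest to see on a small example. The sketch below mimics their semantics with plain Python lists rather than calling Spark, so it runs without a cluster; the sample sentences are made up:

```python
lines = ["to be or", "not to be"]

# like rdd.map(f): exactly one output element (here: a list) per input element
mapped = [line.split() for line in lines]

# like rdd.flatMap(f): the iterables returned by f are flattened into one sequence
flat_mapped = [word for line in lines for word in line.split()]

# like rdd.distinct(): duplicates removed (Spark guarantees no particular order)
distinct_words = sorted(set(flat_mapped))

print(mapped)          # [['to', 'be', 'or'], ['not', 'to', 'be']]
print(flat_mapped)     # ['to', 'be', 'or', 'not', 'to', 'be']
print(distinct_words)  # ['be', 'not', 'or', 'to']
```

On an RDD of lines, the corresponding calls would be `rdd.map(lambda line: line.split())` and `rdd.flatMap(lambda line: line.split())`.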
Transformations on two RDDs
method | purpose |
---|---|
`rdd.union(other)` | produce an RDD containing the elements of both RDDs (duplicates are kept) |
`rdd.intersection(other)` | produce an RDD containing the elements present in both RDDs, without duplicates |
`rdd.subtract(other)` | produce an RDD containing the elements of `rdd` that are not in `other` |
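A plain-Python sketch of the three set-like operations, again mimicking the semantics on small lists rather than calling Spark (element order in a real RDD is not guaranteed). The point worth noticing is that `union` keeps duplicates, unlike a mathematical set union:

```python
a = [1, 2, 2, 3]
b = [2, 3, 4]

# like a.union(b): elements of both, duplicates are kept
union = a + b
# like a.intersection(b): elements present in both, without duplicates
intersection = sorted(set(a) & set(b))
# like a.subtract(b): elements of a that do not occur in b
b_set = set(b)
subtracted = [x for x in a if x not in b_set]

print(union)         # [1, 2, 2, 3, 2, 3, 4]
print(intersection)  # [2, 3]
print(subtracted)    # [1]
```

If you need a duplicate-free union of two RDDs, you can chain `distinct()` after `union`, e.g. `rdd.union(other).distinct()`.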
Actions on an RDD
method | purpose |
---|---|
`rdd.collect()` | return all elements to the driver as a list |
`rdd.count()` | return the number of elements |
`rdd.take(num)` | return the first `num` elements |
`rdd.reduce(f)` | combine the elements in parallel using the function `f` (which should be commutative and associative) and return the result |
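Why should the function passed to `reduce` be commutative and associative? Spark reduces the partitions separately and then combines the partial results, so the grouping and order of the operations are not fixed. The sketch below simulates this two-step scheme with `functools.reduce` on a list split into three hypothetical partitions; the helper `partitioned_reduce` is our own illustration, not a Spark function:

```python
from functools import reduce
import operator

data = list(range(1, 11))
# hypothetical split into three partitions, as executors might hold them
partitions = [data[0:4], data[4:7], data[7:10]]

def partitioned_reduce(f, parts):
    # step 1: each partition is reduced on its own
    partials = [reduce(f, part) for part in parts]
    # step 2: the partial results are combined into a single value
    return reduce(f, partials)

# addition is associative and commutative: the split does not matter
print(reduce(operator.add, data), partitioned_reduce(operator.add, partitions))  # 55 55

# subtraction is not: the result depends on how the data is partitioned
print(reduce(operator.sub, data), partitioned_reduce(operator.sub, partitions))  # -53 11
```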
Exercise: Transformations and Actions
```python
# EXERCISE: try the transformations and actions described above on our RDD of random points
sc = pyspark.SparkContext(appName="Exercise")
# write your code here
sc.stop()
```
You may have heard of MapReduce in the context of big data. The name MapReduce originally referred to a proprietary technology at Google, where it was pioneered, but today it refers more generally to a programming model for processing big data sets with a parallel, distributed algorithm on a cluster. As the name says, it relies on two main operations, `map` and `reduce`. And as you have seen, both are available in Spark: Spark's programming model includes MapReduce as part of its much larger set of transformations and actions.
As an exercise, try implementing our $\pi$ approximation algorithm with `map` and `reduce` instead of `filter` and `count`.

```python
# EXERCISE: approximation of pi using map and reduce
```
Intermediate RDDs in the operator graph of our program are computed on demand when we ask for a result via an action. In our example program, the filtered RDD (containing only the points for which the `inside` function returned `True`) is recomputed every time we call `count`:
```python
sc = pyspark.SparkContext(appName="PiCached")
```

```python
%%time
count = sc.range(num_samples).map(random_point).filter(inside).count()
```

```python
%%time
count = sc.range(num_samples).map(random_point).filter(inside).count()
```
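To see why the pipeline is recomputed, here is a deliberately tiny toy model of an RDD in plain Python. The class `LazyDataset` is hypothetical and far simpler than Spark's internals: transformations merely record an operation, and every call of the action `count` replays the recorded plan from the source. A counter in `random_point` makes the repeated work visible:

```python
import random

calls = 0  # number of random points generated so far

def random_point(x):
    global calls
    calls += 1
    return (random.random(), random.random())

def inside(p):
    x, y = p
    return x*x + y*y < 1

class LazyDataset:
    """Toy stand-in for an RDD (hypothetical, not Spark's API)."""

    def __init__(self, source, ops=()):
        self._source = source   # callable returning the input elements
        self._ops = tuple(ops)  # recorded ("map" | "filter", function) pairs

    def map(self, f):
        # transformation: return a new dataset with one more recorded step
        return LazyDataset(self._source, self._ops + (("map", f),))

    def filter(self, f):
        return LazyDataset(self._source, self._ops + (("filter", f),))

    def count(self):
        # action: replay the whole recorded plan, starting from the source
        n = 0
        for item in self._source():
            keep = True
            for kind, f in self._ops:
                if kind == "map":
                    item = f(item)
                elif not f(item):
                    keep = False
                    break
            if keep:
                n += 1
        return n

ds = LazyDataset(lambda: range(1000)).map(random_point).filter(inside)
calls_after_build = calls  # 0: building the plan computes nothing
first = ds.count()
second = ds.count()
print(calls_after_build, calls)  # 0 2000, i.e. every point was generated twice
```

Because the toy regenerates its random points on every action, `first` and `second` will usually differ slightly. The uncached Spark pipeline above likewise draws new random points each time it is recomputed.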
Recomputing is often efficient when working with big data. However, in some cases (think of an iterative algorithm that needs to go over the intermediate RDD many times), it is more efficient to store an intermediate RDD rather than recompute it. This can be done via Spark's caching functionality. Here, we cache the RDD containing the points, then filter and count twice. Note that `cache` is itself lazy: the RDD is only stored when it is first computed by an action, so the first `count` still does the full work, while the second one can reuse the cached points.
```python
cached_rdd = sc.range(num_samples).map(random_point).cache()
```

```python
%%time
count = cached_rdd.filter(inside).count()
```

```python
%%time
count = cached_rdd.filter(inside).count()
```

```python
pi = 4 * count / num_samples
print(pi)
sc.stop()
```
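Caching has a second, less obvious effect in this program: without it, every action recomputes the random points, so repeated counts can differ; with it, both counts see the same points. The plain-Python sketch below imitates this, with one caveat: `list(...)` materializes the points immediately, whereas Spark's `cache` only stores them during the first action. The helper `point_pipeline` is hypothetical:

```python
import random

def random_point(x):
    return (random.random(), random.random())

def inside(p):
    x, y = p
    return x*x + y*y < 1

def point_pipeline(n):
    # rebuilt from scratch on each call, like recomputing an uncached RDD
    return map(random_point, range(n))

n = 10_000

# "uncached": each count regenerates the points
uncached_1 = sum(1 for p in point_pipeline(n) if inside(p))
uncached_2 = sum(1 for p in point_pipeline(n) if inside(p))

# "cached": the points are stored once and reused by both counts
cached_points = list(point_pipeline(n))
cached_1 = sum(1 for p in cached_points if inside(p))
cached_2 = sum(1 for p in cached_points if inside(p))

print(uncached_1, uncached_2)  # generally two different values
print(cached_1 == cached_2)    # True
```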
This notebook is licensed under a Creative Commons Attribution 4.0 International License (CC BY 4.0). Copyright © 2018 Point 8 GmbH