Introduction to big-data using PySpark

Introduction to (Py)Spark

Overview

Teaching: 60 min
Exercises: 15 min
Questions
  • What are Apache Spark and PySpark?

  • How to use PySpark?

  • How to define a Spark context?

  • How can I create an RDD (Resilient Distributed Dataset)?

Objectives
  • Learn about the terminology behind Apache Spark

  • Learn about RDDs

  • Learn about Spark context

Introduction to Apache Spark / PySpark (Slides)

Spark is a general-purpose cluster computing framework: it distributes both data and computation across a cluster of machines.

Data Distribution

To distribute data, Spark uses a framework called Resilient Distributed Datasets (RDDs).

RDD

For instance, if you read a file with Spark, it will automatically create an RDD.

But an RDD is immutable, so to modify it, a new RDD needs to be created.

That is very helpful for reproducibility!
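
As a small illustration (assuming a Spark context called sc has already been defined, as shown later in this episode), a transformation does not change the original RDD; it returns a new one:

# create an RDD from a Python list (sc is an existing SparkContext)
rdd = sc.parallelize([1, 2, 3, 4])

# map() returns a *new* RDD; the original rdd is left untouched
rdd_squared = rdd.map(lambda x: x ** 2)

print(rdd.collect())          # [1, 2, 3, 4]
print(rdd_squared.collect())  # [1, 4, 9, 16]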

Computation Distribution

To distribute computation, the Spark API (Application Programming Interface), which is available in multiple programming languages (Scala, Java, Python and R), provides several operators such as map and reduce.

The figure below shows the Spark Software Layers.

[Figure: Spark software layers]

Spark Core contains the basic functionality of Spark; in particular the APIs that define RDDs and the operations and actions that can be performed on them (map, filter, reduce, etc.).

The rest of Spark's libraries (Spark SQL, Spark Streaming, MLlib and GraphX) are built on top of RDDs and Spark Core.

The Python Spark API for these different software layers can be found here.

 

Spark Initialization: Spark Context

Spark applications are run as independent sets of processes, coordinated by a Spark Context in a driver program.

[Figure: Spark application architecture — the SparkContext in the driver program coordinating the cluster]

It may be created automatically (for instance, if you call pyspark from the shell, the Spark context is then called sc).

But we haven’t set it up automatically in the Galaxy eduPortal, so you need to define it:

from pyspark import SparkContext
sc = SparkContext('local', 'pyspark tutorial') 

 

 

Because we do not specify a number of tasks for local, only one will be used. To use a maximum of 2 tasks in parallel:

from pyspark import SparkContext
sc = SparkContext('local[2]', 'pyspark tutorial') 

If you wish to use all the available resources, you can simply use '*':

from pyspark import SparkContext
sc = SparkContext('local[*]', 'pyspark tutorial') 

Please note that within one session, you cannot define several Spark contexts! So if you have tried the three previous SparkContext examples, don't be surprised to get an error!
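
If you do need to re-create a Spark context within the same session, one possibility (a small sketch, not needed for the rest of this tutorial) is to stop the existing context first, or to use SparkContext.getOrCreate(), which reuses an existing context instead of raising an error:

from pyspark import SparkContext

# stop the current context before creating a new one
sc.stop()
sc = SparkContext('local[2]', 'pyspark tutorial')

# alternatively, reuse an existing context if there is one
sc = SparkContext.getOrCreate()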

 

Deployment of Spark

Spark can be deployed locally or on a cluster (for instance a standalone Spark cluster, Hadoop YARN, Mesos or Kubernetes).

The development workflow is that you start small (local) and scale up to one of the other solutions, depending on your needs and resources.

At UiO, we have the Abel cluster where Spark is available.

Often, you don’t need to change any code to go between these methods of deployment!
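
In practice, often only the master URL passed to the SparkContext changes between deployments. The sketch below is illustrative only; the cluster host name and port are placeholders, not real services:

from pyspark import SparkContext

# local development: all tasks run on your own machine
sc = SparkContext('local[*]', 'pyspark tutorial')

# standalone Spark cluster: point to the (hypothetical) master host and port
# sc = SparkContext('spark://master-host:7077', 'pyspark tutorial')

# Hadoop YARN cluster (e.g. on an HPC system): the master is simply 'yarn'
# sc = SparkContext('yarn', 'pyspark tutorial')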

map/reduce

Let's start from our previous map example where the goal was to convert temperatures from Celsius to Kelvin.

Here is how it translates into PySpark.

temp_c = [10, 3, -5, 25, 1, 9, 29, -10, 5]
rdd_temp_c = sc.parallelize(temp_c)
rdd_temp_K = rdd_temp_c.map(lambda x: x + 273.15).collect()
print(rdd_temp_K)   

You will recognize the map function (please note it is not the pure Python map function but the PySpark map function). It acts here as the transformation, while collect is the action: it pulls all elements of the RDD to the driver.

Remark:

It is often a very bad idea to pull all the elements of an RDD to the driver because we potentially handle very large amounts of data. Instead, we prefer to use take, as it lets you specify how many elements you wish to pull from the RDD.

For instance to pull the first 3 elements only:

temp_c = [10, 3, -5, 25, 1, 9, 29, -10, 5]
rdd_temp_c = sc.parallelize(temp_c)
rdd_temp_K = rdd_temp_c.map(lambda x: x + 273.15).take(3)
print(rdd_temp_K)   

Challenge

def mod(x):
    # numpy is imported inside the function so the import also happens on the Spark workers
    import numpy as np
    return (x, np.mod(x, 2))

Nmax = 1000
rdd = sc.parallelize(range(Nmax)).map(mod).take(5)
print(rdd)

Try the example above with different values for Nmax. Does the execution time change if you take a very large value for Nmax?

Why?

Solution to Challenge 1

Transformations are lazy: they are only executed when an action is called. Here we only take 5 values (take(5)), so whatever the value of Nmax, Spark executes roughly the same number of operations.

def mod(x):
    import numpy as np
    return (x, np.mod(x, 2))

Nmax = 10000000000
rdd = sc.parallelize(range(Nmax)).map(mod).take(5)
print(rdd)

Now let’s take another example where we use map as the transformation and reduce for the action.

# we define a list of integers
numbers = [1, 4, 6, 2, 9, 10]

rdd_numbers = sc.parallelize(numbers)

# Use reduce to combine numbers
rdd_reduce = rdd_numbers.reduce(lambda x, y: "(" + str(x) + ", " + str(y) + ")")
print(rdd_reduce)
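
The lambda above simply builds a string that shows how reduce combines the elements pairwise. In practice, reduce is usually called with an associative and commutative function, for instance to sum the numbers of the same RDD:

# sum all the numbers of the RDD with a single reduce action
total = rdd_numbers.reduce(lambda x, y: x + y)
print(total)   # 32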

 

Create an RDD from a file

Most of the time, we need to process data we have stored as "standard" files. Here we learn how to create an RDD from a file. Import a file from the Data Library into a new history (call it, for instance, "Gutenberg"):

from pyspark import SparkContext
sc = SparkContext('local[2]', 'pyspark tutorial') 

lines_rdd = sc.textFile(get(1))

The method textFile loads the file passed as an argument and returns an RDD. Please note that you may add a second argument to specify the minimum number of partitions for your RDD. If not specified, you let Spark decide.
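
For instance (a small sketch reusing the get(1) helper from the Galaxy eduPortal examples above; the exact number of partitions may differ on your system), you can check how an RDD was split with getNumPartitions():

# ask for at least 4 partitions when reading the file
lines_rdd = sc.textFile(get(1), 4)

# check how many partitions Spark actually created
print(lines_rdd.getNumPartitions())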

In the following example, we load a text file as an RDD and count how many times each word appears.

from pyspark import SparkContext
import string

sc = SparkContext('local[2]', 'pyspark tutorial') 

def noPunctuations(text):
    """Removes punctuation and converts to lower case
    Args:
        text (str): A string.
    Returns:
        str: The cleaned up string.
    """
    return text.translate(str.maketrans("", "", string.punctuation)).lower()

lines_rdd = sc.textFile(get(1), 1)
counts = lines_rdd.map(noPunctuations) \
                  .flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda x, y: x + y)

for (word, count) in counts.collect():
    print(word, count)

 

map vs. flatMap and reduce vs. reduceByKey

In the previous example, we used flatMap and reduceByKey as transformations (note that reduceByKey is a transformation, not an action; the action here is the final collect that pulls the word counts back to the driver).

map returns a new RDD by applying a function to each element of the RDD; the function can return only one item per input element.

flatMap also returns a new RDD by applying a function to each element of the RDD, but the output is flattened; the function can return a list of zero or more elements.

sc.parallelize([3, 4, 5]).map(lambda x: list(range(1, x))).collect()
[[1, 2], [1, 2, 3], [1, 2, 3, 4]]
sc.parallelize([3, 4, 5]).flatMap(lambda x: range(1, x)).collect()
[1, 2, 1, 2, 3, 1, 2, 3, 4] 

reduceByKey is very often used as it combines values with the same key. In our example, we wanted to count the number of occurrences of each word. A simple reduce would not differentiate between words and would only count the total number of words.
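
To make the difference concrete, here is a minimal sketch contrasting the two on a small list of (word, count) pairs:

pairs = sc.parallelize([('spark', 1), ('python', 1), ('spark', 1)])

# reduceByKey combines the values of each key separately (order may vary)
print(pairs.reduceByKey(lambda x, y: x + y).collect())
# [('spark', 2), ('python', 1)]

# a plain reduce on the counts only gives the total number of words
print(pairs.map(lambda kv: kv[1]).reduce(lambda x, y: x + y))
# 3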

       

Key Points