In [1]:
import numpy as np

In [2]:
import os
import sys

# Spark reads these variables to choose the Python executable used by the
# driver and by the workers; point both at the interpreter running this notebook.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [3]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("Spark RDD Course")
# getOrCreate returns the already-active SparkContext if there is one (in which
# case `conf` is ignored) instead of raising an error when this cell is re-run:
# only one SparkContext may be active at a time.
sc = SparkContext.getOrCreate(conf=conf)

23/04/11 22:15:11 WARN Utils: Your hostname, MacBook-Pro-de-Stephane-2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.15 instead (on interface en0)
23/04/11 22:15:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/11 22:15:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/11 22:15:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/04/11 22:15:12 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


# Introduction to `Spark` RDD

In [4]:
rdd = sc.parallelize(range(64))

Note that `parallelize` takes an optional second argument, `numSlices`, to choose the number of partitions. When it is omitted, Spark uses `sc.defaultParallelism` partitions.

In [5]:
rdd.getNumPartitions()

8

In [6]:
rdd = sc.parallelize(range(1000), 10)
rdd.getNumPartitions()

10

## Transformations

### `map`

In [None]:
rdd = sc.parallelize([2, 3, 4])
rdd = rdd.map(lambda x: list(range(1, x)))

In [None]:
rdd

In [None]:
(
    sc.parallelize([2, 3, 4])
      .map(lambda x: list(range(1, x)))
)

`map` is a *transformation*. It is *lazily* evaluated: its execution is delayed until an *action* is met in the DAG.

In [None]:
rdd.collect()  # collect is an action 

In [None]:
(
    sc.parallelize([2, 3, 4])
      .map(lambda x: list(range(1, x)))
      .collect()
)

### Exercise: `map` with a method

**Warning.** This example illustrates a bad practice! Don't do this at home.

In [None]:
class TelephoneDB:
    """Tiny in-memory phone book, used to demonstrate mapping with a bound method.

    Looking up a name that is not in ``tel`` raises ``KeyError``.
    """

    def __init__(self):
        # name -> phone number
        self.tel = dict(arthur=1234, riad=4567, anatole=3615)

    def add_tel(self, name):
        """Return the pair ``(name, number)`` for *name*."""
        number = self.tel[name]
        return (name, number)

In [None]:
tel_db = TelephoneDB()
names = ['arthur', 'riad']

In [None]:
# collect() is an action: the result is a plain Python list of (name, number)
# pairs, not an RDD.
pairs = sc.parallelize(names).map(tel_db.add_tel).collect()
pairs

- Replace the `tel` dictionary with a `defaultdict` whose default number is `999`.
- Use it on an RDD containing names as above, including an unknown one, and try it.

In [None]:
from collections import defaultdict

class TelephoneDefaultDB:
    """Phone book in which unknown names resolve to the placeholder number 999.

    ``tel`` is a ``defaultdict``: looking up a missing name returns 999 and, as a
    side effect of ``defaultdict.__getitem__``, inserts that name into ``tel``.
    """

    def __init__(self):
        known_numbers = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}
        self.tel = defaultdict(lambda: 999, known_numbers)

    def add_tel(self, name):
        """Return ``(name, number)``; names missing from the book get 999."""
        return (name, self.tel[name])

    def add_tel_rdd(self, rdd):
        """Return ``rdd.map(self.add_tel)``: each name becomes ``(name, number)``.

        Because the bound method ``self.add_tel`` is passed to ``map``, the whole
        instance (including ``tel``) has to be serialized for Spark.
        """
        lookup = self.add_tel
        return rdd.map(lookup)

In [None]:
tel_db = TelephoneDefaultDB()
names = ['riad', 'anatole', 'yiyang']
# add_tel_rdd wraps rdd.map(self.add_tel); collect() then returns a plain
# Python list of (name, number) pairs ('yiyang' is unknown and gets 999).
pairs = tel_db.add_tel_rdd(sc.parallelize(names)).collect()
pairs

**Warning**. Once again, it is a bad idea to pass *bound methods* (methods of an instance, such as `tel_db.add_tel`) to Spark's `map`.
Since `add_tel` needs `self`, the whole object is serialized so that Spark can use it.
This is costly if `tel` is large, and it breaks if the object is not serializable.

### `flatMap`

In [None]:
rdd = sc.parallelize([2, 3, 4, 5])
rdd.flatMap(lambda x: range(1, x)).collect()

### `filter`

In [None]:
rdd = sc.parallelize(range(10))
rdd.filter(lambda x: x % 2 == 0).collect()

### `distinct`

In [None]:
rdd = sc.parallelize([1, 1, 4, 2, 1, 3, 3])
rdd.distinct().collect()

### "Pseudo-set" operations

In [None]:
rdd1 = sc.parallelize(range(5))
rdd2 = sc.parallelize(range(3, 9))
rdd3 = rdd1.union(rdd2)
rdd3.collect()

In [None]:
rdd3.distinct().collect()

In [None]:
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(["a", "b"])
rdd1.cartesian(rdd2).collect()

## Actions

Well, `collect` is obviously an action...

### `count`, `countByValue`

In [None]:
rdd = sc.parallelize([1, 3, 1, 2, 2, 2])
rdd.count()

In [None]:
rdd.countByValue()

In [None]:
# n random integers drawn uniformly from {0, ..., n - 1}
n = 100000
u = np.random.default_rng().integers(0, n, size=n)

# countByValue is an action: it returns to the driver a dict {value: count}
p = (
    sc.parallelize(u)
    .countByValue()
)

# (value, count) pairs sorted by decreasing count: q[0] holds the most frequent value
q = sorted(p.items(), key=lambda kv: kv[1], reverse=True)

q[0]

# Uncomment to compare the largest count with 1 + log(n) / log(log(n))
# and to get the number of distinct values:
# q[0], 1 + np.log(len(u)) / np.log(np.log(len(u))), len(q)

- How many distinct values do you expect in `u`?
- How large is the largest count in `q` (that is, `q[0][1]`)?

### `take`, `takeOrdered`

In [None]:
rdd = sc.parallelize([(3, 'a'), (1, 'b'), (2, 'd')])

In [None]:
(1, 'b') <=  (2, 'd') <= (3, 'a')

In [None]:
rdd.takeOrdered(2)

In [None]:
rdd.takeOrdered(2, key=lambda x: x[1])

### `reduce`, `fold`

In [None]:
rdd = sc.range(1, 4)
rdd.reduce(lambda a, b: a + b)

In [None]:
rdd = sc.range(1, 4, numSlices=7)
rdd.reduce(lambda a, b: a + b)

In [None]:
rdd = sc.parallelize(range(1,4), 3)
rdd.reduce(lambda a, b: a + b)

In [None]:
( 
    sc.parallelize(range(1, 4), 2)
      .fold(0, lambda a, b: a + b)
)

In [None]:
( 
    sc.parallelize(range(1, 4), 1)
      .fold(3, lambda a, b: a + b)
),( 
    sc.parallelize(range(1, 4), 2)
      .fold(2, lambda a, b: a + b)
)

In [None]:
rdd =  sc.parallelize(range(1, 4),3)
rdd.fold(1, lambda a, b: a + b), rdd.getNumPartitions()

In [None]:
rdd =  sc.parallelize(range(1, 4),4)
rdd.fold(1, lambda a, b: a + b), rdd.getNumPartitions()

In [None]:
rdd = sc.parallelize([1, 2, 4], 2)
rdd.fold(2, lambda a, b: a + b)

In [None]:
rdd = sc.parallelize([1, 2, 4], 3)
rdd.fold(2, lambda a, b: a + b)

In [None]:
rdd.getNumPartitions()

### `aggregate`

In [None]:
def seqOp(x, y):
    """Fold element *y* into the running ``(sum, count)`` accumulator *x*."""
    return (x[0] + y, x[1] + 1)


def combOp(x, y):
    """Merge two partial ``(sum, count)`` accumulators component-wise."""
    return (x[0] + y[0], x[1] + y[1])

rdd = sc.parallelize([1, 2, 3, 4], 8)
rdd.aggregate((0, 0), seqOp, combOp), rdd.getNumPartitions()

In [None]:
def op(x, y):
    """Return ``x + y``; used below both as seqOp and as combOp of a plain sum."""
    return x + y
rdd = sc.parallelize([1, 2, 3, 4], 4)
rdd.aggregate(0, op, op), rdd.getNumPartitions()

### Exercise: sum of powers with `aggregate`

- Using `aggregate`, compute the sum of $x$, the sum of $x^2$ and the sum of $x^3$ for
$x \in \{1, \ldots, 10 \}$.
- Check your computations using `numpy`

In [None]:
def seqOp(x, y):
    """Fold element *y* into the accumulator *x* = (sum y, sum y**2, sum y**3)."""
    return (x[0] + y, x[1] + y ** 2, x[2] + y ** 3)

In [None]:
def combOp(x, y):
    """Merge two partial (sum, sum of squares, sum of cubes) accumulators."""
    return (x[0] + y[0], x[1] + y[1], x[2] + y[2])

In [None]:
sc.range(1, 11).aggregate((0, 0, 0), seqOp, combOp)

In [None]:
import numpy as np

x = np.arange(1, 11)
x

In [None]:
x.sum(), (x**2).sum(), (x**3).sum(), x.cumsum()

### Computing an empirical variance with `aggregate`

Assume a sample is stored as an RDD. Using `aggregate`, compute the sample variance $\frac{1}{n}\sum_{i=1}^n (x_i - \overline{X}_n)^2$ where $\overline{X}_n = \frac{1}{n} \sum_{i=1}^n x_i$.

# `PairRDD`

In [None]:
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])

rdd.collect()  # not yet 

In [None]:
rdd = rdd.map(lambda x: (x[0], x[1:]))

rdd.collect()  # done 

## Transformations

### `keys`, `values`

In [None]:
rdd.keys().collect()

In [None]:
rdd.values().collect()

**Warning**. All elements must be pairs `(key, value)`: `keys()` and `values()` simply take the first and the second item of each element, so any other item is silently ignored.

In [None]:
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
rdd.keys().collect()

In [None]:
rdd.values().collect()

The values are **not** what we expected (`values()` only kept the second item of each list), so we **must** do

In [None]:
rdd = ( sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
          .map(lambda x: (x[0], x[1:]))
      )
rdd.keys().collect()

In [None]:
rdd.values().collect()

Now the values are correct. 

### `mapValues`, `flatMapValues`

In [None]:
rdd = sc.parallelize([("a", "x y z"), ("b", "p r")])

rdd.mapValues(lambda v: v.split(' ')).collect(), rdd.collect()

In [None]:
rdd.flatMapValues(lambda v: v.split(' ')).collect()

### `groupByKey`

In [None]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 3), ("c", 42)])
( 
    rdd.groupByKey()
       .mapValues(list)
       .collect()
)

In [None]:
rdd.groupByKey().collect()

### `reduceByKey`

In [None]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(lambda a, b: a + b).collect()

### `combineByKey`

In [None]:
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 13)])

def add(a, b):
    """Append the string form of *b* to the string *a*.

    Passed to ``combineByKey`` below both to merge a value into a combiner and
    to merge two combiners.
    """
    suffix = str(b)
    return a + suffix

rdd.combineByKey(str, add, add).collect()

### `join`, `rightOuterJoin`, `leftOuterJoin`

In [None]:
employees = sc.parallelize([
    (31, "Rafferty"),
    (33, "Jones"),
    (33, "Heisenberg"),
    (34, "Robinson"),
    (34, "Smith"),
    (None, "Williams")
])

In [None]:
departments = sc.parallelize([
    (31, "Sales"),
    (33, "Engineering"),
    (34, "Clerical"),
    (35, "Marketing")
])

In [None]:
employees.join(departments).sortByKey().collect()

In [None]:
employees.rightOuterJoin(departments).sortByKey().collect()

In [None]:
employees.leftOuterJoin(departments).collect()

## Actions

In [None]:
employees.countByKey()

In [None]:
employees.lookup(33)

In [None]:
employees.lookup(None)

In [None]:
employees.collectAsMap()

## References

[Spark Core reference](https://spark.apache.org/docs/3.3.1/api/python/reference/pyspark.html)