## 4、RDD三类算子

### Transformation

- 通过已有的RDD生成一个新的RDD
- Lazy计算：Transformation只会记录RDD转化关系，并不会触发计算
- 举例：map，filter，groupBy，reduceBy
- 优点：可以中间插入优化过程

### Action

- 通过RDD计算得到一个或者一组值
- Action是立即执行的
- 举例：count，reduce，saveAsTextFile
- 缺点：不能插入优化过程

### Persistence

- cache：缓存到内存
- Persist：更灵活的缓存策略
- cache()方法调用的也是persist方法，缓存策略均为MEMORY_ONLY
- 可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别
- cache或者persist并不是action

![s4](pics/s4.png)

![s5](pics/s5.png)

![s6](pics/s6.png)

![s7](pics/s7.png)

- map：通过函数FUNC传递源的每个元素，返回一个新的分布式的数据集
- filter：通过选择FUNC返回true的源的元素返回一个新的数据集
- flatmap：与map类似，但是每个输入项可以映射到0或更多个输出项（因此func应该返回Seq而不是单个项）。
- groupby：当调用（k，v）对的数据集时，返回一个数据集（k，可迭代<v>）对。
  注意：如果分组是为了对每个键执行聚合（例如求和或平均值），则使用reduceByKey或.gateByKey将产生更好的性能。
  注意：默认情况下，输出中的并行级别取决于父RDD分区的数量。可以通过可选的Nuffice参数设置不同数量的任务。
- reduceByKey：在对(K,V)对的数据集进行调用时，返回(K,V)对的数据集，其中每个键的值都使用给定的reduce函数func进行聚合，该函数必须是类型(V,V)=>V。与groupByKey中一样，reduce任务的数量可以通过可选的第二个参数进行配置。
- union：返回包含源数据集和参数中元素的合并后的新数据集
- distinct：返回包含源数据集的不重复元素的新数据集
- join：当调用类型（K，V）和（K，W）的数据集时，返回（K，（V，W））对的数据集包含每个键的所有元素对。外部连接通过leftOuterJoin、rightOuterJoin和fullOuterJoin进行支持。
- foreach：在数据集的每个元素上运行函数FUNC。这通常是用于副作用，例如更新累加器或与外部存储系统交互。
- collect：在驱动器程序中将数据集的所有元素作为数组返回。这通常在过滤器或其他操作返回有用的数据子集后非常有用。
- count：返回数据集中的元素数量。
- first：返回数据集的第一个元素
- take：返回具有数据集的第一个n个元素的数组

### 练习

**案例1：map filter**

map:所有数据按照map中的参数对应的函数来处理并产生一个新的RDD

filter：所有数据按照map中的参数对应的函数(必须存在返回boolean的函数)来处理，并产生一个新的RDD

```python
rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = rdd1.map(lambda x:x*2)
rdd3 = rdd2.filter(lambda x:x>3)
rdd3.collect()
```

**案例2：flatMap**

flatMap：和map类似，但是传入一个数据，返回多个数据

```python
rdd1 = sc.parallelize(["a b c","d e f","h i j"])
rdd2 = rdd1.flatMap(lambda x:x.split(" "))
rdd2.collect()
```

**案例3：交集、并集**

union：对两个RDD求并集

intersection：对两个RDD求交集

```python
rdd1 = sc.parallelize([("a",1),("b",2)])
rdd2 = sc.parallelize([("c",1),("b",3)])
rdd3 = rdd1.union(rdd2)
rdd3.collect()
rdd4 = rdd1.intersection(rdd2)
rdd4.collect()
```

**案例4：groupByKey**

groupByKey：以元组中的第0个元素作为key，进行分组，返回一个新的RDD

```python
rdd1 = sc.parallelize([("a",1),("b",2)])
rdd2 = sc.parallelize([("c",1),("b",3)])
rdd3 = rdd1.union(rdd2)
rdd3.collect()
rdd4 = rdd3.groupByKey()#返回的元组中第1个元素是可迭代对象，需要进一步遍历才能得到结果
rdd4.collect()
```

**案例5：reduce**

reduce：所有数据都需要针对当前函数参数对应的函数进行处理，直接得到一个结果

```python
rdd1 = sc.parallelize([1,2,3,4,5])
rdd1.reduce(lambda a,b:a+b)
```

**案例6：reduceByKey、sortByKey**

reduceByKey：所有数据都需要针对当前函数参数对应的函数进行处理，直接得到一个结果(需要有一个key作为基准，求value的和)

sortByKey：所有数据都需要针对当前函数参数对应的函数进行排序处理，直接得到一个结果(需要有一个key作为基准，求value的和)

```python
rdd1 = sc.parallelize([("a",1),("b",2)])
rdd2 = sc.parallelize([("c",1),("b",3)])
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.reduceByKey(lambda a,b:a+b)
rdd4.collect()
rdd5 = rdd4.sortByKey(ascending=False)
rdd5.collect()
rdd6 = rdd4.map(lambda x:(x[1],x[0])).sortByKey(ascending=False).map(lambda x:(x[1],x[0]))
rdd6.collect()
```

