### 1、spark概述

* 1、什么是spark

  * 基于内存的计算引擎，它的计算速度非常快。但是仅仅只涉及到数据的计算，并没有涉及到数据的存储。

* 2、为什么要学习spark

  **MapReduce框架局限性**

  - 1，Map结果写磁盘，Reduce写HDFS，多个MR之间通过HDFS交换数据
  - 2，任务调度和启动开销大
  - 3，无法充分利用内存
  - 4，不适合迭代计算（如机器学习、图计算等等），交互式处理（数据挖掘）
  - 5，不适合流式处理（点击日志分析）
  - 6，MapReduce编程不够灵活，仅支持Map和Reduce两种操作

  **Hadoop生态圈**

  - 批处理：MapReduce、Hive、Pig
  - 流式计算：Storm
  - 交互式计算：Impala、presto

  需要一种灵活的框架可同时进行批处理、流式计算、交互式计算

  - 内存计算引擎，提供cache机制来支持需要反复迭代计算或者多次数据共享，减少数据读取的IO开销

  - DAG引擎，较少多次计算之间中间结果写到HDFS的开销

    DAG计算：将计算任务在内部分解成为若干个子任务，将这些子任务之间的逻辑关系构建成DAG结构。

  - 使用多线程模型来减少task启动开销，shuffle过程中避免不必要的sort操作以及减少磁盘IO

  spark的缺点是：吃内存，不太稳定

* 3、spark特点

  * 1、速度快（比mapreduce在内存中快100倍，在磁盘中快10倍）
    * spark中的job中间结果可以不落地，可以存放在内存中。
    * mapreduce中map和reduce任务都是以进程的方式运行着，而spark中的job是以线程方式运行在进程中。
  * 2、易用性（可以通过java/scala/python/R开发spark应用程序）
  * 3、通用性（可以使用spark sql/spark streaming/mlib/Graphx）
  * 4、兼容性（spark程序可以运行在standalone/yarn/mesos）

Spark2.x 新特性

- SparkSession：新的上下文入口，统一SQLContext和HiveContext

- dataframe和dataset统一，dataframe只是dataset[ROW]的类型别名

  dataset在spark1.6出现，在spark2.x时，dataframe只是dataset[ROW]的类型别名，但是dataset是一种强类型语言设定，由于python是弱类型语言，只能使用DataFrame

  dataframe本身是scala实现，而spark也是scala语言写的，python只是一个壳子，对于python编写的spark代码性能不变

- Spark SQL支持sql 2003标准
  - 支持ansi-sql
  - 支持ddl命令
  - 支持子查询：in/not in、exists/not exists
  - 提升catalyst查询优化器的性能
  - code generation技术将spark sql和dataset的性能提升2~10倍(钨丝计划)
  - vectorization技术提升parquet文件的扫描吞吐量
  - spark mllib基于rdd的api转为维护阶段
  - 未来将主要基于dataset api的ml，向量和矩阵使用性能更高的序列化机制

### 2、spark安装部署

* 1、下载spark安装包

  http://spark.apache.org/downloads.html

  高版本不存在cdh的编译版本，可以从官网下载源码版本，指定高版本hadoop进行编译

  编译步骤：

  - 1，安装java(JDK 1.7及以上)

    ```
    export JAVA_HOME=/xxx
    export JRE_HOME=/xxx
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH
    export PATH=$JAVA_HOME/bin:$PATH
    ```

  - 2，安装Maven， 版本为3.3.9或者以上

    下载地址：https://mirrors.tuna.tsinghua.edu.cn/apache//maven/maven-3/3.3.9/binaries

    配置MAVEN_HOME

    ```
    export MAVEN_HOME=/xxx
    export PATH=$MAVEN_HOME/bin:$PATH
    ```

  - 3，下载spark源码

    ![s1](pics/s1.png)

  - 4，增加cdh的repository

    解压spark的源码包，编辑pom.xml文件， 在repositories节点 加入如下配置：

    ```xml
    <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository>
    ```

  - 5，编译

    设置内存：

    export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"

    开始编译：

    `./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz  -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0 -DskipTests clean package`

    源码编译后，bin目录下的文件可能不存在可执行权限，需要通过chmod指令添加可执行权限

    `chmod +x xxx`

* 2、规划spark安装目录

* 3、解压安装包

* 4、重命名安装目录

* 5、修改配置文件

  * spark-env.sh(需要将spark-env.sh.template重命名)

    * 配置hadoop的CONF目录，spark会将hadoop中的配置文件进行加载

      HADOOP_CONF_DIR=/root/bigdata/hadoop-2.6.0-cdh5.7.0/etc/hadoop

    * 配置java环境变量

      `export JAVA_HOME=java_home_path`

    * 配置PYTHON环境

      PYTHON_HOME=/usr/local/python3/bin

      `export PYSPARK_PYTHON=/xx/pythonx_home/bin/python3`

    * 配置spark的日志目录(注：需要先在hadoop上创建一个目录(directory,spark不会主动创建这个目录))

      SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node-teach1:8020/spark_directory"

    * 配置master的地址(可选)

      `export SPARK_MASTER_HOST=node-teach1`

    * 配置master的端口(可选)

      `export SPARK_MASTER_PORT=7077

    * 配置spark-defaults文件

      spark.eventLog.enabled           true
      spark.eventLog.dir               hdfs://node-teach1:8020/directory

* 6、配置spark环境变量

  ```
  export SPARK_HOME=/root/bigdata/spark-2.3.0-bin-2.6.0-cdh5.7.0
  export PATH=$SPARK_HOME/bin:$PATH
  ```


### 3、spark启动和停止

* 启动pyspark
  * 在节点上执行

    `./pyspark`

### 4、在Spark上编写代码实战

wordcount案例

- Spark1.6

  ```python
  sc = SparkContext('local')
  doc = sc.textFile('file:///root/bigdata/data/spark_test.log')
  words = doc.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).reducebyKey(lambda x,y:x+y).collect()
  ```

- Spark 2.x

  ```python
  spark = SparkSession.builder.appName('test').getOrCreate()
  sc = spark.sparkContext
  words = sc.textFile('file:///root/bigdata/data/spark_test.log') \
              .flatMap(lambda line: line.split(" ")) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(lambda a, b: a + b).collect()
  ```

注：以上为样例对比，后面会加上每句代码的解释。

### 5、利用PyCharm编写spark wordcount程序

- 环境配置

  将spark目录下的python目录下的pyspark整体拷贝到pycharm使用的python环境下

  将下图中的pyspark

  ![s2](pics/s2.png)

  拷贝到pycharm使用的：xxx\Python\Python36\Lib\site-packages目录下

* 代码

```python
import sys

#SparkSession：spark2.0为了创建spark相关的环境(sparksql，spark-mllib,spark-core)而设计的统一的接口类
from pyspark.sql import SparkSession
import os

#注：在pycharm上运行时，会提示SPARK_HOME找不到，需要配置SPARK_HOME环境变量
os.environ["SPARK_HOME"]="/root/bigdata/spark-2.3.0-bin-2.6.0-cdh5.7.0"
#由于系统中存在两种不同版本的python，需要配置PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON指定运行为python3
os.environ["PYSPARK_PYTHON"]="/usr/local/python3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/local/python3/bin/python3"

#构造器模式，builder模式
#在当前类中创建一个Builder类，Builder类会执行一些操作，最终返回SparkSession对象的实例
#appName：指定应用名称
#getOrCreate:获取或创建，如果内存中存在SparkSession实例，则get，否则create
spark = SparkSession.builder.appName("test").getOrCreate()
#通过SparkSession获取SparkContext用于编写Spark-Core代码
sc = spark.sparkContext

#sc.textFile:读取一个文件
#textFile读取数据时，默认会以hdfs为基准，需要指定文件协议
#flatMap：对读取的文件按行进行处理，返回多个结果
#map:针对于数据做操作，每个操作返回单个结果
#reduceByKey：按照相同的key做操作
counts = sc.textFile("file:///root/bigdata/data/test.log").flatMap(lambda line:line.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)
#执行计算并输出
print(counts.collect())
#执行完成程序后，要停止当前程序
sc.stop()
```

注：在pycharm上运行时，会提示SPARK_HOME找不到，需要配置SPARK_HOME环境变量

![](pics/spark2.png)

#由于系统中存在两种不同版本的python，需要配置PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON指定运行为python3

![](pics/spark3.png)

- 将代码上传到远程cent-os系统上

- 利用pycharm运行代码，得出最终的计算结果

  ![](pics/spark1.png)