Kaze
Kaze
Published on 2023-11-07 / 26 Visits
0
0

SparkCore

概述

什么是Spark

Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

Spark and Hadoop

Spark 出现的时间相对较晚,并且主要功能是用于数据计算, 所以 Spark 一直被认为是 Hadoop MapReduce 的升级版。

Spark和Hadoop的根本差异是多个作业之间的数据通信问题:

  • Spark 多个作业之间数据通信是基于内存,而 Hadoop是基于磁盘。

  • Spark 只有在 shuffle 的时候将数据写入磁盘,而 Hadoop 中多个 MR 作业之间的数据交互都要依赖于磁盘交互。

在绝大多数的数据计算场景中,Spark 确实会比 MapReduce 更有优势。但是 Spark 是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会由于内存资源不够导致 Job 执行失败,此时,MapReduce 其实是一个更好的选择,所以 Spark 并不能完全替代 MR。

Spark 核心模块

**Spark Core **

Spark Core 中提供了 Spark 最基础与最核心的功能,Spark 其他的功能都是在 Spark Core 的基础上进行扩展的 。

**Spark SQL **

Spark SQL 是 Spark 用来操作结构化数据的组件。

**Spark Streaming **

Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件。

**Spark MLlib **

MLlib 是 Spark 提供的一个机器学习算法库。

**Spark GraphX **

GraphX 是 Spark 面向图计算提供的框架与算法库。

快速开始

创建 Maven 项目

增加 Scala 插件

当前使用的 Spark 版本为 3.0.0,默认采用的 Scala 编译版本为 2.12,请保证 IDEA 开发工具中含有 Scala 开发插件

增加依赖

<dependencies>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-core_2.12</artifactId>
		<version>3.0.0</version>
	</dependency>
</dependencies>
<build>
    <plugins>
        <!-- 该插件用于将Scala代码编译成class文件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <!-- 声明绑定到maven的compile阶段 -->
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- 该插件用于将依赖打入jar包 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

示例代码

object WordCount {
  def main(args: Array[String]): Unit = {
    // 创建Spark运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    // 创建Spark上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 读取文件数据
    val fileRDD: RDD[String] = sc.textFile("hdfs://hadoop102/wcinput")

    // 将文件中的数据进行分词
    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))

    // 转换数据结构 word => (word, 1)
    val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_, 1))

    // 将转换结构后的数据按照相同的单词进行分组聚合
    val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_ + _)

    // 将数据聚合结果采集到内存中
    val word2Count: Array[(String, Int)] = word2CountRDD.collect()

    // 打印结果
    word2Count.foreach(println)

    //关闭Spark连接
    sc.stop()
  }
}

在项目的 resources 目录中创建 log4j.properties 文件,并添加日志配置信息:

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/ddHH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to ERROR. When running the spark-shell,the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistentUDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

Spark 运行环境

Local 模式

将Spark文件解压缩后进入解压缩后的路径,执行如下指令即可启动spark本地模式

bin/spark-shell

启动完成后会进入spark的命令行工具,在命令行工具中执行如下代码指令

sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

启动成功后,可以输入网址进行 Web UI 监控页面访问

http://虚拟机地址:4040

按键 Ctrl+C 或输入 Scala 指令 :quit即可退出本地模式

Standalone 模式

独立部署模式,计算和资源管理都交给Spark,启动后会开启Worker和Master两种进程来进行资源的管理。

  1. 进入解压缩后路径的 conf 目录,修改 slaves.template 文件名为 slaves

    mv slaves.template slaves 
    
  2. 修改 slaves 文件,添加 work 节点

    hadoop102
    hadoop103
    hadoop104
    
  3. 修改 spark-env.sh.template 文件名为 spark-env.sh

    mv spark-env.sh.template spark-env.sh 
    
  4. 修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和集群对应的 master 节点

    export JAVA_HOME=/opt/module/jdk1.8.0_144 
    SPARK_MASTER_HOST=hadoop102
    SPARK_MASTER_PORT=7077
    

    注意:7077 端口,相当于 hadoop 内部通信的 8020 端口

  5. 分发spark

  6. 启动spark

    sbin/start-all.sh
    
  7. Master 资源监控 Web UI 界面: http://hadoop102:8080

Yarn 模式

独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。但是 Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。

  1. 修改 hadoop 配置文件/opt/module/hadoop/etc/hadoop/yarn-site.xml, 并分发

    <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是 true -->
    <property>
    	<name>yarn.nodemanager.pmem-check-enabled</name>
    	<value>false</value>
    </property>
    <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是 true -->
    <property>
    	<name>yarn.nodemanager.vmem-check-enabled</name>
    	<value>false</value>
    </property>
    
  2. 修改 conf/spark-env.sh,添加 JAVA_HOME 和 YARN_CONF_DIR 配置

    mv spark-env.sh.template spark-env.sh
    
    export JAVA_HOME=/opt/module/jdk1.8.0_144
    YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
    
  3. 启动 HDFS 以及 YARN 集群

  4. 提交应用

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    

配置历史服务

  1. 修改 spark-defaults.conf.template 文件名为 spark-defaults.conf

    mv spark-defaults.conf.template spark-defaults.conf 
    
  2. 修改 spark-defaults.conf 文件

    spark.eventLog.enabled          true
    spark.eventLog.dir              hdfs://hadoop102:8020/sparklog
    
    spark.yarn.historyServer.address=hadoop102:18080
    spark.history.ui.port=18080
    

    注意:需要启动 hadoop 集群,HDFS 上的 sparklog 目录需要提前存在。

  3. 修改 spark-env.sh 文件, 添加日志配置

    export SPARK_HISTORY_OPTS="
    -Dspark.history.ui.port=18080
    -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/sparklog 
    -Dspark.history.retainedApplications=30"
    

    参数 1 含义:WEB UI 访问的端口号为 18080

    参数 2 含义:指定历史服务器日志存储路径

    参数 3 含义:指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数而不是页面上显示的应用数。

  4. 启动历史服务

    sbin/start-history-server.sh
    
  5. 提交应用

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode client \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    

部署模式对比

模式Spark 安装机器数需启动的进程所属者应用场景
Local1Spark测试
Standalone3Master 及 WorkerSpark单独部署
Yarn1Yarn 及 HDFSHadoop混合部署

端口号

Spark 查看当前 Spark-shell 运行任务情况端口号:4040(计算)

Spark Master 内部通信服务端口号:7077

Standalone 模式下,Spark Master Web 端口号:8080(资源)

Spark 历史服务器端口号:18080

Hadoop YARN 任务运行情况查看端口号:8088

Spark 运行架构

image-20231106143430522

核心组件

Driver

Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。

Driver 在 Spark 作业执行时主要负责:

  • 将用户程序转化为作业(job)

  • 在 Executor 之间调度任务(task)

  • 跟踪 Executor 的执行情况

  • 通过 UI 展示查询运行情况

Executor

Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业 中运行具体任务(Task),任务彼此之间相互独立。

Executor 有两个核心功能:

  • 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程

  • 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

提交流程

Spark 应用程序提交到 Yarn 环境中执行的时候,一般会有两种部署执行的方式:Client 和 Cluster。两种模式主要区别在于:Driver 程序的运行节点位置。

Yarn Cluster 模式

Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn 集群资源中执行。一般应用于实际生产环境。

  1. 在 YARN Cluster 模式下,任务提交后会和 ResourceManager 通讯申请启动 ApplicationMaster

  2. 随后 ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster, 此时的 ApplicationMaster 就是 Driver。

  3. Driver 启动后向 ResourceManager 申请 Executor 内存,ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动 Executor 进程

  4. Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数,

  5. 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。

Yarn Client 模式

Client 模式将用于监控和调度的 Driver 模块在客户端执行,而不是在 Yarn 中,所以一般用于测试。

Spark 核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。

三大数据结构分别是:

  • RDD : 弹性分布式数据集

  • 累加器:分布式共享只写变量

  • 广播变量:分布式共享只读变量

RDD

什么是 RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。

RDD特点:

  • 弹性

    • 存储的弹性:内存与磁盘的自动切换;

    • 容错的弹性:数据丢失可以自动恢复;

    • 计算的弹性:计算出错重试机制;

    • 分片的弹性:可根据需要重新分片。

  • 分布式:数据存储在大数据集群不同节点上

  • 数据集:RDD 封装了计算逻辑,并不保存数据

  • 数据抽象:RDD 是一个抽象类,需要子类具体实现

  • 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在 新的 RDD 里面封装计算逻辑

  • 可分区、并行计算

RDD核心属性

分区列表:RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。

分区计算函数:Spark 在计算时,是使用分区函数对每一个分区进行计算

RDD 之间的依赖关系:RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

分区器(可选):当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

首选位置(可选):计算数据时,可以根据计算节点的状态选择最合适的节点位置进行计算

执行原理

Yarn 环境中,RDD 的工作原理:

  1. 启动 Yarn 集群环境

    image-20231106160915361

  2. Spark 通过申请资源创建调度节点和计算节点

    image-20231106160931294

  3. Spark 框架根据需求将计算逻辑根据分区划分成不同的任务

    image-20231106160947554

  4. 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

    image-20231106160959951

基础编程

RDD 创建

从集合(内存)中创建 RDD

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
 List(1,2,3,4), 2
)
val rdd2 = sparkContext.makeRDD(
 List(1,2,3,4), 2
)

makeRDD 方法底层使用了parallelize 方法,从语义化表达方面来说更推荐使用makeRDD

从外部存储(文件)创建 RDD

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
// 第二个参数指定分区数量
val fileRDD = sparkContext.textFile("input", 2)

从其他 RDD 创建

主要是通过一个 RDD 运算完后,再产生新的 RDD。

直接创建 RDD(new)

使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

RDD 转换算子

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value 类型

Value 类型

map

val dataRDD1: RDD[Int] = dataRDD.map(num => num * 2)

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

mapPartitions

val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
    datas => datas.filter(_ == 2)
)

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

mapPartitionsWithIndex

val dataRDD1 = dataRDD.mapPartitionsWithIndex(
 (index, datas) => {
 	datas.map(index, _)
 }
)

在处理时同时可以获取当前分区索引

flatMap

val dataRDD1 = dataRDD.flatMap(list => list)

将处理的数据逐条进行映射转换,最后进行扁平化处理

在这里插入图片描述

glom

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1:RDD[Array[Int]] = dataRDD.glom()

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

groupBy

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(_ % 2)

将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。

一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

filter

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。

当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

sample

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数进行比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,取值范围大于等于0。表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)

根据指定的规则从数据集中抽取数据

distinct

val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),1)
val dataRDD1 = dataRDD.distinct()

将数据集中重复的数据去重

coalesce

val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),6)
val dataRDD1 = dataRDD.coalesce(2)

缩减分区

repartition

val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),2)
val dataRDD1 = dataRDD.repartition(4)

扩大分区

sortBy

val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),2)
val dataRDD1 = dataRDD.sortBy(num => num)

该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

双 Value 类型
object DoubleValueOperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
    val sc = new SparkContext(conf)

    val rdd1 = sc.makeRDD(List(1, 2, 3, 4), 1)
    val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 1)

    // 交集
    val rdd3 = rdd1.intersection(rdd2)
    println(rdd3.collect().mkString(","))

    // 并集 不去重
    val rdd4 = rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))

    // 差集
    val rdd5 = rdd1.subtract(rdd2)
    println(rdd5.collect().mkString(","))

    // zip 返回元组(1,3),(2,4),(3,5),(4,6)
    // 两个数据源要求分区数量和数据数量保持一致
    val rdd6 = rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))

    sc.stop()
  }
}
Key-Value 类型

partitionBy

val rdd = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
val rdd2 = rdd.partitionBy(new HashPartitioner(2))

将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

自定义分区器只需要继承Partitioner后重写方法即可

reduceByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_ + _)

可以将数据按照相同的 Key 对 Value 进行聚合

groupByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()

将数据源的数据根据 key 对 value 进行分组

aggregateByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.aggregateByKey(0)(_ + _, _ + _)

将数据根据不同的规则进行分区内计算和分区间计算

zeroValue只参与分区内计算

foldByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.foldByKey(0)(_ + _)

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

combineByKey

求每个 key 的平均值

val list = List(
    ("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input = sc.makeRDD(list, 2)
val combineRdd = input.combineByKey(
 (_, 1),
 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

combineByKey允许用户返回值的类型与输入不一致。

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别:

  • reduceByKey:相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同

  • FoldByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同

  • AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同

  • CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。

sortByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
// 降序排序
val sortRDD1 = dataRDD1.sortByKey(false)

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的RDD

join

// 内连接,相同key依次匹配
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 3), ("b", 4), ("c", 1)))
val rdd2 = sc.makeRDD(List(("a", 4), ("a", 3), ("b", 15)))
rdd1.join(rdd2).collect().foreach(println)

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD

leftOuterJoinrightOuterJoin

// 外连接,相同key依次匹配
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 3), ("b", 4), ("c", 1)))
val rdd2 = sc.makeRDD(List(("a", 4), ("a", 3), ("b", 15)))
rdd1.leftOuterJoin(rdd2).collect().foreach(println)
println("========")
rdd1.rightOuterJoin(rdd2).collect().foreach(println)

类似于 SQL 语句的外连接

cogroup

val rdd1 = sc.makeRDD(List(("a", 1), ("b", 3), ("b", 4), ("c", 1)))
val rdd2 = sc.makeRDD(List(("a", 4), ("a", 3), ("b", 15)))
// cogroup = connect + group 先数据源内分组再数据源间连接
rdd1.cogroup(rdd2).collect().foreach(println)

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

RDD 行动算子

reduce

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val reduceResult: Int = rdd.reduce(_ + _)

聚合 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

collect

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 收集数据到 Driver
rdd.collect().foreach(println)

在驱动程序中,以数组 Array 的形式返回数据集的所有元素

count

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val countResult: Long = rdd.count()

返回 RDD 中元素的个数

first

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val firstResult: Int = rdd.first()
println(firstResult)

返回 RDD 中的第一个元素

take

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val takeResult: Array[Int] = rdd.take(2)
println(takeResult.mkString(","))

返回一个由 RDD 的前 n 个元素组成的数组

takeOrdered

val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
val result: Array[Int] = rdd.takeOrdered(2)

返回该 RDD 排序后的前 n 个元素组成的数组

aggregate

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
// 将该 RDD 所有元素相加得到结果
val result: Int = rdd.aggregate(10)(_ + _, _ + _)

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

zeroValue参与分区内和分区间计算

fold

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val foldResult: Int = rdd.fold(0)(_ + _)

aggregate 的简化版操作

countByKey

val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
val result: collection.Map[Int, Long] = rdd.countByKey()

统计每种 key 的个数

save 相关算子

// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件,要求数据类型必须为KV类型
rdd.map((_,1)).saveAsSequenceFile("output2")

将数据保存到不同格式的文件中

foreach

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 在Driver端打印,有序
rdd.collect().foreach(println)
println("****************")
// 在Executor端打印,无序
rdd.foreach(println)

分布式遍历 RDD 中的每一个元素,调用指定函数

RDD 序列化

闭包检测

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。

自定义对象序列化需要继承 Serializable 接口

RDD 依赖关系

RDD 血缘关系

RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

// 打印出RDD的血缘关系
println(resultRDD.toDebugString)
RDD 依赖关系

这里的依赖关系指的是两个相邻 RDD 之间的关系

// 打印出RDD的依赖关系
println(resultRDD.dependencies)
RDD 窄依赖

窄依赖表示每一个父RDD 的 Partition 最多被子RDD 的一个 Partition 使用

OneToOneDependency

RDD 宽依赖

宽依赖表示同一个父RDD 的 Partition 被多个子RDD 的 Partition 依赖,会引起 Shuffle

ShuffleDependency

RDD 任务划分

Application:初始化一个 SparkContext 即生成一个 Application;

Job:一个 Action 算子就会生成一个 Job;

Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1(ResultStage);

Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

Application->Job->Stage->Task 每一层都是 1 对 n 的关系。

RDD 持久化

RDD Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存并供后面重用

// cache 操作会增加血缘关系,不改变原有的血缘关系
rdd.cache()
rdd.persist(StorageLevel.DISK_ONLY)
RDD CheckPoint 检查点

所谓的检查点其实就是将 RDD 中间结果写入磁盘。

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

因为checkpoint会单独执行一次job,建议对 checkpoint的 RDD 使用 Cache 缓存,这样checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。

// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
......
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()

缓存和检查点区别:Cache 不切断血缘依赖,Checkpoint 检查点切断血缘依赖。

RDD 文件读取

sc.textFile("input/1.txt")
sc.sequenceFile[Int,Int]("output")
sc.objectFile[Int]("output")

累加器

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。

系统累加器

val rdd = sc.makeRDD(List(1,2,3,4,5))
// 声明累加器
var sum = sc.longAccumulator("sum");
rdd.foreach(
    num => {
        // 使用累加器
        sum.add(num)
    }
)
// 获取累加器的值
println("sum = " + sum.value)

自定义累加器

// 自定义累加器
// 1. 继承 AccumulatorV2,并设定泛型
// 2. 重写累加器的抽象方法
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String,
  Long]] {
  var map: mutable.Map[String, Long] = mutable.Map()

  // 累加器是否为初始状态
  override def isZero: Boolean = {
    map.isEmpty
  }

  // 复制累加器
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new WordCountAccumulator
  }

  // 重置累加器
  override def reset(): Unit = {
    map.clear()
  }

  // 向累加器中增加数据 (In)
  override def add(word: String): Unit = {
    // 查询 map 中是否存在相同的单词
    // 如果有相同的单词,那么单词的数量加 1
    // 如果没有相同的单词,那么在 map 中增加这个单词
    map(word) = map.getOrElse(word, 0L) + 1L
  }

  // 合并累加器
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]):
  Unit = {
    val map1 = map
    val map2 = other.value
    // 两个 Map 的合并
    map = map1.foldLeft(map2)(
      (innerMap, kv) => {
        innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
        innerMap
      }
    )
  }

  // 返回累加器的结果 (Out)
  override def value: mutable.Map[String, Long] = map
}

广播变量

广播变量用来高效分发较大的对象,向所有工作节点(Executor)发送一个较大的只读变量,之后Executor中的Task可以共享这个只读变量。

val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val map2 = mutable.Map(("a", 4), ("b", 5), ("c", 6))
// 声明广播变量
val bc = sc.broadcast(map2)
rdd.map {
    case (w, c) => {
        // 使用广播变量
        val i = bc.value.getOrElse(w, 0)
        (w, (c, i))
    }
}.collect().foreach(println)

Comment