概述
什么是Spark
Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Spark and Hadoop
Spark 出现的时间相对较晚,并且主要功能是用于数据计算, 所以 Spark 一直被认为是 Hadoop MapReduce 的升级版。
Spark和Hadoop的根本差异是多个作业之间的数据通信问题:
-
Spark 多个作业之间数据通信是基于内存,而 Hadoop是基于磁盘。
-
Spark 只有在 shuffle 的时候将数据写入磁盘,而 Hadoop 中多个 MR 作业之间的数据交互都要依赖于磁盘交互。
在绝大多数的数据计算场景中,Spark 确实会比 MapReduce 更有优势。但是 Spark 是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会由于内存资源不够导致 Job 执行失败,此时,MapReduce 其实是一个更好的选择,所以 Spark 并不能完全替代 MR。
Spark 核心模块
**Spark Core **
Spark Core 中提供了 Spark 最基础与最核心的功能,Spark 其他的功能都是在 Spark Core 的基础上进行扩展的 。
**Spark SQL **
Spark SQL 是 Spark 用来操作结构化数据的组件。
**Spark Streaming **
Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件。
**Spark MLlib **
MLlib 是 Spark 提供的一个机器学习算法库。
**Spark GraphX **
GraphX 是 Spark 面向图计算提供的框架与算法库。
快速开始
创建 Maven 项目
增加 Scala 插件
当前使用的 Spark 版本为 3.0.0,默认采用的 Scala 编译版本为 2.12,请保证 IDEA 开发工具中含有 Scala 开发插件
增加依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 该插件用于将Scala代码编译成class文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<!-- 声明绑定到maven的compile阶段 -->
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 该插件用于将依赖打入jar包 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
示例代码
object WordCount {
def main(args: Array[String]): Unit = {
// 创建Spark运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 创建Spark上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)
// 读取文件数据
val fileRDD: RDD[String] = sc.textFile("hdfs://hadoop102/wcinput")
// 将文件中的数据进行分词
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
// 转换数据结构 word => (word, 1)
val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
// 将转换结构后的数据按照相同的单词进行分组聚合
val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_ + _)
// 将数据聚合结果采集到内存中
val word2Count: Array[(String, Int)] = word2CountRDD.collect()
// 打印结果
word2Count.foreach(println)
//关闭Spark连接
sc.stop()
}
}
在项目的 resources 目录中创建 log4j.properties 文件,并添加日志配置信息:
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/ddHH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell,the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistentUDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
Spark 运行环境
Local 模式
将Spark文件解压缩后进入解压缩后的路径,执行如下指令即可启动spark本地模式
bin/spark-shell
启动完成后会进入spark的命令行工具,在命令行工具中执行如下代码指令
sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
启动成功后,可以输入网址进行 Web UI 监控页面访问
http://虚拟机地址:4040
按键 Ctrl+C 或输入 Scala 指令 :quit
即可退出本地模式
Standalone 模式
独立部署模式,计算和资源管理都交给Spark,启动后会开启Worker和Master两种进程来进行资源的管理。
-
进入解压缩后路径的 conf 目录,修改 slaves.template 文件名为 slaves
mv slaves.template slaves
-
修改 slaves 文件,添加 work 节点
hadoop102 hadoop103 hadoop104
-
修改 spark-env.sh.template 文件名为 spark-env.sh
mv spark-env.sh.template spark-env.sh
-
修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和集群对应的 master 节点
export JAVA_HOME=/opt/module/jdk1.8.0_144 SPARK_MASTER_HOST=hadoop102 SPARK_MASTER_PORT=7077
注意:7077 端口,相当于 hadoop 内部通信的 8020 端口
-
分发spark
-
启动spark
sbin/start-all.sh
-
Master 资源监控 Web UI 界面: http://hadoop102:8080
Yarn 模式
独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。但是 Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。
-
修改 hadoop 配置文件/opt/module/hadoop/etc/hadoop/yarn-site.xml, 并分发
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是 true --> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是 true --> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
-
修改 conf/spark-env.sh,添加 JAVA_HOME 和 YARN_CONF_DIR 配置
mv spark-env.sh.template spark-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144 YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
-
启动 HDFS 以及 YARN 集群
-
提交应用
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
配置历史服务
-
修改 spark-defaults.conf.template 文件名为 spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
-
修改 spark-defaults.conf 文件
spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop102:8020/sparklog spark.yarn.historyServer.address=hadoop102:18080 spark.history.ui.port=18080
注意:需要启动 hadoop 集群,HDFS 上的 sparklog 目录需要提前存在。
-
修改 spark-env.sh 文件, 添加日志配置
export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/sparklog -Dspark.history.retainedApplications=30"
参数 1 含义:WEB UI 访问的端口号为 18080
参数 2 含义:指定历史服务器日志存储路径
参数 3 含义:指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数而不是页面上显示的应用数。
-
启动历史服务
sbin/start-history-server.sh
-
提交应用
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
部署模式对比
模式 | Spark 安装机器数 | 需启动的进程 | 所属者 | 应用场景 |
---|---|---|---|---|
Local | 1 | 无 | Spark | 测试 |
Standalone | 3 | Master 及 Worker | Spark | 单独部署 |
Yarn | 1 | Yarn 及 HDFS | Hadoop | 混合部署 |
端口号
Spark 查看当前 Spark-shell 运行任务情况端口号:4040(计算)
Spark Master 内部通信服务端口号:7077
Standalone 模式下,Spark Master Web 端口号:8080(资源)
Spark 历史服务器端口号:18080
Hadoop YARN 任务运行情况查看端口号:8088
Spark 运行架构

核心组件
Driver
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。
Driver 在 Spark 作业执行时主要负责:
-
将用户程序转化为作业(job)
-
在 Executor 之间调度任务(task)
-
跟踪 Executor 的执行情况
-
通过 UI 展示查询运行情况
Executor
Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业 中运行具体任务(Task),任务彼此之间相互独立。
Executor 有两个核心功能:
-
负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
-
它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
提交流程
Spark 应用程序提交到 Yarn 环境中执行的时候,一般会有两种部署执行的方式:Client 和 Cluster。两种模式主要区别在于:Driver 程序的运行节点位置。
Yarn Cluster 模式
Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn 集群资源中执行。一般应用于实际生产环境。
-
在 YARN Cluster 模式下,任务提交后会和 ResourceManager 通讯申请启动 ApplicationMaster
-
随后 ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster, 此时的 ApplicationMaster 就是 Driver。
-
Driver 启动后向 ResourceManager 申请 Executor 内存,ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动 Executor 进程
-
Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数,
-
之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。
Yarn Client 模式
Client 模式将用于监控和调度的 Driver 模块在客户端执行,而不是在 Yarn 中,所以一般用于测试。
Spark 核心编程
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。
三大数据结构分别是:
-
RDD : 弹性分布式数据集
-
累加器:分布式共享只写变量
-
广播变量:分布式共享只读变量
RDD
什么是 RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。
RDD特点:
-
弹性
-
存储的弹性:内存与磁盘的自动切换;
-
容错的弹性:数据丢失可以自动恢复;
-
计算的弹性:计算出错重试机制;
-
分片的弹性:可根据需要重新分片。
-
-
分布式:数据存储在大数据集群不同节点上
-
数据集:RDD 封装了计算逻辑,并不保存数据
-
数据抽象:RDD 是一个抽象类,需要子类具体实现
-
不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在 新的 RDD 里面封装计算逻辑
-
可分区、并行计算
RDD核心属性
分区列表:RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
分区计算函数:Spark 在计算时,是使用分区函数对每一个分区进行计算
RDD 之间的依赖关系:RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
分区器(可选):当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
首选位置(可选):计算数据时,可以根据计算节点的状态选择最合适的节点位置进行计算
执行原理
Yarn 环境中,RDD 的工作原理:
-
启动 Yarn 集群环境
-
Spark 通过申请资源创建调度节点和计算节点
-
Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
-
调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
基础编程
RDD 创建
从集合(内存)中创建 RDD
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
List(1,2,3,4), 2
)
val rdd2 = sparkContext.makeRDD(
List(1,2,3,4), 2
)
makeRDD 方法底层使用了parallelize 方法,从语义化表达方面来说更推荐使用makeRDD
从外部存储(文件)创建 RDD
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
// 第二个参数指定分区数量
val fileRDD = sparkContext.textFile("input", 2)
从其他 RDD 创建
主要是通过一个 RDD 运算完后,再产生新的 RDD。
直接创建 RDD(new)
使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。
RDD 转换算子
RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value 类型
Value 类型
map
val dataRDD1: RDD[Int] = dataRDD.map(num => num * 2)
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
mapPartitions
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
datas => datas.filter(_ == 2)
)
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
mapPartitionsWithIndex
val dataRDD1 = dataRDD.mapPartitionsWithIndex(
(index, datas) => {
datas.map(index, _)
}
)
在处理时同时可以获取当前分区索引
flatMap
val dataRDD1 = dataRDD.flatMap(list => list)
将处理的数据逐条进行映射转换,最后进行扁平化处理
glom
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1:RDD[Array[Int]] = dataRDD.glom()
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
groupBy
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(_ % 2)
将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
filter
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
sample
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数进行比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,取值范围大于等于0。表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)
根据指定的规则从数据集中抽取数据
distinct
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),1)
val dataRDD1 = dataRDD.distinct()
将数据集中重复的数据去重
coalesce
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),6)
val dataRDD1 = dataRDD.coalesce(2)
缩减分区
repartition
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),2)
val dataRDD1 = dataRDD.repartition(4)
扩大分区
sortBy
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),2)
val dataRDD1 = dataRDD.sortBy(num => num)
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程
双 Value 类型
object DoubleValueOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
val sc = new SparkContext(conf)
val rdd1 = sc.makeRDD(List(1, 2, 3, 4), 1)
val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 1)
// 交集
val rdd3 = rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))
// 并集 不去重
val rdd4 = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// 差集
val rdd5 = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// zip 返回元组(1,3),(2,4),(3,5),(4,6)
// 两个数据源要求分区数量和数据数量保持一致
val rdd6 = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
Key-Value 类型
partitionBy
val rdd = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
val rdd2 = rdd.partitionBy(new HashPartitioner(2))
将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
自定义分区器只需要继承Partitioner后重写方法即可
reduceByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_ + _)
可以将数据按照相同的 Key 对 Value 进行聚合
groupByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()
将数据源的数据根据 key 对 value 进行分组
aggregateByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.aggregateByKey(0)(_ + _, _ + _)
将数据根据不同的规则进行分区内计算和分区间计算
zeroValue只参与分区内计算
foldByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.foldByKey(0)(_ + _)
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
combineByKey
求每个 key 的平均值
val list = List(
("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input = sc.makeRDD(list, 2)
val combineRdd = input.combineByKey(
(_, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
combineByKey允许用户返回值的类型与输入不一致。
reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别:
reduceByKey:相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
sortByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
// 降序排序
val sortRDD1 = dataRDD1.sortByKey(false)
在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的RDD
join
// 内连接,相同key依次匹配
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 3), ("b", 4), ("c", 1)))
val rdd2 = sc.makeRDD(List(("a", 4), ("a", 3), ("b", 15)))
rdd1.join(rdd2).collect().foreach(println)
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD
leftOuterJoin和rightOuterJoin
// 外连接,相同key依次匹配
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 3), ("b", 4), ("c", 1)))
val rdd2 = sc.makeRDD(List(("a", 4), ("a", 3), ("b", 15)))
rdd1.leftOuterJoin(rdd2).collect().foreach(println)
println("========")
rdd1.rightOuterJoin(rdd2).collect().foreach(println)
类似于 SQL 语句的外连接
cogroup
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 3), ("b", 4), ("c", 1)))
val rdd2 = sc.makeRDD(List(("a", 4), ("a", 3), ("b", 15)))
// cogroup = connect + group 先数据源内分组再数据源间连接
rdd1.cogroup(rdd2).collect().foreach(println)
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD
RDD 行动算子
reduce
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val reduceResult: Int = rdd.reduce(_ + _)
聚合 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
collect
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 收集数据到 Driver
rdd.collect().foreach(println)
在驱动程序中,以数组 Array 的形式返回数据集的所有元素
count
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val countResult: Long = rdd.count()
返回 RDD 中元素的个数
first
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val firstResult: Int = rdd.first()
println(firstResult)
返回 RDD 中的第一个元素
take
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val takeResult: Array[Int] = rdd.take(2)
println(takeResult.mkString(","))
返回一个由 RDD 的前 n 个元素组成的数组
takeOrdered
val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
val result: Array[Int] = rdd.takeOrdered(2)
返回该 RDD 排序后的前 n 个元素组成的数组
aggregate
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
// 将该 RDD 所有元素相加得到结果
val result: Int = rdd.aggregate(10)(_ + _, _ + _)
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
zeroValue参与分区内和分区间计算
fold
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val foldResult: Int = rdd.fold(0)(_ + _)
aggregate 的简化版操作
countByKey
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
val result: collection.Map[Int, Long] = rdd.countByKey()
统计每种 key 的个数
save 相关算子
// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件,要求数据类型必须为KV类型
rdd.map((_,1)).saveAsSequenceFile("output2")
将数据保存到不同格式的文件中
foreach
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 在Driver端打印,有序
rdd.collect().foreach(println)
println("****************")
// 在Executor端打印,无序
rdd.foreach(println)
分布式遍历 RDD 中的每一个元素,调用指定函数
RDD 序列化
闭包检测
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
自定义对象序列化需要继承 Serializable 接口
RDD 依赖关系
RDD 血缘关系
RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
// 打印出RDD的血缘关系
println(resultRDD.toDebugString)
RDD 依赖关系
这里的依赖关系指的是两个相邻 RDD 之间的关系
// 打印出RDD的依赖关系
println(resultRDD.dependencies)
RDD 窄依赖
窄依赖表示每一个父RDD 的 Partition 最多被子RDD 的一个 Partition 使用
OneToOneDependency
RDD 宽依赖
宽依赖表示同一个父RDD 的 Partition 被多个子RDD 的 Partition 依赖,会引起 Shuffle
ShuffleDependency
RDD 任务划分
Application:初始化一个 SparkContext 即生成一个 Application;
Job:一个 Action 算子就会生成一个 Job;
Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1(ResultStage);
Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。
Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
RDD 持久化
RDD Cache 缓存
RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存并供后面重用
// cache 操作会增加血缘关系,不改变原有的血缘关系
rdd.cache()
rdd.persist(StorageLevel.DISK_ONLY)
RDD CheckPoint 检查点
所谓的检查点其实就是将 RDD 中间结果写入磁盘。
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
因为checkpoint会单独执行一次job,建议对 checkpoint的 RDD 使用 Cache 缓存,这样checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。
// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
......
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
缓存和检查点区别:Cache 不切断血缘依赖,Checkpoint 检查点切断血缘依赖。
RDD 文件读取
sc.textFile("input/1.txt")
sc.sequenceFile[Int,Int]("output")
sc.objectFile[Int]("output")
累加器
累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。
系统累加器
val rdd = sc.makeRDD(List(1,2,3,4,5))
// 声明累加器
var sum = sc.longAccumulator("sum");
rdd.foreach(
num => {
// 使用累加器
sum.add(num)
}
)
// 获取累加器的值
println("sum = " + sum.value)
自定义累加器
// 自定义累加器
// 1. 继承 AccumulatorV2,并设定泛型
// 2. 重写累加器的抽象方法
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String,
Long]] {
var map: mutable.Map[String, Long] = mutable.Map()
// 累加器是否为初始状态
override def isZero: Boolean = {
map.isEmpty
}
// 复制累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new WordCountAccumulator
}
// 重置累加器
override def reset(): Unit = {
map.clear()
}
// 向累加器中增加数据 (In)
override def add(word: String): Unit = {
// 查询 map 中是否存在相同的单词
// 如果有相同的单词,那么单词的数量加 1
// 如果没有相同的单词,那么在 map 中增加这个单词
map(word) = map.getOrElse(word, 0L) + 1L
}
// 合并累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]):
Unit = {
val map1 = map
val map2 = other.value
// 两个 Map 的合并
map = map1.foldLeft(map2)(
(innerMap, kv) => {
innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
innerMap
}
)
}
// 返回累加器的结果 (Out)
override def value: mutable.Map[String, Long] = map
}
广播变量
广播变量用来高效分发较大的对象,向所有工作节点(Executor)发送一个较大的只读变量,之后Executor中的Task可以共享这个只读变量。
val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val map2 = mutable.Map(("a", 4), ("b", 5), ("c", 6))
// 声明广播变量
val bc = sc.broadcast(map2)
rdd.map {
case (w, c) => {
// 使用广播变量
val i = bc.value.getOrElse(w, 0)
(w, (c, i))
}
}.collect().foreach(println)