概述
什么是SparkSQL
Spark SQL 是 Spark 用于结构化数据(structured data)处理的 Spark 模块。
Hive and SparkSQL
Hive 和 SparkSQL都是通过SQL进行操作,Hive主要用于操作MapReduce,SparkSQL主要用于操作Spark。
Spark SQL 为了简化 RDD 的开发, 提高开发效率,提供了 2 个编程抽象:
-
DataFrame
-
DataSet
DataFrame 是什么
在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格。
DataFrame 与 RDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型(数据的结构)。
DataSet 是什么
DataSet 是分布式数据集合,是 DataFrame 的一个扩展。
DataFrame 是 DataSet 的特例,DataFrame=DataSet[Row]。
Row 是一个类型,所有的表结构信息都用 Row 来表示,获取数据时需要指定顺序。
SparkSQL 核心编程
Spark SQL 可以理解为对 Spark Core 的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。
SparkSession 是 Spark 最新的 SQL 查询起始点,SparkSession 内部封装了 SparkContext,所以计算实际上是由 SparkContext 完成的。
DataFrame
创建 DataFrame
创建 DataFrame 有三种方式:
-
通过 Spark 的数据源进行创建;
// 读取 json 文件 创建 DataFrame val df = spark.read.json("data/user.json")
如果从内存中获取数据,spark 可以知道数据类型具体是什么。如果是数字,默认作为 Int 处理;但是从文件中读取的数字,不能确定是什么类型,所以用 bigint 接收,可以和 Long 类型转换,但是和 Int 不能进行转换。
-
从一个存在的 RDD 进行转换;
-
还可以从 Hive Table 进行查询返回。
SQL 风格语法
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。
// SQL风格语法
df.createOrReplaceTempView("user")
spark.sql("select * from user").show()
普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.表名
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()
DSL 风格语法
DataFrame 提供一个领域特定语言(domain-specific language, DSL)去管理结构化的数据。
使用 DSL 语法风格不必创建临时视图。
df.select("username").show()
涉及到运算的时候, 每列都必须使用$, 或者采用单引号表达式:单引号+字段名
df.select($"username",$"age" + 1).show()
df.select('username, 'age + 1).show()
DataSet
创建 DataSet
通过toDS()
方法来创建DataSet
case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("zhangsan",2)).toDS()
val ds = Seq(1,2,3,4,5).toDS()
在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet
RDD <=> DataFrame <=> DataSet
在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入
import spark.implicits._
在创建好 SparkSession 对象后尽量直接导入,避免遗忘
IDEA 开发 SparkSQL
添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
代码实现
object Basic {
def main(args: Array[String]): Unit = {
// 创建SparkSQL运行环境
val conf = new SparkConf().setMaster("local[*]").setAppName("basic")
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
// 读取 json 文件 创建 DataFrame
val df = spark.read.json("input/user.json")
// SQL风格语法
df.createOrReplaceTempView("user")
spark.sql("select * from user").show()
// DSL风格语法
df.select('age + 1).show()
// RDD <=> DataFrame
val rdd1 = spark.sparkContext.makeRDD(List(User(1, "zhangsan", 20), User(2, "lisi", 30)))
val df1 = rdd1.toDF("id", "name", "age")
val rdd2 = df1.rdd
// DataFrame <=> Dataset
val ds1 = df1.as[User]
val df2 = ds1.toDF()
// RDD <=> Dataset
val ds2 = rdd1.toDS()
val rdd3 = ds2.rdd
// 关闭环境
spark.close()
}
private case class User(id: Int, name: String, age: Int)
}
用户自定义函数
UDF
object UDF {
def main(args: Array[String]): Unit = {
// 创建SparkSQL运行环境
val conf = new SparkConf().setMaster("local[*]").setAppName("basic")
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
// 读取 json 文件 创建 DataFrame
val df = spark.read.json("input/user.json")
// 注册 UDF
spark.udf.register("prefixName", (name: String) => {
"Name:" + name
})
// 创建临时表
df.createOrReplaceTempView("user")
// 应用 UDF
spark.sql("select age, prefixName(username) from user").show()
// 关闭环境
spark.close()
}
}
UDAF
object UDAF {
def main(args: Array[String]): Unit = {
// 创建SparkSQL运行环境
val conf = new SparkConf().setMaster("local[*]").setAppName("basic")
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
// 读取 json 文件 创建 DataFrame
val df = spark.read.json("input/user.json")
// 注册 UDAF
spark.udf.register("avgAge", functions.udaf(new MyAvgUDAF))
df.createOrReplaceTempView("user")
spark.sql("select avgAge(age) from user").show()
// 关闭环境
spark.close()
}
// 缓存类型
case class Buff(var total: Long, var count: Long)
class MyAvgUDAF extends Aggregator[Long, Buff, Long] {
// 缓冲区初始化
override def zero: Buff = {
Buff(0L, 0L)
}
// 更新缓冲区数据
override def reduce(buff: Buff, input: Long): Buff = {
buff.total += input
buff.count += 1
buff
}
// 缓冲区合并
override def merge(buff1: Buff, buff2: Buff): Buff = {
buff1.total += buff2.total
buff1.count += buff2.count
buff1
}
// 计算结果
override def finish(reduction: Buff): Long = {
reduction.total / reduction.count
}
// 固定写法
// 自定义类型就是 product 自带类型根据类型选择
override def bufferEncoder: Encoder[Buff] = Encoders.product
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
}
从 Spark3.0 版本后,用户自定义弱类型聚合函数UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数 Aggregator
数据的加载和保存
通用的加载和保存方式
加载数据
spark.read.load
是加载数据的通用方法
spark.read.format("…")[.option("…")].load("…")
format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和 "textFile"。
option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,如:url、user、password 和 dbtable
load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。
直接在文件上进行查询:文件格式.`文件路径`
spark.sql("select * from json.`/opt/module/data/user.json`").show
保存数据
df.write.save
是保存数据的通用方法
保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。
df.write.mode("append").json("/opt/module/data/output")
SaveMode包含4个值,分别是:
“error”(默认):如果文件已经存在则抛出异常
“append”:如果文件已经存在则追加
“overwrite”:如果文件已经存在则覆盖
“ignore”:如果文件已经存在则忽略
Parquet
Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式存储格式。
修改配置项 spark.sql.sources.default
,可修改默认数据源格式。
JSON
Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 DataFrame。 可以通过 SparkSession.read.json()
去加载 JSON 文件。
Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串。
CSV
Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为数据列
spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("data/user.csv")
MySQL
Spark SQL 可以通过 JDBC 从关系型数据库中读取数据和写入数据。
导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
读取数据
// 使用 jdbc 方法读取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")
val df = spark.read.jdbc("jdbc:mysql://linux1:3306/spark-sql",
"user", props)
写入数据
// 通过 jdbc 方法写入
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux1:3306/spark-sql",
"user", props)
Hive
连接外部已经部署好的 Hive,需要通过以下几个步骤:
-
Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
-
把 Mysql 的驱动 copy 到 jars/目录下
-
如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
-
重启 spark-shell
运行 Spark SQL CLI:
bin/spark-sql
运行 Spark beeline:
sbin/start-thriftserver.sh
bin/beeline -u jdbc:hive2://linux1:10000 -n root
代码操作Hive:
导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
将 hive-site.xml 文件拷贝到项目的 resources 目录中
代码实现
//创建 SparkSession
val spark: SparkSession = SparkSession
.builder()
.enableHiveSupport()
.getOrCreate()
注意事项:
在开发工具中创建数据库默认是在本地仓库,可以通过参数修改数据仓库的地址
val conf = new SparkConf()
.set("spark.sql.warehouse.dir", "hdfs://linux1:8020/user/hive/warehouse")
val spark = SparkSession.builder().config(conf).getOrCreate()
如果在执行操作时,出现AccessControlException,可以在代码最前面增加如下代码解决:
System.setProperty("HADOOP_USER_NAME", "kaze")