概述
什么是Spark Streaming
Spark Streaming 用于流式数据的处理,是对Spark Core的拓展。
Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。
DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列。对数据的操作也是按照 RDD 为单位来进行的。
简单来讲,DStream 就是对 RDD 在实时数据处理场景的一种封装。
Spark Streaming最大的特点:准实时,微批次
Spark Streaming 架构
背压机制
Spark Streaming 可以通过背压机制动态控制数据接收速率来适配集群数据处理能力。
通过属性spark.streaming.backpressure.enabled
来控制是否启用 backpressure 机制,默认值 false,即不启用。
快速开始
WordCount示例
添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
编写代码
object StreamWordCount {
def main(args: Array[String]): Unit = {
//1.初始化 Spark 配置信息
val sparkConf = new
SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
//2.初始化 SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//3.通过监控端口创建 DStream,读进来的数据为一行一行
val lineStreams = ssc.socketTextStream("localhost", 9999)
//将每一行数据做切分,形成一个个单词
val wordStreams = lineStreams.flatMap(_.split(" "))
//将单词映射成元组(word,1)
val wordAndOneStreams = wordStreams.map((_, 1))
//将相同的单词次数做统计
val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
//打印
wordAndCountStreams.print()
//启动 SparkStreamingContext
ssc.start()
ssc.awaitTermination()
}
}
DStream 创建
RDD 队列
使用 ssc.queueStream(queueOfRDDs)
来创建 DStream,每一个推送到这个队列中的 RDD都会作为一个 DStream 处理。
object RDDQueue {
def main(args: Array[String]) {
//1.初始化 Spark 配置信息
val conf = new SparkConf().setMaster("local[*]").setAppName("RDDQueue")
//2.初始化 SparkStreamingContext
val ssc = new StreamingContext(conf, Seconds(4))
//3.创建 RDD 队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.创建 QueueInputDStream
val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
//5.处理队列中的 RDD 数据
val mappedStream = inputStream.map((_, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//7.启动任务
ssc.start()
//8.循环向 RDD 队列中放入 RDD
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
ssc.awaitTermination()
}
}
自定义数据源
需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
object CustomReceiver {
def main(args: Array[String]): Unit = {
// 创建环境对象
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(conf, Seconds(3))
// 逻辑处理
val stream = ssc.receiverStream(new MyReceiver)
stream.print()
// 开启采集器
ssc.start()
// 等待采集器关闭
ssc.awaitTermination()
}
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
private var flag = true
override def onStart(): Unit = {
new Thread(() => {
while (flag) {
val message = "采集的数据:" + new Random().nextInt(100).toString
store(message)
Thread.sleep(500)
}
}).start()
}
override def onStop(): Unit = flag = false
}
}
Kafka 数据源
Kafka 0-10 Direct 模式
添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
编写代码
object KafkaReceiver {
def main(args: Array[String]): Unit = {
// 创建环境对象
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(conf, Seconds(3))
// 定义 Kafka 参数
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
"143.198.165.225:9093",
ConsumerConfig.GROUP_ID_CONFIG -> "spark",
"key.deserializer" ->
"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" ->
"org.apache.kafka.common.serialization.StringDeserializer"
)
// 读取 Kafka 数据创建 DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("spark"), kafkaPara))
// 将每条消息的 KV 取出
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
// 计算 WordCount
valueDStream.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
// 开启采集器
ssc.start()
// 等待采集器关闭
ssc.awaitTermination()
}
}
DStream 转换
无状态转化操作
无状态转化操作是分别应用到 DStream 内部每个 RDD 上的。
针对键值对的 DStream 转化操作(比如 reduceByKey())要添加
import StreamingContext._
才能在 Scala 中使用。
Transform
Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数,其实就是对 DStream 中的 RDD 应用转换。
object Transform {
def main(args: Array[String]): Unit = {
// 创建环境对象
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(conf, Seconds(3))
// 逻辑处理
val stream = ssc.socketTextStream("localhost", 9999)
val result = stream.transform(
rdd => {
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map((_, 1))
val reduceRDD = mapRDD.reduceByKey(_ + _)
reduceRDD
}
)
result.print()
// 开启采集器
ssc.start()
// 等待采集器关闭
ssc.awaitTermination()
}
}
join
两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。
数据源为KV类型才可以进行 join
有状态转化操作
UpdateStateByKey
updateStateByKey 可以在 DStream 中跨批次维护状态
object State {
def main(args: Array[String]): Unit = {
// 创建环境对象
System.setProperty("HADOOP_USER_NAME", "kaze")
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(conf, Seconds(3))
// 状态的保存需要使用checkpoint,所以需要设置checkpoint目录
ssc.checkpoint("hdfs://hadoop102/checkpoint")
// 逻辑处理
val stream = ssc.socketTextStream("localhost", 9999)
val flatMapStream = stream.flatMap(_.split(" "))
val mapStream = flatMapStream.map((_, 1))
mapStream.updateStateByKey(
// 第一个参数是新值,第二个参数是之前的状态
(seq: Seq[Int], opt: Option[Int]) => {
// 用新值更新之前的状态
val newCount = opt.getOrElse(0) + seq.sum
// 返回新的状态
Option(newCount)
}
).print()
// 开启采集器
ssc.start()
// 等待采集器关闭
ssc.awaitTermination()
}
}
WindowOperations
基于窗口的操作需要两个参数,分别为窗口时长以及滑动步长
-
窗口时长:计算内容的时间范围
-
滑动步长:隔多久触发一次计算
两者都必须为采集周期大小的整数倍
关于 Window 的操作有如下方法:
-
window(windowLength, slideInterval)
-
countByWindow(windowLength, slideInterval)
-
reduceByWindow(func, windowLength, slideInterval)
-
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
-
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
invFunc为func的反向函数。
该函数每个窗口的 reduce 值都是通过用前一个窗口的 reduce 值来递进计算。 通过 reduce 进入到滑动窗口的新数据并“反向 reduce”离开窗口的旧数据来实现这个操作。这样做可以避免重复计算提高计算效率
DStream 输出
常用输出操作如下:
-
print()
-
saveAsTextFiles(prefix, [suffix])
-
saveAsObjectFiles(prefix, [suffix])
-
saveAsHadoopFiles(prefix, [suffix])
-
foreachRDD(func)
顾名思义,遍历DStream内部的RDD,这是最通用的输出操作。
在将数据写到诸如 MySQL 的外部数据库中时,请注意:
- 连接不能写在 driver 层面(RDD算子外),因为连接不能序列化,所以闭包检测会报错
- 如果把连接写在 foreach 算子中则每个 RDD 中的每一条数据都会创建一个连接,浪费资源
- 推荐在 foreachPartition 算子中创建连接,使同一分区的数据共用一个连接,提高资源利用率
优雅关闭
使用外部系统来控制内部程序关闭
object GracefullyShutdown {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "kaze")
// 从之前的ssc恢复,若之前的ssc不存在则创建新的ssc
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("hdfs://hadoop102/checkpoint", () =>
createSSC())
// 监控关闭状态
new Thread(new MonitorStop(ssc)).start()
ssc.start()
ssc.awaitTermination()
}
private def createSSC(): StreamingContext = {
val update = (values: Seq[Int], status: Option[Int]) => {
//当前批次内容的计算
val sum: Int = values.sum
//取出状态信息中上一次状态
val lastStatu: Int = status.getOrElse(0)
Some(sum + lastStatu)
}
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark")
//设置优雅的关闭
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 保存ssc
ssc.checkpoint("hdfs://hadoop102/checkpoint")
val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val word: DStream[String] = line.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
wordAndCount.print()
ssc
}
private class MonitorStop(ssc: StreamingContext) extends Runnable {
override def run(): Unit = {
val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"), ssc.sparkContext.hadoopConfiguration, "kaze")
while (true) {
try {
Thread.sleep(5000)
} catch {
case e: InterruptedException =>
e.printStackTrace()
}
val state: StreamingContextState = ssc.getState
val bool: Boolean = fs.exists(new Path("hdfs://hadoop102/stopSpark"))
if (bool) {
if (state == StreamingContextState.ACTIVE) {
ssc.stop(stopSparkContext = true, stopGracefully = true)
System.exit(0)
}
}
}
}
}
}