Kaze
Kaze
Published on 2023-11-14 / 54 Visits
1
2

SparkStreaming

概述

什么是Spark Streaming

Spark Streaming 用于流式数据的处理,是对Spark Core的拓展。

Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。

DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列。对数据的操作也是按照 RDD 为单位来进行的。

简单来讲,DStream 就是对 RDD 在实时数据处理场景的一种封装。

image-20231114151136495

Spark Streaming最大的特点:准实时,微批次

Spark Streaming 架构

image-20231114145331715

背压机制

Spark Streaming 可以通过背压机制动态控制数据接收速率来适配集群数据处理能力。

通过属性spark.streaming.backpressure.enabled来控制是否启用 backpressure 机制,默认值 false,即不启用。

快速开始

WordCount示例

添加依赖

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming_2.12</artifactId>
     <version>3.0.0</version>
</dependency>

编写代码

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    //1.初始化 Spark 配置信息
    val sparkConf = new
        SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
    //2.初始化 SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //3.通过监控端口创建 DStream,读进来的数据为一行一行
    val lineStreams = ssc.socketTextStream("localhost", 9999)
    //将每一行数据做切分,形成一个个单词
    val wordStreams = lineStreams.flatMap(_.split(" "))
    //将单词映射成元组(word,1)
    val wordAndOneStreams = wordStreams.map((_, 1))
    //将相同的单词次数做统计
    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
    //打印
    wordAndCountStreams.print()
    //启动 SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}

DStream 创建

RDD 队列

使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD都会作为一个 DStream 处理。

object RDDQueue {
  def main(args: Array[String]) {
    //1.初始化 Spark 配置信息
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDDQueue")
    //2.初始化 SparkStreamingContext
    val ssc = new StreamingContext(conf, Seconds(4))
    //3.创建 RDD 队列
    val rddQueue = new mutable.Queue[RDD[Int]]()
    //4.创建 QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
    //5.处理队列中的 RDD 数据
    val mappedStream = inputStream.map((_, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    //6.打印结果
    reducedStream.print()
    //7.启动任务
    ssc.start()
    //8.循环向 RDD 队列中放入 RDD
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    ssc.awaitTermination()
  }
}

自定义数据源

需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

object CustomReceiver {
  def main(args: Array[String]): Unit = {
    // 创建环境对象
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))

    // 逻辑处理
    val stream = ssc.receiverStream(new MyReceiver)
    stream.print()

    // 开启采集器
    ssc.start()
    // 等待采集器关闭
    ssc.awaitTermination()
  }
    
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    private var flag = true
    override def onStart(): Unit = {
      new Thread(() => {
        while (flag) {
          val message = "采集的数据:" + new Random().nextInt(100).toString
          store(message)
          Thread.sleep(500)
        }
      }).start()
    }

    override def onStop(): Unit = flag = false
  }
}

Kafka 数据源

Kafka 0-10 Direct 模式

添加依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.10.1</version>
</dependency>

编写代码

object KafkaReceiver {
  def main(args: Array[String]): Unit = {
    // 创建环境对象
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))
    // 定义 Kafka 参数
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
        "143.198.165.225:9093",
      ConsumerConfig.GROUP_ID_CONFIG -> "spark",
      "key.deserializer" ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" ->
        "org.apache.kafka.common.serialization.StringDeserializer"
    )
    // 读取 Kafka 数据创建 DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("spark"), kafkaPara))
    // 将每条消息的 KV 取出
    val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
    // 计算 WordCount
    valueDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    // 开启采集器
    ssc.start()
    // 等待采集器关闭
    ssc.awaitTermination()
  }
}

DStream 转换

无状态转化操作

无状态转化操作是分别应用到 DStream 内部每个 RDD 上的。

image-20231114155121282

针对键值对的 DStream 转化操作(比如 reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。

Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数,其实就是对 DStream 中的 RDD 应用转换。

object Transform {
  def main(args: Array[String]): Unit = {
    // 创建环境对象
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))

    // 逻辑处理
    val stream = ssc.socketTextStream("localhost", 9999)
    val result = stream.transform(
      rdd => {
        val flatRDD = rdd.flatMap(_.split(" "))
        val mapRDD = flatRDD.map((_, 1))
        val reduceRDD = mapRDD.reduceByKey(_ + _)
        reduceRDD
      }
    )
    result.print()

    // 开启采集器
    ssc.start()
    // 等待采集器关闭
    ssc.awaitTermination()
  }
}
join

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

数据源为KV类型才可以进行 join

有状态转化操作

UpdateStateByKey

updateStateByKey 可以在 DStream 中跨批次维护状态

object State {
  def main(args: Array[String]): Unit = {
    // 创建环境对象
    System.setProperty("HADOOP_USER_NAME", "kaze")
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))
    // 状态的保存需要使用checkpoint,所以需要设置checkpoint目录
    ssc.checkpoint("hdfs://hadoop102/checkpoint")
    // 逻辑处理
    val stream = ssc.socketTextStream("localhost", 9999)
    val flatMapStream = stream.flatMap(_.split(" "))
    val mapStream = flatMapStream.map((_, 1))
    mapStream.updateStateByKey(
      // 第一个参数是新值,第二个参数是之前的状态
      (seq: Seq[Int], opt: Option[Int]) => {
        // 用新值更新之前的状态
        val newCount = opt.getOrElse(0) + seq.sum
        // 返回新的状态
        Option(newCount)
      }
    ).print()

    // 开启采集器
    ssc.start()
    // 等待采集器关闭
    ssc.awaitTermination()
  }
}
WindowOperations

基于窗口的操作需要两个参数,分别为窗口时长以及滑动步长

  • 窗口时长:计算内容的时间范围

  • 滑动步长:隔多久触发一次计算

两者都必须为采集周期大小的整数倍

关于 Window 的操作有如下方法:

  1. window(windowLength, slideInterval)

  2. countByWindow(windowLength, slideInterval)

  3. reduceByWindow(func, windowLength, slideInterval)

  4. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

  5. reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

    invFunc为func的反向函数。

    该函数每个窗口的 reduce 值都是通过用前一个窗口的 reduce 值来递进计算。 通过 reduce 进入到滑动窗口的新数据并“反向 reduce”离开窗口的旧数据来实现这个操作。这样做可以避免重复计算提高计算效率

DStream 输出

常用输出操作如下:

  1. print()

  2. saveAsTextFiles(prefix, [suffix])

  3. saveAsObjectFiles(prefix, [suffix])

  4. saveAsHadoopFiles(prefix, [suffix])

  5. foreachRDD(func)

    顾名思义,遍历DStream内部的RDD,这是最通用的输出操作。

在将数据写到诸如 MySQL 的外部数据库中时,请注意:

  1. 连接不能写在 driver 层面(RDD算子外),因为连接不能序列化,所以闭包检测会报错
  2. 如果把连接写在 foreach 算子中则每个 RDD 中的每一条数据都会创建一个连接,浪费资源
  3. 推荐在 foreachPartition 算子中创建连接,使同一分区的数据共用一个连接,提高资源利用率

优雅关闭

使用外部系统来控制内部程序关闭

object GracefullyShutdown {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "kaze")
    // 从之前的ssc恢复,若之前的ssc不存在则创建新的ssc
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("hdfs://hadoop102/checkpoint", () =>
      createSSC())
    // 监控关闭状态
    new Thread(new MonitorStop(ssc)).start()
    ssc.start()
    ssc.awaitTermination()
  }

  private def createSSC(): StreamingContext = {
    val update = (values: Seq[Int], status: Option[Int]) => {
      //当前批次内容的计算
      val sum: Int = values.sum
      //取出状态信息中上一次状态
      val lastStatu: Int = status.getOrElse(0)
      Some(sum + lastStatu)
    }

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark")
    //设置优雅的关闭
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // 保存ssc
    ssc.checkpoint("hdfs://hadoop102/checkpoint")
    val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val word: DStream[String] = line.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
    val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
    wordAndCount.print()
    ssc
  }

  private class MonitorStop(ssc: StreamingContext) extends Runnable {
    override def run(): Unit = {
      val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"), ssc.sparkContext.hadoopConfiguration, "kaze")
      while (true) {
        try {
          Thread.sleep(5000)
        } catch {
          case e: InterruptedException =>
            e.printStackTrace()
        }
        val state: StreamingContextState = ssc.getState
        val bool: Boolean = fs.exists(new Path("hdfs://hadoop102/stopSpark"))
        if (bool) {
          if (state == StreamingContextState.ACTIVE) {
            ssc.stop(stopSparkContext = true, stopGracefully = true)
            System.exit(0)
          }
        }
      }
    }
  }
}

Comment