暂无图片
暂无图片
暂无图片
暂无图片
1
暂无图片

Spark|如何通过PID算法实现反压机制

大数据记事本 2021-06-17
1149

一、反压机制简介

    反压全称反向压力,是异步编程中进行流量控制的一种优雅的方式。在异步编程中,上下游之间往往会使用队列来进行解耦,如果上游发送给任务队列的速率大于下游消费的速率,就会导致任务队列中积压的任务越来越多,最终导致 JVM 的内存耗尽,抛出OOM异常,程序退出。
    所以,为了避免 OOM 的问题,必须对上游输出给下游的速度做流量控制。
    如下图所示:当process处理消息的速度比decode发送消息的速度慢时,就会导致队列中的消息不断积压:

    在反向压力的方案中,上游能够根据下游的处理能力,动态地调整输出速度。当下游处理不过来时,上游就减慢发送速度,当下游处理能力提高时,上游就加快发送速度。
    反向压力的思想,已经成为流计算领域的共识,并且形成了反向压力相关的标准:https://www.reactive-streams.org

    上图描述了Reactive Streams 的工作原理。当下游的消息订阅者,从上游的消息发布者接收消息前,会先通知消息发布者自己能够接收多少消息。然后消息发布者就按照这个数量,向下游的消息订阅者发送消息。这样,整个消息传递的过程都是量力而行的,就不存在上下游之间因为处理速度不匹配,而造成的 OOM 问题了。
    要实现反向压力的功能,只需要从两个方面来进行控制。
  • 其一是,执行器的任务队列,它的容量必须是有限的。(队列有容量限制)

  • 其二是,当执行器的任务队列已经满了时,就阻止上游继续提交新的任务,直到任务队列重新有新的空间可用为止。(队列是阻塞的)

二、PID算法简介

    PID:就是比例(proportional)、积分(integral)、微分(differential),是一种很常见的控制算法。
    使用场景:需要将某一个物理量“保持稳定”的场合,比如保持平衡,稳定温度,或者我们即将分析的Spark控制接收数据的速率的场景。
    下面通过一个例子对该算法进行简要说明:
    假设我们要控制一个热水器,让水的温度保持在50℃,怎么做
    最简单的想法是:当水温低于50℃时通电,高于50℃时断电。但是这样会产生一个问题:由于达到50℃时断电,热水器的余热会将水加热到高于50℃。这时,就需要一种算法,它可以做到:
  • 将需要控制的物理量带到目标附近(比如48℃)
  • “预见”这个量的变化趋势(热水器余热会继续提高水温)
  • 消除因为散热、阻力等因素造成的静态误差
    这时,就出现了PID算法。P、I、D是三种不同的调节方法,可以单独使用,也可以组合使用。那么这三种调节方法有什么区别呢?
    PID 控制器的三个最基本的参数:KP、KI、KD
    KP:P就是比例的意思。它的作用最明显,原理也最简单。需要控制的物理量,比如上面的水温,有它现在的『当前值』,也有我们期望的『目标值』。
  • 当两者差距不大时,只需 "稍微加热" 即可
  • 当两者差距较大时,就让热水器 "开足马力" ,尽快加热到目标值附近
    实际编程时,就让偏差(目标减去当前)与调节装置的“调节力度”,建立一个一次函数的关系,就可以实现最基本的“比例”控制了;KP越大,调节作用越激进,反之调节作用更保守。
    KD:D就是微分的意思。如果仅仅依靠KP来调节水温,会出现一个现象:水温在50℃上下频繁变动,无法保持稳定。这时我们需要一个控制作用,让被控制的物理量的 "变化速度" 趋于0,即类似于“阻尼”的作用。
    因为当比较接近目标时,P的控制作用就比较小了。此时有很多内在的或者外部的因素,使控制量发生小范围的摆动。而D的作用就是让物理量的变化速度趋于0,只要什么时候,这个物理量具有了变化速度,D就向相反的方向用力,尽力刹住这个变化。
    KD参数越大,向变化速度相反方向刹车的力道就越强。
    此时水温这个物理量已经相对稳定,为什么还要 I 这个参数?
    假设将上面这个热水器带到一个寒冷的地方对水进行加热,还是要加热到50℃,在P的作用下,水温逐渐升高到 45℃,此时出现一个现象,外界环境使水散热的速度和通过P控制的加热速度一致,此时的状态为:
  • 对于P,已经控制和目标温度接近了,只需稍微加热即可
  • 对于D,加热和散热速率一致,温度没有波动,并不需要D进行调节
    这样就会导致温度一维持在 45℃ 左右,永远无法达到 50℃。
    在这种情况下,实际做法是应该进一步增加加热的功率,但是增加多少如何计算呢?这里就引入了一个积分量。只要偏差存在,就不断地对偏差进行积分(累加),并反应在调节力度上。
    即使45℃和50℃相差不太大,但是随着时间的推移,只要没达到目标温度,这个积分量就不断增加。到了目标温度后,假设温度没有波动,积分值就不会再变动。这时,加热功率仍然等于散热功率。但是,温度是稳稳的50℃。
    KI的值越大,积分时乘的系数就越大,积分效果越明显。所以,I的作用就是,减小静态情况下的误差,让受控物理量尽可能接近目标值。
三、PID算法在SparkStreaming反压中的应用及源码详解
    Spark Streaming 消费 Kafka 有两种方式:ReceiverDirect,由于在 spark-streaming-kafka-0-10 中已经不再支持 Receiver 方式,所以这里只分析 Direct 方式。
    对于 Direct 方式来说,输入数据源对应的类是 DirectKafkaInputDStream ,其 compute() 方法用于读取 Kafka 数据并构建 KafkaRDD ,在拉取 Kafka 数据时,就要根据 PID 算法获取每个分区需要读取的数据量,过程如下:
1.计算每个分区可以读取的最大的 offset,代码如下:
    val untilOffsets = clamp(latestOffsets())
    复制
        这里使用 latestOffsets() 方法的返回值作为 clamp() 方法的参数, latestOffsets() 方法有两个作用:
    • 返回每个分区最大的 offset
    • 用于动态发现新加入的分区
        这里只看该方法的注释即可
      /**
      * Returns the latest (highest) available offsets, taking new partitions into account.
      * 1.获取每个分区的最大offset
      * 2.用于动态感知新加入的分区
      */
      protected def latestOffsets(): Map[TopicPartition, Long] = {
      ...
      }
      复制
          clamp() 方法用于限制每个分区处理的最大 offset 不超过 kafka 每个分区的最大offset
        //限制每个分区处理的最大消息条数不超过kafka分区里的最大offset
        protected def clamp(
        offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
        maxMessagesPerPartition(offsets).map { mmp =>
        mmp.map { case (tp, messages) =>
        //Kafka每个分区的最大offset
        val uo = offsets(tp)
        //限制每个分区可以消费的最大offset不超过实际的最大offset
        tp -> Math.min(currentOffsets(tp) + messages, uo)
        }
        }.getOrElse(offsets)
        }
        复制
            这里注意 maxMessagesPerPartition() 方法,该方法用来计算每个分区本次应该读取的消息数量,方法的返回值为 [TopicPartition,Long],即每个分区需读取的消息数量。注意这个数据量是一个预估的值,currentOffsets(tp) + messages 即当前消费到的 offset 加上读取的消息量就是预估的读取的最大 offset,这个 offset 可能比实际最大的 offset 要大,所以 clamp() 方法中要限制这个 offset 为分区实际最大的 offset
          protected[streaming] def maxMessagesPerPartition(
          offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
          //预估的消费Rate
          val estimatedRateLimit = rateController.map { x => {
          //获取最后一次PID计算器算出的Rate,是一个消息数量
          val lr = x.getLatestRate()
          //如果没有Rate,直接返回spark.streaming.kafka.maxRatePerPartition参数设置的最大rate
          if (lr > 0) lr else initialRate
          }}


          // calculate a per-partition rate limit based on current lag
          //根据当前延迟计算每个分区的速率限制(TopicPartition,Double)
          val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
          case Some(rate) =>
          //获取每个分区还未消费的数据量,即最大offset-当前消费offset的值
          val lagPerPartition = offsets.map { case (tp, offset) =>
          tp -> Math.max(offset - currentOffsets(tp), 0)
          }
          //计算所有分区未消费的数据总量
          val totalLag = lagPerPartition.values.sum


          lagPerPartition.map { case (tp, lag) =>
          //每个分区消费的最大条数
          val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
          //计算反压速率:单个分区未消费的数据量/所有分区未消费数据量总和 * 比率
          val backpressureRate = lag totalLag.toDouble * rate
          //确定该分区消费的反压速率:
          //如果设置了spark.streaming.kafka.maxRatePerPartition > 0,计算的最大rate不能超过该值。
          //如果没有设置,则直接采用反压速率backpressureRate
          tp -> (if (maxRateLimitPerPartition > 0) {
          Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
          }
          case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp).toDouble }
          }


          if (effectiveRateLimitPerPartition.values.sum > 0) {
          //每个计算批次的时间间隔,单位秒
          val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble 1000
          Some(effectiveRateLimitPerPartition.map {
          //每个分区的限制比率*时间区间就是本批次消费的数据量
          case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong,
          ppc.minRatePerPartition(tp))
          })
          } else {
          None
          }
          }
          复制
          这里计算每个分区最大的消费速率有两种情况:
          a. 已经通过PID计算器计算过Rate,即 rateLimit > 0
          • 计算每个分区滞后消费的数据量:lagPerPartition
          • 计算所有分区滞后消费的总数据量:totalLag
          • 计算每个分区反压的速率:backpressureRate = lag totalLag.toDouble * rate,这里的rate就是之前通过PID计算器算出的Rate,即 rateLimit 变量的值
          • 如果设置了spark.streaming.kafka.maxRatePerPartition > 0,计算的最大rate不能超过该值。如果没有设置,则直接采用反压速率backpressureRate
          • 将计算批次batch的时间间隔转化为s,并且乘以前面算出的速率,得到每个分区本次消费的数据量。
          b. 没有通过PID计算器计算过Rate,即 rateLimit  <=0
          • 则每个分区直接返回 spark.streaming.kafka.maxRatePerPartition 参数设置的最大rate
              在 maxMessagesPerPartition 方法中,用到了一个 RateController 类型的变量 rateController 来预估消费的数据量rate,该变量的定义如下:
            override protected[streaming] val rateController: Option[RateController] = {
            if (RateController.isBackPressureEnabled(ssc.conf)) {
            Some(new DirectKafkaRateController(id,
            RateEstimator.create(ssc.conf, context.graph.batchDuration)))
            } else {
            None
            }
            }
            复制
                可以看到,当开启了反压机制后,RateController 具体的实现类是 DirectKafkaRateController,在初始化该类实例对象时,需要传入一个 RateEstimator 类型的参数,RateEstimator 是一个接口,具体的实现就包括 PIDRateEstimator,也就是PID算法的具体实现类,计算消费速率的方法为 PIDRateEstimator.compute()
            注意:
            • PIDRateEstimator是获取当前这个结束的batch的数据,然后估计下一个batch的rate(注意,下一个batch并不一定跟当前结束的batch是连续两个batch,可能会有积压未处理的batch)
            • PID计算出的速率是针对所有分区的,而不是单个分区
            compute() 方法的四个参数对应 StreamingListener 体系获取的下列四个数据指标
            • time -> 处理结束时间 batchCompleted.batchInfo.processingEndTime
            • numElements -> 消息数elements.get(streamUID).map(_.numRecords)
            • processingDelay ->处理时间 batchCompleted.batchInfo.processingDelay
            • schedulingDelay -> 调度延迟 batchCompleted.batchInfo.schedulingDelay
              /**
              *
              * @param time 这个batch处理结束的时间
              * @param numElements 这个batch处理的消息条数
              * @param processingDelay 这个job在实际计算阶段花的时间(不算调度延迟)
              * @param schedulingDelay 这个job花在调度队列里的时间
              * @return
              */
              def compute(
              time: Long, // in milliseconds
              numElements: Long,
              processingDelay: Long, // in milliseconds
              schedulingDelay: Long // in milliseconds
              ): Option[Double] = {


              this.synchronized {
              if (time > latestTime && numElements > 0 && processingDelay > 0) {


              //计算批次的实际处理时间间隔,单位秒
              val delaySinceUpdate = (time - latestTime).toDouble 1000


              //实际处理速度
              val processingRate = numElements.toDouble processingDelay * 1000


              //计算误差:预估处理速度-实际的处理速度,这个结果就是PID中P系数要乘的值
              //latestRate是PID为刚刚结束的batch,在生成这个batch时预估的处理速度
              //processingRate 是刚刚结束的batch实际的处理速度
              val error = latestRate - processingRate


              //计算积分:调度延时 * 每秒处理的速率 批次间隔,得到的值可以认为是之后每秒要多消费多少数据才可以弥补延时带来的时间误差
              val historicalError = schedulingDelay.toDouble * processingRate batchIntervalMillis


              //计算微分:
              val dError = (error - latestError) delaySinceUpdate


              //计算新的速率:proportional为修正系数
              val newRate = (latestRate - proportional * error -
              integral * historicalError -
              derivative * dError).max(minRate)


              //更新latestTime
              latestTime = time
              //如果是第一次使用PID计算
              if (firstRun) {
              latestRate = processingRate
              latestError = 0D
              firstRun = false
              logTrace("First run, rate estimation skipped")
              None
              } else {
              //更新latestRate
              latestRate = newRate
              //更新latestError
              latestError = error
              logTrace(s"New rate = $newRate")
              Some(newRate)
              }
              } else {
              logTrace("Rate estimation skipped")
              None
              }
                }
              复制

              该方法有以下几个关键步骤:

              • 计算速率差值:预估处理速度 - 实际的处理速度,这个结果就是PID中P系数要乘的值

                val error = latestRate - processingRate
                复制
                • 计算积分:调度延时 * 每秒处理的速率 批次间隔,得到的值可以认为是之后每秒要多消费多少数据才可以弥补延时带来的时间误差

                  val historicalError = schedulingDelay.toDouble * processingRate  batchIntervalMillis
                  复制
                  • 计算微分:

                    val dError = (error - latestError) / delaySinceUpdate
                    复制
                    • 计算新的速率:可以看到,这里就用到了上面算出的三个系数
                      val newRate = (latestRate - proportional * error -
                      integral * historicalError -
                      derivative * dError).max(minRate)
                      复制

                          那么,什么时候 error 为 0,什么时候 PID 的输出为 0 呢?

                          当 latestRate = processingRate 时,error 为 0,即预估的速率和实际的速率一致。由于latestRate实际上等于numElements / batchDuration,因为numElements是上次生成job时根据这个latestRate(也就是当时的estimatedRate)算出来的,而 processingRate = numElements / processingDelay,所以可以得到,当 processingDelay = batchDuration 时,error 为 0.

                          当 error 为 0 时,PID 的输出不一定为 0.因为要考虑历史误差,这里刚结束的batch可能并非生成后就立即被执行,而是在调度队列里排了一会队,所以还是需要考虑 schedulingDelay。

                          当PID输出为0时,newRate 就等于 latestRate,此时系统达到了稳定状态,error=0,historicalError=0 且 dError= 0。此时意味着:

                      • 没有 schedulingDelay,意味着job等待被调度的时间为0. 如果没有累积的未执行的job,那么schedulingDelay 大致等于0.

                      • error为零,意味着 batchDuration 等于processingDelay

                      • dError为零,在 error 等于 0 时,意味着上一次计算的 error 也为零。

                      这就是backpressure想要系统达到的状态。

                      反压机制开启方式及涉及的参数:
                        开启反压:
                        spark.streaming.backpressure.enabled=true


                        设置每秒从kafka分区中拉取的最大数据条数
                        spark.streaming.kafka.maxRatePerPartition=xxxx


                        启用反压机制时每个接收器接收第一批数据的初始最大速率
                        spark.streaming.backpressure.initialRate


                        可以估算的最低速率。默认值为 100,只能设置为0或者大于0的数
                        spark.streaming.backpressure.pid.minRate


                        错误积累的响应权重,具有抑制作用(有效阻尼)。默认值为 0.2
                        只能设置为0或者大于0的数
                        spark.streaming.backpressure.pid.integral


                        对错误趋势的响应权重。这可能会引起 batch size 的波动,
                        可以帮助快速增加/减少容量。默认值为0.0,只能设置为0或者大于0的数
                        spark.streaming.backpressure.pid.derived


                        用于响应错误的权重。默认值为1.0,只能设置为0或者大于0的数
                        spark.streaming.backpressure.pid.proportional
                        复制
                        参考链接:
                        https://www.cnblogs.com/devos/p/9563031.html
                        https://blog.csdn.net/rlnLo2pNEfx9c/article/details/81230564
                        https://blog.csdn.net/qq_41673920/article/details/84860697
                        https://kaiwu.lagou.com/course/courseInfo.htm?courseId=614#/detail/pc?id=6420

                        推荐阅读:
                        1.Spark 启动 | 从启动脚本分析 Master 的启动流程
                        2.Spark 启动 | Worker 启动流程详解
                        3.如何使用Spark Rpc框架实现C/S架构基本的注册和心跳功能
                        文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论

                        张柳
                        暂无图片
                        1年前
                        评论
                        暂无图片 0
                        大佬怎么看你其他的文章啊
                        1年前
                        暂无图片 点赞
                        评论