文章目录
背景
在默认情况下,Spark Streaming 通过 receivers (或者是 Direct 方式) 以生产者生产数据的速率接收数据。当 batch processing time > batch interval 的时候,也就是每个批次数据处理的时间要比 Spark Streaming 批处理间隔时间长;越来越多的数据被接收,但是数据的处理速度没有跟上,导致系统开始出现数据堆积,可能进一步导致 Executor 端出现 OOM 问题而出现失败的情况。
而在 Spark 1.5 版本之前,为了解决这个问题,对于 Receiver-based 数据接收器,我们可以通过配置 spark.streaming.receiver.maxRate
参数来限制每个 receiver 每秒最大可以接收的记录的数据;对于 Direct Approach 的数据接收,我们可以通过配置 spark.streaming.kafka.maxRatePerPartition
参数来限制每次作业中每个 Kafka 分区最多读取的记录条数。这种方法虽然可以通过限制接收速率,来适配当前的处理能力,但这种方式存在以下几个问题:
- 我们需要事先估计好集群的处理速度以及消息数据的产生速度;
- 这两种方式需要人工参与,修改完相关参数之后,我们需要手动重启 Spark Streaming 应用程序;
- 如果当前集群的处理能力高于我们配置的 maxRate,而且 producer 产生的数据高于 maxRate,这会导致集群资源利用率低下,而且也会导致数据不能够及时处理。
反压机制
那么有没有可能不需要人工干预,Spark Streaming 系统自动处理这些问题呢?当然有了!Spark 1.5 引入了反压(Back Pressure)机制,其通过动态收集系统的一些数据来自动地适配集群数据处理能力。详细的记录请参见 SPARK-7398 里面的说明。
Spark Streaming 1.5 以前的体系结构
在 Spark 1.5 版本之前,Spark Streaming 的体系结构如下所示:
- 数据是源源不断的通过 receiver 接收,当数据被接收后,其将这些数据存储在 Block Manager 中;为了不丢失数据,其还将数据备份到其他的 Block Manager 中;
- Receiver Tracker 收到被存储的 Block IDs,然后其内部会维护一个时间到这些 block IDs 的关系;
- Job Generator 会每隔 batchInterval 的时间收到一个事件,其会生成一个 JobSet;
- Job Scheduler 运行上面生成的 JobSet。
Spark Streaming 1.5 之后的体系结构
- 为了实现自动调节数据的传输速率,在原有的架构上新增了一个名为
RateController
的组件,这个组件继承自StreamingListener
,其监听所有作业的onBatchCompleted
事件,并且基于processingDelay
、schedulingDelay
、当前 Batch 处理的记录条数以及处理完成事件来估算出一个速率;这个速率主要用于更新流每秒能够处理的最大记录的条数。速率估算器(RateEstimator
)可以又多种实现,不过目前的 Spark 2.2 只实现了基于 PID 的速率估算器。 - InputDStreams 内部的
RateController
里面会存下计算好的最大速率,这个速率会在处理完onBatchCompleted
事件之后将计算好的速率推送到ReceiverSupervisorImpl
,这样接收器就知道下一步应该接收多少数据了。 - 如果用户配置了
spark.streaming.receiver.maxRate
或spark.streaming.kafka.maxRatePerPartition
,那么最后到底接收多少数据取决于三者的最小值。也就是说每个接收器或者每个 Kafka 分区每秒处理的数据不会超过spark.streaming.receiver.maxRate
或spark.streaming.kafka.maxRatePerPartition
的值。
详细的过程如下图所示:
Spark Streaming 反压机制的使用
在 Spark 启用反压机制很简单,只需要将 spark.streaming.backpressure.enabled
设置为 true
即可,这个参数的默认值为 false。反压机制还涉及以下几个参数,包括文档中没有列出来的:
- spark.streaming.backpressure.initialRate: 启用反压机制时每个接收器接收第一批数据的初始最大速率。默认值没有设置。
- spark.streaming.backpressure.rateEstimator:速率估算器类,默认值为 pid ,目前 Spark 只支持这个,大家可以根据自己的需要实现。
- spark.streaming.backpressure.pid.proportional:用于响应错误的权重(最后批次和当前批次之间的更改)。默认值为1,只能设置成非负值。weight for response to "error" (change between last batch and this batch)
- spark.streaming.backpressure.pid.integral:错误积累的响应权重,具有抑制作用(有效阻尼)。默认值为 0.2 ,只能设置成非负值。weight for the response to the accumulation of error. This has a dampening effect.
- spark.streaming.backpressure.pid.derived:对错误趋势的响应权重。 这可能会引起 batch size 的波动,可以帮助快速增加/减少容量。默认值为0,只能设置成非负值。weight for the response to the trend in error. This can cause arbitrary/noise-induced fluctuations in batch size, but can also help react quickly to increased/reduced capacity.
- spark.streaming.backpressure.pid.minRate:可以估算的最低费率是多少。默认值为 100,只能设置成非负值。
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark Streaming 反压(Back Pressure)机制介绍】(https://www.iteblog.com/archives/2323.html)
一直关注,从未放弃
学习下
大佬,一直来学习。。。
支持支持支持支持支持支持
spark streaming优化
支持支持
从市府广场冲纳
支持
支持
学习学习
膜拜大佬,学习了。
学习一下
学习
学习学习
学习一下 持续关注
学习一下 持续关注
前来学习
学习一下
学习一下
学习一下,一直想看来着
学习一下
学习学习
学习,最近正好要用
1学习
嗯,一直想看来着。
不错
这些都是好东西
前来学习
学习了
学习
再次学习了,赞赞赞!
不错
一直关注贵博客,学习了
啊啊三生三世少时诵诗书是说撒是撒啊啊啊啊啊啊啊啊啊啊啊啊个GVvvvv变边唱咔咔咔咔咔咔
diaodiaodiao
不错
前来深造
前来深造
查看streaming反压机制,正好碰到这个问题。
已关注
学习学习
你的文章经常关注,很赞!
一直在支持
加了需要评论才能看的内容
对于小白来说很深奥 哦
自动垃圾平均识别怎么回事
什么意思?
感谢楼主
一直在支持
支持过往记忆大牛
好文,拜读一下。