文章目录
导读
唯品会离线平台SPARK2.3.2无缝升级到SPARK3.0.1版本,完全做到了对用户透明,目前正按着既定方案进行升级,新的版本SPARK CORE/SQL/PySpark进行了优化和BugFix,并且Merge了SPARK vip 2.3.2 重要Patch,在性能和易用性上比旧版本都有较大提升。这篇文章介绍了我们升级SPARK过程中遇到的挑战和思考,希望能给大家带来启发。
Spark应用现状
本次版本升级之前,唯品会大数据平台使用的主要版本为SPARK2.3.2,并且在社区版本上做了增强和BugFix,为用户提供SparkSQL/SparkJar/PySpark/SparkStream/Spark ML。现在集群有2300物理机,每天有1.2w SPARK 定时作业,8w个实例 在YARN上运行,Spark Adhoc日查询次数9000。
SPARK在公司推荐使用效果不错,作为批处理默认引擎。从下面图反映SPARK在唯品会大数据应用有半壁江山。
SPARK3.0特性和升级背景
Spark SQL是3.0.1中最侧重组件。已解决ISSUE 46%用于Spark SQL。使基于SQL高级别的库(包括结构化流和MLlib)和更高级别的API包括SQL和DataFrames)受益。这也迎合我们现在的主要场景(我们现在90%是SQL),同时也是优化痛点和主要功能点。
PySpark现在是Spark非常活跃的模块。此版本改善了功能和可用性,包括使用Python类型提示重新设计了Pandas UDF API,新的Pandas UDF类型以及更多的Python错误处理。这为我们新的战场比如推荐,特征工程,下一代AI平台等项目提供更好支持。
Spark 3.0中的功能亮点:自适应查询执行;动态分区修剪;符合ANSI SQL;Pandas API的重大改进;用于结构化流的新UI;加速器感知调度器;增强测试和SQL参考文档。达到了增效降本的目的。
# | 目标 | 现状 | 痛点 | 解决方案(3.0特性支持) |
背景1 | 智能优化 性能提升 | ▲自动合并小文件 ▲仅仅自动调整shuffle partition数 ▲主要基于规则优化 ▲优化靠资源堆砌 ▲主要人工分析: 1.倾斜分析靠人工和加盐处理;2.Broadcast Join靠经验判断;3.读写和混洗并发靠人为分析指定 | ▲手工调参 ▲被动优化 ▲只能基于规则优化 ▲靠资源堆砌 ▲集群资源有限 ▲有些场景加了资源效果不佳(倾斜) | ▲自适应查询执行:1.shuffle partition自动调整;2.动态查询重用;3.本地shuffle读;4.join倾斜自动优化;5.连续读shuffle优化 ▲动态分区优化 ▲其他规则优化 ▲最小化表缓存同步成本 ▲将聚合代码拆分为小函数 ▲在INSERT和ALTER TABLE Add PARTITION中添加批处理 ▲允许聚合器注册为UDAF |
背景2 | 基础依赖升级 | ▲目前线上HIVE为2.3.4 ▲SPARK2.x依赖HIVE为1.2.1 ▲spark2.x依赖Hadoop2.6 ▲Hadoop线上为3.2 | ▲HIVE和SPARK函数和语法兼容性问题 ▲线上场景多,差异突显就更多 ▲Spark2.x升级Hadoop3.x费时费力 ▲Spark2.x和Hadoop有第三方依赖冲突 | ▲SPARK3.0依赖2.3.7 HIVE ▲升级后线上SPARK依赖HIVE和HIVE基准都为2.3 ▲Hadoop 3 support ▲Java 11 support ▲GA Scala 2.12 and remove 2.11 |
背景3 | 数据湖 | ▲Spark2.x通过API操作delta lake ▲实时增量入仓(替换离线抽取装载) | ▲缺失SQL on delta lake ▲易用性差 ▲Merge性能低 | ▲数据湖SQL支持 ▲SPARK3.0性能加强给delta lake带来性能提升 |
背景4 | 数据源增强 | ▲Orc默认存储格式 ▲SPARK2.X支持Orc/Parquet/sequence/text/csv/jdbc/json | ▲列裁剪不确定表达式失效 ▲嵌套子字段下推失效 ▲CSV下推失效 | ▲parquet/ ORC: 1.析取谓词的下推;2.通用化嵌套列修剪;3.嵌套字段的parquet谓词下推(仅限parquet);4.支持ORC的合并模式(仅限ORC)5.;ORC的嵌套模式修剪(仅限ORC);6.减少ORC的谓词转换复杂度(仅限ORC) ▲Kafka: 1.增加对Kafka报头的支持;2.在Kafka源代码中引入新的选项:3.时间戳偏移(开始/结束);4.支持Kafka批处理源和流处理源v1中的minPartitions选项;5.升级Kafka到2.4.1 |
背景5 | pyspark增强 | ▲PySpark2.x UDF多接口实现 ▲PySpark2.x错误提示词不达意 | ▲pyspark应用越来越广但分析维护难 | ▲重新设计的带有类型提示的pandas udf ▲允许Pandas UDF接受pd.DataFrames的迭代器 ▲支持StructType作为标量pandas UDF的参数和返回类型 ▲通过Pandas udf支持dataframe Cogroup ▲添加mapInPandas来允许数据帧的迭代器 ▲使PySpark SQL异常更加python化 |
背景6 | spark on cloud | ▲Stream on k8s ▲ML on k8s | ▲Spark on k8s功能不完善 ▲driver和executor的日志收集困难 ▲缺乏Spark SQL on k8s ▲缺乏STS on k8s | ▲支持用户指定的driver和executor pod模板 ▲允许没有外部shuffle服务的动态分配 ▲使用k8s进行响应更灵敏的动态分配 ▲与Hadoop兼容的文件系统支持客户端依赖关系 ▲支持Kubernetes的子路径安装 |
背景7 | 功能增强 | ▲只支持broadcast join Hints ▲只支持API REPARTITIONS Hints by column ▲Catalyst只支持SELECT/INSERT | ▲SQL支持Hints不全 ▲Catalyst不支持DML ▲已有些函数性能差 | ▲引入完整Hints join语法 ▲SQL支持REPARTITION BY hint ▲Thrift Server中元数据处理 ▲在Catalyst支持删除/更新/合并操作符 ▲新的35个内建函数 ▲对现有内置函数的改进 |
背景8 | 监控/调试增强 | ▲SHS日志大且杂乱 ▲SHS加载日志性能差 ▲metric信息不全 ▲UI信息不多 | ▲metric信息不够 ▲分析问题困难 ▲测试手段不多 | ▲新的结构化流UI ▲SHS:允许滚动流应用程序的事件日志 ▲SQL exchange操作符添加shuffle度量 ▲提高历史服务器的并发性能 ▲解释格式化命令 ▲改进SQL解析器的错误消息 ▲向度量系统添加执行器度量和内存使用测量 |
升级Vip SPARK3.0
主要升级节点(如下图)
合并HIVE VIP1.2.1 patch到2.3.7
# | Patch描述 |
1 | 修复Spark3.0能删非默认hdfs namespace表的目录 |
2 | Spark3.0加载vip数据平台函数 |
3 | 屏蔽Hive audit log日志 |
4 | 解决函数冲突问题 |
合并SPARK VIP2.x patch到3.0.1
# | Patch描述 |
1 | 小文件合并 |
2 | shuffle文件准实时清理,防止nodemanager磁盘压力大 |
3 | Thrift server 删临时目录 bugfix |
4 | Thrift server支持代理用户 |
5 | 支持Hive权限检验 |
6 | Thrift server 超时杀任务 |
7 | 支持bucket 超集 |
8 | 支持Spark bucket(datasource)读写同一目录 |
9 | 支持tf/es/kudu/alluxio/delta到2.1 |
灰度上线spark3.0和bugfix
线上作业灰度计划
*注解p0/p1/p2/p3 为作业重要度级别,以此递减(下图为灰度作业流程)
bugfix和兼容线上2.3.2
灰度线上作业发现BUG不少,但BUG分优先级,优先级高先处理。
灰度线上作业和不断bugfix是一个迭代过程,不断完善和修复因为SPARK3.0上线导致线上问题。
# | bug/兼容性问题 | 解决方案 |
1 | 分号在 -- 注释后面bug,导致SQL执行失败 | 在spark sql语句 每个分号加上换行符 |
2 | 数据存储时,不对数据类型和列类型进行校验 | spark.sql.storeAssignmentPolicy默认值改为LEGACY |
3 | stage 和 推测task 映射关系不存在报错bug | 先判断映射关系存在,再从map去掉结束task和stage映射关系 |
4 | java.text.SimpleDataFormat用于以区域设置敏感的方式格式化和解析日期/时间戳,默认值是EXCEPTION,当我们得到不同的结果时抛出RuntimeException | set spark.sql.legacy.timeParserPolicy默认值改为LEGACY |
5 | 执行SQL文件,中间某段SQL报错,不会马上退出整个执行 | SQL顺序执行, 中间有结果状态不为0,直接将状态返回给driver直接退出 |
6 | stage retry多个active taskSetManager在运行 | stage retry,设置历史taskSetManager为Zombie状态 |
7 | 静态插入 UI,显示动态分区个数不为0 ,只影响ui显示,不影响执行结果 | |
8 | Spark ui 作业结束,但是 history sql 还有显示running job,不影响执行结果 |
挑战和应对
内部 patch 如何兼容
挑战:Spark2.x 自研功能比如: 小文件合并;支持权限检验;STS 超时杀任务;Shuffle 文件动态清理等都是是基于2.x打的patch,但是3.0相比2.x整个核心变化比较大, merge到3.0难度增大,测试和灰度工作量增加。
应对:有些patch基于3.0是重新开发一次。
SQL 语法兼容
挑战:历史包袱, 线上用户SQL掌握熟练程度不一样,不规范的SQL大有存在,2.x对语法没有那么强制,但3.0默认相对2.x严格一点,这样对灰度线上作业阻力巨大,对保障SLA有冲击。比如: 表不存在删表;数据插入数据列类型和元数据字段是否检验;敏感日期格式化;SQL文件设置非SQL参数报错等。
应对:大部分是兼容2.x版本,少数修改作业兼容3.0.
新版本BUG不少
挑战:灰度线上作业出现十几个bug,比如: 注释后面分号无法识别;stage和推测task对应关系找不到;执行SQL文件,中间某段SQL报错,不会立刻退出整个执行,导致数据质量问题;metric 在application结束之后没有complete等。
应对:及时快速Bugfix。
线上作业灰度工作量大
挑战:线上作业类型(批处理、流处理、机器学习、adhoc、pyspark和sparkjar)多,作业量大(线上有1.2w个作业),重要业务,核心全链路必须保障SLA 等情况下, 灰度作业压力不小。而且出现新的bug, 压力就倍增。
应对:必须停止灰度,快速BugFix再灰度。
总结
平滑升级,SPARK组件每次升级都需要做到尽量对用户毫无感知(后台修改spark_version切换),升级,灰度,回滚,监控都是平台自动统一处理;
升级充分准备,对新版本的憧憬和敬畏都不可或缺,升级之前要对社区新版本的feature和BUG要有足够的认知,甚至对升级版本和最新版本之间的issue也要有足够的了解,因为后面版本issue很大一部分是对前面的版本进行优化和bugfix,这样才能做到心中有数,比如3.1已经修复不少3.0bug;
升级策略,社区重大版本一般与线上版本核心架构差异很大,比如3.0加入自适应导致spark sql模块和shuffle service服务发生重大变化,直接在2.3.2基础上 merge 3.0难度较大,风险也高,建议3.x基础之上merge 2.3.2的patch进行升级;
真理来源于实践,从2.1.1升级到2.3.2为本次2.3.2升级3.0积累不少的经验和套路,为本次升级少走弯路和保障SLA;
吸取社区,本次升级完3.0不是终点,Spark社区非常活跃,3.1已经出来了,中间有很多非常好的issue, 都需要merge vip SPARK 3.0, SPARK优化永无止境;
回馈社区,近期打算把VIP SPARK main Patch 比如: 小文件合并;支持权限检验;STS 超时杀任务;Shuffle 文件动态清理 等提交社区。
本文原文:唯品会 Apache Spark 3.0 升级之路
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【唯品会 Apache Spark 3.0 升级之路】(https://www.iteblog.com/archives/9939.html)