欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Apache Spark 2.4 正式发布,重要功能详细介绍

美国时间 2018年11月08日 正式发布了。一如既往,为了继续实现 Spark 更快,更轻松,更智能的目标,Spark 2.4 带来了许多新功能,如下:

  • 添加一种支持屏障模式(barrier mode)的调度器,以便与基于MPI的程序更好地集成,例如, 分布式深度学习框架;
  • 引入了许多内置的高阶函数,以便更容易处理复杂的数据类型(比如数组和 map);
  • 开始支持 Scala 2.12;
  • 允许我们对 notebooks 中的 DataFrame 进行热切求值(eager evaluation),以便于调试和排除故障;
  • 引入新的内置 Avro 数据源。
Apache Spark 2.4 正式发布,重要功能详细介绍
如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

除了这些新功能外,该版本还重点关注可用性,稳定性和优化,解决了超过1000 个tickets。 Spark 贡献者的其他显着特征包括:

在这篇文章中,我们简要总结了一些更高级别的功能和改进。 有关 Spark 所有组件和 JIRA 已解决的主要功能的完整列表,请阅读 Apache Spark 2.4.0 release notes

Barrier Execution Mode

Barrier Execution Mode 是 Project Hydrogen 的一部分,这是 Apache Spark 的一项计划,旨在将最先进的大数据和 AI 技术结合在一起。它可以将来自 AI 框架的分布式训练作业正确地嵌入到 Spark 作业中。我们通常会像 All-Reduce 这样来探讨复杂通信模式(complex communication patterns),因此所有的任务都需要同时运行。这不符合 Spark 当前使用的 MapReduce 模式 。使用这种新的执行模式,Spark 同时启动所有训练任务(例如,MPI 任务),并在任务失败时重新启动所有任务。Spark 还为屏障(barrier tasks)任务引入了一种新的容错机制。当任何障碍任务在中间失败时,Spark 将中止所有任务并重新启动当前 stage。

内置高阶函数

Spark 2.4 之前,为了直接操作复杂类型(例如数组类型),有两种典型的解决方案:

  • 将嵌套结构展开为多行,并应用某些函数,然后再次创建结构;
  • 编写用户自定义函数(UDF)。

新的内置函数可以直接操作复杂类型,高阶函数可以使用匿名 lambda 函数直接操作复杂值,类似于UDF,但具有更好的性能。比如以下两个高阶函数:

SELECT array_distinct(array(1, 2, 3, null, 3));
["1","2","3",null]

SELECT array_intersect(array(1, 2, 3), array(1, 3, 5));
["1","3"]

关于内置函数和高阶函数的进一步说明可以参见《Apache Spark 2.4 中解决复杂数据类型的内置函数和高阶函数介绍》《Apache Spark 2.4 新增内置函数和高阶函数使用介绍》

内置 Avro 数据源

Apache Avro 是一种流行的数据序列化格式。它广泛用于 Apache Spark 和 Apache Hadoop 生态系统,尤其适用于基于 Kafka 的数据管道。从 Apache Spark 2.4 版本开始,Spark 为读取和写入 Avro 数据提供内置支持。新的内置 spark-avro 模块最初来自 Databricks 的开源项目Avro Data Source for Apache Spark。除此之外,它还提供以下功能:

  • 新函数 from_avro()to_avro() 用于在 DataFrame 中读取和写入 Avro 数据,而不仅仅是文件。
  • 支持 Avro 逻辑类型(logical types),包括 Decimal,Timestamp 和 Date类型。Spark SQL 和 Avro 的数据类型之间的转换可以参见下面:
    Spark SQL typeAvro typeAvro logical type
    ByteTypeint
    ShortTypeint
    BinaryTypebytes
    DateTypeintdate
    TimestampTypelongtimestamp-micros
    DecimalTypefixeddecimal
  • 2倍读取吞吐量提高和10%写入吞吐量提升。

支持 Scala 2.12

从 Spark 2.4 开始,Spark 支持 Scala 2.12,并分别与 Scala 2.11 和 2.12 进行交叉构建,这两个版本都可以在 Maven 存储库和下载页面中使用。现在,用户可以使用 Scala 2.12 来编写 Spark 应用程序。

Scala 2.12 为 Java 8 带来了更好的互操作性,Java 8 提供了改进的 lambda 函数序列化。 它还包括用户期望的新功能和错误修复。

Pandas UDF 提升

Pandas UDF 是从 Spark 2.3 开始引入的。在此版本中,社区收集了用户的反馈,并不断改进 Pandas UDF。

除了错误修复之外,Spark 2.4 中还有2个新功能:

  • SPARK-22239 使用 Pandas UDF 来用户自定义窗口函数。
  • SPARK-22274 使用 Pandas UDF 来用户自定义聚合函数。

我们相信这些新功能将进一步改善 Pandas UDF 的使用,我们将在下一版本中不断改进Pandas UDF。

Image Data Source

社区从图像/视频/音频处理行业看到了更多案例。为这些提供 Spark 内置数据源简化了用户将数据导入 ML 训练的工作。在 Spark 2.3 版本中,图像数据源是通过ImageSchema.readImages 实现的。Spark 2.4 发行版中的 SPARK-22666 引入了一个新的 Spark 数据源,它可以作为 DataFrame 从目录中递归加载图像文件。现在加载图像非常简单:

df = spark.read.format("image").load("...")

Kubernetes 整合增强

Spark 2.4 包含许多 Kubernetes 集成的增强功能。主要包括这三点:

  • 首先,此版本支持在 Kubernetes 上运行容器化 PySpark 和 SparkR 应用程序。Spark 为 Dockerfiles 提供了 Python 和 R 绑定,供用户构建基本映像或自定义它以构建自定义映像。
  • 其次,提供了客户端模式。用户可以在 Kubernetes 集群中运行的 pod 里面运行交互式工具(例如,shell或 notebooks)。
  • 最后,支持挂载以下类型的 Kubernetes volumes :emptyDir,hostPath 和 persistentVolumeClaim。

灵活的 Streaming Sink

许多外部存储系统已经有批量连接器(batch connectors),但并非所有外部存储系统都有流式接收器(streaming sinks)。在此版本中,即使存储系统不支持将流式传输作为接收器(streaming as a sink)。streamingDF.writeStream.foreachBatch(...) 允许我们在每个微批次(microbatch)的输出中使用 batch data writers。例如,过往记忆告诉你可以使用 foreachBatch 中现有的 Apache Cassandra 连接器直接将流式查询的输出写入到 Cassandra。具体如下:

/**
 * User: 过往记忆
 * Date: 2018-11-10
 * Time: 10:24
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/2448
 * 过往记忆博客,专注于Hadoop、Spark、HBase 等大数据技术。
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

streamingDF.writeStream
.foreachBatch { (iteblogBatchDF: DataFrame, batchId: Long) =>
    iteblogBatchDF.write       // Use Cassandra batch data source to write streaming out
      .cassandraFormat(tableName, keyspace)
      .option("cluster", "iteblog_hadoop")
      .mode("append")
      .save()
  }

同样,你也可以使用它将每个微批输出(micro-batch output)应用于 streaming DataFrames 中,许多 DataFrame/Dataset 操作在 streaming DataFrames 是不支持的,具体使用如下:

streamingDF.writeStream.foreachBatch { (iteblogBatchDF: DataFrame, batchId: Long) =>
  iteblogBatchDF.cache()
  iteblogBatchDF.write.format(...).save(...)  // location 1
  iteblogBatchDF.write.format(...).save(...)  // location 2
  iteblogBatchDF.uncache()
}
本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Apache Spark 2.4 正式发布,重要功能详细介绍】(https://www.iteblog.com/archives/2448.html)
喜欢 (6)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!