我们都知道,目前 Apache Beam 仅仅提供了 Java 和 Python 两种语言的 API,尚不支持 Scala 相关的 API。基于此全球最大的流音乐服务商 Spotify 开发了 Scio ,其为 Apache Beam 和 Google Cloud Dataflow 提供了Scala API,使得我们可以直接使用 Scala 来编写 Beam 应用程序。Scio 开发受 Apache Spark 和 Scalding 的启发,目前最新版本是 Scio 0.3.0,0.3.0版本之前依赖于 Google Cloud Dataflow SDK,0.3.0及未来版本会直接依赖于 Apache Beam。Scio 目前使用 Apache License, Version 2.0 许可证发布,源代码在 https://github.com/spotify/scio。
主要功能
- Scala API 与 Spark 和 Scalding 的核心 API 非常类似
- 统一 batch 和 streaming 编程模型
- 与 Google Cloud 产品集成,包括:云存储,BigQuery,Pub/Sub,Datastore,Bigtable
- 支持 HDFS、JDBC、TensorFlow TFRecords、Cassandra 以及 Elasticsearch I/O
- 使用 Scio REPL 支持交互模式
- 可以与Algebird和Breeze整合
- 分布式缓存
- Pipeline orchestration with Scala Futures
使用
前面说了 Scio 开发受 Apache Spark 和 Scalding 的启发,所以如果我们使用 Scio API 来编写一个 WordCount 程序看起来和使用 Spark 来编写很类似。首先我们需要引入相关依赖:
libraryDependencies ++= Seq( "com.spotify" % "scio-core_2.11" % "0.3.4", "com.spotify" % "scio-test_2.11" % "0.3.4" % "test" )
或
libraryDependencies ++= Seq( "com.spotify" %% "scio-core" % "0.3.4", "com.spotify" %% "scio-test" % "0.3.4" % "test" )
或
<dependency> <groupId>com.spotify</groupId> <artifactId>scio-core_2.11</artifactId> <version>0.3.4</version> </dependency> <dependency> <groupId>com.spotify</groupId> <artifactId>scio-test_2.11</artifactId> <version>0.3.4</version> <scope>test</scope> </dependency>
然后我们的 Scio API 版的 WordCount 可以这样来编写:
package com.iteblog import com.spotify.scio._ import com.spotify.scio.accumulators._ import com.spotify.scio.examples.common.ExampleData object WordCount { def main(cmdlineArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(cmdlineArgs) val input = args.getOrElse("input", ExampleData.KING_LEAR) val output = args("output") // initialize accumulators val max = sc.maxAccumulator[Int]("maxLineLength") val min = sc.minAccumulator[Int]("minLineLength") val sumNonEmpty = sc.sumAccumulator[Long]("nonEmptyLines") val sumEmpty = sc.sumAccumulator[Long]("emptyLines") sc.textFile(input) .map(_.trim) .accumulateBy(max, min)(_.length) .accumulateCountFilter(sumNonEmpty, sumEmpty)(_.nonEmpty) .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) .countByValue .map(t => t._1 + ": " + t._2) .saveAsTextFile(output) val result = sc.close().waitUntilFinish() // scalastyle:off regex // retrieve accumulator values println("Max: " + result.accumulatorTotalValue(max)) println("Min: " + result.accumulatorTotalValue(min)) println("Sum non-empty: " + result.accumulatorTotalValue(sumNonEmpty)) println("Sum empty: " + result.accumulatorTotalValue(sumEmpty)) // scalastyle:on regex } }
编写玩之后,我们可以
iteblog@www.iteblog.com scio $ sbt [info] ... > project scio-examples [info] ... > runMain com.iteblog.WordCount --input=<FILE PATTERN> --output=<DIRECTORY> --project=[PROJECT] --runner=DataflowRunner --zone=[ZONE]
注意:和我们之前见到的不一样,--input
参数匹配的文件必须写到文件那层,也就是需要使用 gs//bucket/path/part-*.txt
而不是gs://bucket/path
;
如果 --runner
没指定,默认的是 DirectRunner
。
更多关于 Scio 的使用,请参见官方文档 https://github.com/spotify/scio/wiki。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Scio:Apache Beam和Google Cloud Dataflow的Scala API】(https://www.iteblog.com/archives/2196.html)