In this article I will show how to build a REST service with Akka HTTP on top of Spark, using Cassandra as the data store.
We have already seen the power of Spark; combined properly with Cassandra it lets us build even more capable systems. Let's start by creating a build.sbt file with the following contents:
name := "cassandra-spark-akka-http-starter-kit" version := "1.0" scalaVersion := "2.11.8" organization := "com.iteblog" val akkaV = "2.4.5" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.0", "org.apache.spark" % "spark-sql_2.11" % "2.0.0", "com.typesafe.akka" %% "akka-http-core" % akkaV, "com.typesafe.akka" %% "akka-http-experimental" % akkaV, "com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test", "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV, "org.scalatest" %% "scalatest" % "2.2.6" % "test", "com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.0-M3", "net.liftweb" % "lift-json_2.11" % "2.6.2" ) assembleArtifact in assemblyPackageScala := false assemblyMergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first } ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) } fork in run := true
Above, we set assembleArtifact in assemblyPackageScala to false: Spark already ships with the Scala library, so there is no need to bundle it into our assembly again.
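Note that assembleArtifact, assembly, assemblyMergeStrategy and MergeStrategy all come from the sbt-assembly plugin, which the post does not show being enabled. Assuming sbt-assembly is used to build the fat jar, a project/plugins.sbt along these lines would make those keys available (the version here is only an example from that era; pick whichever 0.14.x release you have):

// project/plugins.sbt - needed for the assembly-related settings in build.sbt above
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")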
Defining the User case class
The User class only contains an id, a name and an email, and is defined as follows:
package com.iteblog.domain

case class User(id: String, name: String, email: String)
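The spark-cassandra-connector maps the fields of User to Cassandra columns by name, so the target table needs id, name and email columns, with id usable in a where clause. As a sketch, assuming the keyspace and table are called iteblog and users (use whatever you put in your configuration), the schema could be created with CQL like this:

-- Example CQL, not from the original post; adjust keyspace/table names to your config
CREATE KEYSPACE IF NOT EXISTS iteblog
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS iteblog.users (
  id    text PRIMARY KEY,
  name  text,
  email text
);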
Data access layer
The following snippet implements the data access layer:
package com.iteblog.factories

import com.iteblog.domain.User
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._

import scala.util.Try

trait DatabaseAccess {

  import Context._

  // Persist a single user by turning it into an RDD and saving it to Cassandra.
  def create(user: User): Boolean =
    Try(sc.parallelize(Seq(user)).saveToCassandra(keyspace, tableName)).toOption.isDefined

  // Fetch the rows whose id matches, pushing the filter down to Cassandra.
  def retrieve(id: String): Option[Array[User]] =
    Try(sc.cassandraTable[User](keyspace, tableName).where(s"id='$id'").collect()).toOption
}

object DatabaseAccess extends DatabaseAccess

object Context {
  val config = ConfigFactory.load()
  val url = config.getString("cassandra.url")
  val sparkConf: SparkConf = new SparkConf()
    .setAppName("Spark-cassandra-akka-rest-example")
    .setMaster("local[4]")
    .set("spark.cassandra.connection.host", url)
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val sc = spark.sparkContext
  val keyspace = config.getString("cassandra.keyspace")
  val tableName = config.getString("cassandra.tableName")
}
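Context reads its settings through Typesafe Config, so ConfigFactory.load() will pick up src/main/resources/application.conf. The post does not show that file; assuming a single local Cassandra node, it might look like the following (the values, and the http block used by the startup code later, are only examples):

# src/main/resources/application.conf - example values, not from the original post
cassandra {
  url = "127.0.0.1"
  keyspace = "iteblog"
  tableName = "users"
}

http {
  interface = "0.0.0.0"
  port = 8080
}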
Service layer
Below is the implementation of the route file:
package com.iteblog.routes

import java.util.{Date, UUID}

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.ActorMaterializer
import com.iteblog.domain.User
import com.iteblog.factories.DatabaseAccess
import net.liftweb.json._
import net.liftweb.json.Extraction._

trait SparkService extends DatabaseAccess {

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer
  val logger = Logging(system, getClass)

  implicit def myExceptionHandler = ExceptionHandler {
    case e: ArithmeticException =>
      extractUri { uri =>
        complete(HttpResponse(StatusCodes.InternalServerError,
          entity = s"Data is not persisted and something went wrong"))
      }
  }

  implicit val formats: Formats = new DefaultFormats {
    outer =>
    override val typeHintFieldName = "type"
    override val typeHints = ShortTypeHints(List(classOf[String], classOf[Date]))
  }

  val sparkRoutes: Route = {
    // GET /create/name/<name>/email/<email>: generate an id and persist the user.
    get {
      path("create" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
        complete {
          val documentId = "user::" + UUID.randomUUID().toString
          try {
            val user = User(documentId, name, email)
            val isPersisted = create(user)
            if (isPersisted) {
              HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
            } else {
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
          }
        }
      }
    } ~
      // GET /retrieve/id/<id>: look the user up in Cassandra and return it as JSON.
      path("retrieve" / "id" / Segment) { (listOfIds: String) =>
        get {
          complete {
            try {
              val idAsRDD: Option[Array[User]] = retrieve(listOfIds)
              idAsRDD match {
                case Some(data) =>
                  HttpResponse(StatusCodes.OK,
                    entity = data.headOption.fold("")(x => compact(render(decompose(x)))))
                case None =>
                  HttpResponse(StatusCodes.InternalServerError,
                    entity = s"Data is not fetched and something went wrong")
              }
            } catch {
              case ex: Throwable =>
                logger.error(ex, ex.getMessage)
                HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for ids : $listOfIds")
            }
          }
        }
      }
  }
}
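The two routes are plain GETs of the form /create/name/&lt;name&gt;/email/&lt;email&gt; and /retrieve/id/&lt;id&gt;. As a quick illustration (not part of the original post; it assumes the service is already running on localhost:8080 and that Cassandra is reachable), a tiny Akka HTTP client can drive the create endpoint like this:

package com.iteblog.client

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer

import scala.concurrent.Await
import scala.concurrent.duration._

// Minimal smoke test: create one user and print the response body,
// which contains the generated "user::<uuid>" id.
object RouteSmokeTest extends App {
  implicit val system = ActorSystem("route-smoke-test")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val base = "http://localhost:8080" // adjust to your http.interface / http.port

  val created = Await.result(
    Http().singleRequest(HttpRequest(uri = s"$base/create/name/john/email/john%40iteblog.com")),
    30.seconds)
  println(Await.result(Unmarshal(created.entity).to[String], 30.seconds))

  // To read the user back, call GET $base/retrieve/id/<id> with the id printed above.
  system.terminate()
}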
Starting the service
Finally, we need a class that starts the service; its main job is to boot an HTTP server that clients can call:
package com.iteblog

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.iteblog.routes.SparkService
import com.iteblog.factories.Context

class StartSparkServer(implicit val system: ActorSystem,
                       implicit val materializer: ActorMaterializer) extends SparkService {

  def startServer(address: String, port: Int) = {
    Http().bindAndHandle(sparkRoutes, address, port)
  }
}

object StartApplication extends App {
  StartApp
}

object StartApp {
  implicit val system: ActorSystem = ActorSystem("Spark-Cassandra-Service")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val server = new StartSparkServer()

  val config = Context.config
  val serverUrl = config.getString("http.interface")
  val port = config.getInt("http.port")

  server.startServer(serverUrl, port)
}
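One small point worth noting: bindAndHandle returns a Future[Http.ServerBinding], which the code above simply discards. If you want to log whether the bind actually succeeded, a variant of StartApp along these lines would do it (my sketch, not part of the original post):

package com.iteblog

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.iteblog.factories.Context

import scala.util.{Failure, Success}

// Same wiring as StartApp, but it reacts to the binding result instead of dropping it.
object StartAppWithBindingLog extends App {
  implicit val system: ActorSystem = ActorSystem("Spark-Cassandra-Service")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val server = new StartSparkServer()
  val config = Context.config
  val serverUrl = config.getString("http.interface")
  val port = config.getInt("http.port")

  server.startServer(serverUrl, port).onComplete {
    case Success(binding) =>
      println(s"Server online at ${binding.localAddress}")
    case Failure(ex) =>
      println(s"Failed to bind to $serverUrl:$port - ${ex.getMessage}")
      system.terminate()
  }
}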