本博客的《Spark与Mysql(JdbcRDD)整合开发》和《Spark RDD写入RMDB(Mysql)方法二》文章中介绍了如何通过Spark读写Mysql中的数据。
在生产环境下,很多公司都会使用PostgreSQL数据库,这篇文章将介绍如何通过Spark获取PostgreSQL中的数据。我将使用Spark 1.3中的DataFrame(也就是之前的SchemaRDD),我们可以通过SQLContext加载数据库中的数据,并转成DataFrame,我们可以使用SQLContext的load方法:
def load(source: String, options: Map[String, String]): DataFrame = { read.options(options).format(source).load() }
其中,options可以传入的参数包括:url、dbtable、driver、partitionColumn、lowerBound、upperBound与numPartitions。
不过在Spark 1.4版本中,这个方法已经被标记为
deprecated
,我们得调用read.format(source).options(options).load()
来替代。那么我们的代码可以这么写:
/** * User: 过往记忆 * Date: 2015-05-23 * Time: 下午23:23 * bolg: * 本文地址:/archives/1369 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ object IteblogApp { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("IteblogpostgreSQL") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val url = "jdbc:postgresql://www.iteblog.com:1234/test?user=iteblog&password=123456" val testDataFrame= sqlContext.load("jdbc", Map( "url" -> url, "driver" -> "org.postgresql.Driver", "dbtable" -> "SELECT * FROM iteblog" )) testDataFrame.foreach(println) } }
在上面的使用中,我们是直接将SQL语句直接传入到dbtable
中,但是很多情况下这还不符合我们的需求,不过,我们还可以通过调用registerTempTable()
方法来注册临时表,并调用sql()方法执行查询:
val testDataFrame= sqlContext.load("jdbc", Map( "url" -> url, "driver" -> "org.postgresql.Driver", "dbtable" -> "iteblog" )) testDataFrame.registerTempTable("iteblog") sqlContext.sql("select * from iteblog").foreach(println)
最后,如果你使用的是SBT来管理项目,那么你需要在你的build.sbt
文件中添加相关的依赖,如下:
libraryDependencies ++= { "org.apache.spark" % "spark-core_2.10" % "1.3.1", "org.apache.spark" % "spark-sql_2.10" % "1.3.1", "org.postgresql" % "postgresql" % "9.4-1201-jdbc41" }
如果你使用的是Maven,请在你的pom.xml
文件里面加入以下依赖:
<dependencys> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>9.4-1201-jdbc41</version> </dependency> </dependencys>本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark SQL整合PostgreSQL】(https://www.iteblog.com/archives/1369.html)