In the article "Reading HBase data in Spark" I showed how to read data stored in HBase from Spark, with both a Java and a Scala implementation. This post continues from there and explains how to write the data you have computed in Spark back into HBase.
Spark ships with two built-in methods for writing data to HBase: (1) saveAsHadoopDataset and (2) saveAsNewAPIHadoopDataset. Their official descriptions are as follows:
saveAsHadoopDataset
: Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system. The JobConf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
saveAsNewAPIHadoopDataset
: Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop Configuration object for that storage system. The Conf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
As you can see, these two APIs target the old mapred and the new mapreduce packages respectively, and this post gives example code for both. Before writing any code, add the following dependencies to pom.xml:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>0.9.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-common</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
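Both examples below write to a table named iteblog with a column family f1, so that table must exist before the job runs. You can create it in the HBase shell with create 'iteblog', 'f1', or programmatically. Here is a minimal sketch using the HBase 0.98 client API (the table and family names simply match the examples; adjust the ZooKeeper quorum to your own cluster):

import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, TableName}

object CreateIteblogTable {
  def main(args: Array[String]): Unit = {
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
    val admin = new HBaseAdmin(hConf)
    // Create the target table with a single column family "f1" if it does not exist yet
    if (!admin.tableExists("iteblog")) {
      val desc = new HTableDescriptor(TableName.valueOf("iteblog"))
      desc.addFamily(new HColumnDescriptor("f1"))
      admin.createTable(desc)
    }
    admin.close()
  }
}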
saveAsHadoopDataset
package com.iteblog.bigdata.hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object SparkToHBase {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBase <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBase")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))

    // Create the HBase configuration
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    // Create the JobConf, set the output format and the target table
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }

    // Save to the HBase table
    data.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}
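Once the job has run, you can sanity-check the result with scan 'iteblog', {LIMIT => 5} in the HBase shell, or read a single row back with the 0.98 client API. Below is a minimal sketch of such a check, assuming one of the input lines was 0015A49A8F2A60DACEE0160545C58F94 followed by a tab and 1234 (the sample data is shown right after this):

import org.apache.hadoop.hbase.client.{Get, HTable}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}

object CheckWrite {
  def main(args: Array[String]): Unit = {
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
    val table = new HTable(hConf, "iteblog")
    // The job stores each line under the reversed key, so reverse it again to look it up
    val rowKey = "0015A49A8F2A60DACEE0160545C58F94".reverse
    val result = table.get(new Get(Bytes.toBytes(rowKey)))
    val value = Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("info")))
    println(s"$rowKey -> $value")  // expected: ... -> 1234
    table.close()
  }
}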
The input data (two columns separated by a tab) looks like this:
0015A49A8F2A60DACEE0160545C58F94    1234
0152C9666B5F3426DDDB5FB74BDBCE4F    4366
0160D90AC268AEB595208E8448C7F8B8    6577
0225A39EB29BDB582CA58BE86791ACBC    1234
02462ACDF7232C49890B07D63B50C5E1    4366
030730EBE05740C992840525E35BC8AD    7577
038A459BC05F3B655F5655C810E76352    7577
0417D3FD71458C4BAD1E5AFDE7259930    7577
042CD42B657C46D0D4E5CC69AFDD7E54    7577
051069378849ACF97BFAD09D3A9C7702    7577
05E833C9C763A98323E0328DA0A31039    7577
060E206514A24D944305D370F615F8E9    7577
087E8488796C29E1C8239565666CE2D7    7577
09A425F1DD240A7150ECEFAA0BFF25FA    7577
0B27E3CB5F3F32EB3715DB8E2D333BED    7577
0B27E82A4CEE73BBB98438DFB0DB2FFE    7577
0BAEEB7A12DCEF20EE26D7A030164DFF    7577
0C5BFC45F64907A61ECB1C892F98525C    7577
0C74F2FFD1BB3598BC8DB10C37DBA6B4    7577
0C9CEE40DDD961C7D2BBE0491FDF92A8    7577
0CC578371622F932287EB81065F81F5F    7577
0D6B03EFDAE7165A0F7CC79EABEAC0D3    7577
0DF7B014187A9AB2F1049781592CC053    7577
0E67D8ABDB3749D58207A7B45FEA7F12    7577
0E866677E79A7843E0EDCF2BE0141911    7577
0EAF4A69BA3BF05E8EA75CC1287304A3    7577
0EE2969AE674DF5F8944B5EA2E97DBEC    7577
0FAA253D53BC6D831CF6E742147C3BED    7577
0FB92AC3DE664BFF40D334DA8EE97B85    7577
The first column is stored as the HBase rowkey, and the second column becomes the value of the f1:info column.
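To make the mapping concrete, this is what the map function in the examples does to the first sample line (illustration only; note that the key is reversed before it is used as the rowkey, a common way to spread otherwise similar key prefixes across regions):

val line = "0015A49A8F2A60DACEE0160545C58F94\t1234"
val Array(key, value) = line.split("\t")
val rowKey = key.reverse  // the reversed key becomes the rowkey
println(s"rowkey = $rowKey, column f1:info = $value")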
saveAsNewAPIHadoopDataset
package com.iteblog.bigdata.hbase

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object SparkToHBaseNew {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBaseNew <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBaseNew")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))

    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    // Configure the job's output format using the new (mapreduce) API
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])  // we emit Put mutations, so Put (not Result) is the value class
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }

    // Save to the HBase table
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
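A side note: since JobConf extends Configuration and Job.getInstance accepts a plain Configuration, the mapred-era JobConf in this new-API version is not strictly needed. A sketch of the same configuration without it (imports as above, minus org.apache.hadoop.mapred.JobConf):

val hConf = HBaseConfiguration.create()
hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
hConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

val job = Job.getInstance(hConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

data.saveAsNewAPIHadoopDataset(job.getConfiguration)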
This version is almost identical to the first one, so pick whichever suits your situation. That said, writing Spark data to HBase this way is still rather verbose; in a follow-up post I will show how to write the data in an RDD directly to HBase through something like a saveToHbase method, so stay tuned.
The project https://github.com/TopSpoofer/hbrdd already wraps this kind of functionality.
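As an illustration of what such a wrapper could look like (a hypothetical sketch, not the actual hbrdd API), an implicit class can hide the JobConf plumbing behind a single method:

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD

object HBaseWriteSupport {
  // Hypothetical helper: adds a saveToHbase method to RDD[(String, String)]
  implicit class HBaseWriter(rdd: RDD[(String, String)]) {
    def saveToHbase(quorum: String, table: String, family: String, qualifier: String): Unit = {
      val hConf = HBaseConfiguration.create()
      hConf.set(HConstants.ZOOKEEPER_QUORUM, quorum)
      val jobConf = new JobConf(hConf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
      rdd.map { case (rowKey, value) =>
        val put = new Put(Bytes.toBytes(rowKey))
        put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
        (new ImmutableBytesWritable, put)
      }.saveAsHadoopDataset(jobConf)
    }
  }
}

With this in scope, the body of the first example collapses to something like input.map(parseLine).saveToHbase("www.iteblog.com:2181", "iteblog", "f1", "info"), where parseLine splits a line into a (reversed rowkey, value) pair.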