CarbonData is a columnar storage file format for Apache Hadoop, developed and open-sourced by Huawei. It supports indexing, compression, and encoding/decoding, and its goal is to serve many different workloads from a single copy of the data while enabling faster interactive queries. The project is currently incubating at Apache. For background see 《CarbonData:华为开发并支持Hadoop的列式文件格式》. This article covers using CarbonData in local (single-machine) mode; if you need to run it in cluster mode, see 《Apache CarbonData集群模式使用指南》.
Compiling CarbonData
Before compiling CarbonData, make sure Thrift is installed (how to install it is covered in 《CentOS上编译安装Apache Thrift》); otherwise the build fails with the following error:
[ERROR] thrift failed output:
[ERROR] thrift failed error: /bin/sh: thrift: command not found
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] carbondata ......................................... SUCCESS [01:59 min]
[INFO] carbon-common ...................................... SUCCESS [ 41.227 s]
[INFO] carbon-format ...................................... FAILURE [  8.875 s]
[INFO] carbon-core ........................................ SKIPPED
[INFO] carbon-processing .................................. SKIPPED
[INFO] carbon-hadoop ...................................... SKIPPED
[INFO] carbon-spark ....................................... SKIPPED
[INFO] carbon-assembly .................................... SKIPPED
[INFO] carbon-examples .................................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:50 min
[INFO] Finished at: 2016-06-30T17:30:43+08:00
[INFO] Final Memory: 25M/224M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.thrift.tools:maven-thrift-plugin:0.1.11:compile (generate-thrift-java) on project carbon-format: thrift did not exit cleanly. Review output for more information. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
By default CarbonData builds against Spark 1.5.2 and Hadoop 2.2.0; the latest CarbonData also supports Spark 1.6.1, and that is the version used in this article. Run the following commands in order:
$ git clone https://github.com/apache/incubator-carbondata.git carbondata
$ cd carbondata
$ mvn -Pspark-1.6.1 clean install -DskipTests
$ cp assembly/target/scala-2.10/carbondata_*.jar ${SPARK_HOME}/lib
$ mkdir ${SPARK_HOME}/carbondata
$ cp -r processing/carbonplugins ${SPARK_HOME}/carbondata
At this point CarbonData has been compiled and the resulting jar has been copied into ${SPARK_HOME}/lib. We can now use CarbonData from Spark (can it be used directly from Hive? apparently not).
Starting the Spark shell
$ cd ${SPARK_HOME}
$ carbondata_jar=./lib/$(ls -1 lib | grep "^carbondata_.*\.jar$")
$ mysql_jar=./lib/$(ls -1 lib | grep "^mysql.*\.jar$")
$ ./bin/spark-shell --master local --jars ${carbondata_jar},${mysql_jar}
Creating a CarbonContext instance
import org.apache.spark.sql.CarbonContext
import java.io.File
import org.apache.hadoop.hive.conf.HiveConf

val storePath = "/user/iteblog/store/"
val cc = new CarbonContext(sc, storePath)
cc.setConf("carbon.kettle.home", "./carbondata/carbonplugins")
cc.setConf("hive.metastore.warehouse.dir", "/user/iteblog/metadata/")
cc.setConf(HiveConf.ConfVars.HIVECHECKFILEFORMAT.varname, "false")
The CarbonContext constructor takes two arguments: the SparkContext and a storePath. The storePath argument specifies the directory in which table data will be stored; it can be a local directory or a directory on HDFS.
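If you would rather run this setup as a standalone application than from the spark-shell, it can be sketched roughly as follows. The object name, master setting, and paths are assumptions for illustration; submit it with the carbondata assembly jar on the classpath (e.g. via spark-submit --jars):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.CarbonContext

object CarbonQuickStart {
  def main(args: Array[String]): Unit = {
    // Hypothetical application name and master; adjust to your environment.
    val conf = new SparkConf().setAppName("CarbonQuickStart").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // storePath can be a local directory or an HDFS directory, as described above.
    val cc = new CarbonContext(sc, "/user/iteblog/store/")
    cc.setConf("carbon.kettle.home", "./carbondata/carbonplugins")
    cc.setConf("hive.metastore.warehouse.dir", "/user/iteblog/metadata/")

    cc.sql("show tables").show()
    sc.stop()
  }
}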
Creating a table
Now that we have a CarbonContext instance, we can use it to create a table:
cc.sql("create table if not exists iteblog (id string, hash string) STORED BY 'org.apache.carbondata.format'")
After running the statement above, a table appears in Hive's default database. Let's look at its CREATE TABLE statement:
hive> show create table iteblog;
OK
CREATE EXTERNAL TABLE `iteblog`(
  `col` array<string> COMMENT 'from deserializer')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe'
WITH SERDEPROPERTIES (
  'tableName'='default.iteblog',
  'tablePath'='hdfs:///user/iteblog/store/default/iteblog')
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.SequenceFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
LOCATION
  'hdfs://itebloghadoop/user/iteblog/hive/warehouse/iteblog'
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='false',
  'numFiles'='0',
  'numRows'='-1',
  'rawDataSize'='-1',
  'spark.sql.sources.provider'='org.apache.spark.sql.CarbonSource',
  'totalSize'='0',
  'transient_lastDdlTime'='1467361064')
Time taken: 0.05 seconds, Fetched: 21 row(s)
Something looks wrong: the table only shows a single col column, even though we clearly created two columns, id and hash. Where did they go? Remember the storePath argument we passed when creating the CarbonContext instance? That is where the metadata of the newly created table actually lives:
drwxr-xr-x   - iteblog iteblog          0 2016-07-01 21:00 /user/iteblog/store/default
drwxr-xr-x   - iteblog iteblog          0 2016-07-01 21:00 /user/iteblog/store/default/iteblog
drwxr-xr-x   - iteblog iteblog          0 2016-07-01 21:00 /user/iteblog/store/default/iteblog/Metadata
-rw-r--r--   3 iteblog iteblog        597 2016-07-01 21:00 /user/iteblog/store/default/iteblog/Metadata/schema
The table's metadata is stored in the /user/iteblog/store/default/iteblog/Metadata/schema file.
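Even though Hive only sees the opaque col column, the CarbonContext resolves the real schema from the storePath. You can check this from the Spark shell; a minimal sketch, assuming the standard Spark SQL table/printSchema calls (output not shown):

// Verify that Spark sees the real columns (id, hash) rather than Hive's `col`.
cc.sql("show tables").show()
cc.table("iteblog").printSchema()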
Loading data
Now that the table is created, let's load some data into it. I have prepared a file (named iteblog) whose contents look like this:
1802202095	-9223347229018688133
1805433788	-9223224306642795473
1807808238	-9223191974382569971
1803505412	-9222950928798855459
1803603535	-9222783416682807621
1808506900	-9222758602401798041
1805531330	-9222636742915245241
1807853373	-9222324670859328253
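If you do not have a similar data set at hand, a tiny tab-separated file of the same shape can be produced from the Spark shell. This is just a hypothetical helper for reproducing the walkthrough; the values come from the sample above:

import java.io.PrintWriter

// Write a few tab-separated (id, hash) rows to /tmp/iteblog (illustrative only).
val sample = Seq(
  ("1802202095", "-9223347229018688133"),
  ("1805433788", "-9223224306642795473"),
  ("1807808238", "-9223191974382569971"))
val writer = new PrintWriter("/tmp/iteblog")
sample.foreach { case (id, hash) => writer.println(s"$id\t$hash") }
writer.close()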
The first column corresponds to the id column and the second to the hash column; they are separated by a tab. Let's load the data into the iteblog table:
cc.sql(s"load data inpath '/tmp/iteblog' into table iteblog") org.carbondata.processing.etl.DataLoadingException: please check your input path and make sure that files end with '.csv' and content is not empty.
Unfortunately, it complains that the file name does not end in .csv: CarbonData requires the files you load to have a .csv suffix (why, exactly, is unclear). Rename the file to iteblog.csv and load again:
cc.sql(s"load data inpath '/tmp/iteblog.csv' into table iteblog") org.carbondata.processing.etl.DataLoadingException: CSV File provided is not proper. Column names in schema and csv header are not same. CSVFile Name : iteblog.csv at org.carbondata.processing.csvload.DataGraphExecuter.validateCSV(DataGraphExecuter.java:147) at org.carbondata.processing.csvload.DataGraphExecuter.validateCSVFiles(DataGraphExecuter.java:552) at org.carbondata.processing.csvload.DataGraphExecuter.executeGraph(DataGraphExecuter.java:166) at org.carbondata.spark.load.CarbonLoaderUtil.executeGraph(CarbonLoaderUtil.java:189) at org.carbondata.spark.rdd.CarbonDataLoadRDD$$anon$1.<init>(CarbonDataLoadRDD.scala:189) at org.carbondata.spark.rdd.CarbonDataLoadRDD.compute(CarbonDataLoadRDD.scala:148) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)
It fails again! The message roughly means that the column names in the schema and the CSV header do not match, i.e. the file has no header row. There are two ways to solve this problem.
1. Add the header row directly to the data file:
id	hash
1802202095	-9223347229018688133
1805433788	-9223224306642795473
1807808238	-9223191974382569971
1803505412	-9222950928798855459
1803603535	-9222783416682807621
1808506900	-9222758602401798041
1805531330	-9222636742915245241
1807853373	-9222324670859328253
2. Specify the header in the load statement via the FILEHEADER option:
cc.sql(s"load data local inpath '/tmp/iteblog.csv' into table iteblog options('FILEHEADER'='id,hash')")
Run it again and it still fails! This is because CarbonData's default field delimiter is a comma (,), while the fields in our file are separated by tabs. We can add the DELIMITER option to the load statement, like this:
cc.sql(s"load data local inpath '/tmp/iteblog.csv' into table iteblog options('DELIMITER'='\t', 'FILEHEADER'='id,hash')")
This time it finally works. After the load command finishes, the following data files appear under the /user/iteblog/store directory:
drwxr-xr-x   /user/iteblog/store/default/iteblog
drwxr-xr-x   /user/iteblog/store/default/iteblog/Fact
drwxr-xr-x   /user/iteblog/store/default/iteblog/Fact/Part0
drwxr-xr-x   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0
-rw-r--r--   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/0-1467362068000.carbonindex
-rw-r--r--   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/1-1467362068000.carbonindex
-rw-r--r--   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/2-1467362068000.carbonindex
-rw-r--r--   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/3-1467362068000.carbonindex
-rw-r--r--   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/4-1467362068000.carbonindex
-rw-r--r--   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/part-0-0-1467362068000.carbondata
-rw-r--r--   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/part-0-1-1467362068000.carbondata
-rw-r--r--   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/part-0-2-1467362068000.carbondata
-rw-r--r--   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/part-0-3-1467362068000.carbondata
-rw-r--r--   /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/part-0-4-1467362068000.carbondata
The *.carbonindex files are the indexes of the CarbonData files, while the *.carbondata files hold the actual data (already encoded, of course).
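To poke at the segment from the Spark shell you can list it with the Hadoop FileSystem API. A minimal sketch, assuming the same storePath as above and that the load produced Segment_0:

import org.apache.hadoop.fs.{FileSystem, Path}

// List the files of the first segment and count index vs. data files.
val fs = FileSystem.get(sc.hadoopConfiguration)
val segmentDir = new Path("/user/iteblog/store/default/iteblog/Fact/Part0/Segment_0")
val fileNames = fs.listStatus(segmentDir).map(_.getPath.getName)
val (indexFiles, dataFiles) = fileNames.partition(_.endsWith(".carbonindex"))
println(s"index files: ${indexFiles.length}, data files: ${dataFiles.length}")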
Note that the later load statements use the load data local inpath 'xxx' into table syntax, but it means exactly the same as load data inpath 'xxx' into table; it exists only for compatibility with Hive's syntax.
Querying the data
The data has been loaded into the iteblog table, so we can now query it:
scala> cc.sql("select * from iteblog").show +----------+--------------------+ | id| hash| +----------+--------------------+ |1761060630| 1507780651275746626| |1777010203|-6420079594732250962| |1777080884|-3720484624594970761| |1777080885| 6953598621328551083| |1794379845| 4443483533807209950| |1794419628|-3898139641996026768| |1794522657| 5721419051907524948| |1796358316|-3848539843796297096| |1796361951| 2673643446784761880| |1796363022| 7081835511530066760| |1797689090| 7687516489507000693| |1798032763| 8729543868018607114| |1798032765|-2073004072970288002| |1798933651| 4359618602910211713| |1799173523| 3862252443910602052| |1799555536|-2751011281327328990| |1799569121| 1024477364392595538| |1799608637| 4403346642796222437| |1799745227|-2719506638749624471| |1799859723| 5552725300397463239| +----------+--------------------+ only showing top 20 rows scala> cc.sql("select count(*) from iteblog").show +-------+ | _c0| +-------+ |7230338| +-------+ scala> cc.sql("select count(distinct id) from iteblog").show +-------+ | _c0| +-------+ |6031231| +-------+
For more advanced features, see the official CarbonData documentation: https://github.com/apache/incubator-carbondata/blob/master/docs/Quick-Start.md
Comment: Hi, I followed the steps above, but the DataFrame I end up with is empty and no error is reported; the files do exist under the local path and they contain data. What could be the cause? Here is the code I ran:
val path="/home/wg/store"
val cc = new CarbonContext(sc,path)
cc.setConf("carbon.kettle.home","./carbondata/carbonplugins")
cc.setConf("hive.metastore.warehouse.dir", "/home/hive/metadata/")
cc.setConf(HiveConf.ConfVars.HIVECHECKFILEFORMAT.varname, "false")
cc.sql("create table if not exists iteblog1 (id string, hash string) STORED BY 'org.apache.carbondata.format'")
val df=cc.sql(s"load data inpath '/home/wg/data.csv' into table iteblog1 options('DELIMITER'=',', 'FILEHEADER'='id,hash')")
The table path registered in Hive:
'tableName'='default.iteblog1',
'tablePath'='/home/wg/store/default/iteblog1
Reply: When you run the load data inpath statement, do you see any data under the /home/wg/store/default/iteblog1 directory?
Comment: Yes, it generated a 0-1475113051000.carbonindex file and a part-0-0-1475113051000.carbondata file, each a few KB in size.