我们在《Apache CarbonData快速入门编程指南》文章中介绍了如何快速使用Apache CarbonData,为了简单起见,我们展示了如何在单机模式下使用Apache CarbonData。但是生产环境下一般都是使用集群模式,本文主要介绍如何在集群模式下使用Apache CarbonData。
启动Spark shell
这里以Spark shell模式进行介绍,master为yarn-client,启动Spark shell如下:
[iteblog@www.iteblog.com ~]$ cd ${SPARK_HOME} [iteblog@www.iteblog.com ~]$ carbondata_jar=./lib/$(ls -1 lib |grep "^carbondata_.*\.jar$") [iteblog@www.iteblog.com ~]$ mysql_jar=./lib/$(ls -1 lib |grep "^mysql.*\.jar$") [iteblog@www.iteblog.com ~]$ ./bin/spark-shell --master yarn-client \ --jars ${carbondata_jar},${mysql_jar} \ --num-executors 2 \ --executor-cores 1 \ --executor-memory 5G \ --queue iteblog
上面命令将会以Client模式启动shell。
创建CarbonContext实例
启动完Spark Shell之后,接下来就是来初始化CarbonContext实例了,这个和《Apache CarbonData快速入门编程指南》里面类似:
/** * User: 过往记忆 * Date: 2016年07月07日 * Time: 下午20:49 * bolg: * 本文地址:/archives/1703 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ import org.apache.spark.sql.CarbonContext import org.apache.hadoop.hive.conf.HiveConf val storePath = "/user/iteblog/store/" val cc = new CarbonContext(sc, storePath) cc.setConf("carbon.kettle.home","./carbondata/carbonplugins") cc.setConf("hive.metastore.warehouse.dir", "/user/iteblog/metadata/") cc.setConf(HiveConf.ConfVars.HIVECHECKFILEFORMAT.varname, "false")
创建表
现在我们已经创建好CarbonContext
实例了,可以使用它创建表:
cc.sql("create table if not exists iteblog (id string, hash string) STORED BY 'org.apache.carbondata.format'")
加载数据
/** * User: 过往记忆 * Date: 2016年07月07日 * Time: 下午20:49 * bolg: * 本文地址:/archives/1703 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ cc.sql(s"load data inpath 'hdfs:///tmp/iteblog.csv' into table iteblog options('DELIMITER'='\t')")
我们发现数据加载出错了,可以到其中一个节点的stderr日志里面看到如下的异常信息:
16/07/07 20:38:18 ERROR graphgenerator.GraphGenerator: [Executor task launch worker-0][partitionID:default_iteblog_ace3f131-836f-4b27-b198-f636fbc4e53b] org.pentaho.di.core.exception.KettleException: Unable to read file './carbondata/carbonplugins/.kettle/kettle.properties' ./carbondata/carbonplugins/.kettle/kettle.properties (No such file or directory) at org.pentaho.di.core.util.EnvUtil.readProperties(EnvUtil.java:65) at org.pentaho.di.core.util.EnvUtil.environmentInit(EnvUtil.java:95) at org.carbondata.processing.graphgenerator.GraphGenerator. validateAndInitialiseKettelEngine(GraphGenerator.java:302) at org.carbondata.processing.graphgenerator.GraphGenerator.generateGraph(GraphGenerator.java:277) at org.carbondata.spark.load.CarbonLoaderUtil.generateGraph(CarbonLoaderUtil.java:130) at org.carbondata.spark.load.CarbonLoaderUtil.executeGraph(CarbonLoaderUtil.java:186) at org.carbondata.spark.rdd.CarbonDataLoadRDD$$anon$1.<init>(CarbonDataLoadRDD.scala:189) at org.carbondata.spark.rdd.CarbonDataLoadRDD.compute(CarbonDataLoadRDD.scala:148) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.io.FileNotFoundException: ./carbondata/carbonplugins/.kettle/kettle.properties (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.<init>(FileInputStream.java:146) at java.io.FileInputStream.<init>(FileInputStream.java:101) at org.pentaho.di.core.util.EnvUtil.readProperties(EnvUtil.java:60) ... 15 more
很明显是没有找到./carbondata/carbonplugins/.kettle/kettle.properties
文件,因为我们目前只在启动Spark Shell的那台机器上部署好了Carbondata,而Carbondata的计算依赖于kettle,所以我们需要把kettle相关的依赖加载到所有参与计算的节点。这里有以下两种方法可以解决这个问题。
1、我们很容易想到的就是将./carbondata/carbonplugins/
文件里面的所有内容全部复制到Hadoop集群的各个节点的某一目录下(比如/user/iteblog/carbondata/carbonplugins
),然后修改carbon.kettle.home
如下:
cc.setConf("carbon.kettle.home","/user/iteblog/carbondata/carbonplugins")
其余代码不变,这个问题即可解决。
2、但是如果我们没有Hadoop集群各个节点的登陆权限,也就是说我们无法手动到各个节点部署好carbonplugins,这咋办呢?我们可以在启动Spark Shell的时候加载carbonplugins插件,如下:
[iteblog@www.iteblog.com ~]$ ./bin/spark-shell --master yarn-client \ --jars ${carbondata_jar},${mysql_jar},carbondata.tar.gz \ --num-executors 2 \ --executor-cores 1 \ --executor-memory 5G \ --queue iteblog
carbondata.tar.gz
里面已经打包好了所有的插件信息。然后我们上面的代码不需要改变,这个问题也可以解决。
查数
数据已经加载进iteblog表里面了,现在我们可以查询里面的数据了,如下:
scala> cc.sql("select * from iteblog").show +----------+--------------------+ | id| hash| +----------+--------------------+ |1761060630| 1507780651275746626| |1777010203|-6420079594732250962| |1777080884|-3720484624594970761| |1777080885| 6953598621328551083| |1794379845| 4443483533807209950| |1794419628|-3898139641996026768| |1794522657| 5721419051907524948| |1796358316|-3848539843796297096| |1796361951| 2673643446784761880| |1796363022| 7081835511530066760| |1797689090| 7687516489507000693| |1798032763| 8729543868018607114| |1798032765|-2073004072970288002| |1798933651| 4359618602910211713| |1799173523| 3862252443910602052| |1799555536|-2751011281327328990| |1799569121| 1024477364392595538| |1799608637| 4403346642796222437| |1799745227|-2719506638749624471| |1799859723| 5552725300397463239| +----------+--------------------+ only showing top 20 rows scala> cc.sql("select count(*) from iteblog").show +-------+ | _c0| +-------+ |7230338| +-------+ scala> cc.sql("select count(distinct id) from iteblog").show +-------+ | _c0| +-------+ |6031231| +-------+本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Apache CarbonData集群模式使用指南】(https://www.iteblog.com/archives/1703.html)
fasfsdf sd十多个