欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Apache CarbonData集群模式使用指南

  我们在《Apache CarbonData快速入门编程指南》文章中介绍了如何快速使用Apache CarbonData,为了简单起见,我们展示了如何在单机模式下使用Apache CarbonData。但是生产环境下一般都是使用集群模式,本文主要介绍如何在集群模式下使用Apache CarbonData

启动Spark shell

这里以Spark shell模式进行介绍,master为yarn-client,启动Spark shell如下:

[iteblog@www.iteblog.com ~]$  cd ${SPARK_HOME}
[iteblog@www.iteblog.com ~]$  carbondata_jar=./lib/$(ls -1 lib |grep "^carbondata_.*\.jar$")
[iteblog@www.iteblog.com ~]$  mysql_jar=./lib/$(ls -1 lib |grep "^mysql.*\.jar$")
[iteblog@www.iteblog.com ~]$ ./bin/spark-shell --master yarn-client                   \
                                               --jars ${carbondata_jar},${mysql_jar}  \
                                               --num-executors 2                      \
                                               --executor-cores 1                     \
                                               --executor-memory 5G                   \
                                               --queue iteblog

上面命令将会以Client模式启动shell。

创建CarbonContext实例

启动完Spark Shell之后,接下来就是来初始化CarbonContext实例了,这个和《Apache CarbonData快速入门编程指南》里面类似:

/**
 * User: 过往记忆
 * Date: 2016年07月07日
 * Time: 下午20:49
 * bolg: 
 * 本文地址:/archives/1703
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
 
import org.apache.spark.sql.CarbonContext
import org.apache.hadoop.hive.conf.HiveConf
val storePath = "/user/iteblog/store/"
val cc = new CarbonContext(sc, storePath)
cc.setConf("carbon.kettle.home","./carbondata/carbonplugins")
cc.setConf("hive.metastore.warehouse.dir", "/user/iteblog/metadata/")
cc.setConf(HiveConf.ConfVars.HIVECHECKFILEFORMAT.varname, "false")

创建表

现在我们已经创建好CarbonContext实例了,可以使用它创建表:

cc.sql("create table if not exists iteblog (id string, hash string) STORED BY 'org.apache.carbondata.format'")

加载数据

/**
 * User: 过往记忆
 * Date: 2016年07月07日
 * Time: 下午20:49
 * bolg: 
 * 本文地址:/archives/1703
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

cc.sql(s"load data inpath 'hdfs:///tmp/iteblog.csv' into table iteblog options('DELIMITER'='\t')")

我们发现数据加载出错了,可以到其中一个节点的stderr日志里面看到如下的异常信息:

16/07/07 20:38:18 ERROR graphgenerator.GraphGenerator: [Executor task launch worker-0][partitionID:default_iteblog_ace3f131-836f-4b27-b198-f636fbc4e53b] 
org.pentaho.di.core.exception.KettleException: 
Unable to read file './carbondata/carbonplugins/.kettle/kettle.properties'
./carbondata/carbonplugins/.kettle/kettle.properties (No such file or directory)

	at org.pentaho.di.core.util.EnvUtil.readProperties(EnvUtil.java:65)
	at org.pentaho.di.core.util.EnvUtil.environmentInit(EnvUtil.java:95)
	at org.carbondata.processing.graphgenerator.GraphGenerator.
validateAndInitialiseKettelEngine(GraphGenerator.java:302)
	at org.carbondata.processing.graphgenerator.GraphGenerator.generateGraph(GraphGenerator.java:277)
	at org.carbondata.spark.load.CarbonLoaderUtil.generateGraph(CarbonLoaderUtil.java:130)
	at org.carbondata.spark.load.CarbonLoaderUtil.executeGraph(CarbonLoaderUtil.java:186)
	at org.carbondata.spark.rdd.CarbonDataLoadRDD$$anon$1.<init>(CarbonDataLoadRDD.scala:189)
	at org.carbondata.spark.rdd.CarbonDataLoadRDD.compute(CarbonDataLoadRDD.scala:148)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.FileNotFoundException: ./carbondata/carbonplugins/.kettle/kettle.properties (No such file or directory)
	at java.io.FileInputStream.open(Native Method)
	at java.io.FileInputStream.<init>(FileInputStream.java:146)
	at java.io.FileInputStream.<init>(FileInputStream.java:101)
	at org.pentaho.di.core.util.EnvUtil.readProperties(EnvUtil.java:60)
	... 15 more

  很明显是没有找到./carbondata/carbonplugins/.kettle/kettle.properties文件,因为我们目前只在启动Spark Shell的那台机器上部署好了Carbondata,而Carbondata的计算依赖于kettle,所以我们需要把kettle相关的依赖加载到所有参与计算的节点。这里有以下两种方法可以解决这个问题。

  1、我们很容易想到的就是将./carbondata/carbonplugins/文件里面的所有内容全部复制到Hadoop集群的各个节点的某一目录下(比如/user/iteblog/carbondata/carbonplugins),然后修改carbon.kettle.home如下:

cc.setConf("carbon.kettle.home","/user/iteblog/carbondata/carbonplugins")

其余代码不变,这个问题即可解决。

  2、但是如果我们没有Hadoop集群各个节点的登陆权限,也就是说我们无法手动到各个节点部署好carbonplugins,这咋办呢?我们可以在启动Spark Shell的时候加载carbonplugins插件,如下:

[iteblog@www.iteblog.com ~]$ ./bin/spark-shell --master yarn-client                                     \
                                               --jars ${carbondata_jar},${mysql_jar},carbondata.tar.gz  \
                                               --num-executors 2                                        \
                                               --executor-cores 1                                       \
                                               --executor-memory 5G                                     \
                                               --queue iteblog

carbondata.tar.gz里面已经打包好了所有的插件信息。然后我们上面的代码不需要改变,这个问题也可以解决。

查数

数据已经加载进iteblog表里面了,现在我们可以查询里面的数据了,如下:

scala> cc.sql("select * from iteblog").show
+----------+--------------------+
|        id|                hash|
+----------+--------------------+
|1761060630| 1507780651275746626|
|1777010203|-6420079594732250962|
|1777080884|-3720484624594970761|
|1777080885| 6953598621328551083|
|1794379845| 4443483533807209950|
|1794419628|-3898139641996026768|
|1794522657| 5721419051907524948|
|1796358316|-3848539843796297096|
|1796361951| 2673643446784761880|
|1796363022| 7081835511530066760|
|1797689090| 7687516489507000693|
|1798032763| 8729543868018607114|
|1798032765|-2073004072970288002|
|1798933651| 4359618602910211713|
|1799173523| 3862252443910602052|
|1799555536|-2751011281327328990|
|1799569121| 1024477364392595538|
|1799608637| 4403346642796222437|
|1799745227|-2719506638749624471|
|1799859723| 5552725300397463239|
+----------+--------------------+
only showing top 20 rows
 
scala> cc.sql("select count(*) from iteblog").show
+-------+
|    _c0|
+-------+
|7230338|
+-------+
 
scala> cc.sql("select count(distinct id) from iteblog").show
+-------+
|    _c0|
+-------+
|6031231|
+-------+
本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Apache CarbonData集群模式使用指南】(https://www.iteblog.com/archives/1703.html)
喜欢 (3)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(1)个小伙伴在吐槽
  1. fasfsdf sd十多个

    w3970907702016-07-08 17:25 回复