在使用Spark操作Hbase的时候,其返回的数据类型是RDD[ImmutableBytesWritable,Result]
,我们可能会对这个结果进行其他的操作,比如join
等,但是因为org.apache.hadoop.hbase.io.ImmutableBytesWritable
和 org.apache.hadoop.hbase.client.Result
并没有实现 java.io.Serializable
接口,程序在运行的过程中可能发生以下的异常:
Serialization stack: - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 30 30 30 30 30 30 32 34 32 30 32 37 37 32 31) - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) - object (class scala.Tuple2, (30 30 30 30 30 30 32 34 32 30 32 37 37 32 31,keyvalues={00000011020Winz59XojM111/f:iteblog/1470844800000/Put/vlen=2/mvcc=0})) - element of array (index: 0) - array (class [Lscala.Tuple2;, size 10); not retrying 17/03/16 16:07:48 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable Serialization stack: - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 30 30 30 30 30 30 32 34 32 30 32 37 37 32 31) - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) - object (class scala.Tuple2, (30 30 30 30 30 30 32 34 32 30 32 37 37 32 31,keyvalues={00000011020Winz59XojM111/f:iteblog/1470844800000/Put/vlen=2/mvcc=0})) - element of array (index: 0) - array (class [Lscala.Tuple2;, size 10) org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable Serialization stack: - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 30 30 30 30 30 30 32 34 32 30 32 37 37 32 31) - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) - object (class scala.Tuple2, (30 30 30 30 30 30 32 34 32 30 32 37 37 32 31,keyvalues={00000011020Winz59XojM111/f:iteblog/1470844800000/Put/vlen=2/mvcc=0})) - element of array (index: 0) - array (class [Lscala.Tuple2;, size 10) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.take(RDD.scala:1302) at com.iteblog.HBase2Hive$.main(HBase2Hive.scala:41) at com.iteblog.HBase2Hive.main(HBase2Hive.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
解决这个问题主要有两种方法。
指定如何序列化ImmutableBytesWritable类
我们可以手动设置如何序列化ImmutableBytesWritable
类,实现如下:
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))
这样就会使用 org.apache.spark.serializer.KryoSerializer
来序列化 ImmutableBytesWritable
,是的 ImmutableBytesWritable
类可以在网络上传送。
将ImmutableBytesWritable转换成其他可序列化的对象
这种方法就是从ImmutableBytesWritable
对象中抽取我们需要的数据,然后将它存储在其他可序列化的对象中,如下:
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) hBaseRDD.map{item => val immutableBytesWritable = item._1 Bytes.toString(immutableBytesWritable.get()) }
上面实例从 ImmutableBytesWritable
对象中抽取我们要的数据并转换成String
,而 String
是实现 java.io.Serializable
接口的类,所以也可以解决上面的问题。
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable)】(https://www.iteblog.com/archives/2081.html)
我在hbase中创建索引表上遇到些问题。可以将 String类型 转换为 ImmutableBytesWritable 对象吗。