The Spark shell is a powerful interactive data-analysis tool and an easy way to learn the API. It can be used with Scala (which runs on the JVM and is therefore a convenient way to reuse existing Java libraries) or Python. We will most likely run the following test code in the Spark shell:
scala> import org.apache.hadoop.mapreduce.{Job, MRJobConfig, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.{Job, MRJobConfig, TaskAttemptID, TaskType}

scala> val job = Job.getInstance(sc.hadoopConfiguration)
java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
    at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
    at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
    at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
    at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
    at .<init>(<console>:10)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Initializing the Job instance throws java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING. The reason is that in Spark shell mode, every line that produces a value has that value echoed back, so when the Job is initialized the REPL calls its toString method in order to print the object. The toString implementation, however, checks the job's state and requires the Job instance to be in JobState.RUNNING, but at this point the job is still in JobState.DEFINE, which triggers the exception. This article presents two ways to work around the problem.
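To make the mechanism concrete, here is a minimal sketch of the state check described above, paraphrased from the stack trace (Job.toString calling Job.ensureState); it is illustrative only, not the actual Hadoop source:

// Illustrative sketch, not the real org.apache.hadoop.mapreduce.Job code.
object JobStateSketch {
  sealed trait JobState
  case object DEFINE  extends JobState
  case object RUNNING extends JobState

  class Job {
    private var state: JobState = DEFINE   // a freshly created Job starts in DEFINE

    private def ensureState(required: JobState): Unit =
      if (state != required)
        throw new IllegalStateException(s"Job in state $state instead of $required")

    // Printing the Job forces this check, which is exactly what the REPL
    // does when it echoes the value of `val job = ...`.
    override def toString: String = {
      ensureState(RUNNING)
      "Job: ..."
    }
  }
}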
Wrap the Job in a class
One approach is to wrap the Job object in a class. The REPL then prints the wrapper rather than calling Job's toString method, which avoids the exception. The implementation looks like this:
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> class JobWrapper(sc: SparkContext) { val job = Job.getInstance(sc.hadoopConfiguration) }
defined class JobWrapper

scala> val paths = new Path("/iteblog/data.csv")
paths: org.apache.hadoop.fs.Path = /iteblog/data.csv

scala> val jobWrapper = new JobWrapper(sc)
jobWrapper: JobWrapper = $iwC$$iwC$JobWrapper@58e4210d

scala> FileInputFormat.setInputPaths(jobWrapper.job, paths)

scala> val splits = format.getSplits(jobWrapper.job)
splits: java.util.List[org.apache.hadoop.mapreduce.InputSplit] = [hdfs://iteblogcluster/iteblog/data.csv:0+3145728, hdfs://iteblogcluster/iteblog/data.csv:3145728+3145728, hdfs://iteblogcluster/iteblog/data.csv:6291456+3428026]
As you can see, running the snippet above no longer throws the exception.
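For reference, here is a more self-contained version of the wrapper approach. The imports and the TextInputFormat used for format are assumptions, since the transcript does not show where they come from; sc is the SparkContext provided by the Spark shell, and /iteblog/data.csv is the path from the transcript.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import org.apache.spark.SparkContext

// The REPL echoes the wrapper (whose default toString is harmless)
// instead of the Job itself, so no state check is triggered.
class JobWrapper(sc: SparkContext) {
  val job: Job = Job.getInstance(sc.hadoopConfiguration)
}

val jobWrapper = new JobWrapper(sc)
FileInputFormat.setInputPaths(jobWrapper.job, new Path("/iteblog/data.csv"))

val format = new TextInputFormat               // assumed InputFormat
val splits = format.getSplits(jobWrapper.job)  // java.util.List[InputSplit]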
Use lazy when initializing the Job object
Alternatively, we can declare the Job object as a lazy val, so that it is only initialized when it is actually used, as follows:
scala> lazy val job = Job.getInstance(sc.hadoopConfiguration)
job: org.apache.hadoop.mapreduce.Job = <lazy>

scala> FileInputFormat.setInputPaths(job, paths)

scala> val splits = format.getSplits(job)
splits: java.util.List[org.apache.hadoop.mapreduce.InputSplit] = [hdfs://iteblogcluster/iteblog/data.csv:0+3145728, hdfs://iteblogcluster/iteblog/data.csv:3145728+3145728, hdfs://iteblogcluster/iteblog/data.csv:6291456+3428026]
This approach is much simpler than the previous one.
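Outside the REPL transcript, a self-contained sketch of the lazy approach could look like the following; as before, the TextInputFormat and the input path are assumptions, and sc is the shell's SparkContext.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

// lazy defers construction, so the REPL echoes "<lazy>" instead of
// calling Job.toString on a Job that is still in JobState.DEFINE.
lazy val job: Job = Job.getInstance(sc.hadoopConfiguration)

// The Job is only materialized here, and its toString is never invoked.
FileInputFormat.setInputPaths(job, new Path("/iteblog/data.csv"))

val format = new TextInputFormat   // assumed; any FileInputFormat would do
val splits = format.getSplits(job)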