在《Spark源码分析:多种部署方式之间的区别与联系(1)》我们谈到了SparkContext的初始化过程会做好几件事情(这里就不再列出,可以去《Spark源码分析:多种部署方式之间的区别与联系(1)》查看),其中做了一件重要的事情就是创建TaskScheduler。
// Create and start the scheduler private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
在createTaskScheduler方法中,会根据用户传进来的master URL分别初始化不同的SchedulerBackend和ExecutorBackend。而且从代码中我们可以看到master URL多大九种格式。但是在代码中SchedulerBackend的种类可没九种,只有五种;而ExecutorBackend只有三种,我们先来看看这些SchedulerBackend和ExecutorBackend的类继承关系:
每一个Application都对应了一个SchedulerBackend和多个ExecutorBackend。下面我们分别介绍各种运行模式所涉及到的类
1、Local模式
local模式出了伪集群模式(local-cluster),所有的local都是用到了LocalBackend和TaskSchedulerImpl类。LocalBackend接收来自TaskSchedulerImpl的receiveOffers()调用,并根据运行Application传进来的CPU核生成WorkerOffer,并调用scheduler.resourceOffers(offers)生成Task,最后通过 executor.launchTask来执行这些Task。
2、Standalone
Standalone模式使用SparkDeploySchedulerBackend和TaskSchedulerImpl,SparkDeploySchedulerBackend是继承自CoarseGrainedSchedulerBackend类,并重写了其中的一些方法。
CoarseGrainedSchedulerBackend是一个粗粒度的资源调度类,在Spark job运行的整个期间,它会保存所有的Executor,在task运行完的时候,并不释放该Executor,也不向Scheduler申请一个新的Executor。Executor的启动方式有很多中,需要根据Application提交的Master URL进行判断。在CoarseGrainedSchedulerBackend中封装了一个DriverActor类,它接受Executor注册(RegisterExecutor)、状态更新(StatusUpdate)、响应Scheduler的ReviveOffers请求、杀死Task等等。
在本模式中将会启动一个或者多个CoarseGrainedExecutorBackend。具体是通过AppClient类向Master请求注册Application。当注册成功之后,Master会向Client进行反馈,并调用schedule启动Driver和CoarseGrainedExecutorBackend,启动的Executor会向DriverActor进行注册。然后CoarseGrainedExecutorBackend通过aunchTask方法启动已经提交的Task。
3、yarn-cluster
yarn-cluster集群模式涉及到的类有YarnClusterScheduler和YarnClusterSchedulerBackend。YarnClusterSchedulerBackend同样是继承自CoarseGrainedSchedulerBackend。而YarnClusterScheduler继承自TaskSchedulerImpl,它只是简单地对TaskSchedulerImpl进行封装,并重写了getRackForHost和postStartHook方法。
Client类通过YarnClient在Hadoop集群上启动一个Container,并在其中运行ApplicationMaster,并通过Yarn提供的接口在集群中启动多个Container用于运行CoarseGrainedExecutorBackend,并向CoarseGrainedSchedulerBackend中的DriverActor进行注册。
4、yarn-client
yarn-cluster集群模式涉及到的类有YarnClientClusterScheduler和YarnClientSchedulerBackend。YarnClientClusterScheduler继承自TaskSchedulerImpl,并对其中的getRackForHost方法进行了重写。
Yarn-client模式下,会在集群外面启动一个ExecutorLauncher来作为driver,并想集群申请Container,来启动CoarseGrainedExecutorBackend,并向CoarseGrainedSchedulerBackend中的DriverActor进行注册。
5、Mesos
Mesos模式调度方式有两种:粗粒度和细粒度。粗粒度涉及到的类有CoarseMesosSchedulerBackend和TaskSchedulerImpl类;而细粒度涉及到的类有MesosSchedulerBackend和TaskSchedulerImpl类。CoarseMesosSchedulerBackend和 MesosSchedulerBackend都继承了MScheduler(其实是Mesos的Scheduler),便于注册到Mesos资源调度的框架中。选择哪种模式可以通过spark.mesos.coarse参数配置。默认的是MesosSchedulerBackend。
yarn-cluster和yarn-client模式内部实现还是有很大的区别。如果你需要用于生产环境,那么请选择yarn-cluster;而如果你仅仅是Debug程序,可以选择yarn-client。
有关yarn-cluster和yarn-client模式深层次的区别,我将专门写一篇博文,敬请关注。
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark源码分析:多种部署方式之间的区别与联系(2)】(https://www.iteblog.com/archives/1185.html)
博主你好,拜读你的好多帖子,受益匪浅,有个spark streaming的部署问题卡住了,我程序都开发完了,但是一直部署不成功,具体问题参见这里: http://bbs.csdn.net/topics/391050126
期待博主抽空能看下,谢谢了
看了你的帖子,首先怀疑的是你程序分配的Core数量不足,但是具体是不是得看你程序以及Kafka分区的数量等。
①关于程序分配的core数量不足的问题,我也一直关注,也踩了不少坑,但如果是本地模式local[2],程序运行是没问题的;但若是standalone模式,你看我的命令,--total-executor-cores 3,所有的executor一共有3个core,但我通过管理工具查看,每个作业会有三个executor,那么就是说每个executor只有一个core,这块也不如何配置???
②程序就是接收kafka来的消息,处理,我觉得和程序关系不大;kafka的分区数为8。
③另外,在帖子里我粘了spark的两个配置文件,麻烦博主还能关注下。
你通过Spark读Kafka是使用哪个API?基于Receiver的还是?和你那两个配置文件应该关系不大,读取Kafka分区是一直会占用Core的,然后你处理数据就可能Core不够。
分配了core,standalone确实能跑了。
但部署模式为yarn-cluster,三个作业通过spark-submit跑起来后,只有一个作业是running状态,其他两个一直是accepted状态?
spark-submit的参数为 --master yarn-cluster --conf spark.ui.port=4042 --executor-memory 768M --num-executors 1 --executor-cores 2,
是在yarn中设置虚拟核数吗?
local模式出了伪集群模式,这句话,有错别字