In yesterday's post, 《Spark Streaming作业提交源码分析接收数据篇》, I walked through the data-receiving part of Spark Streaming job submission. Today let's look at how Spark Streaming processes the data it has received from external sources.
When StreamingContext's start function is called, it invokes JobScheduler's start function, and JobScheduler's start in turn starts the ReceiverTracker and the jobGenerator.
When the jobGenerator starts, it calls either restart or startFirstTime, depending on whether this run is recovering from a checkpoint:
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventActor != null) return // generator has already been started

  eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
    def receive = {
      case event: JobGeneratorEvent => processEvent(event)
    }
  }), "JobGenerator")

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
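To make this control flow concrete, below is a simplified, self-contained sketch of the event-loop pattern the JobGenerator relies on: events are posted asynchronously and a single loop dispatches them to processEvent. The names MiniGenerator and GeneratorEvent are made up for illustration; only GenerateJobs and DoCheckpoint correspond to events that appear in the real code, and in later Spark versions this actor was replaced by an internal event loop with the same shape.

// Simplified, illustrative sketch of the event-driven pattern; not Spark's real classes.
import java.util.concurrent.LinkedBlockingQueue

sealed trait GeneratorEvent
case class GenerateJobs(timeMs: Long) extends GeneratorEvent
case class DoCheckpoint(timeMs: Long) extends GeneratorEvent

class MiniGenerator {
  private val events = new LinkedBlockingQueue[GeneratorEvent]()

  // counterpart of `eventActor ! event`: just enqueue the event
  def post(event: GeneratorEvent): Unit = events.put(event)

  // counterpart of the actor's receive loop dispatching to processEvent
  def start(): Unit = {
    val t = new Thread(() => while (true) processEvent(events.take()))
    t.setDaemon(true)
    t.start()
  }

  private def processEvent(event: GeneratorEvent): Unit = event match {
    case GenerateJobs(time)  => println(s"generate jobs for batch at $time ms")
    case DoCheckpoint(time)  => println(s"checkpoint state for batch at $time ms")
  }
}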
The startFirstTime function starts the DStreamGraph and the JobGenerator timer thread:
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
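The RecurringTimer created here is essentially a loop that wakes up once per batch interval and invokes the callback, which posts GenerateJobs to the event actor. Below is a minimal sketch of that pattern under that reading; it is not Spark's actual RecurringTimer, which additionally takes a pluggable Clock and supports graceful stopping.

// Minimal recurring-timer sketch: invoke `callback` once per `periodMs`,
// aligned to multiples of the period. Illustrative only.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var nextTime = 0L

  // first multiple of the period that lies in the future
  def getStartTime(): Long =
    (System.currentTimeMillis() / periodMs + 1) * periodMs

  def start(startTime: Long): Unit = {
    nextTime = startTime
    val t = new Thread(s"$name-timer") {
      override def run(): Unit = while (true) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime)    // e.g. eventActor ! GenerateJobs(new Time(nextTime))
        nextTime += periodMs  // next batch boundary
      }
    }
    t.setDaemon(true)
    t.start()
  }
}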
The JobGenerator timer fires every ssc.graph.batchDuration.milliseconds, i.e. the batch duration we pass in when constructing the StreamingContext, and jobs are generated through Akka: the timer posts a GenerateJobs event to the event actor, which calls the generateJobs method:
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the sparkEnv in this thread, so that job generation code can access the
  // environment Example: BlockRDDs are created in this thread, and it needs
  // to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in sparkEnv has
  // been removed.
  SparkEnv.set(ssc.env)
  Try {
    // allocate received blocks to batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val receivedBlockInfos =
        jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
      jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventActor ! DoCheckpoint(time)
}
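As mentioned above, the interval driving all of this is just the batch duration we hand to the StreamingContext constructor. Here is a minimal driver-side sketch showing where that value comes from; the socket source and port 9999 are placeholders, not anything required by the code above.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BatchDurationExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BatchDurationExample").setMaster("local[2]")
    // Seconds(5) becomes graph.batchDuration: GenerateJobs fires every 5 seconds.
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9999)  // placeholder source
    lines.count().print()                                // output operation => one job per batch

    ssc.start()            // starts JobScheduler, ReceiverTracker and JobGenerator
    ssc.awaitTermination()
  }
}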
Inside generateJobs, the call jobScheduler.receiverTracker.allocateBlocksToBatch(time) is the important one; it eventually lands in the allocateBlocksToBatch function, defined as follows:
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
    timeToAllocatedBlocks(batchTime) = allocatedBlocks
    lastAllocatedBatchTime = batchTime
    allocatedBlocks
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    //    possibly processed batch job or half-processed batch job need to be processed
    //    again, so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    //    lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
Note getReceivedBlockQueue(streamId), whose implementation is simply:
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
Remember the previous post on how data read from Kafka is received and stored? The metadata of those newly generated blocks ends up in streamIdToUnallocatedBlockQueues. allocateBlocksToBatch drains all the not-yet-processed blocks from these queues and records them in timeToAllocatedBlocks (a mutable.HashMap[Time, AllocatedBlocks]), and then graph.generateJobs(time) is called to generate the jobs.
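Putting the last few snippets together, the bookkeeping boils down to two maps: per-stream queues of unallocated block metadata, and a per-batch map of the blocks allocated to each batch time. The sketch below captures that flow under some simplifications (no write-ahead log, plain Long batch times, and a made-up BlockInfo instead of Spark's ReceivedBlockInfo).

import scala.collection.mutable

// Simplified bookkeeping sketch; not the real ReceivedBlockTracker.
case class BlockInfo(blockId: String)
case class Allocated(streamIdToBlocks: Map[Int, List[BlockInfo]])

class BlockTrackerSketch(streamIds: Seq[Int]) {
  // counterpart of streamIdToUnallocatedBlockQueues
  private val unallocated = mutable.HashMap[Int, mutable.Queue[BlockInfo]]()
  // counterpart of timeToAllocatedBlocks
  private val timeToAllocated = mutable.HashMap[Long, Allocated]()
  private var lastAllocatedBatchTime = -1L

  private def queueFor(streamId: Int): mutable.Queue[BlockInfo] =
    unallocated.getOrElseUpdate(streamId, mutable.Queue[BlockInfo]())

  // called by the receiving side when a new block has been stored
  def addBlock(streamId: Int, block: BlockInfo): Unit = synchronized {
    queueFor(streamId) += block
  }

  // called once per batch from generateJobs
  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    if (batchTime > lastAllocatedBatchTime) {
      val snapshot = streamIds.map { id =>
        val q = queueFor(id)
        val drained = q.toList   // take everything received so far...
        q.clear()                // ...and leave the queue empty for the next batch
        id -> drained
      }.toMap
      timeToAllocated(batchTime) = Allocated(snapshot)
      lastAllocatedBatchTime = batchTime
    } // else: this batch was already allocated (only happens during recovery)
  }

  // called when building the JobSet for the batch
  def getBlocksOfBatch(batchTime: Long): Map[Int, List[BlockInfo]] =
    synchronized {
      timeToAllocated.get(batchTime).map(_.streamIdToBlocks).getOrElse(Map.empty)
    }
}

The real tracker additionally writes a BatchAllocationEvent to the write-ahead log before updating timeToAllocatedBlocks, as the code above shows, which is what makes the allocation recoverable.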
When the Try succeeds (Success(jobs)), jobScheduler.receiverTracker.getBlocksOfBatch(time) is called to fetch those newly allocated blocks, which is just reading the entries stored in timeToAllocatedBlocks. Finally, jobScheduler's submitJobSet function submits the JobSet to the cluster for computation, after which a Checkpoint operation is performed.
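Conceptually, graph.generateJobs(time) asks each output DStream (one per output operation such as print or foreachRDD) for a job covering this batch, and submitJobSet then hands those jobs to a small pool of job threads inside JobScheduler. Below is a minimal sketch of that submit-and-run idea, with each job reduced to a plain () => Unit; the real JobScheduler also tracks JobSet state, emits batch started/completed events and reports errors.

import java.util.concurrent.Executors

// Minimal submit-and-run sketch; the pool size of 1 mirrors the default of
// one streaming job running at a time.
object SchedulerSketch {
  private val jobExecutor = Executors.newFixedThreadPool(1)

  def submitJobSet(timeMs: Long, jobs: Seq[() => Unit]): Unit = {
    if (jobs.isEmpty) {
      println(s"No jobs added for time $timeMs")
    } else {
      jobs.foreach { job =>
        // each job body eventually runs an ordinary Spark action for this batch
        jobExecutor.execute(new Runnable { def run(): Unit = job() })
      }
      println(s"Added jobs for time $timeMs")
    }
  }
}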
That wraps up the source code walk-through of how a Spark Streaming job receives data from external sources, stores it in memory, and finally slices it into jobs. If you want to follow the latest Spark news, keep an eye on this blog.