最近一段时间在使用Spark Streaming,里面遇到很多问题,只知道参照官方文档写,不理解其中的原理,于是抽了一点时间研究了一下Spark Streaming作业提交的全过程,包括从外部数据源接收数据,分块,拆分Job,提交作业全过程。这里我只介绍从Kafka中接收数据为例进行讲解。我这里是基于对Spark 1.3.0的代码进行分析的,由于Spark代码在经常变动,不同版本的Spark代码可能不一样,所以阅读下面的代码请参照Spark 1.3.0的源码。
我们通过KafkaUtils.createStream
函数可以创建KafkaReceiver
类(这是默认的Kafka Receiver,如果spark.streaming.receiver.writeAheadLog.enable
配置选项设置为true,则会使用ReliableKafkaReceiver
,其中会使用WAL机制来保证数据的可靠性,也就是保证数据不丢失。)
在KafkaReceiver
类中首先会在onStart方法中初始化一些环境,比如创建Consumer(这个就是用来从Kafka的Topic中读取消息的消费者)。在初始化完相关环境之后会在线程池中启动MessageHandler
来从Kafka中接收数据:
/** * User: 过往记忆 * Date: 15-04-29 * Time: 上午03:16 * bolg: * 本文地址:/archives/1334 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ // Handles Kafka messages private class MessageHandler(stream: KafkaStream[K, V]) extends Runnable { def run() { logInfo("Starting MessageHandler.") try { val streamIterator = stream.iterator() while (streamIterator.hasNext()) { val msgAndMetadata = streamIterator.next() store((msgAndMetadata.key, msgAndMetadata.message)) } } catch { case e: Throwable => logError("Error handling message; exiting", e) } } }
该线程负责从Kafka中读取数据,并将读取到的数据存储到BlockGenerator
中(通过调用store方法实现),msgAndMetadata.key
其实就是Topic的Key值;而msgAndMetadata.message
就是我们要的消息。
store函数是Receiver
类提供的,所有继承自该类的子类(KafkaReceiver、ActorReceiver、ReliableKafkaReceiver等)都拥有该方法。其内部的实现是调用了blockGenerator
的addData方法,最终是将数据存储在currentBuffer
中,而currentBuffer
其实就是一个ArrayBuffer[Any]
。
在blockGenerator
内部存在两个线程:(1)、定期地生成新的batch,然后再将之前生成的batch封装成block。这里的定期其实就是spark.streaming.blockInterval
参数配置的。(2)、将生成的block发送到Block Manager中。
第一个线程定期地调用updateCurrentBuffer
函数将存储在currentBuffer
中的数据封装成Block,然后放在blocksForPushing
中,blocksForPushing
是ArrayBlockingQueue[Block]
类型的队列,其大小默认是10,我们可以通过spark.streaming.blockQueueSize
参数配置(当然,在很多情况下这个值不需要我们去配置)。当blocksForPushing
没有多余的空间,那么该线程就会阻塞,直到有剩余的空间可用于存储新生成的Block。如果你的数据量真的很大,大到blocksForPushing
无法及时存储那些block,这时候你就得考虑加大spark.streaming.blockQueueSize
的大小了。updateCurrentBuffer
函数的实现如下:
/** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockInterval) val newBlock = new Block(blockId, newBlockBuffer) listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } }
第二个线程不断地调用keepPushingBlocks
函数从blocksForPushing
阻塞队列中获取生成的Block,然后调用pushBlock方法将Block存储到BlockManager中。当存储到BlockManager
中后,会返回一个blockStoreResult
结果,这就是成功存储到BlockManager
的StreamBlockId
。然后下一步就是将blockStoreResult
封装成ReceivedBlockInfo
,这也就是最新的未处理过的数据,然后通过Akka告诉ReceiverTracker
有新的块加入,ReceiverTracker
会调用addBlock方法将ReceivedBlockInfo
存储到streamIdToUnallocatedBlockQueues
队列中。关键代码如下:
/** * User: 过往记忆 * Date: 15-04-29 * Time: 上午03:16 * bolg: * 本文地址:/archives/1334 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ /** Keep pushing blocks to the BlockManager. */ private def keepPushingBlocks() { logInfo("Started block pushing thread") try { while(!stopped) { Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match { case Some(block) => pushBlock(block) case None => } } // Push out the blocks that are still left logInfo("Pushing out the last " + blocksForPushing.size() + " blocks") while (!blocksForPushing.isEmpty) { logDebug("Getting block ") val block = blocksForPushing.take() pushBlock(block) logInfo("Blocks left to push " + blocksForPushing.size()) } logInfo("Stopped block pushing thread") } catch { case ie: InterruptedException => logInfo("Block pushing thread was interrupted") case e: Exception => reportError("Error in block pushing thread", e) } } private def pushBlock(block: Block) { listener.onPushBlock(block.id, block.buffer) logInfo("Pushed block " + block.id) } /** Store block and report it to driver */ def pushAndReportBlock( receivedBlock: ReceivedBlock, metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { val blockId = blockIdOption.getOrElse(nextBlockId) val numRecords = receivedBlock match { case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size case _ => -1 } val time = System.currentTimeMillis val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult) val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout) Await.result(future, askTimeout) logDebug(s"Reported block $blockId") }
以上就是从Kafka读取数据,并把接收到的数据存储到streamIdToUnallocatedBlockQueues
里面的全过程。关于Spark Streaming如何进一步处理streamIdToUnallocatedBlockQueues
中的数据,并划分作业的流程下篇文章我将会继续讲解。
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark Streaming作业提交源码分析接收数据篇】(https://www.iteblog.com/archives/1334.html)
求助一个问题,我使用的是flume直接向streaming推数据。 我发现我做监听时,集群中会随机找一台机子做监听,我的理解中,监听其实就是driver做的事情。
请问我应该如何控制只监听某一台机器呢?
基于推的貌似不行,因为driver是根据一定的算法在集群上启动的,所以事先不知道。
不过你可以使用拉的方式,这种是可以的。