欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Spark Streaming作业提交源码分析接收数据篇

  最近一段时间在使用Spark Streaming,里面遇到很多问题,只知道参照官方文档写,不理解其中的原理,于是抽了一点时间研究了一下Spark Streaming作业提交的全过程,包括从外部数据源接收数据,分块,拆分Job,提交作业全过程。这里我只介绍从Kafka中接收数据为例进行讲解。我这里是基于对Spark 1.3.0的代码进行分析的,由于Spark代码在经常变动,不同版本的Spark代码可能不一样,所以阅读下面的代码请参照Spark 1.3.0的源码

  我们通过KafkaUtils.createStream函数可以创建KafkaReceiver类(这是默认的Kafka Receiver,如果spark.streaming.receiver.writeAheadLog.enable配置选项设置为true,则会使用ReliableKafkaReceiver,其中会使用WAL机制来保证数据的可靠性,也就是保证数据不丢失。)

  在KafkaReceiver类中首先会在onStart方法中初始化一些环境,比如创建Consumer(这个就是用来从Kafka的Topic中读取消息的消费者)。在初始化完相关环境之后会在线程池中启动MessageHandler来从Kafka中接收数据:

/**
 * User: 过往记忆
 * Date: 15-04-29
 * Time: 上午03:16
 * bolg: 
 * 本文地址:/archives/1334
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

// Handles Kafka messages
private class MessageHandler(stream: KafkaStream[K, V])
    extends Runnable {
    def run() {
      logInfo("Starting MessageHandler.")
      try {
        val streamIterator = stream.iterator()
        while (streamIterator.hasNext()) {
          val msgAndMetadata = streamIterator.next()
          store((msgAndMetadata.key, msgAndMetadata.message))
        }
      } catch {
        case e: Throwable => logError("Error handling message; exiting", e)
      }
    }
}

  该线程负责从Kafka中读取数据,并将读取到的数据存储到BlockGenerator中(通过调用store方法实现),msgAndMetadata.key其实就是Topic的Key值;而msgAndMetadata.message就是我们要的消息。

  store函数是Receiver类提供的,所有继承自该类的子类(KafkaReceiver、ActorReceiver、ReliableKafkaReceiver等)都拥有该方法。其内部的实现是调用了blockGenerator的addData方法,最终是将数据存储在currentBuffer中,而currentBuffer其实就是一个ArrayBuffer[Any]

  在blockGenerator内部存在两个线程:(1)、定期地生成新的batch,然后再将之前生成的batch封装成block。这里的定期其实就是spark.streaming.blockInterval参数配置的。(2)、将生成的block发送到Block Manager中。

  第一个线程定期地调用updateCurrentBuffer函数将存储在currentBuffer中的数据封装成Block,然后放在blocksForPushing中,blocksForPushingArrayBlockingQueue[Block]类型的队列,其大小默认是10,我们可以通过spark.streaming.blockQueueSize参数配置(当然,在很多情况下这个值不需要我们去配置)。当blocksForPushing没有多余的空间,那么该线程就会阻塞,直到有剩余的空间可用于存储新生成的Block。如果你的数据量真的很大,大到blocksForPushing无法及时存储那些block,这时候你就得考虑加大spark.streaming.blockQueueSize的大小了。updateCurrentBuffer函数的实现如下:

/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockInterval)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
}

  第二个线程不断地调用keepPushingBlocks函数从blocksForPushing阻塞队列中获取生成的Block,然后调用pushBlock方法将Block存储到BlockManager中。当存储到BlockManager中后,会返回一个blockStoreResult结果,这就是成功存储到BlockManagerStreamBlockId。然后下一步就是将blockStoreResult封装成ReceivedBlockInfo,这也就是最新的未处理过的数据,然后通过Akka告诉ReceiverTracker有新的块加入,ReceiverTracker会调用addBlock方法将ReceivedBlockInfo存储到streamIdToUnallocatedBlockQueues队列中。关键代码如下:

/**
 * User: 过往记忆
 * Date: 15-04-29
 * Time: 上午03:16
 * bolg: 
 * 本文地址:/archives/1334
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
    logInfo("Started block pushing thread")
    try {
      while(!stopped) {
        Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }
      // Push out the blocks that are still left
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        logDebug("Getting block ")
        val block = blocksForPushing.take()
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
}

private def pushBlock(block: Block) {
    listener.onPushBlock(block.id, block.buffer)
    logInfo("Pushed block " + block.id)
}

/** Store block and report it to driver */
def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val numRecords = receivedBlock match {
      case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
      case _ => -1
    }

    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")

    val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
    val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
    Await.result(future, askTimeout)
    logDebug(s"Reported block $blockId")
}

  以上就是从Kafka读取数据,并把接收到的数据存储到streamIdToUnallocatedBlockQueues里面的全过程。关于Spark Streaming如何进一步处理streamIdToUnallocatedBlockQueues中的数据,并划分作业的流程下篇文章我将会继续讲解。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark Streaming作业提交源码分析接收数据篇】(https://www.iteblog.com/archives/1334.html)
喜欢 (9)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(2)个小伙伴在吐槽
  1. 求助一个问题,我使用的是flume直接向streaming推数据。 我发现我做监听时,集群中会随机找一台机子做监听,我的理解中,监听其实就是driver做的事情。
    请问我应该如何控制只监听某一台机器呢?

    李晓亮_Hark2015-05-05 11:03 回复
    • 基于推的貌似不行,因为driver是根据一定的算法在集群上启动的,所以事先不知道。

      不过你可以使用拉的方式,这种是可以的。

      w3970907702015-05-05 12:06 回复