The original intent of the spark.cleaner.ttl parameter was to clear out any RDD data older than this duration in order to free space for later RDDs. Periodic cleanups ensure that metadata older than this duration is forgotten, which is useful for Spark jobs that run for many hours or days (Spark Streaming in particular). Note that any RDD that stays in memory longer than this duration is cleared as well. The official documentation describes it as follows:
Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
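For context, this is roughly how the setting was used before it was deprecated; a minimal sketch, where the application name and the one-hour TTL value are illustrative only:

import org.apache.spark.{SparkConf, SparkContext}

// Pre-1.4 style: ask Spark to forget metadata (and persisted RDDs!) older than
// one hour. The value is given in seconds and is illustrative only.
val conf = new SparkConf()
  .setAppName("ttl-cleaner-example")   // illustrative app name
  .set("spark.cleaner.ttl", "3600")
val sc = new SparkContext(conf)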
Experienced users know, however, that an RDD that has outlived spark.cleaner.ttl is not necessarily safe to delete: it may still be in use, or about to be used, and deleting RDDs purely on a time interval is not a sound design in the first place. For these reasons, spark.cleaner.ttl was marked as deprecated in Spark 1.4:
DeprecatedConfig("spark.cleaner.ttl", "1.4",
  "TTL-based metadata cleaning is no longer necessary in recent Spark versions " +
  "and can lead to confusing errors if metadata is deleted for entities that are still" +
  " in use. Except in extremely special circumstances, you should remove this setting" +
  " and rely on Spark's reference-tracking-based cleanup instead." +
  " See SPARK-7689 for more details.")
At the same time, the community redesigned the cleanup logic so that Spark automatically cleans up the metadata and data of persisted RDDs, as well as shuffle and broadcast data, and introduced the ContextCleaner class, which is instantiated in SparkContext:
private[spark] val cleaner: Option[ContextCleaner] = {
  if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
}
cleaner.foreach(_.start())
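From a user's point of view, this means cleanup is driven by reference tracking, and hence by driver-side garbage collection, rather than by a timer; setting spark.cleaner.referenceTracking to false disables it. A minimal sketch of the behavior, assuming an existing SparkContext named sc (the explicit System.gc() is only there to make the effect visible quickly; in a real job the cleaner runs on its own):

import org.apache.spark.storage.StorageLevel

var rdd = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_ONLY)
rdd.count()    // materialize the cached blocks

rdd = null     // drop the last strong reference on the driver side
System.gc()    // once the RDD object is garbage collected, ContextCleaner is
               // notified and unpersists the corresponding blocks automatically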
ContextCleaner calls RDD.unpersist() to remove the data of RDDs that have been persisted:
/** Perform RDD cleanup. */
def doCleanupRDD(rddId: Int, blocking: Boolean) {
  try {
    logDebug("Cleaning RDD " + rddId)
    sc.unpersistRDD(rddId, blocking)
    listeners.foreach(_.rddCleaned(rddId))
    logInfo("Cleaned RDD " + rddId)
  } catch {
    case t: Throwable => logError("Error cleaning RDD " + rddId, t)
  }
}
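The same path can of course still be triggered explicitly: calling unpersist() on a cached RDD removes its blocks without waiting for the cleaner. A quick illustration, where the input path is illustrative:

val cached = sc.textFile("hdfs:///tmp/input.txt").cache()   // illustrative path
cached.count()

// With blocking = true the call waits until the executors have dropped the blocks.
cached.unpersist(blocking = true)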
Shuffle and broadcast data are cleaned up by doCleanupShuffle and doCleanupBroadcast, respectively. The cleaner dispatches on the type of data to be cleaned:
task match {
  case CleanRDD(rddId) =>
    doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
  case CleanShuffle(shuffleId) =>
    doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
  case CleanBroadcast(broadcastId) =>
    doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
}
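Broadcast variables get the same treatment: when the driver-side Broadcast object becomes unreachable, a CleanBroadcast task removes its blocks from the executors, and they can also be cleaned up explicitly. A small sketch, again assuming a SparkContext named sc:

val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

sc.parallelize(Seq("a", "b", "c"))
  .map(k => lookup.value.getOrElse(k, 0))
  .collect()

lookup.unpersist()   // remove the copies cached on the executors
lookup.destroy()     // remove all state; the variable cannot be used afterwards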
With this logic in place, Spark's RDD cleanup should become considerably smarter. Something to look forward to.
Permalink: spark.cleaner.ttl to be removed in Spark 1.4 (https://www.iteblog.com/archives/1362.html)