博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Streaming源码解读之数据清理 内幕
阅读量:7052 次
发布时间:2019-06-28

本文共 2742 字,大约阅读时间需要 9 分钟。

hot3.png

前文已经讲解很清晰,Spark Streaming是通过定时器按照DStream 的DAG 回溯出整个RDD的DAG。

细心的读者一定有一个疑问,随着时间的推移,生产越来越多的RDD,SparkStreaming是如何保证RDD的生命周期的呢?

我们直接快进到JobGenerator中,

163024_p4Hj_120395.png

交由JobHandler执行,JobHandler是一个Runnable接口

163204_jwDL_120395.png

咱们已经阅读过JobStarted事件,继续往下看。

// JobScheduler.scala line 202 spark 1.6.0    def run() {      try {        val formattedTime = UIUtils.formatBatchTime(          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"        ssc.sc.setJobDescription(          s"""Streaming job from $batchLinkText""")        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)        // We need to assign `eventLoop` to a temp variable. Otherwise, because        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then        // it's possible that when `post` is called, `eventLoop` happens to null.        var _eventLoop = eventLoop        if (_eventLoop != null) {          _eventLoop.post(JobStarted(job, clock.getTimeMillis())) // 之前已经解析过。          // Disable checks for existing output directories in jobs launched by the streaming          // scheduler, since we may need to write output to an existing directory during checkpoint          // recovery; see SPARK-4835 for more details.          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {            job.run()          }          _eventLoop = eventLoop          if (_eventLoop != null) {            _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) // 本讲的重点在此。          }        } else {          // JobScheduler has been stopped.        }      } finally {        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)      }    }

163502_86rE_120395.png165223_paBo_120395.png

164742_kQ1W_120395.png164807_KWAR_120395.png

165634_NBuP_120395.png

清理RDD

165820_mbSp_120395.png

170510_fIUf_120395.png

172242_MCeN_120395.png

172325_uazn_120395.png

清理Checkpoint

171230_cwEr_120395.png

171332_voB5_120395.png

// Checkpoint.scala line 196    def run() {      // ... 代码...           jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)          return        } catch {          case ioe: IOException =>            logWarning("Error in attempt " + attempts + " of writing checkpoint to "              + checkpointFile, ioe)            reset()        }      }      logWarning("Could not write checkpoint for time " + checkpointTime + " to file "        + checkpointFile + "'")    }

171611_tdhO_120395.png

171748_43kr_120395.png

171845_CGLA_120395.png

清理ReceiverTracker的block和batch

172644_1WrL_120395.png

173011_YSJm_120395.png

174540_ERlU_120395.png

174629_ZTNI_120395.png

174737_38aY_120395.png

清理InputInfo

175008_NgoJ_120395.png

 

至此,所有的清理工作已经完成。

 

总结下:

JVM中对不使用的对象有GC,Spark Streaming中也是如此。

清理对象如下:

  1. Job
  2. RDD
  3. InputInfo
  4. Block 和 batch
  5. checkpoint的数据
  6. WAL的数据

 

最后配上日志截图:

112445_fQC3_120395.png

 

转载于:https://my.oschina.net/corleone/blog/685060

你可能感兴趣的文章
Linear Regression and Gradient Descent
查看>>
python 面向对象
查看>>
通过mycat实现mysql的读写分离
查看>>
RMAN数据库恢复之对数据库进行完全介质恢复
查看>>
linux reboot命令
查看>>
Python之路--Python基础4--内置函数
查看>>
霍布斯:人对人像狼一样
查看>>
typedef和#define的区别
查看>>
if else和switch的效率
查看>>
Linux下C结构体初始化
查看>>
CodeForces 1082B
查看>>
ORACLE在线重定义--将普通表转化为分区表
查看>>
模拟jquery底层链式编程
查看>>
如何实现Qlikview的增量数据加载
查看>>
Notepad++ 代码格式化
查看>>
裁切图片,
查看>>
[转]跟我一起写 Makefile
查看>>
作业-继承5
查看>>
修改 android 的 framework 层操作小记.转载
查看>>
图片鼠标滑过图片半透明(jquery特效)
查看>>