精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spark Streaming 數據清理機制

大數據 Spark
大家剛開始用Spark Streaming時,心里肯定嘀咕,對于一個7*24小時運行的數據,cache住的RDD,broadcast 系統會幫忙自己清理掉么?還a是說必須自己做清理?如果系統幫忙清理的話,機制是啥?

前言

為啥要了解機制呢?這就好比JVM的垃圾回收,雖然JVM的垃圾回收已經巨牛了,但是依然會遇到很多和它相關的case導致系統運行不正常。

這個內容我記得自己剛接觸Spark Streaming的時候,老板也問過我,運行期間會保留多少個RDD? 當時沒回答出來。后面在群里也有人問到了,所以就整理了下。文中如有謬誤之處,還望指出。

DStream 和 RDD

我們知道Spark Streaming 計算還是基于Spark Core的,Spark Core 的核心又是RDD. 所以Spark Streaming 肯定也要和RDD扯上關系。然而Spark Streaming 并沒有直接讓用戶使用RDD而是自己抽象了一套DStream的概念。 DStream 和 RDD 是包含的關系,你可以理解為Java里的裝飾模式,也就是DStream 是對RDD的增強,但是行為表現和RDD是基本上差不多的。都具備幾個條件:

具有類似的tranformation動作,比如map,reduceByKey等,也有一些自己獨有的,比如Window,mapWithStated等

都具有Action動作,比如foreachRDD,count等

從編程模型上看是一致的。

所以很可能你寫的那堆Spark Streaming代碼看起來好像和Spark 一致的,然而并不能直接復用,因為一個是DStream的變換,一個是RDD的變化。

Spark Streaming中 DStream 介紹

DStream 下面包含幾個類:

  • 數據源類,比如InputDStream,具體如DirectKafkaInputStream等
  • 轉換類,典型比如MappedDStream,ShuffledDStream
  • 輸出類,典型比如ForEachDStream

從上面來看,數據從開始(輸入)到結束(輸出)都是DStream體系來完成的,也就意味著用戶正常情況是無法直接去產生和操作RDD的,這也就是說,DStream有機會和義務去負責RDD的生命周期。

這就回答了前言中的問題了。Spark Streaming具備自動清理功能。

RDD 在Spark Stream中產生的流程

在Spark Streaming中RDD的生命流程大體如下:

  • 在InputDStream會將接受到的數據轉化成RDD,比如DirectKafkaInputStream 產生的就是 KafkaRDD
  • 接著通過MappedDStream等進行數據轉換,這個時候是直接調用RDD對應的map方法進行轉換的
  • 在進行輸出類操作時,才暴露出RDD,可以讓用戶執行相應的存儲,其他計算等操作。

我們這里就以下面的代碼來進行更詳細的解釋:

  1. val source  =   KafkaUtils.createDirectInputStream(....) 
  2. source.map(....).foreachRDD{rdd=> 
  3.     rdd.saveTextFile(....) 

foreachRDD 產生ForEachDStream,因為foreachRDD是個Action,所以會觸發任務的執行,會被調用generateJob方法。

  1. override def generateJob(time: Time): Option[Job] = { 
  2.    parent.getOrCompute(time) match { 
  3.      case Some(rdd) => 
  4.        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) { 
  5.          foreachFunc(rdd, time) 
  6.        } 
  7.        Some(new Job(time, jobFunc)) 
  8.      case None => None 
  9.    } 
  10.  } 

對應的parent是MappedDStream,也就是說調用MappedDStream.getOrCompute.該方法在DStream中,首先會在MappedDStream對象中的generatedRDDs 變量中查找是否已經有RDD,如果沒有則觸發計算,并且將產生的RDD放到generatedRDDs

  1. @transientprivate[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () 
  2.  
  3. private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = { 
  4.     // If RDD was already generated, then retrieve it from HashMap, 
  5.     // or else compute the RDD 
  6.     generatedRDDs.get(time).orElse { 
  7. .... 
  8. generatedRDDs.put(time, newRDD) 
  9. .... 

計算RDD是調用的compute方法,MappedDStream 的compute方法很簡單,直接調用的父類也就是DirectKafkaInputStream的getOrCompute方法:

  1. override def compute(validTime: Time): Option[RDD[U]] = { 
  2.     parent.getOrCompute(validTime).map(_.map[U](mapFunc)) 
  3.   } 

在上面的例子中,MappedDStream 的parent是DirectKafkaInputStream中,這是個數據源,所以他的compute方法會直接new出一個RDD.

從上面可以得出幾個結論:

  • 數據源以及轉換類DStream都會維護一個generatedRDDs,可以按batchTime 進行獲取
  • 內部本質還是進行的RDD的轉換
  • 如果我們調用了cache會發生什么

這里又會有兩種情況,一種是調用DStream.cache,第二種是RDD.cache。事實上他們是完全一樣的。

  1. DStream的cache 動作只是將DStream的變量storageLevel 設置為MEMORY_ONLY_SER,然后在產生(或者獲取)RDD的時候,調用RDD的persit方法進行設置。所以DStream.cache 產生的效果等價于RDD.cache(也就是你自己調用foreachRDD 將RDD 都設置一遍)
  2. 進入正題,我們是怎么釋放Cache住的RDD的

其實無所謂Cache不Cache住,RDD最終都是要釋放的,否則運行久了,光RDD對象也能承包了你的內存。我們知道,在Spark Streaming中,周期性產生事件驅動Spark Streaming 的類其實是:

  1. org.apache.spark.streaming.scheduler.JobGenerator 

他內部有個永動機(定時器),定時發布一個產生任務的事件:

  1. private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator"

然后通過processEvent進行事件處理:

  1. /** Processes all events */ 
  2.  private def processEvent(event: JobGeneratorEvent) { 
  3.    logDebug("Got event " + event) 
  4.    event match { 
  5.      case GenerateJobs(time) => generateJobs(time) 
  6.      case ClearMetadata(time) => clearMetadata(time) 
  7.      case DoCheckpoint(time, clearCheckpointDataLater) => 
  8.        doCheckpoint(time, clearCheckpointDataLater) 
  9.      case ClearCheckpointData(time) => clearCheckpointData(time) 
  10.    } 
  11.  } 

目前我們只關注ClearMetadata 事件。對應的方法為:

  1. private def clearMetadata(time: Time) { 
  2.     ssc.graph.clearMetadata(time) 
  3.  
  4.     // If checkpointing is enabled, then checkpoint, 
  5.     // else mark batch to be fully processed 
  6.     if (shouldCheckpoint) { 
  7.       eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true)) 
  8.     } else { 
  9.       // If checkpointing is not enabled, then delete metadata information about 
  10.       // received blocks (block data not saved in any case). Otherwise, wait for 
  11.       // checkpointing of this batch to complete. 
  12.       val maxRememberDuration = graph.getMaxInputStreamRememberDuration() 
  13.       jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration) 
  14.       jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration) 
  15.       markBatchFullyProcessed(time) 
  16.     } 
  17.   } 

首先是清理輸出DStream(比如ForeachDStream),接著是清理輸入類(基于Receiver模式)的數據。

ForeachDStream 其實調用的也是DStream的方法。該方法大體如下:

  1. private[streaming] def clearMetadata(time: Time) { 
  2.     val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist"true
  3.     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) 
  4.     logDebug("Clearing references to old RDDs: [" + 
  5.       oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]"
  6.     generatedRDDs --= oldRDDs.keys 
  7.     if (unpersistData) { 
  8.       logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", ")) 
  9.       oldRDDs.values.foreach { rdd => 
  10.         rdd.unpersist(false
  11.         // Explicitly remove blocks of BlockRDD 
  12.         rdd match { 
  13.           case b: BlockRDD[_] => 
  14.             logInfo("Removing blocks of RDD " + b + " of time " + time) 
  15.             b.removeBlocks() 
  16.           case _ => 
  17.         } 
  18.       } 
  19.     } 
  20.     logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + 
  21.       (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) 
  22.     dependencies.foreach(_.clearMetadata(time)) 
  23.   } 

大體執行動作如下描述:

  1. 根據記憶周期得到應該剔除的RDD
  2. 根據是否要清理cache數據,進行unpersit 操作,并且顯示的移除block
  3. 根據依賴調用其他的DStream進行動作清理

這里我們還可以看到,通過參數spark.streaming.unpersist 你是可以決定是否手工控制是否需要對cache住的數據進行清理。

這里你會有兩個疑問:

  1. dependencies 是什么?
  2. rememberDuration 是怎么來的?

dependencies 你可以簡單理解為父DStream,通過dependencies 我們可以獲得已完整DStream鏈。

rememberDuration 的設置略微復雜些,大體是 slideDuration,如果設置了checkpointDuration 則是2*checkpointDuration 或者通過DStreamGraph.rememberDuration(如果設置了的話,譬如通過StreamingContext.remember方法,不過通過該方法設置的值要大于計算得到的值會生效)

另外值得一提的就是后面的DStream 會調整前面的DStream的rememberDuration,譬如如果你用了window* 相關的操作,則在此之前的DStream 的rememberDuration 都需要加上windowDuration。

然后根據Spark Streaming的定時性,每個周期只要完成了,都會觸發清理動作,這個就是清理動作發生的時機。代碼如下:

  1. def onBatchCompletion(time: Time) {      
  2.     eventLoop.post(ClearMetadata(time)) 

總結下

Spark Streaming 會在每個Batch任務結束時進行一次清理動作。每個DStream 都會被掃描,不同的DStream根據情況不同,保留的RDD數量也是不一致的,但都是根據rememberDuration變量決定,而該變量會被下游的DStream所影響,所以不同的DStream的rememberDuration取值是不一樣的。

 

 

責任編輯:Ophira 來源: 簡書
相關推薦

2025-07-16 09:16:36

2017-08-14 10:30:13

SparkSpark Strea擴容

2017-06-06 08:31:10

Spark Strea計算模型監控

2016-12-19 14:35:32

Spark Strea原理剖析數據

2025-09-16 08:49:13

2017-10-13 10:36:33

SparkSpark-Strea關系

2018-04-09 12:25:11

2016-01-28 10:11:30

Spark StreaSpark大數據平臺

2017-10-11 11:10:02

Spark Strea大數據流式處理

2022-05-30 08:21:17

Kafka數據傳遞

2018-10-14 15:52:46

MySQL數據清理數據庫

2019-10-17 09:25:56

Spark StreaPVUV

2023-10-24 20:32:40

大數據

2017-09-26 09:35:22

2021-08-20 16:37:42

SparkSpark Strea

2019-12-13 08:25:26

FlinkSpark Strea流數據

2021-07-09 10:27:12

SparkStreaming系統

2017-06-27 15:08:05

大數據Apache SparKafka Strea

2025-04-02 08:17:42

2016-03-03 15:11:42

Spark Strea工作流調度器
點贊
收藏

51CTO技術棧公眾號

国产美女亚洲精品7777| 天堂аⅴ在线地址8| 亚洲激情av| 亚洲人成电影在线| 一级黄色在线播放| f2c人成在线观看免费视频| 欧美激情一区二区三区| 俄罗斯精品一区二区三区| 欧美亚洲另类小说| 国产一区二区中文| 日韩在线视频二区| 精品一区二区视频在线观看| 久久69成人| 精品二区三区线观看| 特级黄色录像片| 久香视频在线观看| 成人午夜激情视频| 成人黄色av网站| 日本免费在线观看视频| 亚洲一级黄色| 欧美成人三级视频网站| 谁有免费的黄色网址| 国产精伦一区二区三区| 91麻豆精品国产自产在线 | 国产专区一区| 丝袜美腿亚洲一区二区| 国产美女免费无遮挡| 91久久偷偷做嫩草影院电| 欧美日韩一区视频| 亚洲熟妇av一区二区三区 | 秋霞av国产精品一区| 久久精品女人毛片国产| 亚洲天天影视网| 日韩在线观看av| 永久免费av无码网站性色av| 最好看的中文字幕| 中老年在线免费视频| 一区二区三区在线观看欧美 | 欧亚乱熟女一区二区在线| 开心久久婷婷综合中文字幕| 色婷婷久久99综合精品jk白丝| 美脚丝袜脚交一区二区| 在线视频国产区| 亚洲四区在线观看| 一区二区在线观看网站| 在线a免费看| 中文字幕乱码久久午夜不卡| 欧美系列一区| 国产youjizz在线| 国产日韩av一区二区| 日本精品一区| eeuss影院www在线播放| 国产精品婷婷午夜在线观看| 日韩精品国内| 69av亚洲| 亚洲精品免费电影| 九九热只有这里有精品| japanese色国产在线看视频| 香蕉av福利精品导航| 亚洲不卡中文字幕无码| 亚洲午夜天堂| 欧美午夜视频网站| 一区二区三区欧美精品| 久久伊人久久| 亚洲第一av在线| 99re久久精品国产| 蜜桃a∨噜噜一区二区三区| 亚洲人在线观看| 日本黄区免费视频观看| 91不卡在线观看| 欧美激情精品久久久久久黑人| 久久久久亚洲av片无码下载蜜桃| 亚洲美女少妇无套啪啪呻吟| 456亚洲影院| 国产男人搡女人免费视频| 蜜桃视频在线观看一区| 1卡2卡3卡精品视频| 日韩在线视频免费| 国产欧美一区在线| 99中文字幕在线观看| segui88久久综合| 欧洲一区二区av| 亚洲精品中文字幕乱码无线| 超碰一区二区三区| 亚洲午夜av电影| 成人免费视频网站入口::| 精品91在线| 国产精品99蜜臀久久不卡二区| 国产又爽又黄免费软件| 不卡一区中文字幕| 亚洲激情一区二区| av资源在线播放| 欧美日韩一级黄| 中文字幕乱码在线| 99精品美女| 欧美一级成年大片在线观看| 在线免费观看高清视频| 成人免费毛片嘿嘿连载视频| 婷婷久久青草热一区二区| 日本孕妇大胆孕交无码| 欧美在线999| 欧美日韩人妻精品一区在线| 四季av在线一区二区三区| 久久久最新网址| 夜夜躁很很躁日日躁麻豆| 99久免费精品视频在线观看| 三年中国中文在线观看免费播放| 天堂网在线最新版www中文网| 欧美日韩的一区二区| 亚洲天堂网一区二区| 午夜日韩激情| 国产精自产拍久久久久久蜜| 日韩在线视频第一页| 亚洲欧美区自拍先锋| 凹凸日日摸日日碰夜夜爽1| 亚洲国产中文在线| 日韩在线观看免费av| 秋霞精品一区二区三区| 丰满亚洲少妇av| 国产又粗又硬又长| 日本一区二区电影| 亚洲精品一区中文字幕乱码| 久久久久久av无码免费网站| 久久精品国产精品青草| 深田咏美在线x99av| 亚洲女色av| 亚洲精品99久久久久中文字幕| 精品无码久久久久成人漫画| 毛片av一区二区| 欧美亚洲精品日韩| 亚洲淫成人影院| 日韩av在线免费| 日韩美女黄色片| 成人精品视频一区二区三区 | 不卡av影片| 亚洲第一精品夜夜躁人人躁| 国产无码精品在线观看| 国产99一区视频免费| 亚洲国产精品女人| 色综合久久久| www.国产精品一二区| 在线观看国产黄| 国产精品久久三| 亚洲一级免费在线观看| 成人在线亚洲| 成人精品视频久久久久| 黄网站视频在线观看| 4438x亚洲最大成人网| 日韩成人短视频| 国产一区二区三区免费看| www.国产二区| 黄色成人美女网站| 69影院欧美专区视频| 头脑特工队2在线播放| 欧美视频在线视频| 国产毛片久久久久久久| 日韩国产欧美三级| 一区二区三区在线视频看| 宅男噜噜噜66国产精品免费| 麻豆国产精品va在线观看不卡 | 国产乱国产乱300精品| 看全色黄大色大片| jizz18欧美18| 2019中文字幕在线免费观看| 色视频在线观看免费| 在线观看欧美精品| 国产免费久久久久| 成人一区二区三区视频在线观看| 免费看国产曰批40分钟| 欧美一区二区麻豆红桃视频| 91啪国产在线| sm在线播放| 一区二区国产精品视频| 国产成人免费看一级大黄| 亚洲成va人在线观看| 六月婷婷七月丁香| 国内精品写真在线观看| 九九爱精品视频| 欧美伦理影院| 亚洲最大福利网站| 国产不卡网站| 久久久99久久精品女同性| 国产综合在线播放| 日本韩国欧美在线| 久久久久久久久久久久国产| ww亚洲ww在线观看国产| 中文字幕第一页在线视频| 激情久久五月| 中国成人在线视频| 久久久久观看| 亚洲一区亚洲二区| 黑人精品一区| 欧美精品国产精品日韩精品| 成人免费在线视频网| 精品久久久久一区二区国产| 久久人人爽人人爽人人片av免费| 亚洲三级在线免费观看| 熟女俱乐部一区二区| 国产乱色国产精品免费视频| 亚洲中文字幕无码专区| 欧美成人有码| 日产精品一线二线三线芒果| 99久久香蕉| 成人国产精品日本在线| 国产另类xxxxhd高清| 欧美富婆性猛交| 黄在线免费观看| 亚洲一区av在线播放| 欧美熟女一区二区| 欧美老女人第四色| 男人的天堂av网站| 亚洲va韩国va欧美va| 亚洲熟女www一区二区三区| 国产三级一区二区| 野花社区视频在线观看| 国产不卡免费视频| 一二三级黄色片| 青娱乐精品视频在线| 欧美视频第一区| 亚洲精选国产| 久久亚洲国产成人精品无码区| 亚洲成人国产| 中文字幕欧美人与畜| 精品国产一区二区三区四区 | 伊人久久久久久久久| 麻豆成人免费电影| www.日本xxxx| 老司机精品福利视频| 精品人妻一区二区三区四区在线| 亚洲欧美一级二级三级| 男女啪啪免费观看| 亚洲澳门在线| 四虎影院一区二区| 亚洲欧美日韩高清在线| av不卡在线免费观看| 久久精品av| 亚洲一区二区三区在线观看视频| 激情五月综合网| 欧美在线3区| 国产一区2区| 欧美资源一区| 精品国产乱码久久久久久1区2匹| 欧美日韩一区在线播放| 精品国产不卡| 日本在线免费观看一区| 成人国产精品一级毛片视频| 午夜精品一区二区在线观看| 日韩不卡一区| 激情视频小说图片| 在线精品观看| 国产精品无码一区二区在线| 国产精品入口| 久久久久免费精品| 免播放器亚洲一区| 中文字幕在线观看日| 韩国一区二区在线观看| 在线观看欧美一区二区| 成人免费视频caoporn| 中文字幕一区二区三区人妻不卡| 久久综合九色综合久久久精品综合| 精品少妇人妻一区二区黑料社区| 久久精品视频在线免费观看| 九一在线免费观看| 亚洲少妇最新在线视频| 国产精品1234区| 色综合咪咪久久| 国产精品无码久久久久成人app| 7777精品伊人久久久大香线蕉完整版 | 国产无遮挡一区二区三区毛片日本| 亚洲黄色小说视频| 亚洲天堂免费在线观看视频| 日本少妇在线观看| 91成人国产精品| 国产超碰人人模人人爽人人添| 亚洲第一天堂无码专区| 国产视频精选在线| 免费91在线视频| 麻豆理论在线观看| 国产日韩精品一区二区| 91九色鹿精品国产综合久久香蕉| 欧美日韩一区二区视频在线观看| 999精品色在线播放| 国产精品成人久久电影| 日本在线观看不卡视频| 91精产国品一二三| 国产亚洲精品中文字幕| 欧美人妻精品一区二区三区| 色综合久久久久久久久久久| 精品久久久中文字幕人妻| 亚洲欧美日韩精品| 日韩少妇视频| 国产精品久久一| 精品福利网址导航| 男人的天堂成人| 久久国产日韩| 精人妻一区二区三区| 亚洲国产成人私人影院tom| 久久久久久久久久91| 欧美日韩成人高清| 日本大臀精品| 久久久久久91香蕉国产| 一区二区三区| 色综合影院在线观看| 在线视频亚洲| 国产精久久久久| 专区另类欧美日韩| 怡红院av久久久久久久| 亚洲成人网久久久| 麻豆传媒在线观看| 国产精品极品美女在线观看免费| 久久亚洲黄色| 日本人妻伦在线中文字幕| 老色鬼精品视频在线观看播放| www.久久国产| 五月婷婷激情综合网| 亚洲国产精品久久久久久久| 色老头一区二区三区在线观看| 最新日韩精品| 国新精品乱码一区二区三区18| 91av精品| 加勒比av中文字幕| 国产精品免费丝袜| 日本熟妇一区二区三区| 精品亚洲精品福利线在观看| h片在线观看视频免费| 99视频免费观看蜜桃视频| 亚洲精品97| www.桃色.com| 最近中文字幕一区二区三区| 一区二区日韩在线观看| 中文字幕日韩在线观看| 欧美三级精品| 日韩激情视频| 日韩av在线播放中文字幕| www.中文字幕av| 日本乱码高清不卡字幕| 成人亚洲综合天堂| 国产精品高潮呻吟久久av无限| 欧美欧美黄在线二区| 久久久免费视频网站| 91视频在线观看免费| 欧美videossex极品| 日韩精品在线观看视频| 欧美aa视频| 日本视频一区在线观看| 蜜臀久久99精品久久久画质超高清| 毛片aaaaaa| 在线播放/欧美激情| а√资源新版在线天堂| av一区二区在线看| 亚洲香蕉网站| 在线免费观看日韩av| 欧美综合天天夜夜久久| 日韩子在线观看| 91久热免费在线视频| 欧美精品导航| 西西大胆午夜视频| 91成人网在线| 麻豆视频在线| 国产精品国色综合久久| 午夜在线一区| av男人的天堂av| 欧美日本韩国一区二区三区视频| 国内精品久久久久国产| 国产精品久久久久免费| 久久久国产亚洲精品| 国产三级黄色片| 欧美一级日韩不卡播放免费| 丁香花在线高清完整版视频| 精品国产乱码久久久久久郑州公司 | 亚洲三级观看| 日韩毛片无码永久免费看| 欧美日韩成人综合| 大桥未久在线视频| 日韩视频专区| 国产精品911| 伊人中文字幕在线观看| 久久综合电影一区| 欧美天堂影院| 福利片一区二区三区| 亚洲国产欧美在线| yw视频在线观看| 福利视频久久| 男人的天堂亚洲一区| 久久在线视频精品| 在线不卡国产精品| 亚洲高清999| 男女视频一区二区三区| 亚洲精品成人精品456| 国内在线免费高清视频| 99re6在线| 免费观看在线综合| 亚洲国产成人精品激情在线| 色偷偷av亚洲男人的天堂| 精品国产一区二区三区成人影院| 日本激情视频在线| 亚洲成av人片在线观看| 黄网页免费在线观看| 欧美黑人3p| 成人精品小蝌蚪|