精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

為啥Spark 的Broadcast要用單例模式

大數據 Spark
很多用Spark Streaming 的朋友應該使用過broadcast,大多數情況下廣播變量都是以單例模式聲明的有沒有粉絲想過為什么?

很多用Spark Streaming 的朋友應該使用過broadcast,大多數情況下廣播變量都是以單例模式聲明的有沒有粉絲想過為什么?浪尖在這里幫大家分析一下,有以下幾個原因:

  1. 廣播變量大多數情況下是不會變更的,使用單例模式可以減少spark streaming每次job生成執行,重復生成廣播變量帶來的開銷。
  2. 單例模式也要做同步。這個對于很多新手來說可以不用考慮同步問題,原因很簡單因為新手不會調整spark 程序task的調度模式,而默認采用FIFO的調度模式,基本不會產生并發問題。1).假如你配置了Fair調度模式,同時修改了Spark Streaming運行的并行執行的job數,默認為1,那么就要加上同步代碼了。2).還有一個原因,在多輸出流的情況下共享broadcast,同時配置了Fair調度模式,也會產生并發問題。
  3. 注意。有些時候比如廣播配置文件,規則等需要變更broadcast,在使用fair的時候可以在foreachrdd里面使用局部變量作為廣播,避免相互干擾。

先看例子,后面逐步揭曉內部機制。

1.例子

下面是一個雙重檢查式的broadcast變量的聲明方式。

  1. object WordBlacklist { 
  2.  
  3.   @volatile private var instance: Broadcast[Seq[String]] = null 
  4.  
  5.   def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { 
  6.     if (instance == null) { 
  7.       synchronized { 
  8.         if (instance == null) { 
  9.           val wordBlacklist = Seq("a""b""c"
  10.           instance = sc.broadcast(wordBlacklist) 
  11.         } 
  12.       } 
  13.     } 
  14.     instance 
  15.   } 

廣播變量的使用方法如下:

  1. val lines = ssc.socketTextStream(ip, port) 
  2.     val words = lines.flatMap(_.split(" ")) 
  3.     val wordCounts = words.map((_, 1)).reduceByKey(_ + _) 
  4.     wordCounts.foreachRDD { (rdd: RDD[(String, Int)], timeTime) => 
  5.       // Get or register the blacklist Broadcast 
  6.       val blacklist = WordBlacklist.getInstance(rdd.sparkContext) 
  7.       // Get or register the droppedWordsCounter Accumulator 
  8.       val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) 
  9.       // Use blacklist to drop words and use droppedWordsCounter to count them 
  10.       val counts = rdd.filter { case (word, count) => 
  11.         if (blacklist.value.contains(word)) { 
  12.           droppedWordsCounter.add(count
  13.           false 
  14.         } else { 
  15.           true 
  16.         } 
  17.       }.collect().mkString("["", ""]"
  18.       val output = s"Counts at time $time $counts" 
  19.       println(output
  20.       println(s"Dropped ${droppedWordsCounter.value} word(s) totally"
  21.       println(s"Appending to ${outputFile.getAbsolutePath}"
  22.       Files.append(output + "\n", outputFile, Charset.defaultCharset()) 
  23.     } 

2.概念補充

為啥Spark 的Broadcast要用單例模式

首先,一個基本概念就是Spark應用程序從開始提交到task執行分了很多層。

  1. 應用調度器。主要是資源管理器,比如standalone,yarn等負責Spark整個應用的調度和集群資源的管理。
  2. job調度器。spark 的算子分為主要兩大類,transform和action,其中每一個action都會產生一個job。這個job需要在executor提供的資源池里調度執行,當然并不少直接調度執行job。
  3. stage劃分及調度。job具體會劃分為若干stage,這個就有一個基本的概念就是寬依賴和窄依賴,寬依賴就會劃分stage。stage也需要調度執行,從后往前劃分,從前往后調度執行。
  4. task切割及調度。stage往下繼續細化就是會根據不太的并行度劃分出task集合,這個就是在executor上調度執行的基本單元,目前的調度默認是一個task一個cpu。
  5. Spark Streaming 的job生成是周期性的。當前job的執行時間超過生成周期就會產生job 累加。累加一定數目的job后有可能會導致應用程序失敗。這個主要原因是由于FIFO的調度模式和Spark Streaming的默認單線程的job執行機制

3.Spark Streaming job生成

這個源碼主要入口是StreamingContext#JobScheduler#JobGenerator對象,內部有個RecurringTimer,主要負責按照批處理時間周期產生GenrateJobs事件,當然在存在windows的情況下,該周期有可能不會生成job,要取決于滑動間隔,有興趣自己去揭秘,浪尖星球里分享的視頻教程里講到了。具體代碼塊如下

  1. private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, 
  2.    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator"

我們直接看其實現代碼塊:

  1. eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { 
  2.       override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) 
  3.  
  4.       override protected def onError(e: Throwable): Unit = { 
  5.         jobScheduler.reportError("Error in job generator", e) 
  6.       } 
  7.     } 
  8.     eventLoop.start() 

event處理函數是processEvent方法

  1. /** Processes all events */ 
  2.   private def processEvent(event: JobGeneratorEvent) { 
  3.     logDebug("Got event " + event) 
  4.     event match { 
  5.       case GenerateJobs(time) => generateJobs(time
  6.       case ClearMetadata(time) => clearMetadata(time
  7.       case DoCheckpoint(time, clearCheckpointDataLater) => 
  8.         doCheckpoint(time, clearCheckpointDataLater) 
  9.       case ClearCheckpointData(time) => clearCheckpointData(time
  10.     } 
  11.   } 

在接受到GenerateJob事件的時候,會執行generateJobs代碼,就是在該代碼內部產生和調度job的。

  1. /** Generate jobs and perform checkpointing for the given `time`.  */ 
  2.   private def generateJobs(timeTime) { 
  3.     // Checkpoint all RDDs marked for checkpointing to ensure their lineages are 
  4.     // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). 
  5.     ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true"
  6.     Try { 
  7.       jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch 
  8.       graph.generateJobs(time) // generate jobs using allocated block 
  9.     } match { 
  10.       case Success(jobs) => 
  11.         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time
  12.         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
  13.       case Failure(e) => 
  14.         jobScheduler.reportError("Error generating jobs for time " + time, e) 
  15.         PythonDStream.stopStreamingContextIfPythonProcessIsDead(e) 
  16.     } 
  17.     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 
  18.   } 

可以看到代碼里首先會執行job生成代碼

  1. graph.generateJobs(time
  2.  
  3. 具體代碼塊兒 
  4.  
  5. def generateJobs(timeTime): Seq[Job] = { 
  6.     logDebug("Generating jobs for time " + time
  7.     val jobs = this.synchronized { 
  8.       outputStreams.flatMap { outputStream => 
  9.         val jobOption = outputStream.generateJob(time
  10.         jobOption.foreach(_.setCallSite(outputStream.creationSite)) 
  11.         jobOption 
  12.       } 
  13.     } 
  14.     logDebug("Generated " + jobs.length + " jobs for time " + time
  15.     jobs 
  16.   } 

每個輸出流都會生成一個job,輸出流就類似于foreachrdd,print這些。其實內部都是ForEachDStream。所以生成的是一個job集合。

然后就會將job集合提交到線程池里去執行,這些都是在driver端完成的哦。

  1. jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
  2.  
  3. 具體h函數內容 
  4. def submitJobSet(jobSet: JobSet) { 
  5.     if (jobSet.jobs.isEmpty) { 
  6.       logInfo("No jobs added for time " + jobSet.time
  7.     } else { 
  8.       listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) 
  9.       jobSets.put(jobSet.time, jobSet) 
  10.       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) 
  11.       logInfo("Added jobs for time " + jobSet.time
  12.     } 
  13.   } 

其實就是遍歷生成的job集合,然后提交到線程池jobExecutor內部執行。這個也是在driver端的哦。

jobExecutor就是一個固定線程數的線程池,默認是1個線程。

  1. private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) 
  2.   private val jobExecutor = 
  3.     ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor"

需要的話可以配置spark.streaming.concurrentJobs來同時提交執行多個job。

那么這種情況下,job就可以并行執行了嗎?

顯然不是的!

還要修改一下調度模式為Fair,詳細的配置可以參考:

http://spark.apache.org/docs/2.3.3/job-scheduling.html#scheduling-within-an-application

簡單的均分的話只需要

  1. conf.set("spark.scheduler.mode""FAIR"

然后,同時運行的job就會均分所有executor提供的資源。

這就是整個job生成的整個過程了哦。

因為Spark Streaming的任務存在Fair模式下并發的情況,所以需要在使用單例模式生成broadcast的時候要注意聲明同步。

責任編輯:未麗燕 來源: Spark學習技巧
相關推薦

2021-09-07 10:44:35

異步單例模式

2021-02-01 10:01:58

設計模式 Java單例模式

2021-03-02 08:50:31

設計單例模式

2022-02-06 22:30:36

前端設計模式

2022-09-29 08:39:37

架構

2016-03-28 10:23:11

Android設計單例

2013-11-26 16:20:26

Android設計模式

2011-03-16 10:13:31

java單例模式

2021-02-07 23:58:10

單例模式對象

2022-06-07 08:55:04

Golang單例模式語言

2015-09-06 11:07:52

C++設計模式單例模式

2021-08-11 17:22:11

設計模式單例

2024-02-04 12:04:17

2024-03-06 13:19:19

工廠模式Python函數

2023-11-21 21:39:38

單例模式音頻管理器

2011-06-28 15:18:45

Qt 單例模式

2016-10-09 09:37:49

javascript單例模式

2013-03-26 10:35:47

Objective-C單例實現

2022-08-10 11:02:56

Python單例模式

2021-05-29 10:22:49

單例模式版本
點贊
收藏

51CTO技術棧公眾號

国产精品.xx视频.xxtv| 一区二区三区黄色片| 中文字幕久久精品一区二区 | 亚洲一区二区三区免费在线观看| 亚洲第一精品自拍| 欧美黑人又粗又大又爽免费| 天堂中文8资源在线8| 国产精品亚洲一区二区三区妖精| 午夜精品久久久久久久白皮肤| 风间由美一二三区av片| 欧美在线se| 亚洲国产欧美另类丝袜| 日韩精品另类天天更新| av免费观看在线| 国产色综合网| 欧美精品免费在线观看| 国产肥白大熟妇bbbb视频| 国产区一区二| 91福利在线播放| 日本一级黄视频| 国产玉足榨精视频在线观看| 国产91综合网| 国产免费一区视频观看免费| 久久狠狠高潮亚洲精品| 国产精品福利在线观看播放| 日韩av在线最新| 国产黑丝在线视频| 日本在线中文字幕一区二区三区| 亚洲在线免费播放| 一区二区不卡在线视频 午夜欧美不卡'| 超碰人人人人人人| 精品一区二区影视| 国产精品igao视频| www.国产色| 国产尤物精品| 久久中国妇女中文字幕| 高清国产在线观看| 免费成人网www| 亚洲国产日韩精品在线| 日本人dh亚洲人ⅹxx| 欧美97人人模人人爽人人喊视频| 欧美性xxxx极品hd满灌| av在线播放天堂| 丝袜中文在线| 国产精品乱码人人做人人爱 | 欧洲激情视频| 亚洲精品国产欧美| 国产一级免费片| 91夜夜蜜桃臀一区二区三区| 91麻豆精品久久久久蜜臀| 欧美成人福利在线观看| 天天综合网站| 91精品福利视频| 欧美xxxxx在线视频| 人在线成免费视频| 欧美日韩美女视频| 一区二区传媒有限公司| 狠狠操一区二区三区| 亚洲图片自拍偷拍| 日韩黄色短视频| av女在线播放| 欧美日韩性视频在线| 国产免费观看高清视频| 日本免费一区二区六区| 福利视频第一区| 国产真实乱子伦| 芒果视频成人app| 色婷婷精品久久二区二区蜜臀av| 成年网站在线免费观看| 欧美特黄aaaaaaaa大片| 91福利社在线观看| 午夜免费福利在线| 亚洲青青一区| 欧美v日韩v国产v| aaaa黄色片| 亚洲都市激情| 中文字幕av一区二区三区谷原希美 | 国产精品极品美女在线观看| 欧洲一区二区三区免费视频| 香蕉视频禁止18| 视频欧美精品| 欧美tk—视频vk| 黄色录像a级片| 精品国产91| 久久精品亚洲热| 久久亚洲成人av| 亚洲一区二区网站| 国产精品视频久久| 亚洲AV无码精品自拍| 99精品视频在线免费观看| 日本一区二区不卡高清更新| 日韩黄色影院| 一区二区欧美在线观看| 欧美 丝袜 自拍 制服 另类| jizzyou欧美16| 日韩欧美激情四射| 91网站免费入口| 外国成人免费视频| 午夜精品蜜臀一区二区三区免费| 亚洲黄网在线观看| 国产美女视频91| 久久久福利视频| 午夜激情视频在线观看| 亚洲成人久久影院| 日本a√在线观看| 99精品国产一区二区三区2021 | 97人妻天天摸天天爽天天| 欧美三级情趣内衣| 精品自拍视频在线观看| 无码人妻一区二区三区免费| 国产精品2024| 日韩欧美激情一区二区| 后进极品白嫩翘臀在线播放| 欧美在线一二三| 在线观看亚洲免费视频| 久久人人99| 91超碰中文字幕久久精品| 国产精品久久久久久免费免熟| av不卡在线观看| 天天做天天爱天天高潮| 欧美激情喷水| 亚洲精品一区在线观看| 五月婷婷婷婷婷| 一本一本久久| av一区二区在线看| 91激情在线| 日韩欧美在线免费| 精品久久久久久无码人妻| 四虎国产精品免费观看| 日韩免费在线免费观看| 日韩在线一区二区三区四区| 亚洲免费看黄网站| 成人不卡免费视频| 欧美自拍偷拍| 浅井舞香一区二区| 无套内谢的新婚少妇国语播放| 日韩码欧中文字| 手机在线看福利| 欧美猛男同性videos| 亚州成人av在线| 亚洲高清在线观看视频| 亚洲人成网站精品片在线观看| 北条麻妃在线观看| 欧美a一欧美| 亚洲91精品在线| 可以免费看毛片的网站| 亚洲一区二区三区中文字幕| 青娱乐国产精品视频| 亚洲经典一区| 成人免费福利视频| 日本在线视频网| 欧美日韩亚洲综合一区二区三区| 色欲AV无码精品一区二区久久| 老司机一区二区三区| 精品91免费| 在线免费三级电影网站| 精品99999| 日韩黄色一级大片| 2017欧美狠狠色| 99热成人精品热久久66| 国产探花在线精品一区二区| 国产精品91久久久| 成人高清免费在线播放| 精品视频一区二区三区免费| 亚洲精品国产精品国自| 久久精品二区亚洲w码| 黄瓜视频免费观看在线观看www| 日韩成人综合网站| 欧美成人在线免费视频| 精品久久国产视频| 亚洲国产美女搞黄色| chinese麻豆新拍video| 久久性色av| 亚洲欧美日产图| 日韩一区二区三区精品| 高清欧美性猛交| 免费在线毛片| 欧美精品丝袜久久久中文字幕| 欧美激情图片小说| 不卡一区二区在线| 国产福利一区视频| 99久久精品网| 国产精品日韩欧美一区二区| 在线免费av资源| 久久久国产视频91| 日本激情一区二区三区| 在线看国产日韩| 疯狂试爱三2浴室激情视频| 成人av在线资源网站| 日韩精品一区二区三区色欲av| 欧美gay男男猛男无套| 国产精品.com| av高清一区| 欧美国产日本高清在线| 欧美香蕉爽爽人人爽| 欧美巨大另类极品videosbest | 2024短剧网剧在线观看| 亚洲国产高清高潮精品美女| 性色av一区二区三区四区| 亚洲激情自拍视频| 亚洲码无人客一区二区三区| 国产激情视频一区二区三区欧美| 国产aaa一级片| 中文字幕免费一区二区| 欧美精品二区三区四区免费看视频| 久久爱.com| 91成人在线播放| 国产黄色在线免费观看| 亚洲精品永久免费精品| 国产成人a人亚洲精品无码| 一本久道中文字幕精品亚洲嫩 | 精品动漫一区二区三区| 成人无码精品1区2区3区免费看| 成人精品免费看| 一本一道久久a久久综合蜜桃| 亚洲每日在线| 无码人妻精品一区二区蜜桃百度| 你懂的一区二区三区| 国产精华一区二区三区| 91视频成人| 国产精品户外野外| 人狥杂交一区欧美二区| 欧美黑人狂野猛交老妇| 免费在线看黄网站| 国产亚洲精品久久久久久777| 丰满肉肉bbwwbbww| 69堂精品视频| 一区二区视频网站| 色吊一区二区三区| 国产三级av片| 精品国产31久久久久久| 国产一级久久久| 亚洲伦理在线精品| 欧美激情精品久久久久久免费| 国产欧美日韩精品a在线观看| 人妻丰满熟妇aⅴ无码| 成人天堂资源www在线| 亚洲精品久久久久久| 国产乱子伦视频一区二区三区| a在线观看免费视频| 视频一区二区中文字幕| 哪个网站能看毛片| 国产精品亚洲综合久久| 国产深夜男女无套内射| 在线成人国产| 缅甸午夜性猛交xxxx| 精品成人在线| 国产精品久久中文字幕| 亚洲大胆在线| 国自产拍偷拍精品啪啪一区二区| 激情文学一区| 国产毛片视频网站| av成人天堂| 欧美污视频网站| 日韩国产精品久久久久久亚洲| 99视频精品免费| 男男视频亚洲欧美| 成年人三级黄色片| 久久9热精品视频| 色网站在线视频| 国产高清无密码一区二区三区| 日韩欧美色视频| 成人av在线一区二区三区| avtt香蕉久久| 国产日韩精品一区| 成年人视频软件| 亚洲视频在线观看三级| 久久久精品一区二区涩爱| 亚洲成人精品一区二区| 国产微拍精品一区| 在线亚洲一区二区| 国产视频手机在线| 精品av久久707| 国产视频精品久久| 久久精品国产欧美激情| 国产盗摄精品一区二区酒店| 7777kkkk成人观看| 成人视屏在线观看| 91免费福利视频| 久久97精品| 色女人综合av| 午夜精品999| 欧美激情视频免费看| 蜜桃av一区| 国产精品久久久久久9999| 成人的网站免费观看| xxx在线播放| 亚洲精品国产成人久久av盗摄| 欧美黑人一区二区| 在线不卡a资源高清| 日韩一级片免费看| 最新国产精品亚洲| www视频在线观看| 国产精品热视频| 国产suv精品一区二区四区视频| 欧美人xxxxx| 亚洲91中文字幕无线码三区| 女人天堂av手机在线| 蜜桃视频一区二区三区在线观看| 午夜影院福利社| 国产精品久久久久久久午夜片| 日韩 欧美 亚洲| 精品视频一区 二区 三区| 午夜在线视频免费| 久久综合伊人77777| 免费成人直播| 成人在线资源网址| 成人免费看片39| 亚洲第一论坛sis| 午夜电影网一区| 国产美女www| 欧美成人一区二区三区片免费| 日韩av资源| 久久99久久亚洲国产| 亚洲天堂1区| 激情久久av| 午夜日韩福利| 思思久久精品视频| 久久久噜噜噜久久中文字幕色伊伊 | 午夜精品一区二| 亚洲国产精品字幕| 最新日本在线观看| 国产欧美一区二区三区久久人妖| 久久a级毛片毛片免费观看| 欧美日韩视频免费在线观看| 视频一区欧美日韩| av在线网站观看| 亚洲国产精品久久艾草纯爱| 国产一区二区在线不卡| 国产一区二区三区在线免费观看| 国产h片在线观看| 99re国产在线播放| 91精品啪在线观看国产18| 婷婷丁香激情网| 国产欧美日韩精品在线| 国产成人在线免费视频 | 精华区一区二区三区| 91国内精品久久| 久久aimee| 国产在线精品91| 97精品视频在线观看自产线路二| 精品一区免费观看| 精品国产电影一区二区| 1769免费视频在线观看| 99re在线播放| 欧美女人交a| 乱码一区二区三区| 亚洲成人综合在线| 免费观看黄一级视频| 欧美黑人国产人伦爽爽爽| 最新精品在线| 男女私大尺度视频| 不卡欧美aaaaa| 日韩精品乱码久久久久久| 亚洲国产美女久久久久| av福利在线导航| 精品在线视频一区二区三区| 国模 一区 二区 三区| 无码人妻一区二区三区免费n鬼沢| 亚洲精品乱码久久久久久久久 | 亚洲欧洲日产国码av系列天堂| 密臀av在线播放| 欧美国产视频在线观看| 欧美一级二区| 国产午夜精品久久久久久久久| 欧美精品久久一区二区三区| av免费在线免费| 国产一区二区三区四区五区加勒比| 日韩视频精品在线观看| 少妇特黄一区二区三区| 91高清视频免费看| 黄色av电影在线观看| 国产精品12| 性色一区二区| 五月天免费网站| 日韩精品一区二区在线| 免费h在线看| 日韩少妇中文字幕| 国产在线不卡一区| 日本三级黄色大片| 亚洲天堂男人的天堂| 国产va免费精品观看精品| 日韩免费在线观看av| 久久亚洲一区二区三区四区| 中文字幕久久久久| 欧美劲爆第一页| 亚洲福利天堂| 少妇愉情理伦片bd| 狠狠躁夜夜躁人人爽天天天天97 | 欧美人与性动交| 小说区图片区色综合区| 亚洲国产日韩欧美在线观看| 一区二区激情小说| 欧美3p视频在线观看| 成人信息集中地欧美| 亚洲制服少妇| 成人免费精品动漫网站| 亚洲精品网址在线观看| 91麻豆精品| 欧美激情成人网|