精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spark Shuffle過程分析:Map階段處理流程

大數(shù)據(jù) Spark
默認配置情況下,Spark在Shuffle過程中會使用SortShuffleManager來管理Shuffle過程中需要的基本組件,以及對RDD各個Partition數(shù)據(jù)的計算。我們可以在Driver和Executor對應的SparkEnv對象創(chuàng)建過程中看到對應的配置。

默認配置情況下,Spark在Shuffle過程中會使用SortShuffleManager來管理Shuffle過程中需要的基本組件,以及對RDD各個Partition數(shù)據(jù)的計算。我們可以在Driver和Executor對應的SparkEnv對象創(chuàng)建過程中看到對應的配置,如下代碼所示:

 

  1. // Let the user specify short names for shuffle managers 
  2.     val shortShuffleMgrNames = Map( 
  3.       "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, 
  4.       "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) 
  5.     val shuffleMgrName = conf.get("spark.shuffle.manager""sort"
  6.     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) 
  7.     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) 

如果需要修改ShuffleManager實現(xiàn),則只需要修改配置項spark.shuffle.manager即可,默認支持sort和 tungsten-sort,可以指定自己實現(xiàn)的ShuffleManager類。

因為Shuffle過程中需要將Map結(jié)果數(shù)據(jù)輸出到文件,所以需要通過注冊一個ShuffleHandle來獲取到一個ShuffleWriter對象,通過它來控制Map階段記錄數(shù)據(jù)輸出的行為。其中,ShuffleHandle包含了如下基本信息:

  • shuffleId:標識Shuffle過程的唯一ID
  • numMaps:RDD對應的Partitioner指定的Partition的個數(shù),也就是ShuffleMapTask輸出的Partition個數(shù)
  • dependency:RDD對應的依賴ShuffleDependency

下面我們看下,在SortShuffleManager中是如何注冊Shuffle的,代碼如下所示:

 

  1. override def registerShuffle[K, V, C]( 
  2.       shuffleId: Int
  3.       numMaps: Int
  4.       dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { 
  5.     if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { 
  6.       new BypassMergeSortShuffleHandle[K, V]( 
  7.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 
  8.     } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { 
  9.       new SerializedShuffleHandle[K, V]( 
  10.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 
  11.     } else { 
  12.       new BaseShuffleHandle(shuffleId, numMaps, dependency) 
  13.     } 
  14.   } 

上面代碼中,對應如下3種ShuffleHandle可以選擇,說明如下:

  • BypassMergeSortShuffleHandle

如果dependency不需要進行Map Side Combine,并且RDD對應的ShuffleDependency中的Partitioner設置的Partition的數(shù)量(這個不要和parent RDD的Partition個數(shù)混淆,Partitioner指定了map處理結(jié)果的Partition個數(shù),每個Partition數(shù)據(jù)會在Shuffle過程中全部被拉取而拷貝到下游的某個Executor端)小于等于配置參數(shù)spark.shuffle.sort.bypassMergeThreshold的值,則會注冊BypassMergeSortShuffleHandle。默認情況下,spark.shuffle.sort.bypassMergeThreshold的取值是200,這種情況下會直接將對RDD的 map處理結(jié)果的各個Partition數(shù)據(jù)寫入文件,并***做一個合并處理。

  • SerializedShuffleHandle

如果ShuffleDependency中的Serializer,允許對將要輸出數(shù)據(jù)對象進行排序后,再執(zhí)行序列化寫入到文件,則會選擇創(chuàng)建一個SerializedShuffleHandle。

  • BaseShuffleHandle

除了上面兩種ShuffleHandle以后,其他情況都會創(chuàng)建一個BaseShuffleHandle對象,它會以反序列化的格式處理Shuffle輸出數(shù)據(jù)。

Map階段處理流程分析

Map階段RDD的計算,對應ShuffleMapTask這個實現(xiàn)類,它最終會在每個Executor上啟動運行,每個ShuffleMapTask處理RDD的一個Partition的數(shù)據(jù)。這個過程的核心處理邏輯,代碼如下所示:

 

  1. val manager = SparkEnv.get.shuffleManager 
  2.       writer = manager.getWriter[AnyAny](dep.shuffleHandle, partitionId, context) 
  3.       writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[AnyAny]]]) 

上面代碼中,在調(diào)用rdd的iterator()方法時,會根據(jù)RDD實現(xiàn)類的compute方法指定的處理邏輯對數(shù)據(jù)進行處理,當然,如果該Partition對應的數(shù)據(jù)已經(jīng)處理過并存儲在MemoryStore或DiskStore,直接通過BlockManager獲取到對應的Block數(shù)據(jù),而無需每次需要時重新計算。然后,write()方法會將已經(jīng)處理過的Partition數(shù)據(jù)輸出到磁盤文件。

在Spark Shuffle過程中,每個ShuffleMapTask會通過配置的ShuffleManager實現(xiàn)類對應的ShuffleManager對象(實際上是在SparkEnv中創(chuàng)建),根據(jù)已經(jīng)注冊的ShuffleHandle,獲取到對應的ShuffleWriter對象,然后通過ShuffleWriter對象將Partition數(shù)據(jù)寫入內(nèi)存或文件。所以,接下來我們可能關心每一種ShuffleHandle對應的ShuffleWriter的行為,可以看到SortShuffleManager中獲取到ShuffleWriter的實現(xiàn)代碼,如下所示:

 

  1. /** Get a writer for a given partition. Called on executors by map tasks. */ 
  2.   override def getWriter[K, V]( 
  3.       handle: ShuffleHandle, 
  4.       mapId: Int
  5.       context: TaskContext): ShuffleWriter[K, V] = { 
  6.     numMapsForShuffle.putIfAbsent( 
  7.       handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) 
  8.     val env = SparkEnv.get 
  9.     handle match { 
  10.       case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => 
  11.         new UnsafeShuffleWriter( 
  12.           env.blockManager, 
  13.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 
  14.           context.taskMemoryManager(), 
  15.           unsafeShuffleHandle, 
  16.           mapId, 
  17.           context, 
  18.           env.conf) 
  19.       case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => 
  20.         new BypassMergeSortShuffleWriter( 
  21.           env.blockManager, 
  22.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 
  23.           bypassMergeSortHandle, 
  24.           mapId, 
  25.           context, 
  26.           env.conf) 
  27.       case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => 
  28.         new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) 
  29.     } 
  30.   } 

我們以最簡單的SortShuffleWriter為例進行分析,在SortShuffleManager可以通過getWriter()方法創(chuàng)建一個SortShuffleWriter對象,然后在ShuffleMapTask中調(diào)用SortShuffleWriter對象的write()方法處理Map輸出的記錄數(shù)據(jù),write()方法的處理代碼,如下所示:

 

  1. /** Write a bunch of records to this task's output */ 
  2.   override def write(records: Iterator[Product2[K, V]]): Unit = { 
  3.     sorter = if (dep.mapSideCombine) { 
  4.       require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!"
  5.       new ExternalSorter[K, V, C]( 
  6.         context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) 
  7.     } else { 
  8.       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 
  9.       // care whether the keys get sorted in each partition; that will be done on the reduce side 
  10.       // if the operation being run is sortByKey. 
  11.       new ExternalSorter[K, V, V]( 
  12.         context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 
  13.     } 
  14.     sorter.insertAll(records) 
  15.  
  16.     // Don't bother including the time to open the merged output file in the shuffle write time
  17.     // because it just opens a single file, so is typically too fast to measure accurately 
  18.     // (see SPARK-3570). 
  19.     val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) 
  20.     val tmp = Utils.tempFileWith(output
  21.     val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) 
  22.     val partitionLengths = sorter.writePartitionedFile(blockId, tmp) 
  23.     shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) 
  24.     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) 
  25.   } 

從SortShuffleWriter類中的write()方法可以看到,最終調(diào)用了ExeternalSorter的insertAll()方法,實現(xiàn)了Map端RDD某個Partition數(shù)據(jù)處理并輸出到內(nèi)存或磁盤文件,這也是處理Map階段輸出記錄數(shù)據(jù)最核心、最復雜的過程。我們將其分為兩個階段進行分析:***階段是,ExeternalSorter的insertAll()方法處理過程,將記錄數(shù)據(jù)Spill到磁盤文件;第二階段是,執(zhí)行完insertAll()方法之后的處理邏輯,創(chuàng)建Shuffle Block數(shù)據(jù)文件及其索引文件。

內(nèi)存緩沖寫記錄數(shù)據(jù)并Spill到磁盤文件

查看SortShuffleWriter類的write()方法可以看到,在內(nèi)存中緩存記錄數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)有兩種:一種是Buffer,對應的實現(xiàn)類PartitionedPairBuffer,設置mapSideCombine=false時會使用該結(jié)構(gòu);另一種是Map,對應的實現(xiàn)類是PartitionedAppendOnlyMap,設置mapSideCombine=false時會使用該結(jié)構(gòu)。根據(jù)是否指定mapSideCombine選項,分別對應不同的處理流程,我們分別說明如下:

設置mapSideCombine=false時

這種情況在Map階段不進行Combine操作,在內(nèi)存中緩存記錄數(shù)據(jù)會使用PartitionedPairBuffer這種數(shù)據(jù)結(jié)構(gòu)來緩存、排序記錄數(shù)據(jù),它是一個Append-only Buffer,僅支持向Buffer中追加數(shù)據(jù)鍵值對記錄,PartitionedPairBuffer的結(jié)構(gòu)如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

默認情況下,PartitionedPairBuffer初始分配的存儲容量為capacity = initialCapacity = 64,實際上這個容量是針對key的容量,因為要存儲的是鍵值對記錄數(shù)據(jù),所以實際存儲鍵值對的容量為2*initialCapacity = 128。PartitionedPairBuffer是一個能夠動態(tài)擴充容量的Buffer,內(nèi)部使用一個一維數(shù)組來存儲鍵值對,每次擴容結(jié)果為當前Buffer容量的2倍,即2*capacity,***支持存儲2^31-1個鍵值對記錄(1073741823個)。

通過上圖可以看到,PartitionedPairBuffer存儲的鍵值對記錄數(shù)據(jù),鍵是(partition, key)這樣一個Tuple,值是對應的數(shù)據(jù)value,而且curSize是用來跟蹤寫入Buffer中的記錄的,key在Buffer中的索引位置為2*curSize,value的索引位置為2*curSize+1,可見一個鍵值對的key和value的存儲在PartitionedPairBuffer內(nèi)部的數(shù)組中是相鄰的。

使用PartitionedPairBuffer緩存鍵值對記錄數(shù)據(jù),通過跟蹤實際寫入到Buffer內(nèi)的記錄數(shù)據(jù)的字節(jié)數(shù)來判斷,是否需要將Buffer中的數(shù)據(jù)Spill到磁盤文件,如下代碼所示:

 

  1. protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { 
  2.     var shouldSpill = false 
  3.     if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { 
  4.       // Claim up to double our current memory from the shuffle memory pool 
  5.       val amountToRequest = 2 * currentMemory - myMemoryThreshold 
  6.       val granted = acquireMemory(amountToRequest) 
  7.       myMemoryThreshold += granted 
  8.       // If we were granted too little memory to grow further (either tryToAcquire returned 0, 
  9.       // or we already had more memory than myMemoryThreshold), spill the current collection 
  10.       shouldSpill = currentMemory >= myMemoryThreshold 
  11.     } 
  12.     shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold 
  13.     // Actually spill 
  14.     if (shouldSpill) { 
  15.       _spillCount += 1 
  16.       logSpillage(currentMemory) 
  17.       spill(collection) 
  18.       _elementsRead = 0 
  19.       _memoryBytesSpilled += currentMemory 
  20.       releaseMemory() 
  21.     } 
  22.     shouldSpill 
  23.   } 

上面elementsRead表示存儲到PartitionedPairBuffer中的記錄數(shù),currentMemory是對Buffer中的總記錄數(shù)據(jù)大小(字節(jié)數(shù))的估算,myMemoryThreshold通過配置項spark.shuffle.spill.initialMemoryThreshold來進行設置的,默認值為5 * 1024 * 1024 = 5M。當滿足條件elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold時,會先嘗試向MemoryManager申請2 * currentMemory – myMemoryThreshold大小的內(nèi)存,如果能夠申請到,則不進行Spill操作,而是繼續(xù)向Buffer中存儲數(shù)據(jù),否則就會調(diào)用spill()方法將Buffer中數(shù)據(jù)輸出到磁盤文件。

向PartitionedPairBuffer中寫入記錄數(shù)據(jù),以及滿足條件Spill記錄數(shù)據(jù)到磁盤文件,具體處理流程,如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

為了查看按照怎樣的規(guī)則進行排序,我們看一下,當不進行Map Side Combine時,創(chuàng)建ExternalSorter對象的代碼如下所示:

 

  1. // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 
  2.       // care whether the keys get sorted in each partition; that will be done on the reduce side 
  3.       // if the operation being run is sortByKey. 
  4.       new ExternalSorter[K, V, V]( 
  5.         context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 

上面aggregator = None,ordering = None,在對PartitionedPairBuffer中的記錄數(shù)據(jù)Spill到磁盤之前,要使用默認的排序規(guī)則進行排序,排序的規(guī)則是只對PartitionedPairBuffer中的記錄按Partition ID進行升序排序,可以查看WritablePartitionedPairCollection伴生對象類的代碼(其中PartitionedPairBuffer類實現(xiàn)了特質(zhì)WritablePartitionedPairCollection),如下所示:

 

  1. /** 
  2.    * A comparator for (Int, K) pairs that orders them by only their partition ID. 
  3.    */ 
  4.   def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] { 
  5.     override def compare(a: (Int, K), b: (Int, K)): Int = { 
  6.       a._1 - b._1 
  7.     } 
  8.   } 

上面圖中,引用了SortShuffleWriter.writeBlockFiles這個子序列圖,用來生成Block數(shù)據(jù)文件和索引文件,后面我們會單獨說明。通過對RDD進行計算生成一個記錄迭代器對象,通過該迭代器迭代出的記錄會存儲到PartitionedPairBuffer中,當滿足Spill條件時,先對PartitionedPairBuffer中記錄進行排序,***Spill到磁盤文件,這個過程中PartitionedPairBuffer中的記錄數(shù)據(jù)的變化情況,如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

上圖中,對內(nèi)存中PartitionedPairBuffer中的記錄按照Partition ID進行排序,并且屬于同一個Partition的數(shù)據(jù)記錄在PartitionedPairBuffer內(nèi)部的data數(shù)組中是連續(xù)的。排序結(jié)束后,在Spill到磁盤文件時,將對應的Partition ID去掉了,只在文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中連續(xù)存儲鍵值對數(shù)據(jù),但同時在另一個內(nèi)存數(shù)組結(jié)構(gòu)中會保存文件中每個Partition擁有的記錄數(shù),這樣就能根據(jù)Partition的記錄數(shù)來順序讀取文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中屬于同一個Partition的全部記錄數(shù)據(jù)。

ExternalSorter類內(nèi)部維護了一個SpillFile的ArrayBuffer數(shù)組,最終可能會生成多個SpillFile,SpillFile的定義如下所示:

 

  1. private[this] case class SpilledFile( 
  2.     file: File, 
  3.     blockId: BlockId, 
  4.     serializerBatchSizes: Array[Long], 
  5.     elementsPerPartition: Array[Long]) 

每個SpillFile包含一個blockId,標識Map輸出的該臨時文件;serializerBatchSizes表示每次批量寫入到文件的Object的數(shù)量,默認為10000,由配置項spark.shuffle.spill.batchSize來控制;elementsPerPartition表示每個Partition中的Object的數(shù)量。調(diào)用ExternalSorter的insertAll()方法,最終可能有如下3種情況:

  • Map階段輸出記錄數(shù)較少,沒有生成SpillFile,那么所有數(shù)據(jù)都在Buffer中,直接對Buffer中記錄排序并輸出到文件
  • Map階段輸出記錄數(shù)較多,生成多個SpillFile,同時Buffer中也有部分記錄數(shù)據(jù)
  • Map階段輸出記錄數(shù)較多,只生成多個SpillFile
  • 有關后續(xù)如何對上面3種情況進行處理,可以想見后面對子序列圖SortShuffleWriter.writeBlockFiles的說明。
  • 設置mapSideCombine=true時

這種情況在Map階段會執(zhí)行Combine操作,在Map階段進行Combine操作能夠降低Map階段數(shù)據(jù)記錄的總數(shù),從而降低Shuffle過程中數(shù)據(jù)的跨網(wǎng)絡拷貝傳輸。這時,RDD對應的ShuffleDependency需要設置一個Aggregator用來執(zhí)行Combine操作,可以看下Aggregator類聲明,代碼如下所示:

 

  1. /** 
  2.  * :: DeveloperApi :: 
  3.  * A set of functions used to aggregate data. 
  4.  * 
  5.  * @param createCombiner function to create the initial value of the aggregation. 
  6.  * @param mergeValue function to merge a new value into the aggregation result. 
  7.  * @param mergeCombiners function to merge outputs from multiple mergeValue function
  8.  */ 
  9. @DeveloperApi 
  10. case class Aggregator[K, V, C] ( 
  11.     createCombiner: V => C, 
  12.     mergeValue: (C, V) => C, 
  13.     mergeCombiners: (C, C) => C) { 
  14.   ... ... 

由于在Map階段只用到了構(gòu)造Aggregator的幾個函數(shù)參數(shù)createCombiner、mergeValue、mergeCombiners,我們對這幾個函數(shù)詳細說明如下:

  • createCombiner:進行Aggregation開始時,需要設置初始值。因為在Aggregation過程中使用了類似Map的內(nèi)存數(shù)據(jù)結(jié)構(gòu)來管理鍵值對,每次加入前會先查看Map內(nèi)存結(jié)構(gòu)中是否存在Key對應的Value,***次肯定不存在,所以***將某個Key的Value加入到Map內(nèi)存結(jié)構(gòu)中時,Key在Map內(nèi)存結(jié)構(gòu)中***次有了Value。
  • mergeValue:某個Key已經(jīng)在Map結(jié)構(gòu)中存在Value,后續(xù)某次又遇到相同的Key和一個新的Value,這時需要通過該函數(shù),將舊Value和新Value進行合并,根據(jù)Key檢索能夠得到合并后的新Value。
  • mergeCombiners:一個Map內(nèi)存結(jié)構(gòu)中Key和Value是由mergeValue生成的,那么在向Map中插入數(shù)據(jù),肯定會遇到Map使用容量達到上限,這時需要將記錄數(shù)據(jù)Spill到磁盤文件,那么多個Spill輸出的磁盤文件中可能存在同一個Key,這時需要對多個Spill輸出的磁盤文件中的Key的多個Value進行合并,這時需要使用mergeCombiners函數(shù)進行處理。

該類中定義了combineValuesByKey、combineValuesByKey、combineCombinersByKey,由于這些函數(shù)是在Reduce階段使用的,所以在這里先不說明,后續(xù)文章我們會單獨詳細來分析。

我們通過下面的序列圖來描述,需要進行Map Side Combine時的處理流程,如下所示:

Spark Shuffle過程分析:Map階段處理流程

對照上圖,我們看一下,當需要進行Map Side Combine時,對應的ExternalSorter類insertAll()方法中的處理邏輯,代碼如下所示:

 

  1. val shouldCombine = aggregator.isDefined 
  2.  
  3.     if (shouldCombine) { 
  4.       // Combine values in-memory first using our AppendOnlyMap 
  5.       val mergeValue = aggregator.get.mergeValue 
  6.       val createCombiner = aggregator.get.createCombiner 
  7.       var kv: Product2[K, V] = null 
  8.       val update = (hadValue: Boolean, oldValue: C) => { 
  9.         if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) 
  10.       } 
  11.       while (records.hasNext) { 
  12.         addElementsRead() 
  13.         kv = records.next() 
  14.         map.changeValue((getPartition(kv._1), kv._1), update
  15.         maybeSpillCollection(usingMap = true
  16.       } 
  17.     } 

上面代碼中,map是內(nèi)存數(shù)據(jù)結(jié)構(gòu),最重要的是update函數(shù)和map的changeValue方法(這里的map對應的實現(xiàn)類是PartitionedAppendOnlyMap)。update函數(shù)所做的工作,其實就是對createCombiner和mergeValue這兩個函數(shù)的使用,***次遇到一個Key調(diào)用createCombiner函數(shù)處理,非***遇到同一個Key對應新的Value調(diào)用mergeValue函數(shù)進行合并處理。map的changeValue方法主要是將Key和Value在map中存儲或者進行修改(對出現(xiàn)的同一個Key的多個Value進行合并,并將合并后的新Value替換舊Value)。

PartitionedAppendOnlyMap是一個經(jīng)過優(yōu)化的哈希表,它支持向map中追加數(shù)據(jù),以及修改Key對應的Value,但是不支持刪除某個Key及其對應的Value。它能夠支持的存儲容量是0.7 * 2 ^ 29 = 375809638。當達到指定存儲容量或者指定限制,就會將map中記錄數(shù)據(jù)Spill到磁盤文件,這個過程和前面的類似,不再累述。

創(chuàng)建Shuffle Block數(shù)據(jù)文件及其索引文件

無論是使用PartitionedPairBuffer,還是使用PartitionedAppendOnlyMap,當需要容量滿足Spill條件時,都會將該內(nèi)存結(jié)構(gòu)(buffer/map)中記錄數(shù)據(jù)Spill到磁盤文件,所以Spill到磁盤文件的格式是相同的。對于后續(xù)Block數(shù)據(jù)文件和索引文件的生成邏輯也是相同,如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

假設,我們生成的Shuffle Block文件對應各個參數(shù)為:shuffleId=2901,mapId=11825,reduceId=0,這里reduceId是一個NOOP_REDUCE_ID,表示與DiskStore進行磁盤I/O交互操作,而DiskStore期望對應一個(map, reduce)對,但是對于排序的Shuffle輸出,通常Reducer拉取數(shù)據(jù)后只生成一個文件(Reduce文件),所以這里默認reduceId為0。經(jīng)過上圖的處理流程,可以生成一個.data文件,也就是Block數(shù)據(jù)文件;一個.index文件,也就是包含了各個Partition在數(shù)據(jù)文件中的偏移位置的索引文件。這個過程生成的文件,示例如下所示:

 

  1. shuffle_2901_11825_0.data  
  2. shuffle_2901_11825_0.index 

這樣,對于每個RDD的多個Partition進行處理后,都會生成對應的數(shù)據(jù)文件和索引文件,后續(xù)在Reduce端就可以讀取這些Block文件,這些記錄數(shù)據(jù)在文件中都是經(jīng)過分區(qū)(Partitioned)的。

責任編輯:未麗燕 來源: 36大數(shù)據(jù)
相關推薦

2017-03-27 10:48:03

Hive map優(yōu)化分析

2021-08-11 06:57:16

ShuffleSpark核心

2019-04-22 15:24:24

HadoopSuffleMap端

2023-02-08 13:08:31

2021-10-20 10:04:47

鴻蒙HarmonyOS應用

2025-09-15 06:25:00

2025-06-13 08:40:00

ShuffleSpark大數(shù)據(jù)

2022-03-15 08:25:32

SparkShuffle框架

2019-07-26 15:01:42

SparkShuffle內(nèi)存

2019-06-06 15:22:07

SparkShuffle內(nèi)存

2012-08-30 09:48:02

Struts2Java

2023-11-20 07:27:00

云原生Spark

2009-07-03 13:41:44

WinCE編譯過程

2024-07-15 09:58:03

OpenRestyNginx日志

2016-12-14 19:20:07

Spark SQL架構(gòu)分布式

2009-07-28 11:32:41

光纖鏈路故障

2011-04-13 14:57:11

ASP.NET請求處理

2017-04-24 09:20:05

Spark分析分區(qū)器

2010-06-13 14:36:20

RARP協(xié)議

2022-08-25 18:48:29

字節(jié)跳動CSS開源
點贊
收藏

51CTO技術棧公眾號

欧美理论电影| 丁香花免费高清完整在线播放| 美女精品一区最新中文字幕一区二区三区| 天天综合网天天综合色| 日韩不卡av| av加勒比在线| 免费精品视频| 精品精品国产国产自在线| 三上悠亚 电影| 日本不良网站在线观看| 国产精品嫩草99a| www.久久久| 欧美国产一级片| 欧美激情亚洲| 一区二区三区视频在线| 国产婷婷在线观看| 欧美成人福利| 精品成人久久av| 公共露出暴露狂另类av| 国产在线网站| caoporn国产精品| 91夜夜未满十八勿入爽爽影院| 偷偷操不一样的久久| 99精品小视频| 亚洲天堂免费视频| 男人网站在线观看| 日韩精品亚洲专区在线观看| 在线中文字幕一区二区| 五月天激情图片| 国产综合在线观看| 成人精品免费视频| 91精品久久久久久久| 五月婷婷色丁香| 国产一区美女| 久久国产精品久久久| 69视频在线观看免费| 精品三级av在线导航| 欧美一区二区国产| 尤物国产在线观看| 黄瓜视频成人app免费| 一区二区三区在线高清| 色综合电影网| 韩国三级在线观看久| aaa亚洲精品| 91视频99| 精品久久久久久亚洲综合网站 | 在线播放日韩精品| 中文在线永久免费观看| 风间由美中文字幕在线看视频国产欧美 | 久久xxx视频| 色哟哟一区二区在线观看| 奇米影视亚洲色图| 24小时免费看片在线观看| 亚洲一区二区视频| 毛片av在线播放| 男插女视频久久久| 亚洲成人激情综合网| 国产欧美日韩小视频| 1区2区在线| 欧美日韩中文字幕| 成人免费观看视频在线观看| 午夜久久中文| 91福利社在线观看| 欧美伦理片在线观看| 综合久久伊人| 欧美成人vps| 水蜜桃av无码| 九一亚洲精品| 日韩一区二区三区在线播放| 美女视频久久久| 欧美.www| 97视频在线观看免费| 激情视频网站在线观看| 麻豆国产精品官网| 91福利视频导航| 成人h动漫精品一区二区无码| 国产经典欧美精品| 精品久久一区二区三区蜜桃| 狠狠v欧美ⅴ日韩v亚洲v大胸 | 日韩三级电影免费观看| 日本高清视频在线播放| 亚洲精品欧美激情| 国产极品在线视频| 激情久久一区二区| 日韩美女视频在线| 偷拍女澡堂一区二区三区| 色综合天天综合网中文字幕| 欧美国产亚洲精品久久久8v| 天堂在线免费观看视频| 六月婷婷色综合| 国产乱人伦精品一区二区| 青青操在线视频| 综合久久国产九一剧情麻豆| 国产情侣第一页| 成人私拍视频| 欧美一区二区三区人| 日本黄色片在线播放| 日韩欧美中字| 97在线免费视频| 亚洲综合视频在线播放| av成人动漫在线观看| 伊人天天久久大香线蕉av色| a级大胆欧美人体大胆666| 欧美综合天天夜夜久久| 欧美极品jizzhd欧美仙踪林| 精品黄色一级片| 欧美黄色成人网| 一个人看的www日本高清视频| 粉嫩av亚洲一区二区图片| 日韩视频专区| 国产精品原创| 欧美一区二区啪啪| 五月婷六月丁香| 亚洲精品孕妇| 91成人免费观看| 日本中文在线观看| 狠狠综合久久av一区二区小说| 午夜激情影院在线观看| 在线视频亚洲专区| 午夜免费在线观看精品视频| 99在线观看精品视频| 国产精品久久久久影院亚瑟| 男人操女人逼免费视频| 9l亚洲国产成人精品一区二三| 中文字幕亚洲国产| 国产一区二区视频网站| 不卡一区二区中文字幕| 日日噜噜夜夜狠狠久久丁香五月| 亚洲综合av一区二区三区| 日韩电影免费观看在线观看| 久久久久久久9999| 国产一区二区在线影院| 色大师av一区二区三区| 在线精品亚洲欧美日韩国产| 亚洲第一男人av| 欧美黑人猛猛猛| 激情综合网最新| 午夜午夜精品一区二区三区文| 在线精品亚洲欧美日韩国产| 日韩av综合网| 91浏览器在线观看| 99综合电影在线视频| 阿v天堂2018| xxxx日韩| 国内精品久久久久久久| 殴美一级特黄aaaaaa| 亚洲国产日韩av| yjizz视频| 一区在线视频| 国产在线精品一区| 蜜桃在线视频| 精品香蕉在线观看视频一| 国产www在线| 久久网站热最新地址| 日韩毛片在线免费看| 视频一区欧美| 国产精品麻豆va在线播放| av在线天堂播放| 欧美日韩国产一级| 老湿机69福利| 国产ts人妖一区二区| 人妻少妇精品久久| 中文字幕精品影院| 91精品国产综合久久香蕉| fc2ppv国产精品久久| 精品国产人成亚洲区| 日本一级黄色录像| 久久婷婷久久一区二区三区| 欧美在线观看视频网站| 日韩精品一区二区久久| 亚洲一区二区日本| 国产美女精品写真福利视频| 亚洲欧美国产精品va在线观看| 久久99国产综合精品免费| 国产欧美日韩精品一区| 在线播放免费视频| 亚洲国产日韩欧美一区二区三区| 久久伊人资源站| 国产香蕉久久| 久久久久国产精品免费网站| 免费理论片在线观看播放老| 欧美另类变人与禽xxxxx| 黄色一级视频在线观看| www激情久久| 日韩av片免费观看| 国产乱码精品| 亚洲国产一区二区精品视频| av日韩精品| 国产精品2018| 伦理在线一区| 亚洲天堂网站在线观看视频| www.久久成人| 91久久精品一区二区| 欧美日韩色视频| 久久一二三国产| 四川一级毛毛片| 水蜜桃久久夜色精品一区的特点 | av男人的天堂在线| 精品国内二区三区| 中文天堂在线资源| 亚洲午夜在线电影| 国产美女网站视频| 91麻豆国产香蕉久久精品| 麻豆网站免费观看| 久久一区二区三区超碰国产精品| av磁力番号网| 精品高清在线| 国产在线资源一区| 91麻豆精品国产综合久久久 | 色婷婷av一区二区三区在线观看 | 亚洲三级黄色在线观看| 精品国产无码一区二区| 欧美日韩一区精品| 午夜影院免费在线观看| 亚洲一级在线观看| 国产jizz18女人高潮| 2欧美一区二区三区在线观看视频 337p粉嫩大胆噜噜噜噜噜91av | 亚洲欧美色图视频| 国产精品一二三四五| 日本www.色| 亚洲一区二区动漫| 日本黄色片一级片| 欧美黄色一区二区| 亚洲欧美日韩另类精品一区二区三区 | 免费看毛片的网站| 国产成人精品亚洲午夜麻豆| 亚洲欧美在线精品| 日韩精品一级中文字幕精品视频免费观看 | 国产一区在线播放| 日韩影片中文字幕| 欧美一级免费看| 99re6在线精品视频免费播放| 久久综合电影一区| 麻豆影院在线| 久久精彩免费视频| 亚乱亚乱亚洲乱妇| 精品国内产的精品视频在线观看| 国产成人天天5g影院在线观看| 日韩精品中文字幕在线观看| 无码国产色欲xxxx视频| 精品国产伦一区二区三区观看体验| 99久久亚洲精品日本无码| 欧美日韩不卡一区二区| 中文字幕日日夜夜| 欧美日韩精品一区二区三区蜜桃| 无码人妻久久一区二区三区| 色婷婷亚洲婷婷| 蜜臀精品一区二区三区| 欧美性猛交xxxx久久久| 国产农村妇女aaaaa视频| 欧美视频国产精品| 国产午夜麻豆影院在线观看| 欧美中文字幕久久| 亚洲视屏在线观看| 欧美日韩色综合| 日韩免费电影网站| 五月激情丁香网| 在线视频一区二区三区| 青青国产在线观看| 欧美午夜影院在线视频| 亚洲天堂男人av| 91黄色激情网站| 一区二区视频免费观看| 欧美日本高清视频在线观看| 国产精品久久久国产盗摄| 日韩一级二级三级| 人妻少妇一区二区三区| 日韩精品日韩在线观看| 国产免费视频在线| www.99久久热国产日韩欧美.com| 国产区在线看| 国语自产精品视频在线看抢先版图片 | 尤物yw午夜国产精品视频明星| 亚洲成a人v欧美综合天堂麻豆| 欧美另类99xxxxx| av资源在线看片| 日韩免费不卡av| 国产高清日韩| 国产在线精品日韩| 日本久久精品| 欧美黄色免费网址| 每日更新成人在线视频| 亚洲精品第三页| av亚洲产国偷v产偷v自拍| 国产探花视频在线播放| 亚洲精品日韩专区silk| 天堂网视频在线| 538在线一区二区精品国产| 神马午夜精品95| 正在播放亚洲1区| 青青草原国产在线| 国产精品成人aaaaa网站| www.av88| 精品午夜一区二区三区在线观看| 在线一区二区不卡| 99精品偷自拍| 农村老熟妇乱子伦视频| 亚洲高清不卡在线| 伊人网综合在线| 国产午夜精品久久久| 2020国产在线视频| 国产精品99久久99久久久二8| 日韩一二三区在线观看| 日本午夜精品电影| 亚洲免费精品| 污污视频网站在线| 久久精品一区二区三区不卡| 久久久久久久久久久久久久久久久| 在线亚洲高清视频| 天堂а√在线8种子蜜桃视频| 久久久精品视频成人| 成人不卡视频| 欧美极品日韩| 亚洲国产一区二区精品专区| 久久婷婷中文字幕| 国产精品第四页| 国产一卡二卡三卡| 日韩久久午夜影院| 92久久精品| 99久久免费国| 亚洲精品久久| 午夜两性免费视频| www成人在线观看| 日韩欧美高清在线观看| 日韩美女视频在线| caoporn97在线视频| 成人福利视频在线观看| 欧美先锋资源| 九九视频精品在线观看| 国产网站一区二区| 欧美啪啪小视频| 日韩国产一区三区| 国产ktv在线视频| 亚洲一区二区三区成人在线视频精品| 欧美日韩国产传媒| 黄色片视频在线播放| 久久综合久久综合九色| 久久久久久久黄色片| 亚洲大胆人体在线| 69av成人| 精品无人区一区二区三区竹菊| 激情综合网址| av av在线| 午夜欧美一区二区三区在线播放| 免费a视频在线观看| 欧美国产激情18| 精品亚洲自拍| 日本精品一区在线观看| 26uuu精品一区二区三区四区在线 26uuu精品一区二区在线观看 | 国产成人精品一区二区色戒| 国产亚洲成av人片在线观看桃| 欧美成人a交片免费看| 青青影院一区二区三区四区| 日韩av在线播放中文字幕| 亚洲色图欧美色| 国产精品亚洲四区在线观看| 99国内精品久久久久久久软件| 欧美精品综合| yy1111111| 色噜噜狠狠色综合中国| gogogo高清在线观看免费完整版| 国产精品视频不卡| 综合精品久久| 亚洲美女在线播放| 91久久线看在观草草青青 | 久久免费看少妇高潮| 人人妻人人爽人人澡人人精品| 日韩最新中文字幕电影免费看| 精品国产亚洲一区二区三区在线| 屁屁影院ccyy国产第一页| 91亚洲大成网污www| 免费一级a毛片| 蜜月aⅴ免费一区二区三区| 大型av综合网站| 日本999视频| 亚洲欧美日韩电影| 五月激情六月婷婷| 国产精品日日做人人爱 | 鲁一鲁一鲁一鲁一澡| 中文字幕不卡的av| 风流老熟女一区二区三区| 国产精品爱久久久久久久| 亚洲精品午夜av福利久久蜜桃| 国产+高潮+白浆+无码| 欧美视频一区二区三区四区| 亚洲小说区图片区都市| 欧美精品欧美精品系列c| 韩国午夜理伦三级不卡影院| 波多野结衣国产| 波霸ol色综合久久| 日韩激情毛片| 成人免费黄色av| 一本到一区二区三区| 在线观看wwwxxxx| 日韩动漫在线观看| 国产不卡视频在线观看| 欧美高清69hd| 午夜免费久久久久| 中文字幕日韩一区二区不卡| 四虎永久免费影院|