精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spark Shuffle 核心技術(shù)深度解析

大數(shù)據(jù)
本文將深入剖析 Sort-Based Shuffle 的核心原理、Shuffle Manager 的可插拔設(shè)計,以及 Map-side 聚合、Partition Reuse、堆外 Shuffle 等關(guān)鍵優(yōu)化技術(shù),并結(jié)合源碼揭示其實現(xiàn)細(xì)節(jié)。

一、Shuffle 概述:Spark 分布式計算的“心臟”

在 Spark 分布式計算中,Shuffle 是連接 Map 階段和 Reduce 階段的關(guān)鍵橋梁,也是影響作業(yè)性能的核心環(huán)節(jié)。當(dāng)需要對數(shù)據(jù)進(jìn)行重新分區(qū)(如 groupByKey、reduceByKey、join 等操作)時,Spark 必須將不同分區(qū)(Partition)的數(shù)據(jù)進(jìn)行重新分發(fā),這一過程就是 Shuffle。簡單來說,Shuffle 的核心任務(wù)是:將 Map 端的數(shù)據(jù)按規(guī)則分區(qū)、排序后寫入磁盤,再由 Reduce 端拉取并處理。

Shuffle 的性能直接影響整個作業(yè)的執(zhí)行效率。早期 Spark 版本采用 Hash Shuffle,存在嚴(yán)重的“小文件問題”;后續(xù)引入的 Sort-Based Shuffle 通過優(yōu)化文件管理和排序機(jī)制,成為默認(rèn)的 Shuffle 實現(xiàn)。本文將深入剖析 Sort-Based Shuffle 的核心原理、Shuffle Manager 的可插拔設(shè)計,以及 Map-side 聚合、Partition Reuse、堆外 Shuffle 等關(guān)鍵優(yōu)化技術(shù),并結(jié)合源碼揭示其實現(xiàn)細(xì)節(jié)。

二、Shuffle 演進(jìn):從 Hash Shuffle 到 Sort-Based Shuffle

1. Hash Shuffle:早期實現(xiàn)的“痛點”

在 Spark 1.6 之前,Hash Shuffle 是默認(rèn)實現(xiàn)。其核心邏輯是:每個 Map Task 為每個 Reduce Task 創(chuàng)建一個單獨的文件。假設(shè)作業(yè)有 M 個 Map Task 和 R 個 Reduce Task,則會產(chǎn)生 M × R 個文件。例如,1000 個 Map Task 和 1000 個 Reduce Task 會生成 100 萬個文件,這會帶來兩個嚴(yán)重問題:

  • 文件系統(tǒng)壓力:大量小文件會導(dǎo)致文件系統(tǒng)元數(shù)據(jù)管理開銷劇增(如 HDFS 的 NameNode 內(nèi)存壓力),同時隨機(jī)讀寫小文件的效率極低。
  • 內(nèi)存開銷:每個 Map Task 需要同時打開 R 個文件句柄(File Handler),當(dāng) R 較大時,容易導(dǎo)致內(nèi)存溢出或句柄耗盡。

Hash Shuffle 流程示例:

// 偽代碼:Hash Shuffle Map 端寫入
for (record: (K, V) in mapTask.records) {
    int reducePartition = partitioner.getPartition(record._1);
    // 每個reducePartition對應(yīng)一個文件
    FileOutputStream fos = getFileOutputStream(reducePartition);
    fos.write(serialize(record));
}

2. Sort-Based Shuffle:默認(rèn)實現(xiàn)的“優(yōu)化之道”

為解決 Hash Shuffle 的問題,Spark 1.6 后默認(rèn)采用 Sort-Based Shuffle。其核心改進(jìn)是:每個 Map Task 只生成一個數(shù)據(jù)文件和一個索引文件。數(shù)據(jù)文件按 Partition ID 排序存儲,索引文件記錄每個 Partition 的起始位置和長度。Reduce Task 通過索引文件快速定位并拉取屬于自己的數(shù)據(jù)。

Sort-Based Shuffle 的優(yōu)勢:

  • 文件數(shù)量大幅減少:M 個 Map Task 僅生成 2M 個文件(1 數(shù)據(jù)文件 + 1 索引文件),避免小文件問題。
  • 排序優(yōu)化:在 Map 端按 Partition ID 排序(可自定義 Secondary Key 排序),減少 Reduce 端合并開銷。
  • 內(nèi)存管理高效:基于堆外內(nèi)存和排序緩沖區(qū),減少 GC 壓力。

三、Shuffle Manager:可插拔的“調(diào)度中心”

Spark 通過 ShuffleManager 接口實現(xiàn) Shuffle 機(jī)制的可插拔設(shè)計,用戶可通過 spark.shuffle.manager 參數(shù)指定實現(xiàn)類(默認(rèn) sort)。ShuffleManager 的核心職責(zé)包括:

  • 注冊 Shuffle 依賴(registerShuffle);
  • 獲取 Map 端 Writer(getWriter);
  • 獲取 Reduce 端 Reader(getReader)。

1. ShuffleManager 接口定義

// org.apache.spark.shuffle.ShuffleManager
trait ShuffleManager{
  // 注冊Shuffle依賴,返回Shuffle句柄
def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  // 獲取Map端的Writer,用于寫入數(shù)據(jù)
def getWriter[K, V](
      shuffleHandle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V]

  // 獲取Reduce端的Reader,用于拉取數(shù)據(jù)
def getReader[K, C](
      shuffleHandle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  // 釋放資源
def stop(): Unit
}

2. 核心實現(xiàn)類:SortShuffleManager 與 HashShuffleManager

(1) SortShuffleManager(默認(rèn)實現(xiàn))

SortShuffleManager 是當(dāng)前主流實現(xiàn),支持三種 Writer 模式:

① UnsafeShuffleWriter:當(dāng)滿足以下條件時啟用(性能最優(yōu)):

  • Shuffle 依賴的序列化器支持 KSerializer 且 key 不需要排序;
  • Shuffle 依賴的聚合器(aggregator)為空;
  • Reduce 分區(qū)數(shù)量不超過 spark.shuffle.sort.maxSpaceUsage(默認(rèn) Long.MaxValue)。

特點:直接操作堆外內(nèi)存,基于 ShuffleExternalSorter 排序,避免 Java 對象開銷。

② SortShuffleWriter:通用模式,當(dāng)不滿足 UnsafeShuffleWriter 條件時啟用:

  • 支持自定義排序(keyOrdering)和 Map-side 聚合(aggregator);
  • 基于 PartitionedPairBuffer(堆內(nèi))或 ShuffleExternalSorter(堆外)排序。

③ BypassMergeSortShuffleWriter:當(dāng)滿足以下條件時啟用(減少排序開銷):

  • Shuffle 依賴的 mapSideCombine 為 false(無 Map-side 聚合);
  • Reduce 分區(qū)數(shù)量小于 spark.shuffle.sort.bypassMergeThreshold(默認(rèn) 200)。

特點:類似 Hash Shuffle,但最后會合并所有 Partition 文件為一個數(shù)據(jù)文件,避免小文件問題。

SortShuffleManager.getWriter 邏輯源碼:

// org.apache.spark.shuffle.sort.SortShuffleManager
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  shuffleBlockResolver match {
    case resolver: IndexShuffleBlockResolver =>
      val shuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
      // 判斷是否啟用Bypass模式
      if (shuffleHandle.dependency.mapSideCombine) {
        new SortShuffleWriter(shuffleHandle, mapId, context, shuffleBlockResolver)
      } else if (shuffleHandle.dependency.partitioner.numPartitions <= bypassMergeThreshold) {
        new BypassMergeSortShuffleWriter(shuffleHandle, mapId, context, shuffleBlockResolver)
      } else {
        // 判斷是否啟用Unsafe模式
        val serializer = shuffleHandle.dependency.serializer
        val ser = serializer.newInstance()
        if (ser.supportsRelocationOfSerializedObjects && 
            !shuffleHandle.dependency.keyOrdering.isDefined) {
          new UnsafeShuffleWriter(shuffleHandle, mapId, context, shuffleBlockResolver)
        } else {
          new SortShuffleWriter(shuffleHandle, mapId, context, shuffleBlockResolver)
        }
      }
  }
}

(2) HashShuffleManager(已廢棄)

HashShuffleManager 是早期實現(xiàn),核心邏輯是每個 Map Task 為每個 Reduce Task 創(chuàng)建單獨文件。由于小文件問題,已在 Spark 3.0 后被移除,但其設(shè)計思想對理解 Shuffle 演進(jìn)仍有意義。

四、Sort-Based Shuffle 核心流程:從 Map 端寫入到 Reduce 端拉取

1. Map 端寫入流程

Sort-Based Shuffle 的 Map 端核心流程分為 數(shù)據(jù)緩沖、排序 spill、合并文件 三個階段,以 SortShuffleWriter 為例:

(1) 數(shù)據(jù)緩沖:PartitionedPairBuffer

Map Task 首先將數(shù)據(jù)寫入內(nèi)存緩沖區(qū) PartitionedPairBuffer,其結(jié)構(gòu)為 數(shù)組 + 鏈表:

  • 數(shù)組存儲 (partitionId, record) 的指針;
  • 按 partitionId 分區(qū),同一分區(qū)內(nèi)記錄按插入順序存儲(后續(xù)可排序)。
// org.apache.spark.util.collection.PartitionedPairBuffer
class PartitionedPairBuffer[K, V](initialCapacity: Int) extends SizeTracker {
  private var buffer = new Array[AnyRef](2 * initialCapacity) // 存儲(partitionId, key, value)
  private var curSize = 0

  def insert(partitionId: Int, key: K, value: V): Unit = {
    if (curSize == buffer.length) {
      growArray() // 擴(kuò)容
    }
    buffer(curSize) = partitionId.asInstanceOf[AnyRef]
    buffer(curSize + 1) = (key, value).asInstanceOf[AnyRef]
    curSize += 2
  }
}

(2) 排序與 Spill:當(dāng)緩沖區(qū)達(dá)到閾值

當(dāng)緩沖區(qū)大小超過 spark.shuffle.spill.numElementsForceSpillThreshold(默認(rèn) Integer.MAX_VALUE)或內(nèi)存不足時,觸發(fā) spill 操作:

  • 排序:按 partitionId 升序排序(若定義了 keyOrdering,則同一分區(qū)內(nèi)按 key 排序);
  • 寫入磁盤:將排序后的數(shù)據(jù)寫入臨時文件,記錄每個 Partition 的偏移量;
  • 釋放內(nèi)存:清空緩沖區(qū),繼續(xù)接收新數(shù)據(jù)。

排序 spill 源碼(ShuffleExternalSorter):

// org.apache.spark.shuffle.ShuffleExternalSorter
def spill(): Unit = {
  // 獲取排序后的迭代器(先按partitionId,再按key)
  val sortedIterator = shuffleMemoryManager.allocateMemoryForSort()
  // 寫入臨時文件
  val file = spillFileCreator.createTempFile()
  val writer = new DiskBlockWriter(file)
  while (sortedIterator.hasNext) {
    val (partitionId, key, value) = sortedIterator.next()
    writer.write(partitionId, key, value)
  }
  writer.close()
  spillFiles += file // 記錄spill文件
}

(3) 合并文件:生成數(shù)據(jù)文件與索引文件

Map Task 結(jié)束前,會將內(nèi)存緩沖區(qū)和所有 spill 文件合并為 一個數(shù)據(jù)文件 和 一個索引文件:

  • 數(shù)據(jù)文件:存儲所有 Partition 的數(shù)據(jù),按 partitionId 順序排列;
  • 索引文件:存儲每個 Partition 在數(shù)據(jù)文件中的 起始位置 和 長度(固定 8 字節(jié)/Partition)。

索引文件結(jié)構(gòu)示例:

Partition 0: offset=0, length=1024
Partition 1: offset=1024, length=2048
Partition 2: offset=3072, length=512
...

合并文件源碼(IndexShuffleBlockResolver):

// org.apache.spark.shuffle.IndexShuffleBlockResolver
def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  // 索引文件路徑:shuffleId-mapId.index
  val indexFile = getIndexFile(shuffleId, mapId)
  // 數(shù)據(jù)文件路徑:shuffleId-mapId.data
  val dataFile = getDataFile(shuffleId, mapId)
  
  // 寫入索引文件(每個Partition 8字節(jié):offset + length)
  val out = new DataOutputStream(new FileOutputStream(indexFile))
  try {
    var offset = 0L
    for (length <- lengths) {
      out.writeLong(offset)
      out.writeLong(length)
      offset += length
    }
  } finally {
    out.close()
  }
  
  // 重命名臨時數(shù)據(jù)文件為正式文件
  dataTmp.renameTo(dataFile)
}

2. Reduce 端拉取流程

Reduce Task 通過 ShuffleReader 拉取 Map 端的數(shù)據(jù),核心流程包括 獲取數(shù)據(jù)位置、拉取數(shù)據(jù)、合并與聚合。

(1) 獲取數(shù)據(jù)位置:MapOutputTracker

Reduce Task 首先通過 MapOutputTracker 獲取每個 Map Task 中對應(yīng) Partition 的數(shù)據(jù)位置(包括 Executor 地址、數(shù)據(jù)文件路徑、索引文件偏移量)。

// org.apache.spark.MapOutputTracker
def getMapSizesByExecutorId(
    shuffleId: Int,
    startPartition: Int,
    endPartition: Int): Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
  // 從Driver獲取MapOutput信息(或本地緩存)
  val statuses = mapStatuses.get(shuffleId).getOrElse(throw ...)
  statuses.map { status =>
    val blockManagerId = status.location
    // 獲取指定Partition的偏移量和長度
    val sizes = status.getSizeForBlockRange(startPartition, endPartition)
    (blockManagerId, sizes)
  }
}

(2) 拉取數(shù)據(jù):BlockStoreShuffleReader

BlockStoreShuffleReader 通過 BlockManager 從遠(yuǎn)程 Executor 拉取數(shù)據(jù)塊,支持 本地讀取(優(yōu)先)和 遠(yuǎn)程傳輸(通過 Netty)。

// org.apache.spark.shuffle.BlockStoreShuffleReader
override def read(): Iterator[Product2[K, C]] = {
  // 獲取數(shù)據(jù)位置
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(...)
  // 創(chuàng)建ShuffleBlockFetcherIterator,用于拉取數(shù)據(jù)
  val shuffleBlockFetcherIterator = new ShuffleBlockFetcherIterator(
    context,
    blockManager.blockStoreClient,
    blockManager,
    blocksByAddress,
    serializer,
    // 傳輸配置(如最大并發(fā)拉取數(shù))
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxMbInFlight", "48") * 1024 * 1024)
  
  // 聚合或迭代返回數(shù)據(jù)
  val aggregatedIter = if (dep.aggregator.isDefined) {
    // 如果定義了aggregator,進(jìn)行Reduce端聚合
    new AggregatorIterator(shuffleBlockFetcherIterator, dep.aggregator.get)
  } else {
    shuffleBlockFetcherIterator.map(pair => (pair._1, pair._2))
  }
  
  aggregatedIter
}

(3) 合并與聚合:Reduce 端優(yōu)化

Reduce 端拉取數(shù)據(jù)后,可能需要合并來自多個 Map Task 的數(shù)據(jù),并進(jìn)行聚合(如 reduceByKey)。Spark 通過 AggregatorIterator 實現(xiàn)流式聚合,避免全量數(shù)據(jù)加載到內(nèi)存。

五、Shuffle 核心優(yōu)化技術(shù):從原理到源碼

1. Map-side 聚合:減少數(shù)據(jù)傳輸量的“利器”

(1) 原理:在 Map 端預(yù)聚合

Map-side 聚合是指在 Map Task 將數(shù)據(jù)寫入 Shuffle 前先進(jìn)行局部聚合(如 reduceByKey 的 reduce 操作),減少需要寫入磁盤和傳輸?shù)臄?shù)據(jù)量。例如,統(tǒng)計單詞頻次時,Map 端可先對本地單詞計數(shù),Reduce 端只需合并各 Map Task 的局部結(jié)果。

適用場景:聚合函數(shù)(如 reduce、aggregate)滿足 結(jié)合律 和 交換律(如 sum、max)。

(2) 實現(xiàn)源碼:ShuffleDependency 與 Aggregator

Map-side 聚合的核心是 ShuffleDependency 中的 aggregator 字段,定義了聚合邏輯:

// org.apache.spark.ShuffleDependency
class ShuffleDependency[K, V, C](
    @transient val rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None, // 聚合器
    val mapSideCombine: Boolean = false // 是否啟用Map-side聚合
) extends Dependency[Product2[K, V]] {
  // ...
}

Aggregator 定義了三個核心函數(shù):

  • createCombiner: 將第一個 value 轉(zhuǎn)換為聚合器類型 C(如 word -> 1);
  • mergeValue: 將新 value 合并到聚合器(如 1 + count -> newCount);
  • mergeCombiners: 合并兩個聚合器(如 count1 + count2 -> totalCount)。

Map-side 聚合執(zhí)行流程(SortShuffleWriter):

// org.apache.spark.shuffle.sort.SortShuffleWriter
override def write(records: Iterator[Product2[K, V]]): Unit = {
  // 如果定義了aggregator且mapSideCombine=true,啟用Map-side聚合
  val maybeAggregator: Option[Aggregator[K, V, C]] = 
    if (dep.mapSideCombine) dep.aggregator else None

  // 創(chuàng)建排序緩沖區(qū)
  val sorter = if (dep.aggregator.isDefined && dep.mapSideCombine) {
    // 使用聚合器排序
    new PartitionedAppendOnlyMap[K, C](dep.aggregator.get, dep.keyOrdering)
  } else {
    // 普通排序緩沖區(qū)
    new PartitionedPairBuffer[K, V](initialCapacity)
  }

  // 遍歷記錄,插入緩沖區(qū)(聚合或直接存儲)
  for (record <- records) {
    maybeAggregator match {
      case Some(aggregator) =>
        // 調(diào)用aggregator的mergeValue進(jìn)行聚合
        sorter.insert(record._1, aggregator.mergeValue(record._2))
      case None =>
        sorter.insert(record._1, record._2)
    }
  }

  // 排序、spill、合并文件(前文流程)
  // ...
}

(3) 性能收益

假設(shè)原始數(shù)據(jù)為 ("a", 1), ("b", 1), ("a", 1),無 Map-side 聚合時需傳輸 3 條記錄;啟用后,Map 端聚合為 ("a", 2), ("b", 1),僅傳輸 2 條記錄,數(shù)據(jù)量減少 33%。對于數(shù)據(jù)傾斜場景(如某個 key 出現(xiàn)百萬次),Map-side 聚合可大幅降低 Shuffle 數(shù)據(jù)量。

2. Partition Reuse:避免重復(fù)創(chuàng)建 Partition 文件

(1) 原理:復(fù)用數(shù)據(jù)文件與索引文件

Sort-Based Shuffle 中,每個 Map Task 僅生成一個數(shù)據(jù)文件和一個索引文件,所有 Partition 的數(shù)據(jù)存儲在同一文件中,通過索引文件定位。這與 Hash Shuffle 中“每個 Partition 一個文件”的設(shè)計形成對比,徹底避免了小文件問題。

Partition Reuse 的核心:

  • 數(shù)據(jù)文件復(fù)用:不同 Partition 的數(shù)據(jù)按順序?qū)懭胪晃募瑹o額外文件創(chuàng)建開銷;
  • 索引文件高效定位:索引文件固定 8 字節(jié)/Partition,Reduce Task 通過 partitionId 快速計算偏移量(offset = partitionId * 8),讀取起始位置和長度。

(2) 實現(xiàn)源碼:IndexShuffleBlockResolver

IndexShuffleBlockResolver 負(fù)責(zé)管理 Shuffle 文件的創(chuàng)建和讀取,核心方法包括:

  • getDataFile: 獲取數(shù)據(jù)文件路徑(shuffleId-mapId.data);
  • getIndexFile: 獲取索引文件路徑(shuffleId-mapId.index);
  • getBlockData: 根據(jù) partitionId 讀取數(shù)據(jù)文件的對應(yīng)片段。
// org.apache.spark.shuffle.IndexShuffleBlockResolver
def getBlockData(
    shuffleId: Int,
    mapId: Int,
    reduceId: Int): ManagedBuffer = {
  // 索引文件路徑
  val indexFile = getIndexFile(shuffleId, mapId)
  // 數(shù)據(jù)文件路徑
  val dataFile = getDataFile(shuffleId, mapId)
  
  // 讀取索引文件,獲取reduceId對應(yīng)Partition的偏移量和長度
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    // 跳轉(zhuǎn)到reduceId對應(yīng)的索引位置(每個Partition 8字節(jié))
    in.skipBytes(reduceId * 8)
    val offset = in.readLong()
    val length = in.readLong()
    // 返回數(shù)據(jù)文件的對應(yīng)片段(FileSegmentManagedBuffer)
    new FileSegmentManagedBuffer(dataFile, offset, length)
  } finally {
    in.close()
  }
}

(3) 性能收益

  • 文件數(shù)量減少:M 個 Map Task 和 R 個 Reduce Task 下,文件數(shù)量從 M×R(Hash Shuffle)降至 2M(Sort-Based Shuffle)。例如 1000 Map Task 和 1000 Reduce Task,文件數(shù)量從 100 萬降至 2000,減少 99.8%。
  • IO 效率提升:順序讀寫大文件比隨機(jī)讀寫小文件效率高 1~2 個數(shù)量級(HDFS 等文件系統(tǒng)對順序讀寫優(yōu)化更好)。

3. 堆外 Shuffle:減少 GC 壓力的“內(nèi)存優(yōu)化”

(1) 原理:使用堆外內(nèi)存存儲 Shuffle 數(shù)據(jù)

JVM 堆內(nèi)內(nèi)存(Heap Memory)由 GC 管理,頻繁創(chuàng)建/銷毀 Shuffle 數(shù)據(jù)對象(如 (K, V) 記錄)會導(dǎo)致 GC 頻繁觸發(fā),影響作業(yè)穩(wěn)定性。堆外 Shuffle(Off-Heap Shuffle)通過 直接操作系統(tǒng)內(nèi)存(不受 GC 管理)存儲 Shuffle 數(shù)據(jù),減少 GC 壓力。

堆外內(nèi)存管理:

  • Spark 通過 TaskMemoryManager 分配堆外內(nèi)存,基于 sun.misc.Unsafe 直接操作內(nèi)存;
  • 堆外內(nèi)存大小由 spark.memory.offHeap.size 配置(默認(rèn) 0,不啟用),需設(shè)置 spark.memory.offHeap.enabled=true。

(2) 實現(xiàn)源碼:ShuffleExternalSorter 與 MemoryBlock

ShuffleExternalSorter 是堆外 Shuffle 的核心排序器,使用 MemoryBlock(堆外內(nèi)存塊)存儲數(shù)據(jù):

// org.apache.spark.shuffle.ShuffleExternalSorter
class ShuffleExternalSorter(
    memoryManager: TaskMemoryManager,
    serializerManager: SerializerManager,
    // 堆外內(nèi)存分配器
    initialSize: Long = 1024 * 1024) extends Spillable{

  // 當(dāng)前使用的堆外內(nèi)存塊
  private var currentPage: MemoryBlock = _
  // 當(dāng)前頁的寫入位置
  private var pageCursor: Long = _

  // 分配堆外內(nèi)存頁
  privatedef allocatePage(): Unit = {
    currentPage = memoryManager.allocatePage(PAGE_SIZE)
    pageCursor = 0
  }

  // 插入記錄(寫入堆外內(nèi)存)
def insertRecord(partitionId: Int, key: Long, value: Long): Unit = {
    // 序列化key和value(假設(shè)為Long類型)
    val recordSize = 8 + 8 + 4 // partitionId(4) + key(8) + value(8)
    if (pageCursor + recordSize > currentPage.size) {
      spill() // 當(dāng)前頁空間不足,觸發(fā)spill
      allocatePage() // 分配新頁
    }
    // 寫入堆外內(nèi)存(Unsafe操作)
    val baseObject = currentPage.getBaseObject
    Platform.putLong(baseObject, pageCursor, partitionId)
    Platform.putLong(baseObject, pageCursor + 4, key)
    Platform.putLong(baseObject, pageCursor + 12, value)
    pageCursor += recordSize
  }
}

(3) 關(guān)鍵參數(shù):spark.shuffle.spill.numElementsForceSpillThreshold

該參數(shù)控制 堆外內(nèi)存中元素數(shù)量達(dá)到閾值時強(qiáng)制 spill,避免內(nèi)存中數(shù)據(jù)過多導(dǎo)致 OOM。默認(rèn)值為 Integer.MAX_VALUE(不觸發(fā)),可根據(jù)作業(yè)特點調(diào)整(如數(shù)據(jù)傾斜場景可適當(dāng)降低)。

強(qiáng)制 spill 觸發(fā)邏輯(Spillable 接口):

// org.apache.spark.memory.Spillable
def maybeSpill(collection: collection.Iterable[_], currentMemory: Long): Unit = {
  if (currentMemory > myMemoryThreshold || 
      collection.size > numElementsForceSpillThreshold) {
    spill() // 執(zhí)行spill
    _memoryUsed = 0 // 重置內(nèi)存使用
  }
}

(4) 性能收益

  • 減少 GC 暫停:堆外內(nèi)存不受 GC 管理,避免 Full GC 導(dǎo)致的作業(yè)卡頓(尤其對于大內(nèi)存 Executor,如 64GB+);
  • 內(nèi)存利用率提升:堆外內(nèi)存可避免 JVM 對象頭開銷(12 字節(jié)/對象),存儲相同數(shù)據(jù)占用的內(nèi)存更少;
  • 穩(wěn)定性增強(qiáng):通過 numElementsForceSpillThreshold 控制 spill 閾值,避免因內(nèi)存突增導(dǎo)致的 OOM。

六、總結(jié):Spark Shuffle 的設(shè)計哲學(xué)與未來方向

1. 核心設(shè)計哲學(xué)

Spark Shuffle 的演進(jìn)體現(xiàn)了 “性能優(yōu)化”與“工程實踐”的平衡:

  • 從 Hash 到 Sort:通過文件合并解決小文件問題,兼顧排序需求;
  • 可插拔架構(gòu):ShuffleManager 接口支持靈活擴(kuò)展,適應(yīng)不同場景(如 Push-based Shuffle);
  • 內(nèi)存管理優(yōu)化:堆外內(nèi)存、spill 機(jī)制等設(shè)計,平衡內(nèi)存使用與計算效率;
  • 端到端優(yōu)化:Map-side 聚合、Partition Reuse 等技術(shù),從數(shù)據(jù)生成、傳輸?shù)教幚砣溌穬?yōu)化。

2. 未來方向

  • Push-based Shuffle(Spark 3.0 引入):由 Map Task 主動推送數(shù)據(jù)到 Reduce 端的 Executor,減少 Reduce 端拉取延遲,尤其適用于大規(guī)模集群;
  • GPU 加速 Shuffle:利用 GPU 的高帶寬內(nèi)存和并行計算能力,加速排序、聚合等操作;
  • 動態(tài) Shuffle 調(diào)優(yōu):基于作業(yè)歷史數(shù)據(jù)自動調(diào)整 Shuffle 參數(shù)(如 bypassMergeThreshold、numElementsForceSpillThreshold),減少人工調(diào)優(yōu)成本。

3. 最佳實踐建議

  • 優(yōu)先啟用 Sort-Based Shuffle:默認(rèn)已啟用,無需額外配置;
  • 合理配置 Map-side 聚合:對滿足結(jié)合律的聚合操作(如 reduceByKey),設(shè)置 mapSideCombine=true;
  • 啟用堆外內(nèi)存:對于大內(nèi)存 Executor(如 >32GB),設(shè)置 spark.memory.offHeap.enabled=true 和 spark.memory.offHeap.size(如 10g);
  • 調(diào)整 spill 閾值:數(shù)據(jù)傾斜場景下,適當(dāng)降低 spark.shuffle.spill.numElementsForceSpillThreshold(如 1000000),避免內(nèi)存溢出。
責(zé)任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
相關(guān)推薦

2018-03-21 11:05:26

Spark大數(shù)據(jù)應(yīng)用程序

2009-02-26 10:11:00

寬帶路由器網(wǎng)絡(luò)共享

2021-08-11 06:57:16

ShuffleSpark核心

2022-03-15 08:25:32

SparkShuffle框架

2022-05-07 14:31:46

物聯(lián)網(wǎng)

2023-12-05 07:26:29

指標(biāo)中臺大數(shù)據(jù)

2010-08-19 09:20:24

寬帶路由器

2017-05-14 14:41:20

5G波束基站

2022-05-09 08:21:29

Spring微服務(wù)Sentinel

2009-06-15 17:54:50

Java核心技術(shù)

2009-06-26 16:01:39

EJB組織開發(fā)EJB容器EJB

2016-11-15 14:33:05

Flink大數(shù)據(jù)

2023-06-14 08:49:22

PodKubernetes

2017-03-08 10:06:11

Java技術(shù)點注解

2019-01-11 08:27:06

2025-06-13 08:01:34

2018-05-16 11:05:49

ApacheFlink數(shù)據(jù)流

2019-05-15 08:40:34

工業(yè)物聯(lián)網(wǎng)MQTT物聯(lián)網(wǎng)

2022-10-11 08:37:43

Servlet配置版本

2019-03-05 14:57:21

大數(shù)據(jù)Hadoop框架
點贊
收藏

51CTO技術(shù)棧公眾號

偷拍自拍在线看| av手机天堂网| 欧美wwwsss9999| 日本乱码高清不卡字幕| www亚洲国产| 天堂av资源网| 久久电影国产免费久久电影| 97久久超碰福利国产精品…| 成人性视频免费看| 黄色美女久久久| 欧美三级日韩在线| 黄色片网址在线观看| 亚洲s色大片| 成人免费视频一区二区| 国产精品日韩一区| 国产精品成人免费一区二区视频| 亚洲www免费| 亚洲欧美日本在线| 日韩欧美亚洲v片| 无码精品在线观看| 国产成人在线网站| 国产精品永久免费| 五月婷婷亚洲综合| 午夜精品毛片| 亚洲午夜激情免费视频| 精品人妻一区二区乱码| 久久xxx视频| 日韩欧美主播在线| 黄色成人在线免费观看| 在线播放麻豆| 国产亚洲欧美日韩在线一区| 国产精品区一区二区三在线播放| 九九精品在线观看视频 | 色综合天天综合网国产成人网| www.亚洲高清| 97se综合| 大桥未久av一区二区三区| 亚洲欧美日韩不卡| av在线播放免费| 久久久久久久久伊人| 国产尤物99| 男人天堂网在线视频| 国产精品资源在线看| 成人在线小视频| 日本视频www色| 久久午夜电影| 国产精品高清网站| 69亚洲精品久久久蜜桃小说| 嫩草成人www欧美| 欧美一区二区三区四区在线| 日韩av电影网| 国产日韩欧美三区| 69久久夜色精品国产69| 一区二区三区视频免费看| 在线免费高清一区二区三区| 色中色综合影院手机版在线观看 | 国产精品久久无码一三区| 久久精品网址| 国产精品久久久久久久久久东京 | 国产精品欧美日韩久久| 亚洲视频 欧美视频| 久久婷婷av| 国产精品视频yy9099| 亚洲天堂狠狠干| 寂寞少妇一区二区三区| 成人激情春色网| 国产成人精品毛片| 成人免费毛片嘿嘿连载视频| 韩日午夜在线资源一区二区| 青青草视频在线观看| 91蜜桃传媒精品久久久一区二区| 国产欧美日韩中文字幕在线| 91女人18毛片水多国产| 久久精品av麻豆的观看方式| 91在线免费网站| 人妻视频一区二区三区| 91首页免费视频| 视频一区亚洲 | 中文字幕日韩免费视频| 久久精品在线观看视频| 91成人免费| 91高清在线免费观看| 一级黄色av片| 国产一区日韩二区欧美三区| 国产伦精品一区二区三区四区视频 | 青青青手机在线视频观看| 欧美极品另类videosde| 性生活免费观看视频| 国产精品25p| 欧洲一区二区av| 亚洲网中文字幕| 日韩在线观看一区二区三区| 亚洲美女自拍视频| 亚洲人与黑人屁股眼交| 日韩天堂av| 国产精品一二三在线| 欧美熟女一区二区| 中文字幕av一区 二区| www.国产在线播放| 69堂精品视频在线播放| 亚洲电影免费观看高清完整版在线| 一区二区三区国产好的精华液| 日韩免费va| 欧美成人三级在线| aaaaa级少妇高潮大片免费看| 欧美韩一区二区| 中文字幕亚洲国产| 天天操天天爽天天干| 精品亚洲porn| 精品视频高清无人区区二区三区| 婷婷五月综合激情| 国产精品高清亚洲| 777精品久无码人妻蜜桃| 亚洲成人高清| 亚洲视频在线观看网站| 中文字幕第28页| 激情都市一区二区| 亚洲 国产 欧美一区| 黄色18在线观看| 日韩午夜电影在线观看| 农村老熟妇乱子伦视频| 免费国产自线拍一欧美视频| www日韩av| 成年人网站在线| 欧美在线视频你懂得| 国产制服丝袜在线| 红桃视频国产精品| 7777精品伊久久久大香线蕉语言 | 国产91高潮流白浆在线麻豆| 欧美在线一区二区三区四区| 九色porny自拍视频在线观看| 色天天综合色天天久久| 中国黄色片视频| 亚洲欧美一区在线| 91牛牛免费视频| 日本精品在线| 欧美天天综合网| 精品无人区无码乱码毛片国产 | 91亚洲精品久久久蜜桃网站| 国产一区二区三区在线免费| 精品国模一区二区三区欧美 | 在线亚洲免费视频| 亚洲精品鲁一鲁一区二区三区| 99re6热只有精品免费观看| 伊人亚洲福利一区二区三区| 一区二区三区在线观看av| 91在线精品一区二区| 欧美精品久久久久久久自慰| 免费精品一区二区三区在线观看| 日韩一区二区三区电影| 三级在线观看免费大全| 久久成人麻豆午夜电影| 在线视频福利一区| 亚洲ww精品| 久久精品国产一区二区三区| 国产精品久久久久久久久久久久久久久久久久 | 在线成人免费av| 欧美在线资源| www.久久艹| av人人综合网| 亚洲精品国产精品久久清纯直播 | 欧美oldwomenvideos| 国产精品久久久久久久久久久久| wwwav在线播放| 亚洲激情综合网| 91高清国产视频| 欧美精品国产| 国产自产在线视频一区| 三上悠亚一区二区| 最近2019年手机中文字幕 | 欧美r级在线| 91精品国产麻豆| 国产奶水涨喷在线播放| 久久品道一品道久久精品| 国产精品乱码久久久久| 欧美电影一二区| 99精品99久久久久久宅男| 18video性欧美19sex高清| 国产视频精品xxxx| 亚洲天堂网在线视频| 亚洲免费资源在线播放| 日本一区二区在线观看视频| 亚洲免费一区二区| 亚洲精品在线视频观看| 亚洲不卡在线| 日韩美女视频免费在线观看| 男人的天堂在线视频免费观看 | 日韩欧美国产骚| 岛国片在线免费观看| 国产成人在线色| 欧美视频第三页| 国产大片一区| 国产乱码精品一区二区三区不卡| 黄色网页在线免费看| 精品久久久久香蕉网| 日本中文字幕在线观看视频| 亚洲另类春色国产| 少妇久久久久久久久久| 国产一区二区三区视频在线播放| 精品国产一区二区三区四区精华 | 日韩区欧美区| 欧美亚洲视频在线观看| 成人日批视频| 亚洲天堂日韩电影| 丰满人妻一区二区三区免费| 欧美午夜精品一区| 国产精品第108页| 国产精品灌醉下药二区| 精品人妻互换一区二区三区| 韩国成人福利片在线播放| 日韩精品一区二区三区色欲av| 亚洲图区在线| 国产精品美女黄网| 成人噜噜噜噜| 国产日韩在线播放| 国产欧洲在线| 久久99精品视频一区97| 天堂а√在线资源在线| 亚洲欧美在线磁力| 天天综合永久入口| 精品国产一区二区亚洲人成毛片 | 青青草国产精品97视觉盛宴| 国产av国片精品| 欧美成人国产| 公共露出暴露狂另类av| 国产免费av一区二区三区| 国产精品麻豆免费版| 国产美女亚洲精品7777| 国产自产女人91一区在线观看| 巨大荫蒂视频欧美大片| 亚洲欧美在线免费观看| 少妇又色又爽又黄的视频| 日韩欧美国产1| 国产毛片在线视频| 3atv在线一区二区三区| 中文字幕日韩第一页| 色域天天综合网| 亚洲天堂一区在线观看| 精品成人国产在线观看男人呻吟| 亚洲国产av一区| 91在线精品一区二区| 中文字幕免费高清视频| 成人午夜av影视| 少妇极品熟妇人妻无码| 国产不卡高清在线观看视频| av在线网站免费观看| 国产成人亚洲综合a∨婷婷| 1314成人网| 成人午夜av电影| 亚洲国产精品无码久久久久高潮| 奇米影视7777精品一区二区| 欧美日韩在线免费播放| 日韩高清电影一区| 少妇高清精品毛片在线视频| 久久久久久穴| 精品久久久久久久无码 | 久精品免费视频| 四虎av在线| 国模私拍一区二区三区| 国产偷倩在线播放| 国产69精品久久久久久| 亚洲成人人体| 国产精品亚发布| 国产一区二区三区视频在线| 成人高清在线观看| 林ゆな中文字幕一区二区| 欧美一二三四五区| 97精品在线| 久操手机在线视频| 亚洲最黄网站| 嫩草影院国产精品| 国产一区二区美女| 国产精品久久久久久亚洲av| 久久久夜色精品亚洲| 国产精品综合激情| 一区二区在线观看不卡| 欧美a∨亚洲欧美亚洲| 欧美日韩综合色| wwwav在线播放| 亚洲欧洲黄色网| mm1313亚洲国产精品美女| 欧美亚洲第一页| 国产精品免费精品自在线观看| 国产精品久久久久影院日本| 国产一区二区视频在线看| 久久99精品久久久久久青青日本 | 青少年xxxxx性开放hg| 欧美精品综合| 亚洲色精品三区二区一区| 精品一区二区三区在线观看国产| 黄色一级大片在线观看| 国产精品中文有码| 成年人网站免费看| 中文字幕在线观看不卡视频| 国产真实乱偷精品视频| 欧美亚一区二区| 日本黄色不卡视频| 日韩在线观看免费高清| mm视频在线视频| 国产综合视频在线观看| 色爱综合av| 无码毛片aaa在线| 老妇喷水一区二区三区| 又黄又爽又色的视频| 国产婷婷一区二区| 久一区二区三区| 在线播放国产精品二区一二区四区| 高潮无码精品色欲av午夜福利| 欧美午夜精品久久久久久久| 一区二区三区黄| 亚洲欧美中文日韩在线v日本| 日本一级在线观看| 久久亚洲精品视频| 成人日韩在线观看| 久久99欧美| 欧美日韩视频| 五月天婷婷影视| 国产女人18毛片水真多成人如厕 | 国产精品九九久久久久久久| 这里视频有精品| 一区二区三区四区视频在线 | 2021国产精品久久精品| 欧美黄色aaa| 欧美中文字幕不卡| 视频一区二区在线播放| 欧美激情第一页xxx| 国产在线一区不卡| 一区二区不卡在线视频 午夜欧美不卡'| 日韩一区亚洲二区| 久久久久久久少妇| 99re热这里只有精品免费视频 | www.五月婷| 久久久精品久久久| 成人污版视频| a级网站在线观看| 久久福利视频一区二区| 国产精品18在线| 欧美三级一区二区| 酒色婷婷桃色成人免费av网| 国产91精品久| 亚洲都市激情| 任你操这里只有精品| 久久亚洲捆绑美女| 国产乱国产乱老熟| 亚洲美女黄色片| julia一区二区三区中文字幕| 亚洲影院高清在线| 999成人网| 午夜不卡福利视频| 中文字幕第一页久久| jizz国产在线| 精品国产欧美成人夜夜嗨| 亚洲国产一区二区久久| 一区二区三区四区久久| 国产精品一区二区在线看| 一区视频免费观看| 欧美成人一区二区三区在线观看| 激情小视频在线| 日本亚洲欧美成人| 精品视频免费| 小早川怜子一区二区三区| 一区二区久久久久久| 日韩在线观看视频网站| 热草久综合在线| 日韩精品dvd| 亚洲成人av免费观看| 亚洲狠狠爱一区二区三区| 亚洲 另类 春色 国产| 国产成人午夜视频网址| 久久国产精品亚洲人一区二区三区 | 青娱乐精品视频| 欧美手机在线观看| 制服丝袜亚洲色图| 国产丝袜精品丝袜| 品久久久久久久久久96高清| 美女视频黄频大全不卡视频在线播放| 香蕉视频污视频| 色噜噜狠狠成人中文综合| 在线日本视频| 99在线观看视频| 久久亚洲欧美| 强制高潮抽搐sm调教高h| 日韩精品一区二区三区三区免费| 日本在线观看| 国产精品乱子乱xxxx| 日本亚洲三级在线| 欧美日韩三级在线观看| 日韩精品视频免费| 9.1麻豆精品| 国产二区视频在线播放| 国产欧美一区二区三区鸳鸯浴| 天天综合天天干| 久久这里有精品视频| 婷婷综合一区| 樱花草www在线| 欧美午夜精品在线| 99福利在线| 日韩免费av一区二区三区| 国产精品一品二品| 国产精品xxxxxx|