后端面霸之旅-MapReduce探秘
最近在看一些大數(shù)據(jù)的東西,發(fā)現(xiàn)對(duì)其中的shuffle過程很模糊,于是決定學(xué)習(xí)一下,深入之后又發(fā)現(xiàn)對(duì)整個(gè)mapreduce的數(shù)據(jù)完成處理過程也同樣模糊。
所以本文將從以下幾個(gè)角度來展開:
- mapreduce以及hadoop框架的一些認(rèn)識(shí)
- mapreduce的核心思想是什么
- mapreduce數(shù)據(jù)處理過程推演
- mapreduce的shuffle是如何實(shí)現(xiàn)的
Hadoop三劍客
Hadoop是一個(gè)由Apache開發(fā)的大數(shù)據(jù)處理框架,它包括了HDFS(Hadoop分布式文件系統(tǒng))、YARN(Yet Another Resource Negotiator,資源管理器)以及MapReduce計(jì)算框架。
HDFS是Hadoop的存儲(chǔ)組件,YARN則是用于資源管理和調(diào)度的組件,而MapReduce是Hadoop用于分布式計(jì)算的框架。
在Hadoop中,數(shù)據(jù)通常存儲(chǔ)在HDFS中,通過MapReduce框架進(jìn)行分布式計(jì)算,YARN負(fù)責(zé)管理計(jì)算資源,并協(xié)調(diào)MapReduce等計(jì)算框架的運(yùn)行。
MapReduce、Hadoop、HDFS和YARN之間是相互依存、協(xié)同工作的關(guān)系,它們共同構(gòu)成了一個(gè)完整的大數(shù)據(jù)處理系統(tǒng)。

MapReduce核心思想
分而治之
MapReduce的主要思想是將大規(guī)模數(shù)據(jù)處理任務(wù)分解成多個(gè)小任務(wù),并在分布式計(jì)算集群上并行執(zhí)行,從而實(shí)現(xiàn)高效的數(shù)據(jù)處理和分析。
MapReduce數(shù)據(jù)處理任務(wù)分為兩個(gè)主要階段:Map階段和Reduce階段。
在Map階段中,MapReduce將輸入數(shù)據(jù)分割成若干個(gè)小塊,然后在分布式計(jì)算集群上同時(shí)執(zhí)行多個(gè)Map任務(wù),每個(gè)任務(wù)都對(duì)一個(gè)小塊的數(shù)據(jù)進(jìn)行處理,并將處理結(jié)果輸出為一系列鍵值對(duì),Map任務(wù)的輸出結(jié)果會(huì)被臨時(shí)存儲(chǔ)在本地磁盤或內(nèi)存中,以供Reduce任務(wù)使用。
在Reduce階段中,它主要負(fù)責(zé)對(duì)Map任務(wù)的輸出結(jié)果進(jìn)行整合和匯總,生成最終的輸出結(jié)果。Reduce任務(wù)會(huì)將所有Map任務(wù)輸出的鍵值對(duì)按照鍵進(jìn)行排序,并將相同鍵的值合并在一起,Reduce任務(wù)的輸出結(jié)果通常會(huì)寫入到文件系統(tǒng)中

計(jì)算向數(shù)據(jù)靠攏
在傳統(tǒng)的分布式計(jì)算中,數(shù)據(jù)通常需要從存儲(chǔ)介質(zhì)中讀取到計(jì)算節(jié)點(diǎn)進(jìn)行處理,這會(huì)造成大量的數(shù)據(jù)傳輸和網(wǎng)絡(luò)延遲,導(dǎo)致計(jì)算效率較低。
"計(jì)算向數(shù)據(jù)靠攏"是MapReduce的一個(gè)設(shè)計(jì)思想,旨在最大化利用數(shù)據(jù)本地性(data locality)來提高作業(yè)的性能。
在MapReduce中,數(shù)據(jù)通常分布在集群的不同節(jié)點(diǎn)上,而處理數(shù)據(jù)的任務(wù)(Map任務(wù)和Reduce任務(wù))盡量在數(shù)據(jù)所在的節(jié)點(diǎn)上執(zhí)行,減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸和節(jié)點(diǎn)間的通信開銷,提高作業(yè)的性能。
MapReduce過程推演
我們嘗試從頭來推演整個(gè)MapReduce任務(wù)的大致執(zhí)行過程:
- 怎么寫跨語言的MapReduce作業(yè)?
- JobClient做了些什么?
- JobClient和Yarn是如何交互的?
- Hadoop2.x中AppMaster、NodeManager是如何協(xié)作的?
- 輸入文件分割是怎么實(shí)現(xiàn)的?
Hadoop Streaming
如果使用非 Java 編程語言來實(shí)現(xiàn) MapReduce 任務(wù),或者希望更靈活地定制 Map 和 Reduce 函數(shù)的實(shí)現(xiàn)方式,可以考慮使用 Hadoop Streaming。
Hadoop Streaming 是 Hadoop 提供的一個(gè)工具,可以讓用戶通過標(biāo)準(zhǔn)輸入和標(biāo)準(zhǔn)輸出來實(shí)現(xiàn)自定義 Map 和 Reduce 函數(shù)的功能。使用 Hadoop Streaming 可以使用任何語言來實(shí)現(xiàn) Map 和 Reduce 函數(shù),而不僅僅局限于 Java。
當(dāng)使用 Hadoop Streaming 時(shí),客戶端會(huì)將 Map 和 Reduce 函數(shù)打包成可執(zhí)行文件,然后提交給 Hadoop 集群來執(zhí)行。這些可執(zhí)行文件可以用任何編程語言編寫,例如 Python、Perl、Ruby、C++ 等。

JobClient
在提交任務(wù)之前,客戶端需要將任務(wù)的輸入數(shù)據(jù)和輸出路徑等信息設(shè)置好,以便Hadoop集群能夠正確地執(zhí)行任務(wù)并將結(jié)果輸出到指定的路徑。
MapReduce的客戶端JobClient通常會(huì)將任務(wù)打包成JAR包,然后將該JAR包提交給Hadoop集群來執(zhí)行任務(wù)。
JAR包中包含了MapReduce任務(wù)所需的輸入數(shù)據(jù)、輸出路徑、 Map 和 Reduce 的數(shù)量,以及每個(gè)任務(wù)需要的內(nèi)存和 CPU 資源等參數(shù),這樣可以保證任務(wù)在集群中的任何節(jié)點(diǎn)上都能夠正常運(yùn)行。
hadoop 1.x和2.x
在Hadoop 1.x版本中,JobTracker和TaskTracker是Hadoop集群中的兩個(gè)重要組件,其中JobTracker負(fù)責(zé)整個(gè)集群中所有MapReduce任務(wù)的協(xié)調(diào)和管理,而TaskTracker負(fù)責(zé)具體的任務(wù)執(zhí)行。
在Hadoop 2.x版本中引入了YARN框架,將JobTracker的功能拆分成兩部分,一部分是ResourceManager,負(fù)責(zé)集群資源的管理和分配,另一部分是ApplicationMaster,負(fù)責(zé)具體任務(wù)的管理和協(xié)調(diào)。
在Hadoop 2.x版本及以后的版本中,ApplicationMaster扮演的角色類似于JobTracker。
AppMaster
在 Hadoop YARN 中,ApplicationMaster是一個(gè)關(guān)鍵的組件,它負(fù)責(zé)在集群中管理和監(jiān)控一個(gè)特定的應(yīng)用程序。
在 MapReduce 中,每個(gè) MapReduce 作業(yè)都有一個(gè)對(duì)應(yīng)的 ApplicationMaster 實(shí)例,該實(shí)例負(fù)責(zé)協(xié)調(diào)整個(gè)作業(yè)的執(zhí)行過程,包括分配任務(wù)、監(jiān)控任務(wù)的進(jìn)度和狀態(tài)、處理任務(wù)失敗等。
當(dāng)客戶端提交 MapReduce 作業(yè)時(shí),YARN 資源管理器會(huì)為該作業(yè)啟動(dòng)一個(gè) ApplicationMaster 實(shí)例。
ApplicationMaster 將向資源管理器請(qǐng)求分配資源,并與各個(gè) NodeManager 協(xié)商任務(wù)的執(zhí)行。它還負(fù)責(zé)將 MapReduce 作業(yè)的邏輯劃分為多個(gè) Map 和 Reduce 任務(wù),并將任務(wù)分配給相應(yīng)的 NodeManager 執(zhí)行。
在任務(wù)執(zhí)行期間,ApplicationMaster 將持續(xù)監(jiān)控任務(wù)的進(jìn)度和狀態(tài),并在任務(wù)出現(xiàn)錯(cuò)誤或失敗時(shí)進(jìn)行相應(yīng)的處理。
NodeManager
AppMaster和NodeManager不同,它們是YARN框架中的兩個(gè)不同的組件,分別扮演著不同的角色。
AppMaster是一個(gè)應(yīng)用程序級(jí)別的組件,它運(yùn)行在分布式集群中的一個(gè)節(jié)點(diǎn)上,負(fù)責(zé)協(xié)調(diào)和管理應(yīng)用程序的生命周期。它向ResourceManager請(qǐng)求資源,然后將這些資源分配給它的任務(wù)(如Map和Reduce任務(wù))。在任務(wù)運(yùn)行期間,AppMaster監(jiān)視任務(wù)的進(jìn)度并與ResourceManager通信,以確保應(yīng)用程序在分布式集群上有效地運(yùn)行。
NodeManager是一個(gè)節(jié)點(diǎn)級(jí)別的組件,它運(yùn)行在每個(gè)節(jié)點(diǎn)上,負(fù)責(zé)管理該節(jié)點(diǎn)上的容器和資源。在應(yīng)用程序啟動(dòng)時(shí),AppMaster向ResourceManager請(qǐng)求節(jié)點(diǎn)資源,并指示NodeManager在該節(jié)點(diǎn)上啟動(dòng)容器來執(zhí)行應(yīng)用程序的任務(wù)。
NodeManager負(fù)責(zé)啟動(dòng)容器并為它們分配資源,同時(shí)監(jiān)視它們的進(jìn)度,并向ResourceManager報(bào)告資源使用情況。

HDFS輸入文件邏輯分割
客戶端程序中的Job對(duì)象會(huì)設(shè)置輸入文件的路徑和InputFormat類,在提交作業(yè)之前,Job會(huì)調(diào)用InputFormat的getSplits()方法來獲取輸入文件的切片信息。
InputFormat 是針對(duì)輸入文件進(jìn)行邏輯分割,將一個(gè)或多個(gè)輸入文件劃分為一組輸入切片InputSplit,以便于 MapReduce 作業(yè)的并行處理。
對(duì)于大多數(shù)常見的數(shù)據(jù)類型,Hadoop 提供了一些內(nèi)置的 InputFormat 實(shí)現(xiàn),如下:
- TextInputFormat:按行讀取文本文件,并將每行作為一個(gè)輸入記錄。
- KeyValueTextInputFormat:按行讀取鍵值對(duì)形式的文本文件,并將每個(gè)鍵值對(duì)作為一個(gè)輸入記錄。
- SequenceFileInputFormat:讀取 Hadoop 序列文件(SequenceFile),并將其中的每個(gè) key-value 對(duì)作為一個(gè)輸入記錄。
- CombineFileInputFormat:支持讀取多個(gè)小文件或者多個(gè)小數(shù)據(jù)塊,并將它們合并成一個(gè)或多個(gè)輸入切片,以便于提高作業(yè)的并行度和執(zhí)行效率。
輸入文件的切片由InputFormat的getSplits()方法生成,這個(gè)方法會(huì)計(jì)算輸入文件的切片,并返回一個(gè)切片數(shù)組。每個(gè)切片都包含了一個(gè)起始偏移量和一個(gè)長度,這些信息告訴了Map任務(wù)它需要處理的輸入數(shù)據(jù)的范圍,ResourceManager會(huì)根據(jù)Split對(duì)象數(shù)組來計(jì)算作業(yè)需要的資源,并分配相應(yīng)的資源來運(yùn)行作業(yè)。
當(dāng)Map任務(wù)啟動(dòng)時(shí),它會(huì)使用InputFormat提供的InputSplit信息來創(chuàng)建一個(gè)RecordReader實(shí)例,用于讀取該Map任務(wù)需要處理的輸入數(shù)據(jù)。
RecordReader從HDFS或其他存儲(chǔ)系統(tǒng)中讀取數(shù)據(jù),將數(shù)據(jù)劃分成適當(dāng)大小的記錄,然后將它們轉(zhuǎn)換為鍵值對(duì)(key-value pairs),再將它們傳遞給Mapper進(jìn)行處理。

shuffle簡介
先看一個(gè)完整的圖,接下來會(huì)進(jìn)行展開:

什么是shuffle
在 MapReduce 中,Map 和 Reduce 任務(wù)都是并行執(zhí)行的,Map 任務(wù)負(fù)責(zé)對(duì)輸入數(shù)據(jù)進(jìn)行處理,將其轉(zhuǎn)換為鍵值對(duì)的形式,而 Reduce 任務(wù)負(fù)責(zé)對(duì) Map 任務(wù)的輸出結(jié)果進(jìn)行聚合和計(jì)算。
在 MapReduce 中,Shuffle 過程的主要作用是將 Map 任務(wù)的輸出結(jié)果傳遞給 Reduce 任務(wù),并為 Reduce 任務(wù)提供輸入數(shù)據(jù),它是 MapReduce 中非常重要的一個(gè)步驟,可以提高 MapReduce 作業(yè)效率。
Shuffle 過程的作用包括以下幾點(diǎn):
- 合并相同 Key 的 Value:Map 任務(wù)輸出的鍵值對(duì)可能會(huì)包含相同的 Key,Shuffle 過程會(huì)將相同 Key 的 Value 合并在一起,減少 Reduce 任務(wù)需要處理的數(shù)據(jù)量。
- 按照 Key 進(jìn)行排序:Shuffle 過程會(huì)將 Map 任務(wù)的輸出結(jié)果按照 Key 進(jìn)行排序,這樣 Reduce 任務(wù)可以順序地處理鍵值對(duì)序列,避免在處理數(shù)據(jù)時(shí)需要進(jìn)行額外的排序操作。
- 劃分?jǐn)?shù)據(jù)并傳輸:Shuffle 過程會(huì)將 Map 任務(wù)的輸出結(jié)果按照 Key 劃分成多個(gè)分區(qū),并將每個(gè)分區(qū)的數(shù)據(jù)傳輸?shù)綄?duì)應(yīng)的 Reduce 任務(wù)中。這樣,Reduce 任務(wù)可以從不同的 Map 任務(wù)中獲取到數(shù)據(jù),從而實(shí)現(xiàn)更好的并行化處理。

Map端的Shuffle
- Map任務(wù)在執(zhí)行過程中會(huì)將輸出的鍵值對(duì)寫入本地磁盤上的環(huán)形緩沖區(qū)中,按照Partitioner函數(shù)的規(guī)則將鍵值對(duì)映射到不同的區(qū)域Partition中。
- 當(dāng)環(huán)形緩沖區(qū)滿或者M(jìn)ap任務(wù)執(zhí)行完畢時(shí),Map任務(wù)將環(huán)形緩沖區(qū)中的數(shù)據(jù)進(jìn)行排序,并按照Partition和排序結(jié)果寫入到本地磁盤上的Spill文件中。
- 當(dāng)所有Map任務(wù)都執(zhí)行完畢后,MapReduce會(huì)按照Partition對(duì)所有Spill文件進(jìn)行歸并排序,形成一個(gè)有序的大文件。
環(huán)形緩沖區(qū)的作用
環(huán)形緩沖區(qū)是一種特殊的緩沖區(qū),它將數(shù)據(jù)存儲(chǔ)在一個(gè)固定大小的、循環(huán)的緩沖區(qū)中,當(dāng)緩沖區(qū)滿時(shí),新的數(shù)據(jù)將覆蓋最老的數(shù)據(jù),于鏈表和數(shù)組等數(shù)據(jù)結(jié)構(gòu),環(huán)形緩沖區(qū)具有以下優(yōu)勢(shì):
- 高效的讀寫性能:由于環(huán)形緩沖區(qū)具有固定的大小,可以預(yù)先分配一定的空間,因此讀寫操作非常高效。而且由于緩沖區(qū)是循環(huán)的,可以通過簡單的指針操作來實(shí)現(xiàn)數(shù)據(jù)的讀寫操作,而不需要復(fù)雜的數(shù)據(jù)移動(dòng)操作。
- 空間利用率高:由于環(huán)形緩沖區(qū)可以循環(huán)利用空間,因此空間利用率非常高。相比于鏈表和數(shù)組等數(shù)據(jù)結(jié)構(gòu),它不需要額外的空間來維護(hù)節(jié)點(diǎn)或索引等信息,可以更好地利用內(nèi)存空間。
- 易于實(shí)現(xiàn)和管理:由于環(huán)形緩沖區(qū)的結(jié)構(gòu)非常簡單,因此易于實(shí)現(xiàn)和管理。可以通過一些簡單的技術(shù)來解決緩沖區(qū)溢出等問題,從而保證 Shuffle 過程的正確性。
分區(qū)&排序&溢寫
在 Map 階段中,Map 任務(wù)會(huì)將輸出數(shù)據(jù)寫入環(huán)形緩沖區(qū),而這些數(shù)據(jù)會(huì)被分為多個(gè)分區(qū),并且每個(gè)分區(qū)內(nèi)的數(shù)據(jù)是按照鍵(Key)進(jìn)行排序的。
分區(qū)的劃分是由 Partitioner 類完成的,默認(rèn)情況下,Partitioner 類會(huì)根據(jù)鍵值對(duì)的鍵(Key)來計(jì)算分區(qū)編號(hào)(Partition ID),從而將數(shù)據(jù)分配到對(duì)應(yīng)的分區(qū)中。
不同的鍵(Key)可能會(huì)被分到同一個(gè)分區(qū)中,而相同的鍵(Key)則一定會(huì)被分到同一個(gè)分區(qū)中,這是為了保證相同鍵(Key)的數(shù)據(jù)能夠被發(fā)送到同一個(gè) Reduce 任務(wù)中進(jìn)行處理。
Map 任務(wù)向環(huán)形緩沖區(qū)中寫入數(shù)據(jù)時(shí),先將數(shù)據(jù)插入到分區(qū)內(nèi),然后對(duì)該分區(qū)內(nèi)的所有數(shù)據(jù)進(jìn)行快速排序。
當(dāng)環(huán)形緩沖區(qū)中的數(shù)據(jù)量達(dá)到一定閾值(MapReduce 1.x 中默認(rèn)為環(huán)形緩沖區(qū)大小的 0.8 倍),或者某個(gè)分區(qū)內(nèi)存放的數(shù)據(jù)大小達(dá)到一定閾值(MapReduce 1.x 中默認(rèn)為 100MB),就會(huì)觸發(fā)溢寫操作,將數(shù)據(jù)按照分區(qū)寫到磁盤上的臨時(shí)文件中。

Reduce端的Shuffle
- map的shuffle階段會(huì)生成包含不同分區(qū)的大文件
- reduce任務(wù)可能會(huì)多個(gè)不同的分區(qū),但是同一個(gè)分區(qū)的數(shù)據(jù)一定在同一個(gè)reduce中
- reduce任務(wù)收集來自多個(gè)map輸出的同一個(gè)分區(qū)的數(shù)據(jù),在內(nèi)部再針對(duì)同一分區(qū)的多個(gè)文件做歸并成一個(gè)大文件

小結(jié)
很多時(shí)候明確問題比知道答案更重要,多想為什么、多在腦海里去推演過程、最終才能自洽吸收外界知識(shí),化為自己的經(jīng)驗(yàn)。?























