精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

MapReduce源碼解析--環形緩沖區

開發 開發工具
這篇文章把Map階段的環形緩沖區單獨拿出來進行分析,對環形緩沖區的數據結構和數據進入環形緩沖區然后溢寫到磁盤的流程進行分析。

這篇文章把Map階段的環形緩沖區單獨拿出來進行分析,對環形緩沖區的數據結構和數據進入環形緩沖區然后溢寫到磁盤的流程進行分析。

環形緩沖區數據結構

Map過程中環形緩沖區是指數據被map處理之后會先放入內存,內存中的這片區域就是環形緩沖區。

環形緩沖區是在MapTask.MapOutputBuffer中定義的,相關的屬性如下:

  1. // k/v accounting 
  2. // 存放meta數據的IntBuffer,都是int entry,占4byte 
  3. private IntBuffer kvmeta; // metadata overlay on backing store 
  4. int kvstart; // marks origin of spill metadata 
  5. int kvend; // marks end of spill metadata 
  6. int kvindex; // marks end of fully serialized records 
  7. // 分割meta和key value內容的標識 
  8. // meta數據和key value內容都存放在同一個環形緩沖區,所以需要分隔開 
  9. int equator; // marks origin of meta/serialization 
  10. int bufstart; // marks beginning of spill 
  11. int bufend; // marks beginning of collectable 
  12. int bufmark; // marks end of record 
  13. int bufindex; // marks end of collected 
  14. int bufvoid; // marks the point where we should stop 
  15. // reading at the end of the buffer 
  16. // 存放key value的byte數組,單位是byte,注意與kvmeta區分 
  17. byte[] kvbuffer; // main output buffer 
  18. private final byte[] b0 = new byte[0]; 
  19.   
  20. // key value在kvbuffer中的地址存放在偏移kvindex的距離 
  21. private static final int VALSTART = 0; // val offset in acct 
  22. private static final int KEYSTART = 1; // key offset in acct 
  23. // partition信息存在kvmeta中偏移kvindex的距離 
  24. private static final int PARTITION = 2; // partition offset in acct 
  25. private static final int VALLEN = 3; // length of value 
  26. // 一對key value的meta數據在kvmeta中占用的個數 
  27. private static final int NMETA = 4; // num meta ints 
  28. // 一對key value的meta數據在kvmeta中占用的byte數 
  29. private static final int METASIZE = NMETA * 4; // size in bytes 

環形緩沖區其實是一個數組,數組中存放著key、value的序列化數據和key、value的元數據信息,key/value的元數據存儲的格式是int類型,每個key/value對應一個元數據,元數據由4個int組成,第一個int存放value的起始位置,第二個存放key的起始位置,第三個存放partition,最后一個存放value的長度。

 

key/value序列化的數據和元數據在環形緩沖區中的存儲是由equator分隔的,key/value按照索引遞增的方向存儲,meta則按照索引遞減的方向存儲,將其數組抽象為一個環形結構之后,以equator為界,key/value順時針存儲,meta逆時針存儲。

初始化

 

環形緩沖區的結構在MapOutputBuffer.init中創建。

  1. public void init(MapOutputCollector.Context context 
  2. ) throws IOException, ClassNotFoundException { 
  3. ... 
  4. //MAP_SORT_SPILL_PERCENT = mapreduce.map.sort.spill.percent 
  5. // map 端buffer所占的百分比 
  6. //sanity checks 
  7. final float spillper = 
  8. job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); 
  9. //IO_SORT_MB = "mapreduce.task.io.sort.mb" 
  10. // map 端buffer大小 
  11. // mapreduce.task.io.sort.mb * mapreduce.map.sort.spill.percent 最好是16的整數倍 
  12. final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); 
  13. // 所有的spill index 在內存所占的大小的閾值 
  14. indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, 
  15. INDEX_CACHE_MEMORY_LIMIT_DEFAULT); 
  16. ... 
  17. // 排序的實現類,可以自己實現。 這里用的是改寫的快排 
  18. sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", 
  19. QuickSort.class, IndexedSorter.class), job); 
  20. // buffers and accounting 
  21. // 上面IO_SORT_MB的單位是MB,左移20位將單位轉化為byte 
  22. int maxMemUsage = sortmb << 20
  23. // METASIZE是元數據的長度,元數據有4個int單元,分別為 
  24. // VALSTART、KEYSTART、PARTITION、VALLEN,而int為4個byte, 
  25. // 所以METASIZE長度為16。下面是計算buffer中最多有多少byte來存元數據 
  26. maxMemUsage -maxMemUsage % METASIZE; 
  27. // 元數據數組 以byte為單位 
  28. kvbuffer = new byte[maxMemUsage]; 
  29. bufvoid = kvbuffer.length; 
  30. // 將kvbuffer轉化為int型的kvmeta 以int為單位,也就是4byte 
  31. kvmeta = ByteBuffer.wrap(kvbuffer) 
  32. .order(ByteOrder.nativeOrder()) 
  33. .asIntBuffer(); 
  34. // 設置buf和kvmeta的分界線 
  35. setEquator(0); 
  36. bufstart = bufend = bufindex = equator
  37. kvstart = kvend = kvindex; 
  38. // kvmeta中存放元數據實體的最大個數 
  39. maxRec = kvmeta.capacity() / NMETA; 
  40. // buffer spill時的閾值(不單單是sortmb*spillper) 
  41. // 更加精確的是kvbuffer.length*spiller 
  42. softLimit = (int)(kvbuffer.length * spillper); 
  43. // 此變量較為重要,作為spill的動態衡量標準 
  44. bufferRemaining = softLimit
  45. ... 
  46. // k/v serialization 
  47. comparator = job.getOutputKeyComparator(); 
  48. keyClass = (Class<K>)job.getMapOutputKeyClass(); 
  49. valClass = (Class<V>)job.getMapOutputValueClass(); 
  50. serializationFactory = new SerializationFactory(job); 
  51. keySerializer = serializationFactory.getSerializer(keyClass); 
  52. // 將bb作為key序列化寫入的output 
  53. keySerializer.open(bb); 
  54. valSerializer = serializationFactory.getSerializer(valClass); 
  55. // 將bb作為value序列化寫入的output 
  56. valSerializer.open(bb); 
  57. ... 
  58. // combiner 
  59. ... 
  60. spillInProgress = false
  61. // 最后一次merge時,在有combiner的情況下,超過此閾值才執行combiner 
  62. minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3); 
  63. spillThread.setDaemon(true); 
  64. spillThread.setName("SpillThread"); 
  65. spillLock.lock(); 
  66. try { 
  67. spillThread.start(); 
  68. while (!spillThreadRunning) { 
  69. spillDone.await(); 
  70. } catch (InterruptedException e) { 
  71. throw new IOException("Spill thread failed to initialize", e); 
  72. } finally { 
  73. spillLock.unlock(); 
  74. if (sortSpillException != null) { 
  75. throw new IOException("Spill thread failed to initialize", 
  76. sortSpillException); 

init是對環形緩沖區進行初始化構造,由mapreduce.task.io.sort.mb決定map中環形緩沖區的大小sortmb,默認是100M。

此緩沖區也用于存放meta,一個meta占用METASIZE(16byte),則其中用于存放數據的大小是maxMemUsage -= sortmb << 20 % METASIZE(由此可知最好設置sortmb轉換為byte之后是16的整數倍),然后用maxMemUsage初始化kvbuffer字節數組和kvmeta整形數組,最后設置數組的一些標識信息。利用setEquator(0)設置kvbuffer和kvmeta的分界線,初始化的時候以0為分界線,kvindex為aligned - METASIZE + kvbuffer.length,其位置在環形數組中相當于按照逆時針方向減去METASIZE,由kvindex設置kvstart = kvend = kvindex,由equator設置bufstart = bufend = bufindex = equator,還得設置bufvoid = kvbuffer.length,bufvoid用于標識用于存放數據的最大位置。

為了提高效率,當buffer占用達到閾值之后,會進行spill,這個閾值是由bufferRemaining進行檢查的,bufferRemaining由softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit;進行初始化賦值,這里需要注意的是softLimit并不是sortmb*spillper,而是kvbuffer.length * spillper,當sortmb << 20是16的整數倍時,才可以認為softLimit是sortmb*spillper。

 

下面是setEquator的代碼

  1. // setEquator(0)的代碼如下 
  2. private void setEquator(int pos) { 
  3. equator = pos
  4. // set index prior to first entry, aligned at meta boundary 
  5. // 第一個 entry的末尾位置,即元數據和kv數據的分界線 單位是byte 
  6. final int aligned = pos - (pos % METASIZE); 
  7. // Cast one of the operands to long to avoid integer overflow 
  8. // 元數據中存放數據的起始位置 
  9. kvindex = (int) 
  10. (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; 
  11. LOG.info("(EQUATOR) " + pos + " kvi " + kvindex + 
  12. "(" + (kvindex * 4) + ")"); 

buffer初始化之后的抽象數據結構如下圖所示:

buffer初始化之后的抽象數據結構

環形緩沖區數據結構圖

寫入buffer

 

Map通過NewOutputCollector.write方法調用collector.collect向buffer中寫入數據,數據寫入之前已在NewOutputCollector.write中對要寫入的數據進行逐條分區,下面看下collect

  1. // MapOutputBuffer.collect 
  2. public synchronized void collect(K key, V value, final int partition 
  3. ) throws IOException { 
  4. ... 
  5. // 新數據collect時,先將剩余的空間減去元數據的長度,之后進行判斷 
  6. bufferRemaining -METASIZE
  7. if (bufferRemaining <= 0) { 
  8. // start spill if the thread is not running and the soft limit has been 
  9. // reached 
  10. spillLock.lock(); 
  11. try { 
  12. do { 
  13. // 首次spill時,spillInProgress是false 
  14. if (!spillInProgress) { 
  15. // 得到kvindex的byte位置 
  16. final int kvbidx = 4 * kvindex; 
  17. // 得到kvend的byte位置 
  18. final int kvbend = 4 * kvend; 
  19. // serialized, unspilled bytes always lie between kvindex and 
  20. // bufindex, crossing the equator. Note that any void space 
  21. // created by a reset must be included in "used" bytes 
  22. final int bUsed = distanceTo(kvbidx, bufindex); 
  23. final boolean bufsoftlimit = bUsed >= softLimit; 
  24. if ((kvbend + METASIZE) % kvbuffer.length != 
  25. equator - (equator % METASIZE)) { 
  26. // spill finished, reclaim space 
  27. resetSpill(); 
  28. bufferRemaining = Math.min( 
  29. distanceTo(bufindex, kvbidx) - 2 * METASIZE, 
  30. softLimit - bUsed) - METASIZE; 
  31. continue; 
  32. } else if (bufsoftlimit && kvindex != kvend) { 
  33. // spill records, if any collected; check latter, as it may 
  34. // be possible for metadata alignment to hit spill pcnt 
  35. startSpill(); 
  36. final int avgRec = (int) 
  37. (mapOutputByteCounter.getCounter() / 
  38. mapOutputRecordCounter.getCounter()); 
  39. // leave at least half the split buffer for serialization data 
  40. // ensure that kvindex >= bufindex 
  41. final int distkvi = distanceTo(bufindex, kvbidx); 
  42. final int newPos = (bufindex + 
  43. Math.max(2 * METASIZE - 1, 
  44. Math.min(distkvi / 2, 
  45. distkvi / (METASIZE + avgRec) * METASIZE))) 
  46. % kvbuffer.length; 
  47. setEquator(newPos); 
  48. bufmark = bufindex = newPos; 
  49. final int serBound = 4 * kvend; 
  50. // bytes remaining before the lock must be held and limits 
  51. // checked is the minimum of three arcs: the metadata space, the 
  52. // serialization space, and the soft limit 
  53. bufferRemaining = Math.min( 
  54. // metadata max 
  55. distanceTo(bufend, newPos), 
  56. Math.min( 
  57. // serialization max 
  58. distanceTo(newPos, serBound), 
  59. // soft limit 
  60. softLimit)) - 2 * METASIZE; 
  61. } while (false); 
  62. } finally { 
  63. spillLock.unlock(); 
  64. // 將key value 及元數據信息寫入緩沖區 
  65. try { 
  66. // serialize key bytes into buffer 
  67. int keystart = bufindex
  68. // 將key序列化寫入kvbuffer中,并移動bufindex 
  69. keySerializer.serialize(key); 
  70. // key所占空間被bufvoid分隔,則移動key, 
  71. // 將其值放在連續的空間中便于sort時key的對比 
  72. if (bufindex < keystart) { 
  73. // wrapped the key; must make contiguous 
  74. bb.shiftBufferedKey(); 
  75. keystart = 0
  76. // serialize value bytes into buffer 
  77. final int valstart = bufindex
  78. valSerializer.serialize(value); 
  79. // It's possible for records to have zero length, i.e. the serializer 
  80. // will perform no writes. To ensure that the boundary conditions are 
  81. // checked and that the kvindex invariant is maintained, perform a 
  82. // zero-length write into the buffer. The logic monitoring this could be 
  83. // moved into collect, but this is cleaner and inexpensive. For now, it 
  84. // is acceptable. 
  85. bb.write(b0, 0, 0); 
  86.   
  87. // the record must be marked after the preceding write, as the metadata 
  88. // for this record are not yet written 
  89. int valend = bb.markRecord(); 
  90.   
  91. mapOutputRecordCounter.increment(1); 
  92. mapOutputByteCounter.increment( 
  93. distanceTo(keystart, valend, bufvoid)); 
  94.   
  95. // write accounting info 
  96. kvmeta.put(kvindex + PARTITION, partition); 
  97. kvmeta.put(kvindex + KEYSTART, keystart); 
  98. kvmeta.put(kvindex + VALSTART, valstart); 
  99. kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); 
  100. // advance kvindex 
  101. kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity(); 
  102. } catch (MapBufferTooSmallException e) { 
  103. LOG.info("Record too large for in-memory buffer: " + e.getMessage()); 
  104. spillSingleRecord(key, value, partition); 
  105. mapOutputRecordCounter.increment(1); 
  106. return; 

每次寫入數據時,執行bufferRemaining -= METASIZE之后,檢查bufferRemaining,

 

如果大于0,直接將key/value序列化對和對應的meta寫入buffer中,key/value是序列化之后寫入的,key/value經過一些列的方法調用Serializer.serialize(key/value) -> WritableSerializer.serialize(key/value) -> BytesWritable.write(dataOut) -> DataOutputStream.write(bytes, 0, size) -> MapOutputBuffer.Buffer.write(b, off, len),最后由MapOutputBuffer.Buffer.write(b, off, len)將數據寫入kvbuffer中,write方法如下:

  1. public void write(byte b[], int off, int len) 
  2. throws IOException { 
  3. // must always verify the invariant that at least METASIZE bytes are 
  4. // available beyond kvindex, even when len == 0 
  5. bufferRemaining -len
  6. if (bufferRemaining <= 0) { 
  7. // writing these bytes could exhaust available buffer space or fill 
  8. // the buffer to soft limit. check if spill or blocking are necessary 
  9. boolean blockwrite = false
  10. spillLock.lock(); 
  11. try { 
  12. do { 
  13. checkSpillException(); 
  14.   
  15. final int kvbidx = 4 * kvindex; 
  16. final int kvbend = 4 * kvend; 
  17. // ser distance to key index 
  18. final int distkvi = distanceTo(bufindex, kvbidx); 
  19. // ser distance to spill end index 
  20. final int distkve = distanceTo(bufindex, kvbend); 
  21.   
  22. // if kvindex is closer than kvend, then a spill is neither in 
  23. // progress nor complete and reset since the lock was held. The 
  24. // write should block only if there is insufficient space to 
  25. // complete the current write, write the metadata for this record, 
  26. // and write the metadata for the next record. If kvend is closer, 
  27. // then the write should block if there is too little space for 
  28. // either the metadata or the current write. Note that collect 
  29. // ensures its metadata requirement with a zero-length write 
  30. blockwrite = distkvi <= distkve 
  31. ? distkvi <= len + 2 * METASIZE 
  32. : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE; 
  33.   
  34. if (!spillInProgress) { 
  35. if (blockwrite) { 
  36. if ((kvbend + METASIZE) % kvbuffer.length != 
  37. equator - (equator % METASIZE)) { 
  38. // spill finished, reclaim space 
  39. // need to use meta exclusively; zero-len rec & 100% spill 
  40. // pcnt would fail 
  41. resetSpill(); // resetSpill doesn't move bufindex, kvindex 
  42. bufferRemaining = Math.min( 
  43. distkvi - 2 * METASIZE, 
  44. softLimit - distanceTo(kvbidx, bufindex)) - len; 
  45. continue; 
  46. // we have records we can spill; only spill if blocked 
  47. if (kvindex != kvend) { 
  48. startSpill(); 
  49. // Blocked on this write, waiting for the spill just 
  50. // initiated to finish. Instead of repositioning the marker 
  51. // and copying the partial record, we set the record start 
  52. // to be the new equator 
  53. setEquator(bufmark); 
  54. } else { 
  55. // We have no buffered records, and this record is too large 
  56. // to write into kvbuffer. We must spill it directly from 
  57. // collect 
  58. final int size = distanceTo(bufstart, bufindex) + len; 
  59. setEquator(0); 
  60. bufstart = bufend = bufindex = equator
  61. kvstart = kvend = kvindex; 
  62. bufvoid = kvbuffer.length; 
  63. throw new MapBufferTooSmallException(size + " bytes"); 
  64.   
  65. if (blockwrite) { 
  66. // wait for spill 
  67. try { 
  68. while (spillInProgress) { 
  69. reporter.progress(); 
  70. spillDone.await(); 
  71. } catch (InterruptedException e) { 
  72. throw new IOException( 
  73. "Buffer interrupted while waiting for the writer", e); 
  74. } while (blockwrite); 
  75. } finally { 
  76. spillLock.unlock(); 
  77. // here, we know that we have sufficient space to write 
  78. if (bufindex + len > bufvoid) { 
  79. final int gaplen = bufvoid - bufindex; 
  80. System.arraycopy(b, off, kvbuffer, bufindex, gaplen); 
  81. len -gaplen
  82. off += gaplen; 
  83. bufindex = 0
  84. System.arraycopy(b, off, kvbuffer, bufindex, len); 
  85. bufindex += len; 

write方法將key/value寫入kvbuffer中,如果bufindex+len超過了bufvoid,則將寫入的內容分開存儲,將一部分寫入bufindex和bufvoid之間,然后重置bufindex,將剩余的部分寫入,這里不區分key和value,寫入key之后會在collect中判斷bufindex < keystart,當bufindex小時,則key被分開存儲,執行bb.shiftBufferedKey(),value則直接寫入,不用判斷是否被分開存儲,key不能分開存儲是因為要對key進行排序。

這里需要注意的是要寫入的數據太長,并且kvinde==kvend,則拋出MapBufferTooSmallException異常,在collect中捕獲,將此數據直接spill到磁盤spillSingleRecord,也就是當單條記錄過長時,不寫buffer,直接寫入磁盤。

 

下面看下bb.shiftBufferedKey()代碼

  1. // BlockingBuffer.shiftBufferedKey 
  2. protected void shiftBufferedKey() throws IOException { 
  3. // spillLock unnecessary; both kvend and kvindex are current 
  4. int headbytelen = bufvoid - bufmark; 
  5. bufvoid = bufmark
  6. final int kvbidx = 4 * kvindex; 
  7. final int kvbend = 4 * kvend; 
  8. final int avail = 
  9. Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend)); 
  10. if (bufindex + headbytelen < avail) { 
  11. System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex); 
  12. System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen); 
  13. bufindex += headbytelen; 
  14. bufferRemaining -kvbuffer.length - bufvoid; 
  15. } else { 
  16. byte[] keytmp = new byte[bufindex]; 
  17. System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex); 
  18. bufindex = 0
  19. out.write(kvbuffer, bufmark, headbytelen); 
  20. out.write(keytmp); 

shiftBufferedKey時,判斷首部是否有足夠的空間存放key,有沒有足夠的空間,則先將首部的部分key寫入keytmp中,然后分兩次寫入,再次調用Buffer.write,如果有足夠的空間,分兩次copy,先將首部的部分key復制到headbytelen的位置,然后將末尾的部分key復制到首部,移動bufindex,重置bufferRemaining的值。

key/value寫入之后,繼續寫入元數據信息并重置kvindex的值。

spill

 

一次寫入buffer結束,當寫入數據比較多,bufferRemaining小于等于0時,準備進行spill,首次spill,spillInProgress為false,此時查看bUsed = distanceTo(kvbidx, bufindex),此時bUsed >= softLimit 并且 (kvbend + METASIZE) % kvbuffer.length == equator - (equator % METASIZE),則進行spill,調用startSpill

  1. private void startSpill() { 
  2. // 元數據的邊界賦值 
  3. kvend = (kvindex + NMETA) % kvmeta.capacity(); 
  4. // key/value的邊界賦值 
  5. bufend = bufmark
  6. // 設置spill運行標識 
  7. spillInProgress = true
  8. ... 
  9. // 利用重入鎖,對spill線程進行喚醒 
  10. spillReady.signal(); 

startSpill喚醒spill線程之后,進程spill操作,但此時map向buffer的寫入操作并沒有阻塞,需要重新邊界equator和bufferRemaining的值,先來看下equator和bufferRemaining值的設定:

  1. // 根據已經寫入的kv得出每個record的平均長度 
  2. final int avgRec = (int) (mapOutputByteCounter.getCounter() / 
  3. mapOutputRecordCounter.getCounter()); 
  4. // leave at least half the split buffer for serialization data 
  5. // ensure that kvindex >= bufindex 
  6. // 得到空余空間的大小 
  7. final int distkvi = distanceTo(bufindex, kvbidx); 
  8. // 得出新equator的位置 
  9. final int newPos = (bufindex + 
  10. Math.max(2 * METASIZE - 1, 
  11. Math.min(distkvi / 2, 
  12. distkvi / (METASIZE + avgRec) * METASIZE))) 
  13. % kvbuffer.length; 
  14. setEquator(newPos); 
  15. bufmark = bufindex = newPos; 
  16. final int serBound = 4 * kvend; 
  17. // bytes remaining before the lock must be held and limits 
  18. // checked is the minimum of three arcs: the metadata space, the 
  19. // serialization space, and the soft limit 
  20. bufferRemaining = Math.min( 
  21. // metadata max 
  22. distanceTo(bufend, newPos), 
  23. Math.min( 
  24. // serialization max 
  25. distanceTo(newPos, serBound), 
  26. // soft limit 
  27. softLimit)) - 2 * METASIZE; 

因為equator是kvbuffer和kvmeta的分界線,為了更多的空間存儲kv,則最多拿出distkvi的一半來存儲meta,并且利用avgRec估算distkvi能存放多少個record和meta對,根據record和meta對的個數估算meta所占空間的大小,從distkvi/2和meta所占空間的大小中取最小值,又因為distkvi中最少得存放一個meta,所占空間為METASIZE,在選取kvindex時需要求aligned,aligned最多為METASIZE-1,總和上述因素,最終選取equator為(bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE)))。equator選取之后,設置bufmark = bufindex = newPos和kvindex,但此時并不設置bufstart、bufend和kvstart、kvend,因為這幾個值要用來表示spill數據的邊界。

spill之后,可用的空間減少了,則控制spill的bufferRemaining也應該重新設置,bufferRemaining取三個值的最小值減去2*METASIZE,三個值分別是meta可用占用的空間distanceTo(bufend, newPos),kv可用空間distanceTo(newPos, serBound)和softLimit。這里為什么要減去2*METASIZE,一個是spill之前kvend到kvindex的距離,另一個是當時的kvindex空間????此時,已有一個record要寫入buffer,需要從bufferRemaining中減去當前record的元數據占用的空間,即減去METASIZE,另一個METASIZE是在計算equator時,沒有包括kvindex到kvend(spill之前)的這段METASIZE,所以要減去這個METASIZE。

 

接下來解析下SpillThread線程,查看其run方法:

  1. public void run() { 
  2. spillLock.lock(); 
  3. spillThreadRunning = true
  4. try { 
  5. while (true) { 
  6. spillDone.signal(); 
  7. // 判斷是否在spill,false則掛起SpillThread線程,等待喚醒 
  8. while (!spillInProgress) { 
  9. spillReady.await(); 
  10. try { 
  11. spillLock.unlock(); 
  12. // 喚醒之后,進行排序和溢寫到磁盤 
  13. sortAndSpill(); 
  14. } catch (Throwable t) { 
  15. sortSpillException = t; 
  16. } finally { 
  17. spillLock.lock(); 
  18. if (bufend < bufstart) { 
  19. bufvoid = kvbuffer.length; 
  20. kvstart = kvend
  21. bufstart = bufend
  22. spillInProgress = false
  23. } catch (InterruptedException e) { 
  24. Thread.currentThread().interrupt(); 
  25. } finally { 
  26. spillLock.unlock(); 
  27. spillThreadRunning = false

run中主要是sortAndSpill,

  1. private void sortAndSpill() throws IOException, ClassNotFoundException, 
  2. InterruptedException { 
  3. //approximate the length of the output file to be the length of the 
  4. //buffer + header lengths for the partitions 
  5. final long size = distanceTo(bufstart, bufend, bufvoid) + 
  6. partitions * APPROX_HEADER_LENGTH; 
  7. FSDataOutputStream out = null
  8. try { 
  9. // create spill file 
  10. // 用來存儲index文件 
  11. final SpillRecord spillRec = new SpillRecord(partitions); 
  12. // 創建寫入磁盤的spill文件 
  13. final Path filename = 
  14. mapOutputFile.getSpillFileForWrite(numSpills, size); 
  15. // 打開文件流 
  16. out = rfs.create(filename); 
  17. // kvend/4 是截止到當前位置能存放多少個元數據實體 
  18. final int mstart = kvend / NMETA; 
  19. // kvstart 處能存放多少個元數據實體 
  20. // 元數據則在mstart和mend之間,(mstart - mend)則是元數據的個數 
  21. final int mend = 1 + // kvend is a valid record 
  22. (kvstart >= kvend 
  23. ? kvstart 
  24. : kvmeta.capacity() + kvstart) / NMETA; 
  25. // 排序 只對元數據進行排序,只調整元數據在kvmeta中的順序 
  26. // 排序規則是MapOutputBuffer.compare, 
  27. // 先對partition進行排序其次對key值排序 
  28. sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); 
  29. int spindex = mstart
  30. // 創建rec,用于存放該分區在數據文件中的信息 
  31. final IndexRecord rec = new IndexRecord(); 
  32. final InMemValBytes value = new InMemValBytes(); 
  33. for (int i = 0; i < partitions; ++i) { 
  34. // 臨時文件是IFile格式的 
  35. IFile.Writer<K, V> writer = null
  36. try { 
  37. long segmentStart = out.getPos(); 
  38. FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out); 
  39. writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec, 
  40. spilledRecordsCounter); 
  41. // 往磁盤寫數據時先判斷是否有combiner 
  42. if (combinerRunner == null) { 
  43. // spill directly 
  44. DataInputBuffer key = new DataInputBuffer(); 
  45. // 寫入相同partition的數據 
  46. while (spindex < mend && 
  47. kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { 
  48. final int kvoff = offsetFor(spindex % maxRec); 
  49. int keystart = kvmeta.get(kvoff + KEYSTART); 
  50. int valstart = kvmeta.get(kvoff + VALSTART); 
  51. key.reset(kvbuffer, keystart, valstart - keystart); 
  52. getVBytesForOffset(kvoff, value); 
  53. writer.append(key, value); 
  54. ++spindex; 
  55. } else { 
  56. int spstart = spindex
  57. while (spindex < mend && 
  58. kvmeta.get(offsetFor(spindex % maxRec) 
  59. + PARTITION) == i) { 
  60. ++spindex; 
  61. // Note: we would like to avoid the combiner if we've fewer 
  62. // than some threshold of records for a partition 
  63. if (spstart != spindex) { 
  64. combineCollector.setWriter(writer); 
  65. RawKeyValueIterator kvIter = 
  66. new MRResultIterator(spstart, spindex); 
  67. combinerRunner.combine(kvIter, combineCollector); 
  68.   
  69. // close the writer 
  70. writer.close(); 
  71.   
  72. // record offsets 
  73. // 記錄當前partition i的信息寫入索文件rec中 
  74. rec.startOffset = segmentStart
  75. rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job); 
  76. rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job); 
  77. // spillRec中存放了spill中partition的信息,便于后續堆排序時,取出partition相關的數據進行排序 
  78. spillRec.putIndex(rec, i); 
  79.   
  80. writer = null
  81. } finally { 
  82. if (null != writer) writer.close(); 
  83. // 判斷內存中的index文件是否超出閾值,超出則將index文件寫入磁盤 
  84. // 當超出閾值時只是把當前index和之后的index寫入磁盤 
  85. if (totalIndexCacheMemory >= indexCacheMemoryLimit) { 
  86. // create spill index file 
  87. // 創建index文件 
  88. Path indexFilename = 
  89. mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions 
  90. * MAP_OUTPUT_INDEX_RECORD_LENGTH); 
  91. spillRec.writeToFile(indexFilename, job); 
  92. } else { 
  93. indexCacheList.add(spillRec); 
  94. totalIndexCacheMemory += 
  95. spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; 
  96. LOG.info("Finished spill " + numSpills); 
  97. ++numSpills; 
  98. } finally { 
  99. if (out != null) out.close(); 

ortAndSpill中,有mstart和mend得到一共有多少條record需要spill到磁盤,調用sorter.sort對meta進行排序,先對partition進行排序,然后按key排序,排序的結果只調整meta的順序。

排序之后,判斷是否有combiner,沒有則直接將record寫入磁盤,寫入時是一個partition一個IndexRecord,如果有combiner,則將該partition的record寫入kvIter,然后調用combinerRunner.combine執行combiner。

寫入磁盤之后,將spillx.out對應的spillRec放入內存indexCacheList.add(spillRec),如果所占內存totalIndexCacheMemory超過了indexCacheMemoryLimit,則創建index文件,將此次及以后的spillRec寫入index文件存入磁盤。

最后spill次數遞增。sortAndSpill結束之后,回到run方法中,執行finally中的代碼,對kvstart和bufstart賦值,kvstart = kvend,bufstart = bufend,設置spillInProgress的狀態為false。

 

在spill的同時,map往buffer的寫操作并沒有停止,依然在調用collect,再次回到collect方法中,

  1. // MapOutputBuffer.collect 
  2. public synchronized void collect(K key, V value, final int partition 
  3. ) throws IOException { 
  4. ... 
  5. // 新數據collect時,先將剩余的空間減去元數據的長度,之后進行判斷 
  6. bufferRemaining -METASIZE
  7. if (bufferRemaining <= 0) { 
  8. // start spill if the thread is not running and the soft limit has been 
  9. // reached 
  10. spillLock.lock(); 
  11. try { 
  12. do { 
  13. // 首次spill時,spillInProgress是false 
  14. if (!spillInProgress) { 
  15. // 得到kvindex的byte位置 
  16. final int kvbidx = 4 * kvindex; 
  17. // 得到kvend的byte位置 
  18. final int kvbend = 4 * kvend; 
  19. // serialized, unspilled bytes always lie between kvindex and 
  20. // bufindex, crossing the equator. Note that any void space 
  21. // created by a reset must be included in "used" bytes 
  22. final int bUsed = distanceTo(kvbidx, bufindex); 
  23. final boolean bufsoftlimit = bUsed >= softLimit; 
  24. if ((kvbend + METASIZE) % kvbuffer.length != 
  25. equator - (equator % METASIZE)) { 
  26. // spill finished, reclaim space 
  27. resetSpill(); 
  28. bufferRemaining = Math.min( 
  29. distanceTo(bufindex, kvbidx) - 2 * METASIZE, 
  30. softLimit - bUsed) - METASIZE; 
  31. continue; 
  32. } else if (bufsoftlimit && kvindex != kvend) { 
  33. ... 
  34. } while (false); 
  35. } finally { 
  36. spillLock.unlock(); 
  37. ... 

有新的record需要寫入buffer時,判斷bufferRemaining -= METASIZE,此時的bufferRemaining是在開始spill時被重置過的(此時的bufferRemaining應該比初始的softLimit要小),當bufferRemaining小于等于0時,進入if,此時spillInProgress的狀態為false,進入if (!spillInProgress),startSpill時對kvend和bufend進行了重置,則此時(kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE),調用resetSpill(),將kvstart、kvend和bufstart、bufend設置為上次startSpill時的位置。此時buffer已將一部分內容寫入磁盤,有大量空余的空間,則對bufferRemaining進行重置,此次不spill。

bufferRemaining取值為Math.min(distanceTo(bufindex, kvbidx) - 2 * METASIZE, softLimit - bUsed) - METASIZE

 

最后一個METASIZE是當前record進入collect之后bufferRemaining減去的那個METASIZE,為什么要減去2*METASIZE,不知道。。。。。

  1. private void resetSpill() { 
  2. final int e = equator
  3. bufstart = bufend = e; 
  4. final int aligned = e - (e % METASIZE); 
  5. // set start/end to point to first meta record 
  6. // Cast one of the operands to long to avoid integer overflow 
  7. kvstart = kvend = (int) 
  8. (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; 
  9. LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" + 
  10. (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")"); 

當bufferRemaining再次小于等于0時,進行spill,這以后就都是套路了。環形緩沖區分析到此結束。

【本文為51CTO專欄作者“王森豐”的原創稿件,轉載請注明出處】

戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2023-10-09 23:01:09

MySQL數據庫

2021-06-26 07:50:20

STM32串口開發環形緩沖區

2011-12-14 16:30:42

javanio

2017-01-09 17:03:34

2019-02-27 13:58:29

漏洞緩沖區溢出系統安全

2018-11-01 08:31:05

2011-03-23 12:39:44

2014-07-30 11:21:46

2011-03-23 11:35:00

2018-01-26 14:52:43

2009-11-16 17:08:59

Oracle日志緩沖區

2009-11-16 17:26:17

Oracle優化緩沖區

2009-07-15 15:50:48

Jython線程

2009-11-16 16:59:24

Oracle優化庫高速

2009-09-24 18:16:40

2011-07-20 10:54:14

C++

2010-12-27 10:21:21

2022-05-07 08:27:42

緩沖區溢出堆棧

2010-10-09 14:45:48

2015-03-06 17:09:10

點贊
收藏

51CTO技術棧公眾號

99re视频在线观看| 日韩av在线看| 日韩视频在线免费播放| 中文av免费观看| 成人午夜av| 欧美三级电影一区| 宅男一区二区三区| av天堂一区二区三区| 日韩欧美视频专区| 欧美一级二级在线观看| 成人在线视频一区二区三区| 免费a视频在线观看| 亚洲狼人精品一区二区三区| 日韩精品在线免费播放| 日韩免费高清在线| 久草资源在线| 国产麻豆一精品一av一免费| 韩国v欧美v日本v亚洲| 中文字幕国产专区| 精品69视频一区二区三区| 亚洲色图丝袜美腿| 国产日韩亚洲精品| 中文字幕在线观看免费视频| 欧美日韩精品一区二区视频| 91精品国产高清一区二区三区蜜臀| 中文字幕一区二区三区5566| 动漫av一区二区三区| 免播放器亚洲| 免费99精品国产自在在线| 久久久老熟女一区二区三区91| 日本成人伦理电影| 一区二区三区国产精品| 欧美日韩电影一区二区| 亚洲视频在线观看免费视频| 最近2019年日本中文免费字幕| 久久久久久久久久久久久久一区| av片免费观看| 欧美成人一品| 精品人伦一区二区色婷婷| 免费国产a级片| 免费av在线网站| 99久久国产综合精品色伊| 国产女人精品视频| 国产精品免费av一区二区| 日韩欧美一区免费| 日韩成人在线视频观看| 久久精品一卡二卡| 午夜激情电影在线播放| 亚洲色图在线播放| 日韩久久在线| 天天摸天天碰天天爽天天弄| 精品一区二区国语对白| 69影院欧美专区视频| 日韩免费成人av| 黑人久久a级毛片免费观看| 欧美日韩不卡视频| 99精品视频播放| av电影免费在线看| 亚洲乱码日产精品bd| 亚洲精品二区| 黄视频在线播放| 99视频国产精品| 99国产精品久久久久老师| 欧美日韩在线视频播放| 国产欧美日韩综合一区在线播放| 欧美另类交人妖| 五月天精品在线| 国产真实有声精品录音| 亚洲精品国产欧美| 欧美xxxx×黑人性爽| 日韩中文字幕一区二区高清99| 欧洲一区二区av| 亚欧无线一线二线三线区别| 麻豆av在线免费观看| 最新日韩av在线| 一区二区三区av| jizz日韩| 亚洲国产精品精华液2区45| 欧美日韩国产一二| 无码精品一区二区三区在线| 国产大陆a不卡| 91欧美激情另类亚洲| 国产又粗又大又黄| 国产一区二区中文字幕| 成人羞羞国产免费| 精品久久国产视频| 粉嫩av一区二区三区在线播放| 亚洲999一在线观看www| 中文字幕日日夜夜| 免费一级欧美片在线观看| 国产精品99免视看9| 中文字幕xxxx| 日韩va亚洲va欧美va久久| 国产精品大陆在线观看| 国产精品国产精品国产| 精品综合久久久久久8888| 成人av番号网| wwwxxxx国产| 成人激情免费电影网址| 就去色蜜桃综合| 成人免费在线视频网| 中文字幕一区免费在线观看| 91九色国产ts另类人妖| 国产第一页在线| 亚洲一区电影777| 国产极品尤物在线| 55av亚洲| 91久久久免费一区二区| 亚洲少妇第一页| 久久天天久久| 欧美一区二区三区视频免费 | 毛片视频免费播放| 亚洲影视一区二区三区| 欧美巨大黑人极品精男| 日韩熟女精品一区二区三区| 久久最新视频| 成人高h视频在线| 亚洲成人777777| 99国产精品久久| 日韩欧美在线电影| 性欧美videoshd高清| 懂色av中文一区二区三区天美| 天堂中文视频在线| 午夜视频在线观看精品中文| 亚洲免费中文字幕| 美国黄色片视频| 黄色欧美日韩| 国产精品视频精品视频| 国产成a人亚洲精v品无码| 99re热这里只有精品免费视频| 亚洲精品成人三区| 免费看电影在线| 欧美最新大片在线看| 不许穿内裤随时挨c调教h苏绵| 日韩电影不卡一区| 精品国产自在精品国产浪潮| 日韩 欧美 中文| 国产一二精品视频| 日韩av电影免费播放| 欧美激情成人动漫| 欧美日韩在线播放三区四区| 国产又粗又长又爽| 综合激情一区| 国产精品亚洲自拍| 色在线免费视频| 一区二区三区国产| 色婷婷综合网站| 蜜桃成人av| 久久久久久亚洲精品中文字幕| japanese国产在线观看| 97久久久精品综合88久久| 日韩三级电影| 人在线成免费视频| 欧美mv日韩mv国产网站app| 欧美aaa级片| 性欧美精品高清| 国产精品国产一区二区| 大地资源网在线观看免费官网| 免费av在线电影| 亚洲国产精品精华液网站| 国产无色aaa| 国产精品白丝av嫩草影院| 久久精品人人爽| 中文字幕人妻一区二区在线视频 | 日韩一区三区| 国产伦精品一区二区三区精品视频| 国产在线91| 欧洲av一区二区嗯嗯嗯啊| wwwww黄色| 精品一区二区三区久久| 中文字幕久精品免| 亚洲va欧美va人人爽成人影院| 久久91亚洲精品中文字幕奶水 | 亚洲精品美女在线| www欧美在线| 国产亚洲人成网站| www.精品在线| 女生裸体视频一区二区三区| 动漫精品视频| 五月天av在线| 日韩在线视频免费观看| 国产手机精品视频| 亚洲成人动漫av| www在线观看免费视频| 美洲天堂一区二卡三卡四卡视频| 一级全黄肉体裸体全过程| 天堂久久av| 欧美综合一区第一页| 北岛玲一区二区三区| 欧美精品国产精品| 久久亚洲精品大全| 国产日韩影视精品| 91在线第一页| 先锋影音久久| 综合国产精品久久久| 97久久亚洲| 国产精品久久久久久久久久ktv| 毛片在线播放a| 亚洲精品一区二区三区蜜桃下载| 国产免费一级视频| 亚洲激情五月婷婷| 国产熟妇久久777777| 国产老肥熟一区二区三区| 久草热视频在线观看| 三级电影一区| 国产乱码精品一区二区三区日韩精品| xxxxxx欧美| 欧美激情视频一区| 成人av电影观看| 精品88久久久久88久久久| 中文字幕手机在线视频| 亚洲精品videosex极品| 亚洲黄色小说视频| 成人视屏免费看| 日本不卡一区二区在线观看| 亚洲美女一区| 免费成人进口网站| 免费成人av| 国产精品久久久久久久久久直播 | 国产精品果冻传媒| 免费成人在线视频观看| 欧美久久久久久久久久久久久| 欧美国产美女| 欧美日本亚洲| 欧美xxxx在线| 高清不卡一区二区三区| 四虎精品在线观看| 国产成人鲁鲁免费视频a| 99久久精品免费看国产小宝寻花| 久久精品视频播放| av在线之家电影网站| 亚洲美女又黄又爽在线观看| 亚洲第一天堂在线观看| 91精品综合久久久久久| 正在播放亚洲精品| 色欧美片视频在线观看在线视频| 久草视频在线资源站| 亚洲人成网站精品片在线观看| 免费观看a级片| 久久久久久亚洲综合| 亚洲观看黄色网| aaa亚洲精品| 国产精品久久久久久在线观看| 国产美女久久久久| 午夜视频在线观| 麻豆免费看一区二区三区| 美女黄色片视频| 三级亚洲高清视频| 免费av网址在线| 久久精品国语| 情侣黄网站免费看| 欧美亚洲在线| www.日本xxxx| 美日韩一级片在线观看| 九色porny自拍| 美国三级日本三级久久99| 色综合手机在线| 美女一区二区三区| 欧美成人乱码一二三四区免费| 蜜桃av一区二区三区电影| www.这里只有精品| 激情文学综合插| 欧美性受xxxx黒人xyx性爽| 极品少妇xxxx偷拍精品少妇| 成人综合久久网| 国产一区二区三区观看| 免费人成视频在线播放| 国产69精品久久久久毛片| 中文字幕人妻一区| 26uuu久久天堂性欧美| 在线国产视频一区| 国产精品免费aⅴ片在线观看| 免费黄色激情视频| 一区二区三区四区不卡视频| 久久精品女人毛片国产| 欧美日韩一区二区免费在线观看| 亚洲欧美偷拍一区| 欧洲另类一二三四区| 国产精品乱码久久久| 日韩欧美国产电影| 日韩精品视频在线观看一区二区三区| 亚洲午夜色婷婷在线| 婷婷成人激情| 欧美交受高潮1| 丝袜美腿诱惑一区二区三区| 91精品国产综合久久久久久久久| 日韩三级精品| 久久视频在线观看中文字幕| 成人激情开心网| 国产精品无码免费专区午夜| 国产精品久久久亚洲一区| 亚洲一区二区蜜桃| 国产成人精品亚洲午夜麻豆| av在线网站观看| 日韩美女视频一区二区| 91美女免费看| 91精品在线免费| 午夜av免费在线观看| 最新的欧美黄色| 免费一二一二在线视频| 成人伊人精品色xxxx视频| 欧美顶级毛片在线播放| 一区二区三区欧美在线| 夜夜嗨网站十八久久| 最新国产黄色网址| 91丨九色丨黑人外教| 免费三级在线观看| 色综合久久天天综合网| 国产黄色美女视频| 一区二区福利视频| 理论片午夜视频在线观看| 成人免费观看a| 欧美国产不卡| 在线观看亚洲视频啊啊啊啊| 西西人体一区二区| 亚洲精品成人无码毛片| 国产精品天美传媒沈樵| 激情五月色婷婷| 欧美一区二区在线观看| 成人激情电影在线看| 欧美在线观看日本一区| 中文字幕久久精品一区二区| 亚洲午夜在线观看| 久久精品女人| 尤物网站在线观看| 一区二区激情小说| 一级黄色片在线观看| 亚洲视频在线观看视频| 成人国产电影在线观看| 91超碰在线电影| 99久久精品费精品国产| 538在线视频观看| 26uuu成人网一区二区三区| 久久久精品人妻一区二区三区四| 欧美日韩国产色站一区二区三区| 欧美日韩在线中文字幕| 91av视频在线| 黑色丝袜福利片av久久| 999一区二区三区| 国产成人综合精品三级| 日韩在线观看免| 欧美日韩精品福利| 秋霞成人影院| 成人精品aaaa网站| 99久久99视频只有精品| 激情在线观看视频| 综合精品久久久| 国产99视频在线| 欧美高清视频免费观看| 天堂av一区| 男人插女人视频在线观看| 成人涩涩免费视频| 亚洲国产精品午夜在线观看| 日韩精品专区在线影院重磅| av网址在线看| 99视频日韩| 在线视频观看日韩| 在线免费观看a级片| 色综合视频在线观看| 欧美zzoo| 国产精品久久久久久亚洲调教| 青青草91久久久久久久久| 亚洲不卡视频在线| 综合激情成人伊人| 性一交一乱一乱一视频| 97在线免费视频| 女优一区二区三区| 免费看污黄网站| 日韩美女视频一区二区| 后进极品白嫩翘臀在线视频| 97久久精品人人澡人人爽缅北| 亚洲欧洲av| 免费看污污网站| 椎名由奈av一区二区三区| 精品久久人妻av中文字幕| 久久久久久久国产精品视频| 天堂99x99es久久精品免费| 无码少妇一区二区三区芒果| 中文字幕在线不卡| 亚洲国产欧美另类| 国产成人黄色av| 亚洲国产老妈| 欲求不满的岳中文字幕| 色婷婷综合视频在线观看| √新版天堂资源在线资源| 国产69精品久久久久9999apgf| 亚洲欧美bt| 久久福利免费视频| 亚洲国产欧美一区二区三区久久| 亚洲精品国产精品国产| 在线观看国产一区| 成人天堂资源www在线| 亚洲 小说区 图片区| 欧美高清第一页| 精品久久精品| 午夜性福利视频| 欧美中文字幕亚洲一区二区va在线| av电影高清在线观看| 蜜桃成人在线| 国产精品夜夜爽|