精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Disruptor廣播模式與執行順序鏈源碼分析

開發 前端
消費者線程起來后,然后進入死循環,持續不斷從生產者處批量獲取可用的序號,如果獲取到可用序號后,那么遍歷所有可用序號,然后調用eventHandler的onEvent方法消費數據,onEvent方法寫的是消費者的業務邏輯。

1.前言

本篇文章開始Disruptor的源碼分析,理解起來相對比較困難,特別是Disruptor的sequenceBarrier的理解,sequenceBarrier包括生產者與消費者之間的gatingSequence以及消費者與消費者之間的dependentSequence。此外,Disruptor源碼中的sequence變量也比較多,需要捋清楚各種sequence的含義。最后,建議小伙伴們動手調試理解,效果會更好。

2.Disruptor六邊形DEMO

分析源碼前,先來看看Disruptor六邊形執行器鏈的DEMO。

public class LongEventMain
{
private static final int BUFFER_SIZE = 1024;
public static void main(String[] args) throws Exception
{
// 1,構建disruptor
final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
new LongEventFactory(),
BUFFER_SIZE,
Executors.newFixedThreadPool(5), // 【注意點】線程池需要保證足夠的線程:有多少個消費者就要有多少個線程,否則有些消費者將不會執行,生產者可能也會一直阻塞下去
ProducerType.SINGLE,
new YieldingWaitStrategy()
);

EventHandler eventHandler1 = new LongEventHandler1();
EventHandler eventHandler2 = new LongEventHandler2();
EventHandler eventHandler3 = new LongEventHandler3();
EventHandler eventHandler4 = new LongEventHandler4();
EventHandler eventHandler5 = new LongEventHandler5();

// 方式1 構建串行執行順序:
/*disruptor
.handleEventsWith(eventHandler1)
.handleEventsWith(eventHandler2)
.handleEventsWith(eventHandler3)
.handleEventsWith(eventHandler4)
.handleEventsWith(eventHandler5);*/

// 方式2 構建并行執行順序
/*disruptor
.handleEventsWith(eventHandler1, eventHandler2, eventHandler3, eventHandler4, eventHandler5);*/

// 方式3 構建菱形執行順序
/*disruptor.handleEventsWith(eventHandler1, eventHandler2)
.handleEventsWith(eventHandler3);*/

// 2,構建eventHandler執行鏈
// 方式4 構建六邊形執行順序
disruptor.handleEventsWith(eventHandler1, eventHandler3);
disruptor.after(eventHandler1).handleEventsWith(eventHandler2);
disruptor.after(eventHandler3).handleEventsWith(eventHandler4);
disruptor.after(eventHandler2, eventHandler4).handleEventsWith(eventHandler5);

// 3, 啟動disruptor即啟動線程池線程執行BatchEventProcessor任務
disruptor.start();

// 4,生產者往ringBuffer生產數據并喚醒所有的消費者消費數據
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
bb.putLong(0, 666);
ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb);
}

static class LongEventTranslatorOneArg implements EventTranslatorOneArg<LongEvent, ByteBuffer> {
@Override
public void translateTo(LongEvent event, long sequence, ByteBuffer buffer) {
event.set(buffer.getLong(0));
}
}

static class LongEvent
{
private long value;

public void set(long value)
{
this.value = value;
}

public long get() {
return this.value;
}
}

static class LongEventFactory implements EventFactory<LongEvent>
{
@Override
public LongEvent newInstance()
{
return new LongEvent();
}
}

static class LongEventHandler1 implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("LongEventHandler1-" + event.get() + " executed by " + Thread.currentThread().getName());
}
}

static class LongEventHandler2 implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("LongEventHandler2-" + event.get() + " executed by " + Thread.currentThread().getName());
}
}

static class LongEventHandler3 implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("LongEventHandler3-" + event.get() + " executed by " + Thread.currentThread().getName());
}
}

static class LongEventHandler4 implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("LongEventHandler4-" + event.get() + " executed by " + Thread.currentThread().getName());
}
}

static class LongEventHandler5 implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("LongEventHandler5-" + event.get() + " executed by " + Thread.currentThread().getName());
}
}
}

3.初始化Disruptor實例

先來看下前面DEMO中的初始化Disruptor實例代碼:

// 1,構建disruptor
final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
new LongEventFactory(),
BUFFER_SIZE,
Executors.newFixedThreadPool(5), // 線程池需要保證足夠的線程
ProducerType.SINGLE,
new YieldingWaitStrategy()
);

這句代碼最終是給Disruptor的ringBuffer和executor屬性賦值:

// Disruptor.java
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
// 創建RingBuffer實例
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
executor);
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
this.ringBuffer = ringBuffer;
this.executor = executor;
}

那么RingBuffer實例又是如何創建的呢?我們來看下RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy)這句源碼:

// RingBuffer.java
public static <E> RingBuffer<E> create(
final ProducerType producerType,
final EventFactory<E> factory,
final int bufferSize,
final WaitStrategy waitStrategy)
{
switch (producerType)
{
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}

首先會根據producerType來創建不同的Producer,以創建SingleProducerSequencer實例為例進去源碼看下:

// RingBuffer.java
public static <E> RingBuffer<E> createSingleProducer(
final EventFactory<E> factory,
final int bufferSize,
final WaitStrategy waitStrategy)
{
// 1,創建SingleProducerSequencer實例
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
// 2,創建RingBuffer實例
return new RingBuffer<>(factory, sequencer);
}

3.1 創建SingleProducerSequencer實例

首先創建了SingleProducerSequencer實例,給SingleProducerSequencer實例的bufferSize和waitStrategy賦初值;

// AbstractSequencer.java
// SingleProducerSequencer父類
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}

此外,創建SingleProducerSequencer實例時還初始化了一個成員變量cursor:

protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

即給cursor賦值了一個Sequence實例對象,Sequence是標識RingBuffer環形數組的下標,同時生產者和消費者也會同時維護各自的Sequence。最重要的是,**Sequence通過填充CPU緩存行避免了偽共享帶來的性能損耗**,來看下其填充緩存行源碼:

// Sequence.java
class LhsPadding
{
// 左填充
protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
// Sequence值
protected volatile long value;
}

class RhsPadding extends Value
{
// 右填充
protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding
{
// ...
}

3.2 創建RingBuffer實例

然后核心是創建RingBuffer實例,看看最終創建RingBuffer實例源碼:

// RingBuffer.java
RingBufferFields( // RingBufferFields為RingBuffer父類
final EventFactory<E> eventFactory,
final Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();

if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}

this.indexMask = bufferSize - 1;
// 【重要特性】內存預加載,內存池機制
this.entries = (E[]) new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}

實例作為構造參數傳入給了RingBuffer實例的sequencer屬性賦初值,然后最重要的是在創建RingBuffer實例時,會為RingBuffer的環形數組提前填充Event對象,即內存池機制:

private void fill(final EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}

內存池機制好處:

  • 提前創建好復用的對象,減少程序運行時因為創建對象而浪費性能,其實也是一種空間換時間的思想;
  • 因為環形數組對象可復用,從而避免GC來提高性能。

4.構建執行順序鏈

// 2,構建eventHandler執行鏈:構建六邊形執行順序
disruptor.handleEventsWith(eventHandler1, eventHandler3);
disruptor.after(eventHandler1).handleEventsWith(eventHandler2);
disruptor.after(eventHandler3).handleEventsWith(eventHandler4);
disruptor.after(eventHandler2, eventHandler4).handleEventsWith(eventHandler5);

再來看看Disruptor構建執行順序鏈相關源碼:

先來看看disruptor.handleEventsWith(eventHandler1, eventHandler3);源碼:

// Disruptor.java
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
// 根據eventHandlers長度來創建多少個消費者Sequence實例,注意這個processorSequences是傳遞到EventHandlerGroup用于構建執行順序鏈用的,
// 比如有執行順序鏈:A->B,那么A的sequenct即processorSequences會作為B節點的barrierSequences即dependencySequence
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
// 新建了一個ProcessingSequenceBarrier實例返回
// ProcessingSequenceBarrier實例作用:序號屏障,通過追蹤生產者的cursorSequence和每個消費者( EventProcessor)
// 的sequence的方式來協調生產者和消費者之間的數據交換進度
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);// 如果構建執行順序鏈比如A->B,那么barrierSequences是A消費者的sequence;如果是A,C->B,那么barrierSequences是A和C消費者的sequence

for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
// 有多少個eventHandlers就創建多少個BatchEventProcessor實例(消費者),
// 但需要注意的是同一批次的每個BatchEventProcessor實例共用同一個SequenceBarrier實例
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
// 將batchEventProcessor, eventHandler, barrier封裝成EventProcessorInfo實例并加入到ConsumerRepository相關集合
// ConsumerRepository作用:提供存儲機制關聯EventHandlers和EventProcessors
consumerRepository.add(batchEventProcessor, eventHandler, barrier); // // 如果構建執行順序鏈比如A->B,那么B消費者也一樣會加入consumerRepository的相關集合
// 獲取到每個消費的消費sequece并賦值給processorSequences數組
// 即processorSequences[i]引用了BatchEventProcessor的sequence實例,
// 但processorSequences[i]又是構建生產者gatingSequence和消費者執行器鏈dependentSequence的來源
processorSequences[i] = batchEventProcessor.getSequence();
}
// 總是拿執行器鏈最后一個消費者的sequence作為生產者的gateingSequence
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
// 最終返回封裝了Disruptor、ConsumerRepository和消費者sequence數組processorSequences的EventHandlerGroup對象實例返回
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

構建Disruptor執行順序鏈的核心邏輯就在這段源碼中,我們縷一縷核心邏輯:

  • 有多少個eventHandlers就創建多少個BatchEventProcessor實例(消費者),BatchEventProcessor消費者其實就是一個實現Runnable接口的線程實例;
  • 每個BatchEventProcessor實例(消費者)擁有前一個消費者的sequence作為其sequenceBarrier即dependentSequence;
  • 當前消費者的sequence通過EventHandlerGroup這個載體來傳遞給下一個消費者作為其sequenceBarrier即dependentSequence。

再來看看diruptor.after(eventHandler1)源碼:

// Disruptor.java
public final EventHandlerGroup<T> after(final EventHandler<T>... handlers)
{
// 獲取指定的EventHandler的消費者sequence并賦值給sequences數組,
// 然后重新新建一個EventHandlerGroup實例返回(封裝了前面的指定的消費者sequence被賦值
// 給了EventHandlerGroup的成員變量數組sequences,用于后面指定執行順序用)
final Sequence[] sequences = new Sequence[handlers.length];
for (int i = 0, handlersLength = handlers.length; i < handlersLength; i++)
{
sequences[i] = consumerRepository.getSequenceFor(handlers[i]);
}

return new EventHandlerGroup<>(this, consumerRepository, sequences);
}

這段源碼做的事情也是將當前消費者sequence封裝進EventHandlerGroup,從而可以通過這個載體來傳遞給下一個消費者作為其sequenceBarrier即dependentSequence。

最終構建的最終sequence依賴關系如下圖,看到這個圖不禁讓我想起AQS的線程等待鏈即CLH鎖的變相實現,附上文章鏈接,有興趣的讀者可以比對理解。

5.啟動Disruptor實例

// 3, 啟動disruptor即啟動線程池線程執行BatchEventProcessor任務
disruptor.start();

我們再來看看 disruptor.start()這句源碼:

// Disruptor.java
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
// 遍歷每一個BatchEventProcessor消費者(線程)實例,并把該消費者線程實例跑起來
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}

return ringBuffer;
}

其實這里做的事情無非就是遍歷每個消費者線程實例,然后啟動每個消費者線程實例BatchEventProcessor,其中BatchEventProcessor被封裝進ConsumerInfo實例。還沒生產數據就啟動消費線程的話,此時消費者會根據阻塞策略WaitStrategy進行阻塞。

6.生產消費數據

6.1 生產者生產數據

// 4,生產者往ringBuffer生產數據并喚醒所有的消費者消費數據
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
bb.putLong(0, 666);
ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb);

生產者生產數據的源碼在ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb);中。

// RingBuffer.java
public <A> void publishEvent(final EventTranslatorOneArg<E, A> translator, final A arg0)
{
// 【1】獲取下一個RingBuffer中需填充數據的event對象的序號,對應生產者
final long sequence = sequencer.next();
// 【2】轉換數據格式并生產數據并喚醒消費者
translateAndPublish(translator, sequence, arg0);
}

6.1.1 生產者獲取RingBuffer的sequence

先來看下單生產者獲取sequence的源碼:

// SingleProducerSequencer.java
public long next(final int n)
{
if (n < 1 || n > bufferSize)
{
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
// 總是拿到生產者已生產的當前序號
long nextValue = this.nextValue;
// 獲取要生產的下n個序號
long nextSequence = nextValue + n;
// 生產者總是先有bufferSize個坑可以填,所以nextSequence - bufferSize
long wrapPoint = nextSequence - bufferSize;
// 拿到上一次的GatingSequence,因為是緩存,這里不是最新的
long cachedGatingSequence = this.cachedValue;
// 如果生產者生產超過了消費者消費速度,那么這里自旋等待,這里的生產者生產的下標wrapPoint是已經繞了RingBuffer一圈的了哈
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence

long minSequence;
// 自旋等待,其中gatingSequences是前面構建執行順序鏈時的最后一個消費者的sequence
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}

this.cachedValue = minSequence;
}
// 將獲取的nextSequence賦值給生產者當前值nextValue
this.nextValue = nextSequence;

return nextSequence;
}

這段源碼相對較難,我們縷一縷:

  • 生產者把第一圈RingBuffer的坑填完后,此時生產者進入RingBuffer第2圈,如果消費者消費速度過慢,此時生產者很可能會追上消費者,如果追上消費者那么就讓生產者自旋等待;
  • 第1點的如果消費者消費速度過慢,對于構建了一個過濾器鏈的消費者中,那么指的是哪個消費者呢?指的就是執行器鏈最后執行的那個消費者gatingSequences就是執行器鏈最后執行的那個消費者的sequence;這個gatingSequences其實就是防止生產者追趕消費者的sequenceBarrier;

  • 生產者總是先把第一圈RingBuffer填滿后,才會考慮追趕消費者的問題,因此才有wrapPoint > cachedGatingSequence的評判條件。

前面是單生產者獲取sequence的源碼,對于多生產者MultiProducerSequencer的源碼邏輯也是類似,只不過將生產者當前值cursor和cachedGatingSequence用了CAS操作而已,防止多線程問題。

6.1.2 生產者生產數據并喚醒消費者

再來看看 translateAndPublish(translator, sequence, arg0)源碼:

// RingBuffer.java
private <A> void translateAndPublish(final EventTranslatorOneArg<E, A> translator, final long sequence, final A arg0)
{
try
{
// 【1】將相應數據arg0轉換為相應的Eevent數據,其中get(sequence)會從RingBuffer數組對象池中取出一個對象,而非新建
translator.translateTo(get(sequence), sequence, arg0);
}
finally
{
// 【2】發布該序號說明已經生產完畢供消費者使用
sequencer.publish(sequence);
}
}



// SingleProducerSequencer.java
public void publish(final long sequence)
{
// 【1】給生產者cursor游標賦值新的sequence,說明該sequenc對應的對象數據已經填充(生產)完畢
cursor.set(sequence);// 這個cursor即生產者生產時移動的游標,是AbstractSequencer的成員變量
// 【2】根據阻塞策略將所有消費者喚醒
// 注意:這個waitStrategy實例是所有消費者和生產者共同引用的
waitStrategy.signalAllWhenBlocking();
}

生產者生產數據并喚醒消費者的注釋已經寫得很清楚了,這里需要注意的點:

  • cursor才是生產者生產數據的當前下標,消費者消費速度有無追趕上生產者就是拿消費者的消費sequence跟生產者的cursor比較的,因此生產者生產數據完成后需要給cursor賦值;
  • waitStrategy策略對象時跟消費者共用的,這樣才能線程間實現阻塞喚醒邏輯。

6.2 消費者消費數據

前面第4節啟動Disruptor實例中講到,其實就是開啟各個消費者實例BatchEventProcessor線程,我們看看其run方法中的核心邏輯即processEvents源碼:

// BatchEventProcessor.java
private void processEvents()
{
T event = null;
// nextSequence:消費者要消費的下一個序號
long nextSequence = sequence.get() + 1L; // 【重要】每一個消費者都是從0開始消費,各個消費者維護各自的sequence
// 消費者線程一直在while循環中不斷獲取生產者數據
while (true)
{
try
{
// 拿到當前生產者的生產序號
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
// 如果消費者要消費的下一個序號小于生產者的當前生產序號,那么消費者則進行消費
// 這里有一個亮點:就是消費者會一直循環消費直至到達當前生產者生產的序號
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 消費完后設置當前消費者的消費進度,這點很重要
// 【1】如果當前消費者是執行鏈的最后一個消費者,那么其sequence則是生產者的gatingSequence,因為生產者就是拿要生產的下一個sequence跟gatingSequence做比較的哈
// 【2】如果當前消費者不是執行器鏈的最后一個消費者,那么其sequence作為后面消費者的dependentSequence
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (running.get() != RUNNING)
{
break;
}
}
catch (final Throwable ex)
{
handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}

消費者線程起來后,然后進入死循環,持續不斷從生產者處批量獲取可用的序號,如果獲取到可用序號后,那么遍歷所有可用序號,然后調用eventHandler的onEvent方法消費數據,onEvent方法寫的是消費者的業務邏輯。消費完后再設置當前消費者的消費進度,這點很重要,用于構建sequenceBarrier包括gatingSequence和dependentSequence。

下面再來看看消費者是怎么獲取可用的序號的,繼續看sequenceBarrier.waitFor(nextSequence)源碼:

// ProcessingSequenceBarrier.java

public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// availableSequence:獲取生產者生產后可用的序號
// sequence:消費者要消費的下一個序號
// cursorSequence:生產者生產數據時的當前序號
// dependentSequence:第一個消費者即前面不依賴任何消費者的消費者,dependentSequence就是生產者游標;
// 有依賴其他消費者的消費者,dependentSequence就是依賴的消費者的sequence
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

if (availableSequence < sequence)
{
return availableSequence;
}
// 這個主要是針對多生產者的情形
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

可以看到ProcessingSequenceBarrier封裝了WaitStrategy等待策略實例,此時消費者獲取下一批可用序號的邏輯又封裝在了WaitStrategy的waitFor方法中,以BlockingWaitStrategy為例來其實現邏輯:

// BlockingWaitStrategy.java

public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
// cursorSequence:生產者的序號
// 第一重條件判斷:如果消費者消費速度大于生產者生產速度(即消費者要消費的下一個數據已經大于生產者生產的數據時),那么消費者等待一下
if (cursorSequence.get() < sequence)
{
lock.lock();
try
{
while (cursorSequence.get() < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// 第一重條件判斷:自旋等待
// 即當前消費者線程要消費的下一個sequence大于其前面執行鏈路(若有依賴關系)的任何一個消費者最小sequence(dependentSequence.get()),那么這個消費者要自旋等待,
// 直到前面執行鏈路(若有依賴關系)的任何一個消費者最小sequence(dependentSequence.get())已經大于等于當前消費者的sequence時,說明前面執行鏈路的消費者已經消費完了
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}

return availableSequence;
}

可以看到,消費者獲取下一批可用消費序號時,此時要經過兩重判斷:

  • 第一重判斷:消費者消費的序號不能超過當前生產者消費當前生產的序號,否則消費者就阻塞等待;當然,這里因為是BlockingWaitStrategy等待策略的實現,如果是其他策略,比如BusySpinWaitStrategy和YieldingWaitStrategy的話,這里消費者是不會阻塞等待的,而是自旋,因此這也是其無鎖化的實現了,但就是很耗CPU而已;
  • 第二重判斷:消費者消費的序號不能超過其前面依賴的消費消費的序號,否則其自旋等待。因為這里是消費者等消費者,按理說前面消費者應該會很快處理完,所以不用阻塞等待;但是消費者等待生產者的話,如果生產者沒生產數據的話,消費者還是自旋等待的話會比較浪費CPU,所以對于BlockingWaitStrategy策略,是阻塞等待了。

7.WaitStrategy等待策略

最后,再來看下WaitStrategy有哪些實現類:

可以看到消費者的WaitStrategy等待策略有8種實現類,可以分為有鎖和無鎖兩大類,然后每一種都有其適用的場合,沒有最好的WaitStrategy等待策略,只有適合自己應用場景的等待策略。因為其源碼不是很難,這里逐一分析。

責任編輯:武曉燕 來源: 源碼筆記
相關推薦

2021-09-08 10:47:33

Flink執行流程

2021-11-26 17:17:43

Android廣播運行原理源碼分析

2022-05-10 08:47:00

JMeter作用域執行順序

2010-04-16 09:27:18

Ocacle執行計劃

2022-08-27 08:02:09

SQL函數語法

2021-04-15 09:17:01

SpringBootRocketMQ

2021-09-13 15:40:37

區塊鏈教育技術

2010-09-01 09:03:56

CSS優先權

2010-08-04 13:33:52

路由器配置

2009-06-16 10:51:14

Java源碼

2016-10-21 13:03:18

androidhandlerlooper

2019-12-10 09:54:20

高德APP架構全鏈路

2016-11-25 13:26:50

Flume架構源碼

2016-11-29 09:38:06

Flume架構核心組件

2021-12-24 07:50:45

責任鏈模式設計

2021-05-07 13:42:58

區塊鏈互聯網技術

2016-11-25 13:14:50

Flume架構源碼

2023-09-04 08:00:53

提交事務消息

2009-07-03 16:33:13

Tapestry函數執

2022-01-21 08:50:15

Promise任務隊列前端
點贊
收藏

51CTO技術棧公眾號

色综合久久六月婷婷中文字幕| 国产成人av电影| 一区二区三区精品99久久| 欧美婷婷精品激情| av免费在线免费观看| 99在线热播精品免费| 国产精品色午夜在线观看| 黄色一级片在线免费观看| 亚洲老女人视频免费| 欧美精品一级二级三级| 久久精品国产sm调教网站演员| 久草福利在线视频| 国产精品综合视频| 人九九综合九九宗合| 日本在线一级片| 猛男gaygay欧美视频| 91麻豆精品国产91久久久久久久久 | 妺妺窝人体色www看人体| 亚欧洲精品视频| 国产一区二区三区蝌蚪| 日本一欧美一欧美一亚洲视频| 日本在线一级片| 精品国产一区二区三区小蝌蚪 | 日韩欧美一区二区三区久久| 性做爰过程免费播放| 女人天堂在线| 成人美女视频在线看| 国产日韩欧美91| 在线视频一区二区三区四区| 激情综合电影网| 日韩有码在线视频| 一区二区伦理片| 欧美巨大xxxx| 日韩亚洲电影在线| 亚洲欧美国产中文| 国产精品伦理| 亚洲伊人色欲综合网| 欧美一级黄色录像片| 五月婷婷在线观看| 中文字幕第一区二区| 欧美精品一区二区三区在线四季 | 亚洲精品小视频| 性色av浪潮av| 日韩免费大片| 欧美性欧美巨大黑白大战| 国产aaa一级片| 国产极品在线观看| 亚洲综合色丁香婷婷六月图片| 一区二区三区av| 亚洲欧美视频一区二区| 国产女主播视频一区二区| 免费看污久久久| 免费福利在线视频| 久久综合九色综合97婷婷女人| 成人欧美一区二区| 成人h动漫精品一区二区无码| 精品无人区卡一卡二卡三乱码免费卡| 国产美女久久精品香蕉69| 无码人妻精品一区二区| 久久久精品网| 国产精品美女av| 在线观看中文字幕码| 久久精品99国产精品日本| 国产日韩欧美中文在线播放| 中文天堂在线视频| 九色porny丨国产精品| 91精品综合视频| 国产黄a三级三级三级| 国产大片一区二区| 国产乱码精品一区二区三区不卡| 天堂中文网在线| 91视频免费看| 免费亚洲精品视频| 成a人片在线观看www视频| 国产精品网站在线播放| 久久久无码中文字幕久...| 超碰在线97国产| 欧美性生交xxxxxdddd| 午夜免费一区二区| 日韩精品亚洲专区在线观看| 精品国产区一区| a级在线观看视频| 美女毛片一区二区三区四区| 久久精品国产亚洲| 久久久久99精品成人片毛片| 国产精品女主播一区二区三区| 日韩美女在线观看| 国产精品乱码一区二区| 成人激情免费电影网址| 日本一区二区三区四区在线观看| 浪潮av一区| 五月婷婷色综合| 国产精品入口免费软件| 天堂va在线高清一区| 亚洲精品天天看| 女人18毛片毛片毛片毛片区二 | 91精品国产91久久久| 蜜臀尤物一区二区三区直播| 国产在线播放一区二区三区| 久久国产精品 国产精品| 日本高清中文字幕在线| 亚洲高清免费一级二级三级| 日韩视频免费在线播放| 九九99久久精品在免费线bt| 亚洲精品电影网在线观看| 国产馆在线观看| 一区二区国产精品| 成人夜晚看av| 国产原创av在线| 亚洲观看高清完整版在线观看| 日本在线观看免费视频| 老牛影视av一区二区在线观看| 丝袜美腿精品国产二区 | 91国在线观看| 国产精品欧美性爱| 不卡在线一区| 97在线免费视频| 国产视频一区二区三| 国产网站一区二区三区| 免费看欧美黑人毛片| 欧美在线se| 亚洲欧洲第一视频| 日本一区二区免费在线观看| 韩国v欧美v日本v亚洲v| 亚洲成人午夜在线| 免费成人直播| 亚洲精品国产精品乱码不99按摩| 可以直接看的黄色网址| 久久精品二区亚洲w码| 欧美一二三区| 在线天堂新版最新版在线8| 欧美成人猛片aaaaaaa| 神马午夜精品91| 捆绑调教美女网站视频一区| 欧美日韩喷水| 欧美gv在线| 亚洲成人xxx| 久久久久久久国产精品毛片| 国产中文字幕精品| 一区二区三区久久网| 99久久婷婷国产综合精品首页| 亚洲天堂影视av| 麻豆成人免费视频| 久久久久九九视频| 男女av免费观看| 婷婷综合电影| 欧美孕妇与黑人孕交| 五月天婷婷在线观看| 亚洲风情在线资源站| 极品白嫩少妇无套内谢| 欧美91视频| 99在线观看| av老司机免费在线| 欧美精品一区二区三区高清aⅴ| 免费看一级一片| 成人免费福利片| 国产精品一区二区免费在线观看| 成人动态视频| 国产91|九色| 精华区一区二区三区| 在线精品亚洲一区二区不卡| 一区二区黄色片| 日本最新不卡在线| 在线观看免费91| 精品麻豆剧传媒av国产九九九| 九九久久国产精品| 欧美天堂在线视频| 好吊成人免视频| 成年人看的免费视频| 国产乱码精品1区2区3区| 日本a级片在线观看| 精品成人自拍视频| 国产成人+综合亚洲+天堂| www在线播放| 91精品国产综合久久久蜜臀图片| 免费在线看黄网址| 337p粉嫩大胆噜噜噜噜噜91av| 男女曰b免费视频| 国产精品99久久精品| av一区二区三区在线观看| 国产中文在线播放| 在线成人激情视频| www.色婷婷.com| 色一情一乱一乱一91av| 午夜精品久久久久99蜜桃最新版| 国产精品1024久久| www国产黄色| 亚洲va在线| 精品蜜桃传媒| 91精品国产一区二区在线观看| 国模精品视频一区二区| 风间由美一区| 精品国内片67194| 夜夜躁日日躁狠狠久久av| 亚洲日本一区二区三区| 手机在线看片日韩| 精品一区二区三区免费观看| 日本十八禁视频无遮挡| 欧美激情国产在线| 精品视频在线观看| 91国产精品| 国产91免费观看| 日本资源在线| 在线视频一区二区| 日韩一区二区三区不卡| 欧美剧在线免费观看网站| 国产精品视频123| 亚洲码国产岛国毛片在线| 无码国产69精品久久久久同性| 国产精品一区二区在线看| 欧美黄色一级片视频| 欧美区亚洲区| 亚洲午夜精品久久| 亚瑟一区二区三区四区| 亚洲自拍在线观看| 性欧美videohd高精| 欧美精品18videosex性欧美| 免费在线观看黄色| 亚洲色图欧美制服丝袜另类第一页| av资源免费看| 欧美美女直播网站| 中文字幕在线观看视频免费| 亚洲国产另类av| 欧美激情图片小说| 国产精品美女视频| 亚洲精品国产精品国自产网站| 99国产欧美久久久精品| www.欧美com| 国产一区不卡视频| 香港日本韩国三级网站| 久久精品毛片| 欧美精品99久久| 亚洲激情成人| 91丨porny丨探花| 激情五月***国产精品| 久操手机在线视频| 亚洲成人最新网站| 中文字幕不卡每日更新1区2区| 精品视频网站| 天天好比中文综合网| 国精一区二区| 欧美视频小说| 九九热精品视频在线观看| 免费国产一区| 精品视频国产| 亚洲图片都市激情| 日韩大片在线| 中文字幕一区二区三区在线乱码| 第一会所亚洲原创| 日本在线播放一区| 欧美精品一区二区久久| 色999五月色| 成人羞羞网站| 综合网五月天| 欧美成人一品| 一本久道高清无码视频| 亚洲二区精品| 日韩少妇内射免费播放18禁裸乳| 在线亚洲伦理| 国产第一页视频| 奇米亚洲午夜久久精品| 亚洲免费黄色录像| 国产激情精品久久久第一区二区 | 精品人妻av一区二区三区| 欧美一区二区三区免费观看视频 | xvideos亚洲人网站| 麻豆视频在线观看免费| 精品自在线视频| 国产精品蜜芽在线观看| 欧美一区第一页| 亚洲综合av一区二区三区| 成人在线中文字幕| 天堂av一区| 欧美三日本三级少妇三99| 欧美顶级大胆免费视频| 国产免费一区二区三区四在线播放| 黄色成人精品网站| 日本黄色三级大片| 久久精品av麻豆的观看方式| 久久久久中文字幕亚洲精品| 91热门视频在线观看| 后入内射无码人妻一区| 一区二区成人在线| 91丝袜一区二区三区| 欧美日韩精品一区二区三区四区| 亚洲av无码国产精品永久一区 | 一区二区不卡免费视频| 国产欧美va欧美不卡在线| 唐朝av高清盛宴| 日韩欧美成人网| 国产手机视频在线| 日韩极品精品视频免费观看| av网在线观看| 欧美激情一区二区三区久久久 | 亚洲美女黄色| 一区二区三区 欧美| 成熟亚洲日本毛茸茸凸凹| 欧美丰满美乳xxⅹ高潮www| 亚洲乱码国产乱码精品精的特点 | 欧美精品vⅰdeose4hd| 色呦呦视频在线| 久久精品国产欧美激情| 国模套图日韩精品一区二区| 亚洲影院高清在线| 狠狠色丁香婷婷综合影院| 成人免费a级片| 久久av老司机精品网站导航| 超碰caoprom| 亚洲柠檬福利资源导航| 亚洲视屏在线观看| 亚洲国产成人91精品| 米奇777四色精品人人爽| 5278欧美一区二区三区| 91精品尤物| 亚洲一区二三| 久久九九99| 风间由美一二三区av片| 亚洲欧洲综合另类在线| 性高潮视频在线观看| 日韩精品久久久久久久玫瑰园| 另类视频在线| 亚洲xxx自由成熟| 四季av一区二区三区免费观看| 国产免费成人在线| 99久久精品久久久久久清纯| 久久国产在线视频| 67194成人在线观看| 在线观看完整版免费| 26uuu久久噜噜噜噜| 狼人精品一区二区三区在线| 97中文字幕在线| 高清在线成人网| 免费一级全黄少妇性色生活片| 91麻豆精品国产无毒不卡在线观看| 在线播放麻豆| 国产精品欧美一区二区| 精品国产美女| 免费观看成人网| 久久精品人人做人人爽人人| 国产成人无码精品亚洲| 精品91自产拍在线观看一区| 污污网站在线看| 成人精品水蜜桃| 欧美日韩岛国| 中文字幕在线观看91| 亚洲宅男天堂在线观看无病毒| 亚洲黄色在线免费观看| 欧美精品福利在线| 懂色av一区二区| 久久视频这里有精品| 99久久99久久免费精品蜜臀| 精品美女久久久久| 亚洲另类欧美自拍| 日韩毛片免费观看| 日本一区二区三区www| 日韩 欧美一区二区三区| 奇米网一区二区| 欧美一区二区在线播放| av免费在线观| 国产一区二区精品免费| 一区二区动漫| 精品无码国产污污污免费网站| 91久久精品国产91性色tv| 北岛玲日韩精品一区二区三区| 国产精品视频一区二区三区四| 日韩成人三级| 亚洲区 欧美区| 精品久久久久久久久久ntr影视| 日本高清中文字幕二区在线| 国产精品在线看| 午夜精品电影| 好吊一区二区三区视频| 在线观看网站黄不卡| 黄色成人在线| 国精产品一区二区| 老司机免费视频久久| 国产在线免费看| 欧美tk—视频vk| 亚洲校园激情春色| 亚洲午夜精品久久久中文影院av| 国产精品一级在线| www成人在线| 最新的欧美黄色| 国产精品18hdxxxⅹ在线| 激情五月亚洲色图| 一区二区三区成人| 久草视频视频在线播放| 亚洲free性xxxx护士白浆| 亚洲精选一区| 久久久久99精品成人| 亚洲成人a级网| 偷拍自拍亚洲| 男人用嘴添女人下身免费视频| 国产精品乱子久久久久| 高h震动喷水双性1v1| 国产一区二区视频在线观看| 亚洲国产99| 国产精品99久久久久久成人| 亚洲免费福利视频| 色悠久久久久综合先锋影音下载|