Disruptor—3.核心源码实现分析二
大纲
1.Disruptor的生产者源码分析
2.Disruptor的消费者源码分析
3.Disruptor的WaitStrategy等待策略分析
4.Disruptor的高性能原因
5.Disruptor高性能之数据结构(内存预加载机制)
6.Disruptor高性能之内核(使用单线程写)
7.Disruptor高性能之系统内存优化(内存屏障)
8.Disruptor高性能之系统缓存优化(消除伪共享)
9.Disruptor高性能之序号获取优化(自旋 + CAS)
2.Disruptor的消费者源码分析
Disruptor的消费者主要由BatchEventProcessor类和WorkProcessor类来实现,并通过Disruptor的handleEventsWith()方法或者handleEventsWithWorkerPool()方法和start()方法来启动。
执行Disruptor的handleEventsWith()方法绑定消费者时,会创建BatchEventProcessor对象,并将其添加到Disruptor的consumerRepository属性。
执行Disruptor的handleEventsWithWorkerPool()方法绑定消费者时,则会创建WorkProcessor对象,并将该对象添加到Disruptor的consumerRepository属性。
执行Disruptor的start()方法启动Disruptor实例时,便会通过线程池执行BatchEventProcessor里的run()方法,或者通过线程池执行WorkProcessor里的run()方法。
执行BatchEventProcessor的run()方法时,会通过修改BatchEventProcessor的sequence来实现消费RingBuffer的数据。
执行WorkProcessor的run()方法时,会通过修改WorkProcessor的sequence来实现消费RingBuffer的数据。
public class Main {public static void main(String[] args) {//参数准备OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());//参数一:eventFactory,消息(Event)工厂对象//参数二:ringBufferSize,容器的长度//参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler//参数四:ProducerType,单生产者还是多生产者//参数五:waitStrategy,等待策略//1.实例化Disruptor对象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());//2.添加Event处理器,用于处理事件//也就是构建Disruptor与消费者的一个关联关系//方式一:使用handleEventsWith()方法disruptor.handleEventsWith(new OrderEventHandler());//方式二:使用handleEventsWithWorkerPool()方法//disruptor.handleEventsWithWorkerPool(workHandlers);//3.启动disruptordisruptor.start();//4.获取实际存储数据的容器: RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long i = 0; i < 5; i++) {bb.putLong(0, i);//向容器中投递数据producer.sendData(bb);}disruptor.shutdown();executor.shutdown();}
}
public class Disruptor<T> {private final RingBuffer<T> ringBuffer;private final Executor executor;private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();private final AtomicBoolean started = new AtomicBoolean(false);private ExceptionHandler<? super T> exceptionHandler;...//绑定消费者,设置EventHandler,创建EventProcessor//Set up event handlers to handle events from the ring buffer. //These handlers will process events as soon as they become available, in parallel.//This method can be used as the start of a chain. //For example if the handler A must process events before handler B: dw.handleEventsWith(A).then(B); //@param handlers the event handlers that will process events.//@return a EventHandlerGroup that can be used to chain dependencies.@SuppressWarnings("varargs")public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {return createEventProcessors(new Sequence[0], handlers);}//创建BatchEventProcessor,添加到consumerRepository中EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {checkNotStarted();final Sequence[] processorSequences = new Sequence[eventHandlers.length];final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {final EventHandler<? 
super T> eventHandler = eventHandlers[i];//创建BatchEventProcessor对象final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);if (exceptionHandler != null) {batchEventProcessor.setExceptionHandler(exceptionHandler);}//添加BatchEventProcessor对象到consumerRepository中consumerRepository.add(batchEventProcessor, eventHandler, barrier);//一个消费者线程对应一个batchEventProcessor//每个batchEventProcessor都会持有一个Sequence对象来表示当前消费者线程的消费进度processorSequences[i] = batchEventProcessor.getSequence();}//将每个消费者线程持有的Sequence对象添加到生产者Sequencer的gatingSequences属性中(Sequence[]属性)updateGatingSequencesForNextInChain(barrierSequences, processorSequences);return new EventHandlerGroup<>(this, consumerRepository, processorSequences);}private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {if (processorSequences.length > 0) {ringBuffer.addGatingSequences(processorSequences);for (final Sequence barrierSequence : barrierSequences) {ringBuffer.removeGatingSequence(barrierSequence);}consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);}}private void checkNotStarted() {//线程的开关会使用CAS实现if (started.get()) {throw new IllegalStateException("All event handlers must be added before calling starts.");}}...//Starts the event processors and returns the fully configured ring buffer.//The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.//This method must only be called once after all event processors have been added.//@return the configured ring buffer.public RingBuffer<T> start() {checkOnlyStartedOnce();for (final ConsumerInfo consumerInfo : consumerRepository) {//在执行Disruptor.handleEventsWith()方法,调用Disruptor.createEventProcessors()方法时,//会将新创建的BatchEventProcessor对象封装成EventProcessorInfo对象(即ConsumerInfo对象),//然后通过add()方法添加到consumerRepository中//所以下面会调用EventProcessorInfo.start()方法consumerInfo.start(executor);}return ringBuffer;}private 
void checkOnlyStartedOnce() {//线程的开关使用CAS实现if (!started.compareAndSet(false, true)) {throw new IllegalStateException("Disruptor.start() must only be called once.");}}...
}//Provides a repository mechanism to associate EventHandlers with EventProcessors
class ConsumerRepository<T> implements Iterable<ConsumerInfo> {private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();//添加BatchEventProcessor对象到consumerRepository中public void add(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) {//将传入的BatchEventProcessor对象封装成EventProcessorInfo对象,即ConsumerInfo对象final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);eventProcessorInfoByEventHandler.put(handler, consumerInfo);eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);consumerInfos.add(consumerInfo);}...
}class EventProcessorInfo<T> implements ConsumerInfo {private final EventProcessor eventprocessor;private final EventHandler<? super T> handler;private final SequenceBarrier barrier;private boolean endOfChain = true;EventProcessorInfo(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) {this.eventprocessor = eventprocessor;this.handler = handler;this.barrier = barrier;}...@Overridepublic void start(final Executor executor) {//通过传入的线程池,执行BatchEventProcessor对象的run()方法//传入的线程池,其实就是初始化Disruptor时指定的线程池executor.execute(eventprocessor);}...
}//Convenience class for handling the batching semantics of consuming entries from
//a RingBuffer and delegating the available events to an EventHandler.
//If the EventHandler also implements LifecycleAware it will be notified just after
//the thread is started and just before the thread is shutdown.
//@param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class BatchEventProcessor<T> implements EventProcessor {private final AtomicBoolean running = new AtomicBoolean(false);private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();private final DataProvider<T> dataProvider;private final SequenceBarrier sequenceBarrier;private final EventHandler<? super T> eventHandler;private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);private final TimeoutHandler timeoutHandler;//Construct a EventProcessor that will automatically track the progress by //updating its sequence when the EventHandler#onEvent(Object, long, boolean) method returns.//@param dataProvider to which events are published.//@param sequenceBarrier on which it is waiting.//@param eventHandler is the delegate to which events are dispatched.public BatchEventProcessor(final DataProvider<T> dataProvider, final SequenceBarrier sequenceBarrier, final EventHandler<? super T> eventHandler) {//传入的dataProvider其实就是Disruptor的ringBufferthis.dataProvider = dataProvider;this.sequenceBarrier = sequenceBarrier;this.eventHandler = eventHandler;if (eventHandler instanceof SequenceReportingEventHandler) {((SequenceReportingEventHandler<?>)eventHandler).setSequenceCallback(sequence);}timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;}...//It is ok to have another thread rerun this method after a halt().//通过对sequence进行修改来实现消费RingBuffer里的数据@Overridepublic void run() {if (running.compareAndSet(IDLE, RUNNING)) {sequenceBarrier.clearAlert();notifyStart();try {if (running.get() == RUNNING) {processEvents();}} finally {notifyShutdown();running.set(IDLE);}} else {//This is a little bit of guess work. //The running state could of changed to HALTED by this point. 
//However, Java does not have compareAndExchange which is the only way to get it exactly correct.if (running.get() == RUNNING) {throw new IllegalStateException("Thread is already running");} else {earlyExit();}}}private void processEvents() {T event = null;long nextSequence = sequence.get() + 1L;while (true) {try {//通过sequenceBarrier.waitFor()方法看看消费者是否需要等待生产者投递消息final long availableSequence = sequenceBarrier.waitFor(nextSequence);if (batchStartAware != null) {batchStartAware.onBatchStart(availableSequence - nextSequence + 1);}while (nextSequence <= availableSequence) {//从RingBuffer中获取要消费的数据event = dataProvider.get(nextSequence);//执行消费者实现的onEvent()方法来消费数据eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);nextSequence++;}//设置消费者当前的消费进度sequence.set(availableSequence);} catch (final TimeoutException e) {notifyTimeout(sequence.get());} catch (final AlertException ex) {if (running.get() != RUNNING) {break;}} catch (final Throwable ex) {handleEventException(ex, nextSequence, event);sequence.set(nextSequence);nextSequence++;}}}private void earlyExit() {notifyStart();notifyShutdown();}private void notifyTimeout(final long availableSequence) {try {if (timeoutHandler != null) {timeoutHandler.onTimeout(availableSequence);}} catch (Throwable e) {handleEventException(e, availableSequence, null);}}//Notifies the EventHandler when this processor is starting upprivate void notifyStart() {if (eventHandler instanceof LifecycleAware) {try {((LifecycleAware) eventHandler).onStart();} catch (final Throwable ex) {handleOnStartException(ex);}}}//Notifies the EventHandler immediately prior to this processor shutting downprivate void notifyShutdown() {if (eventHandler instanceof LifecycleAware) {try {((LifecycleAware) eventHandler).onShutdown();} catch (final Throwable ex) {handleOnShutdownException(ex);}}}...
}
public class Disruptor<T> {private final RingBuffer<T> ringBuffer;private final Executor executor;private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();private final AtomicBoolean started = new AtomicBoolean(false);private ExceptionHandler<? super T> exceptionHandler;...//设置WorkHandler,创建WorkProcessor//Set up a WorkerPool to distribute an event to one of a pool of work handler threads.//Each event will only be processed by one of the work handlers.//The Disruptor will automatically start this processors when #start() is called.//@param workHandlers the work handlers that will process events.//@return a {@link EventHandlerGroup} that can be used to chain dependencies.@SafeVarargs@SuppressWarnings("varargs")public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) {return createWorkerPool(new Sequence[0], workHandlers);}//创建WorkerPool,添加到consumerRepository中EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<? 
super T>[] workHandlers) {final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);//创建WorkerPool对象,以及根据workHandlers创建WorkProcessorfinal WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);//添加WorkerPool对象到consumerRepository中 consumerRepository.add(workerPool, sequenceBarrier);final Sequence[] workerSequences = workerPool.getWorkerSequences();//将每个消费者线程持有的Sequence对象添加到生产者Sequencer的gatingSequences属性中(Sequence[]属性)updateGatingSequencesForNextInChain(barrierSequences, workerSequences);return new EventHandlerGroup<>(this, consumerRepository, workerSequences);}private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {if (processorSequences.length > 0) {ringBuffer.addGatingSequences(processorSequences);for (final Sequence barrierSequence : barrierSequences) {ringBuffer.removeGatingSequence(barrierSequence);}consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);}}...//Starts the event processors and returns the fully configured ring buffer.//The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.//This method must only be called once after all event processors have been added.//@return the configured ring buffer.public RingBuffer<T> start() {checkOnlyStartedOnce();for (final ConsumerInfo consumerInfo : consumerRepository) {//在执行Disruptor.handleEventsWithWorkerPool()方法,调用Disruptor.createWorkerPool()方法时,//会将新创建的WorkerPool对象封装成WorkerPoolInfo对象(即ConsumerInfo对象),//然后通过add()方法添加到consumerRepository中//所以下面会调用WorkerPoolInfo.start()方法consumerInfo.start(executor);}return ringBuffer;}private void checkOnlyStartedOnce() {//线程的开关使用CAS实现if (!started.compareAndSet(false, true)) {throw new IllegalStateException("Disruptor.start() must only be called once.");}}...
}//Provides a repository mechanism to associate EventHandlers with EventProcessors
class ConsumerRepository<T> implements Iterable<ConsumerInfo> {private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();//添加WorkerPool对象到consumerRepository中public void add(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier) {final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<>(workerPool, sequenceBarrier);consumerInfos.add(workerPoolInfo);for (Sequence sequence : workerPool.getWorkerSequences()) {eventProcessorInfoBySequence.put(sequence, workerPoolInfo);}}...
}class WorkerPoolInfo<T> implements ConsumerInfo {private final WorkerPool<T> workerPool;private final SequenceBarrier sequenceBarrier;private boolean endOfChain = true;WorkerPoolInfo(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier) {this.workerPool = workerPool;this.sequenceBarrier = sequenceBarrier;}@Overridepublic void start(Executor executor) {workerPool.start(executor);}...
}public final class WorkerPool<T> {private final AtomicBoolean started = new AtomicBoolean(false);private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);private final RingBuffer<T> ringBuffer;//WorkProcessors are created to wrap each of the provided WorkHandlersprivate final WorkProcessor<?>[] workProcessors;//Create a worker pool to enable an array of WorkHandlers to consume published sequences.//This option requires a pre-configured RingBuffer which must have RingBuffer#addGatingSequences(Sequence...) called before the work pool is started.//@param ringBuffer of events to be consumed.//@param sequenceBarrier on which the workers will depend.//@param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.//@param workHandlers to distribute the work load across.@SafeVarargspublic WorkerPool(final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final ExceptionHandler<? super T> exceptionHandler, final WorkHandler<? super T>... 
workHandlers) {this.ringBuffer = ringBuffer;final int numWorkers = workHandlers.length;//根据workHandlers创建WorkProcessorworkProcessors = new WorkProcessor[numWorkers];for (int i = 0; i < numWorkers; i++) {workProcessors[i] = new WorkProcessor<>(ringBuffer, sequenceBarrier, workHandlers[i], exceptionHandler, workSequence);}}//Start the worker pool processing events in sequence.//@param executor providing threads for running the workers.//@return the {@link RingBuffer} used for the work queue.//@throws IllegalStateException if the pool has already been started and not halted yetpublic RingBuffer<T> start(final Executor executor) {if (!started.compareAndSet(false, true)) {throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");}final long cursor = ringBuffer.getCursor();workSequence.set(cursor);for (WorkProcessor<?> processor : workProcessors) {processor.getSequence().set(cursor);//通过传入的线程池,执行WorkProcessor对象的run()方法executor.execute(processor);}return ringBuffer;}...
}public final class WorkProcessor<T> implements EventProcessor {private final AtomicBoolean running = new AtomicBoolean(false);private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);private final RingBuffer<T> ringBuffer;private final SequenceBarrier sequenceBarrier;private final WorkHandler<? super T> workHandler;private final ExceptionHandler<? super T> exceptionHandler;private final Sequence workSequence;private final EventReleaser eventReleaser = new EventReleaser() {@Overridepublic void release() {sequence.set(Long.MAX_VALUE);}};private final TimeoutHandler timeoutHandler;//Construct a {@link WorkProcessor}.//@param ringBuffer to which events are published.//@param sequenceBarrier on which it is waiting.//@param workHandler is the delegate to which events are dispatched.//@param exceptionHandler to be called back when an error occurs//@param workSequence from which to claim the next event to be worked on. It should always be initialised as Sequencer#INITIAL_CURSOR_VALUEpublic WorkProcessor(final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final WorkHandler<? super T> workHandler, final ExceptionHandler<? super T> exceptionHandler, final Sequence workSequence) {this.ringBuffer = ringBuffer;this.sequenceBarrier = sequenceBarrier;this.workHandler = workHandler;this.exceptionHandler = exceptionHandler;this.workSequence = workSequence;if (this.workHandler instanceof EventReleaseAware) {((EventReleaseAware) this.workHandler).setEventReleaser(eventReleaser);}timeoutHandler = (workHandler instanceof TimeoutHandler) ? 
(TimeoutHandler) workHandler : null;}//通过对sequence进行修改来实现消费RingBuffer里的数据@Overridepublic void run() {if (!running.compareAndSet(false, true)) {throw new IllegalStateException("Thread is already running");}sequenceBarrier.clearAlert();notifyStart();boolean processedSequence = true;long cachedAvailableSequence = Long.MIN_VALUE;long nextSequence = sequence.get();T event = null;while (true) {try {if (processedSequence) {processedSequence = false;do {nextSequence = workSequence.get() + 1L;//设置消费者当前的消费进度sequence.set(nextSequence - 1L);} while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));}if (cachedAvailableSequence >= nextSequence) {//从RingBuffer中获取要消费的数据event = ringBuffer.get(nextSequence);//执行消费者实现的onEvent()方法来消费数据workHandler.onEvent(event);processedSequence = true;} else {//通过sequenceBarrier.waitFor()方法看看消费者是否需要等待生产者投递消息cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);}} catch (final TimeoutException e) {notifyTimeout(sequence.get());} catch (final AlertException ex) {if (!running.get()) {break;}} catch (final Throwable ex) {//handle, mark as processed, unless the exception handler threw an exceptionexceptionHandler.handleEventException(ex, nextSequence, event);processedSequence = true;}}notifyShutdown();running.set(false);}...
}
public class Disruptor<T> {private final RingBuffer<T> ringBuffer;private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {if (processorSequences.length > 0) {ringBuffer.addGatingSequences(processorSequences);for (final Sequence barrierSequence : barrierSequences) {ringBuffer.removeGatingSequence(barrierSequence);}consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);}}...
}abstract class RingBufferPad {protected long p1, p2, p3, p4, p5, p6, p7;
}abstract class RingBufferFields<E> extends RingBufferPad {...private static final Unsafe UNSAFE = Util.getUnsafe();private final long indexMask;//环形数组存储事件消息private final Object[] entries;protected final int bufferSize;//RingBuffer的sequencer属性代表了当前线程对应的生产者protected final Sequencer sequencer;RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {this.sequencer = sequencer;this.bufferSize = sequencer.getBufferSize();if (bufferSize < 1) {throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount(bufferSize) != 1) {throw new IllegalArgumentException("bufferSize must be a power of 2");}this.indexMask = bufferSize - 1;//初始化数组this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];//内存预加载fill(eventFactory);}private void fill(EventFactory<E> eventFactory) {for (int i = 0; i < bufferSize; i++) {entries[BUFFER_PAD + i] = eventFactory.newInstance();}}protected final E elementAt(long sequence) {return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}...
}public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {...//Add the specified gating sequences to this instance of the Disruptor. //They will safely and atomically added to the list of gating sequences.//@param gatingSequences The sequences to add.public void addGatingSequences(Sequence... gatingSequences) {sequencer.addGatingSequences(gatingSequences);}...
}public interface Sequencer extends Cursored, Sequenced {...//Add the specified gating sequences to this instance of the Disruptor. //They will safely and atomically added to the list of gating sequences.//@param gatingSequences The sequences to add.void addGatingSequences(Sequence... gatingSequences);...
}public abstract class AbstractSequencer implements Sequencer {private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");...@Overridepublic final void addGatingSequences(Sequence... gatingSequences) {SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);}...
}class SequenceGroups {static <T> void addSequences(final T holder, final AtomicReferenceFieldUpdater<T, Sequence[]> updater, final Cursored cursor, final Sequence... sequencesToAdd) {long cursorSequence;Sequence[] updatedSequences;Sequence[] currentSequences;do {currentSequences = updater.get(holder);updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);cursorSequence = cursor.getCursor();int index = currentSequences.length;for (Sequence sequence : sequencesToAdd) {sequence.set(cursorSequence);updatedSequences[index++] = sequence;}} while (!updater.compareAndSet(holder, currentSequences, updatedSequences));cursorSequence = cursor.getCursor();for (Sequence sequence : sequencesToAdd) {sequence.set(cursorSequence);}}...
}
3.Disruptor的WaitStrategy等待策略分析
在生产者发布消息时,会调用WaitStrategy的signalAllWhenBlocking()方法唤醒阻塞的消费者。在消费者消费消息时,会调用WaitStrategy的waitFor()方法阻塞消费过快的消费者。
当然,不同的策略不一定就是阻塞消费者,比如BlockingWaitStrategy会通过ReentrantLock来阻塞消费者,而YieldingWaitStrategy则通过yield切换线程来实现让消费者无锁等待,即通过Thread的yield()方法切换线程让另一个线程继续执行自旋判断操作。
需要注意的是:BusySpinWaitStrategy全程忙等自旋、不让出CPU,因此延迟最低,但也最耗费CPU资源;YieldingWaitStrategy在自旋若干次后会通过Thread.yield()让出CPU,延迟次低、同样比较耗费CPU资源,是性能与CPU资源之间的折中。
Disruptor提供了如下几种等待策略:
一.完全阻塞的等待策略BlockingWaitStrategy
二.切换线程自旋的等待策略YieldingWaitStrategy
三.繁忙自旋的等待策略BusySpinWaitStrategy
四.轻微阻塞的等待策略LiteBlockingWaitStrategy,也就是唤醒阻塞线程时,通过CAS(原子变量signalNeeded)避免并发获取锁的等待策略
五.最小睡眠 + 切换线程的等待策略SleepingWaitStrategy
总结:
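为了达到最低延迟,且有充足并可独占的CPU核心,可以让线程持续忙等自旋(BusySpinWaitStrategy)
为了保证低延迟的同时稍微兼顾CPU资源,可以先自旋、再通过yield切换线程(YieldingWaitStrategy)
为了保证比较高效更加兼顾CPU资源,可以自旋 + 切换线程 + 最少睡眠(SleepingWaitStrategy)
为了完全兼顾CPU资源不考虑效率问题,可以采用重入锁实现阻塞唤醒(BlockingWaitStrategy)
为了完全兼顾CPU资源但考虑一点效率,可以采用重入锁 + CAS唤醒(LiteBlockingWaitStrategy)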
//完全阻塞的等待策略
//Blocking strategy that uses a lock and condition variable for EventProcessors waiting on a barrier.
//This strategy can be used when throughput and low-latency are not as important as CPU resource.
public final class BlockingWaitStrategy implements WaitStrategy {private final Lock lock = new ReentrantLock();private final Condition processorNotifyCondition = lock.newCondition();@Overridepublic long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {long availableSequence;if ((availableSequence = cursorSequence.get()) < sequence) {lock.lock();try {while ((availableSequence = cursorSequence.get()) < sequence) {barrier.checkAlert();processorNotifyCondition.await();}} finally {lock.unlock();}}while ((availableSequence = dependentSequence.get()) < sequence) {barrier.checkAlert();}return availableSequence;}@Overridepublic void signalAllWhenBlocking() {lock.lock();try {processorNotifyCondition.signalAll();} finally {lock.unlock();}}
}//切换线程自旋的等待策略
//Yielding strategy that uses a Thread.yield() for EventProcessors waiting on a barrier after an initially spinning.
//This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes.
public final class YieldingWaitStrategy implements WaitStrategy {private static final int SPIN_TRIES = 100;@Overridepublic long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {long availableSequence;int counter = SPIN_TRIES;while ((availableSequence = dependentSequence.get()) < sequence) {counter = applyWaitMethod(barrier, counter);}return availableSequence;}@Overridepublic void signalAllWhenBlocking() {}private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {barrier.checkAlert();if (0 == counter) {//切换线程,让另一个线程继续执行自旋操作Thread.yield();} else {--counter;}return counter;}
}//繁忙自旋的等待策略
//Busy Spin strategy that uses a busy spin loop for EventProcessors waiting on a barrier.
//This strategy will use CPU resource to avoid syscalls which can introduce latency jitter.
//It is best used when threads can be bound to specific CPU cores.
public final class BusySpinWaitStrategy implements WaitStrategy {@Overridepublic long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {long availableSequence;while ((availableSequence = dependentSequence.get()) < sequence) {barrier.checkAlert();}return availableSequence;}@Overridepublic void signalAllWhenBlocking() {}
}//轻微阻塞的等待策略(唤醒阻塞线程时避免了并发获取锁)
//Variation of the BlockingWaitStrategy that attempts to elide conditional wake-ups when the lock is uncontended.
//Shows performance improvements on microbenchmarks.
//However this wait strategy should be considered experimental as I have not full proved the correctness of the lock elision code.
public final class LiteBlockingWaitStrategy implements WaitStrategy {private final Lock lock = new ReentrantLock();private final Condition processorNotifyCondition = lock.newCondition();private final AtomicBoolean signalNeeded = new AtomicBoolean(false);@Overridepublic long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {long availableSequence;if ((availableSequence = cursorSequence.get()) < sequence) {lock.lock();try {do {signalNeeded.getAndSet(true);if ((availableSequence = cursorSequence.get()) >= sequence) {break;}barrier.checkAlert();processorNotifyCondition.await();} while ((availableSequence = cursorSequence.get()) < sequence);} finally {lock.unlock();}}while ((availableSequence = dependentSequence.get()) < sequence) {barrier.checkAlert();}return availableSequence;}@Overridepublic void signalAllWhenBlocking() {if (signalNeeded.getAndSet(false)) {lock.lock();try {processorNotifyCondition.signalAll();} finally {lock.unlock();}}}
}//最小睡眠 + 切换线程的等待策略SleepingWaitStrategy
//Sleeping strategy that initially spins, then uses a Thread.yield(),
//and eventually sleep LockSupport.parkNanos(1) for the minimum number of nanos the OS
//and JVM will allow while the EventProcessors are waiting on a barrier.
//This strategy is a good compromise between performance and CPU resource.
//Latency spikes can occur after quiet periods.
public final class SleepingWaitStrategy implements WaitStrategy {private static final int DEFAULT_RETRIES = 200;private final int retries;public SleepingWaitStrategy() {this(DEFAULT_RETRIES);}public SleepingWaitStrategy(int retries) {this.retries = retries;}@Overridepublic long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {long availableSequence;int counter = retries;while ((availableSequence = dependentSequence.get()) < sequence) {counter = applyWaitMethod(barrier, counter);}return availableSequence;}@Overridepublic void signalAllWhenBlocking() {}private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {barrier.checkAlert();if (counter > 100) {--counter;} else if (counter > 0) {--counter;Thread.yield();} else {LockSupport.parkNanos(1L);}return counter;}
}
4.Disruptor的高性能原因
一.使用了环形结构 + 数组 + 内存预加载
二.使用了单线程写的方式并配合内存屏障
三.消除伪共享(填充缓存行)
四.序号栅栏和序号配合使用来消除锁
五.提供了多种不同性能的等待策略
5.Disruptor高性能之数据结构(内存预加载机制)
(1)RingBuffer使用环形数组来存储元素
(2)采用了内存预加载机制
(1)RingBuffer使用环形数组来存储元素
环形数组可以避免数组扩容和缩容带来的性能损耗。
(2)RingBuffer采用了内存预加载机制
初始化RingBuffer时,会将entries数组里的每一个元素都先new出来。比如RingBuffer的大小设置为8,那么初始化RingBuffer时,就会先将entries数组的8个元素分别指向新new出来的空的Event对象。往RingBuffer填充元素时,只是对相应位置上已有的Event对象的字段进行赋值,而不会创建新对象。所以RingBuffer中的Event对象是一直存活着的,也就是说它能最大程度减少系统GC频率,从而提升性能。
//Demo entry point: wires a single-producer Disruptor to one event handler and publishes five events.
public class Main {
    public static void main(String[] args) {
        //Prepare the constructor arguments.
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        //Argument 1: eventFactory, the factory that creates the event (message) objects
        //Argument 2: ringBufferSize, the capacity of the ring buffer
        //Argument 3: executor, the thread pool (a custom pool is recommended); RejectedExecutionHandler
        //Argument 4: ProducerType, single producer or multiple producers
        //Argument 5: waitStrategy, the wait strategy
        //1. Instantiate the Disruptor.
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
            orderEventFactory,
            ringBufferSize,
            executor,
            ProducerType.SINGLE,
            new BlockingWaitStrategy()
        );

        //2. Register the event handler that processes events,
        //i.e. associate the Disruptor with its consumer.
        disruptor.handleEventsWith(new OrderEventHandler());

        //3. Start the Disruptor.
        disruptor.start();

        //4. Obtain the container that actually stores the data: the RingBuffer.
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            //Publish the data into the container.
            producer.sendData(bb);
        }
        disruptor.shutdown();
        executor.shutdown();
    }
}

//Excerpt of Disruptor: shows only how the public constructor builds the RingBuffer.
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    ...

    //Create a new Disruptor.
    //@param eventFactory the factory to create events in the ring buffer.
    //@param ringBufferSize the size of the ring buffer, must be power of 2.
    //@param executor an Executor to execute event processors.
    //@param producerType the claim strategy to use for the ring buffer.
    //@param waitStrategy the wait strategy to use for the ring buffer.
    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
    }

    //Private constructor helper
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
    ...
}

//Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
//@param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    //Equals -1 (Sequence.INITIAL_VALUE).
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...

    //Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
    public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        switch (producerType) {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

    //Create a new single producer RingBuffer with the specified wait strategy.
    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    //Construct a RingBuffer with the full option set.
    //@param eventFactory to newInstance entries for filling the RingBuffer
    //@param sequencer sequencer to handle the ordering of events moving through the RingBuffer.
    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        super(eventFactory, sequencer);
    }
    ...
}

//Holds the ring array and eagerly fills it with events at construction time.
abstract class RingBufferFields<E> extends RingBufferPad {
    private final long indexMask;
    //The ring array that stores the event objects.
    private final Object[] entries;
    protected final int bufferSize;
    //The sequencer stands for the producer side of this ring buffer.
    protected final Sequencer sequencer;
    ...

    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        //Exactly one bit set means bufferSize is a power of 2.
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //Allocate the array, with BUFFER_PAD extra slots on each side (BUFFER_PAD is declared in the elided part).
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //Memory preloading: create every event object up front.
        fill(eventFactory);
    }

    //Stores one factory-created event in each of the bufferSize usable slots.
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            //Put a fresh, not-yet-populated event object into the slot.
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    ...
}

//Seven unused long fields declared ahead of the RingBufferFields fields.
abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}
6.Disruptor高性能之内核(使用单线程写)
Disruptor的RingBuffer之所以可以做到完全无锁是因为单线程写。离开单线程写,没有任何技术可以做到完全无锁。Redis和Netty等高性能技术框架也是利用单线程写来实现的。
具体就是:单生产者时,固然只有一个生产者线程在写。多生产者时,每个生产者线程都只会写各自获取到的Sequence序号对应的环形数组的元素,从而使得多个生产者线程相互之间不会产生写冲突。
7.Disruptor高性能之系统内存优化(内存屏障)
要正确实现无锁,还需要另外一个关键技术——内存屏障。对应到Java语言,就是volatile变量与Happens Before语义。
内存屏障:Linux的smp_wmb()/smp_rmb()。
8.Disruptor高性能之系统缓存优化(消除伪共享)
CPU缓存是以缓存行(Cache Line)为单位进行存储的。缓存行是2的整数幂个连续字节,一般为32-256个字节,最常见的缓存行大小是64个字节。
当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会对这个缓存行形成竞争,从而无意中影响彼此性能,这就是伪共享。
消除伪共享:利用了空间换时间的思想。
由于代表着一个序号的Sequence其核心字段value是一个long型变量(占8个字节),所以有可能会出现多个Sequence对象的value变量共享同一个缓存行。因此,需要对Sequence对象的value变量消除伪共享。具体做法就是:对Sequence对象的value变量前后各增加7个long型变量。
注意:伪共享与Sequence的静态变量无关,因为静态变量本身就是多个线程共享的,而不是多个线程隔离独立的。
//Seven unused longs declared before the value field (left-hand padding).
class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

//Holds the single meaningful field: the volatile sequence value.
class Value extends LhsPadding {
    protected volatile long value;
}

//Seven unused longs declared after the value field (right-hand padding).
class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

//A sequence counter whose value field is surrounded by padding fields from its superclasses.
public class Sequence extends RhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static {
        UNSAFE = Util.getUnsafe();
        try {
            //Look up the offset of Value.value once, for use by the Unsafe writes below.
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    //Create a sequence initialised to -1.
    public Sequence() {
        this(INITIAL_VALUE);
    }

    //Create a sequence with a specified initial value.
    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    //Perform a volatile read of this sequence's value.
    public long get() {
        return value;
    }

    //Perform an ordered write of this sequence.
    //The intent is a Store/Store barrier between this write and any previous store.
    public void set(final long value) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }
    ...
}
9.Disruptor高性能之序号获取优化(自旋 + CAS)
生产者投递Event时会使用"long sequence = ringBuffer.next()"获取序号,而序号栅栏SequenceBarrier会和序号Sequence搭配起来一起使用,用来协调和管理消费者和生产者的工作节奏,避免锁的使用。
各个消费者和生产者都持有自己的序号,这些序号需满足如下条件以避免生产者速度过快,将还没来得及消费的消息覆盖。
一.消费者序号数值必须小于生产者序号数值
二.消费者序号数值必须小于其前置消费者的序号数值
三.生产者序号数值不能超过消费者中最小的序号数值加上bufferSize(即生产者最多只能领先最慢的消费者一圈)
高性能的序号获取优化:为避免生产者每次执行next()获取序号时,都要查询消费者的最小序号,Disruptor采取了自旋 + LockSupport挂起线程 + 缓存最小序号 + CAS来优化。既避免了锁,也尽量在不耗费CPU的情况下提升了性能。
单生产者的情况下,只有一个线程添加元素,此时没必要使用锁。多生产者的情况下,会有多个线程并发获取Sequence序号添加元素,此时会通过自旋 + CAS避免锁。
//Sample producer: claims a sequence, fills the event stored at that sequence, then publishes it.
public class OrderEventProducer {
    private RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    //Publishes one event whose value is the long read from position 0 of data.
    public void sendData(ByteBuffer data) {
        //1. When sending a message, the producer first obtains an available sequence from the ringBuffer.
        long sequence = ringBuffer.next();
        try {
            //2. Use the sequence to locate the concrete OrderEvent element.
            //Note: the OrderEvent obtained here is an "empty" object that has not been assigned yet.
            OrderEvent event = ringBuffer.get(sequence);
            //3. Perform the actual assignment.
            event.setValue(data.getLong(0));
        } finally {
            //4. Publish; done in finally so the claimed sequence is published even if the assignment throws.
            ringBuffer.publish(sequence);
        }
    }
}

//Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
//@param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    //Equals -1 (Sequence.INITIAL_VALUE).
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...

    //Increment and return the next sequence for the ring buffer.
    //Calls of this method should ensure that they always publish the sequence afterward.
    //E.g.
    //    long sequence = ringBuffer.next();
    //    try {
    //        Event e = ringBuffer.get(sequence);
    //        ...
    //    } finally {
    //        ringBuffer.publish(sequence);
    //    }
    //@return The next sequence to publish to.
    @Override
    public long next() {
        return sequencer.next();
    }

    //Publish the specified sequence.
    //This action marks this particular message as being available to be read.
    //@param sequence the sequence to publish.
    @Override
    public void publish(long sequence) {
        sequencer.publish(sequence);
    }

    //Get the event for a given sequence in the RingBuffer.
    //This call has 2 uses.
    //Firstly use this call when publishing to a ring buffer.
    //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).
    //Secondly use this call when consuming data from the ring buffer.
    //After calling SequenceBarrier#waitFor(long) call this method with any value greater than
    //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.
    //@param sequence for the event
    //@return the event for the given sequence
    @Override
    public E get(long sequence) {
        //Delegates to elementAt() of the parent class RingBufferFields.
        return elementAt(sequence);
    }
    ...
}

//Seven unused long fields declared ahead of the RingBufferFields fields.
abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

//Holds the ring array, fills it eagerly, and maps a sequence to its slot.
abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private final long indexMask;
    //The ring array that stores the event objects.
    private final Object[] entries;
    protected final int bufferSize;
    //The sequencer stands for the producer side of this ring buffer.
    protected final Sequencer sequencer;

    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        //Exactly one bit set means bufferSize is a power of 2.
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //Allocate the array, with BUFFER_PAD extra slots on each side (BUFFER_PAD is declared in the elided part).
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //Memory preloading: create every event object up front.
        fill(eventFactory);
    }

    //Stores one factory-created event in each of the bufferSize usable slots.
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    //Returns the event stored for the given sequence.
    //(sequence & indexMask) wraps the sequence into the range [0, bufferSize); the shift and base
    //(REF_ELEMENT_SHIFT / REF_ARRAY_BASE, declared in the elided part) turn that index into the offset passed to Unsafe.
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}
//Common state for sequencers: buffer size, wait strategy, producer cursor and the consumers' gating sequences.
public abstract class AbstractSequencer implements Sequencer {
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    //Size of the ring array.
    protected final int bufferSize;
    //Wait strategy.
    protected final WaitStrategy waitStrategy;
    //Progress of the producer.
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    //Each Sequence corresponds to one consumer (one EventHandler or one WorkHandler).
    //Per the original annotation: they are added through SEQUENCE_UPDATER by RingBuffer.addGatingSequences()
    //when Disruptor.handleEventsWith() and similar methods run (that code is not shown in this excerpt).
    protected volatile Sequence[] gatingSequences = new Sequence[0];
    ...

    //Create with the specified buffer size and wait strategy.
    //@param bufferSize The total number of entries, must be a positive power of 2.
    //@param waitStrategy
    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        //Exactly one bit set means bufferSize is a power of 2.
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }
    ...
}

//Seven unused long fields declared ahead of the SingleProducerSequencerFields fields.
abstract class SingleProducerSequencerPad extends AbstractSequencer {
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
}

//The two plain (non-volatile) long fields used by the single-producer sequencer.
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
    public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }

    //The producer's current sequence; starts at -1.
    protected long nextValue = Sequence.INITIAL_VALUE;
    //Cached minimum consumer sequence; starts at -1.
    protected long cachedValue = Sequence.INITIAL_VALUE;
}

//Sequencer for the single-producer case.
public final class SingleProducerSequencer extends SingleProducerSequencerFields {
    protected long p1, p2, p3, p4, p5, p6, p7;

    //Construct a Sequencer with the selected wait strategy and buffer size.
    //@param bufferSize the size of the buffer that this will sequence over.
    //@param waitStrategy for those waiting on sequences.
    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
    ...

    @Override
    public long next() {
        return next(1);
    }

    //Claims the next n sequences and returns the highest one, parking while the claim
    //would run more than bufferSize ahead of the slowest gating (consumer) sequence.
    //The walkthrough comments below trace five successive next() calls.
    @Override
    public long next(int n) {
        //A Sequence is initialised to -1.
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        //nextValue is the current sequence; this.nextValue is a field of SingleProducerSequencerFields.
        //Calls #1..#5 of next(): nextValue = -1, 0, 1, 2, 3
        long nextValue = this.nextValue;
        //Calls #1..#5 of next(): nextSequence = 0, 1, 2, 3, 4
        long nextSequence = nextValue + n;
        //wrapPoint tells whether the producer sequence has wrapped around the ring:
        //negative means it has not wrapped yet, non-negative means it has.
        //Assuming bufferSize = 3 (illustrative only: the constructor rejects sizes that are not a power of 2):
        //calls #1..#5 of next(): wrapPoint = -3, -2, -1, 0, 1; calls #4 and #5 have wrapped.
        long wrapPoint = nextSequence - bufferSize;
        //cachedGatingSequence caches the minimum consumer sequence so that next()
        //does not have to re-read the consumers' sequences on every call.
        //Calls #1..#4: cachedGatingSequence = -1; call #5: cachedGatingSequence = 1 (see the assumption below).
        long cachedGatingSequence = this.cachedValue;
        //On call #4, wrapPoint is greater than cachedGatingSequence, so the branch is taken.
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            //The minimum consumer sequence.
            long minSequence;
            //Spin: Util.getMinimumSequence(gatingSequences, nextValue) finds the smallest consumer sequence.
            //While wrapPoint > that minimum, the claim would be more than bufferSize ahead of the
            //slowest consumer, so the producer thread parks briefly and checks again.
            //On call #4: nextValue = 2, wrapPoint = 0; if the consumers have not consumed anything yet (still -1), the producer parks.
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
                //TODO: Use waitStrategy to spin?
                LockSupport.parkNanos(1L);
            }
            //cachedValue takes the minimum consumer sequence.
            //On call #4, assuming the minimum consumer sequence minSequence is 1, cachedValue = 1.
            this.cachedValue = minSequence;
        }
        //After calls #1..#5 of next(): nextValue becomes 0, 1, 2, 3, 4
        this.nextValue = nextSequence;
        //Calls #1..#5 of next() return nextSequence = 0, 1, 2, 3, 4
        return nextSequence;
    }

    @Override
    public void publish(long sequence) {
        //Set the producer's current sequence.
        cursor.set(sequence);
        //Notify blocked consumers through the wait strategy.
        waitStrategy.signalAllWhenBlocking();
    }
    ...
}

public final class Util {
    ...

    //Get the minimum sequence from an array of {@link com.lmax.disruptor.Sequence}s.
    //@param sequences to compare.
    //@param minimum an initial default minimum. If the array is empty this value will be returned.
    //@return the smaller of minimum sequence value found in sequences and minimum; minimum if sequences is empty
    public static long getMinimumSequence(final Sequence[] sequences, long minimum) {
        for (int i = 0, n = sequences.length; i < n; i++) {
            long value = sequences[i].get();
            minimum = Math.min(minimum, value);
        }
        return minimum;
    }
    ...
}

//Sequencer for the multi-producer case: sequences are claimed with a CAS on the cursor inside a retry loop.
public final class MultiProducerSequencer extends AbstractSequencer {
    ...

    @Override
    public long next() {
        return next(1);
    }

    //Claims the next n sequences and returns the highest one.
    //Retries until the CAS on the cursor succeeds; parks briefly while the ring has no room.
    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do {
            //Read the producers' current sequence.
            current = cursor.get();
            next = current + n;
            //wrapPoint tells whether the producer sequence has wrapped around the ring:
            //negative means it has not wrapped yet, non-negative means it has.
            long wrapPoint = next - bufferSize;
            //cachedGatingSequence caches the minimum consumer sequence so that next()
            //does not have to re-read the consumers' sequences on every call.
            long cachedGatingSequence = gatingSequenceCache.get();
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                //gatingSequence is the minimum consumer sequence.
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                if (wrapPoint > gatingSequence) {
                    //TODO, should we spin based on the wait strategy?
                    LockSupport.parkNanos(1);
                    continue;
                }
                //Room is available: refresh the cache; the next loop iteration attempts the claim.
                gatingSequenceCache.set(gatingSequence);
            } else if (cursor.compareAndSet(current, next)) {
                //The CAS succeeded, so this thread owns the sequences up to next.
                break;
            }
        } while (true);
        return next;
    }
    ...
}