i主题是 vivo 旗下的一款主题商店 app,用户可以经过下载主题、壁纸、字体等,成功对手机界面格调的一键改换和自定义。
Disruptor 是英国外汇买卖公司 LMAX 开发的一个高性能的内存队列(用于系统外部线程间传递信息,不同于 RocketMQ、Kafka这种散布式信息队列),基于 Disruptor 开发的系统复线程能撑持每秒600万订单。目前,包括 ApacheStorm、Camel、Log4j 2在内的很多出名名目都运行了 Disruptor 以失掉高性能。在 vivo外部它也有不少运行,比如自定义监控中经常使用 Disruptor 队列来暂存经过监控 SDK上报的监控数据,i主题中也经常使用它来统计本地内存目的数据。
接上去从 Disruptor 和 JDK 内置队列的对比、Disruptor 外围概念、Disruptor 经常使用Demo、Disruptor外围源码、Disruptor 高性能原理、Disruptor 在 i主题业务中的运行几个角度来引见 Disruptor。
上方来看下 JDK 中内置的队列和 Disruptor 的对比。队列的底层成功普通分为三种:数组、链表和堆,其中堆普通是为了成功带有优先级特性的队列,暂不思考。另外,像 ConcurrentLinkedQueue 、LinkedTransferQueue属于无界队列,在稳固性要求特意高的系统中,为了防止消费者速渡过快,造成内存溢出,只能选用有界队列。这样 JDK 中剩下可选的线程安保的队列还有ArrayBlockingQueue
由于 LinkedBlockingQueue 是基于链表成功的,由于链表存储的数据在内存里不延续,关于高速缓存并不友好,而且LinkedBlockingQueue 是加锁的,性能较差。ArrayBlockingQueue有雷同的疑问,它也须要加锁,另外,ArrayBlockingQueue 存在伪共享疑问,也会造成性能变差。而当天要引见的 Disruptor是基于数组的有界无锁队列,合乎空间部分性原理,可以很好的应用 CPU 的高速缓存,同时它防止了伪共享,大大优化了性能。
如下图,从数据流转的角度先对 Disruptor 有一个直观的概念。Disruptor 支持单(多)消费者、单(多)消费者形式。消费时支持广播消费(HandlerA会消费处置一切信息,HandlerB 也会消费处置一切信息)、集群消费(HandlerA 和 HandlerB各消费部分信息),HandlerA 和HandlerB 消费成功后会把信息交给 HandlerC 继续处置。
上方联合 Disruptor 官网的架构图引见下 Disruptor 的外围概念:
Event 是详细的数据实体,消费者消费 Event ,存入 RingBuffer,消费者从 RingBuffer 中消费它启动逻辑处置。Event 就是一个普通的 Java 对象,无需成功 Disruptor 内定义的接口。
public class OrderEvent {private long value;public long getValue() {return value;}public void setValue(long value) {this.value = value;}}
用于创立 Event 对象。
public class OrderEventFactory implements EventFactory<OrderEvent> {public OrderEvent newInstance() {return new OrderEvent();}}
可以看到,生成者关键是持有 RingBuffer 对象启动数据的颁布。这里有几个点须要留意:
public class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(ByteBuffer>
消费者可以成功 EventHandler 接口,定义自己的处置逻辑。
public class OrderEventHandler implements EventHandler<OrderEvent> {public void onEvent(OrderEvent event,long sequence,boolean endOfBatch) throws Exception {System.out.println("消费者: " + event.getValue());}}
public static void main(String[] args) {OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(1);/*** 1. 实例化disruptor对象1) eventFactory: 信息(event)工厂对象2) ringBufferSize: 容器的长度3) executor:4) ProducerType: 单消费者还是多消费者5) waitStrategy: 期待战略*/Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());// 2. 参与消费者的监听disruptor.handleEventsWith(new OrderEventHandler());// 3. 启动disruptordisruptor.start();// 4. 失掉实践存储数据的容器: RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long i = 0; i < 5; i++) {bb.putLong(0, i);producer.sendData(bb);}disruptor.shutdown();executor.shutdown();}
本文剖析时以单(多)消费者、单消费者为例启动剖析。
首先是经过传入的参数创立 RingBuffer,将创立好的 RingBuffer 与传入的 executor 交给 Disruptor 对象持有。
public Disruptor(final EventFactory<T> eventFactory,final int ringBufferSize,final Executor executor,final ProducerType producerType,final WaitStrategy waitStrategy){this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),executor);}
接上去剖析 RingBuffer 的创立环节,分为单消费者与多消费者。
public static <E> RingBuffer<E> create(ProducerType producerType,EventFactory<E> factory,int bufferSize,WaitStrategy waitStrategy){switch (producerType){case SINGLE:// 单消费者return createSingleProducer(factory, bufferSize, waitStrategy);case MULTI:// 多消费者return createMultiProducer(factory, bufferSize, waitStrategy);default:throw new IllegalStateException(producerType.toString());}}
不论是单消费者还是多消费者,最终都会创立一个 RingBuffer 对象,只是传给 RingBuffer 的 Sequencer 对象不同。可以看到,RingBuffer 外部最终创立了一个Object 数组来存储 Event 数据。这里有几点须要留意:
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy){SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize,waitStrategy);return new RingBuffer<E>(factory, sequencer);}
RingBufferFields(EventFactory<E> eventFactory,Sequencer sequencer){// 省略部分代码...// 额外创立2个填充空间的大小, 首尾填充, 防止数组的有效载荷和其它成员加载到同一缓存行this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];fill(eventFactory);}private void fill(EventFactory<E> eventFactory){for (int i = 0; i < bufferSize; i++){// BUFFER_PAD + i为真正的数组索引entries[BUFFER_PAD + i] = eventFactory.newInstance();}}
参与消费者的外围代码如下所示,外围就是为将一个
而后参与到consumerRepository 中,后续启动 Disruptor 时,会遍历 consumerRepository 中的一切 BatchEventProcessor(成功了Runnable 接口),将 BatchEventProcessor 义务提交到线程池中。
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){// 经过disruptor对象间接调用handleEventsWith方法时传的是空的Sequence数组return createEventProcessors(new Sequence[0], handlers);}
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,final EventHandler<? super T>[] eventHandlers) {// 搜集参与的消费者的序号final Sequence[] processorSequences = new Sequence[eventHandlers.length];// 本批次消费由于参与在同一个节点之后, 因此共享该屏障final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);// 为每个EventHandler创立一个BatchEventProcessorfor (int i = 0, eventHandlersLength = eventHandlers.length;i < eventHandlersLength; i++) {final EventHandler<? super T> eventHandler = eventHandlers[i];final BatchEventProcessor<T> batchEventProcessor =new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);if (exceptionHandler != null){batchEventProcessor.setExceptionHandler(exceptionHandler);}// 参与到消费者信息仓库中consumerRepository.add(batchEventProcessor, eventHandler, barrier);processorSequences[i] = batchEventProcessor.getSequence();}// 降级网关序列(消费者只有要关注一切的末端消费者节点的序列)updateGatingSequencesForNextInChain(barrierSequences, processorSequences);return new EventHandlerGroup<>(this, consumerRepository, processorSequences);}
创立完 Disruptor 对象之后,可以经过 Disruptor 对象参与 EventHandler,这里有一须要留意:经过 Disruptor对象间接调用 handleEventsWith 方法时传的是空的 Sequence 数组,这是什么意思?可以看到createEventProcessors 方法接纳该空 Sequence 数组的字段名是barrierSequences,翻译成中文就是栅栏序号。怎样了解这个字段?
比如经过如下代码给 Disruptor 参与了两个handler,记为 handlerA 和 handlerB,这种是串行消费,关于一个Event,handlerA 消费完后才干轮到 handlerB 去消费。关于 handlerA来说,它没有前置消费者(生成者消费到哪里,消费者就可以消费到哪里),因此它的 barrierSequences 是一个空数组。而关于handlerB 来说,它的前置消费者是 handlerA,因此它的 barrierSequences 就是A的消费进展,也就是说handlerB 的消费进展是要小于 handlerA 的消费进展的。
disruptor.handleEventsWith(handlerA).handleEventsWith(handlerB);
假设是经过如下形式参与的 handler,则 handlerA 和handlerB 会消费一切 Event 数据,相似 MQ 信息中的广播消费,而 handlerC 的 barrierSequences 数组就是蕴含了 handlerA 的消费进展和 handlerB 的消费进展,这也是为什么barrierSequences 是一个数组,后续 handlerC在消费数据时,会取A和B消费进展的较小值启动判别,比如A消费到进展6,B消费到进展4,那么C只能去消费下标为3的数据,这也是barrierSequences 的含意。
disruptor.handleEventsWith(handlerA, handlerB).handleEventsWith(handlerC);
Disruptor的启动逻辑比拟繁复,就是遍历consumerRepository 中搜集的 EventProcessor(成功了Runnable接口),将它提交到创立 Disruptor 时指定的executor中,EventProcessor 的 run 方法会启动一个while 循环,不时尝试从 RingBuffer 中失掉数据启动消费。
disruptor.start();
public RingBuffer<T> start() {checkOnlyStartedOnce();for (final ConsumerInfo consumerInfo : consumerRepository) {consumerInfo.start(executor);}return ringBuffer;}public void start(final Executor executor) {executor.execute(eventprocessor);}
在剖析 Disruptor 的颁布数据的源码前,先来回忆下颁布数据的全体流程。
public void sendData(ByteBuffer>
next 方法自动放开一个序号。nextValue 示意已调配的序号,nextSequence 示意在此基础上再放开n个序号(此处n为1),cachedValue 示意缓存的消费者的最小消费进展。
假定有一个 size 为8的 RingBuffer,下标为6的数据曾经颁布好(nextValue为6),消费者不时未开启消费(cachedValue 和
cachedGatingSequence 为-1),此时消费者想继续颁布数据,调用 next() 方法放开失掉序号为7的位置(nextSequence为7),计算失掉的 wrapPoint 为7-8=-1,此时 wrapPoint 等于
cachedGatingSequence,可以继续颁布数据,如左图。最后将 nextValue 赋值为7,示意序号7的位置曾经被消费者占用了。
接着消费者继续调用 next() 方法放开序号为0的数据,此时 nextValue为7,nextSequence 为8,wrapPoint 等于0, 由于消费者迟迟未消费
(cachedGatingSequence为-1),此时 wrapPoint 大于了 cachedGatingSequence,因此 next 方法的if判别成立,会调用LockSupport.parkNanos 阻塞期待消费者启动消费。其中 getMinimumSequence方法是失掉多个消费者的最小消费进展。
public long next() {return next(1);}
public long next(int n) {/*** 已调配的序号的缓存(已调配到这里), 初始-1. 可以看该方法的前往值nextSequence,* 接上去消费者就会该往该位置写数据, 它赋值给了nextValue, 所以下一次性调用next方* 法时, nextValue位置就是示意曾经消费好了数据, 接上去要放开nextSequece的数据*/long nextValue = this.nextValue;// 本次放开调配的序号long nextSequence = nextValue + n;// 造成环路的点:环形缓冲区或者追尾的点 = 等于本次放开的序号-环形缓冲区大小// 假设该序号大于最慢消费者的进展, 那么示意追尾了, 须要期待long wrapPoint = nextSequence - bufferSize;// 上次缓存的最小网关序号(消费最慢的消费者的进展)long cachedGatingSequence = this.cachedValue;// wrapPoint > cachedGatingSequence 示意消费者追上消费者发生环路(追尾), 即缓冲区已满,// 此时须要失掉消费者们最新的进展, 以确定能否队列满if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {// 拔出StoreLoad内存屏障/栅栏, 保障可见性。// 由于publish经常使用的是set()/putOrderedLong, 并不保障其余消费者能及时看见颁布的数据// 当我再次放开更多的空间时, 必定保障消费者能消费颁布的数据cursor.setVolatile(nextValue);long minSequence;// minSequence是多个消费者的最小序号, 要等一切消费者消费完了才干继续消费while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences,nextValue))) {LockSupport.parkNanos(1L);}// 缓存消费者们最新的消费进展this.cachedValue = minSequence;}// 这里只写了缓存, 并未写volatile变量, 由于只是预调配了空间然而并未被颁布数据,// 不须要让其余消费者感知到。消费者只会感知到真正被颁布的序号this.nextValue = nextSequence;return nextSequence;}
间接经过 Unsafe 工具类失掉指定序号的 Event 对象,此时失掉的是空对象,因此接上去须要对该 Event 对象启动业务赋值,赋值成功后调用 publish 方法启动最终的数据颁布。
OrderEvent event = ringBuffer.get(sequence);
public E get(long sequence) {return elementAt(sequence);}
protected final E elementAt(long sequence) {return (E) UNSAFE.getObject(entries,REF_ARRAY_BASE +((sequence & indexMask) << REF_ELEMENT_SHIFT));}
消费者失掉到可用序号后,首先对该序号处的空 Event 对象启动业务赋值,接着调用 RingBuffer 的 publish 方法颁布数据,RingBuffer 会委托给其持有的sequencer(单消费者和多消费者对应不同的 sequencer)对象启动真正颁布。单消费者的颁布逻辑比拟繁难,降级下 cursor进展(cursor 示意消费者的消费进展,该位置已实践颁布数据,而 next 方法中的 nextSequence示意消费者放开的最大序号,或者还未实践颁布数据),接着唤醒期待的消费者。
waitStrategy 有不同的成功,因此唤醒逻辑也不尽相反,如驳回 BusySpinWaitStrategy战略时,消费者失掉不到数据时自旋期待,而后继续判别能否有新数据可以消费了,因此 BusySpinWaitStrategy 战略的signalAllWhenBlocking 就是一个空成功,啥也不做。
ringBuffer.publish(sequence);
public void publish(long sequence) {sequencer.publish(sequence);}
public void publish(long sequence) {// 更重消费者进展cursor.set(sequence);// 唤醒期待的消费者waitStrategy.signalAllWhenBlocking();}
前面提到,Disruptor 启动时,会将封装 EventHandler 的EventProcessor(此处以 BatchEventProcessor为例)提交到线程池中运转,BatchEventProcessor 的 run 方法会调用 processEvents 方法不时尝试从RingBuffer 中失掉数据启动消费,上方剖析下 processEvents 的逻辑(代码做了精简)。它会开启一个 while 循环,调用sequenceBarrier.waitFor方法失掉最大可用的序号,比如失掉序号一节所提的,消费者继续消费,消费者不时未消费,此时消费者曾经将整个 RingBuffer数据都消费满了,消费者不可再继续消费,消费者此时会阻塞。假定这时刻消费者开局消费,因此 nextSequence 为0,而
availableSequence 为7,此时消费者可以批量消费,将这8条已消费者的数据所有消费完,消费成功后降级下消费进展。降级消费进展后,消费者经过Util.getMinimumSequence 方法就可以感知到最新的消费进展,从而不再阻塞,继续颁布数据了。
private void processEvents() {T event = null;// sequence记载消费者的消费进展, 初始为-1long nextSequence = sequence.get() + 1L;// 死循环,因此不会让出线程,须要独立的线程(每一个EventProcessor都须要独立的线程)while (true) {// 经过屏障失掉到的最大可用序号final long availableSequence = sequenceBarrier.waitFor(nextSequence);// 批量消费while (nextSequence <= availableSequence) {event =>
上方剖析下 SequenceBarrier 的 waitFor 方法。首先它会调用 waitStrategy 的 waitFor 方法失掉最大可用序号,以 BusySpinWaitStrategy 战略为例,它的 waitFor 方法的三个参数的含意区分是:
由于 dependentSequence 分为两种状况,所以 waitFor 的逻辑也可以分为两种状况探讨:
在 waitStrategy 的 waitFor 方法前往,失掉最大可用的序号 availableSequence 后,最后须要再调用下 sequencer 的
getHighestPublishedSequence失掉真正可用的最大序号,这和消费者模型有相关,假设是单消费者,由于数据是延续颁布的,间接前往传入的 availableSequence。而假设是多消费者,由于多消费者是有多个线程在消费数据,颁布的数据是不延续的,因此须要经过
getHighestPublishedSequence 方法失掉已颁布的且延续的最大序号,由于失掉序号启动消费时须要是顺序的,不能腾跃。
public long waitFor(final long sequence)throws AlertException, InterruptedException, TimeoutException {/*** sequence: 消费者希冀失掉的序号* cursorSequence: 消费者的序号* dependentSequence: 消费者须要依赖的序号*/long availableSequence = waitStrategy.waitFor(sequence,cursorSequence,dependentSequence, this);if (availableSequence < sequence) {return availableSequence;}// 目的sequence曾经颁布了, 这里失掉真正的最大序号(和消费者模型无关)return sequencer.getHighestPublishedSequence(sequence, availableSequence);}
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence,final SequenceBarrier barrier) throws AlertException, InterruptedException {long availableSequence;// 确保该序号曾经被我前面的消费者消费(协调与其余消费者的相关)while ((availableSequence = dependentSequence.get()) < sequence) {barrier.checkAlert();// 自旋期待ThreadHints.onSpinWait();}return availableSequence;}
前文剖析源码时引见到,RingBuffer 外部保养了一个 Object 数组(也就是真正存储数据的容器),在 RingBuffer 初始化时该 Object数组就曾经经常使用EventFactory 初始化了一些空 Event,后续就不须要在运转时来创立了,防止频繁GC。
另外,RingBuffe的数组中的元素是在初始化时一次性性所有创立的,所以这些元素的内存地址大略率是延续的。消费者在消费时,是遵照空间部分性原理的。消费完第一个Event 时,很快就会消费第二个 Event,而在消费第一个 Event 时,CPU 会把内存中的第一个 Event 的前面的 Event 也加载进Cache 中,这样当消费第二个 Event 时,它曾经在 CPU Cache 中了,所以就不须要从内存中加载了,这样也可以大大优化性能。
如下代码所示,定义了一个 Pointer 类,它有2个 long 类型的成员变量x、y,而后在 main 方法中其中2个线程区分对同一个 Pointer对象的x和y自增 100000000 次,最后统计下方法耗时,在我本机电脑上测试屡次,平均约为3600ms。
public class Pointer {volatile long x;volatile long y;@Overridepublic String toString() {return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]").add("x=" + x).add("y=" + y).toString();}}
public static void main(String[] args) throws InterruptedException {Pointer pointer = new Pointer();int num = 100000000;long start = System.currentTimeMillis();Thread t1 = new Thread(() -> {for(int i = 0; i < num; i++){pointer.x++;}});Thread t2 = new Thread(() -> {for(int i = 0; i < num; i++){pointer.y++;}});t1.start();t2.start();t1.join();t2.join();System.out.println(System.currentTimeMillis() - start);System.out.println(pointer);}
接着将 Pointer 类修正如下:在变量x和y之间拔出7个 long 类型的变量,仅此而已,接着继续经过上述的 main方法统计耗时,平均约为500ms。可以看到,修正前的耗时是修正后(防止了伪共享)的7倍多。那么什么是伪共享,为什么防止了伪共享能有这么大的性能优化呢?
public class Pointer {volatile long x;long p1, p2, p3, p4, p5, p6, p7;volatile long y;@Overridepublic String toString() {return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]").add("x=" + x).add("y=" + y).toString();}}
内存的访问速度是远远慢于 CPU 的,为了高效应用 CPU,在 CPU 和内存之间加了缓存,称为 CPU Cache。为了提高性能,须要更多地从 CPU Cache里失掉数据,而不是从内存中失掉数据。CPU Cache 加载内存里的数据,是以缓存行(通常为64字节)为单位加载的。Java 的 long类型是8字节,因此一个缓存行可以寄存8个 long 类型的变量。
然而,这种加载带来了一个坏处,如上述例子所示,假定有一个 long 类型的变量x,另外还有一个 long 类型的变量y紧挨着它,那么当加载x时也会加载y。假设此时 CPU Core1的线程在对x启动修正,另一个 CPU Core2 的线程却在对y启动读取。者修正x时,会把x和y同时加载到 CPU Core1 对应的CPU Cache 中,降级完后x和其它一切蕴含x的缓存行都将失效。而当 CPU Core2的线程读取y时,发现这个缓存行曾经失效了,须要从主内存中从新加载。
这就是伪共享,x和y不相干,然而却由于x的降级造成须要从新从主内存读取,拖慢了程序性能。处置方法之一就是如上述示例中所做,在x和y之间填充7个 long 类型的变量,保障x和y不会被加载到同一个缓存行中去。Java8中也参与了新的注解@Contended(JVM加上启动参数-XX:-RestrictContended 才会失效),也可以防止伪共享。
Disruptor 中经常使用 Sequence 类的 value 字段来示意消费/消费进展,可以看到在该字段前后各填充了7个 long 类型的变量,来防止伪共享。另外,向 RingBuffer 外部的数组、
SingleProducerSequencer 等也经常使用了该技术。
class LhsPadding {protected long p1, p2, p3, p4, p5, p6, p7;}class Value extends LhsPadding {protected volatile long value;}class RhsPadding extends Value {protected long p9, p10, p11, p12, p13, p14, p15;}
消费者消费数据时,须要入队。消费者消费数据时,须要出队。入队时,不能笼罩没有消费的元素。出队时,不能读取没有写入的元素。因此,Disruptor 中须要保养一个入队索引(消费者数据消费到哪里,对应 AbstractSequencer 中的 cursor)和一个出队索引(一切消费者中消费进展最小的序号)。
Disruptor 中最复杂的是入队操作,上方以多消费者(MultiProducerSequencer)的 next(n) 方法(放开n个序号)为例剖析下Disruptor 是如何成功无锁操作的。代码如下所示,判别下能否有足够的序号(空余位置),假设没有,就让出 CPU经常使用权,而后从新判别。假设有,则经常使用 CAS 设置 cursor(入队索引)。
public long next(int n) {do {// cursor相似于入队索引, 指的是上次消费到这里current = cursor.get();// 目的是再消费n个next = current + n;// 前文剖析过, 用于判别消费者能否曾经追上消费进展, 消费者能否放开到n个序号long wrapPoint = next - bufferSize;// 失掉缓存的上一次性的消费进展long cachedGatingSequence = gatingSequenceCache.get();// 第一步:空间无余就继续期待if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {// 从新计算下一切消费者里的最小消费进展long gatingSequence = Util.getMinimumSequence(gatingSequences, current);// 依然没有足够的空间, 让出CPU经常使用权if (wrapPoint > gatingSequence) {LockSupport.parkNanos(1);continue;}// 降级下最新的最小的消费进展gatingSequenceCache.set(gatingSequence);}// 第二步:看见空间足够时尝试CAS竞争空间else if (cursor.compareAndSet(current, next)) {break;}} while (true);return next;}
这个比拟好了解,在前文剖析消费数据的逻辑时引见了,消费者会失掉下最大可用的序号,而后批量消费这些信息。
很多开源名目都经常使用了 Disruptor,比如日志框架 Log4j2 经常使用它来成功异步日志。HBase、Storm 等名目中也经常使用了到了 Disruptor。vivo 的 i主题业务也经常使用了 Disruptor,上方繁难引见下它的2个经常使用场景。
业务监控系统关于企业来说十分关键,可以协助企业及时发现和处置疑问,可以繁难的检测业务目的数据,改良业务决策,从而保障业务的可继续开展。i主题经常使用 Disruptor(多消费者单消费者)来暂存待上报的业务目的数据,而后有定时义务不时提取数据上报到监控平台,如下图所示。
i主题业务中少量经常使用了本地缓存,为了统计本地缓存中key 的个数(去重)以及每种缓存形式 key 的数量,思考经常使用 Disruptor来暂存并消费处置数据。由于业务代码里很多中央触及到本地缓存的访问,也就是说,消费者是多线程的。思考到消费处置比拟繁难,而假设经常使用多线程消费的话又触及到加锁同步,因此消费者驳回复线程形式。
全体流程如下图所示,首先在缓存访问工具类中参与缓存访问统计上报的调用,缓存访问数据进入到 RingBuffer 后,复线程消费者经常使用 HyperLogLog 来去重统计不同key的个数,经常使用正则婚配来统计每种形式key的数量。而后有异步义务定时失掉统计结果,启动展现。
须要留意的是,由于 RingBuffer 队列大小是固定的,假设消费者消费过快而消费者消费不上来,假设经常使用 next方法放开序号,假设残余空间不够会造成消费者阻塞,因此倡导经常使用 tryPublishEvent 方法去颁布数据,它外部是经常使用 tryNext方法放开序号,该方法假设放开不到可用序号会抛出意外,这样消费者感知到了就可以做兼容处置,而不是阻塞期待。
本文首先经过对比 JDK 中内置的线程安保的队列和Disruptor 的特点,引入了高性能无锁内存队列 Disruptor。接着引见了 Disruptor的外围概念和基本经常使用,使读者对 Disruptor 建设后来步的意识。接着从源码和原理角度引见了 Disruptor的外围成功以及高性能原理(空间预调配、防止伪共享、无锁、支持批量消费)。其次,联合i主题业务引见了 Disruptor在通常中的运行。最后,基于上述原理剖析及运行实战,总结了一些 Disruptor 最佳通常战略。
参考文章:
本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载联系作者并注明出处:https://clwxseo.com/wangluoyouhua/8814.html