# Disruptor

无锁的缓存框架: http://lmax-exchange.github.io/disruptor/

1.生产者-消费者模式

1.1 介绍

生产者-消费者模式是一个经典的多线程设计模式, 它为线程间的协作提供了良好的解决方案.

  • 生产者线程: 负责提交用户请求
  • 消费者线程: 负责处理具体生产者提交的任务
  • 生产者和消费者之间通过共享内存缓存区进行通信

注意:生产者-消费者模式中的内存缓存区的主要功能是数据在多线程间的共享,此外,t通过该缓冲区, 可以缓解生产者和消费者间的性能差.

1.2 生产者-消费者模式主要角色

角色 作用
生产者 用户提交用户请求, 提取用户任务, 并装入内存缓冲区
消费者 在内存缓冲区中提取并处理任务
内存缓存区 缓存生产者提交的任务或数据, 供消费者使用
任务 生产者向内存缓冲区提交的数据结构
Main 使用生产者和消费者的客户端

生产者-消费者实现类图

NOTE: BlockingQueue用于消费者和生产者是一个不错的选择.它可以很自然地实现作为生产者和消费者的内存缓冲区.但是BlockingQueue并不是一个高性能的实现,它完全使用锁和阻塞等待来实现线程间的同步.在高并发场合,它的性能并不是特别的优越.相反, ConcurrentLinkQueue(秘诀在于大量使用了无锁的CAS操作)是一个高性能的队列, BlockingQueue只是为了方便数据共享.

2. disruptor

2.1 简介

disruptor

  • LMAX的高性能低延迟异步处理框架
  • 基于无锁循环队列,采用数组实现
  • 能够在无锁的情况下实现并发操作
  • 号称单线程里每秒处理 6百万订单

2.2 disruptor的数据结构

RingBuffer(环形缓冲区)

2.3 Disruptor核心组件

  • Disruptor: 协调类,入口类,生成生产者、消费者以及他们之间的协调关系
  • RingBuffer: 负责对数据(事件)进行存储和更新,大小必须为2的n次方
  • Sequence: 用于跟踪标识某个特定的事件处理者的处理进度
  • Sequencer: Disruptor 的真正核心。有两个实现类 SingleProducerSequencer、MultiProducerSequencer
  • Sequence Barrier: 序号栅栏,管理和协调生产者的游标序号和各个消费者的序号
  • Event: 通过 Disruptor 进行交换的数据类型
  • EventFactory: 生成Event的工厂,通过该工厂方法填充RingBuffer
  • EventProcessor: 一个Runnable, 其run方法是消费者的消费过程
  • EventHandler: 用于处理事件,是 Consumer 的真正实现
  • Wait Strategy: 定义 Consumer 如何进行等待下一个事件的策略

组件结构

2.4 disruptor高效的原因

1.无锁

队列 有界性 数据结构
ArrayBlockingQueue bounded` 加锁 arraylist
LinkedBlockingQueue optionally-bounded` 加锁 linkedlist
ConcurrentLinkedQueue unbounded` 无锁 linkedlist
LinkedTransferQueue unbounded` 无锁 linkedlist
PriorityBlockingQueue unbounded` 加锁 heap
DelayQueue unbounded` 加锁 heap
  • 单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能.
  • 在多线程情况下,CAS的性能是锁的8倍.

disruptor实现

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据

  • 单线程: 没有锁也没有CAS
  • 多线程: 多线程写存在CAS

相对于传统方式的优点

  • 没有竞争=没有锁=非常快
  • 所有访问者都记录自己的序号,允许多个生产者与多个消费者共享相同的数据结构

2.缓存行填充

伪共享

当写指针、读指针存储在同一CPU缓存行时,一个CPU核心对写指针的修改会使另一个核心的该缓存行失效,导致使用读指针被延迟.

缓存行特点

  • 缓存是由缓存行组成的,通常是64字节
  • 一个缓存行中可以存8个long类型的变量
  • 如加载一个long值,会额外加载附近7个值到缓存行
  • 加载的值有一个修改,缓存行整体失效

disruptor实现

缓存行填充: 使用多个long变量填充,从而确保一个序号独占一个缓存行

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

内部数组的构造: 实际产生的数组大小是缓冲区实际大小再加上两倍的BUFFER_PAD.这就相当于在这个数据的头部和尾部两端各增加了BUFFER_PAD个填充,使得整个数组被载入Cache时不会受到其它变量的影响而失效.

this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];

3.内存屏障

内存屏障: volatile

原理: 如果字段是volatile,Java内存模型将在写操作后插入一个写屏障指令,在读操作前插入一个读屏障指令

4.避免频繁GC

5.批量生产消费

Disruptor实现

  • 消费者一次消费尽量多的产品
  • 生产者多次生产一次发布

2.5 Disruptor消费的选择策略

2.5.1 BlockingWaitStrategy

默认的策略.使用BlockingWaitStrategy和使用BlockingQueue是非常类似的,它们都是用锁和条件(Condition)进行数据的监控和线程的唤醒.因为涉及到线程的切换,BlockingWaitStrategy策略是最节省CPU, 但是在高并发性能表现最糟糕的一种等待策略.

public final class BlockingWaitStrategy implements WaitStrategy{
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException{
        long availableSequence;
        if (cursorSequence.get() < sequence){
            lock.lock();
            try{
                while (cursorSequence.get() < sequence){
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }finally{
                lock.unlock();
            }
        }

        while ((availableSequence = dependentSequence.get()) < sequence){
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking(){
        lock.lock();
        try{
            processorNotifyCondition.signalAll();
        }finally{
            lock.unlock();
        }
    }

}

2.5.2 SleepingWaitStrategy

这个策略也是对CPU使用率非常保守的.它会在循环中不断等待数据.它会先进行自旋等待.如果不成功, 则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos()进行线程休眠,已确保不占用太多的CPU数据.因此,这个策略对于数据处理可能产生比较高的平均延迟.它比较适合于对延迟要求不是特别高的场合,好处是它对生产者线程的影响最小.典型的应用场景是异步日志.

public final class SleepingWaitStrategy implements WaitStrategy{
    private static final int DEFAULT_RETRIES = 200;
    private static final long DEFAULT_SLEEP = 100;

    private final int retries;
    private final long sleepTimeNs;

    public SleepingWaitStrategy(){ this(DEFAULT_RETRIES, DEFAULT_SLEEP);}
    public SleepingWaitStrategy(int retries){this(retries, DEFAULT_SLEEP);}
    public SleepingWaitStrategy(int retries, long sleepTimeNs){this.retries = retries;this.sleepTimeNs = sleepTimeNs;}

    @Override
    public long waitFor(
        final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException{
        long availableSequence;
        int counter = retries;

        while ((availableSequence = dependentSequence.get()) < sequence){
            counter = applyWaitMethod(barrier, counter);
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
    }

    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException{
        barrier.checkAlert();

        if (counter > 100){
            --counter;
        }else if (counter > 0){
            --counter;
            Thread.yield();
        }else{
            LockSupport.parkNanos(sleepTimeNs);
        }

        return counter;
    }
}

2.5.3 YieldingWaitStrategy

这个策略用于低延迟的场合.消费者会不断循环监控缓冲区变化.在循环内部,它会使用Thread.yield()让出CPU给别的线程执行时间.如果需要一个高性能的系统,并且对延迟有较为严格的要求,则可以考虑这种策略.使用这个策略时,相当于消费者线程变声成为了一个内部执行了Thread.yield()的死循环.因此,最好有多于消费者线程数量的逻辑CPU数量(这里的逻辑CPU,指的是"双核四线程"中的四线程),否则,整个应用程序恐怕都会受到影响.

public final class YieldingWaitStrategy implements WaitStrategy {
    private static final int SPIN_TRIES = 100;

    @Override
    public long waitFor(
            final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
            throws AlertException, InterruptedException {
        long availableSequence;
        int counter = SPIN_TRIES;

        while ((availableSequence = dependentSequence.get()) < sequence) {
            counter = applyWaitMethod(barrier, counter);
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
    }

    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
            throws AlertException {
        barrier.checkAlert();

        if (0 == counter) {
            Thread.yield();
        } else {
            --counter;
        }

        return counter;
    }
}

2.5.4 BusySpinWaitStrategy

这个是最疯狂的等待策略了,它就是一个死循环!消费者线程会尽最大努力疯狂监控缓冲区的变化.因此,它会吃掉所有的CPU资源.只有在对延迟非常苛刻的场合可以考虑使用它(或者说,你的系统真的非常繁忙).因为在这里你等同开启了一个死循环监控,所有,你的物理CPU数量必需要大于消费者线程数.

public final class BusySpinWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(
            final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
            throws AlertException, InterruptedException {
        long availableSequence;

        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
    }
}

results matching ""

    No results matching ""