无锁

锁处理策略 乐观锁(Optimistic Lock) 其实是一种假设成功(Assuming Success)的设计思想,当一个线程/进程在尝试操作共享数据时,在写入的同时增加条件判断,该条件用于检查其编辑时拿到的原始数据和当前数据是否相同,如果相同,则执行写入操作,否则,将失败,外围的程序必须处理这种可能的失败,并根据应用的不同选择自动的重试或提示用户失败.所谓"乐观",是指这样的设计是比不发生"碰撞"为假设的,在真实的场景中(尤其是现代计算机的计算速度已经很快的情况下),"碰撞"发生的概率确实不高,那么,在绝大多数场景下,没有额外的锁定动作(也即没有额外的性能损耗)的情况下,也可以安全的更新数据.

CAS属于乐观锁

悲观锁(Pessimistic Lock) 一个线程/进程在操作共享数据时,完全不允许线程/进程的访问(当然,有可能只是不允许写),相当于一种"临界区",在临界区之外,系统可以并行处理,而到达"临界区"后,则必须串行的进行处理,在MySQL的InnoDB中,最常见的悲观锁就是显示的排它锁(exclusive lock):

mysql> select * from TABLEA where ID = :id for update

而所谓的"悲观",是指这样的设计是以发生"碰撞"为假设的,即使在甚至的执行场景中很少发生"碰撞",在出入"临界区"时,也必须有显示的加锁和解锁的过程

1. 并发策略CAS

CAS(Compare And Swap): 比较交换,一旦检测到线程冲突产生,就重试当前操作直到没有冲突为止.

CAS操作包含3个参数CAS(V,E,N).

  • V表示要更新的变量
  • E表示预期值
  • N表示新增

仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其它线程做了更新,则当前线程什么都不做.最后,CAS返回当前的V的真实值. CAS总是抱着乐观的态度进行,只有一个会胜出,并更新成功,其余均会失败.失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作. CAS使用的是底层的CPU指令CMPXCHG,从而保证对内存区域的原子操作juc包下的组建大多都用到了CAS,原生的Java代码没有CAS的实现,不过com.msic.Unsafe类通过本地方法实现了对CAS的使用

CAS缺点 1.ABA问题. 因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了.ABA问题的解决思路就是使用版本号.在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A.

从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题.这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值.

2.循环时间长开销大. 自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销.如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零.第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率.

3.只能保证一个共享变量的原子操作. 当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作.比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij.从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作.

2.无锁的线程安全整数: AutomicInteger

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    // value对象在AtomicInteger对象中的偏移量 
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    // 表示AtomicInteger的当前实际取值
    private volatile int value;

    // 获取当前值
    public final int get() {
        return value;
    }

    // 设置当前值
    public final void set(int newValue) {
        value = newValue;
    }

    // 设置新值,并返回旧值
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this,valueOffset,newValue);
    }

    // 如果当前值为expect,则设置为update
    public final boolean compareAndSet(int expect,int update) {
        return unsafe.compareAndSwapInt(this,valueOffset,expect,update);
    }

    // 当前值+1,返回旧值
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this,valueOffset,1);
    }

    // 当前值-1,返回旧值
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this,valueOffset,-1);
    }

    // 当前值+delta,返回旧值
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this,valueOffset,delta);
    }

    // 当前值+1,返回新值
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this,valueOffset,1) + 1;
    }

    // 当前值-1,返回新值
    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this,valueOffset,-1) - 1;
    }

    // 当前值+delta,返回新值
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this,valueOffset,delta) + delta;
    }

}

Java中和AtomicInteger类似的: AtomicLong,AtomicBoolean,AtomicReference

2.3 Java中的指针: Unsafe类

Java最初被设计为一种安全的受控环境.尽管如此,HotSpot还是包含了一个后门sun.misc.Unsafe,提供了一些可以直接操控内存和线程的底层操作. Unsafe被JDK广泛应用于java.nio和并发包等实现中,这个不安全的类提供了一个观察HotSpot JVM内部结构并且可以对其进行修改,但是不建议在生产环境中使用.

Unsafe类是如此地不安全,以至于JDK开发者增加了很多特殊限制来访问它:

  • 私有的构造器
  • 工厂方法getUnsafe()的调用器只能被Bootloader加载,否则抛出SecurityException 异常

不过,我们可以通过反射来获取Unsafe的一个实例:

public static Unsafe getUnsafe() {
   try {
           Field f = Unsafe.class.getDeclaredField("theUnsafe");
           f.setAccessible(true);
           return (Unsafe)f.get(null);
   } catch (Exception e) {
       /* ... */
   }
}

如果你直接调用Unsafe.getUnsafe(),你可能会得到SecurityException异常.只能从受信任的代码中使用这个方法

public static Unsafe getUnsafe() {
    Class cc = sun.reflect.Reflection.getCallerClass(2);
    if (cc.getClassLoader() != null)
        throw new SecurityException("Unsafe");
    return theUnsafe;
}

注意: 根据Java类加载器的工作原理,应用程序的类由App Loader加载. 而系统核心类,如 rt.jar中的类由Bootstrap类加载器加载. Bootstrap加载器没有Java对象的对象,因此试图获得这个类加载器会返回null. 所以,当一个类的类加载器为null时,说明它是由Bootstrap加载的,而这个类也基友可能是rt.jar中的类.

2.4 无锁的对象引用: AtomicReference

AtomicInteger: 对整数的封装 AtomicReference: 对普通对象的引用,它可以保证你在修改对象引用时的线程安全性.

2.5 带有时间戳的对象引用: AtomicStampedReference

AtomicReference无法解决对象在修改过程中,丢失状态的问题.对象值本身与状态被画上了等号,无法解决北京被反复修改导致线程无法正确判断对象状态的问题.

AtomicStampedReference内部不仅维护了对象值,还维护了一个时间戳(状态值).当它对应的数值被修改时,除了更新数据本身外,还必需要更新时间戳.只有对象值以及时间戳都必须满足期望值,写入才会成功.只要时间戳发生变化,就能防止不恰当的写入.

比AtomicReference内部新增的有关时间戳的信息


    /**
     * Atomically sets the value of both the reference and stamp
     * to the given update values if the
     * current reference is {@code ==} to the expected reference
     * and the current stamp is equal to the expected stamp.
     *
     * @param expectedReference the expected value of the reference
     * @param newReference the new value for the reference
     * @param expectedStamp the expected value of the stamp
     * @param newStamp the new value for the stamp
     * @return {@code true} if successful
     */
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current,Pair.of(newReference,newStamp)));
    }

    // 获得当前对象的引用
    public V getReference() {
        return pair.reference;
    }

    // 获得当前时间戳
    public int getStamp() {
        return pair.stamp;
    }

    // 设置当前对象引用和时间戳
    public void set(V newReference,int newStamp) {
        Pair<V> current = pair;
        if (newReference != current.reference || newStamp != current.stamp)
            this.pair = Pair.of(newReference,newStamp);
    }

4.6 数组也能无锁: AtomicIntegerArray

AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray 本质是对int[]类型的封装,使用Unsafe类通过CAS的方式控制int[]在多线程下的安全性.

4.7 普通变量的原子操作: AtomicIntegerFieldUpdater

在不改动(或者极少改动)原有代码的基础上,让普通变量也享受CAS操作带来的线程安全性.

4.8 无锁的Vector

...

4.9 SynchronousQueue

SynchronousQueue的定义如下:

public class SynchronousQueue<E> extends AbstractQueue<E>
                                                 implements BlockingQueue<E>,Serializable

它模拟的功能类似于生活中一手交钱一手交货这种情形,像那种货到付款或者先付款后发货模型不适合使用SynchronousQueue. 首先要知道SynchronousQueue没有容纳元素的能力,即它的isEmpty()方法总是返回true,但是给人的感觉却像是只能容纳一个元素.其实,队列头元素是第一个排队要插入数据的线程,而不是要交换的数据.数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中.可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开.

synchronousQueue

  • 顾名思义 同步队列,每一个线程的插入动作必须依赖另外一个线程的移除操作.
  • 本身支持公平策略与非公平策略,所以底层存在队列(实现公平策略),栈(实现非公平策略).
  • 暴露出来是没有容量的,也不存放元素.但是会利用上一条说的队列或者栈来暂存元素,进行匹配.
  • 提供的iterator()返回空; peek()返回null; isEmpty()返回true; remove()false; remainCapacity()返回0.这些方法对于synchronousQueue来说返回的都是固定的.
  • put() take() 如果执行失败会自旋等待,直到空旋次数耗尽.被park; offer() poll() 方法在执行的时候,如果刚好匹配到,则执行,否则直接返回,不会进入自旋. poll可以带参数,表明等待多长时间.
  • put() take() offer() poll() 这四个方法底层都是调用了SynchronousQueue中 内部抽象类Transferertransfer方法实现,这个类有两个子类,分别为TransferQueue和TransferStack,也就是上面说的队列与栈,公平策略与非公平策略.
  • transfer(Object e,boolean timed,long nanos) 第一个参数如果为null 说明这是一个消费者,也就是take poll ;如果非null 则是一个生产者,希望将元素入队列 . 第二个参数为是否等待,如果传false,则会进行自旋等待 . 如果传true,则将立刻结束
static final class TransferQueue<E> extends Transferer<E> {
     E transfer(E e, boolean timed, long nanos) {
        /* Basic algorithm is to loop trying to take either of
         * two actions:
         *
         * 1. If queue apparently empty or holding same-mode nodes,
         *    try to add node to queue of waiters, wait to be
         *    fulfilled (or cancelled) and return matching item.
         *
         * 2. If queue apparently contains waiting items, and this
         *    call is of complementary mode, try to fulfill by CAS'ing
         *    item field of waiting node and dequeuing it, and then
         *    returning matching item.
         *
         * In each case, along the way, check for and try to help
         * advance head and tail on behalf of other stalled/slow
         * threads.
         *
         * The loop starts off with a null check guarding against
         * seeing uninitialized head or tail values. This never
         * happens in current SynchronousQueue, but could if
         * callers held non-volatile/final ref to the
         * transferer. The check is here anyway because it places
         * null checks at top of loop, which is usually faster
         * than having them implicitly interspersed.
         */

        QNode s = null; // constructed/reused as needed
        boolean isData = (e != null);

        for (;;) {
            QNode t = tail;
            QNode h = head;
            if (t == null || h == null)         // saw uninitialized value
                continue;                       // spin

            if (h == t || t.isData == isData) { // empty or same-mode
                QNode tn = t.next;
                if (t != tail)                  // inconsistent read
                    continue;
                if (tn != null) {               // lagging tail
                    advanceTail(t, tn);
                    continue;
                }
                if (timed && nanos <= 0)        // can't wait
                    return null;
                if (s == null)
                    s = new QNode(e, isData);
                if (!t.casNext(null, s))        // failed to link in
                    continue;

                advanceTail(t, s);              // swing tail and wait
                Object x = awaitFulfill(s, e, timed, nanos);
                if (x == s) {                   // wait was cancelled
                    clean(t, s);
                    return null;
                }

                if (!s.isOffList()) {           // not already unlinked
                    advanceHead(t, s);          // unlink if head
                    if (x != null)              // and forget fields
                        s.item = s;
                    s.waiter = null;
                }
                return (x != null) ? (E)x : e;

            } else {                            // complementary-mode
                QNode m = h.next;               // node to fulfill
                if (t != tail || m == null || h != head)
                    continue;                   // inconsistent read

                Object x = m.item;
                if (isData == (x != null) ||    // m already fulfilled
                    x == m ||                   // m cancelled
                    !m.casItem(x, e)) {         // lost CAS
                    advanceHead(h, m);          // dequeue and retry
                    continue;
                }

                advanceHead(h, m);              // successfully fulfilled
                LockSupport.unpark(m.waiter);
                return (x != null) ? (E)x : e;
            }
        }
    }
}

static final class TransferStack<E> extends Transferer<E> {
    /**
     * Puts or takes an item.
     */
    @SuppressWarnings("unchecked")
    E transfer(E e, boolean timed, long nanos) {
        /*
         * Basic algorithm is to loop trying one of three actions:
         *
         * 1. If apparently empty or already containing nodes of same
         *    mode, try to push node on stack and wait for a match,
         *    returning it, or null if cancelled.
         *    如果等到队列为空,或者队列中节点的类型和本次操作是一致的, 那么将当期操作压入队列等待.
         *    比如:  等待队列中是读线程等待,本次操作也是读,因此这两个读都需要等待,进入等待队列的线程可能会被挂起, 它们会等待一个"匹配"操作
         *
         * 2. If apparently containing node of complementary mode,
         *    try to push a fulfilling node on to stack, match
         *    with corresponding waiting node, pop both from
         *    stack, and return matched item. The matching or
         *    unlinking might not actually be necessary because of
         *    other threads performing action 3:
         *    如果等待队列中的元素和本次操作是互补的(等待出走是读,本次操作是写), 那就就插入一个"完成状态的节点",并且让他"匹配"到一个等待节点上.
         *      接着弹出这两个节点,并且使得对应的两个线程继续执行
         *
         * 3. If top of stack already holds another fulfilling node,
         *    help it out by doing its match and/or pop
         *    operations, and then continue. The code for helping
         *    is essentially the same as for fulfilling, except
         *    that it doesn't return the item.
         *       如果线程发下等待队列的节点是"完成"节点, 那么帮助这个节点完成任务. 其流程和步骤2是一致的.
         */

        SNode s = null; // constructed/reused as needed
        int mode = (e == null) ? REQUEST : DATA;

        for (;;) {
            SNode h = head;
            if (h == null || h.mode == mode) {  // empty or same-mode     (步骤1)队列为空,或者相同模式
                if (timed && nanos <= 0) {      // can't wait            不进行等待
                    if (h != null && h.isCancelled())
                        casHead(h, h.next);     // pop cancelled node     处理取消行为
                    else
                        return null;
                } else if (casHead(h, s = snode(s, e, h, mode))) {
                    SNode m = awaitFulfill(s, timed, nanos);
                    if (m == s) {               // wait was cancelled    等待被取消
                        clean(s);
                        return null;
                    }
                    if ((h = head) != null && h.next == s)
                        casHead(h, s.next);     // help s's fulfiller    帮助s的fulfiller
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
            } else if (!isFulfilling(h.mode)) { // try to fulfill            (步骤2)是否处于fulfill状态
                if (h.isCancelled())            // already cancelled        如果以前取消了
                    casHead(h, h.next);         // pop and retry            弹出并重试
                else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                    for (;;) { // loop until matched or waiters disappear    一致循环直到匹配(match)或者没有等待着
                        SNode m = s.next;       // m is s's match            m是s的匹配者(match)
                        if (m == null) {        // all waiters are gone        已经没有等待者了
                            casHead(s, null);   // pop fulfill node            弹出fulfill节点
                            s = null;           // use new node next time    下一次使用新的节点
                            break;              // restart main loop        重新开始主循环
                        }
                        SNode mn = m.next;
                        if (m.tryMatch(s)) {
                            casHead(s, mn);     // pop both s and m            弹出s和m
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else                  // lost match                match失败
                            s.casNext(m, mn);   // help unlink                帮助删除节点
                    }
                }
            } else {                            // help a fulfiller        (步骤3) 帮助一个fulfiller
                SNode m = h.next;               // m is h's match        m是h的match
                if (m == null)                  // waiter is gone        没有等待者
                    casHead(h, null);           // pop fulfilling node    弹出fulfill节点
                else {
                    SNode mn = m.next;
                    if (m.tryMatch(h))          // help match            尝试match
                        casHead(h, mn);         // pop both h and m        弹出h和m
                    else                        // lost match            match失败
                        h.casNext(m, mn);       // help unlink            帮助删除节点
                }
            }
        }
    }
}

SynchronousQueue中,参与工作的所有线程不仅仅是竞争资源的关系. 更重要的是,它们彼此之间还会互相帮助, 在一个线程内部, 可能会帮助其他线程完成它们的工作.这种模式可以更大程度上减少饥饿的可能,提高系统整体的并行度.

results matching ""

    No results matching ""