一. JDK中常见的锁

1. 重入锁 ReentrantLock

synchronized相比, 重入锁有着显示的操作过程,开发人员必须手动指定何时加锁,适合释放锁.

大多数情况下,锁的申请都是非公平的,如果使用synchronized关键字机型锁控制, 那么产生的锁就是非公平的.

 // ReentrantLock构造函数, fair为true时,表示锁是公平的

public ReentrantLock(boolean fair)

ReentrantLock的几个重要方法

  • lock(): 获得锁, 如果锁已经被占用, 则等待
  • lockInterruptibly(): 获得锁,但优先响应中断
  • tryLock(): 尝试获得锁, 如果成功, 返回true, 失败返回false. 该方法不等待, 立即返回
  • tryLock(long time,TimeUtil unit): 在给定时间内尝试获得锁
  • `unlock(): 释放锁

2. Condition

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

  • await():阻塞当前线程直到被唤醒或被中断,与当前Condition相关联的对象会被自动释放,线程阻塞直到:其他线程执行当前对象的signal()方法并且当前线程被选为被唤醒的对象;其他线程执行当前对象的signalAll()对象;当前线程被中断;当前线程被虚假唤醒(并不懂什么叫虚假唤醒);无论是哪种唤醒方法,该方法返回前必须要重新获取锁
  • awaitUninterruptibly():阻塞当前线程直到被唤醒, 不会再等待过程中响应中断
  • awaitNanos(long nanosTimeout):阻塞当前线程直到被唤醒或被中断或指定的时间超出
  • await(long time,TimeUnit unit):同上
  • await(Date date):同上
  • signal():唤醒一个等待线程
  • signalAll():唤醒所有其他等待线程

3. 信号量 Semaphore

信号量通过构造函数设定一个数量的许可(permit),然后通过acquire获取许可,通过release释放许可,当acquire获取到许可之后当前的线程就可以执行,否则当前线程阻塞。Semaphore通常用于限制可以访问资源的数量,例如数据库连接池,线程池。单个信号量的semaphore可以用来实现互斥锁

Semaphore常用方法

  • public void acquire() throws InterruptedException { } //获取一个许可,若无法获得,则线程会一直等待,直到有线程释放释放许可或者当前线程被中断
  • public void acquireUninterruptibly() // 和acquire方法类似, 但是不会中断响应
  • public void acquire(int permits) throws InterruptedException { } //获取permits个许可
  • public void release() { } //释放一个许可
  • public void release(int permits) { } //释放permits个许可

4. 读写锁 ReadWriteLock

读写锁的访问约束情况

非阻塞 阻塞
阻塞 阻塞
  • 读-读不互斥: 读读之间也不阻塞
  • 读-写互斥: 读阻塞写, 写也会阻塞读
  • 写-写互斥:写写阻塞

5. 倒计时器 CountDownLatch

允许一个或者多个线程等待某个事件的完成,当初始化一个CountDownLatch的时候需要指定同步计数器的个数(等待线程的个数),这个时候主线程的wait方法会一直阻塞,子线程每调用一次countDown方法,同步计数器就会减1,当这个值减少到0时,主线程的wait方法处就会解除阻塞。

6. 循环栅栏 CyclicBarrier

栅栏的主要作用是等待其他线程的完成,他和CountDownLatch的主要区别是: CountDownLatch主要是等待事件的完成,每个子线程只能countDown()一次,当子线程调用一次countDown方法,就表示当前线程的事件完成。(子线程之间不是相互等待,子线程完成之后在主线程的wait处等待) CyclicBarrier主要是等待其他的线程完成(所有的子线程之间会相互等待,主线程的流程不会受到子线程的影响)。他的主要方法是一个await(),这个方法每被调用一次计数器便会减1,并阻塞住当前的线程,当计数减到0的时候,阻塞会解除,在这个栅栏上面等待的所有线程都会继续运行.然后进行一次新的循环(这时如果是CountDownLatch会回到主线程运行,而不是唤醒所有在栅栏上面等待的子线程继续运行)

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped 参与的线程总数
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

7. 线程阻塞工具类 LockSupport

LockSupport是一个非常方便使用的线程阻塞工具, 它可以在线程内任意位置让线程阻塞. 和Thread.suspend()相比, 它拟补了由于resume()在前发生,导致线程无法继续执行的情况.和Object.wait()相比,它不需要先获得某个对象的锁, 也不会抛出InterruptedException异常

8. StampedLock

它是java8在java.util.concurrent.locks新增的一个API.

ReentrantReadWriteLock在沒有任何读写锁时,才可以取得写入锁,这可用于实现了悲观读(PessimisticReading),即如果执行中进行读取时,经常可能有另一执行要写入的需求,为了保持同步,ReentrantReadWriteLock的读取锁定就可派上用场.然而,如果读取执行情况很多,写入很少的情况下,使用ReentrantReadWriteLock可能会使写入线程遭遇饥饿(Starvation)问题,也就是写入线程迟迟无法竞争到锁定而一直处于等待状态.

StampedLock控制锁有三种模式(写,读,乐观读),一个StampedLock状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问,数字0表示没有写锁被授权访问.在读锁上分为悲观锁和乐观锁.

所谓的乐观读模式,也就是若读的操作很多,写的操作很少的情况下,你可以乐观地认为,写入与读取同时发生几率很少,因此乐观地使用完全的读锁定,程序可以查看读取资料之后,是否遭到写入执行的变更,再采取后续的措施(重新读取变更信息,或者抛出异常),这一个小小改进,可大幅度提高程序的吞吐量!

实现思想 StampedLock的内部实现是基于CLH锁的. CLH锁是一种自旋锁, 它保证没有饥饿发生, 并且可以保证 FIFO(First-In-First-Out)的服务顺序.

CLH锁的基本思想: 锁维护一个等待线程队列, 所有申请锁,但是没有成功的线程都记录在这个队列中. 每一个节点(一个节点代表一个线程), 保存一个标记位(Locked), 用于判断当前线程是否已经释放锁.当一个线程试图获取锁时, 取得当前等待队列的尾部节点作为其前序节点, 并判断前序节点是否已经成功释放锁.

while(pred.locked){
}

只要前序节点(pred)没有释放锁,则表示当前线程还不能继续执行, 因此会自旋等待. 反之, 如果前序线程已经释放锁, 则当前线程可以继续执行.

释放锁时,也遵循这个逻辑, 线程会将自身节点 的locked位置标记为false, 那么后续等待的线程就能继续执行了.

public class StampedLock implements java.io.Serializable {
    /** Wait nodes */
    static final class WNode {
        volatile WNode prev;
        volatile WNode next;
        volatile WNode cowait;    // list of linked readers    读节点链表
        volatile Thread thread;   // non-null while possibly parked    当可能被暂停时非空
        volatile int status;      // 0, WAITING, or CANCELLED 重要字段,表示当前锁的状态
        final int mode;           // RMODE or WMODE
        WNode(int m, WNode p) { mode = m; prev = p; }
    }

    /** CLH队列头部 */
    private transient volatile WNode whead;
    /** CLH队列尾部 */
    private transient volatile WNode wtail;

}

9. LongAdder

是一个高性能的线程安全的long类型.

Doug Lea对该类的说明是: 低并发时LongAdder和AtomicLong性能差不多,高并发时LongAdder更高效!

AtomicLong基本实现机制:内部有个value 变量,当多线程并发自增,自减时,均通过CAS 指令从机器指令级别操作保证并发的原子性. 如果竞争激烈, 意味着CAS的失败几率更高, 重试次数更多, 越多线程重试, CAS失败几率又越高,变成恶性循环,AtomicLong效率降低.

当竞争激烈的时候, 如何进一步提高系统的性能? 一种基本方案就是使用热点分离, 将竞争的数据进行分解, 例如JDK中的ConcurrentHashMap, 将热点数据分离.

LongAdder将单一value分离成一个数组, 每个线程访问时,通过哈希等算法映射到其中一个数字进行基数, 二最终的计数结果, 则为这个数组的求和累加, 降低单个value的"热度",分段更新.

LongAdder源码解析

public class LongAdder extends Striped64 implements Serializable {
    /**
     * Adds the given value.
     * 在实际的操作中, LongAdder并不会一开始就动用数组进行处理, 而是将所有数据都先记录在一个称为base的变量中.
     * 在多线程条件下, 大家修改base都没有冲突,那么没有必要扩展为cell数据. 一旦base修改发生冲突, 就会初始化cell数组, 使用新的策略.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        // 如果对base的操作冲突, 则会进行下一行
        if ((as = cells) != null || !casBase(b = base, b + x)) {    // casBase操作保证了在低并发时,不会立即进入分支做分段更新操作
            boolean uncontended = true;        // 设置冲突标记为true
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

    /**
     * CASes the base field.
     */
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

    /**
     * Handles cases of updates involving initialization, resizing,
     * creating new Cells, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     * 此方法建议在外部进行一次CAS操作(cell != null时尝试CAS更新base值,cells != null时,CAS更新hash值取模后对应的cell.value)
     *
     * 1. 扩容,将cells数组扩大,降低每个cell的并发量,同样,这也意味着cells数组的longAccumulate动作
     * 2. 给空的cells变量赋一个新的Cell数组。
     * 分支A是用CAS更新对应的cell.value,是个写操作,分支B是进行扩容
     *
     * @param x the value
     * @param fn the update function, or null for add (this convention avoids the need for an extra field or function in LongAdder).
     * 外部提供的二元算术操作,实例持有并且只能有一个,生命周期内保持不变,null代表LongAdder这种特殊但是最常用的情况,可以减少一次方法调用
     *
     * @param wasUncontended false if CAS failed before call 如果为false,表明调用者预先调用的一次CAS操作都失败了 
     */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        // True if last slot nonempty 如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看作是扩容意向,感觉这个更好理解  
        boolean collide = false;                
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) { // hash取模映射得到的Cell单元还为null(为null表示还没有被使用)  
                    if (cellsBusy == 0) {       // Try to attach new Cell 如果没有线程正在执行扩容
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁  
                            boolean created = false;
                            try {               // Recheck under lock 在有锁的情况下再检测一遍之前的判断  
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) { // 考虑别的线程可能执行了扩容,这里重新赋值重新判断 
                                    rs[j] = r; // 对没有使用的Cell单元进行累积操作(第一次赋值相当于是累积上一个操作数,求和时再和base执行一次运算就得到实际的结果)
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0; // 清空自旋标识,释放锁
                            }
                            if (created)    // // 如果原本为null的Cell单元是由自己进行第一次累积操作,那么任务已经完成了,所以可以退出循环  
                                break;
                            continue;           // Slot is now non-empty 不是自己进行第一次累积操作,重头再来  
                        }
                    }
                    collide = false; // 执行这一句是因为cells被加锁了,不能往下继续执行第一次的赋值操作(第一次累积),所以还不能考虑扩容  
                }
                else if (!wasUncontended)       // CAS already known to fail 前面一次CAS更新a.value(进行一次累积)的尝试已经失败了,说明已经发生了线程竞争  
                    wasUncontended = true;      // Continue after rehash 情况失败标识,后面去重新算一遍线程的hash值  
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x)))) // 尝试CAS更新a.value(进行一次累积) ------ 标记为分支A 
                    break; // 成功了就完成了累积任务,退出循环 
                else if (n >= NCPU || cells != as) // // cell数组已经是最大的了,或者中途发生了扩容操作。因为NCPU不一定是2^n,所以这里用 >=  
                    collide = false;            // At max size or stale 长度n是递增的,执行到了这个分支,说明n >= NCPU会永远为true,下面两个else if就永远不会被执行了,也就永远不会再进行扩容
                    // CPU能够并行的CAS操作的最大数量是它的核心数(CAS在x86中对应的指令是cmpxchg,多核需要通过锁缓存来保证整体原子性),当n >= NCPU时,再出现几个线程映射到同一个Cell导致CAS竞争的情况,那就真不关扩容的事了,完全是hash值的锅了  
                else if (!collide)         // 映射到的Cell单元不是null,并且尝试对它进行累积时,CAS竞争失败了,这时候把扩容意向设置为true, 下一次循环如果还是跟这一次一样,说明竞争很严重,那么就真正扩容  
                    collide = true;     // 把扩容意向设置为true,只有这里才会给collide赋值为true,也只有执行了这一句,才可能执行后面一个else if进行扩容  
                else if (cellsBusy == 0 && casCellsBusy()) {     // 最后再考虑扩容,能到这一步说明竞争很激烈,尝试加锁进行扩容 ------ 标记为分支B  
                    try {
                        if (cells == as) {      // Expand table unless stale 检查下是否被别的线程扩容了(CAS更新锁标识,处理不了ABA问题,这里再检查一遍)  
                            Cell[] rs = new Cell[n << 1]; // 执行2倍扩容  
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;     // 释放锁  
                    }
                    collide = false;    // 扩容意向为false  
                    continue;                   // Retry with expanded table 扩容后重头再来
                }
                h = advanceProbe(h);     // 重新给线程生成一个hash值,降低hash冲突,减少映射到同一个Cell导致CAS竞争的情况  
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) { //cells没有被加锁,并且它没有被初始化,那么就尝试对它进行加锁,加锁成功进入这个else if  
                boolean init = false;
                try {                               // Initialize table
                    if (cells == as) {                 // CAS避免不了ABA问题,这里再检测一次,如果还是null,或者空数组,那么就执行初始化    
                        Cell[] rs = new Cell[2];    // 初始化时只创建两个单元 
                        rs[h & 1] = new Cell(x);    // 对其中一个单元进行累积操作,另一个不管,继续为null  
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;            // 清空自旋标识,释放锁 
                }
                if (init)                    // 如果某个原本为null的Cell单元是由自己进行第一次累积操作,那么任务已经完成了,所以可以退出循环  
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))  // cells正在进行初始化时,尝试直接在base上进行累加操作  
                break;                          // Fall back on using base 直接在base上进行累积操作成功了,任务完成,可以退出循环了  
        }
    }
}

二. JDK内部的"锁"优化策略

在Java SE 1.6中,锁一共有4中状态,级别从低到高依次是:无锁偏向锁轻量级锁重量级锁.

2.1 偏向锁

偏向锁也是JDK 1.6中引入的一项锁优化,它的目的是消除数据在无竞争情况下的同步操作,进一步提高程序的运行性能.如果说轻量级锁是在无竞争的情况下使用CAS操作去消除同步使用的互斥量,那偏向锁就是在无竞争的情况下把整个同步操作都消除掉,连CAS操作都不做了.

偏向锁的核心思想是: 如果一个线程获得了锁,那么锁就进入偏向模式.当这个线程再次请求锁时,无须再做任何同步操作,这样就节省了大量有关锁申请的操作, 从而提高了线程性能.偏向锁的“偏”,就是偏心的偏,它的意思就是这个锁会偏向于第一个获取它的线程.

偏向锁的加锁过程 假如当前虚拟机启用了偏向锁,那么,当锁对象第一次被线程获取的时候,虚拟机会将对象头中的锁标志位设为“01”,即偏向锁模式.同时使用CAS操作把获取到这个锁的线程的ID记录在对象的MarkWord之中,如果CAS操作成功,持有偏向锁的线程以后每次进入这个锁相关的同步块时,只需要简单测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁,如果测试成功虚拟机可以不再进行任何同步操作.如果测试失败,说明有另外一个线程尝试获取这个锁,偏向锁模式宣告结束,执行偏向锁的撤销.

偏向锁的撤销 撤销过程:再次检测Mark Word的偏向锁标识位(不是锁标识位)是否设置为1,即当前对象是否支持偏向锁.如果没有,则升级为轻量级锁,如果有,则继续尝试使用CAS将对象头的偏向锁指向当前线程. 偏向锁使用了一种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁.

关闭或启用偏向锁

  • 使用Java虚拟机参数-XX:+UseBiasedLocking可以开启偏向锁
  • 使用-XX:-UseBiasedLocking来禁用偏向锁.
  • 偏向锁在Java 6和java 7里是默认启用的,如果通过虚拟机参数关闭偏向锁,那么程序默认进入轻量级锁状态.

2.2 轻量级锁

如果偏向锁失败,虚拟机并不会立即挂起线程,它还会使用一种轻量级锁的优化手段. 轻量级锁: 它只是简单地将对象头部作为指针, 指针持有锁的线程堆栈的内部,来判断一个线程是否持有对象锁.如果线程获得轻量级锁成功,则可以顺利进入临界区.如果轻量级锁加锁失败,则表示其他线程资源抢先争夺到了锁,那么当前线程的锁请求就会膨胀为重量级锁.

轻量级锁的加锁过程 在代码进入同步块的时候,如果此同步对象没有被锁定,(锁标志位为“01”状态),虚拟机首先将在当前线程的栈桢中建立一个锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝.然后,虚拟机将使用CAS. CAS 操作尝试将对象的MarkWord更新为指向LockWord的指针,如果这个更新操作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位将转换为“00”,即表示此对象处于轻量级锁定状态.

如果这个更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈桢(栈桢中的Lock Word),如果是说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块执行了.否则说明这个锁对象已经被其他线程占用了.如果有两条或两条以上的线程争用同一个锁,那轻量级锁就不再有效,要膨胀为重量级锁,锁标志的状态值变为“10”,此时Mark Word中存储的就是指向重量级锁(互斥量)的指针,后面等待锁的线程也就要进入阻塞状态.

轻量级锁的解锁过程 如果对象Mark Word仍然指向当前线程的锁记录(Lock Record),就用CAS操作把对象的Mark Word用当前线程的LockRecord(加锁之前Mark Word的拷贝)进行替换,如果替换成功,整个同步过程就完成了.如果替换失败,说明有其他线程尝试获取该锁,锁就会膨胀为重量级锁.

一旦锁升级为重量级锁,就不再恢复到轻量级锁状态.在重量级锁状态下,其他线程试图获取锁时,都会被阻塞,当持有锁的线程释放锁后会唤醒这些线程,被唤醒的线程就会进行新一轮的锁竞争.

轻量级锁能提升程序同步性能的依据是“对于绝大部分的锁,在整个同步周期内都是不存在竞争的”,这是一个经验数据.

  • 如果没有竞争,轻量级锁使用CAS操作避免了使用互斥量的开销,
  • 但如果存在锁竞争,除了互斥量的开销,还额外发生了CAS操作,因此在有竞争的情况下,轻量级锁会比传统的重量级锁更慢.

2.3 自旋锁

互斥同步对性能最大的影响是阻塞的实现,挂起线程和恢复线程的操作都需要转入内核态中完成,这些操作给系统的并发性能带来了很大的压力.同时虚拟机团队也注意到在许多应用中,共享数据的锁定状态只会持续很短的一段时间,为了这段时间去挂起和恢复线程并不值得.如果物理机器上有两个以上的处理器,能让两个或以上的线程同时并行执行,我们就可以让后面请求锁的那个线程“稍等一下”,但不放弃处理器的执行时间,看看持有锁的线程是否能够很快释放锁. 自旋锁:系统会进行一次堵住,它会假设在不久的将来,线程可以得到这把锁.因此,虚拟机会让当前线程做几个空循环(这也是自旋的含义), 在经过若干次循环后,如果可以得到锁,那么就顺序进入临界区.如果还不能获得锁,才会真实地将线程在操作系统层面挂起.[自旋锁就是占用处理器的资源,但是不做处理,保持一种活跃状态]

自旋锁在JDK 1.6中默认开启.自旋等待不能代替阻塞,且先不说对处理器数量的要求,自旋等待本身虽然避免了线程切换的开销,但它是要占用处理器时间的,因此如果锁被占用的时间很短,自旋等待的效果就会非常好,反之,如果锁被占用的时间很长,那么自旋的线程会白白浪费处理器资源,反而带来性能上的浪费.因此,自旋等待的时间必须有一定的限度,如果自旋超过了限定的次数仍然没有成功获取锁,就应该使用传统的方式来挂起线程了.

在JDK 1.6中引入了自适应的自旋锁,自适应意味着自选的时间不再固定了,而是由上一次在同一个锁上自旋时间以及锁的拥有者的状态来决定.

2.4 锁消除

锁消除是指虚拟机即时编译器(JIT编译器)在运行时,通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁.通过锁消除,可以节省毫无意义的请求锁时间.

锁消除涉及的一项关键技术为逃逸分析.逃逸分析必须在-server模式下进行, 可以使用 -XX:+DoEscapeAnalysis参数打开逃逸分析, 使用 -XX:+EliminateLocks参数可以打开锁消除.

三. 有助于提高"锁"性能的几点建议

3.1 减小锁持有的时间

在JDK中,处理表达式的Pattern类,只有在表达式未编译的时候进行局部加锁,这种方法大大提高了matcher的执行效率和可靠性

public Matcher matcher(CharSequence input) {
    if (!compiled) {
        synchronized(this) {
            if (!compiled)
                compile();
        }
    }
    Matcher m = new Matcher(this, input);
    return m;
}

NOTE: 减少锁的持有时间有助于降低锁冲突的可能性,进而提升系统的并发能力

3.2 减小锁的力度

concurrentHashMap的实现,他的内部被分为了若干个晓得hashmap,称之为段(SEGMENT),默认是16段

减小锁粒度会引入一个新的问题,当需要获取全局锁的时候,其消耗的资源会较多,不如concurrenthashMap的size()方法.可以看到计算size的时候需要计算全部有效的段的锁


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    /**
     * {@inheritDoc}
     */
    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }

    /**
     * 计算size的时候会先使用无锁的方式计算,如果失败会采用这个方法
     * 高并发的场合concurrenthashmap的size依然要差于同步的hashmap
     * 因此在类似于size获取全局信息方法调用不频繁的情况下,这种减小粒度的的方法才是真正意义上的提高系统并发量
     */
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
}

NOTE: 所谓减小锁粒度,就是指缩小锁定对象的范围,从而减小锁冲突的可能性,进而提高系统性能

3.3 读写分离来替换独占锁

在读多写少的情况下,使用读写锁可以有效的提高系统性能 ReadWriteLock 可以提高系统性能

3.4 锁分离

JDK中 LinkedBlockingQueue的实现,take函数和put函数分别实现了冲队列取和往队列加数据,虽然两个方法都对队列进项了修改,但是LinkedBlockingQueue是基于链表的所以一个操作的是头,一个是队列尾端,从理论情况下将并不冲突.如果使用独占锁则take和put就不能完成真正的并发,所以jdk并没有才用这种方式取而代之的是两把不同的锁分离了put和take的操作.


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();     //不能有两个线程同时取数据
        try {
            while (count.get() == 0) {     //如果当前没有可用数据,一直等待
                notEmpty.await();          //等待,put操作的通知
            }
            x = dequeue();                 //取得第一个数据
            c = count.getAndDecrement();//数量减一,原子操作因为回合put同时访问count.注意变量c是count减一
            if (c > 1)
                notEmpty.signal();      //通知其他take操作
        } finally {
            takeLock.unlock();     //释放锁
        }
        if (c == capacity)
            signalNotFull();     //通知put,已有空余空间
        return x;
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();        //不能有两个线程同时进行put
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) { //如果队列已满
                notFull.await();   //等待
            }
            enqueue(node);  //插入数据
            c = count.getAndIncrement(); //更新总数,变量c是count加1前的值
            if (c + 1 < capacity)
                notFull.signal();   //有足够的空间,通知其他线程
        } finally {
            putLock.unlock();  //释放锁
        }
        if (c == 0)
            signalNotEmpty();  //插入成功后,通知take操作
    }

3.5 锁粗化

所粗化: 虚拟机在遇到一连串地对同一锁不断进行请求和释放的操作时,便会把所有的锁操作整合成对锁的一次请求,从而减小对锁的请求同步次数.

NOTE: 性能优化就是根据运行时的真实情况对各个资源点进行权衡折中的过程,锁粗话的思想和减少锁持有时间是相反的,但是在不同的场合,他们的效果并不相同,所以大家要根据实际情况,进行权衡

results matching ""

    No results matching ""