无锁
锁处理策略 乐观锁(Optimistic Lock) 其实是一种假设成功(Assuming Success)的设计思想,当一个线程/进程在尝试操作共享数据时,在写入的同时增加条件判断,该条件用于检查其编辑时拿到的原始数据和当前数据是否相同,如果相同,则执行写入操作,否则,将失败,外围的程序必须处理这种可能的失败,并根据应用的不同选择自动的重试或提示用户失败.所谓"乐观",是指这样的设计是比不发生"碰撞"为假设的,在真实的场景中(尤其是现代计算机的计算速度已经很快的情况下),"碰撞"发生的概率确实不高,那么,在绝大多数场景下,没有额外的锁定动作(也即没有额外的性能损耗)的情况下,也可以安全的更新数据.
CAS属于乐观锁
悲观锁(Pessimistic Lock) 一个线程/进程在操作共享数据时,完全不允许线程/进程的访问(当然,有可能只是不允许写),相当于一种"临界区",在临界区之外,系统可以并行处理,而到达"临界区"后,则必须串行的进行处理,在MySQL的InnoDB中,最常见的悲观锁就是显示的排它锁(exclusive lock):
mysql> select * from TABLEA where ID = :id for update
而所谓的"悲观",是指这样的设计是以发生"碰撞"为假设的,即使在甚至的执行场景中很少发生"碰撞",在出入"临界区"时,也必须有显示的加锁和解锁的过程
1. 并发策略CAS
CAS(Compare And Swap): 比较交换,一旦检测到线程冲突产生,就重试当前操作直到没有冲突为止.
CAS操作包含3个参数CAS(V,E,N).
- V表示要更新的变量
- E表示预期值
- N表示新增
仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其它线程做了更新,则当前线程什么都不做.最后,CAS返回当前的V的真实值. CAS总是抱着乐观的态度进行,只有一个会胜出,并更新成功,其余均会失败.失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作. CAS使用的是底层的CPU指令CMPXCHG,从而保证对内存区域的原子操作juc包下的组建大多都用到了CAS,原生的Java代码没有CAS的实现,不过com.msic.Unsafe类通过本地方法实现了对CAS的使用
CAS缺点 1.ABA问题. 因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了.ABA问题的解决思路就是使用版本号.在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A.
从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题.这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值.
2.循环时间长开销大. 自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销.如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零.第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率.
3.只能保证一个共享变量的原子操作. 当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作.比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij.从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作.
2.无锁的线程安全整数: AutomicInteger
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
// value对象在AtomicInteger对象中的偏移量
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// 表示AtomicInteger的当前实际取值
private volatile int value;
// 获取当前值
public final int get() {
return value;
}
// 设置当前值
public final void set(int newValue) {
value = newValue;
}
// 设置新值,并返回旧值
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this,valueOffset,newValue);
}
// 如果当前值为expect,则设置为update
public final boolean compareAndSet(int expect,int update) {
return unsafe.compareAndSwapInt(this,valueOffset,expect,update);
}
// 当前值+1,返回旧值
public final int getAndIncrement() {
return unsafe.getAndAddInt(this,valueOffset,1);
}
// 当前值-1,返回旧值
public final int getAndDecrement() {
return unsafe.getAndAddInt(this,valueOffset,-1);
}
// 当前值+delta,返回旧值
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this,valueOffset,delta);
}
// 当前值+1,返回新值
public final int incrementAndGet() {
return unsafe.getAndAddInt(this,valueOffset,1) + 1;
}
// 当前值-1,返回新值
public final int decrementAndGet() {
return unsafe.getAndAddInt(this,valueOffset,-1) - 1;
}
// 当前值+delta,返回新值
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this,valueOffset,delta) + delta;
}
}
Java中和AtomicInteger类似的: AtomicLong,AtomicBoolean,AtomicReference
2.3 Java中的指针: Unsafe类
Java最初被设计为一种安全的受控环境.尽管如此,HotSpot还是包含了一个后门sun.misc.Unsafe,提供了一些可以直接操控内存和线程的底层操作. Unsafe被JDK广泛应用于java.nio和并发包等实现中,这个不安全的类提供了一个观察HotSpot JVM内部结构并且可以对其进行修改,但是不建议在生产环境中使用.
Unsafe类是如此地不安全,以至于JDK开发者增加了很多特殊限制来访问它:
- 私有的构造器
- 工厂方法getUnsafe()的调用器只能被Bootloader加载,否则抛出SecurityException 异常
不过,我们可以通过反射来获取Unsafe的一个实例:
public static Unsafe getUnsafe() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (Unsafe)f.get(null);
} catch (Exception e) {
/* ... */
}
}
如果你直接调用Unsafe.getUnsafe(),你可能会得到SecurityException异常.只能从受信任的代码中使用这个方法
public static Unsafe getUnsafe() {
Class cc = sun.reflect.Reflection.getCallerClass(2);
if (cc.getClassLoader() != null)
throw new SecurityException("Unsafe");
return theUnsafe;
}
注意: 根据Java类加载器的工作原理,应用程序的类由App Loader加载. 而系统核心类,如 rt.jar中的类由Bootstrap类加载器加载. Bootstrap加载器没有Java对象的对象,因此试图获得这个类加载器会返回null. 所以,当一个类的类加载器为null时,说明它是由Bootstrap加载的,而这个类也基友可能是rt.jar中的类.
2.4 无锁的对象引用: AtomicReference
AtomicInteger: 对整数的封装 AtomicReference: 对普通对象的引用,它可以保证你在修改对象引用时的线程安全性.
2.5 带有时间戳的对象引用: AtomicStampedReference
AtomicReference无法解决对象在修改过程中,丢失状态的问题.对象值本身与状态被画上了等号,无法解决北京被反复修改导致线程无法正确判断对象状态的问题.
AtomicStampedReference内部不仅维护了对象值,还维护了一个时间戳(状态值).当它对应的数值被修改时,除了更新数据本身外,还必需要更新时间戳.只有对象值以及时间戳都必须满足期望值,写入才会成功.只要时间戳发生变化,就能防止不恰当的写入.
比AtomicReference内部新增的有关时间戳的信息
/**
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current,Pair.of(newReference,newStamp)));
}
// 获得当前对象的引用
public V getReference() {
return pair.reference;
}
// 获得当前时间戳
public int getStamp() {
return pair.stamp;
}
// 设置当前对象引用和时间戳
public void set(V newReference,int newStamp) {
Pair<V> current = pair;
if (newReference != current.reference || newStamp != current.stamp)
this.pair = Pair.of(newReference,newStamp);
}
4.6 数组也能无锁: AtomicIntegerArray
AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
本质是对int[]类型的封装,使用Unsafe类通过CAS的方式控制int[]在多线程下的安全性.
4.7 普通变量的原子操作: AtomicIntegerFieldUpdater
在不改动(或者极少改动)原有代码的基础上,让普通变量也享受CAS操作带来的线程安全性.
4.8 无锁的Vector
...
4.9 SynchronousQueue
SynchronousQueue
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>,Serializable
它模拟的功能类似于生活中一手交钱一手交货这种情形,像那种货到付款或者先付款后发货模型不适合使用SynchronousQueue. 首先要知道SynchronousQueue没有容纳元素的能力,即它的isEmpty()方法总是返回true,但是给人的感觉却像是只能容纳一个元素.其实,队列头元素是第一个排队要插入数据的线程,而不是要交换的数据.数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中.可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开.
synchronousQueue
- 顾名思义 同步队列,每一个线程的插入动作必须依赖另外一个线程的移除操作.
- 本身支持公平策略与非公平策略,所以底层存在队列(实现公平策略),栈(实现非公平策略).
- 暴露出来是没有容量的,也不存放元素.但是会利用上一条说的队列或者栈来暂存元素,进行匹配.
- 提供的
iterator()返回空;peek()返回null;isEmpty()返回true;remove()false;remainCapacity()返回0.这些方法对于synchronousQueue来说返回的都是固定的. put()take()如果执行失败会自旋等待,直到空旋次数耗尽.被park;offer()poll()方法在执行的时候,如果刚好匹配到,则执行,否则直接返回,不会进入自旋.poll可以带参数,表明等待多长时间.put()take()offer()poll()这四个方法底层都是调用了SynchronousQueue中 内部抽象类Transferer的transfer方法实现,这个类有两个子类,分别为TransferQueue和TransferStack,也就是上面说的队列与栈,公平策略与非公平策略.transfer(Object e,boolean timed,long nanos)第一个参数如果为null 说明这是一个消费者,也就是take poll ;如果非null 则是一个生产者,希望将元素入队列 . 第二个参数为是否等待,如果传false,则会进行自旋等待 . 如果传true,则将立刻结束
static final class TransferQueue<E> extends Transferer<E> {
E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
}
static final class TransferStack<E> extends Transferer<E> {
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
/*
* Basic algorithm is to loop trying one of three actions:
*
* 1. If apparently empty or already containing nodes of same
* mode, try to push node on stack and wait for a match,
* returning it, or null if cancelled.
* 如果等到队列为空,或者队列中节点的类型和本次操作是一致的, 那么将当期操作压入队列等待.
* 比如: 等待队列中是读线程等待,本次操作也是读,因此这两个读都需要等待,进入等待队列的线程可能会被挂起, 它们会等待一个"匹配"操作
*
* 2. If apparently containing node of complementary mode,
* try to push a fulfilling node on to stack, match
* with corresponding waiting node, pop both from
* stack, and return matched item. The matching or
* unlinking might not actually be necessary because of
* other threads performing action 3:
* 如果等待队列中的元素和本次操作是互补的(等待出走是读,本次操作是写), 那就就插入一个"完成状态的节点",并且让他"匹配"到一个等待节点上.
* 接着弹出这两个节点,并且使得对应的两个线程继续执行
*
* 3. If top of stack already holds another fulfilling node,
* help it out by doing its match and/or pop
* operations, and then continue. The code for helping
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
* 如果线程发下等待队列的节点是"完成"节点, 那么帮助这个节点完成任务. 其流程和步骤2是一致的.
*/
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode (步骤1)队列为空,或者相同模式
if (timed && nanos <= 0) { // can't wait 不进行等待
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node 处理取消行为
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled 等待被取消
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller 帮助s的fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill (步骤2)是否处于fulfill状态
if (h.isCancelled()) // already cancelled 如果以前取消了
casHead(h, h.next); // pop and retry 弹出并重试
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear 一致循环直到匹配(match)或者没有等待着
SNode m = s.next; // m is s's match m是s的匹配者(match)
if (m == null) { // all waiters are gone 已经没有等待者了
casHead(s, null); // pop fulfill node 弹出fulfill节点
s = null; // use new node next time 下一次使用新的节点
break; // restart main loop 重新开始主循环
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m 弹出s和m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match match失败
s.casNext(m, mn); // help unlink 帮助删除节点
}
}
} else { // help a fulfiller (步骤3) 帮助一个fulfiller
SNode m = h.next; // m is h's match m是h的match
if (m == null) // waiter is gone 没有等待者
casHead(h, null); // pop fulfilling node 弹出fulfill节点
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match 尝试match
casHead(h, mn); // pop both h and m 弹出h和m
else // lost match match失败
h.casNext(m, mn); // help unlink 帮助删除节点
}
}
}
}
}
在SynchronousQueue中,参与工作的所有线程不仅仅是竞争资源的关系. 更重要的是,它们彼此之间还会互相帮助, 在一个线程内部, 可能会帮助其他线程完成它们的工作.这种模式可以更大程度上减少饥饿的可能,提高系统整体的并行度.