并发编程
1. JAVA
1.1 Future
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果. 必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果.
Future模式 核心思想是异步调用
Future模式的参与者
| 参与者 | 作用 |
|---|---|
| Main | 启动系统,调用Client发出请求 |
| Client | 返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData |
| Data | 返回数据的接口 |
| FutureData | Future数据,构造很快,但是是一个虚拟的数据,需要装配RealData |
| RealData | 真实数据,构造比较慢 |
Future模式结构图

JDK中的Future
public interface Future<V> {
// 取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false
boolean cancel(boolean mayInterruptIfRunning);
// 是否被取消, 如果在任务正常完成前被取消成功,返回true
boolean isCancelled();
// 是否完成, 如果任务完成, 返回true
boolean isDone();
// 获取执行结果, 一直阻塞
V get() throws InterruptedException, ExecutionException;
// 如果在timeout内没有获得结果,则抛timeout异常,而不是返回null
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
1.2 方式方式
- 使用FutureTask创建
FutureTask<Integer> task = new FutureTask<>(() -> null);
new Thread(task).start();
Integer result = task.get();
System.out.println(result);
- 使用线程池
// 线程服务类型:
ExecutorService executorService = Executors.newFixedThreadPool();
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newScheduledThreadPool();
// 委托为executorService的方式:
void execute(Runnable command); :没有办法获取执行 Runnable 之后的结果
Future<?> submit(Runnable task) :有返回值为null
<T> Future<T> submit(Callable<T> task) :
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException :返回其中某个执行结束的callable结果,如果其中一个异常或结束,取消其他callable
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; :返回list,和task顺序一致。
1.3 Executor 和 ExecutorService
两个接口的主要区别
- ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口
- Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象。
- Executor 中的 execute() 方法不返回任何结果,而 ExecutorService 中的 submit()方法可以通过一个 Future 对象返回运算结果。
- 除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown() 方法终止线程池。 Executors 类提供工厂方法用来创建不同类型的线程池。比如: newSingleThreadExecutor() 创建一个只有一个线程的线程池,newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,newCachedThreadPool()可以根据需要创建新的线程,但如果已有线程是空闲的会重用已有线程。
- 当线程池中的线程均处于工作状态,并且线程数已达线程池允许的最大线程数时,就会采取指定的饱和策略来处理新提交的任务
- AbortPolicy: 直接抛异常
- CallerRunsPolicy: 用调用者的线程来运行任务
- DiscardOldestPolicy: 丢弃线程队列里最近的一个任务,执行新提交的任务
- DiscardPolicy 直接将新任务丢弃
- 使用 Executors 的工厂方法创建的线程池,那么饱和策略都是采用默认的 AbortPolicy
2.ListenableFuture
2.1 Guava - ListenableFuture
我们强烈地建议你在代码中多使用 ListenableFuture 来代替 JDK 的 Future, 因为:
- 大多数 Futures 方法中需要它。
- 转到 ListenableFuture 编程比较容易。
- Guava 提供的通用公共类封装了公共的操作方方法,不需要提供 Future 和 ListenableFuture 的扩展方法。
ListenableFuture 可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在 JDK concurrent 中的 Future 是不支持的。
ListenableFuture 中的基础方法是 addListener(Runnable, Executor), 该方法会在多线程运算完的时候,指定的 Runnable 参数传入的对象会被指定的 Executor 执行。
2.2 实例
1.委托执行器
//执行器和listenableFuture实例创建方式
ListeningExecutorService listeningExecutorService = MoreExecutors.sameThreadExecutor(); 这个是直接将现在的任务传进去执行
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); 将执行托管传进去执行任务
ListenableFuture<Integer> listenableFuture = listeningExecutorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
});
2.带回调的执行器
Futures.addCallback(listenableFuture, new FutureCallback<Integer>() { //默认使用MoreExecutors.sameThreadExecutor()
@Override
public void onSuccess(Integer result) {
System.out.println("success + result:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("throwable + t:" + t);
}
});
- futures提供的扩展方法
transform:对于ListenableFuture的返回值进行转换。
allAsList:对多个ListenableFuture的合并,返回一个当所有Future成功时返回多个Future返回值组成的List对象。注:当其中一个Future失败或者取消的时候,将会进入失败或者取消。
successfulAsList:和allAsList相似,唯一差别是对于失败或取消的Future返回值用null代替。不会进入失败或者取消流程。
immediateFuture/immediateCancelledFuture: 立即返回一个待返回值的ListenableFuture。
makeChecked: 将ListenableFuture 转换成CheckedFuture。CheckedFuture 是一个ListenableFuture ,其中包含了多个版本的get方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易
JdkFutureAdapters.listenInPoolThread(future): guava同时提供了将JDK Future转换为ListenableFuture的接口函数。
3.Fork/Join
3.1 Fork/Join概述
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

ForkJoin结构
ForkJoinPool: 用于执行ForkJoinTask任务的执行池,不再是传统执行池 Worker+Queue 的组合模式,而是维护了一个队列数组WorkQueue,这样在提交任务和线程任务的时候大幅度的减少碰撞WorkQueue: 双向列表,用于任务的有序执行,如果WorkQueue用于自己的执行线程Thread,线程默认将会从top端选取任务用来执行 - LIFO。因为只有owner的Thread才能从top端取任务,所以在设置变量时,int top; 不需要使用volatileForkJoinWorkThread: 从WorkQueue中获取ForkJoinTask执行,用于执行任务的线程,用于区别使用非ForkJoinWorkThread线程提交的task;启动一个该Thread,会自动注册一个WorkQueue到Pool,这里规定,拥有Thread的WorkQueue只能出现在WorkQueue数组的奇数位ForkJoinTask: 任务,它比传统的任务更加轻量,不再对是RUNNABLE的子类,提供fork/join方法用于分割任务以及聚合结果。- 实现了复杂的
worker-stealing算法,当任务处于等待中,thread通过一定策略,不让自己挂起,充分利用资源,当然,它比其他语言的协程要重一些
使用示例
public class ForkJoinCalculator implements Calculator {
private ForkJoinPool pool;
private static class SumTask extends RecursiveTask<Long> {
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
// 当需要计算的数字小于6时,直接计算结果
if (to - from < 5) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
// 否则,把任务一分为二,递归计算
} else {
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle+1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
public ForkJoinCalculator() {
pool = new ForkJoinPool();
}
@Override
public long sumUp(long[] numbers) {
return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
}
}
3.2 工作窃取算法(work-stealing)
工作窃取算法: 是指某个线程从其他队列里窃取任务来执行.
对于常见的一个大型任务,我们可以把这个大的任务切割成很多个小任务,然后这些小任务会放在不同的队列中,每一个队列都有一个相应的的工作执行线程来执行,当一个线程所需要执行的队列中,任务执行完之后,这个线程就会被闲置,为了提高线程的利用率,这些空闲的线程可以从其他的任务队列中窃取一些任务,来避免使自身资源浪费,它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行.这种在自己线程闲置,同时窃取其他任务队列中任务执行的算法,就是工作窃取算法
Fork/Join中的实现
ForkJoinPool的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)- 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行
- 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式
- 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成
- 在既没有自己的任务,也没有可以窃取的任务时,进入休眠
3.3 Fork/Join 原理
调度过程

index为偶数的queue是怎么产生的?
- 创建pool : ForkJoinPool → defaultForkJoinWorkerThreadFactory → newThread
- 提交任务/异步执行:submit → workQueue→signalWork? →ForkJoinWorkerThread.run
fork()
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/**
* Arranges to asynchronously execute this task in the pool the
* current task is running in, if applicable, or using the {@link
* ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
* it is not necessarily enforced, it is a usage error to fork a
* task more than once unless it has completed and been
* reinitialized. Subsequent modifications to the state of this
* task or any data it operates on are not necessarily
* consistently observable by any thread other than the one
* executing it unless preceded by a call to {@link #join} or
* related methods, or a call to {@link #isDone} returning {@code
* true}.
* 把任务推入当前工作线程的工作队列里
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
}
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
/**
* Tries to add the given task to a submission queue at
* submitter's current queue. Only the (vastly) most common path
* is directly handled in this method, while screening for need
* for externalSubmit.
*
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
}
join()
- 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
- 查看任务的完成状态,如果已经完成,直接返回结果。
- 如果任务尚未完成,但处于自己的工作队列内,则完成它。
- 如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。
- 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
- 递归地执行第5步
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
* unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
*
* @return status upon completion
*/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
/**
* Blocks a non-worker-thread until completion.
* @return status upon completion
*/
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
}
引用