并发编程

1. JAVA

1.1 Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果. 必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果.

Future模式 核心思想是异步调用

Future模式的参与者

参与者 作用
Main 启动系统,调用Client发出请求
Client 返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData
Data 返回数据的接口
FutureData Future数据,构造很快,但是是一个虚拟的数据,需要装配RealData
RealData 真实数据,构造比较慢

Future模式结构图 Future模式结构图

JDK中的Future

public interface Future<V> {
    // 取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false
    boolean cancel(boolean mayInterruptIfRunning);

    // 是否被取消, 如果在任务正常完成前被取消成功,返回true
    boolean isCancelled();

    // 是否完成, 如果任务完成, 返回true
    boolean isDone();

    // 获取执行结果, 一直阻塞
    V get() throws InterruptedException, ExecutionException;

    // 如果在timeout内没有获得结果,则抛timeout异常,而不是返回null
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

1.2 方式方式

  1. 使用FutureTask创建
FutureTask<Integer> task = new FutureTask<>(() -> null);
new Thread(task).start();
Integer result = task.get();
System.out.println(result);
  1. 使用线程池
// 线程服务类型:
ExecutorService executorService = Executors.newFixedThreadPool();
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService executorService = Executors.newSingleThreadExecutor(); 
ExecutorService executorService = Executors.newScheduledThreadPool(); 

// 委托为executorService的方式:
void execute(Runnable command);  :没有办法获取执行 Runnable 之后的结果
Future<?> submit(Runnable task)  :有返回值为null
<T> Future<T> submit(Callable<T> task)  :
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException   :返回其中某个执行结束的callable结果,如果其中一个异常或结束,取消其他callable
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; :返回list,和task顺序一致。

1.3 Executor 和 ExecutorService

两个接口的主要区别

  1. ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口
  2. Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象。
  3. Executor 中的 execute() 方法不返回任何结果,而 ExecutorService 中的 submit()方法可以通过一个 Future 对象返回运算结果。
  4. 除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown() 方法终止线程池。 Executors 类提供工厂方法用来创建不同类型的线程池。比如: newSingleThreadExecutor() 创建一个只有一个线程的线程池,newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,newCachedThreadPool()可以根据需要创建新的线程,但如果已有线程是空闲的会重用已有线程。
  5. 当线程池中的线程均处于工作状态,并且线程数已达线程池允许的最大线程数时,就会采取指定的饱和策略来处理新提交的任务
    1. AbortPolicy: 直接抛异常
    2. CallerRunsPolicy: 用调用者的线程来运行任务
    3. DiscardOldestPolicy: 丢弃线程队列里最近的一个任务,执行新提交的任务
    4. DiscardPolicy 直接将新任务丢弃
  6. 使用 Executors 的工厂方法创建的线程池,那么饱和策略都是采用默认的 AbortPolicy

2.ListenableFuture

2.1 Guava - ListenableFuture

我们强烈地建议你在代码中多使用 ListenableFuture 来代替 JDK 的 Future, 因为:

  • 大多数 Futures 方法中需要它。
  • 转到 ListenableFuture 编程比较容易。
  • Guava 提供的通用公共类封装了公共的操作方方法,不需要提供 Future 和 ListenableFuture 的扩展方法。

ListenableFuture 可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在 JDK concurrent 中的 Future 是不支持的。

ListenableFuture 中的基础方法是 addListener(Runnable, Executor), 该方法会在多线程运算完的时候,指定的 Runnable 参数传入的对象会被指定的 Executor 执行。

2.2 实例

1.委托执行器

//执行器和listenableFuture实例创建方式
ListeningExecutorService listeningExecutorService = MoreExecutors.sameThreadExecutor();   这个是直接将现在的任务传进去执行
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));   将执行托管传进去执行任务

ListenableFuture<Integer> listenableFuture = listeningExecutorService.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return 1;
    }
});

2.带回调的执行器

Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {   //默认使用MoreExecutors.sameThreadExecutor()
    @Override
    public void onSuccess(Integer result) {
        System.out.println("success + result:" + result);
    }

    @Override
    public void onFailure(Throwable t) {
        System.out.println("throwable + t:" + t);
    }
});
  1. futures提供的扩展方法
transform:对于ListenableFuture的返回值进行转换。
allAsList:对多个ListenableFuture的合并,返回一个当所有Future成功时返回多个Future返回值组成的List对象。注:当其中一个Future失败或者取消的时候,将会进入失败或者取消。
successfulAsList:和allAsList相似,唯一差别是对于失败或取消的Future返回值用null代替。不会进入失败或者取消流程。
immediateFuture/immediateCancelledFuture: 立即返回一个待返回值的ListenableFuture。
makeChecked: 将ListenableFuture 转换成CheckedFuture。CheckedFuture 是一个ListenableFuture ,其中包含了多个版本的get方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易
JdkFutureAdapters.listenInPoolThread(future): guava同时提供了将JDK Future转换为ListenableFuture的接口函数。

3.Fork/Join

3.1 Fork/Join概述

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

Fork/Join框架

ForkJoin结构

  • ForkJoinPool: 用于执行ForkJoinTask任务的执行池,不再是传统执行池 Worker+Queue 的组合模式,而是维护了一个队列数组WorkQueue,这样在提交任务和线程任务的时候大幅度的减少碰撞
  • WorkQueue: 双向列表,用于任务的有序执行,如果WorkQueue用于自己的执行线程Thread,线程默认将会从top端选取任务用来执行 - LIFO。因为只有owner的Thread才能从top端取任务,所以在设置变量时,int top; 不需要使用 volatile
  • ForkJoinWorkThread: 从WorkQueue中获取ForkJoinTask执行,用于执行任务的线程,用于区别使用非ForkJoinWorkThread线程提交的task;启动一个该Thread,会自动注册一个WorkQueue到Pool,这里规定,拥有Thread的WorkQueue只能出现在WorkQueue数组的奇数位
  • ForkJoinTask: 任务,它比传统的任务更加轻量,不再对是RUNNABLE的子类,提供fork/join方法用于分割任务以及聚合结果。
  • 实现了复杂的 worker-stealing算法,当任务处于等待中,thread通过一定策略,不让自己挂起,充分利用资源,当然,它比其他语言的协程要重一些

使用示例

public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 当需要计算的数字小于6时,直接计算结果
            if (to - from < 5) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            // 否则,把任务一分为二,递归计算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle+1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
    }
}

3.2 工作窃取算法(work-stealing)

工作窃取算法: 是指某个线程从其他队列里窃取任务来执行.

对于常见的一个大型任务,我们可以把这个大的任务切割成很多个小任务,然后这些小任务会放在不同的队列中,每一个队列都有一个相应的的工作执行线程来执行,当一个线程所需要执行的队列中,任务执行完之后,这个线程就会被闲置,为了提高线程的利用率,这些空闲的线程可以从其他的任务队列中窃取一些任务,来避免使自身资源浪费,它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行.这种在自己线程闲置,同时窃取其他任务队列中任务执行的算法,就是工作窃取算法

Fork/Join中的实现

  • ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)
  • 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行
  • 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式
  • 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成
  • 在既没有自己的任务,也没有可以窃取的任务时,进入休眠

3.3 Fork/Join 原理

调度过程 调度过程

index为偶数的queue是怎么产生的?

  • 创建pool : ForkJoinPool → defaultForkJoinWorkerThreadFactory → newThread
  • 提交任务/异步执行:submit → workQueue→signalWork? →ForkJoinWorkerThread.run

fork()

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    /**
     * Arranges to asynchronously execute this task in the pool the
     * current task is running in, if applicable, or using the {@link
     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized.  Subsequent modifications to the state of this
     * task or any data it operates on are not necessarily
     * consistently observable by any thread other than the one
     * executing it unless preceded by a call to {@link #join} or
     * related methods, or a call to {@link #isDone} returning {@code
     * true}.
     * 把任务推入当前工作线程的工作队列里
     *
     * @return {@code this}, to simplify usage
     */
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
}

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
    /**
     * Tries to add the given task to a submission queue at
     * submitter's current queue. Only the (vastly) most common path
     * is directly handled in this method, while screening for need
     * for externalSubmit.
     *
     * @param task the task. Caller must ensure non-null.
     */
    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                U.putIntVolatile(q, QLOCK, 0);
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        externalSubmit(task);
    }
}

join()

  • 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
  • 查看任务的完成状态,如果已经完成,直接返回结果。
  • 如果任务尚未完成,但处于自己的工作队列内,则完成它。
  • 如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。
  • 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
  • 递归地执行第5步
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

    /**
     * Implementation for join, get, quietlyJoin. Directly handles
     * only cases of already-completed, external wait, and
     * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
     *
     * @return status upon completion
     */
    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

    /**
     * Blocks a non-worker-thread until completion.
     * @return status upon completion
     */
    private int externalAwaitDone() {
        int s = ((this instanceof CountedCompleter) ? // try helping
                 ForkJoinPool.common.externalHelpComplete(
                     (CountedCompleter<?>)this, 0) :
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            do {
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0) {
                            try {
                                wait(0L);
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        else
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);
            if (interrupted)
                Thread.currentThread().interrupt();
        }
        return s;
    }
}

引用

https://www.jianshu.com/p/f777abb7b251

results matching ""

    No results matching ""