线程池

1. JDK对线程池的支持

Executor框架结构图

ThreadPoolExecutor表示线程池 Executors扮演者线程池工厂角色,通过Executors可以获取一个拥有特定功能的线程池

2. Executor

2.1 Executor源码

public class Executors {
    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     * (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newScheduledThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     * @return the newly created scheduled executor
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

2.2 线程池说明

  1. newFixedThreadPool()方法: 返回一个固定线程数量的线程池.该线程池中的线程数量始终不变.当有一个新的任务提交时, 线程池中如有空闲线程,则立即执行.若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务.
  2. newSingleThreadExecutor()方法: 返回一个只有一个线程的线程池.若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务.
  3. newCachedThreadPool()方法: 返回一个可根据实际情况调整线程数量的线程池.线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程.若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务.所有线程在当前任务执行完毕后,将返回线程池进行复用.
  4. newSingleThreadScheduledExecutor()方法: 返回一个ScheduledExecutorService对象,线程池大小为1. ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延迟之后执行,或者周期性执行某个任务.
  5. newScheduledThreadPool()方法: 返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量.

3.核心线程池的内部实现

3.1 ThreadPoolExecutor

3.1.1 JDK源码


public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

3.1.2 参数含义

  • corePoolSize: 指定了线程池中的线程数量.当提交一个任务到线程池时,线程会创建一个线程来执行任务,即使其他的线程空闲也会创建线程,等到需要执行的任务数大于线程池基本大小corePoolSize时就不再创建.
  • maximumPoolSize: 指定了线程池中的最大线程数量. 如果阻塞队列满了,并且已经创建的线程数小于最大线程数,则线程池会再创建新的线程执行.因为线程池执行任务时是线程池基本大小满了,后续任务进入阻塞队列,阻塞队列满了,再创建线程.
  • keepAliveTime: 当线程池线程数量超过 corePoolSize 时, 多余的空闲线程的存活时间.即, 超过corePoolSize的空闲线程,在多长时间内, 会被销毁
  • unit: keepAliveTime的单位.TimeUnit.DAYS;//天 TimeUnit.SECONDS;//秒 TimeUnit.MILLISECONDS;//毫秒 TimeUnit.NANOSECONDS;//纳秒
  • workQueue: 任务队列, 被提交但尚未被执行的任务.用于保存等待执行的任务的阻塞队列.一个阻塞队列,用来存储等待执行的任务: 数组,链表,不存元素的阻塞队列.
  • threadFactory: 线程工厂,用户创建线程, 一般用默认的即可.用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字
  • handler: 拒绝策略. 当任务太多来不及处理, 如何拒绝任务.

WorkQueue: 是一个BlockingQueue接口的对象, 仅用于存放Runnable对象 常使用的几种BlockingQueue

  1. 直接提交的队列: 由SynchronousQueue对象提供 SynchronousQueue没有容量.每一个插入操作都要等待一个相应的删除操作,反之, 每一个删除操作都要等待对应的插入操作.如果使用SynchronousQueue,提交的任务不会被真实的保存,二总是将新任务提交给线程执行,如果没有空闲线程,则尝试创建新的线程,如果线程数量已经达到最大值,则执行拒绝策略.因此, 使用SynchronousQueue,通常需要设置很大的maximumPoolSize值,否则和容易执行拒绝策略.
  2. 有界的任务队列: 由ArrayBlockingQueue提供 当使用有界的任务队列时, 若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入等待队列. 若等待队列已满, 无法加入, 则在总线程数不大于maximumPoolSize的前提下, 创建新的进程执行任务. 若大于maximumPoolSize,则执行拒绝策略. 可见,有界队列仅当在任务队列装满时, 才可能将线程数提升到corePoolSize以上, 换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize.
  3. 无界的任务队列: 由LinkedBlockingQueue提供 与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况.当有新的新的任务到来,系统的线程数小于corePoolSize,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize,就不会继续增加.若后续仍有新的任务加入,而又没有空闲的线程资源,则任务直接进入队列等待.若任务创建和处理的速度差异很大,无界的任务队列会保持快速增长,直到耗尽系统内存.
  4. 任务优先队列: 由PriorityBlockingQueue提供 根据任务自身的优先级顺序先后执行,在确保系统性能的同时, 也能有很好的质量保证(总是确保搞优先级的任务先执行).

Executor解析

注意:使用自定义线程池时,要根据应用的具体情况, 选择合适的并发队列作为任务的缓冲.当线程资源紧张时,不同的并发队列对系统行为和性能的影响均不同

ThreadPoolExecutor线程池的核心调度代码

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1.当线程总数小于corePoolSize时,会将任务通过addWorker()方法直接调度执行
     * 2.条件1不满足时, 通过workQueue.offer()进入等待队列,如果进入等待队列失败
     * (比如有界队列到达了上线,或者使用了SynchronousQueue),则将任务直接提交给线程池
     * 3.如果当前线程数已经达到maximumPoolSize,则提交失败,就执行拒绝策略
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

3.1.3 线程池拒绝策略

  1. AbortPolicy:(默认策略)直接抛出异常,阻止系统正常工作.
  2. CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
  3. DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序.(如果再次失败,则重复此过程)
  4. DiscardPolicy:不能执行的任务将被删除.

3.1.4 合理的选择:优化线程池线程数量

为保持处理器达到期望的期望的使用率,最优的池的大小等于

 Nthreads = Ncpu * Ucpu * (1+ W/C )

Ncpu = Cpu的数量 Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1 W/C = 等待时间与计算时间的比例

4. JDK 提供的并发容器

  1. ConcurrentHashMap
  2. ConcurrentLinkedQueue
  3. CopyOnWriteArrayList
  4. BlockingQueue
  5. SkipList(跳表)

results matching ""

    No results matching ""