线程池
1. JDK对线程池的支持

ThreadPoolExecutor表示线程池
Executors扮演者线程池工厂角色,通过Executors可以获取一个拥有特定功能的线程池
2. Executor
2.1 Executor源码
public class Executors {
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newScheduledThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
* @return the newly created scheduled executor
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
2.2 线程池说明
newFixedThreadPool()方法: 返回一个固定线程数量的线程池.该线程池中的线程数量始终不变.当有一个新的任务提交时, 线程池中如有空闲线程,则立即执行.若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务.newSingleThreadExecutor()方法: 返回一个只有一个线程的线程池.若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务.newCachedThreadPool()方法: 返回一个可根据实际情况调整线程数量的线程池.线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程.若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务.所有线程在当前任务执行完毕后,将返回线程池进行复用.newSingleThreadScheduledExecutor()方法: 返回一个ScheduledExecutorService对象,线程池大小为1.ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延迟之后执行,或者周期性执行某个任务.newScheduledThreadPool()方法: 返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量.
3.核心线程池的内部实现
3.1 ThreadPoolExecutor
3.1.1 JDK源码
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
3.1.2 参数含义
corePoolSize: 指定了线程池中的线程数量.当提交一个任务到线程池时,线程会创建一个线程来执行任务,即使其他的线程空闲也会创建线程,等到需要执行的任务数大于线程池基本大小corePoolSize时就不再创建.maximumPoolSize: 指定了线程池中的最大线程数量. 如果阻塞队列满了,并且已经创建的线程数小于最大线程数,则线程池会再创建新的线程执行.因为线程池执行任务时是线程池基本大小满了,后续任务进入阻塞队列,阻塞队列满了,再创建线程.keepAliveTime: 当线程池线程数量超过 corePoolSize 时, 多余的空闲线程的存活时间.即, 超过corePoolSize的空闲线程,在多长时间内, 会被销毁unit: keepAliveTime的单位.TimeUnit.DAYS;//天 TimeUnit.SECONDS;//秒 TimeUnit.MILLISECONDS;//毫秒 TimeUnit.NANOSECONDS;//纳秒workQueue: 任务队列, 被提交但尚未被执行的任务.用于保存等待执行的任务的阻塞队列.一个阻塞队列,用来存储等待执行的任务: 数组,链表,不存元素的阻塞队列.threadFactory: 线程工厂,用户创建线程, 一般用默认的即可.用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字handler: 拒绝策略. 当任务太多来不及处理, 如何拒绝任务.
WorkQueue: 是一个BlockingQueue接口的对象, 仅用于存放Runnable对象
常使用的几种BlockingQueue
- 直接提交的队列: 由
SynchronousQueue对象提供SynchronousQueue没有容量.每一个插入操作都要等待一个相应的删除操作,反之, 每一个删除操作都要等待对应的插入操作.如果使用SynchronousQueue,提交的任务不会被真实的保存,二总是将新任务提交给线程执行,如果没有空闲线程,则尝试创建新的线程,如果线程数量已经达到最大值,则执行拒绝策略.因此, 使用SynchronousQueue,通常需要设置很大的maximumPoolSize值,否则和容易执行拒绝策略. - 有界的任务队列: 由
ArrayBlockingQueue提供 当使用有界的任务队列时, 若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入等待队列. 若等待队列已满, 无法加入, 则在总线程数不大于maximumPoolSize的前提下, 创建新的进程执行任务. 若大于maximumPoolSize,则执行拒绝策略. 可见,有界队列仅当在任务队列装满时, 才可能将线程数提升到corePoolSize以上, 换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize. - 无界的任务队列: 由
LinkedBlockingQueue提供 与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况.当有新的新的任务到来,系统的线程数小于corePoolSize,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize,就不会继续增加.若后续仍有新的任务加入,而又没有空闲的线程资源,则任务直接进入队列等待.若任务创建和处理的速度差异很大,无界的任务队列会保持快速增长,直到耗尽系统内存. - 任务优先队列: 由
PriorityBlockingQueue提供 根据任务自身的优先级顺序先后执行,在确保系统性能的同时, 也能有很好的质量保证(总是确保搞优先级的任务先执行).

注意:使用自定义线程池时,要根据应用的具体情况, 选择合适的并发队列作为任务的缓冲.当线程资源紧张时,不同的并发队列对系统行为和性能的影响均不同
ThreadPoolExecutor线程池的核心调度代码
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1.当线程总数小于corePoolSize时,会将任务通过addWorker()方法直接调度执行
* 2.条件1不满足时, 通过workQueue.offer()进入等待队列,如果进入等待队列失败
* (比如有界队列到达了上线,或者使用了SynchronousQueue),则将任务直接提交给线程池
* 3.如果当前线程数已经达到maximumPoolSize,则提交失败,就执行拒绝策略
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
3.1.3 线程池拒绝策略
- AbortPolicy:(默认策略)直接抛出异常,阻止系统正常工作.
- CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
- DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序.(如果再次失败,则重复此过程)
- DiscardPolicy:不能执行的任务将被删除.
3.1.4 合理的选择:优化线程池线程数量
为保持处理器达到期望的期望的使用率,最优的池的大小等于
Nthreads = Ncpu * Ucpu * (1+ W/C )
Ncpu = Cpu的数量 Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1 W/C = 等待时间与计算时间的比例
4. JDK 提供的并发容器
- ConcurrentHashMap
- ConcurrentLinkedQueue
- CopyOnWriteArrayList
- BlockingQueue
- SkipList(跳表)