线程池和AQS代码解析
使用线程池是为了将线程得以复用,线程创建后在线程池中被循环利用,防止了线程得多次创建和销毁.下面就通过代码来看线程池的工作原理.
线程池配置比较繁琐,所以java提供了一个线程池的工具类Executors
来使用静态的方法来创建拥有特定属性的线程池.下面先总体的看一下都有哪些创建线程池的方法,和各自的实现.
newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。newCachedThreadPool
创建一个可缓存的线程池,此时线程池对线程得多少没有限制,如果多了就定时回收,如果少了,就会只能添加newScheduledThreadPool
创建一个无限大小的线程池,此线程池可执行定时的任务newSingleThreadExecutor
线程池中有且只有一个线程执行任务,如果因异常结束,则会生成一个线程来替换原来的线程.此种连接池会按照任务的提交顺序依次执行.因为池中只有一个线程在工作.
在Excutors
中有上面四种方式来创建线程池,那么这几种线程池具体是如何工作的呢?通过源码来一探究竟.
ThreadPoolExecutor
除了newScheduledThreadPool
连接池没有使用ThreadPoolExcutor
来实现,其他三种都使用到了ThreadPoolExcutor
来进行连接池的实现和定制.下图是ThreadPoolExcutor
的结构关系.它是ExecutorService的直接实现类.
ThreadPoolExcutor提供了三种构造函数,不同的构造函数含有不同的参数.下面先了解一下各参数的含义
参数名称 | 参数类型 | 含义 |
---|---|---|
corePoolSize | Int | 池中的线程数量,即使他们是空闲的,除非设置allowCoreThreadTimeOut参数 |
maximumPoolSize | int | 池中允许的最大的线程数量 |
keepAliveTime | long | 如果池中的线程超过核心数,如果超过这个时间线程仍是空闲的则结束这个线程 |
unit | TimeUnit | keepAilveTime的时间的单位 |
workQueue | BlockingQueue |
一个阻塞队列,用来存储未进行的任务.未执行的任务会一直在队列里,知道使用Excute方法执行. |
threadFactory | ThreadFactory | 线程工厂,用于生产线程 |
handle | RejectedExecutionHandler | 执行阻塞的处理策略,造成阻塞的原因可能是达到了线程边界或者队列容量 |
其中线程池使用ctl变量控制状态信息,ctl是一个AtomInteger,其中高28位表示线程得数量,也就是workerCount,低位的4位表示线程池的状态.
名称 | 承担的任务 |
---|---|
RUNNING | 接受新的任务,并且处理队列中的任务 |
SHUTDOWN | 不接受新的任务,但是会处理队列中的任务 |
STOP | 不接受新任务,不处理排队的人物,并且终止正在进行的任务 |
TIDYING | 所有任务已经终止,worker为空,通过terminate方法来转换为TIDYING |
TERMINATED | 当terminate方法执行过后的状态 |
方法源码解析
提交的方法submit实在ExcutorService的实现类AbstarctExecutorService
中所定义的,可以看出submit是excute方法的外壳,其实核心还是执行excute方法,但是会返回Future
结果.
1 | public Future<?> submit(Runnable task) { |
因为执行的核心方法是excute,所以进到ThreadPoolExecutor类中看excute方法是如何执行的.其实最通俗的解释excute的过程就是,将新的worker加入到worker的队列中,并执行当前的command线程.执行过程可以参考官方注释:
1 | public void execute(Runnable command) { |
线程池中拥有的状态有以下几种:
1 | private static final int RUNNING = -1 << COUNT_BITS; |
在excute方法中需要用到addWorker在当前状态和给定的边界条件下(核心线程数core
或者最大线程数maximum
)能否添加一个worker线程执行任务.如果当前线程池为空或者有资格关闭线程池.这是调用该方法会返回false.如果线程池工厂the thread factory
不能再产出更多的线程也会返回false.
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
其中涉及到了woker线程创建的rollback过程,调用了addWorkerFailed
方法:
1 | private void addWorkerFailed(Worker w) { |
在当运行的线程数到达核心线程数后,就会将任务插入到workQueue里面,让池中的工作线程完成任务,上面的源码中使用addWoker方法来执行当前指定的任务,而未涉及到加入到workQueue中的任务是如何执行的,下面通过查看Woker中的run方法来查看池中的线程如何获取任务来执行的过程.
因为Worker实现AQS「AbstractQueuedSynchronizer」,并继承来Runnable,所以Worker其实本质上还是一个线程.在执行start后,同样会执行run方法.
Worker中的全局变量:
1 | /** Thread this worker is running in. Null if factory fails. 执行当前工作线程*/ |
Worker的构造器,默认是需要传入Runnable的任务的.
1 | Worker(Runnable firstTask) { |
worker中的方法
1 | //查看是否是独占的 |
在addWorke中调用run方法执行任务
1 | /** Delegates main run loop to outer runWorker */ |
AQS「AbstractQueuedSynchronizer」 队列同步器
AQS的使用方式是继承,之类通过继承AQS,实现抽象方法来实现管理同步器. 其中上文中的ThreadPoolExcutor中的Worker就是继承AQS来实现的.
AQS使用一个全局变量state来表示当前的锁的状态,如果state>0,说明已经获取到了锁,如果state=0,说明释放了锁.
AQS依靠CLH队列完成对状态的管理.当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
其中CLH的一个节点包括了:当前的状态,前驱,后继,当前节点保存的线程.
具体的AQS过程可以参考QAS原理