线程池和AQS知识整理

线程池和AQS代码解析

使用线程池是为了将线程得以复用,线程创建后在线程池中被循环利用,防止了线程得多次创建和销毁.下面就通过代码来看线程池的工作原理.

线程池配置比较繁琐,所以java提供了一个线程池的工具类Executors 来使用静态的方法来创建拥有特定属性的线程池.下面先总体的看一下都有哪些创建线程池的方法,和各自的实现.

  1. newFixedThreadPool 创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
  2. newCachedThreadPool 创建一个可缓存的线程池,此时线程池对线程得多少没有限制,如果多了就定时回收,如果少了,就会只能添加
  3. newScheduledThreadPool 创建一个无限大小的线程池,此线程池可执行定时的任务
  4. newSingleThreadExecutor 线程池中有且只有一个线程执行任务,如果因异常结束,则会生成一个线程来替换原来的线程.此种连接池会按照任务的提交顺序依次执行.因为池中只有一个线程在工作.

Excutors中有上面四种方式来创建线程池,那么这几种线程池具体是如何工作的呢?通过源码来一探究竟.

ThreadPoolExecutor

除了newScheduledThreadPool连接池没有使用ThreadPoolExcutor来实现,其他三种都使用到了ThreadPoolExcutor来进行连接池的实现和定制.下图是ThreadPoolExcutor的结构关系.它是ExecutorService的直接实现类.

5cc08171ca014

ThreadPoolExcutor提供了三种构造函数,不同的构造函数含有不同的参数.下面先了解一下各参数的含义

参数名称 参数类型 含义
corePoolSize Int 池中的线程数量,即使他们是空闲的,除非设置allowCoreThreadTimeOut参数
maximumPoolSize int 池中允许的最大的线程数量
keepAliveTime long 如果池中的线程超过核心数,如果超过这个时间线程仍是空闲的则结束这个线程
unit TimeUnit keepAilveTime的时间的单位
workQueue BlockingQueue 一个阻塞队列,用来存储未进行的任务.未执行的任务会一直在队列里,知道使用Excute方法执行.
threadFactory ThreadFactory 线程工厂,用于生产线程
handle RejectedExecutionHandler 执行阻塞的处理策略,造成阻塞的原因可能是达到了线程边界或者队列容量

其中线程池使用ctl变量控制状态信息,ctl是一个AtomInteger,其中高28位表示线程得数量,也就是workerCount,低位的4位表示线程池的状态.

名称 承担的任务
RUNNING 接受新的任务,并且处理队列中的任务
SHUTDOWN 不接受新的任务,但是会处理队列中的任务
STOP 不接受新任务,不处理排队的人物,并且终止正在进行的任务
TIDYING 所有任务已经终止,worker为空,通过terminate方法来转换为TIDYING
TERMINATED 当terminate方法执行过后的状态

Xnip2019-04-23_23-21-36

方法源码解析

提交的方法submit实在ExcutorService的实现类AbstarctExecutorService中所定义的,可以看出submit是excute方法的外壳,其实核心还是执行excute方法,但是会返回Future结果.

1
2
3
4
5
6
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;//返回Future结果
}

因为执行的核心方法是excute,所以进到ThreadPoolExecutor类中看excute方法是如何执行的.其实最通俗的解释excute的过程就是,将新的worker加入到worker的队列中,并执行当前的command线程.执行过程可以参考官方注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/**
* 执行总共分为三步
* 1. 如果当前线程池中的工作线程数小于核心线程数,则添加一个worker来执行当前这个任务
* 2. 为了防止进入方法后线程池关闭或者线程挂掉需要重新获取ctl,并且进行判断,是否进行
* 回退或者是空池后添加一个空线程.
* 3.如果不能将任务放到队列中,则执行rollback,回退任务
*/
int c = ctl.get();//获得ctl变量,从而获得workercount和状态信息
if (workerCountOf(c) < corePoolSize) { //如果线程数小于核心线程数
if (addWorker(command, true)) //添加一个新的worker来执行任务
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
//如果当前池的状态为运行状态并且接受了这个任务,也就是说任务成功进入了执行队列
int recheck = ctl.get();
//重新获取ctl,因为有可能在进入这个if语句的时候一个线程挂了,或者整个池被关闭了
if (! isRunning(recheck) && remove(command)) //如果线程池的状态为非运行状态
// 从队列中移除任务,并且reject任务 相当于第一步操作的rollback
reject(command);
else if (workerCountOf(recheck) == 0) //如果池是空的
addWorker(null, false);//新建一个线程
}
else if (!addWorker(command, false))
//如果放到队列中失败,则将任务rollback
reject(command);
}

线程池中拥有的状态有以下几种:

1
2
3
4
5
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

在excute方法中需要用到addWorker在当前状态和给定的边界条件下(核心线程数core或者最大线程数maximum)能否添加一个worker线程执行任务.如果当前线程池为空或者有资格关闭线程池.这是调用该方法会返回false.如果线程池工厂the thread factory不能再产出更多的线程也会返回false.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
private boolean addWorker(Runnable firstTask, boolean core) {
/**
* firstTask 理解为初始化时给的第一个任务,可以绕过排队的过程,直接使用新的线程执行
* core 如果是true则使用corePoolSize作为边界,否则使用maximumPoolSize作为边界
*/
retry://flag标记
for (;;) {
int c = ctl.get(); //获取ctl变量
int rs = runStateOf(c);// 获取状态参数

// Check if queue empty only if necessary. 检查队列是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//如果大于最大的CAPACITY或者wc大于核心线程数
return false;
if (compareAndIncrementWorkerCount(c))
//使用CAS添加worker后,操作成功后跳出循环
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//以后单说这个worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 使用ReentrantLock上锁保持原子性
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)// 当前worker的数量大于largestPoolSize时
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//线程开始执行
workerStarted = true;
}
}
} finally {
if (! workerStarted) //如果到这里线程还是没有开始,rollback添加worker的过程
addWorkerFailed(w);
}
return workerStarted;
}

其中涉及到了woker线程创建的rollback过程,调用了addWorkerFailed方法:

1
2
3
4
5
6
7
8
9
10
11
12
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);//如果存在在队列中移除w
decrementWorkerCount();//减少workerCount变量的值
tryTerminate();//重新检查终止过程,以防止因为这个worker的存在导致终止失败
} finally {
mainLock.unlock();
}
}

在当运行的线程数到达核心线程数后,就会将任务插入到workQueue里面,让池中的工作线程完成任务,上面的源码中使用addWoker方法来执行当前指定的任务,而未涉及到加入到workQueue中的任务是如何执行的,下面通过查看Woker中的run方法来查看池中的线程如何获取任务来执行的过程.

因为Worker实现AQS「AbstractQueuedSynchronizer」,并继承来Runnable,所以Worker其实本质上还是一个线程.在执行start后,同样会执行run方法.

Worker中的全局变量:

1
2
3
4
5
6
/** Thread this worker is running in.  Null if factory fails. 执行当前工作线程*/
final Thread thread;
/** 初始化的任务,可能是空*/
Runnable firstTask;
/** 每个线程的任务计数器(统计完成的任务数) */
volatile long completedTasks;

Worker的构造器,默认是需要传入Runnable的任务的.

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//保存从ThreadFactory中获取到的Thread
}

worker中的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//查看是否是独占的
protected boolean isHeldExclusively() {
return getState() != 0; //AQS中的state保存这个状态
}
//尝试获得锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) { //通过改变state来实现 如果state>0说明获取到了锁
setExclusiveOwnerThread(Thread.currentThread()); //设置独占访问的线程
return true;
}
return false;
}
//尝试释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);//清空独占访问的线程
setState(0);//状态置为「释放锁」
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}

在addWorke中调用run方法执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/** Delegates main run loop to outer runWorker  */
public void run() {
runWorker(this); //委托runWorker执行
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//获得当前线程
Runnable task = w.firstTask;//获得任务
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { //如果存在任务
w.lock(); //将当前工作线程上锁
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//如果线程池正在被停止,需要确保线程已经被中断
//如果线程池没有被停止,需要确保线程不能被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null; //清空任务
w.completedTasks++; //当前线程处理任务数加一
w.unlock();//解锁
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

AQS「AbstractQueuedSynchronizer」 队列同步器

AQS的使用方式是继承,之类通过继承AQS,实现抽象方法来实现管理同步器. 其中上文中的ThreadPoolExcutor中的Worker就是继承AQS来实现的.

AQS使用一个全局变量state来表示当前的锁的状态,如果state>0,说明已经获取到了锁,如果state=0,说明释放了锁.

AQS依靠CLH队列完成对状态的管理.当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

其中CLH的一个节点包括了:当前的状态,前驱,后继,当前节点保存的线程.

具体的AQS过程可以参考QAS原理

补充的countDownLatch和cyclicBarrier的区别

区别

-------------本文结束感谢您的阅读-------------

本文标题:线程池和AQS知识整理

文章作者:NanYin

发布时间:2019年04月23日 - 12:04

最后更新:2019年08月12日 - 13:08

原始链接:https://nanyiniu.github.io/2019/04/23/2019-04-23-%E7%BA%BF%E7%A8%8B%E6%B1%A0%E4%BB%A3%E7%A0%81%E8%A7%A3%E6%9E%90/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。