并发理论
[toc]
并发理论
1. synchronized
1.1 作用
可修饰在方法, 类, 变量上; 解决多个线程访问资源的同步性问题, 保证被修饰的方法或者代码块在任意时刻只有一个线程执行.
加锁使用的monitorenter
获取锁, 通过monitorexit
释放锁, 早期jdk中, synchronized为重量级锁:
监视器锁(monitor)依赖与操作系统的MUTEX_LOCK
来实现,java线程映射到操作系统原生线程上, 如果挂起/唤醒一个线程,需要操作系统帮助. 而操作系统实现线程直接的切换需要从用户态切换到内核态, 这个状态切换需要事件较长, 成本较高.
jdk1.6后. 对synchronized进行了优化, 对锁的实现也引入了大量优化来减少锁的操作开销
1.2 jdk1.6对synchronized的优化
jdk1.6为了获取锁和释放锁带来的性能消耗引入了偏向锁和轻量级锁,以及锁的存储结构和升级过程
1.2.1 加锁对象
java对象都可以作为锁, 当线程访问同步代码块时, 必须获取锁, 退出或抛出异常时必须释放锁
- 普通同步方法,锁当前实例对象
- 静态同步方法,锁当前类的Class对象
- 同步块方法,锁synchronized()括号中的对象
1.2.2 实现方法
- 同步块方法: monitorenter插入同步代码块开始位置, monitorexit指令插入同步代码块结束/异常处
- 同步方法: ACC_SYNCHRONIZED 来修饰
同步块会在字节码中生成2个monitoreexit, 一个是退出时,释放锁; 一个是异常时,释放锁
本质都是对象监视器monitor的获取
1.2.3 锁优化
在HotSopt虚拟机中, 对象在内存中布局分为3块区域
长度 | 内容 | 说明 |
---|---|---|
32/64[1] | MarkWord | 存储对象运行数据如hashcode,GC分带年龄,锁状态标志位,线程持有锁, 偏向线程id等 |
32/64 | Class Metadata Address | 执行对象类元数据, 用来确定该对象为哪个类的实例 |
32/64 | Array Length | 可选, 如果当前对象为数组, 则保存数组长度; 如果不为数组, 则没有此头 |
优化后锁分类: 无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁
锁可以升级不能降级
无锁:
Markword内容:
25bit | 4bit | 1bit(是否为偏向锁) | 2bit(锁标志位) |
---|---|---|---|
对象hashcode | 对象GC分代年龄 | 0 | 01 |
偏向锁:
Markword内容:
23bit | 2bit | 4bit | 1bit(是否为偏向锁) | 2bit(锁标志位) |
---|---|---|---|---|
线程ID | epoch | 对象GC分代年龄 | 0 | 01 |
偏向锁针对一个线程而言, 线程获得锁后就不会再有解锁操作, 可以节省开销; 如果多个线程竞争锁, 则偏向锁失效, 升级为轻量级锁. | ||||
为何出现: 大部分情况下都是同一个线程进入同一代码块 | ||||
加锁:当只有一个线程访问同步块并获取锁,会在对象头和栈帧中的锁记录里存储偏向锁的线程ID, 以后进入或者退出同步块就不需要进行CAS加解锁了, 只需要判断但Markword中是否存储指向当线程的偏向锁(线程ID为当前线程), 如果测试成功,说明线程已获得锁; 如果测试失败, 则再测试下MarkWord中偏向锁标识是否设置为1(表示当前为偏向锁); 如果没有设置, 则使用CAS竞争锁; 如果设置了,尝试使用CAS将对象的对象头的偏向锁指向当前线程 | ||||
解锁: 当其他线程尝试竞争偏向锁时, 持有偏向锁的线程才会释放锁. 偏向锁的撤销需要等全局安全点(没有正在执行的字节码). 首先暂停持有偏向锁的线程, 检查持有偏向锁的线程是否存活; 如果持有线程不活动, 则将锁头设置为无锁状态; 如果线程活着, 则锁对象中的MarkWord和栈中的锁记录要么重新偏向其他线程, 要么恢复到无锁状态, 最后唤醒暂停的线程(释放偏向锁的线程). | ||||
偏向锁JDK1.6后默认启用,但是在程序启动几秒只会才激活, 可以使用-XX:BiasedLockingStartupDelay=0 来关闭偏向锁的启动延迟, 也可以使用-XX:-UseBiasedLocking=false 来关闭偏向锁, 那么程序直接进入轻量级锁 |
轻量级锁:
Markword内容:
30bit | 2bit |
---|---|
指向栈中锁记录指针 | 00 |
当2个线程来竞争锁时, 偏向锁就失效了. 此时锁就会膨胀, 升级为轻量级锁 | |
加锁: 线程在执行同步块之前, JVM会在当前线程的栈帧中创建用户存储记录的空间, 并将对象头中MarkWord复制到锁的记录中, 然后线程尝试使用CAS将记录头中的MarkWord替换为指向锁记录的指针; 如果成功, 当前线程获得锁; 如果失败,表示其他线程在竞争锁, 当前线程便尝试使用自旋来获取锁, 之后再来的线程, 发现是轻量级锁, 就开始自旋 | |
解锁: 解锁时, 使用原子的CAS操作将当前线程的锁记录替换回到对象头;如果成功,表示没有竞争发生;如果失败, 表示当前存在锁竞争,锁就会膨胀为重量级锁 |
线程A,B竞争对象C的锁;线程A,B会同时将C的MarkWord复制到自己的锁记录中, 再去竞争锁; 如果A成功,会将C的Markword中的线程id改为自己的锁记录指针. B仍通过CAS获取获取锁, 因为C的MarkWord内容已被A改了,所以获取失败. 为了提高获取效率,B开始CAS自旋尝试获取锁, 但是自旋有次数限制, 如果循环结束前CAS操作成功, 那么B获取到锁; 如果循环获取依然获取不到, 则获取锁失败. 对象C的MarkWord会被修改为重量级锁. B直接挂起,当其他线程来获取C的锁时. 看到C的MarkWord是重量级锁的指针, 说明竞争激烈, 直接挂起.
释放时, A会尝试CAS对象C的MarkWord, 改回自己栈中复制的那个MarkWord, 因为C的MarkWord已被设置为重量级锁, 所以CAS失败, A释放锁, 并唤起挂起的线程, 进行新一轮竞争.
重量级锁:
Markword内容:
30bit | 2bit |
---|---|
指向互斥量(重量级锁)指针 | 10 |
1.2.4 锁比较
锁 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
偏向锁 | 加锁解锁不需要额外消耗, 和执行非同步代码性能相差无几 | 如果存在锁竞争, 会有额外的锁撤销消耗 | 适用于只有一个线程访问的同步场景 |
轻量级锁 | 竞争的线程不会阻塞, 提高了程序响应速度 | 始终得不到锁的竞争的线程,使用自旋会消耗CPU | 追求响应速度, 同步块执行速度非常快 |
重量级锁 | 竞争线程不自旋, 不会消耗CPU | 线程阻塞, 响应时间缓慢 | 追求吞吐量, 同步块执行时间较长 |
1.3 synchronized, volatile, CAS比较
synchronized: 悲观锁, 抢占式, 会引起其他线程阻塞
volatile: 多线程共享变量可见性,以及禁止指令重排序优化
CAS: 冲突检测的乐观锁(非阻塞)
1.4 synchronized与Lock区别
synchronized: 内置关键字, 属于JVM层面;可以给类,方法,代码块加锁; 不需要手动获取,释放锁,使用简单,发生异常自动释放锁, 不会造成死锁;
Lock: java类;只能给代码块加锁; 手动加锁,释放锁, 使用不当会造成死锁; 可以直接有没有成功获取到锁.
1.5 synchronized与ReentrantLock
两者都是可重入锁: 直接可再次获取自己的内部锁(即线程已获得某个对象的锁,此时锁未释放,当其再次想要获取对象的锁时还可以获取到). 同一对象每次获取锁, 锁计数器加一,释放一个锁,锁计数器减一, 只有计数器为0时才能释放锁
synchronized:
- 依赖JVM
- 无需手动加解锁
- 可修饰类, 方法, 变量等
- 操作对象的MarkWord
ReentrantLock:
- JDK层面实现
- 使用灵活, 但必须有释放锁的配合动作, 需要手动加解锁
- 只能用于代码块
- 调用Unsafe的park方法加锁
- 高级功能: 无条件, 可轮询(tryLock()方法), 定时(tryLock()带参方法), 等待可中断(lockInterruptibly()), 公平锁, 选择性通知(锁可以绑定多个条件)
等待可中断: 通过lock.lockInterruptibly()
方法, 等待的线程可放弃等待, 改为处理其他事情
公平锁: 设置new ReentrantLock(true), 即为公平锁(先等待线程先获得锁).
选择性通知: 需要借助Condition
接口与newCondition()方法, 线程对象注册到指定的Condition中; Condition实例的sigalAll()方法只会唤醒注册在该Condition实例中所有等待的线程
2. Lock
2.1 悲观锁/乐观锁
悲观锁: 总是假定最坏情况, 每次都认为别人会修改, 所以每次拿数据都会上锁, 这样别人拿这个数据就会阻塞知道它拿到锁. synchronized即为悲观锁. 数据库中也用到了这种锁机制: 行锁,表锁, 读锁,写锁
乐观锁: 每次拿数据都认为别人不会修改, 所以不会上锁, 但是更新时会判断此期间别人有没有去更新这个数据(使用版本号机制). 适用与多读的应用场景, 提高吞吐量.
乐观锁实现方式:
- 适用版本标识来确定读取数据与提交数据是否一致, 提交后修改版本号. 不一致则采取丢弃和再次尝试策略
- CAS, 多个线程尝试同时更新一个变量,只有一个线程更新最新值, 其他线程都失败但是不挂起, 但告知竞争失败再次尝试.
2.2 CAS
CAS操作包括三个操作数- 内存地址V, 预期值A, 新值B, 如果内存地址值与A值一致, 那么更新为B. CAS通过无限循环获取数据,如果a线程值被b修改, 那么a自旋等下一次循环才有机会执行, 对于CPU指令为cmpxchg
带来的问题
- ABA问题:
线程1取出值A,另一个线程2同时也取出A.同时将值变为B,然后又变为了A. 这时线程1进行CAS操作取出仍时A, 操作成功, 尽管A成功, 但是会有一些问题. 提供了AtomicStampedReference解决这个问题(AtomicStampedReference添加了期望值, 期望时间戳;用时间戳来解决ABA问题) - 循环时间开销大
如果资源竞争验证, CAS字段概率会比较大, 浪费更多CPU资源, 效率低于synchronized - 只能保证一个共享对象的原子操作
对一个共享变量指向操作,可以用循环CAS方式保证原子性,但是多个共享变量,这时应该用锁.
3. volatitle
3.1 JMM(Java内存模型)
CPU Cache缓存是内存数据用于解决CPU处理速度与内存不匹配的问题.
线程可以把变量保存到本地内存(如CPU寄存器)中, 而不是存内存中读取,可能造成一个线程在主存中修改了一个变量的值,另外一个线程还在继续使用它寄存器中的值拷贝.造成数据不一致.
为了解决这个问题. 引入了volatile关键字, 指示JVM这个变量共享且不稳定,每次都从主内存中读取数据.
所以, volatile除了防止JVM指令重排, 还有一个作用是保证变量的可见性
实践中, volatile与CAS结合,保证了原子性, 常见如AtominInteger
3.2 volatile与atomic不同
volatile可以确保先行关系, 即确保操作顺序执行,并不能保证原子性
atomic可以让这种操作具有原则性,如getAndIncrement()会原子性的增量当前值加一
3.3 重排序
在执行程序时, 为了提高性能, 处理器和编译器会对指令进行重排序, 但是也必须满足一下条件
- 单线程下不能改变程序运行结果
- 存在数据依赖关系不允许重排序
4. ThreadLocal
主要解决每个线程绑定自己的值, 可用适用get(),set()方法来获取或者更改值
4.1 原理
public class Thread implements Runnable {
//......
//与此线程有关的ThreadLocal值。由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;
//与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//......
}
ThreadLocalMap为定制化HashMap, 默认是null, 当当前线程调用set(), get()才会创建他们
内存泄漏问题: ThreadLocalMap适用key为ThreadLock的弱引用, 而value为强引用, ThreaLocal在外部没有强引用的情况下, 会垃圾回收, 而key被清理调, 这时Entry的key就变为null了. 如果不做其他措施. value永远不会回收, 这时就需要在适用完ThreadLocal后, 手动remove()方法
5. AQS(AbstractQueuedSynchronizer)
用来构建锁和同步器的框架,能简单且高效构造大量的同步器如ReetrantLock, Semaphore等
5.1 原理
如果请求资源空闲,则将当前请求资源线程设置为有效工作线程,并将共享资源设置为锁定状态.
如果请求资源被占用,那么就需要一套线程阻塞等待以及被唤醒时,锁分派机制. AQS使用CLH队列实现,即暂时获取不到锁,则加入队列
CLH队列是虚拟双向队列(即不存在队列对象,仅存在节点关联关系,节点保存前后节点指针),AQS将每个请求共享资源的线程封装为CLH的一个节点来实现锁分配
5.2 资源共享方式
- Exclusive(独占): 只有一个线程能执行, 如
ReetrantLock
,又可分为公平锁,非公平锁- 公平锁: 按照线程在队列中的排列顺序,先到先得
- 非公平锁: 无视队列顺序直接抢锁
- Share(共享): 多个线程可同时执行, 如
CountDownLatch
,Semaphore
,CycleBarrier
,ReadWriteLock
等 - 组合: 多个线程对资源读, 只允许一个写, 如
ReetrantReadWriteLock
5.3 模板方法
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
以ReetrantLock为例:初始化state=0(未锁定), 线程A lock()时, tryAcquire()将status设置为status+1;其他线程tryAcquire()就会失败,直到线程A tryRelease()到state=0(即释放锁);其他线程才又机会获取锁. 在释放之前, 线程A可以重复获取此锁(state会累加), 即是可重入锁. 注意,必须获取多少次就要释放多少次,直到state=0.
以CountDownLatch为例: 任务以N个子线程执行, 那么state=N,每个子线程并行执行,当子线程执行完后,countDown()一次,state就会CAS减一, 等到所以线程都执行完(即state=0), 会unpark主动调用线程,主动线程就会await()函数返回,继续剩余动作
5.4 并发工具类
Semaphore(信号量): synchronized和ReetrantLock只允许一个线程访问资源, 而信号量支持多个信号量访问资源
CountDownLatch(倒计时器): 同步工具类, 协调多个线程直接同步, 通常控制线程等待, 让某个线程等待直到倒计时结束, 在开始执行
CyclicBarrier(循环栅栏): 同步工具类, 主要功能与CountDownLatch
类似, 主要功能是让一组线程到达某个屏障(同步点)时阻塞,直到最后一个线程到达,屏障才会开门,所有屏障拦截的线程才会继续执行.
CountDownLatch与CyclicBarrier区别:
CountDownLatch: 一般为一个线程等待若干子线程执行完成后才执行; 调用countDown()后,并不会阻塞,会继续往下执行. 方法较少,操作简单. 不能复用.
CyclicBarrier: 一般为一组线程互相等待到某个状态后, 一起执行; 调用await()后, 阻塞线程, 直到其他线程到达指定点后, 才会继续执行. 方法较多, 可通过getNumberWaiting(),isBroken()判断线程状态, 还可以通过构造方法传入barrierAction, 指定到达同步点后执行业务功能.可以复用
Semaphore与Exchanger区别:
Semaphore: 限制某个代码块并发数, 构造方法可传递N,代表资源最多N个线程可以访问. 如果超过N, 则等待某个线程执行完毕,下一个才能执行. 可以允许多个线程访问一个资源.
Exchanger: 线程协作同步类, 用于2个线程间交换数据. 提供一个同步点, 一个线程调用exchange(), 会同步等待另外一个线程也执行exchange(), 这时2个线程到了同步点,就可以交换数据了
5.5 详解
5.5.1 Node节点
static final class Node {
/** waitStatus值,表示线程已被取消(等待超时或者被中断)*/
static final int CANCELLED = 1;
/** waitStatus值,表示后继线程需要被唤醒(unpaking)*/
static final int SIGNAL = -1;
/**waitStatus值,表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中 */
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/** waitStatus值,表示下一次共享式同步状态会被无条件地传播下去 */
static final int PROPAGATE = -3;
/** 等待状态,初始为0 */
volatile int waitStatus;
/**当前结点的前驱结点 */
volatile Node prev;
/** 当前结点的后继结点 */
volatile Node next;
/** 与当前结点关联的排队中的线程 */
volatile Thread thread;
/** ...... */
}
负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。
5.5.2 独占模式
5.5.2.1 独占模式加锁
acquire(int): 独享模式下顶层入口, 如果获取到资源,则直接返回, 否则进入等待队列,直到获取到资源为止(忽略了线程中断影响)
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取资源, 成功则直接返回(体现了非公平锁, 每个线程都会尝试抢占,而CLH队列还有线程在等待)
acquireQueued( // 使线程在阻塞队列中尝试获取资源, 一直到获取到资源为止, 如果整个过程被中断过, 则返回true.
addWaiter(Node.EXCLUSIVE), arg) // 将线程加入到队列尾部, 并设置为独占模式
)
selfInterrupt(); // 线程在抢占资源过程中, 是不响应中断的, 等获取到资源后, 才响应中断
}
tryAcquire(int): 尝试获取独占资源, 如果成功,返回true, 否则返回false. 为何不为abstract? 独占模式只需要实现tryAcquire-tryRelease, 非独占需实现tryAcquireShared-tryReleaseShared, 不需要每个模式都实现一遍
addWaiter(Node): 线程加入到CLH队列,并返回该节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
// 尝试快速放入队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS尝试入队,多线程下, 可能设置失败
pred.next = node;
return node;
}
}
enq(node); // 快速放入尾部失败, 则通过enq入队。
return node;
}
enq(node): 不断循环CAS入队
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 找到队列尾部
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // 尾部为空,则说明是空队列, 将CAS设置后, 将设置队列投
tail = head;
} else { // 正常放入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued(node, int): 根据上面调用, 资源以入队. 进入等待状态, 直到其他线程彻底释放资源后, 唤醒自己. 拿到资源后,执行自己的任务
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 是否拿到锁
try {
boolean interrupted = false; // 等待过程中是否被中断
for (;;) { //自旋
final Node p = node.predecessor(); // 节点前驱
if (p == head && tryAcquire(arg)) { // 如果前驱是头, 即当前就是第二节点了, 就可以尝试获取锁了
setHead(node); // 拿到锁后, 将head执行当前节点; 所以head节点指向的是获取到锁的节点或者是null
p.next = null; // setHead中node.prev已设置为null. 这里设置head.next=null, 方便gc回收head节点; 执行完的节点已出队
failed = false; // 成功拿到节点了
return interrupted; // 返回未被中断过
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; //如果等待过程中被中断了, 哪怕只有一次,也算被中断了
}
} finally {
if (failed) // 如果等待过程中没有获取到锁(超时,或者可中断情况下被中断了), 取消队列中等待
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node, Node): 判断线程是否可以进入waiting状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 前一个节点状态
if (ws == Node.SIGNAL) // 前一个节点已设置为通知后一个节点模式(是否锁后通知后一个节点), 则本节点可以安心去waiting了
return true;
if (ws > 0) { // 如果前一个节点状态放弃了(大于0), 递归一直往前找到一个状态正常的节点, 并挂在他后面(跳过的线程不会被引用到而被GC了)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 如果前驱状态正常, 设置前驱状态为通知自己状态(有可能失败)
}
return false;
}
parkAndCheckInterrupt(): 线程等待
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 这里会一直让线程处于waiting状态, 有2个方式会继续往下一行执行: 1)被unpark(), 2)被interupt()
return Thread.interrupted(); // 执行到这里, 线程应该被唤醒了, 检查下线程是否被打断了. 注意: interrupted()调用后会清除interupt状态
}
@startuml
start
if (acquire) then(否)
:入队;
repeat :找安全点;
->park();
:等待;
->unpark() interrupt();
repeat while (队伍老二且尝试取得锁) is (Y) not (N)
:设置为队列头;
else(是)
endif
stop
@enduml
小结:
- 调用
tryAcquire()
尝试获取锁, 获取到则退出 - 没成功, 则加入到等待锁队列尾部, 并标记为独占模式
2.1 节点进入队伍尾部, 检查状态, 找到安全休息点
2.2 调用parking(), 使线程进入waiting状态,等待unparking或者interupt()唤醒自己
2.3 被唤醒后, 看是否有资格获取锁, 如果获取到锁, 则head设置为自己, 并返回线程是否中断状态; 如果没获取到锁, 则继续2.1 - acquireQueued时线程在队列中等待, 当有机会(轮到自己,或者unpark()了), 会去尝试获取资源, 获取到才返回, 如果被中断过, 则返回true,否则false
- 如果在抢占过程中, 是不响应中断的. 等获取到锁后, 在执行自我中断, 将中断补上.
5.5.2.2 独占模式解锁
release(int): 释放指定量资源, 如果彻底释放(state=0), 唤醒等待队列中其他线程来获取资源, 注意: 重入锁使, 只有state=0使, 才能使tryRelease=true.
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试解锁, 注意: 这里根据tryRelease返回值判断释放已释放锁. 如果已彻底释放锁(state=0),则返回true, 否则返回false
Node h = head; // 找到头节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒等待队列里的下一个线程
return true;
}
return false;
}
unparkSuccessor(Node): 唤醒队列中下一个线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; // 当前节点状态
if (ws < 0) // 当前节点状态正常
compareAndSetWaitStatus(node, ws, 0); // 将当前节点状态设置为0, 允许失败
Node s = node.next; // 找到下一个唤醒节点
if (s == null || s.waitStatus > 0) { //如果下一节点状态异常
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 从后往前递归查找到一个状态为正常的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 找到节点后唤醒
LockSupport.unpark(s.thread);
}
小结:
- 释放指定量的资源, 如果彻底释放了, 则唤醒其他线程获取资源
- 如果release异常, 没有unpark()其他节点, 则其他节点永远都不会唤醒
ReetrantLock实现了AQS. AQS默认通过队列实现了公平锁.
非公平锁:多个线程调用lock时,状态为0, 有一个线程获得了锁,并设置当前线程为独占线程, 其他线程就会调用acquire
来竞争锁(加入队列自旋或者挂起), 当线程释放锁时, 正好有其他线程获取锁,这时A就可以优先比队列中获取到锁. 而队列里面还是按照顺序获取锁
5.5.3 共享模式
5.5.3.1 共享模式加锁
acquireShared(int): 加锁, tryAcquireShared(arg)
返回值定义: 小于0获取失败, 等于0获取成功但没有剩余资源(当前线程获取到了锁), 大于0获取成功还有剩余资源, 其他线程还可以获取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) //尝试获取资源,成功则返回
doAcquireShared(arg); // 进入等待队列, 直到获取到资源
}
doAcquireShared(int): 放入队列
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 将当前线程设置为共享模式
boolean failed = true;
try {
boolean interrupted = false; // 判断等待线程是否被中断过
for (;;) { //自旋
final Node p = node.predecessor(); // 获取前一个节点
if (p == head) { // 如果前一个节点使head, 代表使第二个, 此时node被唤醒, 代表很有前一个用完资源唤醒自己
int r = tryAcquireShared(arg); // 尝试获取资源
if (r >= 0) { // 获取成功
setHeadAndPropagate(node, r); // 设置自己为head节点,如果有剩余资源, 还可以唤醒其他节点
p.next = null; // 执行完节点制空, 等待GC
if (interrupted) // 如果节点被打断, 补偿打断机制
selfInterrupt();
failed = false; // 设置为未失败
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && //判断状态,寻找安全点
parkAndCheckInterrupt()) // 进入waiting状态,等着被unpark()或interrupt()
interrupted = true;
}
} finally {
if (failed) // 如果等待过程中没有获取到锁(超时,或者可中断情况下被中断了), 取消队列中等待
cancelAcquire(node);
}
}
setHeadAndPropagate(node, int): 设置队列头
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 获取之前队列头
setHead(node); // 设置当前队列头
//如果还有剩余量,继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
小结:
- 调用
tryAcquireShared()
尝试获取锁, 获取到则退出 - 没成功, 则加入到等待锁队列尾部, 并标记为共享模式
2.1 节点进入队伍尾部, 检查状态, 找到安全休息点
2.2 调用parking(), 使线程进入waiting状态,等待unparking或者interupt()唤醒自己
2.3 被唤醒后, 看是否有资格获取锁, 如果获取到锁, 则head设置为自己, 如果有余量, 则唤醒下一个节点; 如果没获取到锁, 则继续2.1 - doAcquireShared时线程在队列中等待, 当有机会(轮到自己,或者unpark()了), 会去尝试获取资源, 获取到才返回, 如果被中断过, 则返回true,否则false
- 如果在抢占过程中, 是不响应中断的. 等获取到锁后, 在执行自我中断, 将中断补上.
5.5.3.2 共享模式解锁
releaseShared(int): 解锁,共享模式下资源释放就可以唤醒下一个等待节点
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared():
private void doReleaseShared() {
for (;;) {
Node h = head; // 当前头节点
if (h != null && h != tail) {
int ws = h.waitStatus; // 查看状态
if (ws == Node.SIGNAL) { // 如果使通知其他节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 状态设置为-
continue; // 重试
unparkSuccessor(h); // 唤醒成功
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果不是无条件传播
continue; // 重试
}
if (h == head) // head变化
break;
}
}
6. 线程池
6.1 好处
- 降低资源消耗. 通过重复利用一创建的线程降低线程创建, 销毁造成的消耗
- 提高响应速度. 任务到达, 不需要等待线程创建就能立即执行
- 提高线程可管理性. 线程为稀缺资源, 如果无限制创建, 不仅消耗系统资源,还会降低系统文档性. 适用线程池可进行同一分派, 调优, 监控.
6.2 submit()与execute()区别
execute()不需要返回值, 无法判断任务是否被线程池执行成功与否
submit()提交需要返回值的任务, 会返回一个Future对象, 通过Future对象判断是否成功执行, 通过get()获取返回值(阻塞线程),get(time,timeunit)则会阻塞一会线程后立即返回.这时有可能任务还未执行完
6.3 创建线程池
6.3.1 通过ThreadPoolExecutor
构造方法实现
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
corePoolSize: 核心线程数,定义最小的可同时运行线程数量
maximumPoolSize: 当队列中存放任务达到对了容量时, 可同时运行线程数量最大数
workQueue: 当线程来时会先判断运行线程是否达到核心线程数,如果达到,则新任务放入队列中
keepAliveTime: 当线程池线程数超过核心线程数, 且没有新任务时, 核心线程外的其他线程不会立即销毁而是等待,超过keepAliveTime只后才会回收销毁
unit: keepAliveTime参数的时间单位
threadFactory: 创建线程工厂
handler: 饱和策略
6.3.2 通过Executors工具类实现
FixedThreadPool: 固定数量的线程池,线程数量始终不变; 新任务提交, 有空线程立即执行; 若没有, 则会暂存到一个任务队列待有线程空闲,便去处理任务队列中的任务, 任务队列最大长度Integer.MAX_VALUE, 可能堆积大量请求, 导致OOM
SingleThreadExecutor: 只有一个线程的线程池, 多的线程放入任务队列按照先入先出顺序执行队列中任务, 任务队列最大长度Integer.MAX_VALUE, 可能堆积大量请求, 导致OOM
CachedThreadPool: 可根据实际情况调整的线程池,线程池线程数量不固定,如有空闲线程则复用. 若没有可复用线程,则创建线程. 待当前任务完成, 将返回线程池进行复用.允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量线程,从而导致 OOM.
ScheduledThreadPool: 创建定时任务线程池, 线程池线程数量不固定,如有空闲线程则复用. 若没有可复用线程,则创建线程. 待当前任务完成, 将返回线程池进行复用.允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量线程,从而导致 OOM.
6.3.3 饱和策略
ThreadPoolTaskExecutor定义了一些默认策略
ThreadPoolExecutor.AbortPolicy: 抛出RejectedExecutionException异常来拒绝新任务提交
ThreadPoolExecutor.CallerRunsPolicy: 调用线程自己执行任务,如果调用线程已关闭,则丢弃该任务.
ThreadPoolExecutor.DiscardPolicy: 不处理, 直接丢弃新任务
ThreadPoolExecutor.DiscardOldestPolicy: 丢弃最早未处理任务
6.3.4 原理分析
6.3.5 状态常量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 记录当前线程池状态以及运行线程数
private static final int COUNT_BITS = Integer.SIZE - 3; // 最大线程数 2^29^
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 最大容量
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 正在运行
private static final int SHUTDOWN = 0 << COUNT_BITS; // 拒绝接收新的任务
private static final int STOP = 1 << COUNT_BITS; // 拒绝接收新的任务, 并且不再处理队列中剩余任务, 中断正在执行任务
private static final int TIDYING = 2 << COUNT_BITS; // 表示所有线程已停止,准备执行terminated()方法
private static final int TERMINATED = 3 << COUNT_BITS; // 已执行完terminated()方法
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } // 当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } // 工作线程
private static int ctlOf(int rs, int wc) { return rs | wc; } // 区值
通过Integer高三位确定线程池状态, 剩余位确定执行任务数(位运算更高效), 而且暂时不可能出现线程数超过229的情况, 够用了
6.4 线程池优化
动态修改coreSize, maxCoreSize
setCorePoolSize: 修改coresize
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0) // 数据校验
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize; // 比较修改前后大小
this.corePoolSize = corePoolSize; // 设置最新值
if (workerCountOf(ctl.get()) > corePoolSize) // 如果当前工作线程数大于核心数
interruptIdleWorkers(); // 向idel线程发起中断
else if (delta > 0) { // 如果增大核心数
int k = Math.min(delta, workQueue.size()); // 查看队列数与新增核心数谁大(队列中是否有任务)
while (k-- > 0 && addWorker(null, true)) { // 开始创建工作线程
if (workQueue.isEmpty())
break;
}
}
}
setMaximumPoolSize: 修改maxCoreSize
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) // 校验数据
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize; // 设置值
if (workerCountOf(ctl.get()) > maximumPoolSize) // 如果工作线程大于最大核心线程
interruptIdleWorkers(); // 向idel线程发起中断
}
prestartCoreThread: 启动一个核心线程
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
prestartAllCoreThreads: 启动所有核心线程
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
allowCoreThreadTimeOut: 允许核心线程超时回收
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
动态调整队列长度: 拷贝LinkedBlockingQueue
源码, 并去除capacity的final属性,并支持动态修改(注意线程安全!)
动态修改代码:
ThreadPoolExecutor ex = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new ResizableCapacityLinkedBlockIngQueue(10), new ThreadFactory());
....
ex.setCorePoolSize(5);
ex.setMaximumPoolSize(20);
ex.allowCoreThreadTimeOut(true);
ex.prestartAllCoreThreads();
((ResizableCapacityLinkedBlockIngQueue)ex.getQueue()).setCapacity(100);
7. Atomic 原子类
7.1 原子类类型
类型 | 类 | 说明 |
---|---|---|
基本类型 | AtomicInteger | 整形原子类 |
^ | AtomicLong | 长整型原子类 |
^ | AtomicBoolean | 布尔原子类 |
数组类型 | AtomicIntegerArray | 整形数组原子类 |
^ | AtomicLongArray | 长整形数组原子类 |
^ | AtomicReferenceArray | 引用类型数组原子类 |
引用类型 | AtomicReference | 引用类型原子类 |
^ | AtomicStampedReference | 带版本号引用类型数组原子类 |
^ | AtomicMarkableReference | 带有标记位的引用类型 |
7.2 AtomicInteger常用方法
int get() //获取当前的值
int getAndSet(int newValue)//获取当前的值,并设置新的值
int getAndIncrement()//获取当前的值,并自增
int getAndDecrement() //获取当前的值,并自减
int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
7.3 AtomicInteger原理
主要使用unsafe + valatile + native方法保证原子操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset; // value对应的内存地址
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value")); // native方法, 返回value对象的内存地址
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value; // 由于value时volatile, 内存可见,就可以保证任何线程拿到的都是最新值
7.4 对象属性修改类型原子类
- AtomicIntegerFieldUpdater: 整形字段更新类
- AtomicLongFieldUpdater: 长整形字段更新类
- AtomicStampedReference: 带版本号引用类型更新
AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
User user = new User("Java", 22);
System.out.println(a.getAndIncrement(user));// 22
System.out.println(a.get(user));// 23
7.5 LongAdder与AtomicLong
高并发下, AtomicLong进行CAS操作, 只有一个线程执行成功, 其他线程都会失败, 不断自旋(重试), 自旋会成为瓶颈
LongAdder把要操作的资源分散到数组cell中, 每个线程对自己的Cell进行CAS操作, 降低失败次数
字宽由虚拟机位长限制, 32位JVM则位32位. ↩︎