当前位置:网站首页>AQS原理和介绍
AQS原理和介绍
2022-08-01 20:22:00 【Q z1997】
起源
早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不
够优雅,于是在 JSR166(java 规范提案)中创建了 AQS,提供了这种通用的同步器机制。
目标
AQS 要实现的功能目标
- 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire
- 获取锁超时机制
- 通过打断取消机制
- 独占机制及共享机制
- 条件不满足时的等待机制
设计
AQS 的基本思想其实很简单
获取锁的逻辑
while(state 状态不允许获取) {
if(队列中还没有此线程) {
入队并阻塞
}
}
当前线程出队
释放锁的逻辑
if(state 状态允许了) {
恢复阻塞的线程(s) }
要点
- 原子维护 state 状态
- 阻塞及恢复线程
- 维护队列
- state 设计
- state 使用 volatile 配合 cas 保证其修改时的原子性
- state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想
- 阻塞恢复设计
- 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume 那么 suspend 将感知不到
- 解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没问题
- park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
- park 线程还可以通过 interrupt 打断
- 队列设计
- 使用了 FIFO 先入先出队列,并不支持优先级队列
- 设计时借鉴了 CLH 队列,它是一种单向无锁队列
队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 state 维护节点状态
入队伪代码,只需要考虑 tail 赋值的原子性
do {
// 原来的 tail
Node prev = tail;
// 用 cas 在原来 tail 的基础上改为 node
} while(tail.compareAndSet(prev, node))
出队伪代码
// prev 是上一个节点
while((Node prev=node.prev).state != 唤醒状态) {
}
// 设置头节点
head = node;
CLH 好处:
无锁,使用自旋
快速,无阻塞
AQS 在一些方面改进了 CLH
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
// 队列中还没有元素 tail 为 null
if (t == null) {
// 将 head 从 null -> dummy
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// 将 node 的 prev 设置为原来的 tail
node.prev = t;
// 将 tail 从原来的 tail 设置为 node
if (compareAndSetTail(t, node)) {
// 原来 tail 的 next 设置为 node
t.next = node;
return t;
}
}
}
}
主要用到 AQS 的并发工具类
自定义锁的实现和测试
package com.example.demo.hmjuc.day19;
import com.example.demo.hmjuc.Sleep;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Slf4j
public class Test1 {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
Sleep.sleep(1000);
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t2").start();
}
}
/** * 自定义实现的锁 */
class MyLock implements Lock {
/** * AQS锁 */
private static final MySync SYNC = new MySync();
/** * 加锁 * 成功就加锁 * 不成功 就进入队列 */
@Override
public void lock() {
SYNC.acquire(1);
}
/** * 加锁可以被打断的 * * @throws InterruptedException 打断异常 */
@Override
public void lockInterruptibly() throws InterruptedException {
SYNC.acquireInterruptibly(1);
}
/** * 尝试加锁 * 职场时一次 不会进入队列 * * @return true: 成功 */
@Override
public boolean tryLock() {
return SYNC.tryAcquire(1);
}
/** * 尝试加锁 带超时时间 * * @param time 时间 * @param unit 时间单位 * @return true: 成功 * @throws InterruptedException 打断异常 */
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return SYNC.tryAcquireNanos(1, unit.toNanos(time));
}
/** * 解锁 */
@Override
public void unlock() {
SYNC.release(1);
}
/** * 创建条件变量 * * @return 条件变量 */
@Override
public Condition newCondition() {
return SYNC.newCondition();
}
/** * 自定义的同步工具 */
static final class MySync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -3927501487421780249L;
/** * 加锁 * 0 表示无锁 * 1 加锁 * 使用CAS 解决 * * @param age 年龄 * @return 加锁是否成功 */
@Override
protected boolean tryAcquire(int age) {
// 加锁
if (compareAndSetState(0, 1)) {
// 设置当前线程持有锁
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/** * 解锁 * * @param age 年龄 * @return 解锁是否成功 */
@Override
protected boolean tryRelease(int age) {
// 当前线程已解锁
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
// 设置持有锁的线程为空
setExclusiveOwnerThread(null);
// 设置无锁状态 注意: private volatile int state; 这里最后调用这个方法因为 这个 state变量是 volatile 修饰的可以
// 保证代码不会被重排序
setState(0);
return true;
}
/** * 等待的条件 * * @return 条件 */
private Condition newCondition() {
return new ConditionObject();
}
/** * 判读当前线程是否持有锁 * * @return true: 持有, false: 没有持有 */
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
}
测试:
18:44:41.616 DEBUG : locking...
18:44:42.623 DEBUG : unlocking...
18:44:42.624 DEBUG : locking...
18:44:42.624 DEBUG : unlocking...
边栏推荐
- Debug一个ECC的ODP数据源
- Batch get protein .pdb files based on Uniprot ID/PDB ID
- 】 【 nn. The Parameter () to generate and why do you want to initialize
- WhatsApp group sending actual combat sharing - WhatsApp Business API account
- 第57章 业务逻辑之业务实体与数据库表的映射规则定义
- [Multi-task learning] Modeling Task Relationships in Multi-task Learning with Multi-gate Mixture-of-Experts KDD18
- 图文详述Eureka的缓存机制/三级缓存
- Multithreaded producers and consumers
- 第59章 ApplicationPart内置依赖注入中间件
- The graphic details Eureka's caching mechanism/level 3 cache
猜你喜欢
我的驾照考试笔记(4)
18、分布式配置中心nacos
58:第五章:开发admin管理服务:11:开发【管理员人脸登录,接口】;(未实测)(使用了阿里AI人脸识别)(演示了,使用RestTemplate实现接口调用接口;)
Buttons with good user experience should not have hover state on mobile phones
【节能学院】数据机房中智能小母线与列头柜方案的对比分析
Does LabVIEW really close the COM port using VISA Close?
Arthas 常用命令
Remove 360's detection and modification of the default browser
Interview Blitz 70: What are sticky packs and half packs?How to deal with it?
Creo5.0草绘如何绘制正六边形
随机推荐
有用的网站
【torch】张量乘法:matmul,einsum
我的驾照考试笔记(1)
Oracle排序某个字段, 如果这个varchar2类型的字段有数字也有文字 , 怎么按照数字大小排序?
【Dart】dart构造函数学习记录(含dart单例模式写法)
专利检索常用的网站有哪些?
我的驾照考试笔记(4)
作为程序员你应该会的软件
Use WeChat official account to send information to designated WeChat users
Addition, Subtraction, Multiplication of Large Integers, Multiplication and Division of Large Integers and Ordinary Integers
Failed to re-init queues : Illegal queue capacity setting (abs-capacity=0.6) > (abs-maximum-capacity
Interview Blitz 70: What are sticky packs and half packs?How to deal with it?
C语言实现-直接插入排序(带图详解)
Convolutional Neural Network (CNN) mnist Digit Recognition - Tensorflow
有序双向链表的实现。
useful website
【Dart】dart之mixin探究
PROE/Croe如何编辑已完成的草图,让其再次进入草绘状态
Remove 360's detection and modification of the default browser
漏刻有时文档系统之XE培训系统二次开发配置手册