IDC

教大家如何手写一个AQS?

作者:admin 2021-04-24 我要评论

手写一个AQS AQS即AbstractQueuedSynchronizer,是用来实现锁和线程同步的一个工具类。大部分操作基于CAS和FIFO队列来实现。 如果让我们自己基于API来实现一个锁...

在说正事之前,我要推荐一个福利:你还在原价购买阿里云、腾讯云、华为云服务器吗?那太亏啦!来这里,新购、升级、续费都打折,能够为您省60%的钱呢!2核4G企业级云服务器低至69元/年,点击进去看看吧>>>)

手写一个AQS

AQS即AbstractQueuedSynchronizer,是用来实现锁和线程同步的一个工具类。大部分操作基于CAS和FIFO队列来实现。

如果让我们自己基于API来实现一个锁,实现可以分为几个大部分

  • 加锁
  • 解锁
  • 入队
  • 出队
  • 阻塞
  • 唤醒

我们来想一下这几个部分的实现

加锁

1.用一个变量state作为锁的标志位,默认是0,表示此时所有线程都可以加锁,加锁的时候通过cas将state从0变为1,cas执行成功表示加锁成功

2.当有线程占有了锁,这时候有其他线程来加锁,判断当前来抢锁的线程是不是占用锁的线程?是:重入锁,state+1,当释放的时候state-1,用state表示加锁的次数 否:加锁失败,将线程放入等待队列,并且阻塞

3.有没有什么其他可以优化的地方?当放入等待队列的时候,看看有没有其他线程?有,锁被占用了,并且轮不到当前线程来抢,直接阻塞就行了 在放入队列时候,通过cas再尝试获取一波锁,如果获取成功,就不用阻塞了,提高了效率

解锁

1.通过cas对state-1,如果是重入锁,释放一次减一次,当state=0时表示锁被释放。2.唤醒等待队列中的线程

入队

入队这个过程和我们平常使用的队列不同。我们平常使用的队列每次生成一个节点放入即可。

而AQS队列,当队列为空时,第一次生成两个节点,第一个节点代表当前占有锁的线程,第二个节点为抢锁失败的节点。不为空的时候,每次生成一个节点放入队尾。

「当把线程放入队列中时,后续应该做哪些操作呢?」

如果让你写是不是直接放入队列中就完事了?但Doug Lea是这样做的

  1. 如果当前线程是队列中的第二个节点则再尝试抢一下锁(不是第二个节点就不用抢来,轮不到),这样避免了频繁的阻塞和唤醒线程,提高了效率
  2. 上闹钟,让上一个线程来唤醒自己(后续会说到,即更改上一个节点的waitStatus)
  3. 阻塞

出队

当A线程释放锁,唤醒队列中的B线程,A线程会从队列中删除

那出队这个事情由谁来做?是由被唤醒的线程来做,即B线程

阻塞和唤醒

阻塞和唤醒线程调用api即可

  1. // 阻塞线程 
  2. LockSupport.park(this) 
  3. // 唤醒线程 
  4. LockSupport.unpark(this) 

独占锁的获取和释放

JUC中的许多并发工具类ReentrantLock,CountDownLatch等的实现都依赖AbstractQueuedSynchronizer

AbstractQueuedSynchronizer定义了一个锁实现的内部流程,而如何加锁和解锁则在各个子类中实现,典型的模板方法模式

AQS内部维护了一个FIFO的队列(底层实现就是双向链表),通过该队列来实现线程的并发访问控制,队列中的元素是一个Node节点

  1. static final class Node { 
  2.  //表示当前线程以共享模式持有锁 
  3.  static final Node SHARED = new Node(); 
  4.  //表示当前线程以独占模式持有锁 
  5.  static final Node EXCLUSIVE = null
  6.  
  7.  static final int CANCELLED =  1; 
  8.  static final int SIGNAL    = -1; 
  9.  static final int CONDITION = -2; 
  10.  static final int PROPAGATE = -3; 
  11.  
  12.  //当前节点的状态 
  13.  volatile int waitStatus; 
  14.  
  15.  //前继节点 
  16.  volatile Node prev; 
  17.  
  18.  //后继节点 
  19.  volatile Node next
  20.  
  21.  //当前线程 
  22.  volatile Thread thread; 
  23.  
  24.  //存储在condition队列中的后继节点 
  25.  Node nextWaiter; 
  26.  

waitStatus(默认是0)表示节点的状态,包含的状态有

状态 含义
CANCELLED 1 线程获取锁的请求已经取消
SIGNAL -1 表示当前节点的的后继节点将要或者已经被阻塞,在当前节点释放的时候需要unpark后继节点
CONDITION -2 表示当前节点在等待condition,即在condition队列中
PROPAGATE -3 表示状态需要向后传播,仅在共享模式下使用)
  0 Node被初始化后的默认值,当前节点在队列中等待获取锁

再来看AbstractQueuedSynchronizer这个类的属性

  1. //等待队列的头节点 
  2. private transient volatile Node head; 
  3.  
  4. //等待队列的尾节点 
  5. private transient volatile Node tail; 
  6.  
  7. //加锁的状态,在不同子类中有不同的意义 
  8. private volatile int state; 

「这个state在不同的子类中有不同的含义」

「ReentrantLock」:state表示加锁的次数,为0表示没有被加锁,为1表示被加锁1次,为2表示被加锁2次,因为ReentrantLock是一个可以重入的锁「CountDownLatch」:state表示一个计数器,当state>0时,线程调用await会被阻塞,当state值被减少为0时,线程会被唤醒「Semaphore」:state表示资源的数量,state>0时,可以获取资源,并将state-1,当state=0时,获取不到资源,此时线程会被阻塞。当资源被释放时,state+1,此时其他线程可以获得资源

AbstractQueuedSynchronizer中的FIFO队列是用双向链表来实现的

在这里插入图片描述

AQS提供了独占锁和共享锁两种加锁方式,每种方式都有响应中断和不响应中断的区别,所以AQS的锁可以分为如下四类

  1. 不响应中断的独占锁(acquire)
  2. 响应中断的独占锁(acquireInterruptibly)
  3. 不响应中断的共享锁(acquireShared)
  4. 响应中断的共享锁(acquireSharedInterruptibly)

而释放锁的方式只有两种

  1. 独占锁的释放(release)
  2. 共享锁的释放(releaseShared)

不响应中断的独占锁

以ReentrantLock为例,从加锁这一部分开始分析

  1. // 调用ReentrantLock.FairSync#lock方法其实就是调用acquire(1); 
  2. public final void acquire(int arg) { 
  3.  if (!tryAcquire(arg) && 
  4.   acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//获取到锁返回false,否则返回true 
  5.   selfInterrupt();//当前线程将自己中断 
  1. 先尝试获取,如果获取到直接退出,否则进入2
  2. 获取锁失败,以独占模式将线程包装成Node放到队列中
  3. 如果放入的节点是队列的第二个节点,则再尝试获取锁,因为此时锁有可能释放类,不是第二个节点就不用尝试了,因为轮不到。如果获取到锁则将当前节点设为head节点,退出,否则进入4
  4. 设置好闹钟后将自己阻塞
  5. 线程被唤醒,重新竞争锁,获取锁成功,继续执行。如果线程发生过中断,则最后重置中断标志位位true,即执行selfInterrupt()方法

「从代码层面详细分析一波,走起」

tryAcquire是让子类实现的

  1. protected boolean tryAcquire(int arg) { 
  2.  throw new UnsupportedOperationException(); 

这里通过抛出异常来告诉子类要重写这个方法,为什么不将这个方法定义为abstract方法呢?因为AQS有2种功能,独占和共享,如果用abstract修饰,则子类需要同时实现两种功能的方法,对子类不友好

  1. 当队列不为空,尝试将新节点通过CAS的方式设置为尾节点,如果成功,返回附加着当前线程的节点
  2. 当队列为空,或者新节点通过CAS的方式设置为尾节点失败,进入enq方法
  1. private Node addWaiter(Node mode) { 
  2.  Node node = new Node(Thread.currentThread(), mode); 
  3.  Node pred = tail; 
  4.  if (pred != null) { 
  5.   node.prev = pred; 
  6.   if (compareAndSetTail(pred, node)) { 
  7.    pred.next = node; 
  8.    return node; 
  9.   } 
  10.  } 
  11.  enq(node); 
  12.  return node; 
  1. 当队列不为空,一直CAS,直到把新节点放入队尾
  2. 当队列为空,先往对列中放入一个节点,在把传入的节点CAS为尾节点

「前面已经说过了哈,AQS队列为空时,第一次会放入2个节点」

  1. private Node enq(final Node node) { 
  2.  for (;;) { 
  3.   Node t = tail; 
  4.   // 队列为空,进行初始化, 
  5.   if (t == null) { 
  6.    if (compareAndSetHead(new Node())) 
  7.     tail = head; 
  8.   } else { 
  9.    node.prev = t; 
  10.    if (compareAndSetTail(t, node)) { 
  11.     t.next = node; 
  12.     return t; 
  13.    } 
  14.   } 
  15.  } 

放入队列后还要干什么?

  1. 如果是第二个节点再尝试获取一波锁,因为此时有可能锁已经释放了,其他节点就不用了,因为还轮不到
  2. 上闹钟,让别的线程唤醒自己
  3. 阻塞自己
  1. // 自旋获取锁,直到获取锁成功,或者异常退出 
  2. // 但是并不是busy acquire,因为当获取失败后会被挂起,由前驱节点释放锁时将其唤醒 
  3. // 同时由于唤醒的时候可能有其他线程竞争,所以还需要进行尝试获取锁,体现的非公平锁的精髓。 
  4. final boolean acquireQueued(final Node node, int arg) { 
  5.  boolean failed = true
  6.  try { 
  7.   boolean interrupted = false
  8.   for (;;) { 
  9.    // 获取前继节点 
  10.    final Node p = node.predecessor(); 
  11.    // node节点的前继节点是head节点,尝试获取锁,如果成功说明head节点已经释放锁了 
  12.    // 将node设为head开始运行(head中不包含thread) 
  13.    if (p == head && tryAcquire(arg)) { 
  14.     setHead(node); 
  15.     // 将第一个节点出队 
  16.     p.next = null; // help GC 
  17.     failed = false
  18.     return interrupted; 
  19.    } 
  20.    // 获取锁失败后是否可以挂起 
  21.    // 如果可以挂起,则阻塞当前线程(获取锁失败的节点) 
  22.    if (shouldParkAfterFailedAcquire(p, node) && 
  23.     parkAndCheckInterrupt()) 
  24.     interrupted = true
  25.   } 
  26.  } finally { 
  27.   if (failed) 
  28.    cancelAcquire(node); 
  29.  } 

根据前继节点的状态,是否可以阻塞当前获取锁失败的节点

一般情况会经历如下2个过程

  1. 默认情况下上一个节点的waitStatus=0,所以会进入compareAndSetWaitStatus方法,通过cas将上一个节点的waitStatus设置为SIGNAL,然后return false
  2. shouldParkAfterFailedAcquire方法外面是一个死循环,当再次进入这个方法时,如果上一步cas成功,则会走第一个if,return true。接着执行parkAndCheckInterrupt,线程会阻塞
  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 
  2.  int ws = pred.waitStatus; 
  3.  // 前继节点释放时会unpark后继节点,可以挂起 
  4.  if (ws == Node.SIGNAL) 
  5.   return true
  6.  if (ws > 0) { 
  7.   //将CANCELLED状态的线程清理出队列 
  8.   // 后面会提到为什么会有CANCELLED的节点 
  9.   do { 
  10.    node.prev = pred = pred.prev; 
  11.   } while (pred.waitStatus > 0); 
  12.   pred.next = node; 
  13.  } else { 
  14.   // 将前继节点的状态设置为SIGNAL,代表释放锁时需要唤醒后面的线程 
  15.   // cas更新可能失败,所以不能直接返回true 
  16.   compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
  17.  } 
  18.  return false

shouldParkAfterFailedAcquire表示上好闹钟了,可以阻塞线程了。后续当线程被唤醒的时候会从return语句出继续执行,然后进入acquireQueued方法的死循环,重新抢锁。至此,加锁结束。

  1. // 挂起线程,返回是否被中断过 
  2. private final boolean parkAndCheckInterrupt() { 
  3.  // 阻塞线程 
  4.  LockSupport.park(this); 
  5.  // 返回当前线程是否被调用过Thread#interrupt方法 
  6.  return Thread.interrupted(); 

最后用一个流程图来解释不响应中断的独占锁

入队过程中有异常该怎么办?

可以看到上面调用acquireQueued方法发生异常的时候,会调用cancelAcquire方法,我们就详细分析一下这个cancelAcquire方法有哪些作用?

「哪些地方执行发生异常会执行cancelAcquire?」

可以看到调用cancelAcquire方法的有如下几个部分

「分析这些方法的调用,发现基本就是如下2个地方会发生异常」

  1. 尝试获取锁的方法如tryAcquire,这些一般是交给子类来实现的
  2. 当线程是被调用Thread#interrupt方法唤醒,如果要响应中断,会抛出InterruptedException

  1. //处理异常退出的node 
  2. private void cancelAcquire(Node node) { 
  3.  if (node == null
  4.   return
  5.  
  6.  // 设置该节点不再关联任何线程 
  7.  node.thread = null
  8.  
  9.  // 跳过CANCELLED节点,找到一个有效的前继节点 
  10.  Node pred = node.prev; 
  11.  while (pred.waitStatus > 0) 
  12.   node.prev = pred = pred.prev; 
  13.  
  14.  // 获取过滤后的有效节点的后继节点 
  15.  Node predNext = pred.next
  16.  
  17.  // 设置状态为取消 
  18.  node.waitStatus = Node.CANCELLED; 
  19.  
  20.  // case 1 
  21.  if (node == tail && compareAndSetTail(node, pred)) { 
  22.   compareAndSetNext(pred, predNext, null); 
  23.  } else { 
  24.   // case 2 
  25.   int ws; 
  26.   if (pred != head && 
  27.    ((ws = pred.waitStatus) == Node.SIGNAL || 
  28.     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 
  29.    pred.thread != null) { 
  30.    Node next = node.next
  31.    if (next != null && next.waitStatus <= 0) 
  32.     compareAndSetNext(pred, predNext, next); 
  33.   } else { 
  34.    // case3 
  35.    unparkSuccessor(node); 
  36.   } 
  37.  
  38.   node.next = node; // help GC 
  39.  } 

将node出队有如下三种情况

当前节点是tail

当前节点不是head的后继节点,也不是tail

当前节点是head的后继节点

「当前节点是tail」

compareAndSetTail,将tail指向pred compareAndSetNext,将pred的next指向null,也就是把当前节点移出队列

在这里插入图片描述

「当前节点不是head的后继节点,也不是tail」

这里将node的前继节点的next指向了node的后继节点,即compareAndSetNext(pred, predNext, next),「注意pred和node节点中间有可能有CANCELLED的节点,怕乱就没画出来」

「当前节点是head的后继节点」

没有对队列进行操作,只是进行head后继节点的唤醒操作(unparkSuccessor方法,后面会分析这个方法),因为此时他是head的后继节点,还是有可能获取到锁的,所以唤醒它尝试获取一波锁,当再次调用到shouldParkAfterFailedAcquire(判断是否应该阻塞的方法时)会把CANCELLED状态的节点从队列中删除

独占锁的释放

独占锁是释放其实就是利用cas将state-1,当state=0表示锁被释放,需要将阻塞队列中的线程唤醒

  1. // 调用ReentrantLock#unlock方法其实就是调用release(1) 
  2. public final boolean release(int arg) { 
  3.  // 尝试释放锁 
  4.  // 当state=0,表示锁被释放,tryRelease返回true,此时需要唤醒阻塞队列中的线程 
  5.  if (tryRelease(arg)) { 
  6.   Node h = head; 
  7.   if (h != null && h.waitStatus != 0) 
  8.    unparkSuccessor(h); 
  9.   return true
  10.  } 
  11.  return false

「tryRelease即具体的解锁逻辑,需要子类自己去实现」

「唤醒同步队列中的线程,可以看到前面加了判断h != null && h.waitStatus != 0」

h = null,说明同步同步队列中没有数据,则不需要唤醒 h = null && waitStatus = 0,同步队列是有了,但是没有线程给自己上闹钟,不用唤醒 h != null && waitStatus < 0,说明头节点被人上了闹钟,自己需要唤醒阻塞的线程 h != null && waitStatus > 0,头节点因为发生异常被设置为取消,但还是得唤醒线程

  1. private void unparkSuccessor(Node node) { 
  2.  
  3.  int ws = node.waitStatus; 
  4.  if (ws < 0) 
  5.   compareAndSetWaitStatus(node, ws, 0); 
  6.  
  7.  // 头结点的下一个节点 
  8.  Node s = node.next
  9.  // 为空或者被取消 
  10.  if (s == null || s.waitStatus > 0) { 
  11.   s = null
  12.   // 从队列尾部向前遍历找到最前面的一个waitStatus<=0的节点 
  13.   for (Node t = tail; t != null && t != node; t = t.prev) 
  14.    if (t.waitStatus <= 0) 
  15.     s = t; 
  16.  } 
  17.  if (s != null
  18.   // 唤醒节点,但并不表示它持有锁,要从阻塞的地方开始运行 
  19.   LockSupport.unpark(s.thread); 

「为什么要从后向前找第一个非CANCELLED的节点呢?」

  1. private Node addWaiter(Node mode) { 
  2.     Node node = new Node(Thread.currentThread(), mode); 
  3.     // Try the fast path of enq; backup to full enq on failure 
  4.     Node pred = tail; 
  5.     if (pred != null) { 
  6.         node.prev = pred; 
  7.         if (compareAndSetTail(pred, node)) { 
  8.          // 线程在这里挂起了 
  9.             pred.next = node; 
  10.             return node; 
  11.         } 
  12.     } 
  13.     enq(node); 
  14.     return node; 

这其实和入队的逻辑有关系,假如Node1在图示位置挂起了,Node1后面又陆续增加了Node2和Node3,如果此时从前向后遍历会导致元素丢失,不能正确唤醒线程

分析一下独占锁响应中断和不响应中断的区别

我们之前说过独占锁可以响应中断,也可以不响应中断,调用的方法如下?

  1. 不响应中断的独占锁(acquire)
  2. 响应中断的独占锁(acquireInterruptibly)

所以我们只需要看这2个方法的区别在哪里就可以,我下面只列出有区别的部分哈。

  1. public final void acquire(int arg) { 
  2.     if (!tryAcquire(arg) && 
  3.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
  4.         selfInterrupt(); 
  1. public final void acquireInterruptibly(int arg) 
  2.         throws InterruptedException { 
  3.     // 判断线程是否被中断 
  4.     if (Thread.interrupted()) 
  5.         throw new InterruptedException(); 
  6.     if (!tryAcquire(arg)) 
  7.         doAcquireInterruptibly(arg); 

「acquire在尝试获取锁的时候完全不管线程有没有被中断,而acquireInterruptibly在尝试获取锁之前会判断线程是否被中断,如果被中断,则直接抛出异常。」

tryAcquire方法一样,所以我们只需要对比acquireQueued方法和doAcquireInterruptibly方法的区别即可

「执行acquireQueued方法当线程发生中断时,只是将interrupted设置为true,并且调用selfInterrupt方法将中断标志位设置为true」

「而执行doAcquireInterruptibly方法,当线程发生中断时,直接抛出异常。」

最后看一下parkAndCheckInterrupt方法,这个方法中判断线程是否中断的逻辑特别巧!

  1. private final boolean parkAndCheckInterrupt() { 
  2.  LockSupport.park(this); 
  3.  return Thread.interrupted(); 

「Thread类提供了如下2个方法来判断线程是否是中断状态」

  1. isInterrupted
  2. interrupted

「这里为什么用interrupted而不是isInterrupted的呢?」

演示一下这2个方法的区别

  1. @Test 
  2. public void testInterrupt() throws InterruptedException { 
  3.     Thread thread = new Thread(() -> { 
  4.         while (true) {} 
  5.     }); 
  6.     thread.start(); 
  7.     TimeUnit.MICROSECONDS.sleep(100); 
  8.     thread.interrupt(); 
  9.     // true 
  10.     System.out.println(thread.isInterrupted()); 
  11.     // true 
  12.     System.out.println(thread.isInterrupted()); 
  13.     // true 
  14.     System.out.println(thread.isInterrupted()); 
  1. @Test 
  2. public void testInterrupt2() { 
  3.     Thread.currentThread().interrupt(); 
  4.     // true 
  5.     System.out.println(Thread.interrupted()); 
  6.     // false 
  7.     System.out.println(Thread.interrupted()); 
  8.     // false 
  9.     System.out.println(Thread.interrupted()); 

「isInterrupted和interrupted的方法区别如下」

Thread#isInterrupted:测试线程是否是中断状态,执行后不更改状态标志 Thread#interrupted:测试线程是否是中断状态,执行后将中断标志更改为false

接着再写2个例子

  1. public static void main(String[] args) { 
  2.  LockSupport.park(); 
  3.  // end被一直阻塞没有输出 
  4.  System.out.println("end"); 
  1. public static void main(String[] args) { 
  2.  Thread.currentThread().interrupt(); 
  3.  LockSupport.park(); 
  4.  // 输出end 
  5.  System.out.println("end"); 

可以看到当线程被中断时,调用park()方法并不会被阻塞

  1. public static void main(String[] args) { 
  2.  Thread.currentThread().interrupt(); 
  3.  LockSupport.park(); 
  4.  // 返回中断状态,并且清除中断状态 
  5.  Thread.interrupted(); 
  6.  // 输出start 
  7.  System.out.println("start"); 
  8.  LockSupport.park(); 
  9.  // end被阻塞,没有输出 
  10.  System.out.println("end"); 

到这我们就能理解为什么要进行中断的复位了

  • 如果当前线程是非中断状态,则在执行park时被阻塞,返回中断状态false
  • 如果当前线程是中断状态,则park方法不起作用,返回中断状态true,interrupted将中断复位,变为false
  • 再次执行循环的时候,前一步已经在线程的中断状态进行了复位,则再次调用park方法时会阻塞

「所以这里要对中断进行复位,是为了不让循环一直执行,让当前线程进入阻塞状态,如果不进行复位,前一个线程在获取锁之后执行了很耗时的操作,那当前线程岂不是要一直执行死循环,造成CPU使用率飙升?」

独占锁的获取和释放我们已经搞清楚了,共享锁的获取和释放我们放到分析CountDownLatch源码的那一节来分析

基于AQS自己写一个锁

你看AQS已经把入队,出队,阻塞,唤醒的操作都封装好了,当我们用AQS来实现自己的锁时,就非常的方便了,只需要重写加锁和解锁的逻辑即可。我这里演示一个基于AQS实现的非重入的互斥锁

  1. public class MyLock { 
  2.  
  3.     private final Sync sync; 
  4.  
  5.     public MyLock() { 
  6.         sync = new Sync(); 
  7.     } 
  8.  
  9.     public class Sync extends AbstractQueuedSynchronizer { 
  10.  
  11.         @Override 
  12.         protected boolean tryAcquire(int arg) { 
  13.             return compareAndSetState(0, arg); 
  14.         } 
  15.  
  16.         @Override 
  17.         protected boolean tryRelease(int arg) { 
  18.             setState(0); 
  19.             return true
  20.         } 
  21.  
  22.     } 
  23.  
  24.     public void lock() { 
  25.         sync.acquire(1); 
  26.     } 
  27.  
  28.     public void unLock() { 
  29.         sync.release(1); 
  30.     } 

本文转载自微信公众号「Java识堂」,可以通过以下二维码关注。转载本文请联系Java识堂公众号。


本文转载自网络,原文链接:https://mp.weixin.qq.com/s/uUAUGQ_WGHJzhJAyBmcWYg

版权声明:本文转载自网络,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本站转载出于传播更多优秀技术知识之目的,如有侵权请联系QQ/微信:153890879删除

相关文章
  • 教大家如何手写一个AQS?

    教大家如何手写一个AQS?

  • 关于Python可视化Dash工具—散点地图、

    关于Python可视化Dash工具—散点地图、

  • 详解 ZooKeeper 数据持久化

    详解 ZooKeeper 数据持久化

  • IAP、APP程序拼接,及Hex/Bin格式互转

    IAP、APP程序拼接,及Hex/Bin格式互转

腾讯云代理商
海外云服务器