专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

并发编程之可重入锁ReentrantLock

一、简介

ReentrantLock是可重入锁。

可重入:指同一个线程可以多次获取同一把锁。如下所示,method1method2 使用的是同一个锁,method1中调用method2时,调用的线程可以重复获取锁;如若不然,则该线程永远无法获取到“第二个锁”去执行method2。

synchronized 也是可重入锁。

    ReentrantLock lock = new ReentrantLock();
    public void method1() {
        lock.lock();
        // do some  ,  call method2
        method2();
        lock.lock();
    }
     public void method2(){
        lock.lock();
        // do some
        lock.unlock();
    }

二、原理剖析

  • 通过内部类Sync完成并发加锁与释放
  • Sync继承AQS(抽象队列同步器)
  • CAS

分类:

1、 公平锁与非公平锁
2、 独占锁与共享锁

Sync有两个实现类:FairSync、NonfairSync,默认创建的是NonfairSync非公平锁。

原理:

在AQS中维护了被 volatile 修饰的变量 state,说明state是内存可见的,当某个线程改变了这个值后,会及时刷新回主内存,保证各个线程看到的这个变量都是最新的。

     /**
     * The synchronization state.
     */
    private volatile int state;

  • NonfairSync
        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            // 立即去获取一次锁 
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

非公平锁会立即去获取锁,如果获取不到才会进入阻塞获取锁,也就是一上来就有一次获取锁的机会。

接下来看分支的 acquire(1):

public final void acquire(int arg) {
        // 无论如何都会尝试去获取一次
        if (!tryAcquire(arg) &&
            // 如果没有获取到则加入队列中
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

1、 tryAcquire(arg)这是比较核心的方法,具体实现:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // c==0 表示当前没有任何一个线程持有锁,去竞争锁。
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 表示当前有线程持有锁,并且就是本线程,再次获取锁,state值增加1,可重入含义的代码体现。
            else if (current == getExclusiveOwnerThread()) { 
                // acquires参数为1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 设置state为增加后的值
                setState(nextc);
                return true;
            }
            return false;
        }

1、 如果tryAcquire(arg)返回false,没有获取到锁,则加入到队列中,直到获取到锁或者被打断。

// 加入队列,链表的基本操作
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 这里也是利用CAS原子操作,防止多个线程同时添加节点导致部分节点丢失。
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果pred == null,即队列中没有元素,则初始化
        enq(node);
        return node;
    }

初始化过程:

 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 创建一个节点为head,CAS
                if (compareAndSetHead(new Node()))
                    // tail设置与head一样
                    tail = head;
            } else {
                node.prev = t;
                // CAS,只要有线程创建头尾节点成功,都会走这一步,这里也可以正常添加节点。
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

第一次循环创建了head,并且tail节点与head节点相同;第二次循环,此时已经有了头尾节点,会执行else逻辑,添加node到tail节点之后,成为最新的tail;如果此时还有其他并发的线程在执行这个初始化的方法,由于这里面也使用了CAS来初始化头尾节点及添加新节点,所以也会执行else代码,添加节点成功的。

然后是将节点添加到队列中:

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 在此处循环
            for (;;) {
                final Node p = node.predecessor();
                // 一直去获取锁,执行的依旧是 tryAcquire()
                // 如果前驱是head,该节点便是第二个节点,那么便有资格去尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 每一次失败后是否需要挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 两个方法都返回true,表示线程需要挂起,并且线程打断标识为true.
                    // 这样可以直到线程从等待状态恢复是否经历了被打断。
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

线程会在这里阻塞,一直去获取锁(当成为第二个节点的时候就有资格去竞争锁了),直到获取到锁或者被中断,返回结果是线程是否在挂起过程中被中断过。

再看shouldParkAfterFailedAcquire(),parkAndCheckInterrupt():


// private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驱节点为SIGNAL,返回true if (ws == Node.SIGNAL) return true; // 前驱节点为CANCELLED,表示被取消了,此时当前节点需要一直向前寻找到 第一个不是CANCELLED的节点,并成为其后继节点,因为一个节点的前驱节点是被取消了的话,没有任何意义。 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // private final boolean parkAndCheckInterrupt() { // 这个方法会让线程进入等待状态 LockSupport.park(this); // 返回当前线程的打断标志位 return Thread.interrupted(); }

waitStatus是节点Node定义的,她是标识线程的等待状态,他主要有如下四个值:

  • CANCELLED = 1:线程已被取消
  • SIGNAL = -1:当前线程的后继线程需要被unpark(唤醒)
  • CONDITION = -2 :线程(处在Condition休眠状态)在等待Condition唤醒
  • PROPAGATE = –3:(共享锁)其它线程获取到“共享锁”

如果一个节点的状态是SIGNAL,表示它的下一个节点需要被唤醒(马上要执行任务了)。

让我们回到这个方法 acquire()

public final void acquire(int arg) {
        // 无论如何都会尝试去获取一次
        if (!tryAcquire(arg) &&
            // 如果没有获取到则加入队列中
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法有返回后,如果线程有被打断,则执行selfInterrupt(); 关于线程的打断,有待研究。


  • FairSync

    公平锁与非公平锁的流程基本一致,不同点在于:


// FairSync protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 这里需要判断当前线程是否位于队列的头部 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

与公平锁相比,非公平锁的不同之处就体现在if(c==0)的条件代码块中:

//----------------非公平锁-----  
  if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
  //----------------公平锁-----  
 if (c == 0) {
                // 
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }

以上是加锁过程。

锁的释放过程:

锁的释放实际调用的是release()方法:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease():释放锁

 protected final boolean tryRelease(int releases) {
            // 更改状态值,一般都是减1
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 考虑到重入的情况,同一个线程多次获取到锁,只有state减到0时当前线程才能释放锁。
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

unparkSuccessor():唤醒后继节点

private void unparkSuccessor(Node node) {
        // 当前节点状态更改为0
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 如果后继节点为null或者是已取消的状态,没有意义,一直向后寻找第一个有效的节点为止。
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒找到的有效的节点。前面,线程获取锁失败后会进入阻塞状态,这里就是唤醒它。
        if (s != null)
            LockSupport.unpark(s.thread);
    }

  • lock(),tryLock(),tryLock(long timeout, TimeUnit unit)
    1. lock()会一直去获取锁,直到获取到锁才返回
    2. tryLock()表示尝试去获取锁,获取不到就立即返回
    3. tryLock(long timeout, TimeUnit unit)表示尝试获取锁,如果时间超时还没获取到,返回false
  • lock()与lockInterruptibly()

    这两个方法在获取锁的过程中都会将线程挂起,但线程终会在某个时刻从挂起状态中恢复,导致恢复的原因一般有两个:

    1. 被其他线程唤醒。如处于队列中第二位置的任务,当第一个任务执行完了后会调用 unparkSuccessor(h)通知该线程去获取锁。
    2. 被其他线程中断。

    lock()与lockInterruptibly()的区别在于,当线程被其他线程中断的时候,lock()会继续去获取锁,记录下被打断,最后获取锁后调用selfInterrupt()设置中断标志(status是否会被重置得看具体的情况,可参考源码注释说明);lockInterruptibly()则不会继续获取锁,抛出InterruptedException异常,异常可以被我们的程序捕获到,异常的处理由调用者决定。


// lock() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 只是做了标记,不影响流程 interrupted = true; ------------------------------------------------- // lockInterruptibly() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 抛出异常,获取锁结束 throw new InterruptedException();
ReentrantLock的加锁与释放源码分析到此为止。


三、总结

这里画了一个非公平锁基本的流程图:

33_1.png

本文使用 tech.souyunku.com 排版

文章永久链接:https://tech.souyunku.com/37457

未经允许不得转载:搜云库技术团队 » 并发编程之可重入锁ReentrantLock

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们