0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內(nèi)不再提示

AQS如何解決線程同步與通信問題

科技綠洲 ? 來源:Java技術指北 ? 作者:Java技術指北 ? 2023-10-13 11:23 ? 次閱讀

我們在第一篇中說到AQS使用的是管程模型,而管程模型是使用條件變量來解決同步通信問題的。條件變量會有兩個方法,喚醒和等待。當條件滿足時,我們會通過喚醒方法將條件隊列中的線程放入第二篇所說的同步隊列中;如果不滿足條件,則會通過等待方法將線程阻塞放入條件隊列中。而AQS中通過ConditionObject類實現(xiàn)了條件變量,所以接下來我們就具體看看ConditionObject類吧。

一 屬性

我們先看下ConditionObject中的屬性

/** 鏈表頭節(jié)點 */
private transient Node firstWaiter;
/** 鏈表尾節(jié)點 */
private transient Node lastWaiter;

開頭說了,條件變量中會有一個條件隊列,ConditionObject中的條件隊列使用的是單向鏈表,firstWaiter和lastWaiter為頭尾節(jié)點,節(jié)點也是使用AQS的內(nèi)部類Node,但同步隊列是個雙向鏈表,條件隊列是單向鏈表,所以條件隊列使用的是Node類中的nextWaiter屬性作為下一個節(jié)點的鏈接指針。

volatile Node prev;
volatile Node next;
Node nextWaiter;

我們可以注意到nextWaiter是沒用volatile修飾的,這是因為線程在調(diào)用await方法進入條件隊列時,是已經(jīng)擁有了鎖的。還有一點需要注意是,條件隊列里面的Node只會存在CANCELLED和CONDITION的狀態(tài),有別于同步隊列。

二 喚醒方法

2.1 signalAll

此方法是喚醒所有條件隊列中的節(jié)點,即將條件隊列中的所有節(jié)點都移動到我們第二篇所說的同步隊列中,然后再去競爭鎖,具體源碼如下:

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

上面我們說了,要調(diào)用喚醒和等待方法,都需要此線程獲取鎖,首先我們會通過子類復寫的方法isHeldExclusively來看此時的線程是否已經(jīng)獲得了鎖。如果獲得了鎖,我們會判斷條件隊列的頭節(jié)點是否為null,為null則說明條件隊列中沒有阻塞的Node;如果不為null,則會通過doSignalAll方法來將條件隊列中的所有Node移動到同步隊列中

2.1.1 doSignalAll

doSignalAll方法主要功能就是遍歷條件隊列里面的節(jié)點Node,然后通過transferForSignal方法將Node移動到同步隊列中,源碼如下:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
       // 將next指向first的后繼Node
        Node next = first.nextWaiter;
       // 切斷first與后繼Node的聯(lián)系
        first.nextWaiter = null;
       // 將此node轉(zhuǎn)移到同步隊列中
        transferForSignal(first);
        // 將first指向first的后繼Node
        first = next;
    // 在判斷此時的first是否為null,不是則繼續(xù)循環(huán)
    } while (first != null);
}
2.1.2 transferForSignal

transferForSignal主要功能就是將條件隊列中的節(jié)點Node轉(zhuǎn)移到同步隊列中,源碼如下:

final boolean transferForSignal(Node node) {
    // 說明此節(jié)點狀態(tài)為CANCELLED,所以跳過該節(jié)點(GC會回收)
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 入隊方法(獨占鎖獲取中詳細闡述過)
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); 
    return true;
}

首先通過CAS來將Node的狀態(tài)置為0,如果失敗,則說明此時Node狀態(tài)是CANCELLED,則直接返回false;如果Node狀態(tài)成功置為了0,我們就通過enq方法將此節(jié)點入隊到同步隊列中,enq方法已經(jīng)在第二篇文章中講過,這里就不再復述了。enq方法執(zhí)行完成后,說明node已經(jīng)成功進入同步隊列了,然后其返回的是入隊的前驅(qū)節(jié)點,如果前驅(qū)節(jié)點是CANCELLED狀態(tài),或者我們將前驅(qū)節(jié)點的狀態(tài)變?yōu)镾IGNAL失敗,則我們就需要喚醒此節(jié)點去搶鎖。這個如果你看了第二篇文章,你肯定是能夠想到的。

2.2 signal

看名字也能大概猜到,因為signalAll是將條件隊列中所有的Node轉(zhuǎn)移到同步隊列中,所以signal肯定是轉(zhuǎn)移單個Node。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

我們可以對比上面的signalAll方法,其唯一不同點就是signalAll內(nèi)部調(diào)用的是doSignalAll方法,而signal內(nèi)部調(diào)用的是doSignal方法,我們接著來看doSignal:

private void doSignal(Node first) {
    do {
        // 將firstWaiter指向傳入的first的后繼節(jié)點,
        // 然后判斷firstWaiter是否為null,
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

我們可以看到方法里面是個do-While的循環(huán),我們首先將firstWaiter指向first的后繼節(jié)點并判斷是否為null,如果為空,則說明條件隊列中只有first這一個節(jié)點,所以我們將整個隊列清空。然后我們再將first的的nextWaiter指向null斷開連接,進入while條件語句中。while條件語句中,會先調(diào)transferForSignal來轉(zhuǎn)移Node,如果返回為false,即轉(zhuǎn)移失敗,我們會判斷此節(jié)點下一個節(jié)點是否為null,不為null則又進入循環(huán)。

三 等待方法

喚醒方法wait,就是將線程阻塞包裝成節(jié)點放入條件隊列中,等到其他線程喚醒(signal)或者自身中斷后再重新去獲取鎖。所以其又可以大致分為兩個階段,線程阻塞前和阻塞后。

3.1 await—阻塞前

我們先來看下await的源碼:

public final void await() throws InterruptedException {
    // 如果此線程被中斷過,直接拋中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 將當前線程包裝成節(jié)點放入條件隊列
    Node node = addConditionWaiter();
    // 釋放當前線程持有的鎖
    long savedState = fullyRelease(node);
    // 初始化中斷模式參數(shù)
    int interruptMode = 0;
    // 檢查節(jié)點是否在同步隊列中
    while (!isOnSyncQueue(node)) {
       // 不在同步隊列中則阻塞此線程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被喚醒后再去獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 當線程是被中斷喚醒,node和后繼節(jié)點是沒有斷開的
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    // 根據(jù)異常標志位對異常進行處理
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
3.1.1 addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

addConditionWaiter的大致邏輯為:lastWaiter不是null且它的等待狀態(tài)不是CONDITION,說明lastWaiter的狀態(tài)是CANCELLED,所以我們會通過unlinkCancelledWaiters方法來移除條件隊列中所有CANCELLED的節(jié)點。然后我們會將當前線程包裝成一個節(jié)點,我們再會判斷尾節(jié)點是否為null,為null說明條件隊列為空,所以我們就將firstWaiter指向新的節(jié)點;如果不為null,就將尾節(jié)點的后繼節(jié)點指向新節(jié)點,然后再重置lastWaiter。最后將新節(jié)點返回。

3.1.2 fullyRelease

此時入隊成功后,我們就會調(diào)用fullyRelease方法來釋放當前線程所持有的鎖了,我們具體看下源碼:

final long fullyRelease(Node node) {
    boolean failed = true;
    try {
        long savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

其中釋放鎖成功調(diào)用的是release方法,這個方法在第二篇文章中詳述過。如果釋放鎖成功,則將failed狀態(tài)置為false,然后返回savedState狀態(tài),否則我們就會拋出異常。其中savedState是重入鎖的數(shù)量,release方法會一起釋放掉。

再看下finally,如果釋放鎖失敗,我們此線程會拋異常終止,然后在finally將waitStatus置為CANCELLED,然后等待后面被移出條件隊列。

3.1.3 isOnSyncQueue

isOnSyncQueue方法是檢查此節(jié)點是否在同步隊列中,具體源碼如下:

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;        
    return findNodeFromTail(node);
}

先看第一個if語句,如果狀態(tài)是CONDITION或者prev參數(shù)是null,說明此節(jié)點是在條件隊列中,返回為false。再來看第二個if,我們知道,prev和next都是同步隊列中的節(jié)點連接是用的prev和next,所以如果兩個屬性不為null,說明此節(jié)點是在同步隊列中,所以node.next不為null則需要返回true。如果兩個if都不成立,說明這個節(jié)點狀態(tài)是0且prev不為null,即屬于我們中CAS進入同步隊列的情況,則我們會通過findNodeFromTail方法來確認是不是這種情況

3.1.3.1 findNodeFromTail
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

如果此時tail就是node的話,說明node在同步隊列中,如果不是就像前遍歷。我們再回到await方法:

// 省略
while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}
// 省略

如果不在同步隊列中,則此線程就被park方法阻塞了,只有當線程被喚醒才會在這里開始繼續(xù)執(zhí)行下面代碼。

3.2 wait—喚醒后

我們再來看看await喚醒后的情形:

public final void await() throws InterruptedException {
    // 省略。。。。
    while (!isOnSyncQueue(node)) {
       // 不在同步隊列中則阻塞此線程
        LockSupport.park(this); // < ----- 被喚醒后從下面開始
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被喚醒后再去獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 當線程是被中斷喚醒時,node和后繼節(jié)點是沒有斷開的
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 根據(jù)異常標志位對異常進行處理
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

我們需要注意的是,線程在這里被喚醒有兩種情況:

  1. 其他線程調(diào)用了doSignal或doSignalAll,
  2. 線程被中斷。

我們需要確定我們被喚醒的情況是哪種,這里是通過checkInterruptWhileWaiting方法來判斷。但在講這個方法前,我們需先了解這個interruptMode有幾種狀態(tài):

/** wait方法退出時,會重新再中斷一次 */
private static final int REINTERRUPT =  1;
/** wait方法退出時,會拋出InterruptedException異常 */
private static final int THROW_IE    = -1;

除了上面兩種,還有一種初始態(tài)0,它代表線程沒有被中斷過,不做任何處理。

3.2.1 checkInterruptWhileWaiting
private int checkInterruptWhileWaiting(Node node) {
   return Thread.interrupted() ?
       (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
       0;
}

我們看下代碼,首先我們會檢查中斷標志位,如果interrupted方法返回false,說明沒發(fā)生中斷,方法最終返回0;如果返回了true,則說明中斷了,則我們需要通過transferAfterCancelledWait方法進一步檢查其他線程是否執(zhí)行了喚醒操作。

3.2.1.1 transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 
        enq(node);
        return true;
    }

    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;

}

我們先看第一個if條件,如果條件中的CAS操作成功,說明此時的節(jié)點肯定是在條件隊列中,則我們調(diào)動 enq 方法將此節(jié)點放入到同步隊列中,然后返回true。但是這里需要特別注意,這個節(jié)點的nextWaiter還沒置為null;如果CAS失敗,說明這個節(jié)點可能已經(jīng)在同步隊列中或者在入隊的過程中,所以我們通過while循環(huán)等待此節(jié)點入隊后返回false。

我們再回到調(diào)用transferAfterCncelled 的 checkInterruptWhileWaiting方法中,根據(jù)transferAfterCancelledWait方法返回值我們最終會返回REINTERRUPT或THROW_IE。

然后我們返回到調(diào)用checkInterruptWhileWaiting方法的await方法中。

public final void await() throws InterruptedException {
    // 代碼省略
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 我們現(xiàn)在在這里?。?!
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

我們可以看到,如果返回值不為0,則直接break跳出循環(huán),如果為0,則再次回到while條件檢查是否在同步隊列中。最后我們看最后剩下的三個if語句:

  1. 通過acquireQueued方法來獲取鎖,這個方法在第二篇中詳細講過,acquireQueued返回true(即獲取鎖的的過程中被中斷了),我們再將interruptMode為0置為REINTERRUPT。
  2. 如果node的nextWaiter不是null。我們會通過unlinkCancelledWaiters方法將條件隊列中所有不為CONDITION的節(jié)點移除。
  3. 最后一個if,線程拿到鎖了,且節(jié)點沒在同步隊列和條件隊列中,await方法其實算完成了,我們這時候只需要對中斷進行善后處理。如果interruptMode不為0,說明線程是被中斷過的,需要通過reportInterruptAfterWait對中斷進行處理。
3.2.1.2 reportInterruptAfterWait
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

如果是THROW_IE,就是拋異常,如果是REINTERRUPT,就再自我中斷一次。

四 總結(jié)

好了,AQS如何解決線程同步與通信問題,就分析完了,這里我再總結(jié)一下:

AQS通過ConditionObject類來實現(xiàn)條件變量,并通過其喚醒方法、阻塞方法來進行線程的通信。當線程獲取鎖之后,可以通過signal、signalAll等喚醒方法將條件隊列中被阻塞的線程節(jié)點轉(zhuǎn)移到同步隊列中,然后喚醒去競爭鎖;也可以通過wait方法將自己包裝成節(jié)點并放入條件隊列中,然后等待被其他線程喚醒或中斷。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 通信
    +關注

    關注

    18

    文章

    5880

    瀏覽量

    135312
  • 模型
    +關注

    關注

    1

    文章

    3032

    瀏覽量

    48354
  • 線程
    +關注

    關注

    0

    文章

    501

    瀏覽量

    19580
收藏 人收藏

    評論

    相關推薦

    一文詳解Linux線程同步

    我們在工作中會經(jīng)常遇到線程同步,那么到底什么是線程同步呢,線程同步的本質(zhì)是什么,
    發(fā)表于 08-25 11:49 ?568次閱讀

    基于TCP/IP協(xié)議和多線程通信軟件的設計與實現(xiàn)

    ,解析后以數(shù)據(jù)表格形式顯示。重點探討了客戶端/服務器模式下基于TCP/IP協(xié)議通信的多線程實現(xiàn)過程,并利用時序圖和活動圖進行具體描述。討論了在軟件安裝調(diào)試過程中如何解決客戶端死機問題和客戶端數(shù)據(jù)與服務器
    發(fā)表于 05-06 09:02

    Linux多線程線程同步

    。同一進程內(nèi)的線程共享進程的地址空間。通信:進程間通信IPC,線程間可以直接讀寫進程數(shù)據(jù)段(如全局變量)來進行通信——需要進程
    發(fā)表于 12-08 14:14

    IOT-OS之RT-Thread--- 線程同步線程通信

    rt_thread,下面要介紹線程間的同步通信線程同步對象rt_sem / rt_mutex / rt_event和
    發(fā)表于 07-02 06:15

    線程同步機制在應用程序與驅(qū)動程序通信中的應用

    本文對Windows NT 操作系統(tǒng)的多線程同步機制和同步對象進行了分析,以其在檢測儀和經(jīng)緯儀同步通信程序開發(fā)中的應用為例,論述了如何通過共
    發(fā)表于 08-24 10:02 ?16次下載

    Linux多線程同步方法

    線程對共享相同內(nèi)存操作時,就會出現(xiàn)多個線程對同一資源的使用,為此,需要對這些線程進行同步,以確保它們在訪問共享內(nèi)存的時候不會訪問到無效的數(shù)值。
    發(fā)表于 08-08 14:17 ?2027次閱讀

    了解Linux多線程線程同步

    進程間通信IPC,線程間可以直接讀寫進程數(shù)據(jù)段(如全局變量)來進行通信——需要進程同步和互斥手段的輔助,以保證數(shù)據(jù)的一致性。
    發(fā)表于 04-23 14:23 ?672次閱讀
    了解Linux多<b class='flag-5'>線程</b>及<b class='flag-5'>線程</b>間<b class='flag-5'>同步</b>

    HMC253AQS24 S參數(shù)

    HMC253AQS24 S參數(shù)
    發(fā)表于 04-09 14:22 ?2次下載
    HMC253<b class='flag-5'>AQS</b>24 S參數(shù)

    RT-Thread文檔_線程同步

    RT-Thread文檔_線程同步
    發(fā)表于 02-22 18:29 ?1次下載
    RT-Thread文檔_<b class='flag-5'>線程</b>間<b class='flag-5'>同步</b>

    AQS同步組件有哪些呢?

    AQS 的全稱為 Abstract Queued Synchronizer,是在 JUC(java.util.concurrent)下子包中的類。
    的頭像 發(fā)表于 03-16 09:42 ?404次閱讀

    基于AQS共享模式的同步計數(shù)器——CountDownLatch

    await(): 調(diào)用該方法的線程會被掛起,直到 CountDownLatch 計數(shù)器的值為 0 才繼續(xù)執(zhí)行,底層使用的是 AQS 的 tryAcquireShared()
    發(fā)表于 04-24 15:02 ?572次閱讀
    基于<b class='flag-5'>AQS</b>共享模式的<b class='flag-5'>同步</b>計數(shù)器——CountDownLatch

    AQS獨占鎖的獲取

    AQS提供了兩種鎖,獨占鎖和共享鎖。獨占鎖只有一把鎖,同一時間只允許一個線程獲得鎖;而共享鎖則有多把鎖,同一時間允許多個線程獲得鎖。我們本文主要講獨占鎖。 一. 獨占鎖的獲取 AQS
    的頭像 發(fā)表于 10-13 14:51 ?377次閱讀
    <b class='flag-5'>AQS</b>獨占鎖的獲取

    AQS是什么

    的也是這種MESA模型(其模型圖如下圖所示): 可能這個圖大家現(xiàn)在還看不太明白,沒關系,暫時留個印象,當看完指北君AQS系列文章以后,你再回過頭來看這個圖,肯定秒懂! Java中的synchronized關鍵字就是其管程的具體實現(xiàn),當然,今天所要聊的AQS同樣也是。
    的頭像 發(fā)表于 10-13 14:54 ?407次閱讀
    <b class='flag-5'>AQS</b>是什么

    線程同步的幾種方法

    線程同步是指在多個線程并發(fā)執(zhí)行的情況下,為了保證線程執(zhí)行的正確性和一致性,需要采用特定的方法來協(xié)調(diào)線程之間的執(zhí)行順序和共享資源的訪問。下面
    的頭像 發(fā)表于 11-17 14:16 ?958次閱讀

    線程如何保證數(shù)據(jù)的同步

    線程編程是一種并發(fā)編程的方法,意味著程序中同時運行多個線程,每個線程可獨立執(zhí)行不同的任務,共享同一份數(shù)據(jù)。由于多線程并發(fā)執(zhí)行的特點,會引發(fā)數(shù)據(jù)同步
    的頭像 發(fā)表于 11-17 14:22 ?890次閱讀