?
?
作者簡介:程磊,一線碼農(nóng),在某手機公司擔任系統(tǒng)開發(fā)工程師;閱碼場榮譽總編輯;日常喜歡研究內(nèi)核基本原理。
?
目錄:
一、概念解析
? ? ? 1.1 名詞解釋
? ? ? 1.2 線程同步與防同步
二、線程防同步方法
? ? ? 2.1 時間上防同步
? ? ? 2.2 空間上防同步
? ? ? 2.3 事后防同步
三、原子操作
? ? ? 3.1 int原子操作
? ? ? 3.2 long原子操作
? ? ? 3.3 bit原子操作
四、加鎖機制
? ? ? 4.1 鎖的底層原理
? ? ? 4.2 簡單自旋鎖
? ? ? 4.3 互斥鎖
? ? ? 4.4 信號量
五、per-CPU 變量
六、RCU 簡介
七、序列鎖
八、總結(jié)回顧
?
?
?
?一、概念解析 ? ?我們在工作中會經(jīng)常遇到線程同步,那么到底什么是線程同步呢,線程同步的本質(zhì)是什么,線程同步的方法又有哪些,為什么會有這些方法呢?在回答這些問題之前,我們先做幾個名詞解釋,以便建立共同的概念基礎(chǔ)。?
1.1 名詞解釋
?
CPU: 本文中的CPU都是指邏輯CPU。
UP: 單處理器(單CPU)。
SMP: 對稱多處理器(多CPU)。
線程、執(zhí)行流: 線程的本質(zhì)是一個執(zhí)行流,但執(zhí)行流不僅僅有線程,還有ISR、softirq、tasklet,都是執(zhí)行流。本文中說到線程一般是泛指各種執(zhí)行流,除非是在需要區(qū)分不同執(zhí)行流時,線程才特指狹義的線程。
并發(fā)、并行: 并發(fā)是指線程在宏觀上表現(xiàn)為同時執(zhí)行,微觀上可能是同時執(zhí)行也可能是交錯執(zhí)行,并行是指線程在宏觀上是同時執(zhí)行,微觀上也是同時執(zhí)行。
偽并發(fā)、真并發(fā): 偽并發(fā)就是微觀上是交錯執(zhí)行的并發(fā),真并發(fā)就是并行。UP上只有偽并發(fā),SMP上既有偽并發(fā)也有真并發(fā)。
臨界區(qū): 訪問相同數(shù)據(jù)的代碼段,如果可能會在多個線程中并發(fā)執(zhí)行,就叫做臨界區(qū),臨界區(qū)可以是一個代碼段被多個線程并發(fā)執(zhí)行,也可以是多個不同的代碼段被多個線程并發(fā)執(zhí)行。
同步: 首先線程同步的同步和同步異步的同步,不是一個意思。線程同步的同步,本文按照字面意思進行解釋,同步就是統(tǒng)一步調(diào)、同時執(zhí)行的意思。
線程同步現(xiàn)象: 線程并發(fā)過程中如果存在臨界區(qū)并發(fā)執(zhí)行的情況,就叫做線程同步現(xiàn)象。
線程防同步機制: 如果發(fā)生線程同步現(xiàn)象,由于臨界區(qū)會訪問共同的數(shù)據(jù),程序可能就會出錯,因此我們要防止發(fā)生線程同步現(xiàn)象,也就是防止臨界區(qū)并發(fā)執(zhí)行的情況,為此我們采取的防范措施叫做線程防同步機制。
?
1.2 線程同步與防同步
?
為什么線程同步叫線程同步,不叫線程防同步,叫線程同步給人的感覺好像就是要讓線程同時執(zhí)行的意思啊。但是實際上線程同步是讓臨界區(qū)不要并發(fā)執(zhí)行的意思,不管你們倆誰先執(zhí)行,只要錯開,誰先誰后執(zhí)行都可以。所以本文后面都采用線程防同步機制、防同步機制等詞。
?
我小時候一直有個疑惑,感冒藥為啥叫感冒藥,感冒藥是讓人感冒的藥啊,不是應(yīng)該叫治感冒藥才對嗎,治療感冒的藥。后來一想,就沒有讓人感冒的藥,所以感冒藥表達的就是治療感冒的藥,沒必要加個治字。但是同時還有一種藥,叫老鼠藥,是治療老鼠的藥嗎,不是啊,是要毒死老鼠的藥,因為沒有人會給老鼠治病。不過我們那里也有把老鼠藥叫做害老鼠藥的,加個害字,意思更明確,不會有歧義。
?
因此本文用了兩個詞,同步現(xiàn)象、防同步機制,使得概念更加清晰明確。
?
說了這么多就是為了闡述一個非常簡單的概念,就是不能同時操作相同的數(shù)據(jù),因為可能會出錯,所以我們要想辦法防止,這個方法我們把它叫做線程防同步。
?
還有一個詞是競態(tài)條件(race condition),很多關(guān)于線程同步的書籍文檔中都有提到,但是我一直沒有理解是啥意思。競態(tài)條件,條件,線程同步和條件也沒啥關(guān)系??;競態(tài),也不知道是什么意思。再看它的英文,condition有情況的意思,race有賽跑、競爭的意思,是不是要表達賽跑情況、競爭現(xiàn)象,想說兩個線程在競爭賽跑,看誰能先訪問到公共數(shù)據(jù)。我發(fā)現(xiàn)沒有競態(tài)條件這個詞對我們理解線程同步問題一點影響都沒有,有了這個詞反而不明所以,所以我們就忽略這個詞。
?
?
?
?二、線程防同步方法?
在我們理解了同步現(xiàn)象、防同步機制這個詞后,下面的就很好理解了。同步現(xiàn)象是指同時訪問相同的數(shù)據(jù),那么如何防止呢,我不讓你同時訪問相同的數(shù)據(jù)不就可以了嘛。因此防同步機制有三大類方法。
?
?
2.1 時間上防同步
?
我不讓你們同時進入臨界區(qū),這樣就不會同時操作相同的數(shù)據(jù)了,有三種方法:
?
1.原子操作
對于個別特別簡單特別短的臨界區(qū),CPU會提供一些原子指令,在一條指令中把多個操作完成,兩個原子指令必然一個在前一個在后地執(zhí)行,不可能同時執(zhí)行。原子指令能防偽并發(fā)和真并發(fā),適用于UP和SMP。
?
2.加鎖
對于大部分臨界區(qū)來說,代碼都比較復雜,CPU不可能都去實現(xiàn)原子指令,因此可以在臨界區(qū)的入口處加鎖,同一時間只能有一個線程進入,獲得鎖的線程進入,在臨界區(qū)的出口處再釋放鎖。未獲得鎖的線程在外面等待,等待的方式有兩種,忙等待的叫做自旋鎖,休眠等待的叫做阻塞鎖。根據(jù)臨界區(qū)內(nèi)的數(shù)據(jù)讀寫操作不同,鎖又可以分為單一鎖和讀寫鎖,單一鎖不區(qū)分讀者寫者,所有用戶都互斥;讀寫鎖區(qū)分讀者和寫者,讀者之間可以并行,寫者與讀者、寫者與寫者之間是互斥的。自旋鎖有單一鎖和讀寫鎖,阻塞鎖也有單一鎖和讀寫鎖。自旋鎖只能防真并發(fā),適用于SMP;休眠鎖能防偽并發(fā)和真并發(fā),適用于UP和SMP。
?
3.臨時禁用偽并發(fā)
對于某些由于偽并發(fā)而產(chǎn)生的同步問題,可以通過在臨界區(qū)的入口處禁用此類偽并發(fā)、在臨界區(qū)的出口處再恢復此類偽并發(fā)來解決。這種方式顯然只能防偽并發(fā),適用于UP和SMP上的單CPU。而自旋鎖只能防真并發(fā),所以在SMP上經(jīng)常會同時使用這兩種方法,同時防偽并發(fā)和真并發(fā)。
?
臨時禁用偽并發(fā)有三種情況:
?
a.禁用中斷
如果線程和中斷、軟中斷和中斷之間會訪問公共數(shù)據(jù),那么在前者的臨界區(qū)內(nèi)可以臨時禁用后者,也就是禁用中斷,達到防止偽并發(fā)的目的。在后者的臨界區(qū)內(nèi)不用采取措施,因為前者不能搶占后者,但是后者能搶占前者。前者一般會同時使用禁中斷和自旋鎖,禁中斷防止偽并發(fā),自旋鎖防止真并發(fā)。
b.禁用軟中斷
如果線程和軟中斷會訪問公共數(shù)據(jù),那么在前者的臨界區(qū)內(nèi)禁用后者,也就是禁用軟中斷,可以達到防止偽并發(fā)的目的。后者不用采取任何措施,因為前者不會搶占后者。前者也可以和自旋鎖并用,同時防止偽并發(fā)和真并發(fā)。
c.禁用搶占
如果線程和線程之間會訪問公共數(shù)據(jù),那么可以在臨界區(qū)內(nèi)禁用搶占,達到防止偽并發(fā)的目的。禁用搶占也可以和自旋鎖并用,同時防止偽并發(fā)和真并發(fā)。
?
2.2 空間上防同步
?
你們可以同時,但是我不讓你們訪問相同的數(shù)據(jù),有兩種方法:
?
1. 數(shù)據(jù)分割
把大家都要訪問的公共數(shù)據(jù)分割成N份,各訪問各的。這也有兩種情況:
a. 在SMP上如果多個CPU要經(jīng)常訪問一個全局數(shù)據(jù),那么可以把這個數(shù)據(jù)拆分成NR_CPU份,每個CPU只訪問自己對應(yīng)的那份,這樣就不存在并發(fā)問題了,這個方法叫做 per-CPU 變量,只能防真并發(fā),適用于SMP,需要和禁用搶占配合使用。
b. TLS,每個線程都用自己的局部數(shù)據(jù),這樣就不存在并發(fā)問題了,能防真并發(fā)和偽并發(fā),適用于UP和SMP。
?
2. 數(shù)據(jù)復制
RCU,只適用于用指針訪問的動態(tài)數(shù)據(jù)。讀者復制指針,然后就可以隨意讀取數(shù)據(jù)了,所有的讀者可以共同讀一份數(shù)據(jù)。寫者復制數(shù)據(jù),然后就可以隨意修改復制后的數(shù)據(jù)了,因為這份數(shù)據(jù)是私有的,不過修改完數(shù)據(jù)之后要修改指針指向最新的數(shù)據(jù),修改指針的這個操作需要是原子的。對于讀者來說,它是復制完指針之后用自己的私有指針來訪問數(shù)據(jù)的,所以它訪問的要么是之前的數(shù)據(jù),要么是修改之后的數(shù)據(jù),而不會是不一致的數(shù)據(jù)。能防偽并發(fā)和真并發(fā),適用于UP和SMP。
?
2.3 事后防同步
?
不去積極預防并發(fā),而是假設(shè)不存在并發(fā),直接訪問數(shù)據(jù)。訪問完了之后再檢查剛才是否有并發(fā)發(fā)生,如果有就再重來一遍,一直重試,直到?jīng)]有并發(fā)發(fā)生為止。這就是內(nèi)核里面的序列鎖seqlock,能防偽并發(fā)和真并發(fā),適用于UP和SMP。
?
下面我們來畫張圖總結(jié)一下:
?
?
?三、原子操作 ? ?我們在剛開始學習線程同步時,經(jīng)常用來舉的一個例子就是,就連一個簡單的i++操作都是線程不安全的。i++對于源碼來說已經(jīng)是非常簡單的語句了,但是它編譯成機器碼之后有三個指令,把數(shù)據(jù)從內(nèi)存加載到寄存器,把寄存器加1,把寄存器的值加1。如果有兩個線程同時執(zhí)行i++話,就會出問題,比如i最開始等于0,每個線程都循環(huán)一萬次i++,我們期望最后的結(jié)果是兩萬,但是實際上最后的結(jié)果是不到兩萬的。對于UP來說,兩個線程輪流執(zhí)行,如果線程切換的點落在三條指令之間就會出問題。對于SMP來說多個CPU同時執(zhí)行更會出問題。為此我們可以采取的辦法可以是每次i++都加鎖,這樣做當然可以,不過這么做有點殺雞用牛刀了。很多CPU專門為這種基本的整數(shù)操作提供了原子指令。?
3.1 int原子操作
?
硬件提供的都是直接對整數(shù)的原子操作,但是在Linux內(nèi)核里并不是直接對一個int類型進行原子操作,而是對一個封裝后的數(shù)據(jù)進行操作。本質(zhì)上還是對int進行操作,那么為什么要做這么一層封裝呢?主要是為了接口語義,大家一看到一個變量是atomic_t類型的,大家立馬就明白這是一個原子變量,不能使用普通加減操作進行運算,要使用專門的接口函數(shù)來操作才對。
?
- ?
- ?
- ?
typedef struct {
int counter;
}?atomic_t;
?
那么對于 atomic_t的原子操作都有哪些呢?
?
Atomic Integer Operation | Description |
---|---|
ATOMIC_INIT(int i) | At declaration, initialize to i |
int atomic_read(atomic_t *v) | Atomically read the integer value of v |
void atomic_set(atomic_t *v, int i) | Atomically set v equal to i. |
void atomic_add(int i, atomic_t *v) | Atomically add i to v |
void atomic_sub(int i, atomic_t *v) | Atomically subtract i from v |
void atomic_inc(atomic_t *v) | Atomically add one to v |
void atomic_dec(atomic_t *v) | Atomically subtract one from v |
int atomic_sub_and_test(int i, atomic_t *v) | Atomically subtract i from v and return true if the result is zero; otherwise false. |
int atomic_add_negative(int i, atomic_t *v) | Atomically add i to v and return true if the result is negative; otherwise false. |
int atomic_add_return(int i, atomic_t *v) | Atomically add i to v and return the result. |
int atomic_sub_return(int i, atomic_t *v) | Atomically subtract i from v and return the result. |
int atomic_dec_and_test(atomic_t *v) | Atomically decrement v by one and return true if zero; false otherwise |
int atomic_inc_and_test(atomic_t *v) | Atomically increment v by one and return true if the result is zero; false otherwise. |
?
3.2 long原子操作
?
如果上面的int類型(32位)不能滿足我們的原子操作需求,系統(tǒng)還為我們定義了64位的原子變量。
?
- ?
- ?
- ?
typedef struct {
s64 counter;
}?atomic64_t;
?
同樣的也為我們提供了一堆原子接口:
?
Atomic Integer Operation | Description |
---|---|
ATOMIC64_INIT(int i) | At declaration, initialize to i |
int atomic64_read(atomic_t *v) | Atomically read the integer value of v |
void atomic64_set(atomic_t *v, int i) | Atomically set v equal to i. |
void atomic64_add(int i, atomic_t *v) | Atomically add i to v |
void atomic64_sub(int i, atomic_t *v) | Atomically subtract i from v |
void atomic64_inc(atomic_t *v) | Atomically add one to v |
void atomic64_dec(atomic_t *v) | Atomically subtract one from v |
int atomic64_sub_and_test(int i, atomic_t *v) | Atomically subtract i from v and return true if the result is zero; otherwise false. |
int atomic64_add_negative(int i, atomic_t *v) | Atomically add i to v and return true if the result is negative; otherwise false. |
int atomic64_add_return(int i, atomic_t *v) | Atomically add i to v and return the result. |
int atomic64_sub_return(int i, atomic_t *v) | Atomically subtract i from v and return the result. |
int atomic64_dec_and_test(atomic_t *v) | Atomically decrement v by one and return true if zero; false otherwise |
int atomic64_inc_and_test(atomic_t *v) | Atomically increment v by one and return true if the result is zero; false otherwise. |
?
3.3 bit原子操作
?
系統(tǒng)還給我們提供了位運算的原子操作,不過并沒有封裝數(shù)據(jù)類型,而是操作一個void * 指針所指向的數(shù)據(jù),我們要操作的位數(shù)要在我們希望的數(shù)據(jù)之內(nèi),這點是由我們自己來保證的。
?
Atomic Bitwise Operation | Description |
---|---|
void set_bit(int nr, void *addr) | Atomically set the nr-th bit starting from addr. |
void clear_bit(int nr, void *addr) | Atomically clear the nr-th bit starting from addr. |
void change_bit(int nr, void *addr) | Atomically flip the value of the nr-th bit starting from addr. |
int test_and_set_bit(int nr, void *addr) | Atomically set the nr-th bit starting from addr and return the previous value. |
int test_and_clear_bit(int nr, void *addr) | Atomically clear the nr-th bit starting from addr and return the previous value. |
int test_and_change_bit(int nr, void *addr) | Atomically flip the nr-th bit starting from addr and return the previous value. |
int test_bit(int nr, void *addr) | Atomically return the value of the nrth bit starting from addr. |
?
有了原子操作,我們對這些簡單的基本運算就不用使用加鎖機制了,就可以提高效率。
?
?
?四、加鎖機制 ? ?有很多臨界區(qū),并不是一些簡單的整數(shù)運算,不可能要求硬件都給提供原子操作。為此,我們需要鎖機制,在臨界區(qū)的入口進行加鎖操作,加到鎖的才能進入臨界區(qū)進行操作,加不到鎖的要一直在臨界區(qū)外面等候。等候的方式有兩種,一種是忙等待,就是自旋鎖,一種是休眠等待,就是阻塞鎖,阻塞鎖在Linux里面的實現(xiàn)叫做互斥鎖。?
4.1 鎖的底層原理
?
我們該如何實現(xiàn)一個鎖呢,下面我們嘗試直接用軟件來實現(xiàn)試試,代碼如下:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
int lock = 0;
void lock(int * lock){
start:
if(*lock == 0)
*lock = 1;
else{
wait();
goto start;
}
}
void unlock(int * lock){
*lock = 0;
wakeup();
}
?
可以看到這個鎖的實現(xiàn)邏輯很簡單,定義一個整數(shù)作為鎖,0代表沒人持鎖,1代表有人持鎖。我們先判斷鎖的值,如果是0代表沒人持鎖,我們給鎖賦值1,代表我們獲得了鎖,然后函數(shù)返回,就可以進入臨界區(qū)了。如果鎖是1,代表有人已經(jīng)持有了鎖,此時我們就需要等待,等待函數(shù)wait,可以用忙等待,也可以用休眠等待。釋放鎖的時候把鎖設(shè)為0,然后wakeup其他線程或者為空操作。被喚醒的線程從wait中醒來,然后又重走加鎖流程。但是這里面有個問題,就是加鎖操作也是個臨界區(qū),如果兩個線程在兩個CPU上同時執(zhí)行到加鎖函數(shù),雙方都檢測到鎖是0,然后都把鎖置為1,都加鎖成功,這不是出問題了嗎。鎖就是用來保護臨界區(qū)的,但是加鎖本身也是臨界區(qū),也需要保護,該怎么辦呢?唯一的方法就是求助于硬件,讓加鎖操作成為原子操作,X86平臺提供了CAS指令來實現(xiàn)這個功能。CAS,Compare and Swap,比較并交換,它的接口邏輯是這樣的:
?
- ?
int?cas(int?*?p,?old_value,?new_value)
?
如果p位置的值等于old_value,就把p位置的值設(shè)置為new_value,并返回1,否則返回0。關(guān)鍵就在于cas它是硬件實現(xiàn)的,是原子的。這樣我們就可以用cas指令來實現(xiàn)鎖的邏輯,如下所示:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
int lock = 0;
void lock(int * lock){
start:
if(cas(lock, 0, 1))
return;
else{
wait();
goto start;
}
}
void unlock(int * lock){
*lock = 0;
wakeup();
}
?
4.2 簡單自旋鎖
?
為什么在這里要先講簡單自旋鎖呢?因為互斥鎖、信號量這些鎖的內(nèi)部實現(xiàn)都用了自旋鎖,所以先把自旋鎖的邏輯講清楚才能繼續(xù)講下去。為什么是簡單自旋鎖呢,因為自旋鎖現(xiàn)在已經(jīng)發(fā)展得很復雜了,所以這里就是講一下自旋鎖的簡單版本,因為它們的基本邏輯是一致的。自旋鎖由于在加鎖失敗時是忙等待,所以不用考慮等待隊列、睡眠喚醒的問題,所以實現(xiàn)比較簡單,下面是簡單自旋鎖的實現(xiàn)代碼:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
int lock = 0;
void spin_lock(int * lock){
while(!cas(lock, 0, 1))
}
void spin_unlock(int * lock){
*lock = 0;
}
?
可以看到簡單自旋鎖的代碼是相當簡單,加鎖的時候不停地嘗試加鎖,一直失敗一直加,直到成功才返回。釋放鎖的時候更簡單,直接把鎖置為0就可以了。此時如果有其它自旋鎖在自旋,由于鎖已經(jīng)變成了0,所以就會加鎖成功,結(jié)束自旋。注意這個代碼只是自旋鎖的邏輯演示,并不是真正的自旋鎖實現(xiàn)。
?
4.3 互斥鎖
?
互斥鎖是休眠鎖,加鎖失敗時要把自己放入等待隊列,釋放鎖的時候要考慮喚醒等待隊列的線程。互斥鎖的定義如下(刪除了一些配置選項):
?
- ?
- ?
- ?
- ?
- ?
struct mutex {
atomic_long_t owner;
raw_spinlock_t wait_lock;
struct list_head wait_list;
};
?
可以看到互斥鎖的定義有atomic_long_t owner,這個和我們之前定義的int lock是相似的,只不過這里是個加強版,0代表沒加鎖,加鎖的時候是非0,而不是簡單的1,而是記錄的是加鎖的線程。然后是自旋鎖和等待隊列,自旋鎖是用來保護等待隊列的。這里的自旋鎖為什么要用raw_spinlock_t呢,它和spinlock_t有什么區(qū)別呢?在標準的內(nèi)核版本下是沒有區(qū)別的,如果打了RTLinux補丁之后它們就不一樣了,spinlock_t會轉(zhuǎn)化為阻塞鎖,raw_spinlock_t還是自旋鎖,如果需要在任何情況下都要自旋的話請使用raw_spinlock_t。下面我們看一下它的加鎖操作:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
void __sched mutex_lock(struct mutex *lock)
{
might_sleep();
if (!__mutex_trylock_fast(lock))
__mutex_lock_slowpath(lock);
}
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
unsigned long curr = (unsigned long)current;
unsigned long zero = 0UL;
if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))
return true;
return false;
}
?
可以看到加鎖時先嘗試直接加鎖,用的就是CAS原子指令(x86的CAS指令叫做cmpxchg)。如果owner是0,代表當前鎖是開著的,就把owner設(shè)置為自己(也就是當前線程,struct task_struct * 強轉(zhuǎn)為 ulong),代表自己成為這個鎖的主人,也就是自己加鎖成功了,然后返回true。如果owner不為0的話,代表有人已經(jīng)持有鎖了,返回false,后面就要走慢速路徑了,也就是把自己放入等待隊列里休眠等待。下面看看慢速路徑的代碼是怎么實現(xiàn)的:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
__mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}
static int __sched
__mutex_lock(struct mutex *lock, unsigned int state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip)
{
return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, unsigned int state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct mutex_waiter waiter;
int ret;
raw_spin_lock(&lock->wait_lock);
waiter.task = current;
__mutex_add_waiter(lock, &waiter, &lock->wait_list);
set_current_state(state);
for (;;) {
bool first;
if (__mutex_trylock(lock))
goto acquired;
raw_spin_unlock(&lock->wait_lock);
schedule_preempt_disabled();
first = __mutex_waiter_is_first(lock, &waiter);
set_current_state(state);
if (__mutex_trylock_or_handoff(lock, first))
break;
raw_spin_lock(&lock->wait_lock);
}
acquired:
__set_current_state(TASK_RUNNING);
__mutex_remove_waiter(lock, &waiter);
raw_spin_unlock(&lock->wait_lock);
return ret;
}
static void
__mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter,
struct list_head *list)
{
debug_mutex_add_waiter(lock, waiter, current);
list_add_tail(&waiter->list, list);
if (__mutex_waiter_is_first(lock, waiter))
__mutex_set_flag(lock, MUTEX_FLAG_WAITERS);
}
?
可以看到慢速路徑最終會調(diào)用函數(shù)__mutex_lock_common,這個函數(shù)本身是很復雜的,這里進行了大量的刪減,只保留了核心邏輯。函數(shù)先加鎖mutex的自旋鎖,然后把自己放到等待隊列上去,然后就在無限for循環(huán)中休眠并等待別人釋放鎖并喚醒自己。For循環(huán)的入口處先嘗試加鎖,如果成功就goto acquired,如果不成功就釋放自旋鎖,并調(diào)用schedule_preempt_disabled,此函數(shù)就是休眠函數(shù),它會調(diào)度其它進程來執(zhí)行,自己就休眠了,直到有人喚醒自己才會醒來繼續(xù)執(zhí)行。別人釋放鎖的時候會喚醒自己,這個我們后面會分析。當我們被喚醒之后會去嘗試加鎖,如果加鎖失敗,再次來到for循環(huán)的開頭處,再試一次加鎖,如果不行就再走一次休眠過程。為什么我們加鎖會失敗呢,因為有可能多個線程同時被喚醒來爭奪鎖,我們不一定會搶鎖成功。搶鎖失敗就再去休眠,最后總會搶鎖成功的。
?
把自己加入等待隊列時會設(shè)置flag MUTEX_FLAG_WAITERS,這個flag是設(shè)置在owner的低位上去,因為task_struct指針至少是L1_CACHE_BYTES對齊的,所以最少有3位可以空出來做flag。
?
下面我們再來看一下釋放鎖的流程:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
void __sched mutex_unlock(struct mutex *lock)
{ if (__mutex_unlock_fast(lock))
return;
__mutex_unlock_slowpath(lock, _RET_IP_);
}
static __always_inline bool __mutex_unlock_fast(struct mutex *lock)
{
unsigned long curr = (unsigned long)current;
return atomic_long_try_cmpxchg_release(&lock->owner, &curr, 0UL);
}
?
解鎖的時候先嘗試快速解鎖,快速解鎖的意思是沒有其它在等待隊列里,可以直接釋放鎖。怎么判斷等待隊列里沒有線程在等待呢,這就是前面設(shè)置的MUTEX_FLAG_WAITERS的作用了。如果設(shè)置了這個flag,lock->owner 和 curr就不會相等,直接釋放鎖就會失敗,就要走慢速路徑。慢速路徑的代碼如下:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
struct task_struct *next = NULL;
DEFINE_WAKE_Q(wake_q);
unsigned long owner;
owner = atomic_long_read(&lock->owner);
for (;;) {
if (atomic_long_try_cmpxchg_release(&lock->owner, &owner, __owner_flags(owner))) {
if (owner & MUTEX_FLAG_WAITERS)
break;
}
}
raw_spin_lock(&lock->wait_lock);
if (!list_empty(&lock->wait_list)) {
struct mutex_waiter *waiter =
list_first_entry(&lock->wait_list,
struct mutex_waiter, list);
next = waiter->task;
wake_q_add(&wake_q, next);
}
raw_spin_unlock(&lock->wait_lock);
wake_up_q(&wake_q);
}
?
上述代碼進行了一些刪減。可以看出上述代碼會先釋放鎖,然后喚醒等待隊列里面的第一個等待者。
?
4.4 信號量
?
信號量與互斥鎖有很大的不同,互斥鎖代表只有一個線程能同時進入臨界區(qū),而信號量是個整數(shù)計數(shù),代表著某一類資源有多少個,能同時讓多少個線程訪問這類資源。信號量也沒有加鎖解鎖操作,信號量類似的操作叫做down和up,down代表獲取一個資源,up代表歸還一個資源。
?
下面我們先看一下信號量的定義:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
#define __SEMAPHORE_INITIALIZER(name, n)
{
.lock = __RAW_SPIN_LOCK_UNLOCKED((name).lock),
.count = n,
.wait_list = LIST_HEAD_INIT((name).wait_list),
}
static inline void sema_init(struct semaphore *sem, int val)
{
*sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
}
?
可以看出信號量和互斥鎖的定義很相似,都有一個自旋鎖,一個等待隊列,不同的是信號量沒有owner,取而代之的是count,代表著某一類資源的個數(shù),而且自旋鎖同時保護著等待隊列和count。信號量初始化時要指定count的大小。
?
我們來看一下信號量的down操作(獲取一個資源):
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
void down(struct semaphore *sem)
{
unsigned long flags;
might_sleep();
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0))
sem->count--;
else
__down(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __down(struct semaphore *sem)
{
__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
static inline int __sched __down_common(struct semaphore *sem, long state,
long timeout)
{
struct semaphore_waiter waiter;
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = current;
waiter.up = false;
for (;;) {
if (signal_pending_state(state, current))
goto interrupted;
if (unlikely(timeout <= 0))
goto timed_out;
__set_current_state(state);
raw_spin_unlock_irq(&sem->lock);
timeout = schedule_timeout(timeout);
raw_spin_lock_irq(&sem->lock);
if (waiter.up)
return 0;
}
timed_out:
list_del(&waiter.list);
return -ETIME;
interrupted:
list_del(&waiter.list);
return -EINTR;
}
?
可以看出我們會先持有自旋鎖,然后看看count是不是大于0,大于0的話代表資源還有剩余,我們直接減1,代表占用一份資源,就可以返回了。如果不大于0的話,代表資源沒有了,我們就進去等待隊列等待。
?
我們再來看看up操作(歸還資源):
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
void up(struct semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(list_empty(&sem->wait_list)))
sem->count++;
else
__up(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __up(struct semaphore *sem)
{
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
struct semaphore_waiter, list);
list_del(&waiter->list);
waiter->up = true;
wake_up_process(waiter->task);
}
?
先加鎖自旋鎖,然后看看等待隊列是否為空,如果為空的話直接把count加1就可以了。如果不為空的話,則代表有人在等待資源,資源就不加1了,直接喚醒隊首的線程來獲取。
?
?
?五、per-CPU變量 ? ?前面講的原子操作和加鎖機制都是從時間上防同步的,現(xiàn)在我們開始講空間上防同步,先來講講per-CPU變量。如果我們要操作的數(shù)據(jù)和當前CPU是密切相關(guān)的,不同的CPU可以操作不同的數(shù)據(jù),那么我們就可以把這個變量定義為per-CPU變量,每個CPU就可以各訪問各的,互不影響了。這個方法可以防止多個CPU之間的真并發(fā),但是同一個CPU上如果有偽并發(fā),還是會出問題,所以還需要禁用偽并發(fā)。per-CPU變量的定義和使用方法如下表所示:?
Macro or function name | Description |
---|---|
DEFINE_PER_CPU(type, name) | Statically allocates a per-CPU array called name of type data structures |
per_cpu(name, cpu) | Selects the element for CPU cpu of the per-CPU array name |
_ _get_cpu_var(name) | Selects the local CPU’s element of the per-CPU array name |
get_cpu_var(name) | Disables kernel preemption, then selects the local CPU’s element of the per-CPU array name |
put_cpu_var(name) | Enables kernel preemption (name is not used) |
alloc_percpu(type) | Dynamically allocates a per-CPU array of type data structures and returns its address |
free_percpu(pointer) | Releases a dynamically allocated per-CPU array at address pointer |
per_cpu_ptr(pointer, cpu) | Returns the address of the element for CPU cpu of the per-CPU array at address pointer |
?
?
?六、RCU簡介 ? ?RCU是一種非常巧妙的空間防同步方法。首先它只能用于用指針訪問的動態(tài)數(shù)據(jù)。其次它采取讀者和寫者分開的方法,讀者讀取數(shù)據(jù)要先復制指針,用這個復制的指針來訪問數(shù)據(jù),這個數(shù)據(jù)是只讀的,不會被修改,很多讀者可以同時來訪問。寫者并不去直接更改數(shù)據(jù),而是先申請一塊內(nèi)存空間,把數(shù)據(jù)都復制過來,在這個復制的數(shù)據(jù)上修改數(shù)據(jù),由于這塊數(shù)據(jù)是私有的,所以可以隨意修改,也不用加鎖。修改完了之后,寫者要原子的修改指針的值,讓它指向自己新完成的數(shù)據(jù)。這對于讀者來說是沒有影響的,因為讀者已經(jīng)復制了指針,所以讀者讀的還是原來的數(shù)據(jù)沒有變,新來的讀者會復制新的指針,訪問新的數(shù)據(jù),讀者訪問的一直都是一致性的數(shù)據(jù),不會訪問到修改一半的數(shù)據(jù)。RCU的難點在于,原有的數(shù)據(jù)怎么回收,當寫者更新指針之后,原先的數(shù)據(jù)就過期了,當所有老的讀者都離開臨界區(qū)之后,這個數(shù)據(jù)的內(nèi)存需要被釋放,寫者需要判斷啥時候老的讀者全都離開臨界區(qū)了,才能去釋放老的數(shù)據(jù)。?
?
?七、序列鎖 ? ?除了前面講的時間防同步、空間防同步,Linux還有一種非常巧妙的防同步方法,那就是不妨了事后再補救,我第一次看到這個方法的時候,真是拍案叫絕,驚嘆不已,還能這么做。這種方法叫做序列鎖,它的思想就好比是,我家里也不鎖門了,小偷偷就偷吧,偷了我就再報警把東西找回來。反正小偷又不是天天來我家偷東西,小偷來的次數(shù)非常少,我天天鎖門太費勁了。我每天早上看一下啥東西丟了沒,丟了就報警把東西找回來。當然這個類比并不完全像,只是大概邏輯比較像,下面我們就來講一講序列鎖的做法。序列鎖區(qū)分讀者和寫者,讀者可以并行,寫者還是需要互斥的,讀者和寫者之間也可以并行,所以當讀者很頻繁,寫者很偶發(fā)的時候就適合用序列鎖。這個鎖有一個序列號,初始值是0,寫者進入臨界區(qū)時把這個序列號加1,退出時再加1,讀者讀之前先獲取這個鎖的序列號,如果是奇數(shù)說明有寫者在臨界區(qū),就不停地獲取序列號,直到序列號為偶數(shù)為止。然后讀者進入臨界區(qū)進行讀操作,然后退出臨界區(qū)的時候再讀取一下序列號。如果和剛才獲取的序列號不一樣,說明有寫者剛才進來過,再重新走一遍剛才的操作。如果序列號還不一樣就一直重復,直到序列號一樣為止。?
下面我們先來看看序列鎖的定義:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
typedef struct seqcount {
unsigned sequence;
} seqcount_t;
typedef struct {
seqcount_spinlock_t seqcount;
spinlock_t lock;
}?seqlock_t;
?
seqlock_t包含一個自旋鎖和一個seqcount_spinlock_t,seqcount_spinlock_t經(jīng)過一些復雜的宏定義包含了seqcount_t,所以可以簡單地認為seqlock_t包含一個自旋鎖和一個int序列號。
?
下面我們看一下寫者的操作:
?
- ?
- ?
pid_t?fork(void);
?
寫者的操作很簡單,就是用自旋鎖實現(xiàn)互斥,然后加鎖解釋的時候都把序列號增加1。
?
下面看讀者的操作:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
static inline void write_seqlock(seqlock_t *sl)
{
spin_lock(&sl->lock);
do_write_seqcount_begin(&sl->seqcount.seqcount);
}
static inline void write_sequnlock(seqlock_t *sl)
{
do_write_seqcount_end(&sl->seqcount.seqcount);
spin_unlock(&sl->lock);
}
?
讀者進入臨界區(qū)之前先通過read_seqbegin獲取一個序列號,在臨界區(qū)的時候調(diào)用read_seqretry看看是否需要重走一遍臨界區(qū)。我們下面看一下內(nèi)核里使用序列鎖的一個例子:
?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
- ?
static inline unsigned read_seqbegin(const seqlock_t *sl)
{
unsigned ret = read_seqcount_begin(&sl->seqcount);
kcsan_atomic_next(0); /* non-raw usage, assume closing read_seqretry() */
kcsan_flat_atomic_begin();
return ret;
}
static inline unsigned read_seqretry(const seqlock_t *sl, unsigned start)
{
/*
* Assume not nested: read_seqretry() may be called multiple times when
* completing read critical section.
*/
kcsan_flat_atomic_end();
return read_seqcount_retry(&sl->seqcount, start);
}
?
可以看到寫者使用序列鎖和正常的使用方法是一樣的,讀者使用序列鎖一般都是配合do while循環(huán)一起使用。
?
通過本文,我們明白了線程同步的本質(zhì),了解了線程防同步的基本邏輯和具體方法。防同步就是防止多個執(zhí)行流同時訪問相同的數(shù)據(jù),所以我們可以從兩點來防,一個是同時(時間上防同步),一個是相同的數(shù)據(jù)(空間上防同步)。時間上防同步我們采取的方法有原子操作,通過硬件來防止同時,加鎖機制,軟件方法來防同時,還有禁用偽并發(fā),防止宏觀上的同時微觀上的交錯??臻g防同步我們采取的方法有數(shù)據(jù)分割、per CPU變量,每個CPU值訪問自己對應(yīng)的數(shù)據(jù)。數(shù)據(jù)復制,RCU,讀的時候復制指針,讀的數(shù)據(jù)是不變的,寫的時候不直接改變數(shù)據(jù),而是先把數(shù)據(jù)復制過來,修改自己的私有副本,這樣就不會有同步的問題,然后再原子地更新指針指向最新的數(shù)據(jù)。
?
?
?八、總結(jié)回顧 ? ?通過本文,我們明白了線程同步的本質(zhì),了解了線程防同步的基本邏輯和具體方法。防同步就是防止多個執(zhí)行流同時訪問相同的數(shù)據(jù),所以我們可以從兩點來防,一個是同時(時間上防同步),一個是相同的數(shù)據(jù)(空間上防同步)。時間上防同步我們采取的方法有原子操作,通過硬件來防止同時,加鎖機制,軟件方法來防同時,還有禁用偽并發(fā),防止宏觀上的同時微觀上的交錯。空間防同步我們采取的方法有數(shù)據(jù)分割、per CPU變量,每個CPU值訪問自己對應(yīng)的數(shù)據(jù)。數(shù)據(jù)復制,RCU,讀的時候復制指針,讀的數(shù)據(jù)是不變的,寫的時候不直接改變數(shù)據(jù),而是先把數(shù)據(jù)復制過來,修改自己的私有副本,這樣就不會有同步的問題,然后再原子地更新指針指向最新的數(shù)據(jù)。?
評論
查看更多