1 什么是協(xié)程
協(xié)程是與其他函數(shù)或方法一起并發(fā)運行的函數(shù)或方法。Go協(xié)程可以看作是輕量級線程,與線程相比,創(chuàng)建一個 Go 協(xié)程的成本很小。
1.1 協(xié)程與線程的對比
協(xié)程的成本極低。堆棧大小只有若干KB(2或4KB),并且可以根據(jù)應(yīng)用的需求進(jìn)行增減。而線程必須指定堆棧的大小,其堆棧是固定不變的(一般默認(rèn)2MB)。固定了棧的大小導(dǎo)致兩個問題:
一是對于很多只需要很小的棧空間的線程來說是一個巨大的浪費
二是對于少數(shù)需要巨大棧空間的線程來說又面臨棧溢出的風(fēng)險
協(xié)程會復(fù)用(Multiplex)數(shù)量更少的 OS 線程。即使程序有數(shù)以千計的協(xié)程,也可能只有一個線程。
如果該線程中的某一Go協(xié)程發(fā)生了阻塞(比如說等待用戶輸入),那么系統(tǒng)會再創(chuàng)建一個OS線程,并把其余協(xié)程都移動到這個新的OS線程。所有這一切都在運行時進(jìn)行,作為程序員,我們沒有直接面臨這些復(fù)雜的細(xì)節(jié),而是有一個簡潔的 API 來處理并發(fā)。
Go內(nèi)置半搶占式的協(xié)作調(diào)度器,在用戶態(tài)進(jìn)行協(xié)程的調(diào)度。
Go協(xié)程使用信道(Channel)來進(jìn)行通信。信道用于防止多個協(xié)程訪問共享內(nèi)存時發(fā)生競態(tài)條件(Race Condition)。信道可以看作是協(xié)程之間通信的管道。
1.2 啟動協(xié)程
調(diào)用函數(shù)或者方法時,在前面加上關(guān)鍵字go,可以讓一個新的Go協(xié)程并發(fā)地運行。需要注意:
啟動一個新的協(xié)程時,協(xié)程的調(diào)用會立即返回。程序控制不會去等待Go協(xié)程執(zhí)行完畢。在調(diào)用Go協(xié)程之后,程序控制會立即返回到代碼的下一行,忽略該協(xié)程的任何返回值。
如果希望運行其他Go協(xié)程,Go 主協(xié)程必須繼續(xù)運行著。如果Go主協(xié)程終止,則程序終止,于是其他協(xié)程也不會繼續(xù)運行。
使用示例如下:
package?main import?(?? ????"fmt" ????"time" ) func?numbers()?{?? ????for?i?:=?1;?i?<=?5;?i++?{ ????????time.Sleep(250?*?time.Millisecond) ????????fmt.Printf("%d?",?i) ????} } func?alphabets()?{?? ????for?i?:=?'a';?i?<=?'e';?i++?{ ????????time.Sleep(400?*?time.Millisecond) ????????fmt.Printf("%c?",?i) ????} } func?main()?{?? ????go?numbers()?//啟動協(xié)程 ????go?alphabets()?//啟動協(xié)程 ????//等待子協(xié)程允許完畢,后面介紹更高級的信道方式,這里就簡單的等待 ????time.Sleep(3000?*?time.Millisecond) ????fmt.Println("main?terminated") } //輸出:1?a?2?3?b?4?c?5?d?e?main?terminated
下圖可以清晰的看到三個協(xié)程的運行關(guān)系:
2 信道
2.1 信道的創(chuàng)建
信道可以想像成協(xié)程之間通信的管道。如同管道中的水會從一端流到另一端,通過使用信道,數(shù)據(jù)也可以從一端發(fā)送,在另一端接收。所有信道都關(guān)聯(lián)了一個類型。信道只能運輸這種類型的數(shù)據(jù),而運輸其他類型的數(shù)據(jù)都是非法的。chan T表示T類型的信道,使用make函數(shù)進(jìn)行初始化。例如:
a?:=?make(chan?int)
2.2 信道的收發(fā)
信道旁的箭頭方向指定了是發(fā)送數(shù)據(jù)還是接收數(shù)據(jù)
data?:=?<-?a?//?讀取信道a,保存值到data a?<-?data?//?寫入信道a
發(fā)送與接收默認(rèn)是阻塞的。當(dāng)把數(shù)據(jù)發(fā)送到信道時,程序控制會在發(fā)送數(shù)據(jù)的語句處發(fā)生阻塞,直到有其它協(xié)程從信道讀取到數(shù)據(jù),才會解除阻塞。與此類似,當(dāng)讀取信道的數(shù)據(jù)時,如果沒有其它的協(xié)程把數(shù)據(jù)寫入到這個信道,那么讀取過程就會一直阻塞著。**信道的這種特性能夠幫助Go協(xié)程之間進(jìn)行高效的通信,不需要用到其他編程語言常見的顯式鎖或條件變量。 借助阻塞這個特性,我們可以用一個讀操作等待子協(xié)程結(jié)束,而不是使用sleep:
func?hello(done?chan?bool)?{?? ????fmt.Println("Hello?world?goroutine") ????done?<-?true//子協(xié)程結(jié)束,寫入數(shù)據(jù) } func?main()?{?? ????done?:=?make(chan?bool)//創(chuàng)建bool信道 ????go?hello(done) ????<-done?//讀操作,一直阻塞直到子協(xié)程結(jié)束 ????fmt.Println("main?function") }
2.3 小心死鎖
使用信道需要考慮的一個重點是死鎖。
當(dāng)Go協(xié)程給一個信道發(fā)送數(shù)據(jù)時,照理說會有其他Go協(xié)程來接收數(shù)據(jù)。如果沒有的話,程序就會在運行時觸發(fā) panic,形成死鎖。
當(dāng)有Go協(xié)程等著從一個信道接收數(shù)據(jù)時,我們期望其他的Go協(xié)程會向該信道寫入數(shù)據(jù),要不然程序就會觸發(fā) panic。
2.4 關(guān)閉信道和range遍歷
數(shù)據(jù)發(fā)送方可以關(guān)閉信道,通知接收方這個信道不再有數(shù)據(jù)發(fā)送過來。當(dāng)從信道接收數(shù)據(jù)時,接收方可以多用一個變量來檢查信道是否已經(jīng)關(guān)閉。
func?producer(chnl?chan?int)?{??
????for?i?:=?0;?i?10;?i++?{ ????????chnl?<-?i ????} ????close(chnl)//關(guān)閉信道 } func?main()?{?? ????ch?:=?make(chan?int) ????go?producer(ch) ????for?{ ????????v,?ok?:=?<-ch?//判斷信道是否關(guān)閉 ????????if?ok?==?false?{ ????????????break ????????} ????????fmt.Println("Received?",?v,?ok) ????} }
上面的語句里,如果成功接收信道所發(fā)送的數(shù)據(jù),那么 ok 等于 true。而如果 ok 等于 false,說明我們試圖讀取一個關(guān)閉的通道。從關(guān)閉的信道讀取到的值會是該信道類型的零值。 或者我們可以用range遍歷信道,代替上面示例中的for循環(huán):
func?main()?{?? ????ch?:=?make(chan?int) ????go?producer(ch) ????for?v?:=?range?ch?{//range可以在信道關(guān)閉后自動結(jié)束,不用顯示的判斷 ????????fmt.Println("Received?",v) ????} }
2.5 緩沖信道
上面無緩沖信道的發(fā)送和接收過程是阻塞的,讀寫操作會一直阻塞。我們還可以創(chuàng)建一個有緩沖的信道(Buffered Channel)。只在緩沖已滿的情況,才會阻塞向緩沖信道發(fā)送數(shù)據(jù)。同樣,只有在緩沖為空的時候,才會阻塞從緩沖信道接收數(shù)據(jù)。 通過向 make 函數(shù)時再傳遞一個表示容量的參數(shù)(指定緩沖的大小,sizeof(type) * capacity),就可以創(chuàng)建緩沖信道。
ch?:=?make(chan?type,?capacity)//capacity?應(yīng)該大于?0。無緩沖信道的容量默認(rèn)為?0
緩沖區(qū)容量和長度的區(qū)別:
容量是指信道可以存儲的值的數(shù)量(總的大小)。我們在使用make函數(shù)創(chuàng)建緩沖信道的時候會指定容量大小。
長度是指信道中當(dāng)前排隊的元素個數(shù)(當(dāng)前保存的大?。?。
使用示例如下:
func?write(ch?chan?int)?{??
????for?i?:=?0;?i?5;?i++?{ ????????ch?<-?i?//寫入兩個值之后緩沖區(qū)滿,阻塞等待緩沖區(qū)空閑 ????????fmt.Println("successfully?wrote",?i,?"to?ch") ????} ????close(ch) } func?main()?{?? ????ch?:=?make(chan?int,?2)//緩沖大小為2 ????go?write(ch) ????time.Sleep(2?*?time.Second) ????for?v?:=?range?ch?{ ????????fmt.Println("read?value",?v,"from?ch") ????????time.Sleep(2?*?time.Second) ????} }
2.6 select
select 語句用于在多個發(fā)送/接收信道操作中進(jìn)行選擇。該語法與 switch 類似,所不同的是,這里的每個 case 語句都是信道操作。
select 語句會一直阻塞,直到發(fā)送/接收操作準(zhǔn)備就緒。如果有多個信道操作準(zhǔn)備完畢,select 會隨機(jī)地選取其中之一執(zhí)行。
在沒有case準(zhǔn)備就緒時,可以執(zhí)行select語句中的默認(rèn)情況(Default Case),這通常用于防止select語句一直阻塞,沒有信道可用時會立刻返回。
使用示例:
func?server1(ch?chan?string)?{?? ????time.Sleep(6?*?time.Second) ????ch?<-?"from?server1" } func?server2(ch?chan?string)?{?? ????time.Sleep(3?*?time.Second) ????ch?<-?"from?server2" } func?main()?{?? ????output1?:=?make(chan?string) ????output2?:=?make(chan?string) ????go?server1(output1) ????go?server2(output2) ????select?{//一直阻塞,直到某個信道可用 ????case?s1?:=?<-output1: ????????fmt.Println(s1) ????case?s2?:=?<-output2: ????????fmt.Println(s2) ????} }
3 WaitGroup
3.1 WaitGroup的使用
WaitGroup可以用來等待一批go協(xié)程執(zhí)行結(jié)束,類似于C++的std::join。使用示例如下:
import?( ????"fmt" ????"sync" ????"time" ) func?process(i?int,?wg?*sync.WaitGroup)?{//waitgroup參數(shù)指針,因為要修改內(nèi)部的值,不能是值傳遞 ????fmt.Println("started?Goroutine?",?i) ????time.Sleep(2?*?time.Second) ????fmt.Printf("Goroutine?%d?ended ",?i) ????wg.Done()//子協(xié)程結(jié)束,調(diào)用done減少計數(shù)器 } func?main()?{ ????no?:=?3 ????var?wg?sync.WaitGroup?//定義waitgroup ????for?i?:=?0;?i?3.2 實現(xiàn)一個協(xié)程池
基本思路:
創(chuàng)建一個Go協(xié)程池,監(jiān)聽一個等待作業(yè)分配的輸入型緩沖信道
將作業(yè)添加到該輸入型緩沖信道中
作業(yè)完成后,再將結(jié)果寫入一個輸出型緩沖信道
從輸出型緩沖信道讀取并打印結(jié)果
代碼和解析如下:
package?main import?(?? ????"fmt" ????"math/rand" ????"sync" ????"time" ) //定義任務(wù)和結(jié)果兩個結(jié)構(gòu)體 type?Job?struct?{?? ????id???????int ????randomno?int } type?Result?struct?{?? ????job?????????Job?//包含job結(jié)構(gòu)體 ????sumofdigits?int } //創(chuàng)建任務(wù)和結(jié)果的兩個緩沖信道 var?jobs?=?make(chan?Job,?10)?? var?results?=?make(chan?Result,?10) //計算一個整數(shù)每一位相加的和 func?digits(number?int)?int?{?? ????sum?:=?0 ????no?:=?number ????for?no?!=?0?{ ????????digit?:=?no?%?10 ????????sum?+=?digit ????????no?/=?10 ????} ????time.Sleep(2?*?time.Second) ????return?sum } //遍歷job信道,計算后每個job的數(shù)字并將結(jié)果寫入reslut信道 func?worker(wg?*sync.WaitGroup)?{?? ????for?job?:=?range?jobs?{ ????????output?:=?Result{job,?digits(job.randomno)} ????????results?<-?output ????} ????wg.Done() } //初始化waitgroup,并開啟多個協(xié)程開始計算 func?createWorkerPool(noOfWorkers?int)?{?? ????var?wg?sync.WaitGroup ????for?i?:=?0;?i?4 協(xié)程的同步手段4.1 互斥與Mutex
Mutex用于提供一種加鎖機(jī)制(Locking Mechanism),可確保在某時刻只有一個協(xié)程在臨界區(qū)運行,以防止出現(xiàn)競態(tài)條件。Mutex可以在sync包內(nèi)找到。Mutex 定義了兩個方法:Lock和Unlock。所有在 Lock 和 Unlock 之間的代碼,都只能由一個Go協(xié)程執(zhí)行,于是就可以避免競態(tài)條件。
mutex.Lock() x?=?x?+?1?? mutex.Unlock()使用示例:
//互斥鎖保證線程同步 package?main import?( ?"fmt" ?"sync" ) var?total?struct?{?//全局的結(jié)構(gòu)體變量 ?sync.Mutex?//互斥鎖 ?value??????int } func?worker(wg?*sync.WaitGroup)?{ ?defer?wg.Done() ?for?i?:=?0;?i?<=?100;?i++?{ ??total.Lock()?//加鎖 ??total.value++ ??total.Unlock()?//解鎖 ?} } func?main()?{ ?var?wg?sync.WaitGroup ?wg.Add(2) ?go?worker(&wg) ?go?worker(&wg) ?wg.Wait() ?fmt.Println(total.value) }4.2 原子操作
用互斥鎖來保護(hù)一個數(shù)值型的共享資源,麻煩且效率低下。標(biāo)準(zhǔn)庫的sync/atomic包對原子操作提供了豐富的支持:sync/atomic包對基本的數(shù)值類型及復(fù)雜對象的讀寫都提供了原子操作的支持。atomic.Value原子對象提供了Load和Store兩個原子方法,分別用于加載和保存數(shù)據(jù),返回值和參數(shù)都是interface{}類型。
//原子操作實現(xiàn)線程同步 package?main import?( ?"fmt" ?"sync" ?"sync/atomic" ) var?total?uint64 func?worker(wg?*sync.WaitGroup)?{ ?defer?wg.Done() ?var?i?uint64 ?for?i?=?0;?i?<=?100;?i++?{ ??atomic.AddUint64(&total,?1)?//原子操作,線程安全的 ?} } func?main()?{ ?var?wg?sync.WaitGroup ?wg.Add(2) ?go?worker(&wg) ?go?worker(&wg) ?wg.Wait() ?fmt.Println(atomic.LoadUint64(&total))?//讀取值 }4.3 阻塞信道
上面的示例我們也可以用信道來實現(xiàn)互斥(還是推薦實際中使用Mutex),使用大小為1的緩沖信道可以導(dǎo)致可寫阻塞,這樣其他協(xié)程就不能繼續(xù)執(zhí)行,只能等待阻塞結(jié)束。在并發(fā)編程中,對共享資源的正確訪問需要精確的控制,在目前的絕大多數(shù)語言中,都是通過加鎖等線程同步方案來解決這一困難問題,而Go語言卻另辟蹊徑,它將共享的值通過Channel傳遞(實際上多個獨立執(zhí)行的線程很少主動共享資源)。在任意給定的時刻,最好只有一個Goroutine能夠擁有該資源。
//使用channel實現(xiàn)線程同步 package?main import?( ?"fmt" ?"sync" ) var?total?uint64 func?worker(wg?*sync.WaitGroup,?ch?chan?bool)?{ ?defer?wg.Done() ?var?i?uint64 ?for?i?=?0;?i?<=?100;?i++?{ ??ch?<-?true?//信道被寫入值,其他協(xié)程到這一句也想寫入值,就會阻塞等待信道可寫 ??total++ ??<-ch?//本協(xié)程讀取信道,信道空了,其他協(xié)程可以寫入了 ?} } func?main()?{ ?ch?:=?make(chan?bool,?1)?//?創(chuàng)建大小為1的chan ?var?wg?sync.WaitGroup ?wg.Add(2) ?go?worker(&wg,?ch) ?go?worker(&wg,?ch) ?wg.Wait() ?fmt.Println(total)?//讀取值 }不僅如此,我們還可以通過設(shè)置chan的緩存大小來控制最大并發(fā)數(shù)。5 常見并發(fā)模型
5.1 生產(chǎn)者消費者模型
通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。簡單地說,就是生產(chǎn)者生產(chǎn)一些數(shù)據(jù),然后放到成果隊列中,同時消費者從成果隊列中來取這些數(shù)據(jù)。這樣就讓生產(chǎn)消費變成了異步的兩個過程。當(dāng)成果隊列中沒有數(shù)據(jù)時,消費者就進(jìn)入饑餓的等待中;而當(dāng)成果隊列中數(shù)據(jù)已滿時,生產(chǎn)者則面臨因產(chǎn)品擠壓導(dǎo)致CPU被剝奪的下崗問題。 Go可以使用帶緩沖區(qū)的chan作為成功隊列,由不同的協(xié)程負(fù)責(zé)接入和讀取,很簡單的實現(xiàn)生產(chǎn)者消費者模型:
package?main import?( ?"fmt" ?"os" ?"os/signal" ?"syscall" ) //?生產(chǎn)者:?生成?factor?整數(shù)倍的序列 func?Producer(factor?int,?out?chan<-?int)?{ ?for?i?:=?0;?;?i++?{ ??out?<-?i?*?factor?//往信道緩沖區(qū)內(nèi)寫入數(shù)據(jù) ?} } //?消費者 func?Consumer(in?<-chan?int)?{ ?for?v?:=?range?in?{ ??fmt.Println(v)?//從信道讀取數(shù)據(jù)打印 ?} } func?main()?{ ?ch?:=?make(chan?int,?64)?//?成果隊列,大小為64 ?//開啟了2個Producer生產(chǎn)流水線,分別用于生成3和5的倍數(shù)的序列 ?//然后開啟1個Consumer消費者線程,打印獲取的結(jié)果 ?go?Producer(3,?ch)?//?生成?3?的倍數(shù)的序列 ?go?Producer(5,?ch)?//?生成?5?的倍數(shù)的序列 ?go?Consumer(ch)????//?消費?生成的隊列 ?//?Ctrl+C?退出 ?sig?:=?make(chan?os.Signal,?1) ?signal.Notify(sig,?syscall.SIGINT,?syscall.SIGTERM) ?fmt.Printf("quit?(%v) ",?<-sig) }5.2 發(fā)布訂閱模型
發(fā)布訂閱(publish/subscribe)模型通常被簡寫為pub/sub模型。在這個模型中,消息生產(chǎn)者成為發(fā)布者(publisher),而消息消費者則成為訂閱者(subscriber),生產(chǎn)者和消費者是M:N的關(guān)系。在傳統(tǒng)生產(chǎn)者和消費者模型中,是將消息發(fā)送到一個隊列中,而發(fā)布訂閱模型則是將消息發(fā)布給一個主題。在發(fā)布訂閱模型中,每條消息都會傳送給多個訂閱者。發(fā)布者通常不會知道、也不關(guān)心哪一個訂閱者正在接收主題消息。訂閱者和發(fā)布者可以在運行時動態(tài)添加,是一種松散的耦合關(guān)系,這使得系統(tǒng)的復(fù)雜性可以隨時間的推移而增長。在現(xiàn)實生活中,像天氣預(yù)報之類的應(yīng)用就可以應(yīng)用這個并發(fā)模式。 示例代碼如下:
//?發(fā)布訂閱模型實現(xiàn) package?pubsub import?( ?"sync" ?"time" ) type?( ?subscriber?chan?interface{}?????????//?訂閱者為一個管道 ?topicFunc??func(v?interface{})?bool?//?主題為一個過濾器 ) //?發(fā)布者對象 type?Publisher?struct?{ ?m???????????sync.RWMutex?????????????//?讀寫鎖,保護(hù)訂閱者map ?buffer??????int??????????????????????//?訂閱隊列的緩存大小 ?timeout?????time.Duration????????????//?發(fā)布超時時間 ?subscribers?map[subscriber]topicFunc?//?訂閱者信息 } //?構(gòu)建一個發(fā)布者對象,?可以設(shè)置發(fā)布超時時間和緩存隊列的長度 func?NewPublisher(publishTimeout?time.Duration,?buffer?int)?*Publisher?{ ?return?&Publisher{?//返回對象指針 ??buffer:??????buffer, ??timeout:?????publishTimeout, ??subscribers:?make(map[subscriber]topicFunc),?//創(chuàng)建訂閱者map ?} } //?添加一個新的訂閱者,訂閱全部主題 func?(p?*Publisher)?Subscribe()?chan?interface{}?{ ?return?p.SubscribeTopic(nil) } //?添加一個新的訂閱者,訂閱過濾器篩選后的主題 func?(p?*Publisher)?SubscribeTopic(topic?topicFunc)?chan?interface{}?{ ?ch?:=?make(chan?interface{},?p.buffer) ?p.m.Lock() ?p.subscribers[ch]?=?topic ?p.m.Unlock() ?return?ch } //?退出訂閱 func?(p?*Publisher)?Evict(sub?chan?interface{})?{ ?p.m.Lock() ?defer?p.m.Unlock()?//函數(shù)退出時解鎖 ?delete(p.subscribers,?sub)?//根據(jù)key刪除map中一項 ?close(sub)?????????????????//關(guān)閉chan } //?發(fā)布一個主題 func?(p?*Publisher)?Publish(v?interface{})?{ ?p.m.RLock() ?defer?p.m.RUnlock() ?var?wg?sync.WaitGroup ?for?sub,?topic?:=?range?p.subscribers?{ ??wg.Add(1) ??go?p.sendTopic(sub,?topic,?v,?&wg) ?} ?wg.Wait() } //?關(guān)閉發(fā)布者對象,同時關(guān)閉所有的訂閱者管道。 func?(p?*Publisher)?Close()?{ ?p.m.Lock() ?defer?p.m.Unlock() ?for?sub?:=?range?p.subscribers?{ ??delete(p.subscribers,?sub) ??close(sub) ?} } //?發(fā)送主題,可以容忍一定的超時 func?(p?*Publisher)?sendTopic(sub?subscriber,?topic?topicFunc,?v?interface{},?wg?*sync.WaitGroup)?{ ?defer?wg.Done() ?if?topic?!=?nil?&&?!topic(v)?{ ??return ?} ?//監(jiān)聽sub?chan寫入成功或超時 ?select?{ ?case?sub?<-?v: ?case?<-time.After(p.timeout): ?} }我們可以選擇訂閱全部,或指定自定義函數(shù)只訂閱符合要求的消息,返回chan對象:
all?:=?p.Subscribe()?//添加一個訂閱者,訂閱全部消息 //添加一個訂閱者,只關(guān)系有g(shù)olang字符串的內(nèi)容 golang?:=?p.SubscribeTopic(func(v?interface{})?bool?{ ?if?s,?ok?:=?v.(string);?ok?{ ??return?strings.Contains(s,?"golang") ?} ?return?false }) 編輯:黃飛?
?
?
評論
查看更多