需求:第三方的接口,限制接口請求的QPS,每秒5次
需要控制job「訪問接口」的次數(shù),每秒不能同時超過5次,包括 進行中的任務(wù)、剛啟動的任務(wù)
要確保單位時間內(nèi)(例如每秒)運行的任務(wù)數(shù)量不超過特定的上限(如5個任務(wù)),并且在任務(wù)執(zhí)行完成得很快時,考慮已完成的任務(wù)和正在執(zhí)行的任務(wù)作為正在運行的任務(wù)總數(shù),可以使用限流器來控制任務(wù)的啟動頻率,并結(jié)合使用信號量來管理同時運行的任務(wù)數(shù)量。
具體來說,使用一個信號量來限制同時進行的任務(wù)數(shù)量,并且在任務(wù)完成時,僅在下一秒鐘允許新的任務(wù)開始,以確保即使某些任務(wù)快速完成,也不會在同一秒鐘內(nèi)啟動超過限制數(shù)量的任務(wù)
package main import ( "context" "fmt" "math/rand" "sync" "sync/atomic" "time" "golang.org/x/time/rate" ) func RateLimit() { const maxJobsPerSecond = 5 const numJobs = 22 var wg sync.WaitGroup // 計數(shù)器 var runningJobs int32 // 當前正在執(zhí)行的任務(wù)數(shù)量 var startedJobs int32 // 啟動后的任務(wù)數(shù)量 var finishedJobs int32 // 剛完成的任務(wù)數(shù)量 limiter := rate.NewLimiter(rate.Every(time.Second/time.Duration(maxJobsPerSecond)), maxJobsPerSecond) semaphore := make(chan struct{}, maxJobsPerSecond) for i := 1; i <= numJobs; i++ { wg.Add(1) go func(jobID int) { defer wg.Done() limiter.Wait(context.Background()) // 等待限流器允許進行下一個任務(wù) semaphore <- struct{}{} // 獲取信號量 atomic.AddInt32(&startedJobs, 1) atomic.AddInt32(&runningJobs, 1) executeJob(jobID) // 執(zhí)行任務(wù) atomic.AddInt32(&finishedJobs, 1) atomic.AddInt32(&runningJobs, -1) <-time.After(time.Second) // 等待一秒鐘后釋放信號量 <-semaphore // 打印當前狀態(tài) printStatus(&runningJobs, &startedJobs, &finishedJobs) }(i) } wg.Wait() fmt.Println("所有工作完成") }
注意事項
限流器rate.NewLimiter用于控制任務(wù)啟動的頻率,以確保每秒不超過maxJobsPerSecond個任務(wù)開始執(zhí)行。
使用信號量semaphore來控制同時進行的任務(wù)數(shù)量。
為了確保在任何一秒內(nèi)同時進行的任務(wù)數(shù)量不超過限制,在任務(wù)完成后等待一秒鐘,然后再釋放信號量。這樣做可以保證即使任務(wù)很快完成,也不會立即啟動新的任務(wù)。
這種實現(xiàn)方式確保了即使任務(wù)執(zhí)行得很快,每秒鐘啟動的新任務(wù)數(shù)量也不會超過限制,并且同時考慮了正在執(zhí)行和剛剛完成的任務(wù)。
動態(tài)創(chuàng)建協(xié)程
協(xié)程的啟動是動態(tài)的。在代碼中,每個任務(wù)對應(yīng)于一個動態(tài)創(chuàng)建的協(xié)程。這些協(xié)程是在循環(huán)中根據(jù)任務(wù)數(shù)量(numJobs)動態(tài)生成的。
具體來說,每當有一個新的任務(wù)需要執(zhí)行時,都會創(chuàng)建一個新的協(xié)程來處理這個任務(wù)。這是通過在main函數(shù)的循環(huán)中調(diào)用go關(guān)鍵字實現(xiàn)的。這個過程在每次循環(huán)迭代中發(fā)生,從而為每個任務(wù)動態(tài)創(chuàng)建一個新的協(xié)程
由于使用了限流器(rate.Limiter),這些協(xié)程不是一次性全部創(chuàng)建,而是根據(jù)限流器允許的速率逐個創(chuàng)建。每個協(xié)程在開始執(zhí)行任務(wù)之前會等待限流器的許可,以此確保每秒啟動的任務(wù)數(shù)量不超過設(shè)定的最大值
func executeJob(jobID int) { startTime := time.Now() // 記錄任務(wù)開始時間 // 模擬任務(wù)執(zhí)行時間 fmt.Printf("%v Job %d started ",time.Now().Format("2006-01-02 1505.000"), jobID) // 初始化隨機數(shù)種子 rand.Seed(time.Now().UnixNano()) // 隨機生成一個時間間隔(例如,1到5000毫秒之間) min := 1 max := 5000 duration := time.Duration(rand.Intn(max-min+1)+min) * time.Millisecond time.Sleep(duration) durationCost := time.Since(startTime) // 計算任務(wù)耗時 fmt.Printf("%v Job %d finished Cost:%v ", time.Now().Format("2006-01-02 1505.000"),jobID, durationCost) } func printStatus(runningJobs, startedJobs, finishedJobs *int32) { fmt.Printf("Current status - Running: %d, Started: %d, Finished: %d ", atomic.LoadInt32(runningJobs), atomic.LoadInt32(startedJobs), atomic.LoadInt32(finishedJobs)) }
可以在代碼中添加額外的邏輯來跟蹤和打印正在執(zhí)行、進行中、剛啟動和剛完成的任務(wù)數(shù)量。使用原子操作(來自sync/atomic包)來確保在并發(fā)環(huán)境下對這些計數(shù)器的操作是安全的。
在這個示例中:
使用sync/atomic包中的AddInt32和LoadInt32來安全地增加和讀取計數(shù)器的值。
在每個任務(wù)開始時,增加startedJobs和runningJobs計數(shù)器。
在每個任務(wù)完成時,增加finishedJobs計數(shù)器,并減少runningJobs計數(shù)器。
在任務(wù)完成后和釋放信號量前,打印當前的任務(wù)狀態(tài)。
注意事項
這種方法可以幫助我們跟蹤不同狀態(tài)下的任務(wù)數(shù)量。
使用原子操作確保在并發(fā)環(huán)境中對計數(shù)器的讀寫是安全的。
printStatus函數(shù)在每個任務(wù)的結(jié)束時被調(diào)用,以打印當前的任務(wù)狀態(tài)
鏈接:https://juejin.cn/post/7315314479204696079
審核編輯:劉清
-
限流器
+關(guān)注
關(guān)注
0文章
41瀏覽量
14469 -
QPS
+關(guān)注
關(guān)注
0文章
24瀏覽量
8785
原文標題:Golang根據(jù)job數(shù)量動態(tài)控制每秒?yún)f(xié)程的最大創(chuàng)建數(shù)量方法
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論