0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Golang根據(jù)job數(shù)量動態(tài)控制每秒?yún)f(xié)程的最大創(chuàng)建數(shù)量方法簡析

馬哥Linux運維 ? 來源:稀土掘金 ? 2023-12-24 14:21 ? 次閱讀

需求:第三方的接口,限制接口請求的QPS,每秒5次

需要控制job「訪問接口」的次數(shù),每秒不能同時超過5次,包括 進行中的任務(wù)、剛啟動的任務(wù)

要確保單位時間內(nèi)(例如每秒)運行的任務(wù)數(shù)量不超過特定的上限(如5個任務(wù)),并且在任務(wù)執(zhí)行完成得很快時,考慮已完成的任務(wù)和正在執(zhí)行的任務(wù)作為正在運行的任務(wù)總數(shù),可以使用限流器來控制任務(wù)的啟動頻率,并結(jié)合使用信號量來管理同時運行的任務(wù)數(shù)量。

具體來說,使用一個信號量來限制同時進行的任務(wù)數(shù)量,并且在任務(wù)完成時,僅在下一秒鐘允許新的任務(wù)開始,以確保即使某些任務(wù)快速完成,也不會在同一秒鐘內(nèi)啟動超過限制數(shù)量的任務(wù)

package main


import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"


    "golang.org/x/time/rate"
)


func RateLimit() {
    const maxJobsPerSecond = 5
    const numJobs = 22
    var wg sync.WaitGroup


    // 計數(shù)器
    var runningJobs int32 // 當前正在執(zhí)行的任務(wù)數(shù)量
    var startedJobs int32 // 啟動后的任務(wù)數(shù)量
    var finishedJobs int32 // 剛完成的任務(wù)數(shù)量


    limiter := rate.NewLimiter(rate.Every(time.Second/time.Duration(maxJobsPerSecond)), maxJobsPerSecond)
    semaphore := make(chan struct{}, maxJobsPerSecond)


    for i := 1; i <= numJobs; i++ {
        wg.Add(1)
        go func(jobID int) {
            defer wg.Done()
            limiter.Wait(context.Background()) // 等待限流器允許進行下一個任務(wù)


            semaphore <- struct{}{} // 獲取信號量
            atomic.AddInt32(&startedJobs, 1)
            atomic.AddInt32(&runningJobs, 1)


            executeJob(jobID) // 執(zhí)行任務(wù)
            atomic.AddInt32(&finishedJobs, 1)
            atomic.AddInt32(&runningJobs, -1)


            <-time.After(time.Second) // 等待一秒鐘后釋放信號量
            <-semaphore


            // 打印當前狀態(tài)
            printStatus(&runningJobs, &startedJobs, &finishedJobs)
        }(i)
    }


    wg.Wait()
    fmt.Println("所有工作完成")
}

注意事項

限流器rate.NewLimiter用于控制任務(wù)啟動的頻率,以確保每秒不超過maxJobsPerSecond個任務(wù)開始執(zhí)行。

使用信號量semaphore來控制同時進行的任務(wù)數(shù)量。

為了確保在任何一秒內(nèi)同時進行的任務(wù)數(shù)量不超過限制,在任務(wù)完成后等待一秒鐘,然后再釋放信號量。這樣做可以保證即使任務(wù)很快完成,也不會立即啟動新的任務(wù)。

這種實現(xiàn)方式確保了即使任務(wù)執(zhí)行得很快,每秒鐘啟動的新任務(wù)數(shù)量也不會超過限制,并且同時考慮了正在執(zhí)行和剛剛完成的任務(wù)。

動態(tài)創(chuàng)建協(xié)程

協(xié)程的啟動是動態(tài)的。在代碼中,每個任務(wù)對應(yīng)于一個動態(tài)創(chuàng)建的協(xié)程。這些協(xié)程是在循環(huán)中根據(jù)任務(wù)數(shù)量(numJobs)動態(tài)生成的。

具體來說,每當有一個新的任務(wù)需要執(zhí)行時,都會創(chuàng)建一個新的協(xié)程來處理這個任務(wù)。這是通過在main函數(shù)的循環(huán)中調(diào)用go關(guān)鍵字實現(xiàn)的。這個過程在每次循環(huán)迭代中發(fā)生,從而為每個任務(wù)動態(tài)創(chuàng)建一個新的協(xié)程

由于使用了限流器(rate.Limiter),這些協(xié)程不是一次性全部創(chuàng)建,而是根據(jù)限流器允許的速率逐個創(chuàng)建。每個協(xié)程在開始執(zhí)行任務(wù)之前會等待限流器的許可,以此確保每秒啟動的任務(wù)數(shù)量不超過設(shè)定的最大值

func executeJob(jobID int) {
  startTime := time.Now() // 記錄任務(wù)開始時間
  
    // 模擬任務(wù)執(zhí)行時間
    fmt.Printf("%v Job %d started
",time.Now().Format("2006-01-02 1505.000"), jobID)
  // 初始化隨機數(shù)種子
  rand.Seed(time.Now().UnixNano())
  // 隨機生成一個時間間隔(例如,1到5000毫秒之間)
  min := 1
  max := 5000
  duration := time.Duration(rand.Intn(max-min+1)+min) * time.Millisecond
  time.Sleep(duration)


  durationCost := time.Since(startTime) // 計算任務(wù)耗時


    fmt.Printf("%v Job %d finished Cost:%v
", time.Now().Format("2006-01-02 1505.000"),jobID, durationCost)
}


func printStatus(runningJobs, startedJobs, finishedJobs *int32) {
    fmt.Printf("Current status - Running: %d, Started: %d, Finished: %d
",
        atomic.LoadInt32(runningJobs),
        atomic.LoadInt32(startedJobs),
        atomic.LoadInt32(finishedJobs))
}

可以在代碼中添加額外的邏輯來跟蹤和打印正在執(zhí)行、進行中、剛啟動和剛完成的任務(wù)數(shù)量。使用原子操作(來自sync/atomic包)來確保在并發(fā)環(huán)境下對這些計數(shù)器的操作是安全的。

在這個示例中:

使用sync/atomic包中的AddInt32和LoadInt32來安全地增加和讀取計數(shù)器的值。

在每個任務(wù)開始時,增加startedJobs和runningJobs計數(shù)器。

在每個任務(wù)完成時,增加finishedJobs計數(shù)器,并減少runningJobs計數(shù)器。

在任務(wù)完成后和釋放信號量前,打印當前的任務(wù)狀態(tài)。

注意事項

這種方法可以幫助我們跟蹤不同狀態(tài)下的任務(wù)數(shù)量。

使用原子操作確保在并發(fā)環(huán)境中對計數(shù)器的讀寫是安全的。

printStatus函數(shù)在每個任務(wù)的結(jié)束時被調(diào)用,以打印當前的任務(wù)狀態(tài)

鏈接:https://juejin.cn/post/7315314479204696079








審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 限流器
    +關(guān)注

    關(guān)注

    0

    文章

    41

    瀏覽量

    14469
  • QPS
    QPS
    +關(guān)注

    關(guān)注

    0

    文章

    24

    瀏覽量

    8785

原文標題:Golang根據(jù)job數(shù)量動態(tài)控制每秒?yún)f(xié)程的最大創(chuàng)建數(shù)量方法

文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    談?wù)?b class='flag-5'>協(xié)的那些事兒

    隨著異步編程的發(fā)展以及各種并發(fā)框架的普及,協(xié)作為一種異步編程規(guī)范在各類語言中地位逐步提高。我們不單單會在自己的程序中使用協(xié),各類框架如fastapi,aiohttp等也都是基于異步
    的頭像 發(fā)表于 01-26 11:36 ?1076次閱讀
    談?wù)?b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>的那些事兒

    采用COT穩(wěn)壓器動態(tài)控制輸出電壓

    輸入來達到這一任務(wù)目的,但是大多數(shù)寬泛 VIN 轉(zhuǎn)換器都不提供。在這篇博客文章中,我將討論一種通過反饋引腳動態(tài)控制 DC/DC 轉(zhuǎn)換器輸出電壓的通用方法。COT 轉(zhuǎn)換器非常適合可變輸出電壓應(yīng)用,因為
    發(fā)表于 09-19 11:01

    什么是多任務(wù)系統(tǒng)?FreeRTOS任務(wù)與協(xié)

    功能,初學(xué)者必須先掌握——任務(wù)的創(chuàng)建、刪除、掛起和恢復(fù)等操作。本章節(jié)分為如下幾部分:*什么是多任務(wù)系統(tǒng)*FreeRTOS任務(wù)與協(xié)*初次使用*任務(wù)狀態(tài)*任務(wù)優(yōu)先級*任務(wù)實現(xiàn)*任務(wù)控制
    發(fā)表于 02-18 06:38

    如何最大限度減小電源設(shè)計中輸出電容的數(shù)量和尺寸?

    電源輸出電容一般是100 nF至100 μF的陶瓷電容,它們耗費資金,占用空間,而且,在遇到交付瓶頸的時候還會難以獲得。所以,如何最大限度減小輸出電容的數(shù)量和尺寸,這個問題反復(fù)被提及。 輸出電容造成
    發(fā)表于 03-21 14:42

    如何最大限度減小電源設(shè)計中輸出電容的數(shù)量和尺寸

    電源輸出電容一般是100 nF至100 μF的陶瓷電容,它們耗費資金,占用空間,而且,在遇到交付瓶頸的時候還會難以獲得。所以,如何最大限度減小輸出電容的數(shù)量和尺寸,這個問題反復(fù)被提及。輸出電容造成
    發(fā)表于 06-14 10:19

    基于多步預(yù)測性能指標函數(shù)的神經(jīng)網(wǎng)絡(luò)逆動態(tài)控制方法

    神經(jīng)網(wǎng)絡(luò)逆動態(tài)控制作為神經(jīng)網(wǎng)絡(luò)控制領(lǐng)域中一種重要的控制方法,通過建立對象的逆動態(tài)模型從而實現(xiàn)對象輸出完全跟蹤給定輸入的理想
    發(fā)表于 12-20 15:15 ?2次下載

    最大處理器數(shù)量

    最大處理器數(shù)量          
    發(fā)表于 12-17 11:03 ?335次閱讀

    PCI插槽最大數(shù)量

    PCI插槽最大數(shù)量         
    發(fā)表于 12-26 16:09 ?672次閱讀

    通過例子由淺入深的理解yield協(xié)

    send:send() 方法致使協(xié)程前進到下一個yield 語句,另外,生成器可以作為協(xié)使用
    的頭像 發(fā)表于 08-23 11:12 ?1974次閱讀

    使用channel控制協(xié)數(shù)量

    goroutine 是輕量級線程,調(diào)度由 Go 運行時進行管理的。Go 語言的并發(fā)控制主要使用關(guān)鍵字 go 開啟協(xié) goroutine。Go 協(xié)
    的頭像 發(fā)表于 09-19 15:06 ?1102次閱讀

    5G AAU 功放控制和監(jiān)測模塊

    5G AAU 功放控制和監(jiān)測模塊
    發(fā)表于 10-28 12:00 ?2次下載
    5G AAU 功放<b class='flag-5'>控制</b>和監(jiān)測模塊<b class='flag-5'>簡</b><b class='flag-5'>析</b>

    如何采用 COT 穩(wěn)壓器動態(tài)控制輸出電壓

    如何采用 COT 穩(wěn)壓器動態(tài)控制輸出電壓
    發(fā)表于 11-07 08:07 ?0次下載
    如何采用 COT 穩(wěn)壓器<b class='flag-5'>動態(tài)控制</b>輸出電壓

    協(xié)的概念及協(xié)的掛起函數(shù)介紹

    協(xié)是一種輕量級的線程,它可以在單個線程中實現(xiàn)并發(fā)執(zhí)行。與線程不同,協(xié)不需要操作系統(tǒng)的上下文切換,因此可以更高效地使用系統(tǒng)資源。Kotlin 協(xié)
    的頭像 發(fā)表于 04-19 10:20 ?854次閱讀

    單片機實現(xiàn)爐溫動態(tài)控制

    電子發(fā)燒友網(wǎng)站提供《單片機實現(xiàn)爐溫動態(tài)控制.pdf》資料免費下載
    發(fā)表于 10-12 09:48 ?0次下載
    單片機實現(xiàn)爐溫<b class='flag-5'>動態(tài)控制</b>

    何選擇一個合適的協(xié)來獲得CPU執(zhí)行權(quán)

    如今雖不敢說協(xié)已經(jīng)是紅的發(fā)紫,但確實是越來越受到了大家的重視。Golang中的已經(jīng)是只有g(shù)oroutine,以至于很多go程序員是只知有協(xié)
    的頭像 發(fā)表于 11-13 14:10 ?365次閱讀
    何選擇一個合適的<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>來獲得CPU執(zhí)行權(quán)