0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

看不見的控制流:Rust異步取消的幾點(diǎn)思考

jf_wN0SrCdH ? 來源:Rust語言中文社區(qū) ? 2023-01-13 11:19 ? 次閱讀

本文主要介紹了一個在 GreptimeDB 中遇到的一個關(guān)于異步取消 (async cancellation) 的“奇怪”問題[1]。

The Problem

針對這個問題,我們首先描述一個簡化的場景:在一個長時間運(yùn)行的測試中存在元信息損壞的問題,有一個應(yīng)該單調(diào)遞增的序列號出現(xiàn)了重復(fù)。

序列號的更新邏輯非常簡單:從一個原子變量中讀取當(dāng)前值,然后通過異步 I/O 方法 persist_number()將新值寫入文件里,最后更新這個原子變量。整個流程都是串行化的(file 是一個獨(dú)占引用)。

asyncfnupdate_metadata(file:&mutFile,counter:AtomicU64)->Result<()>{
letnext_number=counter.load(Ordering::Relaxed)+1;
persist_number(file,next_number).await?;
counter.fetch_add(1,Ordering::Relaxed);
}

由于一些原因,我們在這里使用了 load 函數(shù)而非 fetch_add(雖然單就這里來說可以用fetch_add,并且用了還不會引發(fā)這次的問題)。當(dāng)這個更新流程在中間出現(xiàn)錯誤時,我們不希望更新內(nèi)存中的計(jì)數(shù)器。我們清楚如果persist_number() 寫入文件時失敗就能夠從 ?提前返回,并且會提早結(jié)束執(zhí)行來傳播錯誤,所以編碼的時候會注意這些問題。

但是到了 .await 這里事情就變得奇妙了起來,因?yàn)?async cancellation 帶來了一個隱藏的控制流。

Async Cancellation

async task and runtime

如果這時候你已經(jīng)猜到了到底是什么引發(fā)了這個問題,可以跳過這一章節(jié)。如果沒有,就讓我從一些偽代碼開始解釋在 await point 那里到底發(fā)生了什么,以及 runtime 是如何參與其中的。

  • poll_future

首先是poll_future,對應(yīng)到Futurepoll[2] 方法。我們寫的異步方法都會被轉(zhuǎn)化成類似這樣子的一個匿名的Future實(shí)現(xiàn)。

fnpoll_future()->FutureOutput{
matchstatus_of_the_task{
Ready(output)=>{
//thetaskisfinished,andwehaveitoutput.
//somelogic
returnour_output;
},
Pending=>{
//itisnotready,wedon'thavetheoutput.
//thuswecannotmakeprogressandneedtowait
returnPending;
}
}
}

?

async塊通常包含其他的異步方法,比如update_metadatapersist_number。這里把persist_number稱為update_metadata的子異步任務(wù)。每個.await都會被展開成類似poll_future的東西,等待子任務(wù)的結(jié)果并繼續(xù)執(zhí)行。在這個例子中就是等待persist_number的結(jié)果返回Ready再更新計(jì)數(shù)器,否則不更新。

[2] poll:

https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll

  • runtime

第二段偽代碼是一個簡化的 runtime,它負(fù)責(zé)輪詢 (poll) 異步任務(wù)直到它們完成(考慮到接下來的文章內(nèi)容,“直到……完成”這種表述并不適合于所有情況)。在 GreptimeDB 中我們使用tokio[3] 作為 runtime?,F(xiàn)在的異步 runtime 可能有很多特性和功能,其中最基礎(chǔ)的就是輪詢這些任務(wù)。

fnruntime(&self){
loop{
letfuture_tasks:Vec=self.get_tasks();
fortaskintasks{
matchtask.poll_future(){
Ready(output)=>{
//thistaskisfinished.wakeitwiththeresult
task.wake(output);
},
Pending=>{
//thistaskneedssometimetorun.pollitlater
self.poll_later(task);
}
}
}
}
}

通過結(jié)合上述兩個簡化的 future 和 runtime 模型,我們得到如下這個循環(huán)(真實(shí)的 runtime 非常復(fù)雜,這里為了內(nèi)容集中省略了很多)。

fnrun()->Output{
loop{
ifletReady(result)=task.poll(){
returnresult;
}
}
}

需要強(qiáng)調(diào)的是,每個 .await 都代表著一個或者多個函數(shù)調(diào)用 (調(diào)用到 poll() 或者說是 poll_future() )。這就是標(biāo)題中“隱藏的控制流”,以及 cancellation 發(fā)生的地方。

我們再看一段簡單的程序來探測 runtime 的行為(可以直接在 playground[4]里面運(yùn)行這段代碼):

usetokio::{sleep,Duration,timeout};

#[tokio::main]
asyncfnmain(){
letf=async{
print(1).await;
println!("1isdone");
print(2).await;
println!("2isdone");
print(3).await;
println!("3isdone");
};

ifletErr(_)=timeout(Duration::from_millis(150),f).await{
println!("timeout");
}

sleep(Duration::from_millis(300)).await;
println!("exit")
}

asyncfnprint(val:u32){
sleep(Duration::from_millis(100)).await;
println!("valis{}",val);
}

只要花幾分鐘時間猜測一下上方代碼的輸出結(jié)果,如果和下面的一致,相信你已經(jīng)知道問題出在哪里。

valis1
1isdone
timeout
exit

之后的語句都因?yàn)槌瑫r而被 runtime 取消執(zhí)行了。

這個問題其中的原理并不復(fù)雜,但是(對我來說)能夠定位到它并不輕易。在把其他問題都排除掉之后我知道問題就發(fā)生在這里,在這個 .await 上。也許是太多次成功的異步函數(shù)調(diào)用麻痹了注意,亦或是我的心智模型中沒有把這兩點(diǎn)聯(lián)系起來,聯(lián)想到這點(diǎn)著實(shí)費(fèi)了一番心思。

[3]tokio:

https://docs.rs/tokio/latest/tokio/

[4]playground:

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=40220605392e951e833a0b45719ed1e1

cancellation

目前為止的內(nèi)容是問題復(fù)盤的標(biāo)準(zhǔn)流程,接下來,讓我們來展開討論一下 cancellation,它是與 runtime 的行為相關(guān)的。

雖然 Rust 中的很多 runtime 都有類似的行為,但是這不是一個必須的特性,比如這個自己寫的runtime[5] 就不支持 cancellation。因?yàn)閱栴}發(fā)生在 tokio 上,因此這里會以它為例,而其他的 runtime 也是類似的。在 tokio 中,可以使用 JoinHandle::abort()[6] 來取消一個 task。task 結(jié)構(gòu)中有一個“cancel marker bit”來跟蹤一個任務(wù)是否被取消了。如果它發(fā)現(xiàn)一個 task 被取消了,就會停止執(zhí)行這個 task。

//Ifthetaskisrunning,wemarkitascancelled.Thethread
//runningthetaskwillnoticethecancelledbitwhenit
//stopspollinganditwillkillthetask.
//
//Theset_notified()callisnotstrictlynecessarybutitwill
//insomecasesletawake_by_refcallreturnwithouthaving
//toperformacompare_exchange.
snapshot.set_notified();
snapshot.set_cancelled();

背后的邏輯也很簡單,就是 runtime 放棄了繼續(xù)輪詢你的 task,就和 ? 差不多。某種程度上可能更棘手一點(diǎn),因?yàn)槲覀儾荒芟?Err 那樣處理這個 cancellation。不過這代表我們需要考慮每一個 .await都有可能隨時被 cancel 掉嗎?這也太麻煩了。

以本文這個 metadata 更新的情況為例,如果把 cancel 納入考慮范圍,我們需要檢查文件是否和內(nèi)存中的狀態(tài)一致,如果不一致就要回滾持久化的改動等等。壞消息是,在某些方面答案是肯定的,runtime 可以對你的 future 做任何事情。不過好在大多數(shù)情況下它們都還是很遵守規(guī)矩的。

[5]試驗(yàn) runtime:

https://github.com/waynexia/texn

[6] JoinHandle::abort():

https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html#method.abort

Current Solution

explicit detach

現(xiàn)在是否有手段能防止 task 被取消呢?在 tokio 中我們可以通過 drop JoinHandle來 detach 一個任務(wù)到后臺。一個 detached task 意味著沒有前臺的 handle 來控制這個任務(wù),從某種意義上來說也就使得其他人不能在外面套一層timeoutselect,從而間接地使它不會被取消執(zhí)行。并且開頭提到的問題就是通過這種方式解決的。

JoinHandle detaches the associated task when it is dropped, which means that there is no longer any handle to the task, and no way to joinon it.

不過雖然有辦法能夠?qū)崿F(xiàn)這個功能,是否像 glommio[7]一樣有一個顯式的 detach 方法,類似一個不返回 JoinHandlespawn 方法會更好。但這些都是瑣碎的事情,一個 runtime 通常不會完全沒有理由就取消一個 task,并且在大多數(shù)情況下都是出于用戶的要求,只不過有時候可能沒有注意到,就像 select 中的那些“未選中的分支”或者 tonic 中請求處理的邏輯那樣。所以如果我們確定一個 task 是不能被取消的話,顯式地 detach 可能能預(yù)防某些悲劇的發(fā)生。

Our solution

目前為止所有問題都清晰了,讓我們開始修復(fù)這個 bug 吧!

首先,為什么我們的 future 會被取消呢?通過函數(shù)調(diào)用鏈路很容易就能發(fā)現(xiàn)整個處理過程都是在tonic的請求執(zhí)行邏輯中就地執(zhí)行的,而對于一個網(wǎng)絡(luò)請求來說有一個超時行為是很常見的。解決方案也很簡單,就是將服務(wù)器處理邏輯提交到另一個 runtime 中執(zhí)行,從而防止它被取消。只需要幾行代碼[8]就能完成。

@@-30,12+40,24@@implBatchHandler{
}
batch_resp.admins.push(admin_resp);

-fordb_reqinbatch_req.databases{
-forobj_exprindb_req.exprs{
-letobject_resp=self.query_handler.do_query(obj_expr).await?;
-db_resp.results.push(object_resp);
+let(tx,rx)=oneshot::channel();
+letquery_handler=self.query_handler.clone();
+let_=self.runtime.spawn(asyncmove{
+//executerequestinanotherruntimetopreventtheexecutionfrombeingcancelledunexpectedbytonicruntime.
+letmutresult=vec![];
+fordb_reqinbatch_req.databases{
+forobj_exprindb_req.exprs{
+letobject_resp=query_handler.do_query(obj_expr).await;
+
+result.push(object_resp);
+}
      }

這個問題到這里就修復(fù)完了,不過并不是從根本上解決 async cancellation 帶來的 bug,而是采用間接手段去規(guī)避任務(wù)由于超時而被提前取消的問題,畢竟我們的這些異步邏輯還是需要被完整執(zhí)行的。

但是這樣的處理會放大另外一些問題,比如我們也無法提前取消掉對于已經(jīng)不用執(zhí)行或資源消耗特別大的任務(wù),從而導(dǎo)致系統(tǒng)資源的浪費(fèi)。這些是我們之后需要持續(xù)改進(jìn)的地方。接下來會就這方面繼續(xù)展開,從 async 生態(tài)的方面討論有哪些可能能提升 async cancellation 的使用體驗(yàn)。

[7] glommio's:

https://docs.rs/glommio/0.7.0/glommio/struct.Task.html#method.detach

[8] 解決方案代碼:

https://github.com/GreptimeTeam/greptimedb/pull/376/files#diff-9756dcef86f5ba1d60e01e41bf73c65f72039f9aaa057ffd03f3fc2f7dadfbd0R46-R54

Runtime Behavior

  • marker trait

首先,我們自然希望 runtime 不要無條件地取消我的 task,而是嘗試通過類型系統(tǒng)來變得更友好,比如借助類似 CancelSafe 的 marker trait 。對于 cancellation safety 這個詞,tokio 在它的文檔[9]中有提到:

To determine whether your own methods are cancellation safe, look for the location of uses of.await. This is because when an asynchronous method is cancelled, that always happens at an.await. If your function behaves correctly even if it is restarted while waiting at an.await, then it is cancellation safe.

簡單來說就是用來描述一個 task 是否可以安全地被取消掉,這是 async task 的屬性之一。tokio 維護(hù)了一個很長的列表,列出了哪些是安全的以及哪些是不安全的??雌饋磉@和UnwindSafe[10]這個 marker trait 很像。兩者都是描述“這種控制流程并不總是被預(yù)料到的”,并且“有可能導(dǎo)致一些微妙的 bug”的這樣一種屬性。

如果有這樣一個 CancelSafe 的 trait,我們就有途徑可以告訴 runtime 我們的異步任務(wù)是否可以安全地被取消掉,同時也是一種方式讓用戶承諾 cancelling 這個控制流程是被仔細(xì)處理過的。如果發(fā)現(xiàn)沒有實(shí)現(xiàn)這個 trait,那就意味著我們不希望這個 task 被取消掉,簡單而清晰。以 timeout()為例:

///Themarkertrait
traitCancelSafe{}

///Onlycancellabletaskcanbetimeout-ed
pubfntimeout(duration:Duration,future:F)->Timeoutwhere
F:Future+CancelSafe
{}

  • volunteer cancel

另一個方式是讓任務(wù)自愿地取消。就像 Kotlin 中的 cooperative cancellation[11] 一樣,它有一個 isActive 方法來檢查一個 task 是否被取消掉。這只是一個檢測方法,是否要取消完全取決于 task 本身。下面是 Kotlin 文檔中的一個例子, cooperative cancellation 發(fā)生在第 5 行。這種方式把“隱藏的控制流程”放在了明面上,讓我們能以一種更自然的方式來考慮和處理 cancellation,就像 OptionResult 一樣。

valstartTime=System.currentTimeMillis()
valjob=launch(Dispatchers.Default){
varnextPrintTime=startTime
vari=0
while(isActive){//cancellablecomputationloop
//printamessagetwiceasecond
if(System.currentTimeMillis()>=nextPrintTime){
println("job:I'msleeping${i++}...")
nextPrintTime+=500L
}
}
}
delay(1300L)//delayabit
println("main:I'mtiredofwaiting!")
job.cancelAndJoin()//cancelsthejobandwaitsforitscompletion
println("main:NowIcanquit.")

并且我認(rèn)為這也不難實(shí)現(xiàn),Tokio 現(xiàn)在已經(jīng)有了 Cancelled bit[12]CancellationToken,只是看起來和期望的還有點(diǎn)不一樣。最后還是需要 runtime 把 cancellation 的權(quán)利交給 task,否則情況可能沒有什么大的不同。

[9] tokio 文檔:

https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety

[10]UnwindSafe:

https://doc.rust-lang.org/std/panic/trait.UnwindSafe.html

[11]cooperative cancellation:

https://kotlinlang.org/docs/cancellation-and-timeouts.html#cancellation-is-cooperative

[12]Cancelledbit:

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/is-active.html

異步取消問題作為 Rust 語言中一個經(jīng)典問題,暫時還沒有很完美的解法。如果你正被這個問題困擾,或者有一些自己的見解,歡迎在評論區(qū)留言。期待隨著社區(qū)生態(tài)的共同努力,很快能共建一個更優(yōu)的解決方案~


關(guān)于 Greptime

Greptime 格睿科技于2022年創(chuàng)立,目前正在完善和打造時序性數(shù)據(jù)庫 GreptimeDB 和格睿云 Greptime Cloud 這兩款產(chǎn)品。GreptimeDB 是款用 Rust 語言編寫的時序數(shù)據(jù)庫。具有分布式,開源,云原生,兼容性強(qiáng)等特點(diǎn),幫助企業(yè)實(shí)時讀寫,處理和分析時序數(shù)據(jù)的同時降低長期存儲的成本。

  • 官網(wǎng):https://greptime.com/

  • GitHub:https://github.com/GreptimeTeam/greptimedb

  • 文檔:https://docs.greptime.com/

  • Twitter:https://twitter.com/Greptime

  • Slack:https://greptime.com/slack

  • LinkedIn:https://www.linkedin.com/company/greptime/

審核編輯 :李倩


聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 數(shù)據(jù)庫
    +關(guān)注

    關(guān)注

    7

    文章

    3712

    瀏覽量

    64025
  • 函數(shù)
    +關(guān)注

    關(guān)注

    3

    文章

    4237

    瀏覽量

    61967
  • Rust
    +關(guān)注

    關(guān)注

    1

    文章

    226

    瀏覽量

    6497

原文標(biāo)題:看不見的控制流 — Rust 異步取消的幾點(diǎn)思考

文章出處:【微信號:Rust語言中文社區(qū),微信公眾號:Rust語言中文社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    一種看不見但又不可或缺的技術(shù):磁傳感器

    在汽車領(lǐng)域中,磁傳感器是一種看不見但又不可或缺的技術(shù),它能使從轉(zhuǎn)彎信號到點(diǎn)火定時的一切都成為可能。 在您的汽車?yán)?,這些小小的傳感器件可能多達(dá)70個,它們默默地執(zhí)行被賦予的各種功能,讓您可以順利地從
    的頭像 發(fā)表于 03-01 10:56 ?7117次閱讀

    看不見分析嵌入式操作系統(tǒng)的前景

    看不見分析嵌入式操作系統(tǒng)的前景微軟最近放出了下一個版本W(wǎng)indows Embedded的路線圖,現(xiàn)在已經(jīng)可以下載到它的預(yù)覽版本了。Windows Embedded Standard常見于ATM
    發(fā)表于 03-12 15:42

    看不見分析嵌入式操作系統(tǒng)的前景

    看不見分析嵌入式操作系統(tǒng)的前景微軟最近放出了下一個版本W(wǎng)indows Embedded的路線圖,現(xiàn)在已經(jīng)可以下載到它的預(yù)覽版本了。Windows Embedded Standard常見于ATM
    發(fā)表于 03-12 15:49

    用Altium designer畫出的板子看不見焊孔,請問大大是怎么回...

    我用Altium designer畫出的板子只看得見灰色的焊盤,看不見綠色的焊孔,請問大大們這是怎么回事?
    發(fā)表于 10-15 08:49

    怎么看不見

    怎么看不見怎么看不見怎么看不見
    發(fā)表于 11-03 19:06

    CCS程序下載看不見進(jìn)度條

    CCS程序下載的時候,連接目標(biāo)板沒有問題,但是在下載程序的時候并看不見進(jìn)度條,直接運(yùn)行到main函數(shù),程序沒下進(jìn)去,也不報錯,這是什么原因啊,請各位大師指教
    發(fā)表于 12-12 11:03

    Allegroā新加銅皮被分開為什么看不見

    ā新加銅皮被分開 說明十字位置應(yīng)該有銅皮 那么為什么看不見。怎么設(shè)置?
    發(fā)表于 09-06 04:17

    航嘉睿智300DS 電源芯片爆炸了,看不見型號了,求組有知道的大神嗎謝謝。

    航嘉睿智300DS 電源芯片爆炸了,看不見型號了,求組有知道的大神嗎謝謝。
    發(fā)表于 10-22 09:49

    PADS Layout 9.3 移動Copper時偶爾可以看見Copper的外型邊框,但是看不見里面的銅皮

    PADS Layout 9.3 移動Copper時偶爾可以看見Copper的外型邊框,但是看不見里面的銅皮,我試著查找了許多設(shè)置都無法解決,我要怎么處理這個問題。
    發(fā)表于 09-16 10:51

    專家深度評述:危險的物聯(lián)網(wǎng)看不見戰(zhàn)場前路漫長

    專家深度評述:危險的物聯(lián)網(wǎng)看不見戰(zhàn)場前路漫長     危險的物聯(lián)網(wǎng)     這是一個看不見的戰(zhàn)場,它近
    發(fā)表于 05-30 10:39 ?1152次閱讀

    工信部整治非法改號軟件:讓其發(fā)不出、看不見

    工信部部署清理整治網(wǎng)上改號軟件,要求各基礎(chǔ)電信企業(yè)和互聯(lián)網(wǎng)企業(yè)采取有效措施,在網(wǎng)站、搜索引擎、手機(jī)應(yīng)用軟件商店、電商平臺、社交平臺等空間堅(jiān)決阻斷改號軟件網(wǎng)上發(fā)布、搜索、傳播、銷售渠道,讓非法改號軟件“發(fā)不出、看不見、搜不到、下載不了”。
    發(fā)表于 12-06 18:47 ?665次閱讀

    王源《看不見的TA》首播化身OPPOR11王源限量版

    今天,由王源主演的OPPO微電影《看不見的TA之奇怪民宿》今天12點(diǎn),已經(jīng)在騰訊視頻全網(wǎng)首播。這是王源為OPPO拍攝的第二支微電影,本次王源劇中飾演OPPO R11王源限量版手機(jī)人,這位佛性十足的小王子顏值要逆天了。
    發(fā)表于 09-12 15:33 ?4603次閱讀

    TDK | 噪聲的根源和種類,追蹤看不見的噪聲

    TDK | 噪聲的根源和種類,追蹤看不見的噪聲
    的頭像 發(fā)表于 03-19 10:00 ?1096次閱讀
    TDK | 噪聲的根源和種類,追蹤<b class='flag-5'>看不見</b>的噪聲

    TDK | 噪聲的根源和種類,追蹤看不見的噪聲

    TDK | 噪聲的根源和種類,追蹤看不見的噪聲
    的頭像 發(fā)表于 03-23 21:17 ?470次閱讀
    TDK | 噪聲的根源和種類,追蹤<b class='flag-5'>看不見</b>的噪聲

    如何在同步的 Rust 方法中調(diào)用異步代碼 | Tokio 使用中的幾點(diǎn)教訓(xùn)

    在同步的 Rust 方法中調(diào)用異步代碼經(jīng)常會導(dǎo)致一些問題,特別是對于不熟悉異步 Rust runtime 底層原理的初學(xué)者。
    的頭像 發(fā)表于 12-24 16:23 ?1084次閱讀