如果在簡歷上寫了使用過kafka消息中間件,面試官大概80%的概率會問你:"如何保證kafka消息不丟失?"反正我是屢試不爽。
如果你的核心業(yè)務數(shù)據(jù),比如訂單數(shù)據(jù),或者其它核心交易業(yè)務數(shù)據(jù),在使用kafka時,要保證消息不丟失,并讓下游消費系統(tǒng)一定能獲得訂單數(shù)據(jù),只靠kafka中間件來保證,是并不可靠的。
kafka已經(jīng)這么的優(yōu)秀 了,為什么還會丟消息了?這一定是初學者或者初級使用者心中的疑惑
kafka 已經(jīng)這么的優(yōu)秀了,為啥還會丟消息了?----太不省心了
圖一 生產(chǎn)者,broker,消費者
要解決kafka丟失消息的情況,需要從使用kafka涉及的主流程和主要組件進行分析。kafka的核心業(yè)務流程很簡單:發(fā)送消息,暫存消息,消費消息。而這中間涉及到的主要組件,分別是生產(chǎn)端,broker端,消費端。
生產(chǎn)端丟失消息的情況和解決方法
生產(chǎn)端丟失消息的第一個原因主要來源于kafka的特性:批量發(fā)送異步提交。我們知道,kafka在發(fā)送消息時,是由底層的IO SEND線程進行消息的批量發(fā)送,不是由業(yè)務代碼線程執(zhí)行發(fā)送的。即業(yè)務代碼線程執(zhí)行完send方法后,就返回了。消息到底發(fā)送給broker側(cè)沒有了?通過send方法其實是無法知道的。
那么如何解決了?kafka提供了一個帶有callback回調(diào)函數(shù)的方法,如果消息成功/(失敗的)發(fā)送給broker端了,底層的IO線程是可以知道的,所以此時IO線程可以回調(diào)callback函數(shù),通知上層業(yè)務應用。我們也一般在callback函數(shù)里,根據(jù)回調(diào)函數(shù)的參數(shù),就能知道消息是否發(fā)送成功了,如果發(fā)送失敗了,那么我們還可以在callback函數(shù)里重試。一般業(yè)務場景下 通過重試的方法保證消息再次發(fā)送出去。
90%的面試者都能給出上面的標準回答。
但在一些嚴格的交易場景:僅僅依靠回調(diào)函數(shù)的通知和重試,是不能保證消息一定能發(fā)送到broker端的
理由如下:
1、callback函數(shù)是在jvm層面由IO SEND線程執(zhí)行的,如果剛好遇到在執(zhí)行回調(diào)函數(shù)時,jvm宕機了,或者恰好長時間的GC,最終導致OOM,或者jvm假死的情況;那么回調(diào)函數(shù)是不能被執(zhí)行的。恰好你的消息數(shù)據(jù),是一個帶有交易屬性核心業(yè)務數(shù)據(jù),必須要通知給下游。比如下單或者支付后,需要通知傭金系統(tǒng),或者積分系統(tǒng),去計算訂單傭金。此時一個JVM宕機或者OOM,給下游的數(shù)據(jù)就丟了,那么計算聯(lián)盟客的訂單傭金數(shù)據(jù)也就丟了,造成聯(lián)盟客資損了。
2、IO SEND線程和broker之間是通過網(wǎng)絡進行通信的,而網(wǎng)絡通信并不一定都能保證一直都是順暢的,比如網(wǎng)絡丟包,網(wǎng)絡中的交換機壞了,由底層網(wǎng)絡硬件的故障,導致上層IO線程發(fā)送消息失??;此時發(fā)送端配置的重試參數(shù) retries 也不好使了。
如何解決生產(chǎn)端在極端嚴格的交易場景下,消息丟失了?
如果要解決jvm宕機,或者JVM假死;又或者底層網(wǎng)絡問題,帶來的消息丟失;是需要上層應用額外的機制來保證消息數(shù)據(jù)發(fā)送的完整性。大概流程如下圖
1、在發(fā)送消息之前,加一個發(fā)送記錄,并且初始化為待發(fā)送;并且把發(fā)送記錄進行存儲(可以存儲在DB里,或者其它存儲引擎里);2、利用帶有回調(diào)函數(shù)的callback通知,在業(yè)務代碼里感知到消息是否發(fā)送成功;如果消息發(fā)送成功,則把存儲引擎里對應的消息標記為已發(fā)送 3、利用延遲的定時任務,每隔5分鐘(可根據(jù)實際情況調(diào)整掃描頻率)定時掃描5分鐘前未發(fā)送或者發(fā)送失敗的消息,再次進行發(fā)送。
這樣即使應用的jvm宕機,或者底層網(wǎng)絡出現(xiàn)故障,消息是否發(fā)送的記錄,都進行了保存。通過持續(xù)的定時任務掃描和重試,能最終保證消息一定能發(fā)送出去。
broker端丟失消息的情況和解決方法
broker端接收到生產(chǎn)端的消息后,并成功應答生產(chǎn)端后,消息會丟嗎?如果broker能像mysql服務器一樣,在成功應答給客戶端前,能把消息寫入到了磁盤進行持久化,并且在宕機斷電后,有恢復機制,那么我們能說broker端不會丟消息。
但broker端提供數(shù)據(jù)不丟的保障和mysql是不一樣的。broker端在接受了一批消息數(shù)據(jù)后,是不會馬上寫入磁盤的,而是先寫入到page cache里,這個page cache是操作系統(tǒng)的頁緩存(也就是另外一個內(nèi)存,只是由操作系統(tǒng)管理,不屬于JVM管理的內(nèi)存),通過定時或者定量的的方式( log.flush.interval.messages和log.flush.interval.ms)會把page cache里的數(shù)據(jù)寫入到磁盤里。
如果page cache在持久化到磁盤前,broker進程宕機了,這個時候不會丟失消息,重啟broker即可;如果此時操作系統(tǒng)宕機或者物理機宕機了,page cache里的數(shù)據(jù)還沒有持久化到磁盤里,此種情況數(shù)據(jù)就丟了。
kafka應對此種情況,建議是通過多副本機制來解決的,核心思想也挺簡單的:如果數(shù)據(jù)保存在一臺機器上你覺得可靠性不夠,那么我就把相同的數(shù)據(jù)保存到多臺機器上,某臺機器宕機了可以由其它機器提供相同的服務和數(shù)據(jù)。
要想達到上面效果,有三個關鍵參數(shù)需要配置
第一:生產(chǎn)端參數(shù)ack 設置為all
代表消息需要寫入到“大多數(shù)”的副本分區(qū)后,leader broker才給生產(chǎn)端應答消息寫入成功。(即寫入了“大多數(shù)”機器的page cache里)
第二:在broker端 配置min.insync.replicas參數(shù)設置至少為2
此參數(shù)代表了 上面的“大多數(shù)”副本。為2表示除了寫入leader分區(qū)外,還需要寫入到一個follower 分區(qū)副本里,broker端才會應答給生產(chǎn)端消息寫入成功。此參數(shù)設置需要搭配第一個參數(shù)使用。
第三:在broker端配置replicator.factor參數(shù)至少3
此參數(shù)表示:topic每個分區(qū)的副本數(shù)。如果配置為2,表示每個分區(qū)只有2個副本,在加上第二個參數(shù)消息寫入時至少寫入2個分區(qū)副本,則整個寫入邏輯就表示集群中topic的分區(qū)副本不能有一個宕機。如果配置為3,則topic的每個分區(qū)副本數(shù)為3,再加上第二個參數(shù)min.insync.replicas為2,即每次,只需要寫入2個分區(qū)副本即可,另外一個宕機也不影響,在保證了消息不丟的情況下,也能提高分區(qū)的可用性;只是有點費空間,畢竟多保存了一份相同的數(shù)據(jù)到另外一臺機器上。
另外在broker端,還有個參數(shù)unclean.leader.election.enable
此參數(shù)表示:沒有和leader分區(qū)保持數(shù)據(jù)同步的副本分區(qū)是否也能參與leader分區(qū)的選舉,建議設置為false,不允許。如果允許,這這些落后的副本分區(qū)競選為leader分區(qū)后,則之前l(fā)eader分區(qū)已保存的最新數(shù)據(jù)就有丟失的風險。注意在0.11版本之前默認為TRUE。
消費端側(cè)丟失消息的情況和解決方法
消費端丟失消息的情況:消費端丟失消息的情況,主要是設置了 autoCommit為true,即消費者消費消息的位移,由消費者自動提交。
自動提交,表面上看起來挺高大上的,但這是消費端丟失消息的主要原因。實例代碼如下
while(true){ consumer.poll(); #①拉取消息 XXX #②進行業(yè)務處理; }
如果在第一步拉取消息后,即提交了消息位移;而在第二步處理消息的時候發(fā)生了業(yè)務異常,或者jvm宕機了。則第二次在從消費端poll消息時,會從最新的位移拉取后面的消息,這樣就造成了消息的丟失。
消費端解決消息丟失也不復雜,設置autoCommit為false;然后在消費完消息后手工提交位移即可 實例代碼如下:
while(true){ consumer.poll(); #①拉取消息 XXX #②處理消息; consumer.commit(); }
在第二步進行了業(yè)務處理后,在提交消費的消息位移;這樣即使第二步或者第三步提交位移失敗了又或者宕機了,第二次再從poll拉取消息時,則會以第一次拉取消息的位移處獲取后面的消息,以此保證了消息的不丟失。
總結(jié)
在生產(chǎn)端所在的jvm運行正常,底層網(wǎng)絡通順的情況下,通過kafka 生產(chǎn)端自身的retries機制和call back回調(diào)能減少一部分消息丟失情況;但并不能保證在應用層,網(wǎng)絡層有問題時,也能100%確保消息不丟失;如果要解決此問題,可以試試 記錄消息發(fā)送狀態(tài)+定時任務掃描+重試的機制。
在broker端,要保證消息數(shù)據(jù)不丟失;kafka提供了多副本機制來進行保證。關鍵核心參數(shù)三個,一個生產(chǎn)端ack=all,兩個broker端參數(shù)min.insync.replicas 寫入數(shù)據(jù)到分區(qū)最小副本數(shù)為2,并且每個分區(qū)的副本集最小為3
在消費端,要保證消息不丟失,需要設置消費端參數(shù) autoCommit為false,并且在消息消費完后,再手工提交消息位置
審核編輯:湯梓紅
-
代碼
+關注
關注
30文章
4722瀏覽量
68234 -
線程
+關注
關注
0文章
503瀏覽量
19636 -
kafka
+關注
關注
0文章
50瀏覽量
5202
原文標題:kafka 消息“零丟失”的配方
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關推薦
評論