0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

RocketMQ中各類重復(fù)消費(fèi)的原理淺析

jf_uPRfTJDa ? 來源: 移動Labs ? 2024-01-08 09:29 ? 次閱讀

Labs 導(dǎo)讀

隨著大數(shù)據(jù)和云計(jì)算時代的到來,我國的各個產(chǎn)業(yè)每天都在產(chǎn)生不可估計(jì)的數(shù)據(jù),以及對數(shù)據(jù)的各式各樣的需求,消息中間件在處理數(shù)據(jù)、消費(fèi)數(shù)據(jù)的過程中越來越受到重視。在高并發(fā)、微服務(wù)、分布式的場景下,如何合理地利用消息中間件,如何保證MQ消費(fèi)消息的冪等性?所謂知其然,才能知其所以然,本文將通過RocketMQ作為例子,來扒一扒什么情況下會導(dǎo)致重復(fù)消費(fèi)。

作者:李佳斌

單位:中國移動智慧家庭運(yùn)營中心

Part 01RocketMQ如何生產(chǎn)和消費(fèi)消息

先簡單介紹下RocketMQ正常生產(chǎn)消息和消費(fèi)消息的流程,如下圖。

1.生產(chǎn)者在發(fā)送消息之前根據(jù)負(fù)載均衡策略(默認(rèn)是輪詢)選擇一個Queue,然后跟這個Queue所在的機(jī)器建立連接,把消息發(fā)送到這個Queue上。

2.消費(fèi)者消費(fèi)這個Queue,就能獲取到對應(yīng)的消息。

8861f07e-ad40-11ee-8b88-92fbcf53809c.png

- 問題出現(xiàn)

當(dāng)異常情況出現(xiàn)時,如消息發(fā)送超時或者消息消費(fèi)超時,RocketMQ為保證消息發(fā)送成功,會啟動重試機(jī)制,選擇另一臺機(jī)器的Queue重發(fā)。現(xiàn)在假設(shè)有這樣一種情況,消費(fèi)者實(shí)際正確接收到了消息,只是由于網(wǎng)絡(luò)波動導(dǎo)致響應(yīng)超時了,那就會出現(xiàn)消息重復(fù)發(fā)送,導(dǎo)致消費(fèi)者重復(fù)消費(fèi)的情況出現(xiàn)。

那除此之外,還有沒有其他情況會導(dǎo)致消息重復(fù)消費(fèi)的情況呢?總結(jié)起來一共有如下幾種情況。

1)消息發(fā)送異常時的重復(fù)消費(fèi)

2)消費(fèi)消息時拋出了異常

3)消費(fèi)者提交offset失敗

4)Broker持久化offset失敗

5)主從同步失敗

6)重平衡

7)清理長時間消費(fèi)的消息

Part 02淺析各類情況

- 消費(fèi)消息時拋出異常

問題分析一

RocketMQ在并發(fā)消費(fèi)的模式下會調(diào)用MessageListenerConcurrently的consumeMessage方法,入?yún)⑹莔sgs集合。當(dāng)調(diào)用該方法消費(fèi)消息出現(xiàn)異常時,返回的結(jié)果status就會是null。這種情況下會導(dǎo)致status被設(shè)置為RECONSUME_LATER,也就是說消息之后會被重復(fù)消費(fèi)。

問題分析二

傳入的是msgs集合。上述原因一中消息處理之后,不管成功失敗,都會對結(jié)果進(jìn)行處理。而集合中的任意一個失敗,都會導(dǎo)致status被設(shè)置為RECONSUME_LATER。在對結(jié)果處理是,判斷到RECONSUME_LATER時,就會對msgs重新遍歷并發(fā)送消息,重新消費(fèi),從而導(dǎo)致之前成功處理的消息都會被重復(fù)消費(fèi)。不過好在msgs消息的數(shù)量默認(rèn)情況下是1。

88792f32-ad40-11ee-8b88-92fbcf53809c.png

- 消費(fèi)者提交offset失敗

何為offset

producer發(fā)送消息到broker,Rocketmq會將消息的內(nèi)容持久化到commitLog文件中,再分發(fā)到topic下的消費(fèi)隊(duì)列consume Queue,消費(fèi)者提交消費(fèi)請求時,broker從該consumer負(fù)責(zé)的消費(fèi)隊(duì)列中根據(jù)請求參數(shù)傳入的起始o(jì)ffset來獲取需要消費(fèi)的消息索引信息,再從commitLog中獲取具體的消息內(nèi)容返回給consumer。消費(fèi)成功之后,消費(fèi)者提交offset,來記錄這個queue消費(fèi)到哪個位置了。

問題分析

RocketMq設(shè)計(jì)的時候,消費(fèi)完消息,并不是同步提交offset,而是將offset保存到內(nèi)存中,通過一個定時任務(wù)(默認(rèn)是5S一次),以網(wǎng)絡(luò)請求的方式將offset提交給broker。如果最新的offset還沒提交,此時服務(wù)器宕機(jī)了,那么重啟之后,就會從broker中讀取到之前的提交的offset,并從此處開始消費(fèi),此時就會出現(xiàn)重復(fù)消費(fèi)的情況了。

888b513a-ad40-11ee-8b88-92fbcf53809c.png

- broker持久化offset失敗

問題分析

與消費(fèi)者提交offset同理,Broker為了防止數(shù)據(jù)丟失,會將offset持久化到磁盤中。同樣的也是通過一個默認(rèn)5S的定時任務(wù)來處理持久化操作。所以offset的完整過程就如下圖。當(dāng)broker宕機(jī)時,就會導(dǎo)致offset丟失,此時如果消費(fèi)者重新拉取消費(fèi)進(jìn)度,就會比實(shí)際消費(fèi)的進(jìn)度要低,導(dǎo)致重復(fù)消費(fèi)。

88a54d42-ad40-11ee-8b88-92fbcf53809c.png

- 主從同步失敗

問題分析

為保證RocketMQ服務(wù)的高可用,一般項(xiàng)目中都會啟用主從備份的模式,當(dāng)主節(jié)點(diǎn)掛掉之后,從節(jié)點(diǎn)就會升級為主節(jié)點(diǎn)對外提供服務(wù)。因此就需要進(jìn)行主從同步,保證數(shù)據(jù)的一致性。默認(rèn)情況下每隔10S,從節(jié)點(diǎn)會向主節(jié)點(diǎn)請求,同步元數(shù)據(jù),包括消費(fèi)進(jìn)度。此時如果主節(jié)點(diǎn)宕機(jī)了,從節(jié)點(diǎn)就無法獲取到10S之內(nèi)的消費(fèi)進(jìn)度,自然也就會導(dǎo)致重復(fù)消費(fèi)。

88bf60a6-ad40-11ee-8b88-92fbcf53809c.png

- 重平衡

何為重平衡

RocketMQ的消費(fèi)者有兩種模式,集群消費(fèi)模式和廣播消費(fèi)模式,絕大多數(shù)場景采用的都是集群消費(fèi)模式。前面提到的消費(fèi)進(jìn)度就是在集群消費(fèi)模式下才會存在。集群消費(fèi)模式中有一個消費(fèi)組的概念。一個消費(fèi)組可以有多個消費(fèi)者,不同消費(fèi)組之間消費(fèi)消息互不干擾,而同一消費(fèi)組的消費(fèi)者按照一定的算法分配消息隊(duì)列進(jìn)行消息消費(fèi),保證一個消息只能被一個消費(fèi)組消費(fèi)一次。當(dāng)消費(fèi)組中的消費(fèi)組增加或者減少時就會觸發(fā)重平衡。如圖,原先消費(fèi)組中有兩個消費(fèi)者,平均消費(fèi)4個隊(duì)列,每個消費(fèi)組2個隊(duì)列;當(dāng)加入了一個新的消費(fèi)者時,為了保證新的消費(fèi)者能夠消費(fèi)消息,就會進(jìn)行重平衡,重新分配消息隊(duì)列。

88d971d0-ad40-11ee-8b88-92fbcf53809c.png

問題分析

假設(shè)在重平衡發(fā)生時,此時消費(fèi)者2還在正常消費(fèi)Queue4,當(dāng)消費(fèi)者3加入,重平衡完成時,此時消費(fèi)者2判斷到Queue4已經(jīng)不屬于自己消費(fèi)了,就會將Queue4設(shè)置為dropped,消費(fèi)完成時,發(fā)現(xiàn)隊(duì)列是dropped狀態(tài),那么消費(fèi)者2的消費(fèi)進(jìn)度offset就不會被提交。成功消費(fèi)了消息,但是消費(fèi)進(jìn)度卻沒有被提交,于是當(dāng)消費(fèi)者3開始消費(fèi)消息時,就會從服務(wù)端拉取到之前的消費(fèi)進(jìn)度,造成隊(duì)列4的消息被重復(fù)消費(fèi)。

- 清理長時間消費(fèi)的消息

清理機(jī)制講解

RocketMQ中有一個機(jī)制會定時清理長時間正在消費(fèi)的消息,默認(rèn)是15分鐘執(zhí)行一次清理任務(wù)。之所以這么做,是有原因的。我們說過,消息被消費(fèi)之后,就會提交offset。當(dāng)一個線程消費(fèi)了所有消息時,就會把消息從集合中移除,提交的消息進(jìn)度offset就是msg5的offset+1。

假設(shè),現(xiàn)在是兩個線程消費(fèi),線程2消費(fèi)完成,之后提交offset,但是此時線程1還在處理前兩條消息,因此為了保證消費(fèi)消息的不丟失,移除之后發(fā)現(xiàn)集合中還有剩余消息,就會把msg1的offset返回提交上去。而一旦集合最前面的消息長時間處理,就會導(dǎo)致這個消費(fèi)進(jìn)度一直在最前面。此時如果服務(wù)器重啟,就會導(dǎo)致很多消費(fèi)過的消息都會被重復(fù)消費(fèi)。因此引入了清理長時間消費(fèi)的機(jī)制。

88f595a4-ad40-11ee-8b88-92fbcf53809c.png

問題分析

引入清理長時間消費(fèi)的消息機(jī)制后,一旦發(fā)現(xiàn)某個消息已經(jīng)處理超過15分鐘了,就會將消息移除,保障后續(xù)消息消費(fèi)進(jìn)度的正常提交,之后會隔一定的時間再次消費(fèi)這個被移除的消息。但是,這個消息雖然被移除了,卻并不是沒有消費(fèi)過,因此再次消費(fèi)就會導(dǎo)致重復(fù)消費(fèi)的問題出現(xiàn)。

Part 03總結(jié)

RocketMq的官方文檔中對消息傳遞有這樣的解釋:RocketMq確保所有消息至少被傳遞一次,在大多數(shù)情況下,消息不會重復(fù)??梢奟ocketMq為了保證消息的不丟失,犧牲了消息投遞的重復(fù)率。因此我們在使用RokcetMq時需要合理使用它的特點(diǎn),設(shè)計(jì)合理的冪等技術(shù)方案來解決重復(fù)消費(fèi)的問題。

審核編輯:湯梓紅

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報(bào)投訴
  • 云計(jì)算
    +關(guān)注

    關(guān)注

    38

    文章

    7612

    瀏覽量

    136734
  • Queue
    +關(guān)注

    關(guān)注

    0

    文章

    16

    瀏覽量

    7246
  • 負(fù)載均衡
    +關(guān)注

    關(guān)注

    0

    文章

    98

    瀏覽量

    12337
  • 大數(shù)據(jù)
    +關(guān)注

    關(guān)注

    64

    文章

    8805

    瀏覽量

    136992

原文標(biāo)題:技術(shù) | RocketMQ中各類重復(fù)消費(fèi)的原理淺析

文章出處:【微信號:5G通信,微信公眾號:5G通信】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    RocketMQ入門手冊

    RocketMQ入門篇
    發(fā)表于 10-09 14:13

    Rocketmq怎么安裝

    Rocketmq 安裝步驟
    發(fā)表于 10-24 07:47

    淺析伺服系統(tǒng)應(yīng)用的慣量匹配問題

    剛性、慣量、響應(yīng)時間及伺服增益調(diào)整之間的關(guān)系 淺析伺服系統(tǒng)應(yīng)用的慣量匹配問題-慣量匹配
    發(fā)表于 09-07 07:01

    在Linux系統(tǒng)下部署RocketMQ單機(jī)實(shí)例

    前言這篇文章以4.3.0版本為標(biāo)準(zhǔn)進(jìn)行講述在linux下部署RocketMQ單機(jī)實(shí)例,在此之前需要已配置JAVA環(huán)境。下載程序包直接使用一般就下載已經(jīng)編譯好的二進(jìn)制文件就好了,下載好以后&
    發(fā)表于 11-11 16:29

    展望Apache RocketMQ5.0 | 談RocketMQ的過去、現(xiàn)在和未來

    RocketMQ 創(chuàng)始人,阿里巴巴中間件高級技術(shù)專家 馮嘉 向開發(fā)者們分享了Apache RocketMQ 的過去、現(xiàn)在和未來,以及對RocketMQ5.0的展望。本文是根據(jù)馮嘉的現(xiàn)場分享所整理,為大家回顧分享
    發(fā)表于 08-14 16:37 ?172次閱讀

    全面簡析RocketMQ 架構(gòu)

    Producer 負(fù)責(zé)生產(chǎn)消息,Consumer 負(fù)責(zé)消費(fèi)消息,Broker 負(fù)責(zé)存儲消息。每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于集群的不同的Broker
    的頭像 發(fā)表于 06-12 17:07 ?1975次閱讀

    Apache RocketMQ MQTT協(xié)議架構(gòu)模型

    rocketmq-mqtt.zip
    發(fā)表于 04-20 10:45 ?0次下載
    Apache <b class='flag-5'>RocketMQ</b> MQTT協(xié)議架構(gòu)模型

    開源軟件-RocketMQ Externals Apache RocketMQ的擴(kuò)展項(xiàng)目

    ./oschina_soft/rocketmq-externals.zip
    發(fā)表于 06-23 15:03 ?0次下載
    開源軟件-<b class='flag-5'>RocketMQ</b> Externals Apache <b class='flag-5'>RocketMQ</b>的擴(kuò)展項(xiàng)目

    如何在RocketMQ合理使用重試機(jī)制

    RocketMQ 的重試機(jī)制包括三部分,分別是生產(chǎn)者重試,服務(wù)端內(nèi)部數(shù)據(jù)復(fù)制遇到非預(yù)期問題時重試,消費(fèi)消費(fèi)重試。
    的頭像 發(fā)表于 11-23 10:15 ?1007次閱讀

    淺析LeetCode 83刪除排序鏈表重復(fù)元素

    給定一個已排序的鏈表的頭 head , 刪除所有重復(fù)的元素,使每個元素只出現(xiàn)一次 。返回 已排序的鏈表 。
    的頭像 發(fā)表于 02-06 10:25 ?614次閱讀

    聊聊RocketMQ的主從復(fù)制

    RocketMQ 主從復(fù)制是 RocketMQ 高可用機(jī)制之一,數(shù)據(jù)可以從主節(jié)點(diǎn)復(fù)制到一個或多個從節(jié)點(diǎn)。
    的頭像 發(fā)表于 07-04 09:42 ?538次閱讀
    聊聊<b class='flag-5'>RocketMQ</b>的主從復(fù)制

    RocketMQ和RabbitMQ的區(qū)別

    化:RocketMQ將消息存儲在磁盤上,保證消息的可靠性;RabbitMQ默認(rèn)將消息保存在內(nèi)存,可以通過插件進(jìn)行持久化。 可用性:RocketMQ具備分布
    的頭像 發(fā)表于 07-24 13:39 ?1.4w次閱讀

    RocketMQ在業(yè)務(wù)消息場景的優(yōu)勢有哪些呢?

    RocketMQ 5.0 是消息事件流一體的實(shí)時數(shù)據(jù)處理平臺,是業(yè)務(wù)消息領(lǐng)域的事實(shí)標(biāo)準(zhǔn),很多互聯(lián)網(wǎng)公司在業(yè)務(wù)消息場景會使用 RocketMQ。
    的頭像 發(fā)表于 08-07 11:36 ?664次閱讀
    <b class='flag-5'>RocketMQ</b>在業(yè)務(wù)消息場景的優(yōu)勢有哪些呢?

    磁盤RocketMQ構(gòu)建的索引結(jié)構(gòu)

    RocketMQ 廣泛使用于各類業(yè)務(wù)場景,在實(shí)際生產(chǎn)場景,用戶通常會選擇消息 ID 或者特定的業(yè)務(wù) Key(例如學(xué)號,訂單號)來查詢和定位特定的一批消息,進(jìn)而定位分布式系統(tǒng)
    的頭像 發(fā)表于 12-22 10:43 ?316次閱讀
    磁盤<b class='flag-5'>中</b><b class='flag-5'>RocketMQ</b>構(gòu)建的索引結(jié)構(gòu)

    RocketMQ協(xié)議是什么?RocketMQ協(xié)議特點(diǎn)

    分布式消息系統(tǒng)中生產(chǎn)者和消費(fèi)者之間的高效可靠通信。它支持同步和異步消息傳遞模式,可以實(shí)現(xiàn)靈活和響應(yīng)迅速的通信方式。 RocketMQ協(xié)議基于發(fā)布-訂閱消息模式,生產(chǎn)者將消息發(fā)布到特定的主題,消費(fèi)者訂閱這些主題以接收消息。該協(xié)議通
    的頭像 發(fā)表于 01-03 16:11 ?705次閱讀