0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

一種基于MySQL實(shí)現(xiàn)的stream隊(duì)列

jf_ro2CN3Fa ? 來(lái)源:稀土掘金 ? 2023-11-24 09:53 ? 次閱讀

EMS

Extend MySQL Stream;一種基于 MySQL 實(shí)現(xiàn)的 stream 隊(duì)列。

功能

集群消費(fèi)、廣播消費(fèi)

自動(dòng)重試、死信隊(duì)列

快速重置消息位點(diǎn),快速回放消息,快速查詢消息

消息可基于磁盤(pán)積壓、消息可快速清理

監(jiān)控 group 積壓,topic 消息量排行,消息鏈路追蹤,消息消費(fèi)超時(shí)告警;

讀寫(xiě)性能 1200-3000 QPS 左右

寫(xiě)入設(shè)計(jì)

msg id 就是 topic 維度的自增 id,可對(duì)多個(gè) topic 并發(fā)寫(xiě)入

針對(duì)一個(gè) topic,需要有物理 physics offset, 每次寫(xiě)入,topic 維度的 physics_offset 自增加一

如果使用 redis 自增特性實(shí)現(xiàn), 為每個(gè) topic 配置一個(gè)自增 key, 則可以避免加鎖.

redis 實(shí)現(xiàn)雖然性能好, 如為配置aof,宕機(jī)則可能導(dǎo)致丟失數(shù)據(jù), 此時(shí),會(huì)出現(xiàn) offset 重復(fù)異常, 過(guò)一會(huì)隨著繼續(xù)自增, 也就恢復(fù)了.

寫(xiě)入需要上鎖嗎? 看怎么寫(xiě), 如果使用非原子的形式自增 id,比如數(shù)據(jù)的的方式,先查出最大 id,再加一,那么必須加鎖

topic 維度的自增 id 如果使用 mysql 實(shí)現(xiàn), 性能不堪受辱,因此,此處使用 redis 自增實(shí)現(xiàn)(可配置為 mysql 實(shí)現(xiàn));

經(jīng)過(guò)測(cè)試,筆記本電腦,單 topic 20 并發(fā)寫(xiě)入,qps 在 1000-1500 左右(local mysql & local redis),基本滿足業(yè)務(wù)需求。

考慮到高可用性和業(yè)務(wù)場(chǎng)景,此處無(wú)法使用批量插入

所有的 topic 和 msg 都寫(xiě)入的這一張表中,表數(shù)據(jù)定時(shí)清理,消費(fèi)完的消息,可提前刪除。

注意,本方案寫(xiě)入性能瓶頸是 MySQL Server 的性能瓶頸。

讀取設(shè)計(jì)

假設(shè)針對(duì)一個(gè) topic,只有一個(gè) consumer,只需循環(huán)讀取,然后更新 offset 即可。

但結(jié)合實(shí)際業(yè)務(wù)場(chǎng)景,這種基本不存在,所以,忽略這種場(chǎng)景。

通常,一個(gè) topic 有多個(gè) consumer group(簡(jiǎn)稱(chēng) tg), 一個(gè) consumer group 有多個(gè) client(jvm or thread

如果一個(gè) topic + group(簡(jiǎn)稱(chēng) tg),有多個(gè) consumer,每個(gè) consumer 有多個(gè)線程,讀取和更新 offset 則會(huì)有并發(fā)問(wèn)題, 如下圖。

這個(gè) client id,我們將其設(shè)計(jì)為,ip + pid + uuid + thread id;

ip 和 pid 可幫助我們追溯問(wèn)題

uuid 簡(jiǎn)單防重復(fù)

thread id,一種性能優(yōu)化,下面細(xì)說(shuō)。

結(jié)合實(shí)際業(yè)務(wù)場(chǎng)景,且遵循 simple is better 原則,讀取時(shí),使用上鎖的方式解決并發(fā)問(wèn)題。鎖的粒度就是 tg

考慮到要實(shí)現(xiàn)基本的順序讀取和防止重復(fù)消費(fèi),多線程并發(fā)時(shí),我們應(yīng)當(dāng)實(shí)現(xiàn)基于自增的形式讀取 msg;每個(gè) clientid 讀取消息后,都會(huì)記錄一個(gè)簡(jiǎn)單的log,并在 tg 維度增加一個(gè) max offset

每次讀取消息時(shí),每個(gè) client 都需要去檢查當(dāng)前想要讀取的 tg 是否已經(jīng)有【其他 client】在操作 max offset。即,我們將鎖的粒度縮小到了 max offset;

對(duì)這個(gè) tg 維度的 max offset + n

批量插入這個(gè) tg + clientid offset log,表明這個(gè)消息被這個(gè) clientid 讀取了,同時(shí)也間接更新了 max offset(order by offset)

釋放鎖

拉取剛剛讀取的 msg id list 里面的消息體

交給業(yè)務(wù)處理消息

整體原則是,一個(gè) t + g 的 max offset,同時(shí)只能有一個(gè) thread 操作(寫(xiě)和更新)

如果有其他人在讀取,則阻塞

如果沒(méi)有其他人在讀取,則鎖住這個(gè) tg, 并批量拉取一定數(shù)量的消息 id,

fe6133ce-8a69-11ee-939d-92fbcf53809c.jpg

ack

對(duì)于集群消息,如何保證在斷電情況下,消息不丟失,使用數(shù)據(jù)庫(kù)存儲(chǔ)消息, 寫(xiě)入即不會(huì)丟失, 但消費(fèi)時(shí), 如果剛剛讀進(jìn)內(nèi)存就立刻宕機(jī),則需要在重啟時(shí)恢復(fù)消息.

每個(gè) client get 到消息后,都需要記錄 msg pid,consumer group,state(start、done,retry)為 start 狀態(tài)

ack success,將 log update 為 done 狀態(tài)

ack fail 后,將 log update 為 retry 狀態(tài),同時(shí)將消息存入重試隊(duì)列

如果 client 還存活,超過(guò) 1 分鐘(可配),則將其撈出,放進(jìn)重試隊(duì)列,并在 10s 進(jìn)行第一次重試

如果 client 還存活,則立刻將其撈出,放進(jìn)重試隊(duì)列,并在 10s 進(jìn)行第一次重試

這里需要上鎖嗎?其實(shí)是不需要的,因?yàn)楦碌木S度是 client id 的 log,不存在并發(fā)更新. 這里更新?tīng)顟B(tài)是表示這些消息已經(jīng)處理結(jié)束了,否則無(wú)法判定宕機(jī)場(chǎng)景。

對(duì)于 start 狀態(tài)的消息,定時(shí)任務(wù)會(huì)去檢查

ack 是批量的,ack 失敗,僅會(huì)導(dǎo)致重復(fù)消費(fèi)。

廣播消息

是否為廣播消息由 topic 確定

廣播消息不需要上鎖,每一個(gè)訂閱該 topic 的 client 都會(huì)讀取到該消息

廣播消息不需要 ack,不需要記錄成功或失敗或重試,僅需要內(nèi)存里記錄 offset

推薦盡可能使用集群模式,使用集群模式模擬廣播模式

client id

只有 consumer 需要 client id

client id 由 ip pid uuid + thread id 組成, 可溯源.

client id 需要續(xù)約(5s),如果機(jī)器宕機(jī),則會(huì)被自動(dòng)清除,且他的 start 狀態(tài)的消息會(huì)進(jìn)入重試隊(duì)列,交給同 group 的其他 client

client id 可以自己主動(dòng)注銷(xiāo),注銷(xiāo)前,自己內(nèi)存的消息應(yīng)當(dāng)被優(yōu)雅消費(fèi)結(jié)束,一般來(lái)講,kill -15 的 jvm 都會(huì)主動(dòng)注銷(xiāo) client id;

核心表設(shè)計(jì)

topic 表:記錄topic 元信息

group 表:記錄 group 訂閱元信息

msg 表:msg總表,記錄寫(xiě)入的信息,包含 body 和 topic 維度的自增 offset,類(lèi)似 rocketmq commit log

該表會(huì)被多個(gè) consumer 消費(fèi)的消息

該表會(huì)被定制刪除過(guò)期數(shù)據(jù)

retry msg 表,消費(fèi)失敗、超時(shí)的消息,會(huì)進(jìn)入該表,并按階梯定時(shí)消費(fèi)

dead msg 表,消費(fèi)重試 16(any config) 次的消息,會(huì)進(jìn)入該表

topic_group_log 表:記錄 consumer group client 的 msg 消費(fèi)記錄,包含 state(start、done,retry) 字段,可 ack

該表的記錄行數(shù)會(huì)非常多,單行數(shù)據(jù)較少,可自動(dòng)刪除 done 的記錄

如上文所說(shuō),由于本方案未采用常見(jiàn)的多 queue 和多 partition 的設(shè)計(jì),因此瓶頸在于上圖提到的分布式鎖的設(shè)計(jì)上,具體鏈路為 consumer group client 在集群消費(fèi)時(shí), 為了讓并發(fā)讀取的 thread 拉取到的消息盡可能準(zhǔn)確,使用上鎖的方式來(lái)實(shí)現(xiàn)。

總體看下來(lái),可以簡(jiǎn)單理解為,ems 失去了性能,卻擁有了所有。







審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • MySQL
    +關(guān)注

    關(guān)注

    1

    文章

    798

    瀏覽量

    26399
  • QPS
    QPS
    +關(guān)注

    關(guān)注

    0

    文章

    24

    瀏覽量

    8785

原文標(biāo)題:如何設(shè)計(jì)一款基于 MySQL 實(shí)現(xiàn)的 Message Queue

文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    Stream API原理介紹

    Stream API 是 Java 8 中最重要的新特性之,它是處理集合和數(shù)組的一種新方式。它提供了一種簡(jiǎn)單、靈活和可讀的方式來(lái)處理集合和數(shù)組中的元素,從而使代碼更加簡(jiǎn)潔、高效和易于
    的頭像 發(fā)表于 09-30 15:31 ?663次閱讀

    聊消息隊(duì)列技術(shù)選型的7消息場(chǎng)景

    我們?cè)谧鱿?b class='flag-5'>隊(duì)列的技術(shù)選型時(shí),往往會(huì)結(jié)合業(yè)務(wù)場(chǎng)景進(jìn)行考慮。今天來(lái)聊聊消息隊(duì)列可能會(huì)用到的 7 消息場(chǎng)景。
    的頭像 發(fā)表于 12-09 17:50 ?1302次閱讀
    聊<b class='flag-5'>一</b>聊消息<b class='flag-5'>隊(duì)列</b>技術(shù)選型的7<b class='flag-5'>種</b>消息場(chǎng)景

    Redis Stream應(yīng)用案例

    的IoT設(shè)備會(huì)形成巨大的數(shù)據(jù)洪流,采集完成后在云端進(jìn)行分析,產(chǎn)生巨大的用戶價(jià)值。這些數(shù)據(jù)雖然內(nèi)容各個(gè)不同,但是都有個(gè)共同的特點(diǎn),都是一種時(shí)序數(shù)據(jù)??吹竭@里,你可能會(huì)突然發(fā)現(xiàn),Redis Stream
    發(fā)表于 06-26 17:15

    怎樣去設(shè)計(jì)一種采用覆蓋機(jī)制的FIFO隊(duì)列模型呢

    FIFO隊(duì)列是什么?怎樣去設(shè)計(jì)一種采用覆蓋機(jī)制的FIFO隊(duì)列模型呢?
    發(fā)表于 12-08 06:07

    如何利用Java swing mysql實(shí)現(xiàn)一種電影票訂票管理系統(tǒng)呢

    Java swing mysql實(shí)現(xiàn)的電影票訂票管理系統(tǒng),主要實(shí)現(xiàn)的功能有:用戶端:登錄注冊(cè)、查看電影信息、選擇影院場(chǎng)次、選座購(gòu)票、查看自己的影票、評(píng)價(jià)電影等功能。管理員:登錄、電影管理、影院管理
    發(fā)表于 01-03 06:23

    實(shí)現(xiàn)隊(duì)列環(huán)形緩沖的方法

    串口隊(duì)列環(huán)形緩沖區(qū)隊(duì)列串口環(huán)形緩沖的好處代碼實(shí)現(xiàn)隊(duì)列??要實(shí)現(xiàn)隊(duì)列環(huán)形緩沖,還需要
    發(fā)表于 02-21 07:11

    如何去實(shí)現(xiàn)一種隊(duì)列程序的設(shè)計(jì)呢

    隊(duì)列的原理是什么?隊(duì)列有何作用?如何去實(shí)現(xiàn)一種隊(duì)列程序的設(shè)計(jì)呢?
    發(fā)表于 02-25 07:50

    一種改進(jìn)的主動(dòng)隊(duì)列管理算法

    主動(dòng)隊(duì)列管理是實(shí)現(xiàn)網(wǎng)絡(luò)擁塞控制的重要技術(shù),但是多數(shù)主動(dòng)隊(duì)列管理算法如隨機(jī)早期檢(RED)都存在對(duì)參數(shù)依賴(lài)性強(qiáng)的問(wèn)題。針對(duì)RED算法中平均隊(duì)列長(zhǎng)度不能完全反映網(wǎng)絡(luò)擁塞狀況的
    發(fā)表于 04-13 09:08 ?14次下載

    一種高效的磁盤(pán)隊(duì)列I/O機(jī)制

    分析了傳統(tǒng)磁盤(pán)隊(duì)列的存儲(chǔ)管理開(kāi)銷(xiāo)和讀寫(xiě)性能,針對(duì)磁盤(pán)隊(duì)列I/O已成為影響消息服務(wù)器性能的首要瓶頸,提出了一種高效磁盤(pán)隊(duì)列I/O機(jī)制—FlashQ。FlashQ采用物理上連續(xù)的磁盤(pán)塊
    發(fā)表于 05-14 19:51 ?32次下載

    一種基于速率的公平隊(duì)列管理算法

    針對(duì)主動(dòng)隊(duì)列管理算法普遍存在的公平性問(wèn)題,提出基于速率的公平隊(duì)列管理算法RFED。該算法根據(jù)分組的到達(dá)速率調(diào)節(jié)丟包率,將隊(duì)列的到達(dá)速率控制在鏈路的服務(wù)速率下,根據(jù)
    發(fā)表于 10-04 14:11 ?15次下載

    深度解析數(shù)據(jù)結(jié)構(gòu)與算法篇之隊(duì)列及環(huán)形隊(duì)列實(shí)現(xiàn)

    的位置。 02 — 環(huán)形隊(duì)列實(shí)現(xiàn) 要想將元素放入隊(duì)列我們必須知道對(duì)頭和隊(duì)尾,在隊(duì)列長(zhǎng)度不能無(wú)限大的條件下我們還要知道隊(duì)列的最大容量,我們還
    的頭像 發(fā)表于 06-18 10:07 ?1882次閱讀

    隊(duì)列實(shí)現(xiàn)棧原理是什么?隊(duì)列實(shí)現(xiàn)棧方案有哪幾種?

    棧是一種后進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),而隊(duì)列一種先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),兩者原理不難理解,使用也簡(jiǎn)單。
    的頭像 發(fā)表于 07-04 13:28 ?2699次閱讀
    <b class='flag-5'>隊(duì)列</b><b class='flag-5'>實(shí)現(xiàn)</b>棧原理是什么?<b class='flag-5'>隊(duì)列</b><b class='flag-5'>實(shí)現(xiàn)</b>棧方案有哪幾種?

    TencentOS-tiny中環(huán)形隊(duì)列實(shí)現(xiàn)

    1. 什么是隊(duì)列隊(duì)列(queue)是一種只能在端插入元素、在另端刪除元素的數(shù)據(jù)結(jié)構(gòu),遵循「先入先出」(FIFO)的規(guī)則。 隊(duì)列中有兩個(gè)基
    的頭像 發(fā)表于 10-08 16:30 ?1355次閱讀

    一種基于單片機(jī)實(shí)現(xiàn)隊(duì)列功能模塊

    基于單片機(jī)實(shí)現(xiàn)隊(duì)列功能模塊,主要用于8位、16位、32位非運(yùn)行RTOS的單片機(jī)應(yīng)用,兼容大多數(shù)單片機(jī)平臺(tái)。
    的頭像 發(fā)表于 08-14 11:09 ?778次閱讀
    <b class='flag-5'>一種</b>基于單片機(jī)<b class='flag-5'>實(shí)現(xiàn)</b>的<b class='flag-5'>隊(duì)列</b>功能模塊

    嵌入式環(huán)形隊(duì)列與消息隊(duì)列實(shí)現(xiàn)原理

    嵌入式環(huán)形隊(duì)列,也稱(chēng)為環(huán)形緩沖區(qū)或循環(huán)隊(duì)列,是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),用于在固定大小的存儲(chǔ)區(qū)域中高效地存儲(chǔ)和訪問(wèn)數(shù)據(jù)。其主要特點(diǎn)包括固定大小的數(shù)組和兩個(gè)指針(頭指針和尾指針),分別指向
    的頭像 發(fā)表于 09-02 15:29 ?293次閱讀