背景
前段時(shí)間有個(gè)小項(xiàng)目需要使用延遲任務(wù),談到延遲任務(wù),我腦子第一時(shí)間一閃而過的就是使用消息隊(duì)列來做,比如RabbitMQ的死信隊(duì)列又或者RocketMQ的延遲隊(duì)列,但是奈何這是一個(gè)小項(xiàng)目,并沒有引入MQ,我也不太想因?yàn)橐粋€(gè)延遲任務(wù)就引入MQ,增加系統(tǒng)復(fù)雜度,所以這個(gè)方案直接就被pass了。
雖然基于MQ這個(gè)方式走不通了,但是這個(gè)項(xiàng)目中使用到Redis,所以我就想是否能夠使用Redis來代替MQ實(shí)現(xiàn)延遲隊(duì)列的功能,于是我就查了一下有沒有現(xiàn)成可用的方案,別說,還真給我查到了兩種方案,并且我還仔細(xì)研究對比了這兩個(gè)方案,發(fā)現(xiàn)要想很好的實(shí)現(xiàn)延遲隊(duì)列,并不簡單。
監(jiān)聽過期key
基于監(jiān)聽過期key的方式來實(shí)現(xiàn)延遲隊(duì)列是我查到的第一個(gè)方案,為了弄懂這個(gè)方案實(shí)現(xiàn)的細(xì)節(jié),我還特地去扒了扒官網(wǎng),還真有所收獲
1、Redis發(fā)布訂閱模式
一談到發(fā)布訂閱模式,其實(shí)一想到的就是MQ,只不過Redis也實(shí)現(xiàn)了一套,并且跟MQ賊像,如圖:
圖中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic。
生產(chǎn)者在消息發(fā)送時(shí)需要到指定發(fā)送到哪個(gè)channel上,消費(fèi)者訂閱這個(gè)channel就能獲取到消息。
在Redis中,有很多默認(rèn)的channel,只不過向這些channel發(fā)送消息的生產(chǎn)者不是我們寫的代碼,而是Redis本身。當(dāng)消費(fèi)者監(jiān)聽這些channel時(shí),就可以感知到Redis中數(shù)據(jù)的變化。
這個(gè)功能Redis官方稱為keyspace notifications,字面意思就是鍵空間通知。
這些默認(rèn)的channel被分為兩類:
以__keyspace@
舉個(gè)例子,現(xiàn)在有個(gè)消費(fèi)者監(jiān)聽了__keyspace@0__:sanyou這個(gè)channel,sanyou就是Redis中的一個(gè)普通key,那么當(dāng)sanyou這個(gè)key被刪除或者發(fā)生了其它事件,那么消費(fèi)者就會(huì)收到sanyou這個(gè)key刪除或者其它事件的消息
以__keyevent@
同樣舉個(gè)例子,現(xiàn)在有個(gè)消費(fèi)者監(jiān)聽了__keyevent@0__:expired這個(gè)channel,代表了監(jiān)聽key的過期事件。那么當(dāng)某個(gè)Redis的key過期了(expired),那么消費(fèi)者就能收到這個(gè)key過期的消息。如果把expired換成del,那么監(jiān)聽的就是刪除事件。具體支持哪些事件,可從官網(wǎng)查。
上述db是指具體的數(shù)據(jù)庫,Redis不是默認(rèn)分為16個(gè)庫么,序號(hào)從0-15,所以db就是0到15的數(shù)字,示例中的0就是指0對應(yīng)的數(shù)據(jù)庫。
3、延遲隊(duì)列實(shí)現(xiàn)原理
通過對上面的兩個(gè)概念了解之后,應(yīng)該就對監(jiān)聽過期key的實(shí)現(xiàn)原理一目了然了,其實(shí)就是當(dāng)這個(gè)key過期之后,Redis會(huì)發(fā)布一個(gè)key過期的事件到__keyevent@
所以這種方式實(shí)現(xiàn)延遲隊(duì)列就只需要兩步:
發(fā)送延遲任務(wù),key是延遲消息本身,過期時(shí)間就是延遲時(shí)間
監(jiān)聽__keyevent@
4、demo
好了,基本概念和核心原理都說完了之后,又到了show me the code環(huán)節(jié)。
好巧不巧,Spring已經(jīng)實(shí)現(xiàn)了監(jiān)聽__keyevent@*__:expired這個(gè)channel這個(gè)功能,__keyevent@*__:expired中的*代表通配符的意思,監(jiān)聽所有的數(shù)據(jù)庫。
所以demo寫起來就很簡單了,只需3步即可
引入pom
org.springframework.boot spring-boot-starter-data-redis 2.2.5.RELEASE org.springframework.boot spring-boot-starter-web 2.2.5.RELEASE
配置類
@Configuration publicclassRedisConfiguration{ @Bean publicRedisMessageListenerContainerredisMessageListenerContainer(RedisConnectionFactoryconnectionFactory){ RedisMessageListenerContainerredisMessageListenerContainer=newRedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(connectionFactory); returnredisMessageListenerContainer; } @Bean publicKeyExpirationEventMessageListenerredisKeyExpirationListener(RedisMessageListenerContainerredisMessageListenerContainer){ returnnewKeyExpirationEventMessageListener(redisMessageListenerContainer); } }
KeyExpirationEventMessageListener實(shí)現(xiàn)了對__keyevent@*__:expiredchannel的監(jiān)聽
當(dāng)KeyExpirationEventMessageListener收到Redis發(fā)布的過期Key的消息的時(shí)候,會(huì)發(fā)布RedisKeyExpiredEvent事件
所以我們只需要監(jiān)聽RedisKeyExpiredEvent事件就可以拿到過期消息的Key,也就是延遲消息。
對RedisKeyExpiredEvent事件的監(jiān)聽實(shí)現(xiàn)MyRedisKeyExpiredEventListener
@Component publicclassMyRedisKeyExpiredEventListenerimplementsApplicationListener{ @Override publicvoidonApplicationEvent(RedisKeyExpiredEventevent){ byte[]body=event.getSource(); System.out.println("獲取到延遲消息:"+newString(body)); } }
整個(gè)工程目錄也簡單
代碼寫好,啟動(dòng)應(yīng)用
之后我直接通過Redis命令設(shè)置消息,就沒通過代碼發(fā)送消息了,消息的key為sanyou,值為task,值不重要,過期時(shí)間為5s
setsanyoutask expiresanyou5
如果上面都理論都正確,不出意外的話,5s后MyRedisKeyExpiredEventListener應(yīng)該可以監(jiān)聽到sanyou這個(gè)key過期的消息,也就相當(dāng)于拿到了延遲任務(wù),控制臺(tái)會(huì)打印出獲取到延遲消息:sanyou。
于是我滿懷希望,靜靜地等待了5s。。
5、4、3、2、1,時(shí)間一到,我查看控制臺(tái),但是控制臺(tái)并沒有按照預(yù)期打印出上面那句話。
為什么會(huì)沒打印出?難道是代碼寫錯(cuò)了?正當(dāng)我準(zhǔn)備檢查代碼的時(shí)候,官網(wǎng)的一段話道出了真實(shí)原因。
我給大家翻譯一下上面這段話講的內(nèi)容。
上面這段話主要討論的是key過期事件的時(shí)效性問題,首先提到了Redis過期key的兩種清除策略,就是面試八股文常背的兩種:
惰性清除。當(dāng)這個(gè)key過期之后,訪問時(shí),這個(gè)Key才會(huì)被清除
定時(shí)清除。后臺(tái)會(huì)定期檢查一部分key,如果有key過期了,就會(huì)被清除
再后面那段話是核心,意思是說,key的過期事件發(fā)布時(shí)機(jī)并不是當(dāng)這個(gè)key的過期時(shí)間到了之后就發(fā)布,而是這個(gè)key在Redis中被清理之后,也就是真正被刪除之后才會(huì)發(fā)布。
到這我終于明白了,上面的例子中即使我設(shè)置了5s的過期時(shí)間,但是當(dāng)5s過去之后,只要兩種清除策略都不滿足,沒人訪問sanyou這個(gè)key,后臺(tái)的定時(shí)清理的任務(wù)也沒掃描到sanyou這個(gè)key,那么就不會(huì)發(fā)布key過期的事件,自然而然也就監(jiān)聽不到了。
至于后臺(tái)的定時(shí)清理的任務(wù)什么時(shí)候能掃到,這個(gè)沒有固定時(shí)間,可能一到過期時(shí)間就被掃到,也可能等一定時(shí)間才會(huì)被掃到,這就可能會(huì)造成了客戶端從發(fā)布到監(jiān)聽到的消息時(shí)間差會(huì)大于等于過期時(shí)間,從而造成一定時(shí)間消息的延遲,這就著實(shí)有點(diǎn)坑了。。
5、坑
除了上面測試demo的時(shí)候遇到的坑之外,在我深入研究之后,還發(fā)現(xiàn)了一些更離譜的坑。
丟消息太頻繁
Redis的丟消息跟MQ不一樣,因?yàn)镸Q都會(huì)有消息的持久化機(jī)制,可能只有當(dāng)機(jī)器宕機(jī)了,才會(huì)丟點(diǎn)消息,但是Redis丟消息就很離譜,比如說你的服務(wù)在重啟的時(shí)候就消息會(huì)丟消息。
Redis實(shí)現(xiàn)的發(fā)布訂閱模式,消息是沒有持久化機(jī)制,當(dāng)消息發(fā)布到某個(gè)channel之后,如果沒有客戶端訂閱這個(gè)channel,那么這個(gè)消息就丟了,并不會(huì)像MQ一樣進(jìn)行持久化,等有消費(fèi)者訂閱的時(shí)候再給消費(fèi)者消費(fèi)。
所以說,假設(shè)服務(wù)重啟期間,某個(gè)生產(chǎn)者或者是Redis本身發(fā)布了一條消息到某個(gè)channel,由于服務(wù)重啟,沒有監(jiān)聽這個(gè)channel,那么這個(gè)消息自然就丟了。
消息消費(fèi)只有廣播模式
Redis的發(fā)布訂閱模式消息消費(fèi)只有廣播模式一種。
所謂的廣播模式就是多個(gè)消費(fèi)者訂閱同一個(gè)channel,那么每個(gè)消費(fèi)者都能消費(fèi)到發(fā)布到這個(gè)channel的所有消息。
如圖,生產(chǎn)者發(fā)布了一條消息,內(nèi)容為sanyou,那么兩個(gè)消費(fèi)者都可以同時(shí)收到sanyou這條消息。
所以,如果通過監(jiān)聽channel來獲取延遲任務(wù),那么一旦服務(wù)實(shí)例有多個(gè)的話,還得保證消息不能重復(fù)處理,額外地增加了代碼開發(fā)量。
接收到所有key的某個(gè)事件
這個(gè)不屬于Redis發(fā)布訂閱模式的問題,而是Redis本身事件通知的問題。
當(dāng)消費(fèi)者監(jiān)聽了以__keyevent@
舉個(gè)例子,某個(gè)消費(fèi)者監(jiān)聽了__keyevent@*__:expired這個(gè)channel,那么只要key過期了,不管這個(gè)key是張三還會(huì)李四,消費(fèi)者都能收到。
所以如果你只想消費(fèi)某一類消息的key,那么還得自行加一些標(biāo)記,比如消息的key加個(gè)前綴,消費(fèi)的時(shí)候判斷一下帶前綴的key就是需要消費(fèi)的任務(wù)。
所以,綜上能夠得出一個(gè)非常重要的結(jié)論,那就是監(jiān)聽Redis過期Key這種方式實(shí)現(xiàn)延遲隊(duì)列,不穩(wěn)定,坑賊多!
那有沒有比較靠譜的延遲隊(duì)列的實(shí)現(xiàn)方案呢?這就不得不提到我研究的第二種方案了。
Redisson實(shí)現(xiàn)延遲隊(duì)列
Redisson他是Redis的兒子(Redis son),基于Redis實(shí)現(xiàn)了非常多的功能,其中最常使用的就是Redis分布式鎖的實(shí)現(xiàn),但是除了實(shí)現(xiàn)Redis分布式鎖之外,它還實(shí)現(xiàn)了延遲隊(duì)列的功能。
先來個(gè)demo,后面再來說說這種實(shí)現(xiàn)的原理。
1、demo
引入pom
org.redisson redisson 3.13.1
封裝了一個(gè)RedissonDelayQueue類
@Component @Slf4j publicclassRedissonDelayQueue{ privateRedissonClientredissonClient; privateRDelayedQueuedelayQueue; privateRBlockingQueue blockingQueue; @PostConstruct publicvoidinit(){ initDelayQueue(); startDelayQueueConsumer(); } privatevoidinitDelayQueue(){ Configconfig=newConfig(); SingleServerConfigserverConfig=config.useSingleServer(); serverConfig.setAddress("redis://localhost:6379"); redissonClient=Redisson.create(config); blockingQueue=redissonClient.getBlockingQueue("SANYOU"); delayQueue=redissonClient.getDelayedQueue(blockingQueue); } privatevoidstartDelayQueueConsumer(){ newThread(()->{ while(true){ try{ Stringtask=blockingQueue.take(); log.info("接收到延遲任務(wù):{}",task); }catch(Exceptione){ e.printStackTrace(); } } },"SANYOU-Consumer").start(); } publicvoidofferTask(Stringtask,longseconds){ log.info("添加延遲任務(wù):{}延遲時(shí)間:{}s",task,seconds); delayQueue.offer(task,seconds,TimeUnit.SECONDS); } }
這個(gè)類在創(chuàng)建的時(shí)候會(huì)去初始化延遲隊(duì)列,創(chuàng)建一個(gè)RedissonClient對象,之后通過RedissonClient對象獲取到RDelayedQueue和RBlockingQueue對象,傳入的隊(duì)列名字叫SANYOU,這個(gè)名字無所謂。
當(dāng)延遲隊(duì)列創(chuàng)建之后,會(huì)開啟一個(gè)延遲任務(wù)的消費(fèi)線程,這個(gè)線程會(huì)一直從RBlockingQueue中通過take方法阻塞獲取延遲任務(wù)。
添加任務(wù)的時(shí)候是通過RDelayedQueue的offer方法添加的。
controller類,通過接口添加任務(wù),延遲時(shí)間為5s
@RestController publicclassRedissonDelayQueueController{ @Resource privateRedissonDelayQueueredissonDelayQueue; @GetMapping("/add") publicvoidaddTask(@RequestParam("task")Stringtask){ redissonDelayQueue.offerTask(task,5); } }
啟動(dòng)項(xiàng)目,添加任務(wù)
靜靜等待5s,成功獲取到任務(wù)。
2、實(shí)現(xiàn)原理
如下圖就是上面demo中,一個(gè)延遲隊(duì)列會(huì)在Redis內(nèi)部使用到的channel和數(shù)據(jù)類型
SANYOU前面的前綴都是固定的,Redisson創(chuàng)建的時(shí)候會(huì)拼上前綴。
redisson_delay_queue_timeout:SANYOU,sorted set數(shù)據(jù)類型,存放所有延遲任務(wù),按照延遲任務(wù)的到期時(shí)間戳(提交任務(wù)時(shí)的時(shí)間戳 + 延遲時(shí)間)來排序的,所以列表的最前面的第一個(gè)元素就是整個(gè)延遲隊(duì)列中最早要被執(zhí)行的任務(wù),這個(gè)概念很重要
redisson_delay_queue:SANYOU,list數(shù)據(jù)類型,也是存放所有的任務(wù),但是研究下來發(fā)現(xiàn)好像沒什么用。。
SANYOU,list數(shù)據(jù)類型,被稱為目標(biāo)隊(duì)列,這個(gè)里面存放的任務(wù)都是已經(jīng)到了延遲時(shí)間的,可以被消費(fèi)者獲取的任務(wù),所以上面demo中的RBlockingQueue的take方法是從這個(gè)目標(biāo)隊(duì)列中獲取到任務(wù)的
redisson_delay_queue_channel:SANYOU,是一個(gè)channel,用來通知客戶端開啟一個(gè)延遲任務(wù)
有了這些概念之后,再來看看整體的運(yùn)行原理圖
生產(chǎn)者在提交任務(wù)的時(shí)候?qū)⑷蝿?wù)放到redisson_delay_queue_timeout:SANYOU中,分?jǐn)?shù)就是提交任務(wù)的時(shí)間戳+延遲時(shí)間,就是延遲任務(wù)的到期時(shí)間戳
客戶端會(huì)有一個(gè)延遲任務(wù),為了區(qū)分,后面我都說是客戶端延遲任務(wù)。這個(gè)延遲任務(wù)會(huì)向Redis Server發(fā)送一段lua腳本,Redis執(zhí)行l(wèi)ua腳本中的命令,并且是原子性的
這段lua腳本主要干了兩件事:
將到了延遲時(shí)間的任務(wù)從redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU這個(gè)目標(biāo)隊(duì)列
獲取到redisson_delay_queue_timeout:SANYOU中目前最早到過期時(shí)間的延遲任務(wù)的到期時(shí)間戳,然后發(fā)布到redisson_delay_queue_channel:SANYOU這個(gè)channel中
當(dāng)客戶端監(jiān)聽到redisson_delay_queue_channel:SANYOU這個(gè)channel的消息時(shí),會(huì)再次提交一個(gè)客戶端延遲任務(wù),延遲時(shí)間就是消息(最早到過期時(shí)間的延遲任務(wù)的到期時(shí)間戳)- 當(dāng)前時(shí)間戳,這個(gè)時(shí)間其實(shí)也就是redisson_delay_queue_channel:SANYOU中最早到過期時(shí)間的任務(wù)還剩余的延遲時(shí)間。
此處可以等待10s,好好想想。。
這樣,一旦時(shí)間來到了上面說的最早到過期時(shí)間任務(wù)的到期時(shí)間戳,redisson_delay_queue_timeout:SANYOU中上面說的最早到過期時(shí)間的任務(wù)已經(jīng)到期了,客戶端的延遲任務(wù)也同時(shí)到期,于是開始執(zhí)行l(wèi)ua腳本操作,及時(shí)將到了延遲時(shí)間的任務(wù)放到目標(biāo)隊(duì)列中。然后再次發(fā)布剩余的延遲任務(wù)中最早到期的任務(wù)到期時(shí)間戳到channe中,如此循環(huán)往復(fù),一直運(yùn)行下去,保證redisson_delay_queue_timeout:SANYOU中到期的數(shù)據(jù)能及時(shí)放到目標(biāo)隊(duì)列中。
所以,上述說了一大堆的主要的作用就是保證到了延遲時(shí)間的任務(wù)能夠及時(shí)被放到目標(biāo)隊(duì)列。
這里再補(bǔ)充兩個(gè)特殊情況,圖中沒有畫出:
第一個(gè)就是如果redisson_delay_queue_timeout:SANYOU是新添加的任務(wù)(隊(duì)列之前有或者沒有任務(wù))是隊(duì)列中最早需要被執(zhí)行的,也會(huì)發(fā)布消息到channel,之后就按時(shí)上面說的流程走了。
添加任務(wù)代碼如下,也是通過lua腳本來的
第二種特殊情況就是項(xiàng)目啟動(dòng)的時(shí)候會(huì)執(zhí)行一次客戶端延遲任務(wù)。項(xiàng)目在重啟時(shí),由于沒有客戶端延遲任務(wù)的執(zhí)行,可能會(huì)出現(xiàn)redisson_delay_queue_timeout:SANYOU隊(duì)列中有到期但是沒有被放到目標(biāo)隊(duì)列的可能,重啟就執(zhí)行一次就是為了保證到期的數(shù)據(jù)能被及時(shí)放到目標(biāo)隊(duì)列中。
3、與第一種方案比較
現(xiàn)在來比較一下第一種方案和Redisson的這種方案,看看有沒有第一種方案的那些坑。
第一個(gè)任務(wù)延遲的問題,Redisson方案理論上是沒有延遲的,但是當(dāng)消息數(shù)量增加,消費(fèi)者消費(fèi)緩慢這個(gè)情況下可能會(huì)導(dǎo)致延遲任務(wù)消費(fèi)的延遲。
第二個(gè)丟消息的問題,Redisson方案很大程度上減輕了丟消息的可能性,因?yàn)樗械娜蝿?wù)都是存在list和sorted set兩種數(shù)據(jù)類型中,Redis有持久化機(jī)制,就算Redis宕機(jī)了,也就可能會(huì)丟一點(diǎn)點(diǎn)數(shù)據(jù)。
第三個(gè)廣播消費(fèi)任務(wù)的問題,這個(gè)是不會(huì)出現(xiàn)的,因?yàn)槊總€(gè)客戶端都是從同一個(gè)目標(biāo)隊(duì)列中獲取任務(wù)的。
第四個(gè)問題是Redis內(nèi)部channel發(fā)布事件的問題,跟這種方案不沾邊,就更不可能存在了。
所以,通過上面的對比可以看出,Redisson這種實(shí)現(xiàn)方案就顯得更加的靠譜了。
審核編輯:劉清
-
數(shù)據(jù)庫
+關(guān)注
關(guān)注
7文章
3752瀏覽量
64233 -
lua腳本
+關(guān)注
關(guān)注
0文章
21瀏覽量
7573 -
Redis
+關(guān)注
關(guān)注
0文章
370瀏覽量
10830
原文標(biāo)題:用 Redis 實(shí)現(xiàn)延遲隊(duì)列,我研究了兩種方案,發(fā)現(xiàn)并不簡單
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評(píng)論請先 登錄
相關(guān)推薦
評(píng)論