0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

延遲隊列的實現(xiàn)方式

科技綠洲 ? 來源:Java技術(shù)指北 ? 作者:Java技術(shù)指北 ? 2023-09-30 11:17 ? 次閱讀

延遲任務(wù)

最近有一個需求,基于消息隊列對數(shù)據(jù)消費,并根據(jù)多次消費的結(jié)果對數(shù)據(jù)進(jìn)行重新組裝,如果在指定時間內(nèi),需要的數(shù)據(jù)全部到達(dá),則進(jìn)行數(shù)據(jù)組裝以及后續(xù)邏輯。簡單的說,設(shè)置一個超時時間,如果在該時間內(nèi)由MQ中消費到完整的數(shù)據(jù)則直接處理,否則進(jìn)入其他流程。

針對這種場景使用了延遲任務(wù)來實現(xiàn),以此為契機對延遲任務(wù)相關(guān)的技術(shù)做了個簡單了解...

簡介

延遲任務(wù)是一種指定任務(wù)在未來某個時間點或一定時間后執(zhí)行的方式。通常情況下,延遲任務(wù)可以通過設(shè)置任務(wù)的執(zhí)行時間或延遲時間來實現(xiàn)。

延遲任務(wù)可以用于異步操作、定時任務(wù)和任務(wù)調(diào)度等場景。例如,在用戶注冊后發(fā)送歡迎郵件或者在用戶下單后發(fā)送訂單確認(rèn)短信,可以通過延遲任務(wù)來實現(xiàn)異步操作。定時檢查服務(wù)器狀態(tài)、定時備份數(shù)據(jù)等任務(wù),也可以通過延遲任務(wù)來實現(xiàn)定時任務(wù)。在某個時間點觸發(fā)某個任務(wù)、在某個時間段內(nèi)重復(fù)執(zhí)行某個任務(wù)等,可以通過延遲任務(wù)來實現(xiàn)任務(wù)調(diào)度。

延遲任務(wù)通常使用隊列或者定時器來實現(xiàn)。在隊列中,任務(wù)會被添加到一個等待隊列中,等待隊列中的任務(wù)會在指定的時間點或延遲時間后被取出執(zhí)行。在定時器中,任務(wù)會被添加到一個定時器中,定時器會在指定的時間點觸發(fā)任務(wù)執(zhí)行。

總之,延遲任務(wù)是一種非常實用的技術(shù),可以幫助我們更好地管理系統(tǒng)中的異步操作、定時任務(wù)和任務(wù)調(diào)度等場景。

使用場景

異步操作:延遲任務(wù)可以用于異步操作,例如在用戶注冊后發(fā)送歡迎郵件或者在用戶下單后發(fā)送訂單確認(rèn)短信。通過使用延遲任務(wù),可以將這些操作推遲到后臺處理,從而提高系統(tǒng)的響應(yīng)速度和并發(fā)能力。

定時任務(wù):延遲任務(wù)可以用于定時任務(wù),例如定時檢查服務(wù)器狀態(tài)、定時備份數(shù)據(jù)等。通過使用延遲任務(wù),可以在指定的時間點自動觸發(fā)任務(wù),避免手動操作的繁瑣和容易出錯。

任務(wù)調(diào)度:延遲任務(wù)可以用于任務(wù)調(diào)度,例如在某個時間點觸發(fā)某個任務(wù)、在某個時間段內(nèi)重復(fù)執(zhí)行某個任務(wù)等。通過使用延遲任務(wù),可以方便地進(jìn)行任務(wù)調(diào)度,提高系統(tǒng)的可靠性和穩(wěn)定性。

技術(shù)實現(xiàn)

  • Timer 基于java基礎(chǔ)類庫java.util.Timer實現(xiàn)
  • DelayQueue
    基于延時隊列實現(xiàn)
  1. 基于內(nèi)存,應(yīng)用重啟(或宕機)會導(dǎo)致任務(wù)丟失
  2. 基于內(nèi)存存放隊列,不支持集群
  3. 依據(jù)compareTo方法排列隊列,調(diào)用take阻塞式的取出第一個任務(wù)(不調(diào)用則不取出),比較不靈活,會影響時間的準(zhǔn)確性
  • ScheduledThreadPoolExecutor
    1. 基于內(nèi)存,應(yīng)用重啟(或宕機)會導(dǎo)致任務(wù)丟失
    2. 基于內(nèi)存存放任務(wù),不支持集群
    3. 一個任務(wù)就要新建一個線程綁定任務(wù)的執(zhí)行,容易造成資源浪費
  • Redis過期監(jiān)聽 基于Redis過期訂閱
    1. 客戶端斷開后重連會導(dǎo)致所有事件丟失
    2. 高并發(fā)場景下,存在大量的失效key場景會導(dǎo)出失效時間存在延遲
    3. 若有多個監(jiān)聽器監(jiān)聽該key,是會重復(fù)消費這個過期事件的,需要特定邏輯判斷
  • MQ延遲隊列 基于消息死信隊列實現(xiàn) 支持集群,分布式,高并發(fā)場景;缺點:引入額外的消息隊列,增加項目的部署和維護的復(fù)雜度。
  • HashedWheelTimer 基于Netty提供的工具類HashedWheelTimer HashedWheelTimer 是使用定時輪實現(xiàn)的,定時輪其實就是一種環(huán)型的數(shù)據(jù)結(jié)構(gòu),可以把它想象成一個時鐘, 分成了許多格子,每個格子代表一定的時間,在這個格子上用一個鏈表來保存要執(zhí)行的超時任務(wù),同時有一個指針一格一格的走,走到那個格子時就執(zhí)行格子對應(yīng)的延遲任務(wù),

其中前三種Timer、DelayQueue、ScheduledThreadPoolExecutor實現(xiàn)比較簡單,只不過只適用于單體應(yīng)用,任務(wù)數(shù)據(jù)都在內(nèi)存中,在系統(tǒng)崩潰后數(shù)據(jù)丟失;后兩張實現(xiàn)相對復(fù)雜,并且需要依賴于第三方應(yīng)用,在系統(tǒng)整體結(jié)構(gòu)上更加復(fù)雜且消耗更多資源,但能支持分布式系統(tǒng),且有較高的容錯性。

示例

定義延遲任務(wù)對象:

@Getter
public class DelayTask implements Serializable{

    private static final long serialVersionUID = -5062977578344039366L;
    
    private long delaySeconds;
    private TaskExecute taskExecute;

    public DelayTask(long delaySeconds, TaskExecute taskExecute) {
        this.delaySeconds = delaySeconds;
        this.taskExecute = taskExecute;
    }

    /**
     *
     */
    public void execute(){
        taskExecute.run();
    }

    public interface TaskExecute extends Runnable, Serializable {

    }
}

調(diào)度器:

public interface ScheduleTrigger {

    /**
     * 延遲任務(wù)調(diào)度
     * @param delayTask
     */
    void schedule(DelayTask delayTask);
}
  1. Timer
public class JavaTrigger implements ScheduleTrigger{

    private Timer timer;

    public JavaTimer(){
        this.timer = new Timer();
    }
    
    /**
     *
     * @param delayTask
     */
    public void schedule(DelayTask delayTask){
        timer.schedule(buildTimerTask(delayTask.getTaskExecute()), toMillis(delayTask.getDelaySeconds()));
    }

    private TimerTask buildTimerTask(Runnable runnable){
        return new TimerTask() {
            @Override
            public void run() {
                runnable.run();
            }
        };
    }

}
  1. DelayQueue
public class DelayQueueTrigger implements ScheduleTrigger{

    private DelayQueue< Task > queue = new DelayQueue<  >();

    public DelayQueueTrigger() {
        Thread thread = new Thread(() - > {
            while (true) {
                try {
                    Task task = queue.take();
                    if(task != null)
                        task.execute();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * @param delayTask
     */
    public void schedule(DelayTask delayTask){
        if( delayTask instanceof Task ){
            queue.put((Task) delayTask);
        }
    }

}

class Task extends DelayTask implements Delayed{

    private long execTime;

    public Task(long delaySeconds, TaskExecute taskExecute) {
        super(delaySeconds, taskExecute);
        this.execTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
    }

    /**
     * 輪詢執(zhí)行該方法判斷是否滿足執(zhí)行條件(<=0)
     * 同時該返回作為等待時長
     * @param unit the time unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return this.execTime - System.currentTimeMillis(); // ms
    }

    public long getExecTime() {
        return execTime;
    }

    @Override
    public int compareTo(Delayed other) {
        if(this.getExecTime() == ((Task)other).getExecTime()){
            return 0;
        }
        return this.getExecTime() > ((Task)other).getExecTime() ? 1: -1;
    }
}
  1. ScheduledThreadPoolExecutor
    ScheduledThreadPoolExecutor實現(xiàn)也是基于延遲隊列BlockingQueue實現(xiàn)
public class ScheduledExecutorTrigger implements ScheduleTrigger{

    private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
    
    public void schedule(DelayTask delayTask){
        executorService.schedule(delayTask.getTaskExecute(), delayTask.getDelaySeconds(), TimeUnit.SECONDS);
    }

}
  1. Redis過期監(jiān)聽
    需要修改redis配置文件:notify-keyspace-events Ex
public class RedisTimer{

    private static final String EXPIRATION_KEY = "REDIS_EXPIRATION_KEY";

    @Configuration
    @Import(RedisAutoConfiguration.class)
    public static class Config{

        @Bean(name = "redisTemplate")
        public RedisTemplate< Object, Object > redisTemplate(RedisConnectionFactory factory) {
            RedisTemplate< Object, Object > template = new RedisTemplate<  >();
            RedisSerializer< String > keySerializer = new StringRedisSerializer();
            RedisSerializer< Object > valueSerializer = new ObjectRedisSerializer();
            template.setConnectionFactory(factory);
            template.setKeySerializer(keySerializer);
            template.setValueSerializer(valueSerializer);
            return template;
        }

        /**
         * 消息監(jiān)聽器容器bean
         * @param connectionFactory
         * @return
         */
        @Bean
        public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            return container;
        }

        @Bean
        public RedisKeyExpirationListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer){
            RedisKeyExpirationListener redisKeyExpirationListener = new RedisKeyExpirationListener(redisMessageListenerContainer);
            redisKeyExpirationListener.setContext(context());
            return redisKeyExpirationListener;
        }

        @Bean
        public Context context(){
            return new Context();
        }

        @Bean
        public RedisTrigger redisTrigger(RedisTemplate redisTemplate){
            return new RedisTrigger(redisTemplate, context());
        }


        class ObjectRedisSerializer implements RedisSerializer{

            @Override
            public byte[] serialize(Object o) throws SerializationException {
                return SerializeUtils.serialize(o);
            }

            @Override
            public Object deserialize(byte[] bytes) throws SerializationException {
                return SerializeUtils.deserialize(bytes);
            }
        }
    }

    public static class RedisTrigger implements ScheduleTrigger{

        private RedisTemplate redisTemplate;
        private Context context;

        public RedisTrigger(RedisTemplate redisTemplate, Context context){
            this.redisTemplate = redisTemplate;
            this.context = context;
        }
        
        public void schedule(DelayTask delayTask){
            context.put(EXPIRATION_KEY, delayTask);
            redisTemplate.opsForValue().set(EXPIRATION_KEY, delayTask, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
        }
    }

    @Slf4j
    public static class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

        private Context context;

        public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }

        /**
         * 這里沒法拿到過期值
         * @param message never {@literal null}.
         */
        @SneakyThrows
        @Override
        public void doHandleMessage(Message message) {
            try {
                String topic = new String(message.getChannel(), "utf-8");
                String key = new String(message.getBody(), "utf-8");
                if (EXPIRATION_KEY.equals(key)) {
                    Object object = context.get(EXPIRATION_KEY);
                    if( object instanceof DelayTask ){
                        log.info("redis key[{}] 過期回調(diào)", key);
                        ((DelayTask) object).execute();
                    }
                }
            } catch (Exception e) {
                log.error("處理Redis延遲任務(wù)異常:{}", e.getMessage() ,e);
            }
        }

        public void setContext(Context context) {
            this.context = context;
        }
    }

    public static class Context{
        private Map< String,Object > context = new ConcurrentHashMap<  >();

        public void put(String key, Object value){
            context.put(key, value);
        }

        public Object get(String key){
            return context.get(key);
        }
    }
}
  1. MQ延遲隊列
    這里MQ選擇的是RabbitMq,要知道在RabbitMq中是沒有延遲隊列的,但可以通過延遲消息插件rabbitmq_delayed_message_exchange實現(xiàn),另外一種是基于死信來實現(xiàn)。

什么時候消息進(jìn)入死信?

  • 1)消息消費方調(diào)用了basicNack() 或 basicReject(),并且參數(shù)都是 requeue = false,則消息會路由進(jìn)死信隊列
  • 2)消息消費過期,過了TTL(消息、或隊列設(shè)置超時時間) 存活時間,就是消費方在 TTL 時間之內(nèi)沒有消費,則消息會路由進(jìn)死信隊列
  • 3)隊列設(shè)置了x-max-length 最大消息數(shù)量且當(dāng)前隊列中的消息已經(jīng)達(dá)到了這個數(shù)量,再次投遞,消息將被擠掉,被擠掉的消息會路由進(jìn)死信隊列
public class RabbitTimer{

    @Configuration
    @Import(RabbitAutoConfiguration.class)
    public static class Config{

        static final String TTL_EXCHANGE_FOR_SCHEDULE = "TTL_EXCHANGE_FOR_SCHEDULE";
        static final String TTL_QUEUE_FOR_SCHEDULE = "TTL_QUEUE_FOR_SCHEDULE";
        static final String TTL_ROUTING_KEY_FOR_SCHEDULE = "TTL_ROUTING_KEY_FOR_SCHEDULE";
        static final String COMMON_QUEUE_FOR_SCHEDULE = "COMMON_QUEUE_FOR_SCHEDULE";

        @Bean
        public Queue ttlQueue(){
            return QueueBuilder.durable(TTL_QUEUE_FOR_SCHEDULE).build();
        }

        @Bean
        public Exchange ttlExchange(){
            return ExchangeBuilder.directExchange(TTL_EXCHANGE_FOR_SCHEDULE).build();
        }

        @Bean
        public Binding ttlBinding(){
            return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with(TTL_ROUTING_KEY_FOR_SCHEDULE).noargs();
        }

        @Bean
        public Queue commonQueue(){
            return QueueBuilder.durable(COMMON_QUEUE_FOR_SCHEDULE)
                    .deadLetterExchange(TTL_EXCHANGE_FOR_SCHEDULE)
                    .deadLetterRoutingKey(TTL_ROUTING_KEY_FOR_SCHEDULE)
                    .build();
        }

        @Bean
        public TtlMessageConsumer ttlMessageConsumer(){
            return new TtlMessageConsumer();
        }
        
        @Bean
        public RabbitTrigger rabbitTrigger(RabbitTemplate rabbitTemplate){
            return new RabbitTrigger(rabbitTemplate);
        }
    }

    @Slf4j
    @RabbitListener(queues=TTL_QUEUE_FOR_SCHEDULE)
    public static class TtlMessageConsumer{

        @RabbitHandler
        public void handle(byte [] message){
            Object deserialize = SerializeUtils.deserialize(message);
            if( deserialize instanceof DelayTask ){
                ((DelayTask) deserialize).execute();
            }
        }

    }
    
    public static class RabbitTrigger implements ScheduleTrigger{

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public RabbitTrigger(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
        
        public void schedule(DelayTask delayTask){
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration( String.valueOf(TimeUnit.SECONDS.toMillis(delayTask.getDelaySeconds())));
            Message message = new Message(SerializeUtils.serialize(delayTask), messageProperties);
            rabbitTemplate.send(COMMON_QUEUE_FOR_SCHEDULE, message);
        }

    }

}
  1. HashedWheelTimer
public class NettyTrigger implements ScheduleTrigger {

    HashedWheelTimer timer = new HashedWheelTimer(200,
            TimeUnit.MILLISECONDS,
            100); // 時間輪中的槽數(shù)

    /**
     *
     */
    @Override
    public void schedule(DelayTask delayTask){
        TimerTask task = timeout - > delayTask.execute();
        //
        timer.newTimeout(task, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
    }

}

測試:

ScheduleTrigger.schedule(DelayTask delayTask);

結(jié)束語

通過幾個簡單的示例了解延遲隊列的實現(xiàn)方式,可以根據(jù)實際業(yè)務(wù)場景以及應(yīng)用架構(gòu)做出合理的選擇。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 數(shù)據(jù)
    +關(guān)注

    關(guān)注

    8

    文章

    6808

    瀏覽量

    88743
  • 服務(wù)器
    +關(guān)注

    關(guān)注

    12

    文章

    8958

    瀏覽量

    85085
  • 內(nèi)存
    +關(guān)注

    關(guān)注

    8

    文章

    2966

    瀏覽量

    73815
  • 定時器
    +關(guān)注

    關(guān)注

    23

    文章

    3231

    瀏覽量

    114329
  • 延遲
    +關(guān)注

    關(guān)注

    1

    文章

    70

    瀏覽量

    13504
收藏 人收藏

    評論

    相關(guān)推薦

    利用CAS技術(shù)實現(xiàn)無鎖隊列

    【 導(dǎo)讀 】:本文 主要講解利用CAS技術(shù)實現(xiàn)無鎖隊列。 關(guān)于無鎖隊列實現(xiàn),網(wǎng)上有很多文章,雖然本文可能和那些文章有所重復(fù),但是我還是想以我自己的
    的頭像 發(fā)表于 01-11 10:52 ?2244次閱讀
    利用CAS技術(shù)<b class='flag-5'>實現(xiàn)</b>無鎖<b class='flag-5'>隊列</b>

    深度解析數(shù)據(jù)結(jié)構(gòu)與算法篇之隊列及環(huán)形隊列實現(xiàn)

    的位置。 02 — 環(huán)形隊列實現(xiàn) 要想將元素放入隊列我們必須知道對頭和隊尾,在隊列長度不能無限大的條件下我們還要知道隊列的最大容量,我們還
    的頭像 發(fā)表于 06-18 10:07 ?1882次閱讀

    TencentOS-tiny中環(huán)形隊列實現(xiàn)

    ; 隊尾指針(可變):永遠(yuǎn)指向此隊列的最后一個數(shù)據(jù)元素; 隊列中的數(shù)據(jù)存儲方式有兩種: ① 基于靜態(tài)連續(xù)內(nèi)存(數(shù)組)存儲,如圖:② 基于動態(tài)內(nèi)存(鏈表節(jié)點)存儲,如圖: ? 后續(xù)都使用基于靜態(tài)內(nèi)存存儲的
    的頭像 發(fā)表于 10-08 16:30 ?1355次閱讀

    QueueForMcu 基于單片機實現(xiàn)隊列功能模塊

    QueueForMcu基于單片機實現(xiàn)隊列功能模塊,主要用于8位、16位、32位非運行RTOS的單片機應(yīng)用,兼容大多數(shù)單片機平臺。一、特性動態(tài)創(chuàng)建隊列對象動態(tài)設(shè)置隊列數(shù)據(jù)緩沖區(qū)靜態(tài)指定
    發(fā)表于 12-31 19:35 ?1次下載
    QueueForMcu 基于單片機<b class='flag-5'>實現(xiàn)</b>的<b class='flag-5'>隊列</b>功能模塊

    RTOS消息隊列的多種用途

      消息隊列可以以多種不同的方式使用。事實上,您可以編寫可能只使用消息隊列的相當(dāng)復(fù)雜的應(yīng)用程序。僅使用消息隊列可以減少代碼的大?。凑加每臻g),因為可以模擬許多其他服務(wù)(信號量、時間
    的頭像 發(fā)表于 06-29 14:57 ?2473次閱讀
    RTOS消息<b class='flag-5'>隊列</b>的多種用途

    實現(xiàn)一個雙端隊列的步驟簡析

    隊列是非常基礎(chǔ)且重要的數(shù)據(jù)結(jié)構(gòu),雙端隊列屬于隊列的升級。很多的算法都是基于隊列實現(xiàn),例如搜索中的bfs,圖論中的spfa,計算幾何中的me
    的頭像 發(fā)表于 10-27 18:11 ?1397次閱讀

    什么是消息隊列?消息隊列中間件重要嗎?

    應(yīng)用解耦:消息隊列減少了服務(wù)之間的耦合性,不同的服務(wù)可以通過消息隊列進(jìn)行通信,而不用關(guān)心彼此的實現(xiàn)細(xì)節(jié)。
    的頭像 發(fā)表于 11-07 14:55 ?1365次閱讀

    如何用Redis實現(xiàn)延遲隊列呢?

    前段時間有個小項目需要使用延遲任務(wù),談到延遲任務(wù),我腦子第一時間一閃而過的就是使用消息隊列來做,比如RabbitMQ的死信隊列又或者RocketMQ的
    的頭像 發(fā)表于 03-16 14:28 ?635次閱讀

    一種異步延遲隊列實現(xiàn)方式調(diào)研

    目前系統(tǒng)中有很多需要用到延時處理的功能:支付超時取消、排隊超時、短信、微信等提醒延遲發(fā)送、token刷新、會員卡過期等等。
    的頭像 發(fā)表于 03-31 10:10 ?578次閱讀

    嵌入式環(huán)形隊列和消息隊列實現(xiàn)

    嵌入式環(huán)形隊列和消息隊列實現(xiàn)數(shù)據(jù)緩存和通信的常見數(shù)據(jù)結(jié)構(gòu),廣泛應(yīng)用于嵌入式系統(tǒng)中的通信協(xié)議和領(lǐng)域。
    的頭像 發(fā)表于 04-14 11:52 ?1487次閱讀

    嵌入式環(huán)形隊列和消息隊列是如何去實現(xiàn)的?

    嵌入式環(huán)形隊列和消息隊列實現(xiàn)數(shù)據(jù)緩存和通信的常見數(shù)據(jù)結(jié)構(gòu),廣泛應(yīng)用于嵌入式系統(tǒng)中的通信協(xié)議和領(lǐng)域。
    發(fā)表于 05-20 14:55 ?1086次閱讀

    單片機消息隊列實現(xiàn)原理和機制

    單片機開發(fā)過程中通常會用到“消息隊列”,一般實現(xiàn)的方法有多種。 本文給大家分享一下隊列實現(xiàn)的原理和機制。
    的頭像 發(fā)表于 05-26 09:50 ?1463次閱讀
    單片機消息<b class='flag-5'>隊列</b>的<b class='flag-5'>實現(xiàn)</b>原理和機制

    RTOS消息隊列的應(yīng)用

    基于RTOS的應(yīng)用中,通常使用隊列機制實現(xiàn)任務(wù)間的數(shù)據(jù)交互,一個應(yīng)用程序可以有任意數(shù)量的消息隊列,每個消息隊列都有自己的用途。
    發(fā)表于 05-29 10:49 ?602次閱讀
    RTOS消息<b class='flag-5'>隊列</b>的應(yīng)用

    Disruptor高性能隊列的原理

    許多應(yīng)用程序依靠隊列在處理階段之間交換數(shù)據(jù)。我們的性能測試表明,當(dāng)以這種方式使用隊列時,其延遲成本與磁盤(基于RAID或SSD的磁盤系統(tǒng))的IO操作成本處于同一數(shù)量級都很慢。如果在一個
    的頭像 發(fā)表于 07-26 10:47 ?645次閱讀
    Disruptor高性能<b class='flag-5'>隊列</b>的原理

    嵌入式環(huán)形隊列與消息隊列實現(xiàn)原理

    嵌入式環(huán)形隊列,也稱為環(huán)形緩沖區(qū)或循環(huán)隊列,是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),用于在固定大小的存儲區(qū)域中高效地存儲和訪問數(shù)據(jù)。其主要特點包括固定大小的數(shù)組和兩個指針(頭指針和尾指針),分別指向隊列的起始位置和結(jié)束位置。
    的頭像 發(fā)表于 09-02 15:29 ?293次閱讀