0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

RabbitMQ通信模型中的work模型

科技綠洲 ? 來源:Java技術(shù)指北 ? 作者:Java技術(shù)指北 ? 2023-09-25 14:34 ? 次閱讀

上一篇文章中,簡單的介紹了一下RabbitMQ,以及安裝和hello world。

有的小伙伴留言說看不懂其中的方法參數(shù),這里先解釋一下幾個(gè)基本的方法參數(shù)。

// 聲明隊(duì)列方法
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
 * param1:queue 隊(duì)列的名字
 * param2:durable 是否持久化;比如現(xiàn)在發(fā)送到隊(duì)列里面的消息,如果沒有持久化,重啟這個(gè)隊(duì)列后數(shù) 據(jù)會丟失(false) true:重啟之后數(shù)據(jù)依然在
 * param3:exclusive 是否排外(是否是當(dāng)前連接的專屬隊(duì)列),排外的意思是:
 *            1:連接關(guān)閉之后 這個(gè)隊(duì)列是否自動(dòng)刪除(false:不自動(dòng)刪除)
 *            2:是否允許其他通道來進(jìn)行訪問這個(gè)數(shù)據(jù)(false:不允許) 
 * param4:autoDelete 是否自動(dòng)刪除
 *            就是當(dāng)最后一個(gè)連接斷開的時(shí)候,是否自動(dòng)刪除這個(gè)隊(duì)列(false:不刪除)
 * param5:arguments(map) 聲明隊(duì)列的時(shí)候,附帶的一些參數(shù)
 */
// 發(fā)送數(shù)據(jù)到隊(duì)列
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "第一個(gè)隊(duì)列消息...".getBytes());
/**
 * param1:exchange  交換機(jī)  沒有就設(shè)置為 "" 值就可以了
 * param2:routingKey 路由的key  現(xiàn)在沒有設(shè)置key,直接使用隊(duì)列的名字
 * param3:BasicProperties 發(fā)送數(shù)據(jù)到隊(duì)列的時(shí)候,是否要帶一些參數(shù)。
 *      MessageProperties.PERSISTENT_TEXT_PLAIN表示沒有帶任何參數(shù)
 * param4:body 向隊(duì)列中發(fā)送的消息數(shù)據(jù)
 */

Work模型

work模型稱為工作隊(duì)列或者競爭消費(fèi)者模式,多個(gè)消費(fèi)者消費(fèi)的數(shù)據(jù)之和才是原來隊(duì)列中的所有數(shù)據(jù),適用于流量的削峰。

圖片
img

演示

寫個(gè)簡單的測試:

  1. 生產(chǎn)者

    public class Producer {
        private static final String QUEUE_NAME = "queue_work_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 100; i++) {
                channel.basicPublish("", QUEUE_NAME, null, ("work模型:" + i).getBytes());
            }
            channel.close();
            connection.close();
        }
    
    }
    
  2. 消費(fèi)者

    // 消費(fèi)者1
    public class Consumer {
        private static final String QUEUE_NAME = "queue_work_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // channel.basicQos(0, 1, false);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(System.currentTimeMillis() + "消費(fèi)者1接收到信息:" + new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    
    }
    
    // 消費(fèi)者2
    public class Consumer2 {
        private static final String QUEUE_NAME = "queue_work_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // channel.basicQos(0, 1, false);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(System.currentTimeMillis() + "消費(fèi)者2接收到信息:" + new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    // 這里加了個(gè)延遲,表示處理業(yè)務(wù)時(shí)間
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    }
    
  3. 結(jié)果
    圖片
    image-20221229210012145

圖片
image-20221229210046184

可以看出來:100條消息,消費(fèi)者之間是平分的,消費(fèi)者1 幾乎是瞬間完成,消費(fèi)者2 則是慢慢吞吞的運(yùn)行完畢,消費(fèi)者1大量時(shí)間處于空閑狀態(tài),消費(fèi)者2則一直忙碌。這顯然是不適用于實(shí)際開發(fā)中。

我們需要遵從一個(gè)原則,就是 能者多勞 ,消費(fèi)越快的人,消費(fèi)的越多;

現(xiàn)在我們把消費(fèi)者1和2的代碼中 // channel.basicQos(0, 1, false); 這行代碼取消注釋,再次運(yùn)行;

圖片
image-20221229211317632

圖片
image-20221229211335782

現(xiàn)在的結(jié)果就比較符合能者多勞,雖然你干的多,但是工資是一樣的呀~

work模型的一個(gè)主要的方法是basicQos();這里也解釋一下其參數(shù):

// 設(shè)置限流機(jī)制
channel.basicQos(0, 1, false);
/**  
 *  param1: prefetchSize,消息本身的大小 如果設(shè)置為0  那么表示對消息本身的大小不限制
 *  param2: prefetchCount,告訴rabbitmq不要一次性給消費(fèi)者推送大于N個(gè)消息
 *  param3:global,是否將上面的設(shè)置應(yīng)用于整個(gè)通道,false表示只應(yīng)用于當(dāng)前消費(fèi)者
 */

小結(jié)

本文到這里就結(jié)束了,主要介紹了RabbitMQ通信模型中的work模型,適用于限流、削峰等應(yīng)用場景。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報(bào)投訴
  • 數(shù)據(jù)
    +關(guān)注

    關(guān)注

    8

    文章

    6715

    瀏覽量

    88308
  • 通信
    +關(guān)注

    關(guān)注

    18

    文章

    5880

    瀏覽量

    135313
  • 模型
    +關(guān)注

    關(guān)注

    1

    文章

    3032

    瀏覽量

    48357
  • Work
    +關(guān)注

    關(guān)注

    0

    文章

    9

    瀏覽量

    8942
  • rabbitmq
    +關(guān)注

    關(guān)注

    0

    文章

    17

    瀏覽量

    1000
收藏 人收藏

    評論

    相關(guān)推薦

    RabbitMQ的發(fā)布訂閱模型

    上一篇文章,簡單的介紹了一下RabbitMQwork模型。這篇文章來學(xué)習(xí)一下RabbitMQ
    的頭像 發(fā)表于 09-25 14:30 ?450次閱讀
    <b class='flag-5'>RabbitMQ</b><b class='flag-5'>中</b>的發(fā)布訂閱<b class='flag-5'>模型</b>

    CAN總線通信協(xié)議模型概述 CAN總線通信模型作用

    參照 ISO/OSI 標(biāo)準(zhǔn)模型,CAN 總線的通信參考模型如圖 9-1 所示。這 4 層結(jié)構(gòu)的功能如下:? 物理層規(guī)定了節(jié)點(diǎn)的全部電氣特性,在一個(gè)網(wǎng)絡(luò)里,要實(shí)現(xiàn)不同節(jié)點(diǎn)間的數(shù)據(jù)傳輸,所有節(jié)點(diǎn)的物理層
    發(fā)表于 12-14 14:17

    MQTT的通信模型及消息

     MQTT通信模型    MQTT協(xié)議是基于客戶端-服務(wù)器模型,在協(xié)議主要有三種身份:發(fā)布者(Publisher)、服務(wù)器(Broker) 以及訂閱者(Subscriber)。 并且消息發(fā)布者可以
    發(fā)表于 01-19 15:57

    基于VxWorks實(shí)時(shí)操作系統(tǒng)的通信模型該怎樣去設(shè)計(jì)?

    多任務(wù)實(shí)時(shí)操作系統(tǒng)VxWorks是什么?與傳統(tǒng)通信機(jī)制相比,模塊間通信模型有什么優(yōu)勢?基于VxWorks實(shí)時(shí)操作系統(tǒng)的通信模型該怎樣去設(shè)計(jì)?
    發(fā)表于 04-26 06:25

    移動(dòng)Agent位置透明通信模型的設(shè)計(jì)

    提出一種高效可靠的移動(dòng)Agent通信模型――D-C通信模型,結(jié)合域名字解析器和移動(dòng)Agent系統(tǒng)的Communicator實(shí)現(xiàn)移動(dòng)Agent之間的通信。通過引入一種基于全局的、與位置
    發(fā)表于 04-16 08:53 ?26次下載

    過程控制工業(yè)以太網(wǎng)通信模型探討

    提出了建立在交換式以太網(wǎng)和IEEE 802.1Q/P 技術(shù)基礎(chǔ)上用于過程控制的以太網(wǎng)通信模型REPC,并進(jìn)行了分析。關(guān)鍵詞:通信模型工業(yè)以太網(wǎng) 過程控制Abstract: REPC, a communication model of in
    發(fā)表于 06-19 08:34 ?27次下載

    數(shù)據(jù)網(wǎng)格基于優(yōu)化機(jī)制的通信模型

    針對基于多計(jì)算機(jī)機(jī)群構(gòu)成的網(wǎng)格的大規(guī)模并行計(jì)算的需要,對多級分組通信模型的單一機(jī)群分組通信進(jìn)行了研究。探討了在單一機(jī)群內(nèi)的主動(dòng)節(jié)點(diǎn)、被動(dòng)節(jié)點(diǎn)個(gè)數(shù)和各個(gè)計(jì)算節(jié)點(diǎn)
    發(fā)表于 06-25 13:52 ?12次下載

    基于VxWorks的通信模型設(shè)計(jì)

    本文提出了一種任務(wù)間的通信模型,將用于網(wǎng)絡(luò)通信的UDP方式引進(jìn)到任務(wù)間的通信中,使通信更加靈活和便于管理,改善了整個(gè)系統(tǒng)的性能。
    發(fā)表于 06-01 10:07 ?986次閱讀
    基于VxWorks的<b class='flag-5'>通信模型</b>設(shè)計(jì)

    企業(yè)資產(chǎn)管理系統(tǒng)通信模型的研究與實(shí)現(xiàn)

    為了改善企業(yè)資產(chǎn)管理(EAM)系統(tǒng)在用戶體驗(yàn)、模塊間數(shù)據(jù)傳輸效率及耦合度等方面的不足,構(gòu)建了基于Silverlight與WCF技術(shù)研究與實(shí)現(xiàn)EAM系統(tǒng)通信模型。利用Silverlight構(gòu)建客戶端提升
    發(fā)表于 07-06 16:57 ?34次下載
    企業(yè)資產(chǎn)管理系統(tǒng)<b class='flag-5'>中</b><b class='flag-5'>通信模型</b>的研究與實(shí)現(xiàn)

    網(wǎng)絡(luò)通信模型

    網(wǎng)絡(luò)通信模型,在基礎(chǔ)講解的前提下,建立數(shù)學(xué)模型來分析。
    發(fā)表于 03-15 13:56 ?9次下載

    一種基于Kademlia的P2P語音通信模型

    一種基于Kademlia的P2P語音通信模型_陳立全
    發(fā)表于 01-07 16:52 ?3次下載

    基于Zigbee的無線智能輸液通信模型設(shè)計(jì)楊艷

    基于Zigbee的無線智能輸液通信模型設(shè)計(jì)_楊艷
    發(fā)表于 03-16 08:00 ?3次下載

    Topic 模型的使用

    RabbitMQ 是一個(gè)流行的開源消息隊(duì)列軟件,它提供了多種通信模型,例如發(fā)布/訂閱模型、路由模型、work
    的頭像 發(fā)表于 09-25 11:30 ?552次閱讀

    RabbitMQ的路由模型(direct)

    路由模型 RabbitMQ 提供了五種不同的通信模型,上一篇文章,簡單的介紹了一下RabbitMQ的發(fā)布訂閱
    的頭像 發(fā)表于 09-25 11:32 ?403次閱讀

    什么是通信模型DDS

    完成的,它相當(dāng)于是ROS機(jī)器人系統(tǒng)的神經(jīng)網(wǎng)絡(luò)。 通信模型 DDS的核心是通信,能夠?qū)崿F(xiàn)通信模型和軟件框架非常多,這里我們列出常用的四種
    的頭像 發(fā)表于 11-24 17:50 ?1286次閱讀