0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Topic 模型的使用

科技綠洲 ? 來源:了不起 ? 作者:了不起 ? 2023-09-25 11:30 ? 次閱讀

RabbitMQ 是一個流行的開源消息隊列軟件,它提供了多種通信模型,例如發(fā)布/訂閱模型、路由模型、work模型等。在前面的文章中我們已經(jīng)介紹了前四種模型,本文將會學(xué)習(xí) RabbitMQ 中的 Topic 模型;接下來還會有關(guān)于RabbitMQ的系列教程,對你有幫助的話記得關(guān)注哦~

Topic 模型

Topic 模型是 RabbitMQ 的高級模型之一,Topic 模型使用了通配符的概念,可以匹配更靈活的路由規(guī)則。topic模式相當于是對路由模式的一個升級,topic模式主要就是在匹配的規(guī)則上可以實現(xiàn)模糊匹配。

在 Topic 模型中,生產(chǎn)者將消息發(fā)送到交換機,交換機根據(jù)消息的 routing key 將消息轉(zhuǎn)發(fā)到對應(yīng)的隊列中。與 Direct 模型不同的是,Topic 模型中 routing key 支持通配符匹配,其中 ' ' 可以匹配一個單詞,'#' 可以匹配多個單詞。例如,"order. " 可以匹配 "order.create","order.delete" 等消息,而 "order.#" 可以匹配 "order.create.one","order.delete.two" 等消息。

適用場景

Topic 模型適用于需要靈活的消息路由規(guī)則的場景,例如:

  1. 新聞網(wǎng)站訂閱分類消息;
  2. 電商網(wǎng)站訂閱商品分類消息;
  3. 金融機構(gòu)訂閱股票市場消息等。

演示

  1. 生產(chǎn)者

    // 生產(chǎn)者
    public class Producer {
        private static final String EXCHANGE_NAME = "exchange_topic_1";
        private static final String EXCHANGE_ROUTING_KEY1 = "topic.km";
        private static final String EXCHANGE_ROUTING_KEY2 = "topic.km.001";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            for (int i = 0; i < 100; i++) {
                // topic在路由模型的基礎(chǔ)上,只有路由的key發(fā)生改變,其余的都不變
                if (i % 2 == 0) {
                    channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型發(fā)送的第 " + i + " 條信息").getBytes());
                } else {
                    channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型發(fā)送的第 " + i + " 條信息").getBytes());
                }
            }
            channel.close();
            connection.close();
        }
    }
    
  2. 消費者

    // 消費者1
    public class Consumer1 {
        private static final String QUEUE_NAME = "queue_topic_1";
        private static final String EXCHANGE_NAME = "exchange_topic_1";
        private static final String EXCHANGE_ROUTING_KEY = "topic.*";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消費者1接收到的消息是:" + new String(body));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        }
    }
    
    // 消費者2
    public class Consumer2 {
        private static final String QUEUE_NAME = "queue_topic_2";
        private static final String EXCHANGE_NAME = "exchange_topic_1";
        private static final String EXCHANGE_ROUTING_KEY = "topic.#";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消費者2接收到的消息是:" + new String(body));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        }
    }
    

測試

先啟動2個消費者,再啟動生產(chǎn)者

消費者1訂閱的是 "order.*" 的消息,消費者2訂閱的是 "order.#" 的消息,可以得到以下結(jié)果:

消費者1接收到的消息是:"Topic 模型發(fā)送的偶數(shù)條消息"

消費者2接收到的消息是:"Topic 模型發(fā)送的全部消息"

小結(jié)

本文介紹了 RabbitMQ 通信模型中的 Topic 模型的使用,通過交換機和 routing key 實現(xiàn)更靈活的消息路由。在實際使用過程中,需要注意以下幾點:

  1. 路由鍵的格式應(yīng)該是多個單詞組成,用 '.' 分隔;
  2. '#' 匹配多個單詞,'*' 匹配一個單詞;
  3. 一個隊列可以綁定多個 routing key;
  4. 如果交換機沒有匹配到任何一個隊列,則會拋棄該消息。
聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 交換機
    +關(guān)注

    關(guān)注

    20

    文章

    2610

    瀏覽量

    99093
  • 模型
    +關(guān)注

    關(guān)注

    1

    文章

    3112

    瀏覽量

    48658
  • rabbitmq
    +關(guān)注

    關(guān)注

    0

    文章

    17

    瀏覽量

    1011
收藏 人收藏

    評論

    相關(guān)推薦

    阿里云設(shè)備的物模型數(shù)據(jù)里面始終沒有值是為什么?

    如上圖,不知道講清楚沒有。 IG502自定義TOPIC 上發(fā)到阿里云沒問題。采用阿里云物模型的格式來上發(fā)就不行。請大佬指教!
    發(fā)表于 07-24 07:49

    LabVIEW編寫的簡單3D模型預(yù)覽工具

    ` 本帖最后由 ewb_topic 于 2018-8-7 16:29 編輯 最低兼容LabVIEW 8.6版本,支持ase、stl、wrl三種3D模型文件。小工具一個Pass:tlatlatla`
    發(fā)表于 08-06 10:55

    IoT物聯(lián)網(wǎng)平臺通信用Topic梳理

    簡介: 基于設(shè)備實踐梳理出來的Topic總結(jié)1.前言IoT物聯(lián)網(wǎng)平臺基于MQTT協(xié)議的Pub/Sub通信,那么topic和payload設(shè)計就很重要。我們可以定義出不同topic來處理不同業(yè)務(wù)場景
    發(fā)表于 11-07 14:08

    人機交互Topic推薦-AMiner 精選資料分享

    數(shù)據(jù)和實驗平臺。必讀論文:https://www.aminer.cn/topic人機交互、人機互動(英文:...
    發(fā)表于 09-10 07:16

    Topic通訊小結(jié)

    ROS實操入門系列(三)1、Topic通訊小結(jié)2、編寫ros驅(qū)動實現(xiàn)電機控制和編碼器讀取Topic通訊小結(jié)控制板驅(qū)動代碼實現(xiàn)及工具調(diào)試完整代碼Topic通訊小結(jié)Topic通訊特點通訊
    發(fā)表于 09-16 09:11

    想問一下有沒有MQTT SUBSCRIBE topic之後接收MESSAGE的例子?

    想問一下有沒有MQTT SUBSCRIBE topic 之後接收MESSAGE的例子?我只找到Code: Select all case MQTT_EVENT_DATA: {ESP_LOGI(TAG
    發(fā)表于 02-17 09:04

    請問一下MQTT_topic_t 結(jié)構(gòu)在哪里呢?

    ;MQTT_EVENT_CONNECTED"); mqtt_topic_t *mqtt_topic = get_mqtt_topic(); msg_id = esp_mqtt_client_publish(client
    發(fā)表于 03-02 06:30

    集成電路工藝專題實驗 Topic Experiments o

    集成電路工藝專題實驗 Topic Experiments of Integrate Circuit Techniques 實驗一 硅單晶(或多晶)薄膜的沉積………………………………………………………3實驗二 硅單晶外延層的質(zhì)
    發(fā)表于 03-06 14:06 ?5次下載

    TEP(Topic Embedded Products) 產(chǎn)品攜手基于Zynq的開發(fā)工具和系統(tǒng)模塊加速嵌入式設(shè)計

    Dyplo 開發(fā)系統(tǒng)可以讓設(shè)計工程師很容易使FPGA進行硬件加速 作者:Steve Leibson, 賽靈思戰(zhàn)略營銷與業(yè)務(wù)規(guī)劃總監(jiān) 荷蘭的Topic 公司幫助客戶開發(fā)了不同應(yīng)用領(lǐng)域的嵌入式產(chǎn)品,包括
    發(fā)表于 02-08 08:13 ?314次閱讀

    基于ESCM的動態(tài)主題情感混合模型

    針對現(xiàn)有模型無法進行微博主題情感演化分析的問題,提出一種基于主題情感混合模型( TSCM)和情感周期性理論的主題情感演化模型動態(tài)主題情感混合模型( DTSCM)。DTSCM通過捕獲不同
    發(fā)表于 01-02 10:38 ?0次下載

    Point Topic宣布:全球固網(wǎng)寬帶用戶已經(jīng)突破10億

    近日,市場研究公司Point Topic宣布,全球固網(wǎng)寬帶用戶已經(jīng)突破10億,其中80%是光纖連接用戶?;贏DSL的寬帶用戶數(shù)同比去年同期減少了8%。
    發(fā)表于 10-31 09:01 ?1339次閱讀

    Topic醫(yī)療開發(fā)平臺的最新進展

    Xilinx醫(yī)療產(chǎn)品營銷經(jīng)理Kamran Khan和Topic Embedded Products首席執(zhí)行官Rieny Rijnen分享了由Xilinx Zynq All Programmable SoC實現(xiàn)的Topic醫(yī)療開發(fā)平臺的最新進展,共同解決了這一問題。
    的頭像 發(fā)表于 11-22 06:51 ?3276次閱讀

    如何在LoRaWAN網(wǎng)關(guān)的網(wǎng)頁上設(shè)置MQTT的訂閱的Topic

    當我們將LoRaWAN網(wǎng)關(guān)設(shè)置為NS模式時可參見文章《將LoRaWAN網(wǎng)關(guān)設(shè)置為NS模式的操作方法》,我們就可以在LoRaWAN網(wǎng)關(guān)的網(wǎng)頁上進行操作,以設(shè)置該網(wǎng)關(guān)的MQTT訂閱的topic,從而
    發(fā)表于 06-12 17:29 ?1526次閱讀

    面向微博熱點話題的改進BBTM模型

    針對目前基于主題模型的微博短文本熱點話題發(fā)現(xiàn)存在特征稀疏、高維度以及需要人工指定主題數(shù)目等問題,提出一種基于改進突發(fā)詞對主題模型( bursty biterm topic model,BBTM
    發(fā)表于 06-09 14:20 ?3次下載

    如何在LoRaWAN網(wǎng)關(guān)上設(shè)置MQTT的Topic

    當我們將LoRaWAN網(wǎng)關(guān)設(shè)置為NS模式時,可參見文章如何將LoRaWAN網(wǎng)關(guān)設(shè)置為NS模式,我們就可以在LoRaWAN網(wǎng)關(guān)的網(wǎng)頁上進行操作,以設(shè)置該網(wǎng)關(guān)的MQTT訂閱的topic,從而為接下來采用MQTT訂閱獲取到LoRa節(jié)點數(shù)據(jù)建立基礎(chǔ)。
    發(fā)表于 08-20 10:10 ?1143次閱讀