RabbitMQ 是一個流行的開源消息隊列軟件,它提供了多種通信模型,例如發(fā)布/訂閱模型、路由模型、work模型等。在前面的文章中我們已經(jīng)介紹了前四種模型,本文將會學(xué)習(xí) RabbitMQ 中的 Topic 模型;接下來還會有關(guān)于RabbitMQ的系列教程,對你有幫助的話記得關(guān)注哦~
Topic 模型
Topic 模型是 RabbitMQ 的高級模型之一,Topic 模型使用了通配符的概念,可以匹配更靈活的路由規(guī)則。topic模式相當于是對路由模式的一個升級,topic模式主要就是在匹配的規(guī)則上可以實現(xiàn)模糊匹配。
在 Topic 模型中,生產(chǎn)者將消息發(fā)送到交換機,交換機根據(jù)消息的 routing key 將消息轉(zhuǎn)發(fā)到對應(yīng)的隊列中。與 Direct 模型不同的是,Topic 模型中 routing key 支持通配符匹配,其中 ' ' 可以匹配一個單詞,'#' 可以匹配多個單詞。例如,"order. " 可以匹配 "order.create","order.delete" 等消息,而 "order.#" 可以匹配 "order.create.one","order.delete.two" 等消息。
適用場景
Topic 模型適用于需要靈活的消息路由規(guī)則的場景,例如:
- 新聞網(wǎng)站訂閱分類消息;
- 電商網(wǎng)站訂閱商品分類消息;
- 金融機構(gòu)訂閱股票市場消息等。
演示
生產(chǎn)者
// 生產(chǎn)者 public class Producer { private static final String EXCHANGE_NAME = "exchange_topic_1"; private static final String EXCHANGE_ROUTING_KEY1 = "topic.km"; private static final String EXCHANGE_ROUTING_KEY2 = "topic.km.001"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); for (int i = 0; i < 100; i++) { // topic在路由模型的基礎(chǔ)上,只有路由的key發(fā)生改變,其余的都不變 if (i % 2 == 0) { channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型發(fā)送的第 " + i + " 條信息").getBytes()); } else { channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型發(fā)送的第 " + i + " 條信息").getBytes()); } } channel.close(); connection.close(); } }
消費者
// 消費者1 public class Consumer1 { private static final String QUEUE_NAME = "queue_topic_1"; private static final String EXCHANGE_NAME = "exchange_topic_1"; private static final String EXCHANGE_ROUTING_KEY = "topic.*"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者1接收到的消息是:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
// 消費者2 public class Consumer2 { private static final String QUEUE_NAME = "queue_topic_2"; private static final String EXCHANGE_NAME = "exchange_topic_1"; private static final String EXCHANGE_ROUTING_KEY = "topic.#"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者2接收到的消息是:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
測試
先啟動2個消費者,再啟動生產(chǎn)者
消費者1訂閱的是 "order.*" 的消息,消費者2訂閱的是 "order.#" 的消息,可以得到以下結(jié)果:
消費者1接收到的消息是:"Topic 模型發(fā)送的偶數(shù)條消息"
消費者2接收到的消息是:"Topic 模型發(fā)送的全部消息"
小結(jié)
本文介紹了 RabbitMQ 通信模型中的 Topic 模型的使用,通過交換機和 routing key 實現(xiàn)更靈活的消息路由。在實際使用過程中,需要注意以下幾點:
- 路由鍵的格式應(yīng)該是多個單詞組成,用 '.' 分隔;
- '#' 匹配多個單詞,'*' 匹配一個單詞;
- 一個隊列可以綁定多個 routing key;
- 如果交換機沒有匹配到任何一個隊列,則會拋棄該消息。
-
交換機
+關(guān)注
關(guān)注
20文章
2610瀏覽量
99093 -
模型
+關(guān)注
關(guān)注
1文章
3112瀏覽量
48658 -
rabbitmq
+關(guān)注
關(guān)注
0文章
17瀏覽量
1011
發(fā)布評論請先 登錄
相關(guān)推薦
評論