RabbitMQ 簡(jiǎn)介
RabbitMQ是一個(gè)開源的,在AMQP基礎(chǔ)上完整的,可復(fù)用的企業(yè)消息系統(tǒng)。
支持主流的操作系統(tǒng),Linux、Windows、MacOX等
多種開發(fā)語(yǔ)言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等
AMQP,即 Advanced Message Queuing Protocol(高級(jí)消息隊(duì)列協(xié)議),是一個(gè)網(wǎng)絡(luò)協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語(yǔ)言等條件的限制。2006年,AMQP 規(guī)范發(fā)布。
2007年,Rabbit 技術(shù)公司基于 AMQP 標(biāo)準(zhǔn)開發(fā)的 RabbitMQ 1.0 發(fā)布。RabbitMQ 采用 Erlang 語(yǔ)言開發(fā)。Erlang 語(yǔ)言由 Ericson 設(shè)計(jì),專門為開發(fā)高并發(fā)和分布式系統(tǒng)的一種語(yǔ)言,在電信領(lǐng)域使用廣泛。
RabbitMQ基本概念
RabbitMQ 基礎(chǔ)架構(gòu):
Broker
接收和分發(fā)消息的應(yīng)用,RabbitMQ Server就是 Message Broker
Virtual host
虛擬主機(jī),出于多租戶和安全因素設(shè)計(jì)的,把 AMQP 的基本組件劃分到一個(gè)虛擬的分組中,類似于網(wǎng)絡(luò)中的 namespace 概念。當(dāng)多個(gè)不同的用戶使用同一個(gè) RabbitMQ server 提供的服務(wù)時(shí),可以劃分出多個(gè)vhost,每個(gè)用戶在自己的 vhost 創(chuàng)建 exchange/queue 等,每一個(gè)虛擬主機(jī)都有AMQP的全套基礎(chǔ)組件,并且可以針對(duì)每個(gè)虛擬主機(jī)進(jìn)行權(quán)限以及數(shù)據(jù)分配,并且不同虛擬主機(jī)之間是完全隔離的。
Connection
客戶端與RabbitMQ進(jìn)行交互,首先就需要建立一個(gè)TPC連接。RabbitMQ為了減少性能開銷,也會(huì)在一個(gè)Connection中建立多個(gè)Channel,這樣便于客戶端進(jìn)行多線程連接,這些連接會(huì)復(fù)用同一個(gè)Connection的TCP通道,提高性能。
Channel
客戶端與RabbitMQ建立了連接,就會(huì)分配一個(gè)AMQP信道 Channel。每個(gè)信道都會(huì)被分配一個(gè)唯一的ID。
Exchange
消息隊(duì)列交換機(jī),消息發(fā)送到RabbitMQ中后,會(huì)首先進(jìn)入一個(gè)交換機(jī),然后由交換機(jī)負(fù)責(zé)將數(shù)據(jù)轉(zhuǎn)發(fā)到不同的隊(duì)列中。RabbitMQ中有多種不同類型的交換機(jī)來支持不同的路由策略。
交換機(jī)多用來與生產(chǎn)者打交道。生產(chǎn)者發(fā)送的消息通過Exchange交換機(jī)分配到各個(gè)不同的Queue隊(duì)列上,而對(duì)于消息消費(fèi)者來說,通常只需要關(guān)注自己的隊(duì)列就可以了。
Queue
消息隊(duì)列,隊(duì)列是實(shí)際保存數(shù)據(jù)的最小單位。隊(duì)列結(jié)構(gòu)天生就具有FIFO的順序。
Producer
消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送
Consumer
消息消費(fèi)者,即消費(fèi)方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。
消息發(fā)送者的固定步驟
1.創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
2.指定Nameserver地址
3.啟動(dòng)producer
4.創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
5.發(fā)送消息
6.關(guān)閉生產(chǎn)者producer
消息消費(fèi)者的固定步驟
1.創(chuàng)建消費(fèi)者Consumer,制定消費(fèi)者組名
2.指定Nameserver地址
3.訂閱主題Topic和Tag
4.設(shè)置回調(diào)函數(shù),處理消息
5.啟動(dòng)消費(fèi)者consumer
編程模型
引入依賴
com.rabbitmq amqp-client 5.9.0
創(chuàng)建連接獲取Channel
ConnectionFactory factory = new ConnectionFactory() factory.setHost(HOST_NAME) factory.setPort(HOST_PORT) factory.setUsername(USER_NAME) factory.setPassword(PASSWORD) factory.setVirtualHost(VIRTUAL_HOST) Connection connection = factory.newConnection() Channel channel = connection.createChannel()
聲明Exchange(可選)
channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Maparguments) throws IOException;
Exchange有四種類型: fanout、 topic 、headers 、direct
聲明queue
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Maparguments);
durable 表示是否持久化。Durable選項(xiàng)表示會(huì)將隊(duì)列的消息寫入硬盤,這樣服務(wù)重啟后這些消息就不會(huì)丟失。
聲明Exchange與Queue的綁定關(guān)系(可選)
channel.queueBind(String queue, String exchange, String routingKey) throws IOException;
聲明了Exchange和Queue,那么就還需要聲明Exchange與Queue的綁定關(guān)系Binding。有了這些Binding,Exchange才可以知道Producer發(fā)送過來的消息將要分發(fā)到哪些Queue上。
這些Binding涉及到消息的不同分發(fā)邏輯,與Exchange和Queue一樣,如果Broker上沒有建立綁定關(guān)系,那么RabbitMQ會(huì)按照客戶端的聲明,創(chuàng)建這些綁定關(guān)系。
發(fā)送消息
channel.basicPublish(String exchange, String routingKey, BasicProperties props,message.getBytes("UTF-8")) ;
其中Exchange如果不需要,傳個(gè)空字符串。
props的這些配置項(xiàng),可以用RabbitMQ中提供的一個(gè)Builder對(duì)象來構(gòu)建。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder() //對(duì)應(yīng)頁(yè)面上的Properties部分,傳入一些預(yù)定的參數(shù)值。 builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()) builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority()) //builder.headers(headers) builder.build() AMQP.BasicProperties prop = builder.build()
MessageProperties.PERSISTENT_TEXT_PLAIN是RabbitMQ提供的持久化消息的默認(rèn)配置。
消費(fèi)消息
被動(dòng)消費(fèi)模式
Consumer等待rabbitMQ 服務(wù)器將message推送過來再消費(fèi)。
channel.basicConsume(String queue, boolean autoAck, Consumer callback);
主動(dòng)消費(fèi)模式
Comsumer主動(dòng)到rabbitMQ服務(wù)器上去拉取messge進(jìn)行消費(fèi)。
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck)
消費(fèi)消息確認(rèn)
自動(dòng)ACK:autoAck為true,消息一旦被接收,消費(fèi)者自動(dòng)發(fā)送ACK,如果消費(fèi)失敗了,后續(xù)也無法再消費(fèi)了
手動(dòng)ACK:autoAck為false,消息接收后,不會(huì)發(fā)送ACK,需要手動(dòng)調(diào)用 channel.basicAck 來通知服務(wù)器已經(jīng)消費(fèi)了該message.這樣即使Consumer在執(zhí)行message過程中出問題了,也不會(huì)造成消息丟失。
釋放資源
channel.close() conection.clouse()
消息模型
簡(jiǎn)單模式
最直接的方式,P端發(fā)送一個(gè)消息到一個(gè)指定的queue,中間不需要任何exchange規(guī)則。C端按queue方式進(jìn)行消費(fèi)。
在上圖的模型中,有以下概念:
P:生產(chǎn)者,也就是要發(fā)送消息的程序
C:消費(fèi)者:消息的接受者。
queue:消息隊(duì)列,圖中紅色部分??梢跃彺嫦ⅲ簧a(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。
一個(gè)生產(chǎn)者、一個(gè)消費(fèi)者,不需要設(shè)置交換機(jī)(使用默認(rèn)的交換機(jī))。
producer:
channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
consumer:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Work queues 工作隊(duì)列模式
Work Queues:與簡(jiǎn)單模式相比,多了一個(gè)或一些消費(fèi)端,多個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息。一個(gè)消息只會(huì)被一個(gè)消費(fèi)者消費(fèi)。
一個(gè)生產(chǎn)者、多個(gè)消費(fèi)者(競(jìng)爭(zhēng)關(guān)系),不需要設(shè)置交換機(jī)(使用默認(rèn)的交換機(jī))。
應(yīng)用場(chǎng)景:對(duì)于任務(wù)過重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
Consumer: 每次拉取一條消息。
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); channel.basicQos(1); channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
Publish/Subscribe 發(fā)布訂閱
exchange type是 fanout。
在訂閱模型中,多了一個(gè) Exchange 角色,而且過程略有變化:
P:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X(交換機(jī))
C:消費(fèi)者,消息的接收者
Queue:消息隊(duì)列,接收消息、緩存消息Exchange:交換機(jī)(X)。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
Exchange有常見以下3種類型:
Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列,交換機(jī)需要與隊(duì)列進(jìn)行綁定,綁定之后;一個(gè)消息可以被多個(gè)消費(fèi)者都收到。
Direct:定向,把消息交給符合指定routing key 的隊(duì)列
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與 Exchange 綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!producer只負(fù)責(zé)發(fā)送消息,至于消息進(jìn)入哪個(gè)queue,由exchange來分配。
使用場(chǎng)景:
所有消費(fèi)者獲得相同的消息,例如天氣預(yù)報(bào)。
生產(chǎn)者:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
消費(fèi)者:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout") String queueName = channel.queueDeclare().getQueue() channel.queueBind(queueName, EXCHANGE_NAME, "")
發(fā)布訂閱模式與工作隊(duì)列模式的區(qū)別:
工作隊(duì)列模式不用定義交換機(jī),而發(fā)布/訂閱模式需要定義交換機(jī)
發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊(duì)列模式的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))
發(fā)布/訂閱模式需要設(shè)置隊(duì)列和交換機(jī)的綁定,工作隊(duì)列模式不需要設(shè)置,實(shí)際上工作隊(duì)列模式會(huì)將隊(duì)列綁 定到默認(rèn)的交換機(jī)
Rout 路由模式
exchange typ 是 direct。
P:生產(chǎn)者,向 Exchange 發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key
X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給與 routing key 完全匹配的隊(duì)列
C1:消費(fèi)者,其所在隊(duì)列指定了需要 routing key 為 error 的消息
C2:消費(fèi)者,其所在隊(duì)列指定了需要 routing key 為 info、error、warning 的消息
路由模式要求隊(duì)列在綁定交換機(jī)時(shí)要指定 routing key,消息會(huì)轉(zhuǎn)發(fā)到符合 routing key 的隊(duì)列。
生產(chǎn)者:
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
消費(fèi)者:
channel.exchangeDeclare(EXCHANGE_NAME, "direct") channel.queueBind(queueName, EXCHANGE_NAME, routingKey1) channel.queueBind(queueName, EXCHANGE_NAME, routingKey2) channel.basicConsume(queueName, true, consumer)
Topics 通配符模式
exchange type 是 topic
紅色 Queue:綁定的是 usa.# ,因此凡是以 usa. 開頭的 routing key 都會(huì)被匹配到
黃色 Queue:綁定的是 #.news ,因此凡是以 .news 結(jié)尾的 routing key 都會(huì)被匹配
對(duì)routingKey進(jìn)行了模糊匹配單詞之間用,隔開,* 代表一個(gè)具體的單詞。# 代表0個(gè)或多個(gè)單詞
Topic 主題模式可以實(shí)現(xiàn) Pub/Sub 發(fā)布與訂閱模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的時(shí)候可以使用通配符,顯得更加靈活。
Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
Receiver:
channel.exchangeDeclare(EXCHANGE_NAME, "topic") channel.queueBind(queueName, EXCHANGE_NAME, routingKey1) channel.queueBind(queueName, EXCHANGE_NAME, routingKey2) channel.basicConsume(queueName, true, consumer)
發(fā)送消息確認(rèn)
發(fā)送的消息如果沒有被消費(fèi)者及時(shí)消費(fèi)有可能會(huì)導(dǎo)致消息丟失。
發(fā)送者確認(rèn)模式默認(rèn)是不開啟的,所以如果需要開啟發(fā)送者確認(rèn)模式,需要手動(dòng)在channel中進(jìn)行聲明。
channel.confirmSelect()
使用異步確認(rèn)消息保證消息在生產(chǎn)端不丟失。
Producer在channel中注冊(cè)監(jiān)聽器來對(duì)消息進(jìn)行確認(rèn)。核心代碼:
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)
ConfirmCallback,監(jiān)聽器接口,里面只有一個(gè)方法:
void handle(long sequenceNumber, boolean multiple) throws IOException;
這方法中的兩個(gè)參數(shù),
sequenceNumer:這個(gè)是一個(gè)唯一的序列號(hào),代表一個(gè)唯一的消息。在RabbitMQ中,他的消息體只是一個(gè)二進(jìn)制數(shù)組,默認(rèn)消息是沒有序列號(hào)的。那么在回調(diào)的時(shí)候,Producer怎么知道是哪一條消息成功或者失敗呢?RabbitMQ提供了一個(gè)方法int sequenceNumber = channel.getNextPublishSeqNo();來生成一個(gè)全局遞增的序列號(hào),這個(gè)序列號(hào)將會(huì)分配給新發(fā)送的那一條消息。然后應(yīng)用程序需要自己來將這個(gè)序列號(hào)與消息對(duì)應(yīng)起來。沒錯(cuò)!是的!需要客戶端自己去做對(duì)應(yīng)!
multiple:這個(gè)是一個(gè)Boolean型的參數(shù)。如果是false,就表示這一次只確認(rèn)了當(dāng)前一條消息。如果是true,就表示RabbitMQ這一次確認(rèn)了一批消息,在sequenceNumber之前的所有消息都已經(jīng)確認(rèn)完成了。
SpringBoot集成RabbitMQ
添加依賴
org.springframework.boot spring-boot-starter-amqp
配置文件
server: port: 8081 spring: application: name: test-rabbitmq-producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtualHost: /
rabbitMQ配置類
配置Exchange、Queue、及綁定交換機(jī),下面配置Topic交換機(jī)。
package com.example.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; * RabbitmqConfig */ @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static final String ROUTINGKEY_SMS="inform.#.sms.#"; @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(QUEUE_INFORM_EMAIL); } @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } @Bean public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }
發(fā)送消息
String message = "hello world" rabbitTemplate.convertAndSend(RabbitmqTopicConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message)
消費(fèi)消息
消費(fèi)者都是通過@RabbitListener注解來聲明。在@RabbitMQListener注解中包含了非常多對(duì)Queue進(jìn)行定制的屬性,大部分的屬性都是有默認(rèn)值的。
@RabbitListener(queues = {RabbitmqTopicConfig.QUEUE_INFORM_EMAIL}) public void receive_email(Object msg, Message message, Channel channel){ System.out.println("QUEUE_INFORM_EMAIL msg"+msg); } @RabbitListener(queues = {RabbitmqTopicConfig.QUEUE_INFORM_SMS}) public void receive_sms(Object msg, Message message, Channel channel){ System.out.println("QUEUE_INFORM_SMS msg"+msg); }
-
Linux
+關(guān)注
關(guān)注
87文章
11207瀏覽量
208712 -
編程
+關(guān)注
關(guān)注
88文章
3565瀏覽量
93535 -
開源
+關(guān)注
關(guān)注
3文章
3215瀏覽量
42310 -
rabbitmq
+關(guān)注
關(guān)注
0文章
17瀏覽量
1011
原文標(biāo)題:RabbitMq 入門教程看這一篇就夠了!
文章出處:【微信號(hào):magedu-Linux,微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論