上一篇文章中,簡單的介紹了一下RabbitMQ,以及安裝和hello world。
有的小伙伴留言說看不懂其中的方法參數(shù),這里先解釋一下幾個(gè)基本的方法參數(shù)。
// 聲明隊(duì)列方法
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* param1:queue 隊(duì)列的名字
* param2:durable 是否持久化;比如現(xiàn)在發(fā)送到隊(duì)列里面的消息,如果沒有持久化,重啟這個(gè)隊(duì)列后數(shù) 據(jù)會丟失(false) true:重啟之后數(shù)據(jù)依然在
* param3:exclusive 是否排外(是否是當(dāng)前連接的專屬隊(duì)列),排外的意思是:
* 1:連接關(guān)閉之后 這個(gè)隊(duì)列是否自動(dòng)刪除(false:不自動(dòng)刪除)
* 2:是否允許其他通道來進(jìn)行訪問這個(gè)數(shù)據(jù)(false:不允許)
* param4:autoDelete 是否自動(dòng)刪除
* 就是當(dāng)最后一個(gè)連接斷開的時(shí)候,是否自動(dòng)刪除這個(gè)隊(duì)列(false:不刪除)
* param5:arguments(map) 聲明隊(duì)列的時(shí)候,附帶的一些參數(shù)
*/
// 發(fā)送數(shù)據(jù)到隊(duì)列
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "第一個(gè)隊(duì)列消息...".getBytes());
/**
* param1:exchange 交換機(jī) 沒有就設(shè)置為 "" 值就可以了
* param2:routingKey 路由的key 現(xiàn)在沒有設(shè)置key,直接使用隊(duì)列的名字
* param3:BasicProperties 發(fā)送數(shù)據(jù)到隊(duì)列的時(shí)候,是否要帶一些參數(shù)。
* MessageProperties.PERSISTENT_TEXT_PLAIN表示沒有帶任何參數(shù)
* param4:body 向隊(duì)列中發(fā)送的消息數(shù)據(jù)
*/
Work模型
work模型稱為工作隊(duì)列或者競爭消費(fèi)者模式,多個(gè)消費(fèi)者消費(fèi)的數(shù)據(jù)之和才是原來隊(duì)列中的所有數(shù)據(jù),適用于流量的削峰。
img
演示
寫個(gè)簡單的測試:
生產(chǎn)者
public class Producer { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { channel.basicPublish("", QUEUE_NAME, null, ("work模型:" + i).getBytes()); } channel.close(); connection.close(); } }
消費(fèi)者
// 消費(fèi)者1 public class Consumer { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.basicQos(0, 1, false); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(System.currentTimeMillis() + "消費(fèi)者1接收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
// 消費(fèi)者2 public class Consumer2 { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.basicQos(0, 1, false); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(System.currentTimeMillis() + "消費(fèi)者2接收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); // 這里加了個(gè)延遲,表示處理業(yè)務(wù)時(shí)間 try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
結(jié)果
image-20221229210012145
image-20221229210046184
可以看出來:100條消息,消費(fèi)者之間是平分的,消費(fèi)者1 幾乎是瞬間完成,消費(fèi)者2 則是慢慢吞吞的運(yùn)行完畢,消費(fèi)者1大量時(shí)間處于空閑狀態(tài),消費(fèi)者2則一直忙碌。這顯然是不適用于實(shí)際開發(fā)中。
我們需要遵從一個(gè)原則,就是 能者多勞 ,消費(fèi)越快的人,消費(fèi)的越多;
現(xiàn)在我們把消費(fèi)者1和2的代碼中 // channel.basicQos(0, 1, false);
這行代碼取消注釋,再次運(yùn)行;
image-20221229211317632
image-20221229211335782
現(xiàn)在的結(jié)果就比較符合能者多勞,雖然你干的多,但是工資是一樣的呀~
work模型的一個(gè)主要的方法是basicQos()
;這里也解釋一下其參數(shù):
// 設(shè)置限流機(jī)制
channel.basicQos(0, 1, false);
/**
* param1: prefetchSize,消息本身的大小 如果設(shè)置為0 那么表示對消息本身的大小不限制
* param2: prefetchCount,告訴rabbitmq不要一次性給消費(fèi)者推送大于N個(gè)消息
* param3:global,是否將上面的設(shè)置應(yīng)用于整個(gè)通道,false表示只應(yīng)用于當(dāng)前消費(fèi)者
*/
小結(jié)
本文到這里就結(jié)束了,主要介紹了RabbitMQ通信模型中的work模型,適用于限流、削峰等應(yīng)用場景。
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
6715瀏覽量
88308 -
通信
+關(guān)注
關(guān)注
18文章
5880瀏覽量
135313 -
模型
+關(guān)注
關(guān)注
1文章
3032瀏覽量
48357 -
Work
+關(guān)注
關(guān)注
0文章
9瀏覽量
8942 -
rabbitmq
+關(guān)注
關(guān)注
0文章
17瀏覽量
1000
發(fā)布評論請先 登錄
相關(guān)推薦
評論