作為優(yōu)秀的消息流平臺,Pulsar 的使用越來越多,這篇文章講解 Pulsar 的 Java 客戶端。
部署 Pulsar
Pulsar 的部署方式主要有 3 種,本地安裝二進(jìn)制文件、docker 部署、在 Kubernetes 上部署。
本文采用 docker 部署一個(gè)單節(jié)點(diǎn)的 Pulsar 集群。實(shí)驗(yàn)環(huán)境是 2 核 CPU 和 4G 內(nèi)存。
部署命令如下:
dockerrun-it-p6650:6650-p8080:8080--mountsource=pulsardata,target=/pulsar/data--mountsource=pulsarconf,target=/pulsar/confapachepulsar/pulsar:2.9.1bin/pulsarstandalone
安裝過程可能會出現(xiàn)下面的錯誤:
unknownflag:--mount See'dockerrun--help'.
這是因?yàn)?docker 版本低,不支持 mount 參數(shù),把 docker 版本升級到 17.06 以上就可以了。
部署過程中可能會因?yàn)?a href="http://ttokpm.com/v/tag/1722/" target="_blank">網(wǎng)絡(luò)的原因失敗,多試幾次就可以成功了。如果看到下面的日志,就說明啟動成功了。
2022-01-08T22:27:58,726+0000[main]INFOorg.apache.pulsar.broker.PulsarService-messagingserviceisready,bootstrapserviceport=8080,brokerurl=pulsar://localhost:6650,cluster=standalone
本地單節(jié)點(diǎn)集群啟動后,會創(chuàng)建一個(gè) namespace,名字叫 public/default
Pulsar 客戶端
目前 Pulsar 支持多種語言的客戶端,包括:
Java 客戶端
Go 客戶端
Python 客戶端
C++ 客戶端
Node.js 客戶端
WebSocket 客戶端
C# 客戶端
SpringBoot 配置
使用 SpringBoot 整合 Pulsar 客戶端,首先引入 Pulsar 客戶端依賴,代碼如下:
org.apache.pulsar pulsar-client 2.9.1
然后在 properties 文件中添加配置:
# Pulsar 地址 pulsar.url=pulsar://192.168.59.155:6650 # topic pulsar.topic=testTopic # consumer group pulsar.subscription=topicGroup
創(chuàng)建 Client
創(chuàng)建客戶端非常簡單,代碼如下:
client=PulsarClient.builder() .serviceUrl(url) .build();
上面的 url 就是 properties 文件中定義的 pulsar.url 。
創(chuàng)建 Client 時(shí),即使集群沒有啟成功,程序也不會報(bào)錯,因?yàn)檫@時(shí)還沒有真正地去連接集群。
創(chuàng)建 Producer
producer=client.newProducer() .topic(topic) .compressionType(CompressionType.LZ4) .sendTimeout(0,TimeUnit.SECONDS) .enableBatching(true) .batchingMaxPublishDelay(10,TimeUnit.MILLISECONDS) .batchingMaxMessages(1000) .maxPendingMessages(1000) .blockIfQueueFull(true) .roundRobinRouterBatchingPartitionSwitchFrequency(10) .batcherBuilder(BatcherBuilder.DEFAULT) .create();
創(chuàng)建 Producer,會真正的連接集群,這時(shí)如果集群有問題,就會報(bào)連接錯誤。
下面解釋一下創(chuàng)建 Producer 的參數(shù):
topic :Producer 要寫入的 topic。
compressionType :壓縮策略,目前支持 4 種策略 (NONE、LZ4、ZLIB、ZSTD),從 Pulsar2.3 開始,只有 Consumer 的版本在 2.3 以上,這個(gè)策略才會生效。
sendTimeout :超時(shí)時(shí)間,如果 Producer 在超時(shí)時(shí)間為收到 ACK,會進(jìn)行重新發(fā)送。
enableBatching :是否開啟消息批量處理,這里默認(rèn) true,這個(gè)參數(shù)只有在異步發(fā)送 (sendAsync) 時(shí)才能生效,選擇同步發(fā)送會失效。
batchingMaxPublishDelay :批量發(fā)送消息的時(shí)間段,這里定義的是 10ms,需要注意的是,設(shè)置了批量時(shí)間,就不會受消息數(shù)量的影響。批量發(fā)送會把要發(fā)送的批量消息放在一個(gè)網(wǎng)絡(luò)包里發(fā)送出去,減少網(wǎng)絡(luò) IO 次數(shù),大大提高網(wǎng)卡的發(fā)送效率。
batchingMaxMessages :批量發(fā)送消息的最大數(shù)量。
maxPendingMessages :等待從 broker 接收 ACK 的消息隊(duì)列最大長度。如果這個(gè)隊(duì)列滿了,producer 所有的 sendAsync 和 send 都會失敗,除非設(shè)置了 blockIfQueueFull 值是 true。
blockIfQueueFull :Producer 發(fā)送消息時(shí)會把消息先放入本地 Queue 緩存,如果緩存滿了,就會阻塞消息發(fā)送。
roundRobinRouterBatchingPartition-SwitchFrequency :如果發(fā)送消息時(shí)沒有指定 key,那默認(rèn)采用 round robin 的方式發(fā)送消息,使用 round robin 的方式,切換 partition 的周期是 (frequency * batchingMaxPublishDelay)。
創(chuàng)建 Consumer
Pulsar 的消費(fèi)模型如下圖:
圖片
從圖中可以看到,Consumer 要綁定一個(gè) subscription 才能進(jìn)行消費(fèi)。
consumer=client.newConsumer() .topic(topic) .subscriptionName(subscription) .subscriptionType(SubscriptionType.Shared) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .negativeAckRedeliveryDelay(60,TimeUnit.SECONDS) .receiverQueueSize(1000) .subscribe();
下面解釋一下創(chuàng)建 Consumer 的參數(shù):
topic :Consumer 要訂閱的 topic。
subscriptionNam e:consumer 要關(guān)聯(lián)的 subscription 名字。
subscriptionType :訂閱類型,Pulsar 支持四種類型訂閱:
Exclusive :獨(dú)占模式,同一個(gè) Topic 只能有一個(gè)消費(fèi)者,如果多個(gè)消費(fèi)者,就會出錯。
Failover :災(zāi)備模式,同一個(gè) Topic 可以有多個(gè)消費(fèi)者,但是只能有一個(gè)消費(fèi)者消費(fèi),其他消費(fèi)者作為故障轉(zhuǎn)移備用,如果當(dāng)前消費(fèi)者出了故障,就從備用消費(fèi)者中選擇一個(gè)進(jìn)行消費(fèi)。如下圖:
圖片
Shared :共享模式,同一個(gè) Topic 可以由多個(gè)消費(fèi)者訂閱和消費(fèi)。消息通過 round robin 輪詢機(jī)制分發(fā)給不同的消費(fèi)者,并且每個(gè)消息僅會被分發(fā)給一個(gè)消費(fèi)者。當(dāng)消費(fèi)者斷開,如果發(fā)送給它消息沒有被消費(fèi),這些消息會被重新分發(fā)給其它存活的消費(fèi)者。如下圖:
Key_Shared :消息和消費(fèi)者都會綁定一個(gè)key,消息只會發(fā)送給綁定同一個(gè)key的消費(fèi)者。如果有新消費(fèi)者建立連接或者有消費(fèi)者斷開連接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好處是既可以讓消費(fèi)者并發(fā)地消費(fèi)消息,又能保證同一Key下的消息順序。如下圖:
subscriptionInitialPosition :創(chuàng)建新的 subscription 時(shí)從哪里開始消費(fèi),有兩個(gè)選項(xiàng):
Latest :從最新的消息開始消費(fèi)
Earliest :從最早的消息開始消費(fèi)
negativeAckRedeliveryDelay :消費(fèi)失敗后間隔多久 broker 重新發(fā)送。
receiverQueueSize :在調(diào)用 receive 方法之前,最多能累積多少條消息。可以設(shè)置為 0,這樣每次只從 broker 拉取一條消息。在 Shared 模式下,receiverQueueSize 設(shè)置為 0,可以防止批量消息多發(fā)給一個(gè) Consumer 而導(dǎo)致其他 Consumer 空閑。
Consumer 接收消息有四種方式:同步單條、同步批量、異步單條和異步批量,代碼如下:
Messagemessage=consumer.receive() CompletableFuturemessage=consumer.receiveAsync(); Messagesmessage=consumer.batchReceive(); CompletableFuture message=consumer.batchReceiveAsync();
對于批量接收,也可以設(shè)置批量接收的策略,代碼如下:
consumer=client.newConsumer() .topic(topic) .subscriptionName(subscription) .batchReceivePolicy(BatchReceivePolicy.builder() .maxNumMessages(100) .maxNumBytes(1024*1024) .timeout(200,TimeUnit.MILLISECONDS) .build()) .subscribe();
代碼中的參數(shù)說明如下:
maxNumMessages :批量接收的最大消息數(shù)量。
maxNumBytes :批量接收消息的大小,這里是 1MB。
測試
首先編寫 Producer 發(fā)送消息的代碼,如下:
publicvoidsendMsg(Stringkey,Stringdata){ CompletableFuturefuture=producer.newMessage() .key(key) .value(data.getBytes()).sendAsync(); future.handle((v,ex)->{ if(ex==null){ logger.info("發(fā)送消息成功,key:{},msg:{}",key,data); }else{ logger.error("發(fā)送消息失敗,key:{},msg:{}",key,data); } returnnull; }); future.join(); logger.info("發(fā)送消息完成,key:{},msg:{}",key,data); }
然后編寫一個(gè) Consumer 消費(fèi)消息的代碼,如下:
publicvoidstart()throwsException{ while(true){ Messagemessage=consumer.receive(); Stringkey=message.getKey(); Stringdata=newString(message.getData()); Stringtopic=message.getTopicName(); if(StringUtils.isNotEmpty(data)){ try{ logger.info("收到消息,topic:{},key:{},data:{}",topic,key,data); }catch(Exceptione){ logger.error("接收消息異常,topic:{},key:{},data:{}",topic,key,data,e); } } consumer.acknowledge(message); } }
最后編寫一個(gè) Controller 類,調(diào)用 Producer 發(fā)送消息,代碼如下:
@RequestMapping("/send") @ResponseBody publicStringsend(@RequestParamStringkey,@RequestParamStringdata){ logger.info("收到消息發(fā)送請求,key:{},value:{}",key,data); pulsarProducer.sendMsg(key,data); return"success"; }
調(diào)用 Producer 發(fā)送一條消息,key=key1,data=data1,具體操作為在瀏覽器中輸入下面的 url 后回車
可以看到控制臺輸出下面日志:
2022-01-0822:42:33,199[pulsar-client-io-6-1][INFO]boot.pulsar.PulsarProducer-發(fā)送消息成功,key:key1,msg:data1 2022-01-0822:42:33,200[http-nio-8083-exec-1][INFO]boot.pulsar.PulsarProducer-發(fā)送消息完成,key:key1,msg:data1 2022-01-0822:42:33,232[Thread-22][INFO]boot.pulsar.PulsarConsumer-收到消息,topic//public/default/testTopic,key:key1,data:data1 2022-01-0822:43:14,498[pulsar-timer-5-1][INFO]org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl-[testTopic][topicGroup][7def6]Prefetchedmessages:0---Consumethroughputreceived:0.02msgs/s---0.00Mbit/s---Acksentrate:0.02ack/s---Failedmessages:0---batchmessages:0---Failedacks:0 2022-01-0822:43:14,961[pulsar-timer-9-1][INFO]org.apache.pulsar.client.impl.ProducerStatsRecorderImpl-[testTopic][standalone-9-0]Pendingmessages:0---Publishthroughput:0.02msg/s---0.00Mbit/s---Latency:med:69.000ms-95pct:69.000ms-99pct:69.000ms-99.9pct:69.000ms-max:69.000ms---Ackreceivedrate:0.02ack/s---Failedmessages:0
從日志中看到,這里使用的 namespace 就是創(chuàng)建集群時(shí)生成的public/default。
總結(jié)
從 SpringBoot 整合 Java 客戶端使用來看,Pulsar 的 api 是非常友好的,使用起來方便簡潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。
審核編輯:劉清
-
ACK
+關(guān)注
關(guān)注
0文章
28瀏覽量
11131 -
URL
+關(guān)注
關(guān)注
0文章
139瀏覽量
15299 -
python
+關(guān)注
關(guān)注
55文章
4774瀏覽量
84386
原文標(biāo)題:Spring Boot 整合分布式消息平臺 Pulsar
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論