0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Spring Boot整合分布式消息平臺Pulsar介紹

jf_ro2CN3Fa ? 來源:君哥聊技術(shù) ? 2023-07-12 09:58 ? 次閱讀

作為優(yōu)秀的消息流平臺,Pulsar 的使用越來越多,這篇文章講解 Pulsar 的 Java 客戶端。

部署 Pulsar

Pulsar 的部署方式主要有 3 種,本地安裝二進(jìn)制文件、docker 部署、在 Kubernetes 上部署。

本文采用 docker 部署一個(gè)單節(jié)點(diǎn)的 Pulsar 集群。實(shí)驗(yàn)環(huán)境是 2 核 CPU4G 內(nèi)存。

部署命令如下:

dockerrun-it-p6650:6650-p8080:8080--mountsource=pulsardata,target=/pulsar/data--mountsource=pulsarconf,target=/pulsar/confapachepulsar/pulsar:2.9.1bin/pulsarstandalone

安裝過程可能會出現(xiàn)下面的錯誤:

unknownflag:--mount
See'dockerrun--help'.

這是因?yàn)?docker 版本低,不支持 mount 參數(shù),把 docker 版本升級到 17.06 以上就可以了。

部署過程中可能會因?yàn)?a href="http://ttokpm.com/v/tag/1722/" target="_blank">網(wǎng)絡(luò)的原因失敗,多試幾次就可以成功了。如果看到下面的日志,就說明啟動成功了。

2022-01-08T22:27:58,726+0000[main]INFOorg.apache.pulsar.broker.PulsarService-messagingserviceisready,bootstrapserviceport=8080,brokerurl=pulsar://localhost:6650,cluster=standalone

本地單節(jié)點(diǎn)集群啟動后,會創(chuàng)建一個(gè) namespace,名字叫 public/default

Pulsar 客戶端

目前 Pulsar 支持多種語言的客戶端,包括:

Java 客戶端

Go 客戶端

Python 客戶端

C++ 客戶端

Node.js 客戶端

WebSocket 客戶端

C# 客戶端

SpringBoot 配置

使用 SpringBoot 整合 Pulsar 客戶端,首先引入 Pulsar 客戶端依賴,代碼如下:


org.apache.pulsar
pulsar-client
2.9.1

然后在 properties 文件中添加配置:

# Pulsar 地址
pulsar.url=pulsar://192.168.59.155:6650
# topic
pulsar.topic=testTopic
# consumer group
pulsar.subscription=topicGroup

創(chuàng)建 Client

創(chuàng)建客戶端非常簡單,代碼如下:

client=PulsarClient.builder()
.serviceUrl(url)
.build();

上面的 url 就是 properties 文件中定義的 pulsar.url 。

創(chuàng)建 Client 時(shí),即使集群沒有啟成功,程序也不會報(bào)錯,因?yàn)檫@時(shí)還沒有真正地去連接集群。

創(chuàng)建 Producer

producer=client.newProducer()
.topic(topic)
.compressionType(CompressionType.LZ4)
.sendTimeout(0,TimeUnit.SECONDS)
.enableBatching(true)
.batchingMaxPublishDelay(10,TimeUnit.MILLISECONDS)
.batchingMaxMessages(1000)
.maxPendingMessages(1000)
.blockIfQueueFull(true)
.roundRobinRouterBatchingPartitionSwitchFrequency(10)
.batcherBuilder(BatcherBuilder.DEFAULT)
.create();

創(chuàng)建 Producer,會真正的連接集群,這時(shí)如果集群有問題,就會報(bào)連接錯誤。

下面解釋一下創(chuàng)建 Producer 的參數(shù):

topic :Producer 要寫入的 topic。

compressionType :壓縮策略,目前支持 4 種策略 (NONE、LZ4、ZLIB、ZSTD),從 Pulsar2.3 開始,只有 Consumer 的版本在 2.3 以上,這個(gè)策略才會生效。

sendTimeout :超時(shí)時(shí)間,如果 Producer 在超時(shí)時(shí)間為收到 ACK,會進(jìn)行重新發(fā)送。

enableBatching :是否開啟消息批量處理,這里默認(rèn) true,這個(gè)參數(shù)只有在異步發(fā)送 (sendAsync) 時(shí)才能生效,選擇同步發(fā)送會失效。

batchingMaxPublishDelay :批量發(fā)送消息的時(shí)間段,這里定義的是 10ms,需要注意的是,設(shè)置了批量時(shí)間,就不會受消息數(shù)量的影響。批量發(fā)送會把要發(fā)送的批量消息放在一個(gè)網(wǎng)絡(luò)包里發(fā)送出去,減少網(wǎng)絡(luò) IO 次數(shù),大大提高網(wǎng)卡的發(fā)送效率。

batchingMaxMessages :批量發(fā)送消息的最大數(shù)量。

maxPendingMessages :等待從 broker 接收 ACK 的消息隊(duì)列最大長度。如果這個(gè)隊(duì)列滿了,producer 所有的 sendAsync 和 send 都會失敗,除非設(shè)置了 blockIfQueueFull 值是 true。

blockIfQueueFull :Producer 發(fā)送消息時(shí)會把消息先放入本地 Queue 緩存,如果緩存滿了,就會阻塞消息發(fā)送。

roundRobinRouterBatchingPartition-SwitchFrequency :如果發(fā)送消息時(shí)沒有指定 key,那默認(rèn)采用 round robin 的方式發(fā)送消息,使用 round robin 的方式,切換 partition 的周期是 (frequency * batchingMaxPublishDelay)。

創(chuàng)建 Consumer

Pulsar 的消費(fèi)模型如下圖:

f5c11b2e-2053-11ee-962d-dac502259ad0.png圖片

從圖中可以看到,Consumer 要綁定一個(gè) subscription 才能進(jìn)行消費(fèi)。

consumer=client.newConsumer()
.topic(topic)
.subscriptionName(subscription)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryDelay(60,TimeUnit.SECONDS)
.receiverQueueSize(1000)
.subscribe();

下面解釋一下創(chuàng)建 Consumer 的參數(shù):

topic :Consumer 要訂閱的 topic。

subscriptionNam e:consumer 要關(guān)聯(lián)的 subscription 名字。

subscriptionType :訂閱類型,Pulsar 支持四種類型訂閱:

Exclusive :獨(dú)占模式,同一個(gè) Topic 只能有一個(gè)消費(fèi)者,如果多個(gè)消費(fèi)者,就會出錯。

Failover :災(zāi)備模式,同一個(gè) Topic 可以有多個(gè)消費(fèi)者,但是只能有一個(gè)消費(fèi)者消費(fèi),其他消費(fèi)者作為故障轉(zhuǎn)移備用,如果當(dāng)前消費(fèi)者出了故障,就從備用消費(fèi)者中選擇一個(gè)進(jìn)行消費(fèi)。如下圖:

f5de2fde-2053-11ee-962d-dac502259ad0.png圖片

Shared :共享模式,同一個(gè) Topic 可以由多個(gè)消費(fèi)者訂閱和消費(fèi)。消息通過 round robin 輪詢機(jī)制分發(fā)給不同的消費(fèi)者,并且每個(gè)消息僅會被分發(fā)給一個(gè)消費(fèi)者。當(dāng)消費(fèi)者斷開,如果發(fā)送給它消息沒有被消費(fèi),這些消息會被重新分發(fā)給其它存活的消費(fèi)者。如下圖:

f6008dcc-2053-11ee-962d-dac502259ad0.png

Key_Shared :消息和消費(fèi)者都會綁定一個(gè)key,消息只會發(fā)送給綁定同一個(gè)key的消費(fèi)者。如果有新消費(fèi)者建立連接或者有消費(fèi)者斷開連接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好處是既可以讓消費(fèi)者并發(fā)地消費(fèi)消息,又能保證同一Key下的消息順序。如下圖:

f6357d02-2053-11ee-962d-dac502259ad0.png

subscriptionInitialPosition :創(chuàng)建新的 subscription 時(shí)從哪里開始消費(fèi),有兩個(gè)選項(xiàng):

Latest :從最新的消息開始消費(fèi)

Earliest :從最早的消息開始消費(fèi)

negativeAckRedeliveryDelay :消費(fèi)失敗后間隔多久 broker 重新發(fā)送。

receiverQueueSize :在調(diào)用 receive 方法之前,最多能累積多少條消息。可以設(shè)置為 0,這樣每次只從 broker 拉取一條消息。在 Shared 模式下,receiverQueueSize 設(shè)置為 0,可以防止批量消息多發(fā)給一個(gè) Consumer 而導(dǎo)致其他 Consumer 空閑。

Consumer 接收消息有四種方式:同步單條、同步批量、異步單條和異步批量,代碼如下:

Messagemessage=consumer.receive()
CompletableFuturemessage=consumer.receiveAsync();
Messagesmessage=consumer.batchReceive();
CompletableFuturemessage=consumer.batchReceiveAsync();

對于批量接收,也可以設(shè)置批量接收的策略,代碼如下:

consumer=client.newConsumer()
.topic(topic)
.subscriptionName(subscription)
.batchReceivePolicy(BatchReceivePolicy.builder()
.maxNumMessages(100)
.maxNumBytes(1024*1024)
.timeout(200,TimeUnit.MILLISECONDS)
.build())
.subscribe();

代碼中的參數(shù)說明如下:

maxNumMessages :批量接收的最大消息數(shù)量。

maxNumBytes :批量接收消息的大小,這里是 1MB。

測試

首先編寫 Producer 發(fā)送消息的代碼,如下:

publicvoidsendMsg(Stringkey,Stringdata){
CompletableFuturefuture=producer.newMessage()
.key(key)
.value(data.getBytes()).sendAsync();
future.handle((v,ex)->{
if(ex==null){
logger.info("發(fā)送消息成功,key:{},msg:{}",key,data);
}else{
logger.error("發(fā)送消息失敗,key:{},msg:{}",key,data);
}
returnnull;
});
future.join();
logger.info("發(fā)送消息完成,key:{},msg:{}",key,data);
}

然后編寫一個(gè) Consumer 消費(fèi)消息的代碼,如下:

publicvoidstart()throwsException{
while(true){
Messagemessage=consumer.receive();
Stringkey=message.getKey();
Stringdata=newString(message.getData());
Stringtopic=message.getTopicName();
if(StringUtils.isNotEmpty(data)){
try{
logger.info("收到消息,topic:{},key:{},data:{}",topic,key,data);
}catch(Exceptione){
logger.error("接收消息異常,topic:{},key:{},data:{}",topic,key,data,e);
}
}
consumer.acknowledge(message);
}
}

最后編寫一個(gè) Controller 類,調(diào)用 Producer 發(fā)送消息,代碼如下:

@RequestMapping("/send")
@ResponseBody
publicStringsend(@RequestParamStringkey,@RequestParamStringdata){
logger.info("收到消息發(fā)送請求,key:{},value:{}",key,data);
pulsarProducer.sendMsg(key,data);
return"success";
}

調(diào)用 Producer 發(fā)送一條消息,key=key1,data=data1,具體操作為在瀏覽器中輸入下面的 url 后回車

可以看到控制臺輸出下面日志:

2022-01-0822:42:33,199[pulsar-client-io-6-1][INFO]boot.pulsar.PulsarProducer-發(fā)送消息成功,key:key1,msg:data1
2022-01-0822:42:33,200[http-nio-8083-exec-1][INFO]boot.pulsar.PulsarProducer-發(fā)送消息完成,key:key1,msg:data1
2022-01-0822:42:33,232[Thread-22][INFO]boot.pulsar.PulsarConsumer-收到消息,topic//public/default/testTopic,key:key1,data:data1
2022-01-0822:43:14,498[pulsar-timer-5-1][INFO]org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl-[testTopic][topicGroup][7def6]Prefetchedmessages:0---Consumethroughputreceived:0.02msgs/s---0.00Mbit/s---Acksentrate:0.02ack/s---Failedmessages:0---batchmessages:0---Failedacks:0
2022-01-0822:43:14,961[pulsar-timer-9-1][INFO]org.apache.pulsar.client.impl.ProducerStatsRecorderImpl-[testTopic][standalone-9-0]Pendingmessages:0---Publishthroughput:0.02msg/s---0.00Mbit/s---Latency:med:69.000ms-95pct:69.000ms-99pct:69.000ms-99.9pct:69.000ms-max:69.000ms---Ackreceivedrate:0.02ack/s---Failedmessages:0

從日志中看到,這里使用的 namespace 就是創(chuàng)建集群時(shí)生成的public/default。

總結(jié)

從 SpringBoot 整合 Java 客戶端使用來看,Pulsar 的 api 是非常友好的,使用起來方便簡潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。






審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報(bào)投訴
  • ACK
    ACK
    +關(guān)注

    關(guān)注

    0

    文章

    28

    瀏覽量

    11131
  • URL
    URL
    +關(guān)注

    關(guān)注

    0

    文章

    139

    瀏覽量

    15299
  • python
    +關(guān)注

    關(guān)注

    55

    文章

    4774

    瀏覽量

    84386

原文標(biāo)題:Spring Boot 整合分布式消息平臺 Pulsar

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    Spring Boot Starter需要些什么

    pulsar-spring-boot-starter是非常有必要的,在此之前,我們先看看一個(gè)starter需要些什么。 Spring Boot Starter spring-boot
    的頭像 發(fā)表于 09-25 11:35 ?715次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b> Starter需要些什么

    HarmonyOS應(yīng)用開發(fā)-分布式設(shè)計(jì)

    設(shè)計(jì)理念HarmonyOS 是面向未來全場景智慧生活方式的分布式操作系統(tǒng)。對消費(fèi)者而言,HarmonyOS 將生活場景中的各類終端進(jìn)行能力整合,形成“One Super Device”,以實(shí)現(xiàn)
    發(fā)表于 09-22 17:11

    啟動Spring Boot項(xiàng)目應(yīng)用的三種方法

    ,從而使開發(fā)人員不再需要定義樣板化的配置。用我的話來理解,就是spring boot其實(shí)不是什么新的框架,它默認(rèn)配置了很多框架的使用方式,就像maven整合了所有的jar包,spring
    發(fā)表于 01-14 17:33

    如何高效完成HarmonyOS分布式應(yīng)用測試?

    及云測平臺接入Portal共5項(xiàng)測試服務(wù),詳見圖2。針對分布式應(yīng)用測試面臨的挑戰(zhàn),我們接下來將重點(diǎn)介紹分布式UI測試框架和評分工具。(1)分布式
    發(fā)表于 12-13 18:07

    Spring Boot嵌入Web容器原理是什么

    Spring Boot嵌入Web容器原理Spring Boot的目標(biāo)是構(gòu)建“非常容易創(chuàng)建、獨(dú)立、產(chǎn)品級別的基于
    發(fā)表于 12-16 07:57

    Spring Boot Web相關(guān)的基礎(chǔ)知識

    上一篇文章我們已經(jīng)學(xué)會了如何通過IDEA快速建立一個(gè)Spring Boot項(xiàng)目,還介紹Spring Boot項(xiàng)目的結(jié)構(gòu),
    的頭像 發(fā)表于 03-17 15:03 ?615次閱讀

    kafka client在 spring如何實(shí)現(xiàn)

    認(rèn)識了 spring-boot-starter ,今天不妨來看下如何寫一個(gè) pulsar-spring-boot-starter 模塊。 目標(biāo) 寫一個(gè)完整的類似 kafka-spring-boot-st
    的頭像 發(fā)表于 09-25 11:21 ?453次閱讀
    kafka client在 <b class='flag-5'>spring</b>如何實(shí)現(xiàn)

    Spring Boot Actuator快速入門

    不知道大家在寫 Spring Boot 項(xiàng)目的過程中,使用過 Spring Boot Actuator 嗎?知道 Spring
    的頭像 發(fā)表于 10-09 17:11 ?595次閱讀

    Spring Boot的啟動原理

    可能很多初學(xué)者會比較困惑,Spring Boot 是如何做到將應(yīng)用代碼和所有的依賴打包成一個(gè)獨(dú)立的 Jar 包,因?yàn)閭鹘y(tǒng)的 Java 項(xiàng)目打包成 Jar 包之后,需要通過 -classpath 屬性
    的頭像 發(fā)表于 10-13 11:44 ?598次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>的啟動原理

    Spring Boot 的設(shè)計(jì)目標(biāo)

    什么是Spring Boot Spring BootSpring 開源組織下的一個(gè)子項(xiàng)目,也是 S
    的頭像 發(fā)表于 10-13 14:56 ?545次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b> 的設(shè)計(jì)目標(biāo)

    spring分布式框架有哪些

    Spring分布式框架是一套基于Spring框架的解決方案,用于構(gòu)建分布式系統(tǒng)。它提供了一系列的組件和模塊,可以幫助開發(fā)人員輕松地構(gòu)建可擴(kuò)展、高可用、高性能的
    的頭像 發(fā)表于 11-16 10:58 ?739次閱讀

    springclould分布式教程

    Spring Cloud是一個(gè)基于Spring Boot分布式系統(tǒng)開發(fā)工具,它提供了一系列的分布式系統(tǒng)解決方案,可以幫助開發(fā)者快速構(gòu)建和部
    的頭像 發(fā)表于 11-16 10:59 ?458次閱讀

    springcloud如何實(shí)現(xiàn)分布式

    Spring Cloud是基于Spring Boot開發(fā)的一套分布式系統(tǒng)解決方案,它主要包括了多個(gè)子項(xiàng)目,如服務(wù)注冊與發(fā)現(xiàn)、配置中心、負(fù)載均衡、斷路器、路由等等。通過使用
    的頭像 發(fā)表于 11-16 11:01 ?638次閱讀

    springcloud 分布式事務(wù)解決方案實(shí)例

    么都執(zhí)行成功,要么都執(zhí)行失敗。本文將介紹如何使用Spring Cloud來實(shí)現(xiàn)分布式事務(wù)。 在分布式系統(tǒng)中,使用數(shù)據(jù)庫事務(wù)來保證數(shù)據(jù)一致性是常見的做法。
    的頭像 發(fā)表于 12-03 16:32 ?1070次閱讀

    基于分布式對象存儲WDS的信托非結(jié)構(gòu)化數(shù)據(jù)整合平臺

    基于分布式對象存儲WDS的信托非結(jié)構(gòu)化數(shù)據(jù)整合平臺
    的頭像 發(fā)表于 08-28 09:56 ?255次閱讀
    基于<b class='flag-5'>分布式</b>對象存儲WDS的信托非結(jié)構(gòu)化數(shù)據(jù)<b class='flag-5'>整合</b><b class='flag-5'>平臺</b>