0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

怎樣去解決Kafka消息重復(fù)的問題呢?

jf_ro2CN3Fa ? 來源:稀土掘金 ? 2023-02-12 14:18 ? 次閱讀

一、前言

數(shù)據(jù)重復(fù)這個(gè)問題其實(shí)也是挺正常,全鏈路都有可能會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。

00fc324a-a28b-11ed-bfe3-dac502259ad0.jpg

通常,消息消費(fèi)時(shí)候都會(huì)設(shè)置一定重試次數(shù)來避免網(wǎng)絡(luò)波動(dòng)造成的影響,同時(shí)帶來副作用是可能出現(xiàn)消息重復(fù)。

整理下消息重復(fù)的幾個(gè)場景:

生產(chǎn)端: 遇到異常,基本解決措施都是 重試 。

場景一:leader分區(qū)不可用了,拋 LeaderNotAvailableException 異常,等待選出新 leader 分區(qū)。

場景二:Controller 所在 Broker 掛了,拋 NotControllerException 異常,等待 Controller 重新選舉。

場景三:網(wǎng)絡(luò)異常、斷網(wǎng)、網(wǎng)絡(luò)分區(qū)、丟包等,拋 NetworkException 異常,等待網(wǎng)絡(luò)恢復(fù)。

消費(fèi)端: poll 一批數(shù)據(jù),處理完畢還沒提交 offset ,機(jī)子宕機(jī)重啟了,又會(huì) poll 上批數(shù)據(jù),再度消費(fèi)就造成了消息重復(fù)。

怎么解決?

先來了解下消息的三種投遞語義:

最多一次( at most once): 消息只發(fā)一次,消息可能會(huì)丟失,但絕不會(huì)被重復(fù)發(fā)送。例如:mqtt 中 QoS = 0。

至少一次( at least once): 消息至少發(fā)一次,消息不會(huì)丟失,但有可能被重復(fù)發(fā)送。例如:mqtt 中 QoS = 1

精確一次( exactly once): 消息精確發(fā)一次,消息不會(huì)丟失,也不會(huì)被重復(fù)發(fā)送。例如:mqtt 中 QoS = 2。

了解了這三種語義,再來看如何解決消息重復(fù),即如何實(shí)現(xiàn)精準(zhǔn)一次,可分為三種方法:

Kafka 冪等性 Producer: 保證生產(chǎn)端發(fā)送消息冪等。局限性,是只能保證單分區(qū)且單會(huì)話(重啟后就算新會(huì)話)

Kafka 事務(wù): 保證生產(chǎn)端發(fā)送消息冪等。解決冪等 Producer 的局限性。

消費(fèi)端冪等:保證消費(fèi)端接收消息冪等。蔸底方案。

1)Kafka 冪等性 Producer

冪等性指 :無論執(zhí)行多少次同樣的運(yùn)算,結(jié)果都是相同的。即一條命令,任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。

冪等性使用示例:在生產(chǎn)端添加對(duì)應(yīng)配置即可

Propertiesprops=newProperties();
props.put("enable.idempotence",ture);//1.設(shè)置冪等
props.put("acks","all");//2.當(dāng)enable.idempotence為true,這里默認(rèn)為all
props.put("max.in.flight.requests.per.connection",5);//3.注意

設(shè)置冪等,啟動(dòng)冪等。

配置 acks,注意:一定要設(shè)置 acks=all,否則會(huì)拋異常。

配置 max.in.flight.requests.per.connection 需要 <= 5 ,否則會(huì)拋異常 OutOfOrderSequenceException。

0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1

Kafka >= 1.1, max.in.flight.request.per.connection <= 5

[**為了更好理解,需要了解下 Kafka 冪等機(jī)制:]

010c3212-a28b-11ed-bfe3-dac502259ad0.jpg

Producer 每次啟動(dòng)后,會(huì)向 Broker 申請(qǐng)一個(gè)全局唯一的 pid。(重啟后 pid 會(huì)變化,這也是弊端之一)

Sequence Numbe:針對(duì)每個(gè) 都對(duì)應(yīng)一個(gè)從0開始單調(diào)遞增的 Sequence,同時(shí) Broker端會(huì)緩存這個(gè) seq num

判斷是否重復(fù): 去 Broker 里對(duì)應(yīng)的隊(duì)列 ProducerStateEntry.Queue(默認(rèn)隊(duì)列長度為 5)查詢是否存在

如果 nextSeq == lastSeq + 1,即 服務(wù)端seq + 1 == 生產(chǎn)傳入seq,則接收。

如果 nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。

反之,要么重復(fù),要么丟消息,均拒絕。

011a794e-a28b-11ed-bfe3-dac502259ad0.jpg

這種設(shè)計(jì)針對(duì)解決了兩個(gè)問題:

消息重復(fù): 場景 Broker 保存消息后還沒發(fā)送 ack 就宕機(jī)了,這時(shí)候 Producer 就會(huì)重試,這就造成消息重復(fù)。

消息亂序: 避免場景,前一條消息發(fā)送失敗而其后一條發(fā)送成功,前一條消息重試后成功,造成的消息亂序。

那什么時(shí)候該使用冪等:

如果已經(jīng)使用 acks=all,使用冪等也可以。

如果已經(jīng)使用 acks=0 或者 acks=1,說明你的系統(tǒng)追求高性能,對(duì)數(shù)據(jù)一致性要求不高。不要使用冪等。

2)Kafka 事務(wù)

使用 Kafka 事務(wù)解決冪等的弊端:單會(huì)話且單分區(qū)冪等。

Tips: 這塊篇幅較長,這先稍微提及下使用,之后另起一篇。

事務(wù)使用示例:分為生產(chǎn)端 和 消費(fèi)端

Propertiesprops=newProperties();
props.put("enable.idempotence",ture);//1.設(shè)置冪等
props.put("acks","all");//2.當(dāng)enable.idempotence為true,這里默認(rèn)為all
props.put("max.in.flight.requests.per.connection",5);//3.最大等待數(shù)
props.put("transactional.id","my-transactional-id");//4.設(shè)定事務(wù)id

Producerproducer=newKafkaProducer(props);

//初始化事務(wù)
producer.initTransactions();

try{
//開始事務(wù)
producer.beginTransaction();

//發(fā)送數(shù)據(jù)
producer.send(newProducerRecord("Topic","Key","Value"));

//數(shù)據(jù)發(fā)送及Offset發(fā)送均成功的情況下,提交事務(wù)
producer.commitTransaction();
}catch(ProducerFencedException|OutOfOrderSequenceException|AuthorizationExceptione){
//數(shù)據(jù)發(fā)送或者Offset發(fā)送出現(xiàn)異常時(shí),終止事務(wù)
producer.abortTransaction();
}finally{
//關(guān)閉Producer和Consumer
producer.close();
consumer.close();
}

這里消費(fèi)端 Consumer 需要設(shè)置下配置:isolation.level 參數(shù)

read_uncommitted: 這是默認(rèn)值,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息,不論事務(wù)型 Producer 提交事務(wù)還是終止事務(wù),其寫入的消息都可以讀取。

如果你用了事務(wù)型 Producer,那么對(duì)應(yīng)的 Consumer 就不要使用這個(gè)值。

read_committed: 表明 Consumer 只會(huì)讀取事務(wù)型 Producer 成功提交事務(wù)寫入的消息。當(dāng)然了,它也能看到非事務(wù)型 Producer 寫入的所有消息。

3)消費(fèi)端冪等

“如何解決消息重復(fù)?” 這個(gè)問題,其實(shí)換一種說法:就是如何解決消費(fèi)端冪等性問題。

只要消費(fèi)端具備了冪等性,那么重復(fù)消費(fèi)消息的問題也就解決了。

典型的方案是使用:消息表,來去重:

0129106c-a28b-11ed-bfe3-dac502259ad0.jpg

上述栗子中,消費(fèi)端拉取到一條消息后,開啟事務(wù),將消息Id 新增到本地消息表中,同時(shí)更新訂單信息

如果消息重復(fù),則新增操作 insert 會(huì)異常,同時(shí)觸發(fā)事務(wù)回滾。

二、案例:Kafka 冪等性 Producer 使用

準(zhǔn)備工作如下:

1、Zookeeper:本地使用 Docker 啟動(dòng)

$dockerrun-d--namezookeeper-p2181:2181zookeeper
a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4

2、Kafka:版本 2.7.1,源碼編譯啟動(dòng)(看上文源碼搭建啟動(dòng))

3、啟動(dòng)生產(chǎn)者:Kafka 源碼中 exmaple 中

4、啟動(dòng)消息者:可以用 Kafka 提供的腳本

>基于SpringCloudAlibaba+Gateway+Nacos+RocketMQ+Vue&Element實(shí)現(xiàn)的后臺(tái)管理系統(tǒng)+用戶小程序,支持RBAC動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
>
>*項(xiàng)目地址:
>*視頻教程

#舉個(gè)栗子:topic 需要自己去修改
$cd./kafka-2.7.1-src/bin
$./kafka-console-producer.sh--broker-listlocalhost:9092--topictest_topic

創(chuàng)建 topic : 1副本,2 分區(qū)

$./kafka-topics.sh--bootstrap-serverlocalhost:9092--topicmyTopic--create--replication-factor1--partitions2

#查看
$./kafka-topics.sh--bootstrap-serverbroker:9092--topicmyTopic--describe

生產(chǎn)者代碼:

013daf5e-a28b-11ed-bfe3-dac502259ad0.jpg

publicclassKafkaProducerApplication{

privatefinalProducerproducer;
finalStringoutTopic;

publicKafkaProducerApplication(finalProducerproducer,
finalStringtopic){
this.producer=producer;
outTopic=topic;
}

publicvoidproduce(finalStringmessage){
finalString[]parts=message.split("-");
finalStringkey,value;
if(parts.length>1){
key=parts[0];
value=parts[1];
}else{
key=null;
value=parts[0];
}
finalProducerRecordproducerRecord
=newProducerRecord<>(outTopic,key,value);
producer.send(producerRecord,
(recordMetadata,e)->{
if(e!=null){
e.printStackTrace();
}else{
System.out.println("key/value"+key+"/"+value+"	writtentotopic[partition]"+recordMetadata.topic()+"["+recordMetadata.partition()+"]atoffset"+recordMetadata.offset());
}
}
);
}

publicvoidshutdown(){
producer.close();
}

publicstaticvoidmain(String[]args){

finalPropertiesprops=newProperties();

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
props.put(ProducerConfig.ACKS_CONFIG,"all");

props.put(ProducerConfig.CLIENT_ID_CONFIG,"myApp");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

finalStringtopic="myTopic";
finalProducerproducer=newKafkaProducer<>(props);
finalKafkaProducerApplicationproducerApp=newKafkaProducerApplication(producer,topic);

StringfilePath="/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
try{
ListlinesToProduce=Files.readAllLines(Paths.get(filePath));
linesToProduce.stream().filter(l->!l.trim().isEmpty())
.forEach(producerApp::produce);
System.out.println("Offsetsandtimestampscommittedinbatchfrom"+filePath);
}catch(IOExceptione){
System.err.printf("Errorreadingfile%sdueto%s%n",filePath,e);
}finally{
producerApp.shutdown();
}
}
}

啟動(dòng)生產(chǎn)者后,控制臺(tái)輸出如下:

014ce7da-a28b-11ed-bfe3-dac502259ad0.jpg

啟動(dòng)消費(fèi)者:

$./kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicmyTopic
015c4680-a28b-11ed-bfe3-dac502259ad0.jpg

修改配置 acks

``

啟用冪等的情況下,調(diào)整 acks 配置,生產(chǎn)者啟動(dòng)后結(jié)果是怎樣的:

修改配置 acks = 1

修改配置 acks = 0

會(huì)直接報(bào)錯(cuò):

Exceptioninthread"main"org.apache.kafka.common.config.ConfigException:Mustsetackstoallinordertousetheidempotentproducer.
Otherwisewecannotguaranteeidempotence.
0173c8b4-a28b-11ed-bfe3-dac502259ad0.jpg

修改配置 max.in.flight.requests.per.connection

``

啟用冪等的情況下,調(diào)整此配置,結(jié)果是怎樣的:

將 max.in.flight.requests.per.connection > 5 會(huì)怎樣?

0182e556-a28b-11ed-bfe3-dac502259ad0.jpg

當(dāng)然會(huì)報(bào)錯(cuò):

Causedby:org.apache.kafka.common.config.ConfigException:Mustsetmax.in.flight.requests.per.connectiontoatmost5tousetheidempotentproducer.
01987f24-a28b-11ed-bfe3-dac502259ad0.jpg






審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • MQTT協(xié)議
    +關(guān)注

    關(guān)注

    0

    文章

    93

    瀏覽量

    5308
  • kafka
    +關(guān)注

    關(guān)注

    0

    文章

    49

    瀏覽量

    5195

原文標(biāo)題:一碰就頭疼的 Kafka 消息重復(fù)問題,立馬解決!

文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    在Boost電源中該怎樣選擇電容的型號(hào)和電容容量?

    我們之前了解過電容的作用,不外乎儲(chǔ)能、濾波等作用。那么在Boost電源中又該怎樣選擇電容的型號(hào)和電容容量?
    發(fā)表于 08-14 15:44 ?2451次閱讀
    在Boost電源中該<b class='flag-5'>怎樣</b><b class='flag-5'>去</b>選擇電容的型號(hào)和電容容量<b class='flag-5'>呢</b>?

    kafka數(shù)據(jù)可靠性深度解讀

    At least once,可以保證不丟,但是可能會(huì)重復(fù),為了解決重復(fù)需要引入唯一標(biāo)識(shí)和重機(jī)制,kafka提供了GUID實(shí)現(xiàn)了唯一標(biāo)識(shí),但是并沒有提供自帶的
    發(fā)表于 05-08 16:29

    基于閃存存儲(chǔ)的Apache Kafka性能提升方法

    據(jù)生態(tài)系統(tǒng)中最常用的分布式消息傳遞系統(tǒng)之一的Apache Kafka進(jìn)行評(píng)估,測試如何以最佳方式將美光固態(tài)存儲(chǔ)應(yīng)用于 Apache Kafka,以及將產(chǎn)生怎樣的收益。A
    發(fā)表于 07-24 06:58

    Kafka集群環(huán)境的搭建

    :2181,zk02:2181,zk03:2181注意:broker.id安裝集群服務(wù)個(gè)數(shù)編排即可,集群下不能重復(fù)。5、啟動(dòng)kafka集群# 啟動(dòng)命令[root@node02 kafka2.11]# bin
    發(fā)表于 01-05 17:55

    怎樣設(shè)置數(shù)值元件的格式

    怎樣設(shè)置數(shù)值元件?怎樣設(shè)置數(shù)值元件的格式?
    發(fā)表于 09-26 09:16

    怎樣獲取Android的電池電壓

    怎樣獲取Android的電池電壓?怎樣獲取Android的電池電流?
    發(fā)表于 10-09 08:39

    怎樣使用springboot

    怎樣使用springboot?學(xué)習(xí)springboot需要懂得哪些?
    發(fā)表于 10-25 07:13

    怎樣使用HSE/HSI配置RCC的時(shí)鐘

    怎樣使用HSE/HSI配置RCC的時(shí)鐘?怎樣設(shè)置系統(tǒng)時(shí)鐘的庫函數(shù)
    發(fā)表于 11-10 07:08

    怎樣配置設(shè)備樹的leds節(jié)點(diǎn)

    配置設(shè)備樹leds節(jié)點(diǎn),sys文件系統(tǒng)中沒有出現(xiàn)相應(yīng)設(shè)備文件,引腳沒有查出有重復(fù)定義的?怎樣配置設(shè)備樹的leds節(jié)點(diǎn)
    發(fā)表于 01-07 06:15

    socket通信該怎樣實(shí)現(xiàn)

    socket通信該怎樣實(shí)現(xiàn)怎樣實(shí)現(xiàn)socket AES-CBC加密?
    發(fā)表于 01-20 07:41

    怎樣配置Android的SDIO部分

    怎樣配置Android的電源部分?怎樣配置Android的SDIO部分?
    發(fā)表于 02-10 07:00

    Ubuntu固件的編譯該怎樣使用

    怎樣編譯Ubuntu固件?Ubuntu固件的編譯該怎樣使用?
    發(fā)表于 02-15 06:18

    怎樣寫回調(diào)函數(shù)?怎樣使用回調(diào)函數(shù)

    回調(diào)函數(shù)的作用是什么?單片機(jī)怎么用回調(diào)函數(shù)在不同文件之間傳遞數(shù)據(jù)?怎樣寫回調(diào)函數(shù)?怎樣使
    發(fā)表于 02-23 07:40

    Kafka的概念及Kafka的宕機(jī)

    問題要從一次Kafka的宕機(jī)開始說起。 筆者所在的是一家金融科技公司,但公司內(nèi)部并沒有采用在金融支付領(lǐng)域更為流行的 RabbitMQ ,而是采用了設(shè)計(jì)之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發(fā)表于 08-27 11:21 ?1952次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機(jī)

    怎樣減少Confluent Cloud Kafka運(yùn)營成本

    流式數(shù)據(jù)已成為企業(yè)構(gòu)建和運(yùn)營出色數(shù)據(jù)產(chǎn)品的必要條件,而 Apache Kafka 已成為實(shí)時(shí)流式傳輸?shù)臉?biāo)準(zhǔn)。
    的頭像 發(fā)表于 09-23 17:23 ?794次閱讀