0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

kafka設(shè)計(jì)原理的深度探討

454398 ? 來(lái)源:博客園 ? 作者:永志 ? 2020-10-08 07:50 ? 次閱讀

Kafka簡(jiǎn)介

Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計(jì)目標(biāo)如下:

以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問(wèn)性能

高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條消息的傳輸

支持Kafka Server間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè)partition內(nèi)的消息順序傳輸

同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理

為什么要用消息系統(tǒng)

解耦
在項(xiàng)目啟動(dòng)之初來(lái)預(yù)測(cè)將來(lái)項(xiàng)目會(huì)碰到什么需求,是極其困難的。消息隊(duì)列在處理過(guò)程中間插入了一個(gè)隱含的、基于數(shù)據(jù)的接口層,兩邊的處理過(guò)程都要實(shí)現(xiàn)這一接口。這允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過(guò)程,只要確保它們遵守同樣的接口約束

冗余
有些情況下,處理數(shù)據(jù)的過(guò)程會(huì)失敗。除非數(shù)據(jù)被持久化,否則將造成丟失。消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過(guò)這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。在被許多消息隊(duì)列所采用的”插入-獲取-刪除”范式中,在把一個(gè)消息從隊(duì)列中刪除之前,需要你的處理過(guò)程明確的指出該消息已經(jīng)被處理完畢,確保你的數(shù)據(jù)被安全的保存直到你使用完畢。

擴(kuò)展性
因?yàn)橄㈥?duì)列解耦了你的處理過(guò)程,所以增大消息入隊(duì)和處理的頻率是很容易的;只要另外增加處理過(guò)程即可。不需要改變代碼、不需要調(diào)節(jié)參數(shù)。擴(kuò)展就像調(diào)大電力按鈕一樣簡(jiǎn)單。

靈活性 & 峰值處理能力
在訪問(wèn)量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見;如果為以能處理這類峰值訪問(wèn)為標(biāo)準(zhǔn)來(lái)投入資源隨時(shí)待命無(wú)疑是巨大的浪費(fèi)。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問(wèn)壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰。

可恢復(fù)性
當(dāng)體系的一部分組件失效,不會(huì)影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。而這種允許重試或者延后處理請(qǐng)求的能力通常是造就一個(gè)略感不便的用戶和一個(gè)沮喪透頂?shù)挠脩糁g的區(qū)別。

送達(dá)保證
消息隊(duì)列提供的冗余機(jī)制保證了消息能被實(shí)際的處理,只要一個(gè)進(jìn)程讀取了該隊(duì)列即可。在此基礎(chǔ)上,IronMQ提供了一個(gè)”只送達(dá)一次”保證。無(wú)論有多少進(jìn)程在從隊(duì)列中領(lǐng)取數(shù)據(jù),每一個(gè)消息只能被處理一次。這之所以成為可能,是因?yàn)楂@取一個(gè)消息只是”預(yù)定”了這個(gè)消息,暫時(shí)把它移出了隊(duì)列。除非客戶端明確的表示已經(jīng)處理完了這個(gè)消息,否則這個(gè)消息會(huì)被放回隊(duì)列中去,在一段可配置的時(shí)間之后可再次被處理。

順序保證
在大多使用場(chǎng)景下,數(shù)據(jù)處理的順序都很重要。消息隊(duì)列本來(lái)就是排序的,并且能保證數(shù)據(jù)會(huì)按照特定的順序來(lái)處理。IronMO保證消息通過(guò)FIFO(先進(jìn)先出)的順序來(lái)處理,因此消息在隊(duì)列中的位置就是從隊(duì)列中檢索他們的位置。

緩沖
在任何重要的系統(tǒng)中,都會(huì)有需要不同的處理時(shí)間的元素。例如,加載一張圖片比應(yīng)用過(guò)濾器花費(fèi)更少的時(shí)間。消息隊(duì)列通過(guò)一個(gè)緩沖層來(lái)幫助任務(wù)最高效率的執(zhí)行—寫入隊(duì)列的處理會(huì)盡可能的快速,而不受從隊(duì)列讀的預(yù)備處理的約束。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過(guò)系統(tǒng)的速度。

理解數(shù)據(jù)流
在一個(gè)分布式系統(tǒng)里,要得到一個(gè)關(guān)于用戶操作會(huì)用多長(zhǎng)時(shí)間及其原因的總體印象,是個(gè)巨大的挑戰(zhàn)。消息隊(duì)列通過(guò)消息被處理的頻率,來(lái)方便的輔助確定那些表現(xiàn)不佳的處理過(guò)程或領(lǐng)域,這些地方的數(shù)據(jù)流都不夠優(yōu)化。

異步通信
很多時(shí)候,你不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許你把一個(gè)消息放入隊(duì)列,但并不立即處理它。你想向隊(duì)列中放入多少消息就放多少,然后在你樂(lè)意的時(shí)候再去處理它們。

常用Message Queue對(duì)比

RabbitMQ
RabbitMQ是使用Erlang編寫的一個(gè)開源的消息隊(duì)列,本身支持很多的協(xié)議:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量級(jí),更適合于企業(yè)級(jí)的開發(fā)。同時(shí)實(shí)現(xiàn)了Broker構(gòu)架,這意味著消息在發(fā)送給客戶端時(shí)先在中心隊(duì)列排隊(duì)。對(duì)路由,負(fù)載均衡或者數(shù)據(jù)持久化都有很好的支持。

Redis
Redis是一個(gè)基于Key-Value對(duì)的NoSQL數(shù)據(jù)庫(kù),開發(fā)維護(hù)很活躍。雖然它是一個(gè)Key-Value數(shù)據(jù)庫(kù)存儲(chǔ)系統(tǒng),但它本身支持MQ功能,所以完全可以當(dāng)做一個(gè)輕量級(jí)的隊(duì)列服務(wù)來(lái)使用。對(duì)于RabbitMQ和Redis的入隊(duì)和出隊(duì)操作,各執(zhí)行100萬(wàn)次,每10萬(wàn)次記錄一次執(zhí)行時(shí)間。測(cè)試數(shù)據(jù)分為128Bytes、512Bytes、1K和10K四個(gè)不同大小的數(shù)據(jù)。實(shí)驗(yàn)表明:入隊(duì)時(shí),當(dāng)數(shù)據(jù)比較小時(shí)Redis的性能要高于RabbitMQ,而如果數(shù)據(jù)大小超過(guò)了10K,Redis則慢的無(wú)法忍受;出隊(duì)時(shí),無(wú)論數(shù)據(jù)大小,Redis都表現(xiàn)出非常好的性能,而RabbitMQ的出隊(duì)性能則遠(yuǎn)低于Redis。

ZeroMQ
ZeroMQ號(hào)稱最快的消息隊(duì)列系統(tǒng),尤其針對(duì)大吞吐量的需求場(chǎng)景。ZMQ能夠?qū)崿F(xiàn)RabbitMQ不擅長(zhǎng)的高級(jí)/復(fù)雜的隊(duì)列,但是開發(fā)人員需要自己組合多種技術(shù)框架,技術(shù)上的復(fù)雜度是對(duì)這MQ能夠應(yīng)用成功的挑戰(zhàn)。ZeroMQ具有一個(gè)獨(dú)特的非中間件的模式,你不需要安裝和運(yùn)行一個(gè)消息服務(wù)器或中間件,因?yàn)槟愕膽?yīng)用程序?qū)缪萘诉@個(gè)服務(wù)角色。你只需要簡(jiǎn)單的引用ZeroMQ程序庫(kù),可以使用NuGet安裝,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了。但是ZeroMQ僅提供非持久性的隊(duì)列,也就是說(shuō)如果宕機(jī),數(shù)據(jù)將會(huì)丟失。其中,Twitter的Storm 0.9.0以前的版本中默認(rèn)使用ZeroMQ作為數(shù)據(jù)流的傳輸(Storm從0.9版本開始同時(shí)支持ZeroMQ和Netty作為傳輸模塊)。

ActiveMQ
ActiveMQ是Apache下的一個(gè)子項(xiàng)目。 類似于ZeroMQ,它能夠以代理人和點(diǎn)對(duì)點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列。同時(shí)類似于RabbitMQ,它少量代碼就可以高效地實(shí)現(xiàn)高級(jí)應(yīng)用場(chǎng)景。

Kafka/Jafka
Kafka是Apache下的一個(gè)子項(xiàng)目,是一個(gè)高性能跨語(yǔ)言分布式發(fā)布/訂閱消息隊(duì)列系統(tǒng),而Jafka是在Kafka之上孵化而來(lái)的,即Kafka的一個(gè)升級(jí)版。具有以下特性:快速持久化,可以在O(1)的系統(tǒng)開銷下進(jìn)行消息持久化;高吞吐,在一臺(tái)普通的服務(wù)器上既可以達(dá)到10W/s的吞吐速率;完全的分布式系統(tǒng),Broker、Producer、Consumer都原生自動(dòng)支持分布式,自動(dòng)實(shí)現(xiàn)負(fù)載均衡;支持Hadoop數(shù)據(jù)并行加載,對(duì)于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案。Kafka通過(guò)Hadoop的并行加載機(jī)制來(lái)統(tǒng)一了在線和離線的消息處理。Apache Kafka相對(duì)于ActiveMQ是一個(gè)非常輕量級(jí)的消息系統(tǒng),除了性能非常好之外,還是一個(gè)工作良好的分布式系統(tǒng)。

Kafka解析

Terminology

Broker
Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為broker

Topic
每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為topic。(物理上不同topic的消息分開存儲(chǔ),邏輯上一個(gè)topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)

Partition
parition是物理上的概念,每個(gè)topic包含一個(gè)或多個(gè)partition,創(chuàng)建topic時(shí)可指定parition數(shù)量。每個(gè)partition對(duì)應(yīng)于一個(gè)文件夾,該文件夾下存儲(chǔ)該partition的數(shù)據(jù)和索引文件

Producer
負(fù)責(zé)發(fā)布消息到Kafka broker

Consumer
消費(fèi)消息。每個(gè)consumer屬于一個(gè)特定的consumer group(可為每個(gè)consumer指定group name,若不指定group name則屬于默認(rèn)的group)。使用consumer high level API時(shí),同一topic的一條消息只能被同一個(gè)consumer group內(nèi)的一個(gè)consumer消費(fèi),但多個(gè)consumer group可同時(shí)消費(fèi)這一消息。

Kafka架構(gòu)

如上圖所示,一個(gè)典型的kafka集群中包含若干producer(可以是web前端產(chǎn)生的page view,或者是服務(wù)器日志,系統(tǒng)CPU、memory等),若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多,集群吞吐率越高),若干consumer group,以及一個(gè)Zookeeper集群。Kafka通過(guò)Zookeeper管理集群配置,選舉leader,以及在consumer group發(fā)生變化時(shí)進(jìn)行rebalance。producer使用push模式將消息發(fā)布到broker,consumer使用pull模式從broker訂閱并消費(fèi)消息。

Push vs. Pull

作為一個(gè)messaging system,Kafka遵循了傳統(tǒng)的方式,選擇由producer向broker push消息并由consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用非常不同的push模式。事實(shí)上,push模式和pull模式各有優(yōu)劣。
push模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由broker決定的。push模式的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來(lái)不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。

Topic & Partition

Topic在邏輯上可以被認(rèn)為是一個(gè)queue。每條消費(fèi)都必須指定它的topic,可以簡(jiǎn)單理解為必須指明把這條消息放進(jìn)哪個(gè)queue里。為了使得Kafka的吞吐率可以水平擴(kuò)展,物理上把topic分成一個(gè)或多個(gè)partition,每個(gè)partition在物理上對(duì)應(yīng)一個(gè)文件夾,該文件夾下存儲(chǔ)這個(gè)partition的所有消息和索引文件。

每個(gè)日志文件都是“l(fā)og entries”序列,每一個(gè)log entry包含一個(gè)4字節(jié)整型數(shù)(值為N),其后跟N個(gè)字節(jié)的消息體。每條消息都有一個(gè)當(dāng)前partition下唯一的64字節(jié)的offset,它指明了這條消息的起始位置。磁盤上存儲(chǔ)的消息格式如下:
message length : 4 bytes (value: 1+4+n)
“magic” value : 1 byte
crc : 4 bytes
payload : n bytes
這個(gè)“l(fā)og entries”并非由一個(gè)文件構(gòu)成,而是分成多個(gè)segment,每個(gè)segment名為該segment第一條消息的offset和“.kafka”組成。另外會(huì)有一個(gè)索引文件,它標(biāo)明了每個(gè)segment下包含的log entry的offset范圍,如下圖所示。

因?yàn)槊織l消息都被append到該partition中,是順序?qū)懘疟P,因此效率非常高(經(jīng)驗(yàn)證,順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存還要高,這是Kafka高吞吐率的一個(gè)很重要的保證)。

每一條消息被發(fā)送到broker時(shí),會(huì)根據(jù)paritition規(guī)則選擇被存儲(chǔ)到哪一個(gè)partition。如果partition規(guī)則設(shè)置的合理,所有消息可以均勻分布到不同的partition里,這樣就實(shí)現(xiàn)了水平擴(kuò)展。(如果一個(gè)topic對(duì)應(yīng)一個(gè)文件,那這個(gè)文件所在的機(jī)器I/O將會(huì)成為這個(gè)topic的性能瓶頸,而partition解決了這個(gè)問(wèn)題)。在創(chuàng)建topic時(shí)可以在$KAFKA_HOME/config/server.properties中指定這個(gè)partition的數(shù)量(如下所示),當(dāng)然也可以在topic創(chuàng)建之后去修改parition數(shù)量。

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

在發(fā)送一條消息時(shí),可以指定這條消息的key,producer根據(jù)這個(gè)key和partition機(jī)制來(lái)判斷將這條消息發(fā)送到哪個(gè)parition。paritition機(jī)制可以通過(guò)指定producer的paritition. class這一參數(shù)來(lái)指定,該class必須實(shí)現(xiàn)kafka.producer.Partitioner接口。本例中如果key可以被解析為整數(shù)則將對(duì)應(yīng)的整數(shù)與partition總數(shù)取余,該消息會(huì)被發(fā)送到該數(shù)對(duì)應(yīng)的partition。(每個(gè)parition都會(huì)有個(gè)序號(hào))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
importkafka.producer.Partitioner;
importkafka.utils.VerifiableProperties;
publicclassJasonPartitionerimplementsPartitioner {
publicJasonPartitioner(VerifiableProperties verifiableProperties) {}
@Override
publicintpartition(Object key,intnumPartitions) {
try{
intpartitionNum = Integer.parseInt((String) key);
returnMath.abs(Integer.parseInt((String) key) % numPartitions);
}catch(Exception e) {
returnMath.abs(key.hashCode() % numPartitions);
}
}
}
并通過(guò)如下代碼發(fā)送20條消息(key分別為0,1,2,3)至topic2(包含4個(gè)partition)。
1
2
3
4
5
6
7
8
9
10
publicvoidsendMessage()throwsInterruptedException{
for(inti =1; i <=?5; i++){
List messageList =newArrayList>();
for(intj =0; j messageList.add(newKeyedMessage("topic2", j+"","The "+ i +" message for key "+ j));
}
producer.send(messageList);
}
producer.close();
}
則key相同的消息會(huì)被發(fā)送并存儲(chǔ)到同一個(gè)partition里,而且key的序號(hào)正好和partition序號(hào)相同。(partition序號(hào)從0開始,本例中的key也正好從0開始)。如下圖所示。



對(duì)于傳統(tǒng)的message queue而言,一般會(huì)刪除已經(jīng)被消費(fèi)的消息,而Kafka集群會(huì)保留所有的消息,無(wú)論其被消費(fèi)與否。當(dāng)然,因?yàn)榇疟P限制,不可能永久保留所有數(shù)據(jù)(實(shí)際上也沒(méi)必要),因此Kafka提供兩種策略去刪除舊數(shù)據(jù)。一是基于時(shí)間,二是基于partition文件大小。例如可以通過(guò)配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數(shù)據(jù),也可通過(guò)配置讓Kafka在partition文件超過(guò)1GB時(shí)刪除舊數(shù)據(jù),如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligiblefordeletion
log.retention.hours=168
# A size-based retention policyforlogs. Segments are pruned from the log aslongas the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. Whenthissize is reached anewlog segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to seeifthey can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# Bydefaultthe log cleaner is disabled and the log retention policy willdefaultto
#just delete segments after their retention expires.
# If log.cleaner.enable=trueis set the cleaner will be enabled and individual logs
#can then be markedforlog compaction.
log.cleaner.enable=false
這里要注意,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1),即與文件大小無(wú)關(guān),所以這里刪除文件與Kafka性能無(wú)關(guān),選擇怎樣的刪除策略只與磁盤以及具體的需求有關(guān)。另外,Kafka會(huì)為每一個(gè)consumer group保留一些metadata信息—當(dāng)前消費(fèi)的消息的position,也即offset。這個(gè)offset由consumer控制。正常情況下consumer會(huì)在消費(fèi)完一條消息后線性增加這個(gè)offset。當(dāng)然,consumer也可將offset設(shè)成一個(gè)較小的值,重新消費(fèi)一些消息。因?yàn)閛ffet由consumer控制,所以Kafka broker是無(wú)狀態(tài)的,它不需要標(biāo)記哪些消息被哪些consumer過(guò),不需要通過(guò)broker去保證同一個(gè)consumer group只有一個(gè)consumer能消費(fèi)某一條消息,因此也就不需要鎖機(jī)制,這也為Kafka的高吞吐率提供了有力保障。

Replication & Leader election

Kafka從0.8開始提供partition級(jí)別的replication,replication的數(shù)量可在$KAFKA_HOME/config/server.properties中配置。

1 default.replication.factor =1
該 Replication與leader election配合提供了自動(dòng)的failover機(jī)制。replication對(duì)Kafka的吞吐率是有一定影響的,但極大的增強(qiáng)了可用性。默認(rèn)情況下,Kafka的replication數(shù)量為1。每個(gè)partition都有一個(gè)唯一的leader,所有的讀寫操作都在leader上完成,leader批量從leader上pull數(shù)據(jù)。一般情況下partition的數(shù)量大于等于broker的數(shù)量,并且所有partition的leader均勻分布在broker上。follower上的日志和其leader上的完全一樣。和大部分分布式系統(tǒng)一樣,Kakfa處理失敗需要明確定義一個(gè)broker是否alive。對(duì)于Kafka而言,Kafka存活包含兩個(gè)條件,一是它必須維護(hù)與Zookeeper的session(這個(gè)通過(guò)Zookeeper的heartbeat機(jī)制來(lái)實(shí)現(xiàn))。二是follower必須能夠及時(shí)將leader的writing復(fù)制過(guò)來(lái),不能“落后太多”。
leader會(huì)track“in sync”的node list。如果一個(gè)follower宕機(jī),或者落后太多,leader將把它從”in sync” list中移除。這里所描述的“落后太多”指follower復(fù)制的消息落后于leader后的條數(shù)超過(guò)預(yù)定值,該值可在$KAFKA_HOME/config/server.properties中配置
1
2
3
4
5
#If a replica falls more thanthismany messages behind the leader, the leader will remove the follower from ISR and treat it as dead
replica.lag.max.messages=4000
#If a follower hasn't sent any fetch requestsforthiswindow of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead
replica.lag.time.max.ms=10000

需要說(shuō)明的是,Kafka只解決”fail/recover”,不處理“Byzantine”(“拜占庭”)問(wèn)題。
一條消息只有被“in sync” list里的所有follower都從leader復(fù)制過(guò)去才會(huì)被認(rèn)為已提交。這樣就避免了部分?jǐn)?shù)據(jù)被寫進(jìn)了leader,還沒(méi)來(lái)得及被任何follower復(fù)制就宕機(jī)了,而造成數(shù)據(jù)丟失(consumer無(wú)法消費(fèi)這些數(shù)據(jù))。而對(duì)于producer而言,它可以選擇是否等待消息commit,這可以通過(guò)request.required.acks來(lái)設(shè)置。這種機(jī)制確保了只要“in sync” list有一個(gè)或以上的flollower,一條被commit的消息就不會(huì)丟失。
這里的復(fù)制機(jī)制即不是同步復(fù)制,也不是單純的異步復(fù)制。事實(shí)上,同步復(fù)制要求“活著的”follower都復(fù)制完,這條消息才會(huì)被認(rèn)為commit,這種復(fù)制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個(gè)特性)。而異步復(fù)制方式下,follower異步的從leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被leader寫入log就被認(rèn)為已經(jīng)commit,這種情況下如果follwer都落后于leader,而leader突然宕機(jī),則會(huì)丟失數(shù)據(jù)。而Kafka的這種使用“in sync” list的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。follower可以批量的從leader復(fù)制數(shù)據(jù),這樣極大的提高復(fù)制性能(批量寫磁盤),極大減少了follower與leader的差距(前文有說(shuō)到,只要follower落后leader不太遠(yuǎn),則被認(rèn)為在“in sync” list里)。

上文說(shuō)明了Kafka是如何做replication的,另外一個(gè)很重要的問(wèn)題是當(dāng)leader宕機(jī)了,怎樣在follower中選舉出新的leader。因?yàn)閒ollower可能落后許多或者crash了,所以必須確保選擇“最新”的follower作為新的leader。一個(gè)基本的原則就是,如果leader不在了,新的leader必須擁有原來(lái)的leader commit的所有消息。這就需要作一個(gè)折衷,如果leader在標(biāo)明一條消息被commit前等待更多的follower確認(rèn),那在它die之后就有更多的follower可以作為新的leader,但這也會(huì)造成吞吐率的下降。
一種非常常用的選舉leader的方式是“majority vote”(“少數(shù)服從多數(shù)”),但Kafka并未采用這種方式。這種模式下,如果我們有2f+1個(gè)replica(包含leader和follower),那在commit之前必須保證有f+1個(gè)replica復(fù)制完消息,為了保證正確選出新的leader,fail的replica不能超過(guò)f個(gè)。因?yàn)樵谑O碌娜我鈌+1個(gè)replica里,至少有一個(gè)replica包含有最新的所有消息。這種方式有個(gè)很大的優(yōu)勢(shì),系統(tǒng)的latency只取決于最快的幾臺(tái)server,也就是說(shuō),如果replication factor是3,那latency就取決于最快的那個(gè)follower而非最慢那個(gè)。majority vote也有一些劣勢(shì),為了保證leader election的正常進(jìn)行,它所能容忍的fail的follower個(gè)數(shù)比較少。如果要容忍1個(gè)follower掛掉,必須要有3個(gè)以上的replica,如果要容忍2個(gè)follower掛掉,必須要有5個(gè)以上的replica。也就是說(shuō),在生產(chǎn)環(huán)境下為了保證較高的容錯(cuò)程度,必須要有大量的replica,而大量的replica又會(huì)在大數(shù)據(jù)量下導(dǎo)致性能的急劇下降。這就是這種算法更多用在Zookeeper這種共享集群配置的系統(tǒng)中而很少在需要存儲(chǔ)大量數(shù)據(jù)的系統(tǒng)中使用的原因。例如HDFS的HA feature是基于majority-vote-based journal,但是它的數(shù)據(jù)存儲(chǔ)并沒(méi)有使用這種expensive的方式。
實(shí)際上,leader election算法非常多,比如Zookeper的Zab,Raft和Viewstamped Replication。而Kafka所使用的leader election算法更像微軟的PacificA算法。
Kafka在Zookeeper中動(dòng)態(tài)維護(hù)了一個(gè)ISR(in-sync replicas) set,這個(gè)set里的所有replica都跟上了leader,只有ISR里的成員才有被選為leader的可能。在這種模式下,對(duì)于f+1個(gè)replica,一個(gè)Kafka topic能在保證不丟失已經(jīng)ommit的消息的前提下容忍f個(gè)replica的失敗。在大多數(shù)使用場(chǎng)景中,這種模式是非常有利的。事實(shí)上,為了容忍f個(gè)replica的失敗,majority vote和ISR在commit前需要等待的replica數(shù)量是一樣的,但是ISR需要的總的replica的個(gè)數(shù)幾乎是majority vote的一半。
雖然majority vote與ISR相比有不需等待最慢的server這一優(yōu)勢(shì),但是Kafka作者認(rèn)為Kafka可以通過(guò)producer選擇是否被commit阻塞來(lái)改善這一問(wèn)題,并且節(jié)省下來(lái)的replica和磁盤使得ISR模式仍然值得。

上文提到,在ISR中至少有一個(gè)follower時(shí),Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,但如果某一個(gè)partition的所有replica都掛了,就無(wú)法保證數(shù)據(jù)不丟失了。這種情況下有兩種可行的方案:

等待ISR中的任一個(gè)replica“活”過(guò)來(lái),并且選它作為leader

選擇第一個(gè)“活”過(guò)來(lái)的replica(不一定是ISR中的)作為leader

這就需要在可用性和一致性當(dāng)中作出一個(gè)簡(jiǎn)單的平衡。如果一定要等待ISR中的replica“活”過(guò)來(lái),那不可用的時(shí)間就可能會(huì)相對(duì)較長(zhǎng)。而且如果ISR中的所有replica都無(wú)法“活”過(guò)來(lái)了,或者數(shù)據(jù)都丟失了,這個(gè)partition將永遠(yuǎn)不可用。選擇第一個(gè)“活”過(guò)來(lái)的replica作為leader,而這個(gè)replica不是ISR中的replica,那即使它并不保證已經(jīng)包含了所有已commit的消息,它也會(huì)成為leader而作為consumer的數(shù)據(jù)源(前文有說(shuō)明,所有讀寫都由leader完成)。Kafka0.8.*使用了第二種方式。根據(jù)Kafka的文檔,在以后的版本中,Kafka支持用戶通過(guò)配置選擇這兩種方式中的一種,從而根據(jù)不同的使用場(chǎng)景選擇高可用性還是強(qiáng)一致性。

上文說(shuō)明了一個(gè)parition的replication過(guò)程,然爾Kafka集群需要管理成百上千個(gè)partition,Kafka通過(guò)round-robin的方式來(lái)平衡partition從而避免大量partition集中在了少數(shù)幾個(gè)節(jié)點(diǎn)上。同時(shí)Kafka也需要平衡leader的分布,盡可能的讓所有partition的leader均勻分布在不同broker上。另一方面,優(yōu)化leadership election的過(guò)程也是很重要的,畢竟這段時(shí)間相應(yīng)的partition處于不可用狀態(tài)。一種簡(jiǎn)單的實(shí)現(xiàn)是暫停宕機(jī)的broker上的所有partition,并為之選舉leader。實(shí)際上,Kafka選舉一個(gè)broker作為controller,這個(gè)controller通過(guò)watch Zookeeper檢測(cè)所有的broker failure,并負(fù)責(zé)為所有受影響的parition選舉leader,再將相應(yīng)的leader調(diào)整命令發(fā)送至受影響的broker,過(guò)程如下圖所示。

這樣做的好處是,可以批量的通知leadership的變化,從而使得選舉過(guò)程成本更低,尤其對(duì)大量的partition而言。如果controller失敗了,幸存的所有broker都會(huì)嘗試在Zookeeper中創(chuàng)建/controller->{this broker id},如果創(chuàng)建成功(只可能有一個(gè)創(chuàng)建成功),則該broker會(huì)成為controller,若創(chuàng)建不成功,則該broker會(huì)等待新controller的命令。

Consumer group

(本節(jié)所有描述都是基于consumer hight level API而非low level API)。
每一個(gè)consumer實(shí)例都屬于一個(gè)consumer group,每一條消息只會(huì)被同一個(gè)consumer group里的一個(gè)consumer實(shí)例消費(fèi)。(不同consumer group可以同時(shí)消費(fèi)同一條消息)

很多傳統(tǒng)的message queue都會(huì)在消息被消費(fèi)完后將消息刪除,一方面避免重復(fù)消費(fèi),另一方面可以保證queue的長(zhǎng)度比較少,提高效率。而如上文所將,Kafka并不刪除已消費(fèi)的消息,為了實(shí)現(xiàn)傳統(tǒng)message queue消息只被消費(fèi)一次的語(yǔ)義,Kafka保證保證同一個(gè)consumer group里只有一個(gè)consumer會(huì)消費(fèi)一條消息。與傳統(tǒng)message queue不同的是,Kafka還允許不同consumer group同時(shí)消費(fèi)同一條消息,這一特性可以為消息的多元化處理提供了支持。實(shí)際上,Kafka的設(shè)計(jì)理念之一就是同時(shí)提供離線處理和實(shí)時(shí)處理。根據(jù)這一特性,可以使用Storm這種實(shí)時(shí)流處理系統(tǒng)對(duì)消息進(jìn)行實(shí)時(shí)在線處理,同時(shí)使用Hadoop這種批處理系統(tǒng)進(jìn)行離線處理,還可以同時(shí)將數(shù)據(jù)實(shí)時(shí)備份到另一個(gè)數(shù)據(jù)中心,只需要保證這三個(gè)操作所使用的consumer在不同的consumer group即可。下圖展示了Kafka在Linkedin的一種簡(jiǎn)化部署。

為了更清晰展示Kafka consumer group的特性,筆者作了一項(xiàng)測(cè)試。創(chuàng)建一個(gè)topic (名為topic1),創(chuàng)建一個(gè)屬于group1的consumer實(shí)例,并創(chuàng)建三個(gè)屬于group2的consumer實(shí)例,然后通過(guò)producer向topic1發(fā)送key分別為1,2,3r的消息。結(jié)果發(fā)現(xiàn)屬于group1的consumer收到了所有的這三條消息,同時(shí)group2中的3個(gè)consumer分別收到了key為1,2,3的消息。如下圖所示。

Consumer Rebalance

(本節(jié)所講述內(nèi)容均基于Kafka consumer high level API)
Kafka保證同一consumer group中只有一個(gè)consumer會(huì)消費(fèi)某條消息,實(shí)際上,Kafka保證的是穩(wěn)定狀態(tài)下每一個(gè)consumer實(shí)例只會(huì)消費(fèi)某一個(gè)或多個(gè)特定partition的數(shù)據(jù),而某個(gè)partition的數(shù)據(jù)只會(huì)被某一個(gè)特定的consumer實(shí)例所消費(fèi)。這樣設(shè)計(jì)的劣勢(shì)是無(wú)法讓同一個(gè)consumer group里的consumer均勻消費(fèi)數(shù)據(jù),優(yōu)勢(shì)是每個(gè)consumer不用都跟大量的broker通信,減少通信開銷,同時(shí)也降低了分配難度,實(shí)現(xiàn)也更簡(jiǎn)單。另外,因?yàn)橥粋€(gè)partition里的數(shù)據(jù)是有序的,這種設(shè)計(jì)可以保證每個(gè)partition里的數(shù)據(jù)也是有序被消費(fèi)。
如果某consumer group中consumer數(shù)量少于partition數(shù)量,則至少有一個(gè)consumer會(huì)消費(fèi)多個(gè)partition的數(shù)據(jù),如果consumer的數(shù)量與partition數(shù)量相同,則正好一個(gè)consumer消費(fèi)一個(gè)partition的數(shù)據(jù),而如果consumer的數(shù)量多于partition的數(shù)量時(shí),會(huì)有部分consumer無(wú)法消費(fèi)該topic下任何一條消息。
如下例所示,如果topic1有0,1,2共三個(gè)partition,當(dāng)group1只有一個(gè)consumer(名為consumer1)時(shí),該 consumer可消費(fèi)這3個(gè)partition的所有數(shù)據(jù)。

增加一個(gè)consumer(consumer2)后,其中一個(gè)consumer(consumer1)可消費(fèi)2個(gè)partition的數(shù)據(jù),另外一個(gè)consumer(consumer2)可消費(fèi)另外一個(gè)partition的數(shù)據(jù)。

再增加一個(gè)consumer(consumer3)后,每個(gè)consumer可消費(fèi)一個(gè)partition的數(shù)據(jù)。consumer1消費(fèi)partition0,consumer2消費(fèi)partition1,consumer3消費(fèi)partition2

再增加一個(gè)consumer(consumer4)后,其中3個(gè)consumer可分別消費(fèi)一個(gè)partition的數(shù)據(jù),另外一個(gè)consumer(consumer4)不能消費(fèi)topic1任何數(shù)據(jù)。


聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 服務(wù)器
    +關(guān)注

    關(guān)注

    12

    文章

    8963

    瀏覽量

    85086
  • 異步通信
    +關(guān)注

    關(guān)注

    1

    文章

    57

    瀏覽量

    10113
  • kafka
    +關(guān)注

    關(guān)注

    0

    文章

    50

    瀏覽量

    5202
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    Kafka高性能背后的技術(shù)原理

    Kafka 是一款性能非常優(yōu)秀的消息隊(duì)列,每秒處理的消息體量可以達(dá)到千萬(wàn)級(jí)別。
    的頭像 發(fā)表于 10-23 09:37 ?224次閱讀
    <b class='flag-5'>Kafka</b>高性能背后的技術(shù)原理

    簡(jiǎn)單認(rèn)識(shí)深度神經(jīng)網(wǎng)絡(luò)

    處理數(shù)據(jù),從而解決各種復(fù)雜的數(shù)據(jù)驅(qū)動(dòng)問(wèn)題。本文將詳細(xì)探討深度神經(jīng)網(wǎng)絡(luò)的定義、基本結(jié)構(gòu)、工作原理及其在多個(gè)領(lǐng)域的應(yīng)用。
    的頭像 發(fā)表于 07-10 18:23 ?894次閱讀

    深度學(xué)習(xí)中的時(shí)間序列分類方法

    的發(fā)展,基于深度學(xué)習(xí)的TSC方法逐漸展現(xiàn)出其強(qiáng)大的自動(dòng)特征提取和分類能力。本文將從多個(gè)角度對(duì)深度學(xué)習(xí)在時(shí)間序列分類中的應(yīng)用進(jìn)行綜述,探討常用的深度學(xué)習(xí)模型及其改進(jìn)方法,并展望未來(lái)的研究
    的頭像 發(fā)表于 07-09 15:54 ?654次閱讀

    基于深度學(xué)習(xí)的小目標(biāo)檢測(cè)

    )的廣泛應(yīng)用,小目標(biāo)檢測(cè)的性能得到了顯著提升。本文將詳細(xì)探討基于深度學(xué)習(xí)的小目標(biāo)檢測(cè)技術(shù),包括其定義、挑戰(zhàn)、常用方法以及未來(lái)發(fā)展方向。
    的頭像 發(fā)表于 07-04 17:25 ?703次閱讀

    深度學(xué)習(xí)中的模型權(quán)重

    深度學(xué)習(xí)這一充滿無(wú)限可能性的領(lǐng)域中,模型權(quán)重(Weights)作為其核心組成部分,扮演著至關(guān)重要的角色。它們不僅是模型學(xué)習(xí)的基石,更是模型智能的源泉。本文將從模型權(quán)重的定義、作用、優(yōu)化、管理以及應(yīng)用等多個(gè)方面,深入探討深度學(xué)習(xí)
    的頭像 發(fā)表于 07-04 11:49 ?817次閱讀

    深度學(xué)習(xí)常用的Python庫(kù)

    深度學(xué)習(xí)作為人工智能的一個(gè)重要分支,通過(guò)模擬人類大腦中的神經(jīng)網(wǎng)絡(luò)來(lái)解決復(fù)雜問(wèn)題。Python作為一種流行的編程語(yǔ)言,憑借其簡(jiǎn)潔的語(yǔ)法和豐富的庫(kù)支持,成為了深度學(xué)習(xí)研究和應(yīng)用的首選工具。本文將深入探討
    的頭像 發(fā)表于 07-03 16:04 ?523次閱讀

    深度學(xué)習(xí)與卷積神經(jīng)網(wǎng)絡(luò)的應(yīng)用

    到自然語(yǔ)言處理,深度學(xué)習(xí)和CNN正逐步改變著我們的生活方式。本文將深入探討深度學(xué)習(xí)與卷積神經(jīng)網(wǎng)絡(luò)的基本概念、工作原理及其在多個(gè)領(lǐng)域的應(yīng)用,并展望其未來(lái)的發(fā)展趨勢(shì)。
    的頭像 發(fā)表于 07-02 18:19 ?747次閱讀

    示波器的采樣率和存儲(chǔ)深度詳解

    示波器,作為電子測(cè)量領(lǐng)域的核心工具,其性能直接決定了電子信號(hào)分析的準(zhǔn)確性和效率。在示波器的眾多參數(shù)中,采樣率和存儲(chǔ)深度是兩個(gè)至關(guān)重要的參數(shù),它們共同決定了示波器捕獲和顯示信號(hào)的能力。本文將對(duì)示波器的采樣率和存儲(chǔ)深度進(jìn)行深入的探討
    的頭像 發(fā)表于 05-13 16:09 ?2141次閱讀

    面試官:Kafka會(huì)丟消息嗎?

    許多開發(fā)人員普遍認(rèn)為,Kafka 的設(shè)計(jì)本身就能保證不會(huì)丟失消息。然而,Kafka 架構(gòu)和配置的細(xì)微差別會(huì)導(dǎo)致消息的丟失。我們需要了解它如何以及何時(shí)可能丟失消息,并防止此類情況的發(fā)生。
    的頭像 發(fā)表于 04-29 17:32 ?957次閱讀
    面試官:<b class='flag-5'>Kafka</b>會(huì)丟消息嗎?

    Redis流與Kafka相比如何?

    我們先來(lái)看看Kafka的基本架構(gòu)。基本的數(shù)據(jù)結(jié)構(gòu)是主題。它是一個(gè)按時(shí)間排序的記錄序列,只需追加。使用這種數(shù)據(jù)結(jié)構(gòu)的好處在Jay Kreps的經(jīng)典博文The Log中得到了很好的描述。
    的頭像 發(fā)表于 02-21 16:09 ?417次閱讀
    Redis流與<b class='flag-5'>Kafka</b>相比如何?

    kafka基本原理詳解

    今天浩道跟大家分享一篇關(guān)于kafka相關(guān)原理的硬核干貨,可以說(shuō)即使你沒(méi)有接觸過(guò)kafka,也可以秒懂,一起看看!
    的頭像 發(fā)表于 01-03 09:57 ?842次閱讀
    <b class='flag-5'>kafka</b>基本原理詳解

    kafka支持哪些消息交付語(yǔ)義?

    在讀完kafka官方文檔,kafka設(shè)計(jì)里的消息交付語(yǔ)義一章后,給我的第一印象是內(nèi)容很抽象,于是草擬和總結(jié)了給個(gè)副標(biāo)題,并把相關(guān)內(nèi)容進(jìn)行了歸類;有些生澀的句子,盡量用大白話和舉例進(jìn)行說(shuō)明,并加入了總結(jié)。
    的頭像 發(fā)表于 12-22 11:27 ?447次閱讀
    <b class='flag-5'>kafka</b>支持哪些消息交付語(yǔ)義?

    如何保證kafka消息不丟失

    如果在簡(jiǎn)歷上寫了使用過(guò)kafka消息中間件,面試官大概80%的概率會(huì)問(wèn)你:"如何保證kafka消息不丟失?"反正我是屢試不爽。
    的頭像 發(fā)表于 12-19 09:52 ?690次閱讀
    如何保證<b class='flag-5'>kafka</b>消息不丟失

    zookeeper和kafka的關(guān)系

    Zookeeper和Kafka是兩個(gè)不同的開源軟件,它們可以在分布式系統(tǒng)中發(fā)揮不同的作用。下面我將詳細(xì)說(shuō)明它們之間的關(guān)系以及它們?cè)诜植际较到y(tǒng)中的作用。 首先,讓我們先介紹一下Zookeeper
    的頭像 發(fā)表于 12-03 16:39 ?1468次閱讀

    golang中使用kafka的綜合指南

    kafka是一個(gè)比較流行的分布式、可拓展、高性能、可靠的流處理平臺(tái)。在處理kafka的數(shù)據(jù)時(shí),這里有確保處理效率和可靠性的多種最佳實(shí)踐。本文將介紹這幾種實(shí)踐方式,并通過(guò)sarama實(shí)現(xiàn)他們。
    的頭像 發(fā)表于 11-30 11:18 ?535次閱讀