一、簡(jiǎn)介
1.14 新版本原本規(guī)劃有 35 個(gè)比較重要的新特性以及優(yōu)化工作,目前已經(jīng)有 26 個(gè)工作完成;5 個(gè)任務(wù)不確定是否能準(zhǔn)時(shí)完成;另外 4 個(gè)特性由于時(shí)間或者本身設(shè)計(jì)上的原因,會(huì)放到后續(xù)版本完成。[1]
1.14 相對(duì)于歷屆版本來(lái)說(shuō),囊括的優(yōu)化和新增功能點(diǎn)其實(shí)并不算多。通過(guò)觀(guān)察發(fā)版的節(jié)奏可以發(fā)現(xiàn),通常在 1-2 個(gè)大版本后都會(huì)發(fā)布一個(gè)變化稍微少一點(diǎn)的版本,主要目的是把一些特性穩(wěn)定下來(lái)。
1.14 版本就是這樣一個(gè)定位,我們稱(chēng)之為質(zhì)量改進(jìn)和維護(hù)的版本。這個(gè)版本預(yù)計(jì) 8 月 16 日停止新特性開(kāi)發(fā),可能在 9 月份能夠和大家正式見(jiàn)面,有興趣可以關(guān)注以下鏈接去跟蹤功能發(fā)布進(jìn)度。
Wiki:https://cwiki.apache.org/confluence/display/FLINK/1.14+Release
Jira:https://issues.apache.org/jira/projects/FLINK/versions/12349614
[1] 截至到 8 月 31 日,確定進(jìn)入新版本的是 33 個(gè),已全部完成。
二、流批一體
流批一體其實(shí)從 Flink 1.9 版本開(kāi)始就受到持續(xù)的關(guān)注,它作為社區(qū) RoadMap 的重要組成部分,是大數(shù)據(jù)實(shí)時(shí)化必然的趨勢(shì)。但是另一方面,傳統(tǒng)離線(xiàn)的計(jì)算需求其實(shí)并不會(huì)被實(shí)時(shí)任務(wù)完全取代,而是會(huì)長(zhǎng)期存在。
在實(shí)時(shí)和離線(xiàn)的需求同時(shí)存在的狀態(tài)下,以往的流批獨(dú)立技術(shù)方案存在著一些痛點(diǎn),比如:
需要維護(hù)兩套系統(tǒng),相應(yīng)的就需要兩組開(kāi)發(fā)人員,人力的投入成本很高;
另外,兩套數(shù)據(jù)鏈路處理相似內(nèi)容帶來(lái)維護(hù)的風(fēng)險(xiǎn)性和冗余;
最重要的一點(diǎn)是,如果流批使用的不是同一套數(shù)據(jù)處理系統(tǒng),引擎本身差異可能會(huì)存在數(shù)據(jù)口徑不一致的問(wèn)題,從而導(dǎo)致業(yè)務(wù)數(shù)據(jù)存在一定的誤差。這種誤差對(duì)于大數(shù)據(jù)分析會(huì)有比較大的影響。
在這樣的背景下,F(xiàn)link 社區(qū)認(rèn)定了實(shí)時(shí)離線(xiàn)一體化的技術(shù)路線(xiàn)是比較重要的技術(shù)趨勢(shì)和方向。
Flink 在過(guò)去的幾個(gè)版本中,在流批一體方面做了很多的工作??梢哉J(rèn)為 Flink 在引擎層面,API 層面和算子的執(zhí)行層面上做到了真正的流與批用同一套機(jī)制運(yùn)行。但是在任務(wù)具體的執(zhí)行模式上會(huì)有 2 種不同的模式:
下圖是不同的執(zhí)行模式:
對(duì)于無(wú)限的數(shù)據(jù)流,統(tǒng)一采用了流的執(zhí)行模式。流的執(zhí)行模式指的是所有計(jì)算節(jié)點(diǎn)是通過(guò) Pipeline 模式去連接的,Pipeline 是指上游和下游計(jì)算任務(wù)是同時(shí)運(yùn)行的,隨著上游不斷產(chǎn)出數(shù)據(jù),下游同時(shí)在不斷消費(fèi)數(shù)據(jù)。這種全 Pipeline 的執(zhí)行方式可以:
通過(guò) eventTime 表示數(shù)據(jù)是什么時(shí)候產(chǎn)生的;
通過(guò) watermark 得知在哪個(gè)時(shí)間點(diǎn),數(shù)據(jù)已經(jīng)到達(dá)了;
通過(guò) state 來(lái)維護(hù)計(jì)算中間狀態(tài);
通過(guò) Checkpoint 做容錯(cuò)的處理。
這兩種各有優(yōu)劣,可以根據(jù)作業(yè)的具體場(chǎng)景來(lái)進(jìn)行選擇。
對(duì)于有限的數(shù)據(jù)集有 2 種執(zhí)行模式,我們可以把它看成一個(gè)有限的數(shù)據(jù)流去做處理,也可以把它看成批的執(zhí)行模式。批的執(zhí)行模式雖然也有 eventTime,但是對(duì)于 watermark 來(lái)說(shuō)只支持正無(wú)窮。對(duì)數(shù)據(jù)和 state 排序后,它在任務(wù)的調(diào)度和 shuffle 上會(huì)有更多的選擇。
流批的執(zhí)行模式是有區(qū)別的,最主要的就是批的執(zhí)行模式會(huì)有落盤(pán)的中間過(guò)程,只有當(dāng)前面任務(wù)執(zhí)行完成,下游的任務(wù)才會(huì)觸發(fā),這個(gè)容錯(cuò)機(jī)制是通過(guò) shuffle 進(jìn)行容錯(cuò)的。
這 2 者也各有各的執(zhí)行優(yōu)勢(shì):
對(duì)于流的執(zhí)行模式來(lái)說(shuō),它沒(méi)有落盤(pán)的壓力,同時(shí)容錯(cuò)是基于數(shù)據(jù)的分段,通過(guò)不斷對(duì)數(shù)據(jù)進(jìn)行打點(diǎn) Checkpoint 去保證斷點(diǎn)恢復(fù);
然而在批處理上,因?yàn)橐?jīng)過(guò) shuffle 落盤(pán),所以對(duì)磁盤(pán)會(huì)有壓力。但是因?yàn)閿?shù)據(jù)是經(jīng)過(guò)排序的,所以對(duì)于批來(lái)說(shuō),后續(xù)的計(jì)算效率可能會(huì)有一定的提升。同時(shí),在執(zhí)行時(shí)候是經(jīng)過(guò)分段去執(zhí)行任務(wù)的,無(wú)需同時(shí)執(zhí)行;在容錯(cuò)計(jì)算方面是根據(jù) stage 進(jìn)行容錯(cuò)。
Flink 1.14 的優(yōu)化點(diǎn)主要是針對(duì)在流的執(zhí)行模式下,如何去處理有限數(shù)據(jù)集。之前處理無(wú)限數(shù)據(jù)集,和現(xiàn)在處理有限數(shù)據(jù)集最大的區(qū)別在于引入了 “任務(wù)可能會(huì)結(jié)束” 的概念。這種情況下帶來(lái)了一些新的問(wèn)題,如下圖:
■ 在流的執(zhí)行模式下的 Checkpoint 機(jī)制
對(duì)于無(wú)限流,它的 Checkpoint 是由所有的 source 節(jié)點(diǎn)進(jìn)行觸發(fā)的,由 source 節(jié)點(diǎn)發(fā)送 Checkpoint Barrier ,當(dāng) Checkpoint Barrier 流過(guò)整個(gè)作業(yè)時(shí)候,同時(shí)會(huì)存儲(chǔ)當(dāng)前作業(yè)所有的 state 狀態(tài)。
而在有限流的 Checkpoint 機(jī)制中,Task 是有可能提早結(jié)束的。上游的 Task 有可能先處理完任務(wù)提早退出了,但下游的 Task 卻還在執(zhí)行中。在同一個(gè) stage 不同并發(fā)下,有可能因?yàn)閿?shù)據(jù)量不一致導(dǎo)致部分任務(wù)提早完成了。這種情況下,在后續(xù)的執(zhí)行作業(yè)中,如何進(jìn)行 Checkpoint?
在 1.14 中,JobManager 動(dòng)態(tài)根據(jù)當(dāng)前任務(wù)的執(zhí)行情況,去明確 Checkpoint Barrier 是從哪里開(kāi)始觸發(fā)。同時(shí)在部分任務(wù)結(jié)束后,后續(xù)的 Checkpoint 只會(huì)保存仍在運(yùn)行 Task 所對(duì)應(yīng)的 stage,通過(guò)這種方式能夠讓任務(wù)執(zhí)行完成后,還可以繼續(xù)做 Checkpoint ,在有限流執(zhí)行中提供更好的容錯(cuò)保障。
■ Task 結(jié)束后的兩階段提交
我們?cè)诓糠?Sink 使用上,例如下圖的 Kafka Sink 上,涉及到 Task 需要依靠 Checkpoint 機(jī)制,進(jìn)行二階段提交,從而保證數(shù)據(jù)的 Exactly-once 一致性。
具體可以這樣說(shuō):在 Checkpoint 過(guò)程中,每個(gè)算子只會(huì)進(jìn)行準(zhǔn)備提交的操作。比如數(shù)據(jù)會(huì)提交到外部的臨時(shí)存儲(chǔ)目錄下,所有任務(wù)都完成這次 Checkpoint 后會(huì)收到一個(gè)信號(hào),之后才會(huì)執(zhí)行正式的 commit,把所有分布式的臨時(shí)文件一次性以事務(wù)的方式提交到外部系統(tǒng)。
這種算法在當(dāng)前有限流的情況下,作業(yè)結(jié)束后并不能保證有 Checkpoint,那么最后一部分?jǐn)?shù)據(jù)如何提交?
在 1.14 中,這個(gè)問(wèn)題得到了解決。Task 處理完所有數(shù)據(jù)之后,必須等待 Checkpoint 完成后才可以正式的退出,這是流批一體方面針對(duì)有限流任務(wù)結(jié)束的一些改進(jìn)。
三、checkpoint 機(jī)制
1. 現(xiàn)有 Checkpoint 機(jī)制痛點(diǎn)
目前 Flink 觸發(fā) Checkpoint 是依靠 barrier 在算子間進(jìn)行流通,barrier 隨著算子一直往下游進(jìn)行發(fā)送,當(dāng)算子下游遇到 barrier 的時(shí)候就會(huì)進(jìn)行快照操作,然后再把 barrier 往下游繼續(xù)發(fā)送。對(duì)于多路的情況我們會(huì)把 barrier 進(jìn)行對(duì)齊,把先到 barrier 的這一路數(shù)據(jù)暫時(shí)性的 block,等到兩路 barrier 都到了之后再做快照,最后才會(huì)去繼續(xù)往下發(fā)送 barrier。
現(xiàn)有的 Checkpoint 機(jī)制存在以下問(wèn)題:
反壓時(shí)無(wú)法做出 Checkpoint :在反壓時(shí)候 barrier 無(wú)法隨著數(shù)據(jù)往下游流動(dòng),造成反壓的時(shí)候無(wú)法做出 Checkpoint。但是其實(shí)在發(fā)生反壓情況的時(shí)候,我們更加需要去做出對(duì)數(shù)據(jù)的 Checkpoint,因?yàn)檫@個(gè)時(shí)候性能遇到了瓶頸,是更加容易出問(wèn)題的階段;
Barrier 對(duì)齊阻塞數(shù)據(jù)處理 :阻塞對(duì)齊對(duì)于性能上存在一定的影響;
恢復(fù)性能受限于 Checkpoint 間隔 :在做恢復(fù)的時(shí)候,延遲受到多大的影響很多時(shí)候是取決于 Checkpoint 的間隔,間隔越大,需要 replay 的數(shù)據(jù)就會(huì)越多,從而造成中斷的影響也就會(huì)越大。但是目前 Checkpoint 間隔受制于持久化操作的時(shí)間,所以沒(méi)辦法做的很快。
2. Unaligned Checkpoint
針對(duì)這些痛點(diǎn),F(xiàn)link 在最近幾個(gè)版本一直在持續(xù)的優(yōu)化,Unaligned Checkpoint 就是其中一個(gè)機(jī)制。barrier 算子在到達(dá) input buffer 最前面的時(shí)候,就會(huì)開(kāi)始觸發(fā) Checkpoint 操作。它會(huì)立刻把 barrier 傳到算子的 OutPut Buffer 的最前面,相當(dāng)于它會(huì)立刻被下游的算子所讀取到。通過(guò)這種方式可以使得 barrier 不受到數(shù)據(jù)阻塞,解決反壓時(shí)候無(wú)法進(jìn)行 Checkpoint 的問(wèn)題。
當(dāng)我們把 barrier 發(fā)下去后,需要做一個(gè)短暫的暫停,暫停的時(shí)候會(huì)把算子的 State 和 input output buffer 中的數(shù)據(jù)進(jìn)行一個(gè)標(biāo)記,以方便后續(xù)隨時(shí)準(zhǔn)備上傳。對(duì)于多路情況會(huì)一直等到另外一路 barrier 到達(dá)之前數(shù)據(jù),全部進(jìn)行標(biāo)注。
通過(guò)這種方式整個(gè)在做 Checkpoint 的時(shí)候,也不需要對(duì) barrier 進(jìn)行對(duì)齊,唯一需要做的停頓就是在整個(gè)過(guò)程中對(duì)所有 buffer 和 state 標(biāo)注。這種方式可以很好的解決反壓時(shí)無(wú)法做出 Checkpoint ,和 Barrier 對(duì)齊阻塞數(shù)據(jù)影響性能處理的問(wèn)題。
3. Generalized Incremental Checkpoint [2]
Generalized Incremental Checkpoint 主要是用于減少 Checkpoint 間隔,如左圖 1 所示,在 Incremental Checkpoint 當(dāng)中,先讓算子寫(xiě)入 state 的 changelog。寫(xiě)完后才把變化真正的數(shù)據(jù)寫(xiě)入到 StateTable 上。state 的 changelog 不斷向外部進(jìn)行持久的存儲(chǔ)化。在這個(gè)過(guò)程中我們其實(shí)無(wú)需等待整個(gè) StateTable 去做一個(gè)持久化操作,我們只需要保證對(duì)應(yīng)的 Checkpoint 這一部分的 changelog 能夠持久化完成,就可以開(kāi)始做下一次 Checkpoint。StateTable 是以一個(gè)周期性的方式,獨(dú)立的去對(duì)外做持續(xù)化的一個(gè)過(guò)程。
這兩個(gè)過(guò)程進(jìn)行拆分后,就有了從之前的需要做全量持久化 (Per Checkpoint) 變成 增量持久化 (Per Checkpoint) + 后臺(tái)周期性全量持久化,從而達(dá)到同樣容錯(cuò)的效果。在這個(gè)過(guò)程中,每一次 Checkpoint 需要做持久化的數(shù)據(jù)量減少了,從而使得做 Checkpoint 的間隔能夠大幅度減少。
其實(shí)在 RocksDB 也是能支持 Incremental Checkpoint 。但是有兩個(gè)問(wèn)題:
第一個(gè)問(wèn)題是 RocksDB 的 Incremental Checkpoint 是依賴(lài)它自己本身的一些實(shí)現(xiàn),當(dāng)中會(huì)存在一些數(shù)據(jù)壓縮,壓縮所消耗的時(shí)間以及壓縮效果具有不確定性,這個(gè)是和數(shù)據(jù)是相關(guān)的;
第二個(gè)問(wèn)題是只能針對(duì)特定的 StateBackend 來(lái)使用,目前在做的 Generalized Incremental Checkpoint 實(shí)際上能夠保證的是,它與 StateBackend 是無(wú)關(guān)的,從運(yùn)行時(shí)的機(jī)制來(lái)保證了一個(gè)比較穩(wěn)定、更小的 Checkpoint 間隔。
Unaligned Checkpoint 在 Flink 1.13 就已經(jīng)發(fā)布了,在 1.14 版本主要是針對(duì) bug 的修復(fù)和補(bǔ)充,針對(duì) Generalized Incremental Checkpoint,目前社區(qū)還在做最后的沖刺,比較有希望在 1.14 中和大家見(jiàn)面。[2]
[2] Generalized Incremental Checkpoint 最終在 1.14 中沒(méi)有完成。
四、性能與效率
1. 大規(guī)模作業(yè)調(diào)度的優(yōu)化
構(gòu)建 Pipeline Region 的性能提升:所有由 pipline 邊所連接構(gòu)成的子圖 。在 Flink 任務(wù)調(diào)度中需要通過(guò)識(shí)別 Pipeline Region 來(lái)保證由同一個(gè) Pipline 邊所連接的任務(wù)能夠同時(shí)進(jìn)行調(diào)度。否則有可能上游的任務(wù)開(kāi)始調(diào)度,但是下游的任務(wù)并沒(méi)有運(yùn)行。從而導(dǎo)致上游運(yùn)行完的數(shù)據(jù)無(wú)法給下游的節(jié)點(diǎn)進(jìn)行消費(fèi),可能會(huì)造成死鎖的情況
任務(wù)部署階段:每個(gè)任務(wù)都要從哪些上游讀取數(shù)據(jù),這些信息會(huì)生成 Result Partition Deployment Descriptor。
這兩個(gè)構(gòu)建過(guò)程在之前的版本都有 O (n^2) 的時(shí)間復(fù)雜度,主要問(wèn)題需要對(duì)于每個(gè)下游節(jié)點(diǎn)去遍歷每一個(gè)上游節(jié)點(diǎn)的情況。例如去遍歷每一個(gè)上游是不是一個(gè) Pipeline 邊連接的關(guān)系,或者去遍歷它的每一個(gè)上游生成對(duì)應(yīng)的 Result Partition 信息。
目前通過(guò)引入 group 概念,假設(shè)已知上下游 2 個(gè)任務(wù)的連接方式是 all-to-all,那相當(dāng)于把所有 Pipeline Region 信息或者 Result Partition 信息以 Group 的形式進(jìn)行組合,這樣只需知道下游對(duì)應(yīng)的是上游的哪一個(gè) group,就可以把一個(gè) O (n^2) 的復(fù)雜度優(yōu)化到了 O (n)。我們用 wordcount 任務(wù)做了一下測(cè)試,對(duì)比優(yōu)化前后的性能:
從表格中可以看到構(gòu)建速度具有大幅度提升,構(gòu)建 Pipeline Region 的性能從秒級(jí)提升至毫秒級(jí)別。任務(wù)部署我們是從第一個(gè)任務(wù)開(kāi)始部署到所有任務(wù)開(kāi)始運(yùn)行的狀態(tài),這邊只統(tǒng)計(jì)了流,因?yàn)榕枰嫌谓Y(jié)束后才能結(jié)束調(diào)度。從整體時(shí)間來(lái)看,整個(gè)任務(wù)初始化,調(diào)度以及部署的階段,大概能夠減少分鐘級(jí)的時(shí)間消耗。
2. 細(xì)粒度資源管理
細(xì)粒度資源管理在過(guò)去很多的版本都一直在做,在 Flink1.14 終于可以把這一部分 API 開(kāi)放出來(lái)在 DataSteam 提供給用戶(hù)使用了。用戶(hù)可以在 DataStream 中自定義 SlotSharingGroup 的劃分情況,如下圖所示的方式去定義 Slot 的資源劃分,實(shí)現(xiàn)了支持 DataStream API,自定義 SSG 劃分方式以及資源配置 TaskManager 動(dòng)態(tài)資源扣減。
對(duì)于每一個(gè) Slot 可以通過(guò)比較細(xì)粒度的配置,我們?cè)?Runtime 上會(huì)自動(dòng)根據(jù)用戶(hù)資源配置進(jìn)行動(dòng)態(tài)的資源切割。
這樣做的好處是不會(huì)像之前那樣有固定資源的 Slot,而是做資源的動(dòng)態(tài)扣減,通過(guò)這樣的方式希望能夠達(dá)到更加精細(xì)的資源管理和資源的使用率。
五、Table / SQL / Python API
1. Table API / SQL
Window Table-Valued Function 支持更多算子與窗口類(lèi)型 ,可以看如下表格的對(duì)比:
從表格中可以看出對(duì)于原有的三個(gè)窗口類(lèi)型進(jìn)行加強(qiáng),同時(shí)新增 Session 窗口類(lèi)型,目前支持 Aggregate 的操作。
■ 1.1 支持聲明式注冊(cè) Source/Sink
Table API 支持使用聲明式的方式注冊(cè) Source / Sink 功能對(duì)齊 SQL DDL;
同時(shí)支持 FLIP-27 新的 Source 接口;
new Source 替代舊的 connect() 接口。
■ 1.2 全新代碼生成器
解決了大家在生成代碼超過(guò) Java 最長(zhǎng)代碼限制,新的代碼生成器會(huì)對(duì)代碼進(jìn)行拆解,徹底解決代碼超長(zhǎng)的問(wèn)題。
■ 1.3 移除 Flink Planner
新版本中,Blink Planner 將成為 Flink Planner 的唯一實(shí)現(xiàn)。
2. Python API
在之前的版本中,如果有先后執(zhí)行的兩個(gè) UDF,它的執(zhí)行過(guò)程如下圖左方。在 JVM 上面有 Java 的 Operator,先把數(shù)據(jù)發(fā)給 Python 下面的 UDF 去執(zhí)行,執(zhí)行后又發(fā)回給 Java,然后傳送給下游的 Operator,最后再進(jìn)行一次 Python 的這種跨進(jìn)程的傳輸去處理,會(huì)導(dǎo)致存在很多次冗余的數(shù)據(jù)傳輸。
在 1.14 版本中,改進(jìn)如右圖,可以把它們連接在一起,只需要一個(gè)來(lái)回的 Java 和 Python 進(jìn)行數(shù)據(jù)通信,通過(guò)減少傳輸數(shù)據(jù)次數(shù)就能夠達(dá)到比較好的性能上的提升。
3. 支持 LoopBack 模式
在以往本地執(zhí)行實(shí)際是在 Python 的進(jìn)程中去運(yùn)行客戶(hù)端程序,提交 Java 進(jìn)程啟動(dòng)一個(gè)迷你集群去執(zhí)行 Java 部分代碼。Java 部分代碼也會(huì)和生產(chǎn)環(huán)境部分的一樣,去啟動(dòng)一個(gè)新的 Python 進(jìn)程去執(zhí)行對(duì)應(yīng)的 Python UDF,從圖下可以看出新的進(jìn)程其實(shí)在本地調(diào)試中是沒(méi)有必要存在的。
所以支持 lookback 模式后可以讓 Java 的 opt 直接把 UDF 運(yùn)行在之前 Python client 所運(yùn)行的相同的進(jìn)程內(nèi),通過(guò)這種方式:
首先是避免了啟動(dòng)額外進(jìn)程所帶來(lái)的開(kāi)銷(xiāo);
最重要的是在本地調(diào)試中,我們可以在同一個(gè)進(jìn)程內(nèi)能夠更好利用一些工具進(jìn)行 debug,這個(gè)是對(duì)開(kāi)發(fā)者體驗(yàn)上的一個(gè)提升。
六、總結(jié)
本文主要講解了 Flink1.14 的主要新特性介紹:
首先介紹了目前社區(qū)在批流一體上的工作,通過(guò)介紹批流不同的執(zhí)行模式和 JM 節(jié)點(diǎn)任務(wù)觸發(fā)的優(yōu)化改進(jìn)更好的去兼容批作業(yè);
然后通過(guò)分析現(xiàn)有的 Checkpoint 機(jī)制痛點(diǎn),在新版本中如何改進(jìn),以及在大規(guī)模作業(yè)調(diào)度優(yōu)化和細(xì)粒度的資源管理上面如何做到對(duì)性能優(yōu)化;
最后介紹了 TableSQL API 和 Pyhton上相關(guān)的性能優(yōu)化。
責(zé)任編輯:haq
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
6837瀏覽量
88754 -
Apache
+關(guān)注
關(guān)注
0文章
64瀏覽量
12439
原文標(biāo)題:Apache Flink 1.14 新特性介紹
文章出處:【微信號(hào):DBDevs,微信公眾號(hào):數(shù)據(jù)分析與開(kāi)發(fā)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論