0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

這么多技術(shù)框架,為什么選debezium?

jf_ro2CN3Fa ? 來源:稀土掘金 ? 2023-08-30 16:40 ? 次閱讀

在一些小型項(xiàng)目當(dāng)中,沒有引入消息中間件,也不想引入,但有一些業(yè)務(wù)邏輯想要解耦異步,那怎么辦呢?

我們的web項(xiàng)目,單獨(dú)內(nèi)網(wǎng)部署,由于大數(shù)據(jù)背景,公司消息中間件統(tǒng)一使用的kafka,在一些小項(xiàng)目上kafka就顯得很笨重。 引入rocketmq或rabittmq也沒必要。 事件或多線程也不適合。

具體一點(diǎn)的,之前對接的一個(gè)系統(tǒng),一張記錄表有10+以上的類型狀態(tài),新的需求是,針對每種狀態(tài)做出對應(yīng)的不同的操作。 之前寫入這張記錄表的時(shí)候,方式也是五花八門,有的是單條記錄寫入,有的是批量寫入,有的調(diào)用了統(tǒng)一的service,有的呢直接調(diào)用了DAO層mapper直接寫入。

所以想找到一個(gè)統(tǒng)一入口進(jìn)行切入處理,就不行了。

這個(gè)時(shí)候就算引入消息隊(duì)列,也需要在不同的業(yè)務(wù)方法里進(jìn)行寫入消息的操作。業(yè)務(wù)方也不太愿意配合改。

可以使用觸發(fā)器,但它是屬于上個(gè)時(shí)代的產(chǎn)物,槽點(diǎn)太多。(這里并不是完全不主張使用觸發(fā)器,技術(shù)永遠(yuǎn)是為業(yè)務(wù)服務(wù)的,只要評估覺得可行,就可以使用)那么這個(gè)時(shí)候,CDC技術(shù)就可以粉墨登場了。

CDC(change data capture)數(shù)據(jù)更改捕獲。常見的數(shù)據(jù)更改捕獲都是通過數(shù)據(jù)庫比如mysql的binlog來達(dá)到目的。

我們可以監(jiān)控mysql binlog日志,當(dāng)寫入一條數(shù)據(jù)的時(shí)候,接收到數(shù)據(jù)變更日志,做出相應(yīng)的操作。

這樣的好處是,只需導(dǎo)入依賴,不額外引入組件,同時(shí)無需改動之前的代碼。 兩邊完全解耦,互不干擾。

常見的CDC框架,比如,canal (非Camel)

canal [k?'n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi) 早期阿里巴巴因?yàn)楹贾莺兔绹p機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求,實(shí)現(xiàn)方式主要是基于業(yè)務(wù) trigger 獲取增量變更。 從 2010 年開始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進(jìn)行同步,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費(fèi)業(yè)務(wù)。

它是基于日志增量訂閱和消費(fèi)的業(yè)務(wù),包括

數(shù)據(jù)庫鏡像 數(shù)據(jù)庫實(shí)時(shí)備份 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等) 業(yè)務(wù) cache 刷新 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理

c88b4a38-3e3c-11ee-ac96-dac502259ad0.jpg

它的原理

canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議

MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal );關(guān)注工眾號:碼猿技術(shù)專欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊!

canal 解析 binary log 對象(原始為 byte 流)

再比如,debezium(音同 dbzm 滴BZ姆)很多人可能不太了解. 包括databus,maxwell,flink cdc(大數(shù)據(jù)領(lǐng)域)等等,它們同屬CDC捕獲數(shù)據(jù)更改(change data capture)類的技術(shù)。

c8b79c1e-3e3c-11ee-ac96-dac502259ad0.jpg

為什么是debezium

這么多技術(shù)框架,為什么選debezium?

看起來很多。但一一排除下來就debezium和canal。

sqoop,kettle,datax之類的工具,屬于前大數(shù)據(jù)時(shí)代的產(chǎn)物,地位類似于web領(lǐng)域的structs2。而且,它們基于查詢而非binlog日志,其實(shí)不屬于CDC。首先排除。

flink cdc是大數(shù)據(jù)領(lǐng)域的框架,一般web項(xiàng)目的數(shù)據(jù)量屬于大材小用了。

同時(shí)databus,maxwell相對比較冷門,用得比較少。

最后不用canal的原因有以下幾點(diǎn)。

canal需要安裝,這違背了“如非必要,勿增實(shí)體”的原則。

canal只能對MYSQL進(jìn)行CDC監(jiān)控。有很大的局限性。

大數(shù)據(jù)領(lǐng)域非常流行的flink cdc(阿里團(tuán)隊(duì)主導(dǎo))底層使用的也是debezium,而非同是阿里出品的canal。

debezium可借助kafka組件,將變動的數(shù)據(jù)發(fā)到kafka topic,后續(xù)的讀取操作只需讀取kafka,可有效減少數(shù)據(jù)庫的讀取壓力??杀WC一次語義,至少一次語義。 同時(shí),也可基于內(nèi)嵌部署模式,無需我們手動部署kafka集群,可滿足”如非必要,勿增實(shí)體“的原則。

c8eacf4e-3e3c-11ee-ac96-dac502259ad0.jpg

Debezium是一個(gè)捕獲數(shù)據(jù)更改(CDC)平臺,并且利用Kafka和Kafka Connect實(shí)現(xiàn)了自己的持久性、可靠性和容錯(cuò)性。 每一個(gè)部署在Kafka Connect分布式的、可擴(kuò)展的、容錯(cuò)性的服務(wù)中的connector監(jiān)控一個(gè)上游數(shù)據(jù)庫服務(wù)器,捕獲所有的數(shù)據(jù)庫更改, 然后記錄到一個(gè)或者多個(gè)Kafka topic(通常一個(gè)數(shù)據(jù)庫表對應(yīng)一個(gè)kafka topic)。

Kafka確保所有這些數(shù)據(jù)更改事件都能夠多副本并且總體上有序(Kafka只能保證一個(gè)topic的單個(gè)分區(qū)內(nèi)有序),這樣, 更多的客戶端可以獨(dú)立消費(fèi)同樣的數(shù)據(jù)更改事件而對上游數(shù)據(jù)庫系統(tǒng)造成的影響降到很小(如果N個(gè)應(yīng)用都直接去監(jiān)控?cái)?shù)據(jù)庫更改,對數(shù)據(jù)庫的壓力為N, 而用debezium匯報(bào)數(shù)據(jù)庫更改事件到kafka,所有的應(yīng)用都去消費(fèi)kafka中的消息,可以把對數(shù)據(jù)庫的壓力降到1)。

另外,客戶端可以隨時(shí)停止消費(fèi),然后重啟, 從上次停止消費(fèi)的地方接著消費(fèi)。每個(gè)客戶端可以自行決定他們是否需要exactly-once或者at-least-once消息交付語義保證, 并且所有的數(shù)據(jù)庫或者表的更改事件是按照上游數(shù)據(jù)庫發(fā)生的順序被交付的。

c90670aa-3e3c-11ee-ac96-dac502259ad0.jpg

對于不需要或者不想要這種容錯(cuò)級別、性能、可擴(kuò)展性、可靠性的應(yīng)用,他們可以使用內(nèi)嵌的Debezium connector引擎來直接在應(yīng)用內(nèi)部運(yùn)行connector。 這種應(yīng)用仍需要消費(fèi)數(shù)據(jù)庫更改事件,但更希望connector直接傳遞給它,而不是持久化到Kafka里。

簡介

Debezium是一個(gè)開源項(xiàng)目,為捕獲數(shù)據(jù)更改(change data capture,CDC)提供了一個(gè)低延遲的流式處理平臺。你可以安裝并且配置Debezium去監(jiān)控你的數(shù)據(jù)庫,然后你的應(yīng)用就可以消費(fèi)對數(shù)據(jù)庫的每一個(gè)行級別(row-level)的更改。只有已提交的更改才是可見的,所以你的應(yīng)用不用擔(dān)心事務(wù)(transaction)或者更改被回滾(roll back)。Debezium為所有的數(shù)據(jù)庫更改事件提供了一個(gè)統(tǒng)一的模型,所以你的應(yīng)用不用擔(dān)心每一種數(shù)據(jù)庫管理系統(tǒng)的錯(cuò)綜復(fù)雜性。另外,由于Debezium用持久化的、有副本備份的日志來記錄數(shù)據(jù)庫數(shù)據(jù)變化的歷史,因此,你的應(yīng)用可以隨時(shí)停止再重啟,而不會錯(cuò)過它停止運(yùn)行時(shí)發(fā)生的事件,保證了所有的事件都能被正確地、完全地處理掉。

監(jiān)控?cái)?shù)據(jù)庫,并且在數(shù)據(jù)變動的時(shí)候獲得通知一直是很復(fù)雜的事情。關(guān)系型數(shù)據(jù)庫的觸發(fā)器可以做到,但是只對特定的數(shù)據(jù)庫有效,而且通常只能更新數(shù)據(jù)庫內(nèi)的狀態(tài)(無法和外部的進(jìn)程通信)。一些數(shù)據(jù)庫提供了監(jiān)控?cái)?shù)據(jù)變動的API或者框架,但是沒有一個(gè)標(biāo)準(zhǔn),每種數(shù)據(jù)庫的實(shí)現(xiàn)方式都是不同的,并且需要大量特定的知識和理解特定的代碼才能運(yùn)用。確保以相同的順序查看和處理所有更改,同時(shí)最小化影響數(shù)據(jù)庫仍然非常具有挑戰(zhàn)性。

Debezium提供了模塊為你做這些復(fù)雜的工作。一些模塊是通用的,并且能夠適用多種數(shù)據(jù)庫管理系統(tǒng),但在功能和性能方面仍有一些限制。另一些模塊是為特定的數(shù)據(jù)庫管理系統(tǒng)定制的,所以他們通??梢愿嗟乩脭?shù)據(jù)庫系統(tǒng)本身的特性來提供更多功能。

github官網(wǎng)上羅列的一些典型應(yīng)用場景

緩存失效(Cache invalidation) 經(jīng)典問題 Redis與MySQL雙寫一致性如何保證?Debezium利用kafka單分區(qū)的有序性(忽略mysql binlog本身可能的延遲和亂序),可完全解決此問題。 在緩存中緩存的條目(entry)在源頭被更改或者被刪除的時(shí)候立即讓緩存中的條目失效。 如果緩存在一個(gè)獨(dú)立的進(jìn)程中運(yùn)行(例如Redis,Memcache,Infinispan或者其他的),那么簡單的緩存失效邏輯可以放在獨(dú)立的進(jìn)程或服務(wù)中, 從而簡化主應(yīng)用的邏輯。在一些場景中,緩存失效邏輯可以更復(fù)雜一點(diǎn),讓它利用更改事件中的更新數(shù)據(jù)去更新緩存中受影響的條目。

簡化單體應(yīng)用(Simplifying monolithic applications) 許多應(yīng)用更新數(shù)據(jù)庫,然后在數(shù)據(jù)庫中的更改被提交后,做一些額外的工作:更新搜索索引,更新緩存,發(fā)送通知,運(yùn)行業(yè)務(wù)邏輯,等等。 這種情況通常稱為雙寫(dual-writes),因?yàn)閼?yīng)用沒有在一個(gè)事務(wù)內(nèi)寫多個(gè)系統(tǒng)。這樣不僅應(yīng)用邏輯復(fù)雜難以維護(hù), 而且雙寫容易丟失數(shù)據(jù)或者在一些系統(tǒng)更新成功而另一些系統(tǒng)沒有更新成功的時(shí)候造成不同系統(tǒng)之間的狀態(tài)不一致。使用捕獲更改數(shù)據(jù)技術(shù)(change data capture,CDC), 在源數(shù)據(jù)庫的數(shù)據(jù)更改提交后,這些額外的工作可以被放在獨(dú)立的線程或者進(jìn)程(服務(wù))中完成。這種實(shí)現(xiàn)方式的容錯(cuò)性更好,不會丟失事件,容易擴(kuò)展,并且更容易支持升級。

共享數(shù)據(jù)庫(Sharing databases) 當(dāng)多個(gè)應(yīng)用共用同一個(gè)數(shù)據(jù)庫的時(shí)候,一個(gè)應(yīng)用提交的更改通常要被另一個(gè)應(yīng)用感知到。一種實(shí)現(xiàn)方式是使用消息總線, 盡管非事務(wù)性(non-transactional)的消息總線總會受上面提到的雙寫(dual-writes)影響。但是,另一種實(shí)現(xiàn)方式,即Debezium,變得很直接:每個(gè)應(yīng)用可以直接監(jiān)控?cái)?shù)據(jù)庫的更改,并且響應(yīng)更改。

數(shù)據(jù)集成(Data integration) 數(shù)據(jù)通常被存儲在多個(gè)地方,尤其是當(dāng)數(shù)據(jù)被用于不同的目的的時(shí)候,會有不同的形式。保持多系統(tǒng)的同步是很有挑戰(zhàn)性的, 但是可以通過使用Debezium加上簡單的事件處理邏輯來實(shí)現(xiàn)簡單的ETL類型的解決方案。

命令查詢職責(zé)分離(CQRS) 在命令查詢職責(zé)分離 Command Query Responsibility Separation (CQRS) 架構(gòu)模式中,更新數(shù)據(jù)使用了一種數(shù)據(jù)模型, 讀數(shù)據(jù)使用了一種或者多種數(shù)據(jù)模型。由于數(shù)據(jù)更改被記錄在更新側(cè)(update-side),這些更改將被處理以更新各種讀展示。 所以CQRS應(yīng)用通常更復(fù)雜,尤其是他們需要保證可靠性和全序(totally-ordered)處理。Debezium和CDC可以使這種方式更可行: 寫操作被正常記錄,但是Debezium捕獲數(shù)據(jù)更改,并且持久化到全序流里,然后供那些需要異步更新只讀視圖的服務(wù)消費(fèi)。 寫側(cè)(write-side)表可以表示面向領(lǐng)域的實(shí)體(domain-oriented entities),或者當(dāng)CQRS和 Event Sourcing 結(jié)合的時(shí)候,寫側(cè)表僅僅用做追加操作命令事件的日志。

springboot 整合 Debezium

依賴

1.7.0.Final
8.0.26


mysql
mysql-connector-java
${mysql.connector.version}
runtime


io.debezium
debezium-api
${debezium.version}


io.debezium
debezium-embedded
${debezium.version}


io.debezium
debezium-connector-mysql
${debezium.version}


mysql
mysql-connector-java



注意debezium版本為1.7.0.Final,對應(yīng)mysql驅(qū)動為8.0.26,低于這個(gè)版本會報(bào)兼容錯(cuò)誤。

配置

相應(yīng)的配置

debezium.datasource.hostname=localhost
debezium.datasource.port=3306
debezium.datasource.user=root
debezium.datasource.password=123456
debezium.datasource.tableWhitelist=test.test
debezium.datasource.storageFile=E:/debezium/test/offsets/offset.dat
debezium.datasource.historyFile=E:/debezium/test/history/custom-file-db-history.dat
debezium.datasource.flushInterval=10000
debezium.datasource.serverId=1
debezium.datasource.serverName=name-1

然后進(jìn)行配置初始化。

主要的配置項(xiàng):

connector.class

監(jiān)控的數(shù)據(jù)庫類型,這里選mysql。

offset.storage

選擇FileOffsetBackingStore時(shí),意思把讀取進(jìn)度存到本地文件,因?yàn)槲覀儾挥胟afka,當(dāng)使用kafka時(shí),選KafkaOffsetBackingStore 。

offset.storage.file.filename

存放讀取進(jìn)度的本地文件地址。

offset.flush.interval.ms

讀取進(jìn)度刷新保存頻率,默認(rèn)1分鐘。如果不依賴kafka的話,應(yīng)該就沒有exactly once只讀取一次語義,應(yīng)該是至少讀取一次。意味著可能重復(fù)讀取。如果web容器掛了,最新的讀取進(jìn)度沒有刷新到文件里,下次重啟時(shí),就會重復(fù)讀取binlog。

table.whitelist

監(jiān)控的表名白名單,建議設(shè)置此值,只監(jiān)控這些表的binlog。

database.whitelist

監(jiān)控的數(shù)據(jù)庫白名單,如果選此值,會忽略table.whitelist,然后監(jiān)控此db下所有表的binlog。

/**
*@className:MysqlConfig
*@author:nyp
*@description:TODO
*@date:2023/8/713:53
*@version:1.0
*/
@Configuration
@ConfigurationProperties(prefix="debezium.datasource")
@Data
publicclassMysqlBinlogConfig{

privateStringhostname;
privateStringport;
privateStringuser;
privateStringpassword;
privateStringtableWhitelist;
privateStringstorageFile;
privateStringhistoryFile;
privateLongflushInterval;
privateStringserverId;
privateStringserverName;

@Bean
publicio.debezium.config.ConfigurationMysqlBinlogConfig()throwsException{
checkFile();
io.debezium.config.Configurationconfiguration=io.debezium.config.Configuration.create()
.with("name","mysql_connector")
.with("connector.class",MySqlConnector.class)
//.with("offset.storage",KafkaOffsetBackingStore.class)
.with("offset.storage",FileOffsetBackingStore.class)
.with("offset.storage.file.filename",storageFile)
.with("offset.flush.interval.ms",flushInterval)
.with("database.history",FileDatabaseHistory.class.getName())
.with("database.history.file.filename",historyFile)
.with("snapshot.mode","Schema_only")
.with("database.server.id",serverId)
.with("database.server.name",serverName)
.with("database.hostname",hostname)
//.with("database.dbname",dbname)
.with("database.port",port)
.with("database.user",user)
.with("database.password",password)
//.with("database.whitelist","test")
.with("table.whitelist",tableWhitelist)
.build();
returnconfiguration;

}

privatevoidcheckFile()throwsIOException{
Stringdir=storageFile.substring(0,storageFile.lastIndexOf("/"));
FiledirFile=newFile(dir);
if(!dirFile.exists()){
dirFile.mkdirs();
}
Filefile=newFile(storageFile);
if(!file.exists()){
file.createNewFile();
}
}
}

snapshot.mode 快照模式,指定連接器啟動時(shí)運(yùn)行快照的條件??赡艿脑O(shè)置有:

initial 只有在沒有為邏輯服務(wù)器名記錄偏移量時(shí),連接器才運(yùn)行快照。

When_needed 當(dāng)連接器認(rèn)為有必要時(shí),它會在啟動時(shí)運(yùn)行快照。也就是說,當(dāng)沒有可用的偏移量時(shí),或者當(dāng)先前記錄的偏移量指定了服務(wù)器中不可用的binlog位置或GTID時(shí)。

Never 連接器從不使用快照。在第一次使用邏輯服務(wù)器名啟動時(shí),連接器從binlog的開頭讀取。謹(jǐn)慎配置此行為。只有當(dāng)binlog保證包含數(shù)據(jù)庫的整個(gè)歷史記錄時(shí),它才有效。

Schema_only 連接器運(yùn)行模式而不是數(shù)據(jù)的快照。當(dāng)您不需要主題包含數(shù)據(jù)的一致快照,而只需要主題包含自連接器啟動以來的更改時(shí),此設(shè)置非常有用。

Schema_only_recovery 這是已經(jīng)捕獲更改的連接器的恢復(fù)設(shè)置。當(dāng)您重新啟動連接器時(shí),此設(shè)置允許恢復(fù)損壞或丟失的數(shù)據(jù)庫歷史主題。您可以定期將其設(shè)置為“清理”意外增長的數(shù)據(jù)庫歷史主題。數(shù)據(jù)庫歷史主題需要無限保留。

database.server.id

偽裝成slave的Debezium服務(wù)的id,自定義,有多個(gè)Debezium服務(wù)不能重復(fù),如果重復(fù)的話會報(bào)以下異常。

io.debezium.DebeziumException:Aslavewiththesameserver_uuid/server_idasthisslavehasconnectedtothemaster;thefirstevent'binlog.000013'at46647257,thelasteventreadfrom'./binlog.000013'at125,thelastbytereadfrom'./binlog.000013'at46647257.Errorcode:1236;SQLSTATE:HY000.
atio.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1167)
atio.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1212)
atcom.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980)
atcom.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
atcom.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
atjava.lang.Thread.run(Thread.java:750)
Causedby:com.github.shyiko.mysql.binlog.network.ServerException:Aslavewiththesameserver_uuid/server_idasthisslavehasconnectedtothemaster;thefirstevent'binlog.000013'at46647257,thelasteventreadfrom'./binlog.000013'at125,thelastbytereadfrom'./binlog.000013'at46647257.
atcom.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:944)
...3commonframesomitted

監(jiān)聽

配置監(jiān)聽服務(wù)

/**
*@projectName:test
*@package:com.test.config
*@className:MysqlBinlogListener
*@author:nyp
*@description:TODO
*@date:2023/8/713:56
*@version:1.0
*/
@Component
@Slf4j
publicclassMysqlBinlogListener{

@Resource
privateExecutortaskExecutor;

privatefinalList>>engineList=newArrayList<>();

privateMysqlBinlogListener(@Qualifier("mysqlConnector")Configurationconfiguration){
this.engineList.add(DebeziumEngine.create(Json.class)
.using(configuration.asProperties())
.notifying(record->receiveChangeEvent(record.value()))
.build());
}

privatevoidreceiveChangeEvent(Stringvalue){
if(Objects.nonNull(value)){
Mappayload=getPayload(value);
Stringop=JSON.parseObject(JSON.toJSONString(payload.get("op")),String.class);
if(!(StringUtils.isBlank(op)||Envelope.Operation.READ.equals(op))){
ChangeDatachangeData=getChangeData(payload);
log.info("changeData="+changeData);
}
}
}

@PostConstruct
privatevoidstart(){
for(DebeziumEngine>engine:engineList){
taskExecutor.execute(engine);
}
}

@PreDestroy
privatevoidstop(){
for(DebeziumEngine>engine:engineList){
if(engine!=null){
try{
engine.close();
}catch(IOExceptione){
log.error("",e);
}
}
}
}


publicstaticMapgetPayload(Stringvalue){
Mapmap=JSON.parseObject(value,Map.class);
Mappayload=JSON.parseObject(JSON.toJSONString(map.get("payload")),Map.class);
returnpayload;
}

publicstaticChangeDatagetChangeData(Mappayload){
Mapsource=JSON.parseObject(JSON.toJSONString(payload.get("source")),Map.class);
returnChangeData.builder()
.op(payload.get("op").toString())
.table(source.get("table").toString())
.after(JSON.parseObject(JSON.toJSONString(payload.get("after")),Map.class))
.source(JSON.parseObject(JSON.toJSONString(payload.get("source")),Map.class))
.before(JSON.parseObject(JSON.toJSONString(payload.get("before")),Map.class))
.build();
}

@Data
@Builder
publicstaticclassChangeData{
/**
*更改前數(shù)據(jù)
*/
privateMapafter;
privateMapsource;
/**
*更改后數(shù)據(jù)
*/
privateMapbefore;
/**
*更改的表名
*/
privateStringtable;
/**
*操作類型,枚舉Envelope.Operation
*/
privateStringop;
}

}

將監(jiān)聽到的binlog日志封裝為ChangeData對象,包括表名,更改前后的數(shù)據(jù),

以及操作類型

READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d"),
TRUNCATE("t");

測試

update操作輸出

MysqlListener.ChangeData(after={
name=SuzukiMio2,
id=1
},source={
file=binlog.000013,
connector=mysql,
pos=42587833,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691458956000,
snapshot=false,
db=test
table=test
},before={
name=SuzukiMio,
id=1
},table=test,op=u)
data={
name=SuzukiMio2,
id=1
}

新增操作輸出

MysqlListener.ChangeData(after={
name=王五,
id=0
},source={
file=binlog.000013,
connector=mysql,
pos=42588175,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691459066000,
snapshot=false,
db=test,
table=test
},before=null,table=test,op=c)

刪除操作輸出

MysqlListener.ChangeData(after=null,source={
file=binlog.000013,
connector=mysql,
pos=42588959,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691459104000,
snapshot=false,
db=test
table=test
},before={
name=王五,
id=0
},table=test,op=d)

我們之前配置的保存讀取進(jìn)度的文件storageFile,類似于kafka的偏移量,記錄的內(nèi)容如下:

c92ad81e-3e3c-11ee-ac96-dac502259ad0.jpg

停止服務(wù),對數(shù)據(jù)庫進(jìn)行操作,再次重啟,會根據(jù)進(jìn)度重新讀取。

小結(jié)

本文介紹了debezium,更多的時(shí)候,我們一談到CDC,第一想到的是大量數(shù)據(jù)同步的工具。 但其實(shí)也可以利用其數(shù)據(jù)變更捕獲的特性,來達(dá)到一部份消息隊(duì)列的作用。 但其畢竟不能完全替代消息隊(duì)列。大家理性看待與選擇。

本文的重點(diǎn)在介紹一種思路,具體的某項(xiàng)技術(shù)反而不那么重要。






審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報(bào)投訴
  • JAVA語言
    +關(guān)注

    關(guān)注

    0

    文章

    138

    瀏覽量

    20026
  • MYSQL數(shù)據(jù)庫
    +關(guān)注

    關(guān)注

    0

    文章

    95

    瀏覽量

    9349
  • CDC技術(shù)
    +關(guān)注

    關(guān)注

    0

    文章

    9

    瀏覽量

    6834

原文標(biāo)題:不想引入MQ?試試debezium

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    看了這么多論壇 還是這個(gè)論壇好啊...

    其他的論壇沒有這么多人,沒有這么快的更新.... 你們覺得呢?
    發(fā)表于 04-17 10:52

    這里的那些是程序,要弄成word 文檔的,新手沒做過這么多程序

    這里的那些是程序,要弄成word 文檔的,新手沒做過這么多程序
    發(fā)表于 04-15 23:29

    為什么roll一上電就飄了這么多?

    放在水平位置上校準(zhǔn)之后pitch還是挺準(zhǔn)的,roll一上電就飄了這么多,為什么??
    發(fā)表于 07-04 04:35

    為什么OLED初始化的時(shí)候要這么多命令?

    void OLED_Init(void)這個(gè)函數(shù)里面要寫的命令好多啊,不知道為什么初始化的時(shí)候要這么多命令啊??求解具體在數(shù)據(jù)手冊哪幾頁
    發(fā)表于 09-18 23:58

    QFP PZP封裝的thermal pad上有49個(gè)過孔,一定要這么多嗎?

    第一次使用TMS320F28377S,有兩個(gè)問題請教:1、QFP PZP封裝的thermal pad上有49個(gè)過孔,一定要這么多嗎?可不可以刪掉一些?2、如果不能刪掉,能不能把這些過孔改成盲孔,只通到內(nèi)電層,不要通孔?謝謝了,先。
    發(fā)表于 07-23 10:21

    什么是VBA?為什么這么多軟件支持VBA?

    什么是VBA?什么是VBS?二者有什么不同?為什么這么多軟件支持VBA?
    發(fā)表于 07-02 06:35

    怎么記住這么多代碼格式?

    我記得剛開始接觸編程的時(shí)候,覺得太難了。也很好奇,寫代碼的那些人也太厲害了吧?全是英文的,他們的英文水平一定很好吧?他們是怎么記住這么多代碼格式的?而且錯(cuò)了一個(gè)標(biāo)點(diǎn)符號,整個(gè)程序都會有影響。一個(gè)程序
    發(fā)表于 07-15 08:56

    電流密度和電荷密度兩個(gè)的表達(dá)式怎么差這么多

    電流密度是什么?電荷密度是什么?電流密度和電荷密度兩個(gè)的表達(dá)式怎么差這么多
    發(fā)表于 09-28 09:36

    為什么要搞這么多架構(gòu)

    問題:為什么要搞這么多架構(gòu)?webrtc雖然是一項(xiàng)主要使用p2p的實(shí)時(shí)通訊技術(shù),本應(yīng)該是無中心化節(jié)點(diǎn)的,但是在一些大型多人通訊場景,如果都使用端對端直連,端上會遇到很帶寬和性能的問題,所以就有了下圖
    發(fā)表于 10-29 06:05

    STM32系統(tǒng)為什么要有時(shí)鐘?為什么有這么多個(gè)時(shí)鐘源

    STM32系統(tǒng)為什么要有時(shí)鐘?為什么有這么多個(gè)時(shí)鐘源?STM32系統(tǒng)時(shí)鐘的框架是由哪些部分組成的?
    發(fā)表于 11-22 07:00

    為什么有這么多編程語言呢

    關(guān)注+星標(biāo)公眾號,不錯(cuò)過精彩內(nèi)容編排|strongerHuang微信公眾號 |嵌入式專欄有很多初學(xué)者都會問:我到底是該學(xué)C語言,還是學(xué)C++,或者JAVA呢?為什么有這么多編程語言呢...
    發(fā)表于 01-12 06:34

    安卓8.0最新消息:安卓8.0初體驗(yàn),竟然這么流暢還有這么多黑科技功能

    安卓8.0初體驗(yàn),竟然這么流暢還有這么多黑科技功能
    發(fā)表于 04-13 09:00 ?3598次閱讀

    小墊圈,里面還有這么多名堂……

    小墊圈,里面還有這么多名堂……
    的頭像 發(fā)表于 07-02 11:40 ?2568次閱讀

    AC-DC電源適配器還有這么多門道?看完才知道

    AC-DC電源適配器還有這么多門道?看完才知道
    的頭像 發(fā)表于 07-02 11:40 ?5915次閱讀

    硬件電路設(shè)計(jì)有這么多坑,如何少走彎路?看大牛怎么說

    硬件電路設(shè)計(jì)有這么多坑,如何少走彎路?看大牛怎么說
    的頭像 發(fā)表于 11-27 17:34 ?534次閱讀