基于Flink CDC打通業務數據實時入湖
大家好,我是一哥,今天分享一篇數據實時入湖的干貨文章。
在構建實時數倉的過程中,如何快速、正確的同步業務數據是最先面臨的問題,本文主要討論一下如何使用實時處理引擎Flink和數據湖Apache Iceberg兩種技術,來解決業務數據實時入湖相關的問題。
01 Flink CDC介紹
CDC全稱是Change Data Capture,捕獲變更數據,是一個比較廣泛的概念,只要是能夠捕獲所有數據的變化,比如數據庫捕獲完整的變更日志記錄增、刪、改等,都可以稱為CDC。該功能被廣泛應用于數據同步、更新緩存、微服務間同步數據等場景,本文主要介紹基于Flink CDC在數據實時同步場景下的應用。
Flink在1.11版本開始引入了Flink CDC功能,并且同時支持Table & SQL兩種形式。Flink SQL CDC是以SQL的形式編寫實時任務,并對CDC數據進行實時解析同步。相比于傳統的數據同步方案,該方案在實時性、易用性等方面有了極大的改善。下圖是基于Flink SQL CDC的數據同步方案的示意圖。
Oracle的變更日志的采集有多種方案,這里采用的Debezium實時同步工具作為示例,該工具能夠解析Oracle的changlog數據,并實時同步數據到下游Kafka。Flink SQL通過創建Kafka映射表并指定 format格式為debezium-json,然后通過Flink進行解析后直接插入到其他外部數據存儲系統,例如圖中外部數據源以Apache Iceberg為例。
下面詳細解析一下數據同步過程。首先了解一下Debezium抽取的Oracle的change log的格式,以update為例,變更日志上記錄了更新之前的數據和更新以后的數據,在Kafka下游的Flink接受到這樣的數據以后,一條update操作記錄就轉變為了先delete、后insert兩條記錄。日志格式如下所示,該update操作的內容的name字段從tom更新為了jerry。
- {
- "before": { --更新之前的數據
- "id": 001,
- "name": "tom"
- },
- "after": { --更新之后的數據
- "id": 001,
- "name": "jerry"
- },
- "source": {...},
- "op": "u",
- "ts_ms": 1589362330904,
- "transaction": null
- }
其次再來看一下Flink SQL內部是如何處理update記錄的。Flink在1.11版本支持了完整的changelog機制,對于每條數據本身只要是攜帶了相應增、刪、改的標志,Flink就能識別這些數據,并對結果表做出相應的增、刪、改的動作,如下圖所示changlog數據流經過Flink解析,同步到下游Sink Database。
通過以上分析,基于Flink SQL CDC的數據同步有如下優點:
- 業務解耦:無需入侵業務,和業務完全解耦,也就是業務端無感知數據同步的存在。
- 性能消耗:業務數據庫性能消耗小,數據同步延遲低。
- 同步易用:使用SQL方式執行CDC同步任務,極大的降低使用維護門檻。
- 數據完整:完整的數據庫變更記錄,不會丟失任何記錄,Flink 自身支持 Exactly Once。
02 Apache Iceberg介紹
通常認為數據湖是一種支持存儲多種原始數據格式、多種計算引擎、高效的元數據統一管理和海量統一數據存儲。其中以Apache Iceberg為代表的表格式和Flink計算引擎組成的數據湖解決方案尤為亮眼。Flink社區方面也主動擁抱數據湖技術,當前Flink和Iceberg在數據入湖方面的集成度最高。
那么Apache Iceberg是什么呢?引用官網的定義是:Apache Iceberg is an open table format for huge analytic datasets。也就是Apache Iceberg是一個大規模數據分析的開放表格式。
Iceberg將數據分為元數據管理層和數據存儲層。首先了解一下Iceberg在文件系統中的布局,第一部分是數據文件data files,用于存儲具體業務數據,如下圖中的data files文件。第二部分是表元數據文件(Metadata 文件),包含Snapshot文件、Manifest文件等。Snapshot表示當前操作的一個快照,每次commit都會生成一個快照,一個快照中包含多個Manifest,每個Manifest中記錄了當前操作生成數據所對應的文件地址,也就是data files的地址。基于snapshot的管理方式,iceberg能夠進行time travel(歷史版本讀取以及增量讀取)。Iceberg文件系統設計特點如下圖所示:
Iceberg的表格式設計具有如下特點:
- ACID:不會讀到不完整的commit數據,基于樂觀鎖實現,支持并發commit,支持Row-level delete,支持upsert操作。
- 增量快照:Commit后的數據即可見,在Flink實時入湖場景下,數據可見根據checkpoint的時間間隔來確定的,增量形式也可回溯歷史快照。
- 開放的表格式:對于一個真正的開放表格式,支持多種數據存儲格式,如:parquet、orc、avro等,支持多種計算引擎,如:Spark、Flink、Hive、Trino/Presto。
- 流批接口支持:支持流式寫入、批量寫入,支持流式讀取、批量讀取。下文的測試中,主要測試了流式寫入和批量讀取的功能。
03 Flink CDC打通數據實時導入Iceberg實踐
當前使用Flink最新版本1.12,支持CDC功能和更好的流批一體。Apache Iceberg最新版本0.11已經支持Flink API方式upsert,如果使用編寫框架代碼的方式使用該功能,無異于鏡花水月,可望而不可及。本著SQL就是生產力的初衷,該測試使用最新Iceberg的master分支代碼編譯嘗鮮,并對源碼稍做修改,達到支持使用Flink SQL方式upsert。
先來了解一下什么是Row-Level Delete?該功能是指根據一個條件從一個數據集里面刪除指定行。那么為什么這個功能那么重要呢?眾所周知,大數據中的行級刪除不同于傳統數據庫的更新和刪除功能,在基于HDFS架構的文件系統上數據存儲只支持數據的追加,為了在該構架下支持更新刪除功能,刪除操作演變成了一種標記刪除,更新操作則是轉變為先標記刪除、后插入一條新數據。具體實現方式可以分為Copy on Write(COW)模式和Merge on Read(MOR)模式,其中Copy on Write模式可以保證下游的數據讀具有最大的性能,而Merge on Read模式保證上游數據插入、更新、和刪除的性能,減少傳統Copy on Write模式下寫放大問題。
在Apache Iceberg中目前實現的是基于Merge on Read模式實現的Row-Level Delete。在 Iceberg中MOR相關的功能是在Iceberg Table Spec Version 2: Row-level Deletes中進行實現的,V1是沒有相關實現的。雖然當前Apache Iceberg 0.11版本不支持Flink SQL方式進行Row-Level Delete,但為了方便測試,通過對源碼的修改支持Flink SQL方式。在不遠的未來,Apache Iceberg 0.12版本將會對Row-Level Delete進行性能和穩定性的加強。
Flink SQL CDC和Apache Iceberg的架構設計和整合如何巧妙,不能局限于紙上談兵,下面就實際操作一下,體驗其功能的強大和帶來的便捷。并且順便體驗一番流批一體,下面的離線查詢和實時upsert入湖等均使用Flink SQL完成。
1,數據入湖環境準備
以Flink SQL CDC方式將實時數據導入數據湖的環境準備非常簡單直觀,因為Flink支持流批一體功能,所以實時導入數據湖的數據,也可以使用Flink SQL離線或實時進行查詢。如下測試是使用Flink提供的sql-client完成的:
第一步,新建Kafka映射表,用于實時接收Topic中的changlog數據:
- id STRING,
- name STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'topic_name',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'properties.group.id' = 'testGroup',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'debezium-json'
第二步,新建iceberg結果表,用于存儲實時入湖的數據:
- CREATE TABLE iceberg_catalog.default.IcebergTable ( id STRING, name STRING );
注:
a)其中省略了配置catalog過程
b)當前iceberg 0.11默認創建表格式版本V1,通過代碼更改版本為V2,以支持upsert方式導入數據湖
第三步,啟動upsert方式實時入湖的Flink任務:
- SET table.dynamic-table-options.enabled=true;
- INSERT INTO IcebergTable /*+OPTIONS('equality-field-columns'='id')*/ SELECT * FROM KafkaTable;
注:當前iceberg 0.11不支持Flink SQL形式upsert,通過修改源碼達到支持配置指定字段更新功能。
第四步,離線或者實時查詢統計IcebergTable表中的數據行數:
a)離線方式
- SET execution.type=batch;
- SELECT COUNT(*) FROM IcebergTable;
b)實時方式
- SET execution.type=streaming;
- SELECT COUNT(*) FROM IcebergTable;
2,數據入湖速度測試
數據入湖速度測試會根據環境配置、參數配置、數據格式等不同有所不同,下面是列出主要配置和測試出的數據作為參考。
a)資源配置情況
- TaskManager 內存4G,slot:1
- Checkpoint 1分鐘
- 測試數據列數 10列
- 測試數據行數 1000萬
- iceberg存儲格式 parquet
b)測試數據情況
數據入湖分為append和upsert兩種方式。append方式只能新增數據,不能對結果數據進行更新操作;upsert方式即能夠對結果數據更新。
append方式使用場景是導入數據之前已經明確該表數據不需要更新,如離線數據導入數據湖的場景,append方式下導入數據速度如下:
- INSERT INTO IcebergTable SELECT * FROM KafkaTable;
- 并行度1 12.2萬/秒
- 并行度2 19.6萬/秒
- 并行度4 28.3萬/秒
update方式使用場景是既有插入的數據又有對之前插入數據的更新的場景,如數據庫實時同步,upsert方式下導入數據速度,該方式需要指定在更新時以那個字段查找,類似于update語句中的where條件,一般設置為表的主鍵即可,如下:
- INSERT INTO IcebergTable /*+OPTIONS('equality-field-columns'='id')*/ SELECT * FROM KafkaTable;
- 導入的數據 只有數據插入 只有數據更新
- 并行度1 3.2萬/秒 2.9萬/秒
- 并行度2 4.9萬/秒 4.2萬/秒
- 并行度4 6.1萬/秒 5.1萬/秒
c)結論
append方式導入速度遠大于upsert導入數據速度。在使用的時候,如沒有更新數據的場景時,則不需要upsert方式導入數據。
導入速度隨著并行度的增加而增加。
upsert方式數據的插入和更新速度相差不大,主要得益于MOR原因。
3,數據入湖任務運維
在實際使用過程中,默認配置下是不能夠長期穩定的運行的,一個實時數據導入iceberg表的任務,需要通過至少下述四點進行維護,才能使Iceberg表的入湖和查詢性能保持穩定。
a)壓縮小文件
Flink從Kafka消費的數據以checkpoint方式提交到Iceberg表,數據文件使用的是parquet格式,這種格式無法追加,而流式數據又不能等候太長時間,所以會不斷commit提交數據產生小文件。目前Iceberg提供了一個批任務action來壓縮小文件,需要定期周期性調用進行小文件的壓縮功能。示例代碼如下:
- Table table = ...
- Actions.forTable(table)
- .rewriteDataFiles()
- .targetSizeInBytes(100 * 1024 * 1024) // 100 MB
- .execute();
b)快照過期處理
iceberg本身的架構設計決定了,對于實時入湖場景,會產生大量的snapshot文件,快照過期策略是通過額外的定時任務周期執行,過期snapshot文件和過期數據文件均會被刪除。如果實際使用場景不需要time travel功能,則可以保留較少的snapshot文件。
- Table table = ...
- Actions.forTable(table)
- .expireSnapshots()
- .expireOlderThan(System.currentTimeMillis())
- .retainLast(5)
- .execute();
c)清理orphan文件
orphan文件的產生是由于正常或者異常的數據寫入但是未提交導致的,長時間積累會產生大量脫離元數據的孤立數據文件,所以也需要類似JVM的垃圾回收一樣,周期性清理這些文件。該功能不需要頻繁運行,設置為3-5天運行一次即可。
- Table table = ...
- Actions.forTable(table)
- .removeOrphanFiles()
- .execute();
d)刪除元數據文件
每次提交snapshot均會自動產生一個新的metadata文件,實時數據入庫會頻繁的產生大量metadata文件,需要通過如下配置達到自動刪除metadata文件的效果。
| Property | Description |
|---|---|
| write.metadata.delete-after-commit.enabled | Whether to delete old metadata files after each table commit |
| write.metadata.previous-versions-max | The number of old metadata files to keep |
4,數據入湖問題討論
這里主要討論數據一致性和順序性問題。
Q1: 程序BUG或者任務重啟等導致數據傳輸中斷,如何保證數據的一致性呢?
Answer:數據一致保證通過兩個方面實現,借助Flink實現的exactly once語義和故障恢復能力,實現數據嚴格一致性。借助Iceberg ACID能力來隔離寫入對分析任務的不利影響。
Q2:數據入湖否可保證全局順序性插入和更新?
Answer:不可以全局保證數據生產和數據消費的順序性,但是可以保證同一條數據的插入和更新的順序性。首先數據抽取的時候是單線程的,然后分發到Kafka的各個partition中,此時同一個key的變更數據打入到同一個Kafka的分區里面,Flink讀取的時候也能保證順序性消費每個分區中的數據,進而保證同一個key的插入和更新的順序性。
04 未來規劃
新的技術最終是要落地才能發揮其內在價值的,針對在實踐應用中面臨的紛繁復雜的數據,結合流計算技術Flink、表格式Iceberg,未來落地規劃主要集中在兩個方面,一是Iceberg集成到本行的實時計算平臺中,解決易用性的問題;二是基于Iceberg,構建準實時數倉進行探索和落地。
1,整合Iceberg到實時計算平臺
目前,我所負責的實時計算平臺是一個基于SQL的高性能實時大數據處理平臺,該平臺徹底規避繁重的底層流計算處理邏輯、繁瑣的提交過程等,為用戶打造一個只需關注實時計算邏輯的平臺,助力企業向實時化、智能化大數據轉型。
實時計算平臺未來將會整合Apache Iceberg數據源,用戶可以在界面配置Flink SQL任務,該任務以upsert方式實時解析changlog并導入到數據湖中。并增加小文件監控、定時任務壓縮小文件、清理過期數據等功能。
2,準實時數倉探索
本文對數據實時入湖從原理和實戰做了比較多的闡述,在完成實時數據入湖SQL化的功能以后,入湖后的數據有哪些場景的使用呢?下一個目標當然是入湖的數據分析實時化。比較多的討論是關于實時數據湖的探索,結合所在企業數據特點探索適合落地的實時數據分析場景成為當務之急。
隨著數據量的持續增大,和業務對時效性的嚴苛要求,基于Apache Flink和Apache Iceberg構建準實時數倉愈發重要和迫切,作為實時數倉的兩大核心組件,可以縮短數據導入、方便數據行級變更、支持數據流式讀取等。
本文轉載自微信公眾號「數據社」,可以通過以下二維碼關注。轉載本文請聯系數據社公眾號。









































