TiFlink:使用TiKV和Flink實現強一致的物化視圖
在本年初的TiDB Hackathon上,我和一眾隊友嘗試使用Flink為TiDB添加物化視圖功能,并摘得了“最佳人氣獎”。可以說,物化視圖在這屆比賽中可謂是一個熱點。單單是結合Flink實現相關功能的隊伍就有三四個。必須承認的是,在比賽結束時我們項目的完成度很低,雖然基本思路已經定型,最終呈現的結果卻遠沒達到預期。經過半年多斷斷續續的修補,在今天終于可以發布一個 預覽版本 給大家試用。這篇文章就是對我們思路和成果的一個介紹。
相比其他隊伍,我們的主要目標是實現強一致的物化視圖構建。也就是保證查詢時的物化視圖可以達到接近快照隔離(Snapshot Isolation)的隔離級別,而不是一般流處理系統的最終一致性(Eventual Consistency)。關于實現一致性的討論在下文有詳細介紹。
使用簡介
盡管是一個實驗性的項目,我們仍然探索了一些方便實用的特性,包括:
- TiFlinkApp
關于TiFlink實用的詳細信息,請參考 README 。下面是快速啟動一個任務的代碼片段:
- TiFlinkApp.newBuilder()
- .setJdbcUrl("jdbc:mysql://root@localhost:4000/test") // Please make sure the user has correct permission
- .setQuery(
- "select id, "
- + "first_name, "
- + "last_name, "
- + "email, "
- + "(select count(*) from posts where author_id = authors.id) as posts "
- + "from authors")
- // .setColumnNames("a", "b", "c", "d") // Override column names inferred from the query
- // .setPrimaryKeys("a") // Specify the primary key columns, defaults to the first column
- // .setDefaultDatabase("test") // Default TiDB database to use, defaults to that specified by JDBC URL
- .setTargetTable("author_posts") // TiFlink will automatically create the table if not exist
- // .setTargetTable("test", "author_posts") // It is possible to sepecify the full table path
- .setParallelism(3) // Parallelism of the Flink Job
- .setCheckpointInterval(1000) // Checkpoint interval in milliseconds. This interval determines data refresh rate
- .setDropOldTable(true) // If TiFlink should drop old target table on start
- .setForceNewTable(true) // If to throw an error if the target table already exists
- .build()
- .start(); // Start the app
物化視圖(流處理系統)的一致性
目前主流的物化視圖(流處理)系統主要使用最終一致性。也就是說盡管最終結果會收斂到一致的狀態,但在處理期間終端用戶仍可能查詢到一些不一致的結果。最終一致性在很多應用中被證明是足夠的,那么更強的一致性是否真的需要呢?這里的一致性和Flink的Exact Once語義又有什么關系呢?有必要進行一些介紹。
ACID
ACID是數據庫的一個基本的概念。一般來說,作為CDC日志來源的數據庫已經保證了這四條要求。但是在使用CDC數據進行流式處理的時候,其中的某些約束卻有可能被破壞。
最典型的情況是失去Atomic特性。這是因為在CDC 日志中,一個事務的修改可能覆蓋多條記錄,流處理系統如果以行為單位進行處理,就有可能破壞原子性。也就是說,在結果集上進行查詢的用戶看到的事務是不完整的。
一個典型的案例如下:
在上述案例中,我們有一個賬戶表,賬戶表之間會有轉賬操作,由于轉賬操作涉及多行修改,因此往往會產生多條記錄。假設我們有如下一條SQL定義的物化視圖,計算所有賬戶余額的總和:
- SELECT SUM(balance) FROM ACCOUNTS;
顯然,如果我們只存在表內賬戶之間的轉賬,這個查詢返回的結果應該恒為某一常數。但是由于目前一般的流處理系統不能處理事務的原子性,這條查詢產生的結果卻可能是不斷波動的。實際上,在一個不斷并發修改的源表上,其波動甚至可能是無界的。
盡管在最終一致的模型下,上述查詢的結果在經過一段時間之后將會收斂到正確值,但沒有原子性保證的物化視圖仍然限制的應用場景:假設我想實現一個當上述查詢結果偏差過大時進行報警的工具,我就有可能會接收到很多虛假報警。也就是說此時在數據庫端并沒有任何異常,數值的偏差只是來源于流處理系統內部。
在分布式系統中,還有另一種破壞原子性的情況,就是當一個事務修改產生的副作用分布在多個不同的節點處。如果在這時不使用2PC等方法進行分布式提交,則也會破壞原子性:部分節點(分區)上的修改先于其他節點生效,從而出現不一致。
線性一致性
不同于由單機數據庫產生的CDC日志(如MySQL的Binlog),TiDB這類分布式數據庫產生的日志會有線性一致性的問題。在我們的場景下,線性一致性的問題可以描述為:從用戶的角度先后執行的一些操作,其產生的副作用(日志)由于消息系統傳遞的延遲,以不同的先后順序被流處理系統處理。
假設我們有訂單表(ORDERS)和付款信息表(PAYMENTS)兩個表,用戶必須先創建訂單才能進行支付,因此下列查詢的結果必然是正數:
- WITH order_amount AS (SELECT SUM(amount) AS total FROM ORDERS),
- WITH payment_amount AS (SELECT SUM(amount) AS total FROM PAYMENTS)
- SELECT order_amount.total - payment_amount.total
- FROM order_amount, payment_amount;
但是由于ORDERS表和PAYMENTS表在分別存儲在不同的節點上,因此流處理系統消費他們的速度可能是不一致的。也就是說,流處理系統可能已經看到了支付信息的記錄,但是其對應的訂單信息還沒到達。因此就可能觀察到上述查詢出現負數的結果。
在流處理系統中,有一個Watermark的概念可以用來同步不同表的數據的處理進度,但是它并不能避免上述線性一致性問題。這是因為Watermark只要求時間戳小于其的所有記錄都已經到達,不要求時間戳大于其的記錄都沒有到達。也就是說,盡管ORDERS表和PAYMENTS表現在擁有相同的Watermark,后者仍然可能會有一些先到的記錄已經生效。
由此可見,單純依靠Watermark本身是無法處理線性一致性問題的,必須和源數據庫的時間產生系統和消息系統配合。
更強一致性的需求
盡管最終一致性在很多場景下是夠用的,但其依然存在很多問題:
- 誤導用戶:由于很多用戶并不了解一致性相關的知識,或者對其存在一定的誤解,導致其根據尚未收斂的查詢結果做出了決策。這種情況在大部分關系型數據庫都默認較強一致性的情況下是應該避免的
- 可觀測性差:由于最終一致性并沒有收斂時間的保證,再考慮到線性一致性問題的存在,很難對流處理系統的延遲、數據新鮮度、吞吐量等指標進行定義。比如說用戶看到的JOIN的結果可能是表A當前的快照和表B十分鐘前的快照聯接的結果,此時應如何定義查詢結果的延遲度呢?
- 限制了部分需求的實現:正如上文所提到的,由于不一致的內部狀態,導致某些告警需求要么無法實現,要么需要延遲等待一段時間。否則用戶就不得不接受較高的誤報率
實際上,更強一致性的缺乏還導致了一些運維操作,特別是DDL類的操作難以利用之前計算好的結果。參考關系型數據庫和NoSQL數據庫的發展歷史,我們相信目前主流的最終一致性只是受限于技術發展的權宜之計,隨著相關理論和技術研究的進步,更強的一致性將會慢慢成為流處理系統的主流。
技術方案簡介
這里詳細介紹一下TiFlink在技術方案上的考慮,以及如何實現了強一致的物化視圖(StreamSQL)維護。
TiKV和Flink
盡管這是一個TiDB Hackthon項目,因此必然會選擇TiDB/TiKV相關的組件,但是在我看來TiKV作為物化視圖系統的中間存儲方案具備很多突出的優勢:
- TiKV是一個比較成熟分布式KV存儲,而分布式環境是下一代物化視圖系統必須要支持的場景。利用TiKV配套的Java Client,我們可以方便的對其進行操作。同時TiDB本身作為一個HTAP系統,正好為物化視圖這個需求提供了一個Playground
- TiKV提供了基于Percolator模型的事務支持和MVCC,這是TiFlink實現強一致流處理的基礎。在下文中可以看到,TiFlink對TiKV的寫入主要是以接連不斷的事務的形式進行的
- TiKV原生提供了對CDC日志輸出的支持。實際上TiCDC組件正是利用這一特性實現的CDC日志導出功能。在TiFlink中,為了實現批流一體并簡化系統流程,我們選擇直接調用TiKV的CDC GRPC接口,因此也放棄了TiCDC提供的一些特性
我們最初的想法本來是直接將計算功能集成進TiKV,選擇Flink則是在比賽過程中進一步思考后得到的結論。選擇Flink的主要優勢有:
- Flink是目前市面上最成熟的Stateful流處理系統,其對處理任務的表達能力強,支持的語義豐富,特別是支持批流一體的StreamSQL實現,是我們可以專心于探索我們比較關注的功能,如強一致性等
- Flink比較完整地Watermark,而我們發現其基于Checkpoint實現的Exactly Once Delivery語義可以很方便地和TiKV結合來實現事務處理。實際上,Flink自己提供的一些支持Two Phase Commit的Sink就是結合Checkpoint來進行提交的
- Flink的流處理(特別是StreamSQL)本身就基于物化視圖的理論,在比較新的版本開始提供的DynamicTable接口,就是為了方便將外部的Change Log引入系統。它已經提供了對INSERT、DELETE、UPDATE等多種CDC操作的支持
當然,選擇TiKV+Flink這樣的異構架構也會引入一些問題,比如SQL語法的不匹配,UDF無法共享等問題。在TiFlink中,我們以Flink的SQL系統和UDF為準,將其作為TiKV的一個外掛系統使用,但同時提供了方便的建表功能。
強一致的物化視圖的實現思路
這一部分將介紹TiFlink如何在TiDB/TiKV的基礎上實現一個比較強的一致性級別:延遲快照隔離(Stale Snapshot Isolation)。在這種隔離級別下,查詢者總是查詢到歷史上一個一致的快照狀態。在傳統的快照隔離中,要求查詢者在$T$時間能且只能觀察到Commit時間小于$T$的所有事務。而延遲快照隔離只能保證觀察到$T-\Delta t$之前所有已提交的事務。
在TiDB這樣支持事務的分布式數據庫上實現強一致的物化視圖,最簡單的思路就是使用一個接一個的事務來更新視圖。事務在開始時讀取到的是一個一致的快照,而使用分布式事務對物化視圖進行更新,本身也是一個強一致的操作,且具有ACID的特性,因此得以保證一致性。
為了將Flink和這樣的機制結合起來且實現增量維護,我們利用了TiKV本身已經提供的一些特性:
- TiKV使用Time Oracle為所有的操作分配時間戳,因此雖然是一個分布式系統,其產生的CDC日志中的事務的時間戳實際上是有序的
- TiKV的節點(Region)可以產生連續不斷的增量日志(Change Log),這些日志包含了事務的各種原始信息并包含時間戳信息
- TiKV的增量日志會定期產生Resolved Timestamp,聲明當前Region不再會產生時間戳更老的消息。因此很適合用來做Watermark
- TiKV提供了分布式事務,允許我們控制一批修改的可見性
因此TiFlink的基本實現思路就是:
- 利用流批一體的特性,以某全局時間戳對源表進行快照讀取,此時可以獲得所有源表的一個一致性視圖
- 切換到增量日志消費,利用Flink的DynamicTable相關接口,實現物化視圖的增量維護和輸出
- 以一定的節奏Commit修改,使得所有的修改以原子的事務方式寫入目標表,從而為物化視圖提供一個又一個更新視圖
以上幾點的關鍵在于協調各個節點一起完成分布式事務,因此有必要介紹一下TiKV的分布式事務執行原理。
TiKV的分布式事務
TiKV的分布式事務基于著名的Percolator模型。Percolator模型本身要求存儲層的KV Store有MVCC的支持和單行讀寫的原子性和樂觀鎖(OCC)。在此基礎上它采用以下步驟完成一次事務:
- 指定一個事務主鍵(Primary Key)和一個開始時間戳并寫入主鍵
- 其他行在Prewrite時以副鍵(Secondary Key)的形式寫入,副鍵會指向主鍵并具有上述開始時間戳
- 在所有節點Prewrite完成后,可以提交事務,此時應先Commit主鍵,并給定一個Commit時間戳
- 主鍵Commit成功后事務實際上已經提交成功,但此時為了方便讀取,可以多節點并發地對副鍵進行Commit并執行清理工作,之后寫入的行都將變為可見
上述分布式事務之所以可行,是因為對主鍵的Commit是原子的,分布在不同節點的副鍵是否提交成功完全依賴于主鍵,因此其他的讀取者在讀到Prewrite后但還沒Commit的行時,會去檢查主鍵是否已Commit。讀取者也會根據Commit時間戳判斷某一行數據是否可見。Cleanup操作如果中途故障,在之后的讀取者也可以代行。
為了實現快照隔離,Percolator要求寫入者在寫入時檢查并發的Prewrite記錄,保證他們的時間戳符合一定的要求才能提交事務。本質上是要求寫入集重疊的事務不能同時提交。在我們的場景中假設物化視圖只有一個寫入者且事務是連續的,因此無需擔心這點。
在了解了TiKV的分布式事務原理之后,要考慮的就是如何將其與Flink結合起來。在TiFlink里,我們利用Checkpoint的機制來實現全局一致的事務提交。
使用Flink進行分布式事務提交
從上面的介紹可以看出,TiKV的分布式事務提交可以抽象為一次2PC。Flink本身有提供實現2PC的Sink,然而并不能直接用在我們的場景下。原因是Percolator模型在提交時需要有全局一致的事務開始時間戳和提交時間戳。而且僅僅是在Sink端實現2PC是不足以實現強一致隔離級別的:我們還需要在Source端配合,使得每個事務恰好讀入所需的增量日志。
幸運的是,Flink的2PC提交機制實際上是由Checkpoint驅動的:當Sink接收到Checkpoint請求時,會完成必要的任務以進行提交。受此啟發,我們可以實現一對Source和Sink,讓他們使用Checkpoint的ID共享Transaction的信息,并配合Checkpoint的過程完成2PC。而為了使不同節點可以對事務的信息(時間戳,主鍵)等達成一致,需要引入一個全局協調器。事務和全局協調器的接口定義如下:
- public interface Transaction {
- public enum Status {
- NEW,
- PREWRITE,
- COMMITTED,
- ABORTED;
- };
- long getCheckpointId();
- long getStartTs();
- default long getCommitTs();
- default byte[] getPrimaryKey();
- default Status getStatus();
- }
- public interface Coordinator extends AutoCloseable, Serializable {
- Transaction openTransaction(long checkpointId);
- Transaction prewriteTransaction(long checkpointId, long tableId);
- Transaction commitTransaction(long checkpointId);
- Transaction abortTransaction(long checkpointId);
- }
使用上述接口,各個Source和Sink節點可以使用CheckpointID開啟事務或獲得事務ID,協調器會負責分配主鍵并維護事務的狀態。為了方便起見,事務Commit時對主鍵的提交操作也放在協調器中執行。協調器的實現有很多方法,目前TiFlink使用最簡單的實現:在JobManager所在進程中啟動一個GRPC服務。基于TiKV的PD(ETCD)或TiKV本身實現分布式的協調器也是可能的。
上圖展示了在Flink中執行分布式事務和Checkpoint之間的協調關系。一次事務的具體過程如下:
- Source先從TiKV接收到增量日志,將他們按照時間戳Cache起來,等待事務的開始
- 當Checkpoint進程開始時,Source會先接收到信號。在Source端的Checkpoint與日志接收服務運行在不同的線程中
- Checkpoint線程先通過全局協調器獲得當前事務的信息(或開啟一個新事務),分布式情況下一個CheckpointID對應的事務只會開啟一次
- 得到事務的開始時間戳后,Source節點開始將Cache中小于此時間戳的已提交修改Emit到下游計算節點進行消費。此時Source節點也會Emit一些Watermark
- 當所有Source節點完成上述操作后,Checkpoint在Source節點成功完成,此后會向后繼續傳播,根據Flink的機制,Checkpoint在每個節點都會保證其到達之前的所有Event都已被消費
- 當Checkpoint到達Sink時,之前傳播到Sink的Event都已經被Prewrite過了,此時可以開始事務的提交過程。Sink在內部狀態中持久化事務的信息,以便于錯誤時恢復,在所有Sink節點完成此操作后,會在回調中調用協調器的Commit方法從而提交事務
- 提交事務后,Sink會啟動線程進行Secondary Key的清理工作,同時開啟一個新的事務
注意到,在第一個Checkpoint開始前,Sink可能已經開始接收到寫入的數據了,而此時它還沒有事務的信息。為了解決這一問題,TiFlink在任務開始時會直接啟動一個初始事務,其對應的CheckpointID是0,用于提交最初的一些寫入。這樣的話,在 CheckpointID=1 的Checkpoint完成時,實際上提交的是這個0事務。事務和Checkpoint以這樣的一種錯位的方式協調執行。
下圖展示了包含協調器在內的整個TiFlink任務的架構:
基于以上的系統設計,我們就得到了一個在TiKV上實現延遲快照隔離的物化視圖。
其他設計考慮
眾所周知,KSQL是Flink之外另一個流行的流處理系統,它直接與Kafka消息隊列系統結合,用戶無需部署兩套處理系統,因此受到一些用戶的青睞。很多用戶也使用KSQL實現類似物化視圖這樣的需求。然而在我看來,這種強耦合于消息隊列的流處理系統并不適合物化視圖的使用場景。
KSQL可以說是Log Oriented數據處理系統的的代表,在這種系統中,數據的本源在于日志信息,所有的表都是為了方便查詢而消費日志信息從而構建出來的視圖。這種系統具有模型簡單、容易實現、可以長時間保存日志記錄等優點。
與之相對是Table Oriented數據處理系統,MySQL、TiDB/TiKV都屬于這一類系統。這一類系統的所有修改操作都作用于表數據結構,雖然期間也會有日志生成,但往往對表數據結構和日志的修改是一起協調進行的。這里日志的主要是為持久化和事務服務,往往不會留存太長時間。相比于Log Oriented數據處理系統,這類系統對寫入和事務的處理都更為復雜一點,然而卻擁有更強可擴展性的要求。
歸根結底,這是因為Log Oriented系統中的數據是以日志的形式存儲,因此在擴展時往往需要進行成本較高的Rehash,也更難實現再平衡。而Table Oriented的系統,數據主要以表的形式存儲,因此可以以某些列進行有序排列,從而方便在一致性Hash的支持下實現Range的切分、合并和再平衡。
個人認為,在批流一體的物化視圖場景下,長時間保存日志并無太大的意義(因為總是可以從源表的快照恢復數據)。相反,隨著業務的發展不斷擴展數據處理任務和視圖是一件比較重要的事。從這個角度來看Table Oriented系統似乎更適合作為物化視圖需求的存儲承載介質。
當然,在實時消費增量Log時發生的分區合并或分裂是一個比較難處理的問題。TiKV在這種情況下會拋出一個GRPC錯誤。TiFlink目前使用的是比較簡單的靜態映射方法處理任務和分區之間的關系,在未來可以考慮更為合理的解決方案。
總結
本文介紹了使用Flink在TiKV上實現強一致的物化視圖的基本原理。以上原理已經基本上在TiFlink系統中實現,歡迎各位讀者試用。以上所有的討論都基于Flink的最終一致模型的保證,即:流計算的結果只與消費的Event和他們在自己流中的順序有關,與他們到達系統的順序以及不同流之間的相對順序無關。
目前的TiFlink系統還有很多值得提高的點,如:
- 支持非Integer型主鍵和聯合主鍵
- 更好的TiKV Region到Flink任務的映射
- 更好的Fault Tolerance和任務中斷時TiKV事務的清理工作
- 完善的單元測試
如果各位讀者對TiFlink感興趣的話,歡迎試用并提出反饋意見,如果能夠貢獻代碼幫助完善這個系統那就再好不過了。
關于物化視圖系統一致性的思考是我今年最主要的收獲之一。實際上,最初我們并沒有重視這一方面,而是在不斷地交流當中才認識到這是一個有價值且很有挑戰性的問題。通過TiFlink的實現,可以說是基本上驗證了上述方法實現延遲快照一致性的可行性。當然,由于個人的能力水平有限,如果存在什么紕漏,也歡迎各位提出討論。
最后,如果我們假設上述延遲快照一致性的論述是正確的,那么實現真正的快照隔離的方法也就呼之欲出。不知道各位讀者能否想到呢?































