RisingWave x 特征工程:解鎖實時特征新范式
本文將介紹 RisingWave 在實時特征工程中的應用。RisingWave 是一款開源的流式數據庫,具有易用、健壯、上下游生態系統開放、性價比高等特點,支持 SQL 和 UDF 擴展,其架構包含接入層、計算層和存儲引擎,支持多種數據源和下游系統,通過物化視圖等實現增量實時計算。在實時特征工程中,它能夠助力數據攝入、數據清洗、特征構建、樣本拼接和特征查詢等環節,提供高效的狀態管理和 UDF 支持。此外,RisingWave 2.0 帶來了如 Premium 版本、云版本增強、對流批統一的改進等新特性。通過閱讀本文,讀者可深入了解 RisingWave 在實時數據處理領域的優勢與應用。
一、RisingWave 介紹
1. 項目背景與基本信息
RisingWave 是一款具有創新性的開源流處理系統,在實時數據處理領域展現出獨特優勢。其開源項目背景源于對革新流處理和數據庫管理的追求,于 2021 年初創立,并在 2022 年 4 月以 Apache2.0 協議在 GitHub 開源。經過三年打磨,已在全球多領域落地應用。
RisingWave是基于Rust的自研項目,采用存算分離架構,交互接口與 PostgreSQL 協議兼容,并可通過 UDF 拓展。其包含接入層、計算層與存儲層三層架構,由 meta 節點協調,計算節點執行流作業并帶有多級緩存,狀態持久化至基于對象存儲的存儲引擎。產品使命為解決易用性問題,降低實時應用開發、運維與運行成本,無論對實時計算新手還是資深從業者,都致力于提供便捷、穩定且高效的流處理方案。目前,應用領域涵蓋互聯網、金融、能源、供應鏈等多個行業,在實時監控告警、流表實時打寬、規則引擎、實時數據市場等場景均有應用。截至當前,全球日活集群已超 1700 個。
2. RisingWave 特點
(1)易用性
RisingWave 通過 SQL 作為交互接口,兼容 PostgreSQL 協議,用戶通過簡單的 SQL 即可實現復雜的實時需求,同時支持通過不同語言的 UDF 進行拓展。另外,RisingWave 不僅僅是流式計算引擎,而且帶有自研的存儲引擎,除了支持有狀態的復雜流計算外,實時分析的結果可以以物化視圖的方式通過 SQL 在 RisingWave 中查詢,我們稱其為 Serving。同時 RisingWave 流算子的內部狀態都抽象成了關系型表,也可以通過 SQL 查詢,大大提升了流計算的可觀測性。

(2)健壯性
RisingWave 定位為數據庫,所以健壯穩定是首要要求。實時性方面,可以達到亞秒級新鮮度,并實現了 Exactly Once。支持強一致持久化 checkpoint,當出現故障時可以立即從上一 checkpoint 恢復。基于存算分離的架構,可以實現 zero downtime 的彈性伸縮和快速恢復。同時,RisingWave 支持 20+ 路多流 join 和復雜流式變換,并且支持長時間窗口大狀態的流處理。

(3)開放的上下游生態系統
RisingWave 作為流處理系統,具備開放且多元的上下游生態系統。在上游 Source 方面,它支持多種常見的消息隊列(如 Kafka 等)、各類數據庫的變更數據捕獲(CDC),涵蓋 MySQL、PostgreSQL、Oracle 等關系型數據庫以及 MongoDB 等非關系型數據庫,并且支持如 Debezium 等多種 CDC 格式,同時也接納如數據湖、文件系統內文件等批式數據源。而在下游 Sink,不僅支持消息隊列,還支持 ClickHouse、StarRocks 等分析型數據庫以及 Elasticsearch、Redis 等非關系型數據庫,此外還實現了實時入湖功能。這種開放的生態系統,極大地拓展了 RisingWave 在不同數據場景下的應用范圍,使其能更好地融入多樣化的數據處理鏈路中。

(4)高性價比
實時計算相比于離線計算通常成本更高,而 RisingWave 通過多種優化,實現了高性價比。首先,使用低成本的對象存儲作為存儲后端,我們自研了基于 LSM 的存儲引擎降低存儲成本。RisingWave 支持多種對象存儲,比如 S3、Azure Blob 等,也可以自己部署 MinIO、HDFS、DFS。采用存算分離架構,計算和存儲可以獨立擴縮容。計算節點采用多級緩存,可以根據需求調整,并且支持 serverless compaction。

3. RisingWave 架構
RisingWave 的架構主要分為三層。最上層是接入層(Frontend),它負責解析和優化用戶請求,并生成執行計劃,這些計劃會被分布式調度到第二層 —— 計算層(Compute)執行。在流作業中,有狀態的算子其狀態會持久化到基于對象存儲(ObjectStore)的存儲引擎中。在這些組件之上,有一個 Meta 節點負責協調,起到控制器的作用。整體架構體現了 RisingWave 在流處理方面的高效設計,同時兼顧了存儲和協調功能。

二、RisingWave 在實時特征工程中的應用
1. 特征工程步驟與鏈路
實時特征工程包含 Training 鏈路和 Inference 鏈路。Training 鏈路包括從上游數據源攝入數據、清洗選擇、特征構建、樣本拼接和實時模型訓練。Inference 鏈路包括攝入數據構建行為特征、查詢 Feature Store 特征拼接和向 Model 喂入特征完成 Inference。

實時特征工程在架構上存在挑戰。引入的組件越多,運維越困難,工程師需熟悉多個系統。同時,組件增多會使穩定性難以保障,一個組件故障就可能影響整體。此外,影響實時性的因素變多,且上線周期變長,工程師需學習不同接口與組件交互,數據分散也導致回測困難。
2. RisingWave 的助力
RisingWave 在實時特征工程方面有諸多助力。它能用 SQL + UDF 構建 Streaming Pipeline,提供統一的數據源存儲,支持 Serving 查詢,并具備實時流式 Sink 功能,能夠有效簡化和優化實時特征工程的流程,提升效率。

接下來具體看一下鏈路中的每個步驟。
(1)數據攝入
在數據攝入環節,RisingWave 中可以使用 source connector 輕松接入多種數據源。

1)Source 相關助力
- 多樣化數據源支持
消息隊列(MQ):支持 Kafka、Pulsar、MQTT 等。
變更數據捕獲(CDC):支持 MySQL、PostgreSQL、TiDB、MongoDB 等數據庫的 CDC。
批處理數據源:支持 File System、Object Store、Iceberg 等。 - 消息編碼支持
支持 AVRO、JSON、PROTOBUF、CSV、BYTES 等編碼格式。 - 消息隊列支持指定消費位置指定
- 支持從 Schema Registry 自動獲取上游 Schema

2)Table 相關助力
- 數據源支持廣泛
Table 可以消費所有 Source 支持的數據源,能夠將各種來源的數據進行整合。 - 物化數據支持
將 Source 的數據物化到表,支持主鍵,便于數據的管理和查詢。 - 上游 CDC 支持
支持常見的 OLTP 數據庫(如 MySQL、PostgreSQL、Oracle、TiDB 等)和 NoSQL 數據庫(如 MongoDB)的 CDC。 - DML 支持
支持增刪改查(DML)操作,方便對數據進行處理和維護。 - 消息格式支持
支持多種消息格式,如 PLAIN、DEBEZIUM、CANAL、MAXWELL、UPSERT 等,便于與不同系統進行數據交互。

通過這些功能,RisingWave 在數據攝入環節能夠靈活、高效地處理各種數據源的數據,并提供方便的數據管理和操作功能。
(2)數據選擇和清洗
在 RisingWave 中,豐富的 SQL 函數可以幫助用戶輕松定義數據選擇和清洗的邏輯,同時通過物化視圖(Materialized View)構建特征工程的 Streaming Pipeline。
1)基于 SQL 進行數據選擇和清洗
- 離散化(Categorization)
可以使用 SQL 語句將數據離散化到多個桶中。例如,根據一定的條件將數據劃分到不同的類別。 - 異常值處理(Filtering)
通過 WHERE 條件來處理異常值。例如,篩選出符合特定范圍的數據,排除異常數據。 - 去重(Distinct On)
使用 DISTINCT ON 語句可以對指定列的數據進行去重操作,只保留一條記錄。 - 缺失值處理(Coalescing)
利用 SQL 函數(如 LAG)來填補缺失值,使缺失值變為上一個有效值。

2)基于物化視圖構建 Pipeline
物化視圖是一個增量實時維護流處理結果的抽象。當上游數據到來時,物化視圖會自動、實時、同步地增量維護流處理的結果。
- 支持 MV - on - MV 構建層級化的流處理管道,可以堆疊物化視圖來構建多層級的流處理流程。
- 物化視圖支持豐富的 SQL 語法,包括 JOIN、窗口函數、子查詢、分組等,還支持高級的流處理特性如 watermark,以及半結構化數據的處理函數。
- 物化視圖的結果是實時可查詢的,用戶可以通過 SQL 查詢來獲取物化視圖的結果,方便進行數據驗證和調試。


3)SQL 即流處理
RisingWave 中的 SQL 即流處理具有諸多優勢。它基于 SQL 構建流作業,具備豐富的查詢優化功能,如列裁剪、Filter 下推等。還支持子查詢解關聯、Join 重排序等操作,能夠將用戶編寫的 SQL 優化成高效的分布式流作業,方便用戶操作。

(3)特征構建
特征構建是實時特征工程的關鍵環節,下面我們從一些常用特征出發,看一下如何通過 RisingWave 進行特征構建
1)聚合特征和 Over 窗口計算
- 通過 CREATE MATERIALIZED VIEW 語句實現,例如計算用戶最近 30 天行為聚合統計,從清洗后的數據表(如 cleaned_events)中篩選出特定時間范圍內(NOW() - INTERVAL '30 DAYS'到NOW())的數據,按用戶 ID(user_id)和事件類型(event_type)進行分組,計算訪問次數(COUNT())和最后訪問時間(MAX(event_timestamp))。還可進一步計算如用戶過去 30 天最常瀏覽的 Top2 商品類別,先按用戶 ID 分區并按訪問次數降序排序,然后選擇排名前 2 的類別。

2)窗口特征
- Hop Window 和 Tumble Window:如創建 2 分鐘 hop 窗口聚合特征,從數據源(如 taxi_trips)中,以 completed_at 為時間字段,按 2 分鐘間隔進行窗口聚合,計算行程數量(count(trip_id))和總距離(sum(distance))。同樣,對于 2 分鐘 tumble 窗口聚合特征,使用 TUMBLE 函數并設置相應參數實現。這些窗口計算為時間序列數據的分析提供了靈活的方式。

- Session Window 與 Watermark:在源頭表(如 user_views)上定義 5 分鐘間隔的 watermark,用于處理亂序數據。然后創建 5 分鐘 session 窗口聚合特征,按用戶 ID 分區,以 viewed_at 為時間字段,計算每個會話的起始時間(first_value(viewed_at))和結束時間(last_value(viewed_at))。session 窗口能有效捕捉用戶在一段時間內的連續行為,對于分析用戶行為模式非常有用。

3)實時多流 Join
- Inner Join 示例
計算用戶過去一天內瀏覽的商品種類分布,通過 CREATE MATERIALIZED VIEW 將 user_clicks 表與 product_metadata 表進行 JOIN 操作,連接條件為 user_clicks.product_id = product_metadata.product_id,篩選出過去一天內的數據(user_clicks.event_time >= NOW() - INTERVAL '1 DAY'),按用戶 ID 和商品類別分組,統計各類別瀏覽次數(COUNT())。 - Outer Join 應用
可用于維度特征關聯,如將 user_events 表分別與 product_info、store_info 和 user_info 表進行左外連接(LEFT OUTER JOIN),獲取更豐富的用戶行為相關信息,包括產品、店鋪和用戶自身的詳細信息,為后續分析提供多維度數據。 - Window Join 功能
實現窗口特征拼接,例如將兩個以 completed_at 為時間字段、2 分鐘間隔的窗口(TUMBLE (taxi_trips, completed_at, INTERVAL '2 MINUTES')和 TUMBLE (taxi_fare, completed_at, INTERVAL '2 MINUTES'))進行連接,連接條件為行程 ID(trip_id)和窗口起始時間(window_start)相等,按窗口起始時間排序,從而整合行程和費用相關的窗口特征,為分析出租車業務數據提供了全面的視角。

實時多流 Join 是 RisingWave 的一個高亮特性,除了上面介紹的 Regular Join 和 Interval Join,還支持 Temporal Join,以及基于 Watermark 的 Windows Join。多流 Join 是流處理中的一個難點,而 RisingWave 憑借其架構優勢和豐富的優化,讓用戶在不感知調度和實現細節的情況下,可以輕松通過 SQL 構建包含多流 Join 的實時特征。

4)高效狀態管理
- 狀態過期清理
基于 DynamicFilter 算子實現,能夠生成正確強一致的流變更和存儲 delete tombstone,確保狀態存儲和 SQL 語義完全一致。在處理如用戶最近 30 天行為聚合統計等特征構建時,自動管理狀態的過期,避免無效數據占用存儲空間,保證數據的時效性和準確性。

- 長周期大狀態處理
算子狀態持久化在對象存儲,無單機狀態上限。
基于存算分離架構可實現秒級擴縮容。
自研云原生 LSM 存儲引擎。


在 RisingWave 中做了大量工作去優化狀態遠端存儲帶來的延遲。通過多級緩存機制,用戶可以根據實際場景在性能與成本間做出權衡。

- 內部狀態 SQL 可查
流算子內部狀態抽象成關系型 State Table。
可以通過 SHOW INTERNAL TABLES 查看算子內部狀態表,也可以通過 SQL 查詢。
適用于排查線上數據問題、優化流作業 SQL、學習流算子的狀態管理制等場景。

- 狀態復用
特征工程中,Source 數據清洗后的原始數據可以會物化成 MV,基于這些 MV 又可以創建不同的下游 MV,MV 之間還可以 join,這樣分層構建流作業,天然支持狀態復用。Source Table 支持 DML 進行數據訂正,訂正引起的變更會自動地同步到各個下游。

5)UDF
支持通過 CREATE FUNCTION 和 CREATE AGGREGATE 方式定義 UDF。


(4)Feature Serving
在 RisingWave中,Feature Serving 是實時特征工程的重要組成部分,提供了強大的功能用于特征查詢、數據分發和服務優化。
1)查詢與結果一致性
- 可查詢性
Materialized View 和 Table 均可查詢,支持 Batch Query 和 Streaming Query。用戶可以通過 SELECT 語句直接查詢物化視圖(如 user_feature)獲取特征數據,例如查詢特定用戶 ID(user_id = 15213)的特征。這種查詢方式方便快捷,能夠滿足不同場景下對特征數據的獲取需求。 - 結果一致性與調試回溯
Streaming 和 Batch Query 結果一致,這一特性使得用戶在開發和調試過程中更加便捷。用戶在創建物化視圖前可以先運行 Batch Query 來查看結果是否符合預期,進行數據驗證和邏輯調試。如果發現問題,可以方便地回溯和排查,因為兩種查詢方式的結果具有一致性,保證了數據的可靠性和可追溯性。 - 支持創建索引加速 Serving 查詢

2)索引加速查詢
- 索引創建與應用
支持在 Materialized View 和 Table 上創建索引來加速 Serving 查詢。用戶可以在 timestamp 列創建索引(如 CREATE INDEX idx_timestamp on user_feature(timestamp)),然后在查詢時利用該索引加速對 timestamp 列的范圍查詢(如 SELECT FROM user_feature WHERE timestamp < NOW() - INTERVAL ‘1 days’)。通過創建合適的索引,可以顯著提高查詢性能,減少數據檢索時間。 - 索引特性支持
支持指定 Include 列、Distributed 列,還支持表達式索引。例如,在 customers 表上創建索引加速點查(CREATE INDEX idx_c_phone on customer(c_phone)),在 orders 表上創建索引加速 JOIN 操作(CREATE INDEX idx_o_custkey ON orders(o_custkey)),以及在包含 JSONB 類型列的表上創建表達式索引。這些豐富的索引特性為優化查詢提供了多種選擇,適應不同的數據結構和查詢需求。
3)隔離與伸縮性
- 隔離 Streaming 與 Serving
支持隔離 Streaming 與 Serving,允許獨立伸縮。這意味著用戶可以根據實際需求分別調整 Streaming 和 Serving 的資源配置,優化系統性能。例如,在高并發查詢場景下,可以為 Serving 分配更多的計算資源以滿足查詢需求,而不會影響 Streaming 的實時數據處理能力。 - 資源優化與靈活性
通過獨立伸縮,用戶可以更好地平衡系統資源的利用,提高系統的整體效率和穩定性。無論是處理大規模實時數據的攝入和處理(Streaming),還是應對高并發的特征查詢(Serving),都能夠靈活配置資源,確保系統在不同負載下的良好性能表現。

4)數據分發到下游系統
支持將數據變更 Sink 到下游系統。

- Sink 功能與支持的系統
通過 Sink 可以實時將數據發送到多種下游系統,支持的 Connector 包括 Redis、Kafka、JDBC、Clickhouse、StarRocks、Doris、ElasticSearch、Cassandra、File、Iceberg 等。用戶可以根據實際業務需求選擇合適的下游系統進行數據分發,實現數據的進一步處理和分析。 - 數據格式與輸入源
支持多種數據格式,如 APPEND_ONLY、UPSERT、DEBEZIUM 等。Sink 的輸入可以是 Table/Materialized View,也可以是 SQL query。

5)支持 Subscription 訂閱變更

6)支持 Python-SDK 執行 SQL 和訂閱變更

讓我們再來整體回顧一下 RisingWave 在特征工程各環節起到的助力作用。首先是數據攝入,利用 RisingWave 可以便捷地導入不同數據源;接下來是數據選擇和清洗,基于 SQL 和 UDF,利用物化視圖分層構建流處理 pipeline;特征構建完成后,可以用 SQL 或 Python 進行特征查詢;最后,可以采用 push-based 也就是 sink 的方式將變更輸出到下游,也可以采用 pull-based subscribe 的方式獲取變更。

三、RisingWave 其他使用場景
1. 實時監控告警
用戶借助 RisingWave 實時處理數據,一旦監測到如設備故障等異常情況,便能迅速發出告警,實現自動修復或及時通知相關人員處理。

2. 流表實時打寬
當上游存在多個不同數據源的數據表時,RisingWave 可將這些表整合打寬成一張大寬表,以便在數據庫中生成報表或進行深入分析,為決策提供全面的數據支持。

3. 規則引擎
用戶通過 SQL 定義規則,利用其與 PostgreSQL 協議 的兼容性,結合如 Superset 等 BI 工具,可直觀展示和分析數據,依據規則對數據進行處理和判斷,如在金融交易中檢測異常交易行為。

4. 實時數據市場
不同部門利用 RisingWave 構建物化視圖,維護數據的可見性與權限。借助 dbt 工具,清晰管理數據血緣,保障數據質量與可追溯性,促進部門間高效的數據協作與共享。

四、RisingWave 2.0 更新內容
RisingWave 2.0 作為最新發布的版本,帶來了諸多重要更新。
首先,新增 Premium 版本,專為自部署集群打造,提供企業級支持,有力保障自部署時的穩定性與性能表現。同時,RisingWave 的 Cloud 版本在應用性方面持續增強,尤其在 2.0 版本中,針對 Streaming 和 Batch 的統一支持進行了顯著改進。例如,對 Batch Source、Batch Sink 以及 Batch Query 均進行了優化,提升了批量數據處理的效率與性能。
此外,該版本實現了自動的 Schema Change 和自動的 Schema Mapping 功能。這意味著當上游數據存在 Schema 時,用戶導入數據無需手動編寫 Schema,并且上游數據列的增減操作能夠自動同步至 RisingWave 中,極大地簡化了數據管理流程。同時針對創建 MV 時回填歷史數據這一資源消耗大且一次性的操作提供了進一步地的優化,優化了數據處理的完整性和效率。
RisingWave 2.0 通過這些更新,致力于為用戶提供更優質、高效、便捷的服務,期待用戶深入了解并反饋使用體驗,共同推動產品的持續優化。





























