精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink SQL 四種 Join 方式詳解:原理、場景與實戰

大數據
本文將深入剖析每種 Join 的底層原理、適用場景,并通過完整代碼示例演示實戰用法,幫助讀者掌握 Flink SQL Join 的選型與優化。

一、引言:流處理中的 Join 挑戰與 Flink SQL 的解決方案

在數據處理領域,Join 是關聯多表數據的核心操作。批處理中(如 Hive、Spark SQL),Join 通常針對有限數據集,通過全量數據匹配即可完成。但在流處理場景中,數據是無限、實時、無界的,Join 操作面臨三大核心挑戰:

  • 數據延遲與亂序:流數據可能因網絡延遲、節點故障等原因亂序到達,如何確保 Join 的正確性?
  • 狀態管理:流 Join 需要存儲歷史數據用于后續匹配,如何高效管理狀態(內存/磁盤)并避免無限增長?
  • 實時性與準確性平衡:不同業務場景對延遲(如實時監控)和準確性(如賬單核對)要求不同,如何靈活適配?

Flink 作為業界領先的流處理引擎,其 SQL API 提供了四種核心 Join 方式,分別針對不同場景解決了上述問題:Regular Join(常規 Join)、Interval Join(區間 Join)、Temporal Join(時態 Join) 和 Lookup Join(維表 Join)。本文將深入剖析每種 Join 的底層原理、適用場景,并通過完整代碼示例演示實戰用法,幫助讀者掌握 Flink SQL Join 的選型與優化。

二、Regular Join:常規雙流 Join

1. 定義與核心思想

Regular Join 是 Flink SQL 中最基礎的 Join 類型,語法與標準 SQL 一致(如 INNER JOIN、LEFT JOIN)。它持續關聯兩條流的數據:當任一流收到新數據時,會掃描另一條流的所有歷史數據,生成匹配結果并更新下游。

核心特點:

  • 無時間限制:只要兩條流的數據滿足 Join 條件,無論時間差多大,都會關聯(例如,訂單流在 1 小時后收到用戶流的數據,仍會觸發 Join)。
  • 狀態持久化:兩條流的所有數據都會存儲在狀態中(基于 StateBackend),用于后續匹配。
  • 結果更新:下游會收到“插入”(Insert)、“更新”(Update)、“撤回”(Retract)三種類型的結果(取決于 Join 類型)。

2. 適用場景

Regular Join 適用于對數據完整性要求高、允許延遲且數據量較小的場景,例如:

  • 實時用戶畫像補全:將用戶行為流與用戶屬性流關聯,即使屬性數據延遲到達,也能更新行為記錄的用戶信息。
  • 離線數據實時修正:當歷史數據需要修正時(如用戶地址更新),通過 Regular Join 可將修正后的數據與實時流關聯,更新下游結果。

3. 實現原理

Regular Join 的底層基于 Flink 的 KeyedCoProcessFunction 算子實現,核心流程如下:

  • 數據分區:根據 Join 條件中的 ON 子句(如 user_id)對兩條流進行 KeyBy,確保相同 Key 的數據進入同一子任務。
  • 狀態存儲:每條流的數據以 MapState 形式存儲,Key 為 Join Key,Value 為對應數據的列表(支持多條數據,如同一用戶的多個訂單)。
  • 匹配與觸發:當任一流收到數據時,從另一條流的狀態中讀取相同 Key 的所有數據,進行匹配計算,并將結果發送到下游。例如,訂單流收到 order_id=1, user_id=101 時,會從用戶流狀態中查詢 user_id=101 的所有用戶數據,生成 Join 結果。
  • 狀態清理:默認情況下,Regular Join 的狀態永不清理(需手動配置 State TTL,否則可能導致 OOM)。

4. 語法說明

Regular Join 支持標準 SQL 的 Join 類型,語法如下:

SELECT 
    o.order_id, 
    u.user_name, 
    o.amount
FROM orders AS o  -- 訂單流
[INNER|LEFT|RIGHT|FULL] JOIN users AS u  -- 用戶流
ON o.user_id = u.user_id;

5. 詳細樣例代碼

(1) 場景描述

實時關聯訂單流(orders)和用戶流(users),通過 user_id 關聯,輸出訂單 ID、用戶名稱和訂單金額。假設數據可能延遲到達(如用戶信息在訂單產生后 10 分鐘才更新)。

(2) 環境準備

  • Flink 版本:1.18
  • 依賴:flink-java、flink-streaming-java、flink-clients、flink-connector-kafka(用于數據源)
  • 數據源:Kafka 中的 orders 和 users 主題,數據格式為 JSON。

(3) 步驟

步驟 1:創建 Kafka 表(DDL)

-- 訂單流表(Kafka 源)
CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND-- 水位線(Regular Join 不強制依賴,但建議定義)
) WITH (
    'connector'='kafka',
    'topic'='orders',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

-- 用戶流表(Kafka 源)
CREATE TABLE users (
    user_id STRING,
    user_name STRING,
    age INT,
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='users',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

-- 結果表(Print 控制臺輸出)
CREATE TABLE order_user_result (
    order_id STRING,
    user_name STRING,
    amount DECIMAL(10, 2)
) WITH (
    'connector'='print'
);

步驟 2:編寫 Regular Join SQL

-- INNER JOIN:僅輸出兩條流都匹配的數據
INSERT INTO order_user_result
SELECT
    o.order_id, 
    u.user_name, 
    o.amount
FROM orders AS o
INNERJOIN users AS u
ON o.user_id = u.user_id;

-- LEFT JOIN:輸出訂單流所有數據,用戶流不匹配時 user_name 為 NULL
-- INSERT INTO order_user_result
-- SELECT 
--     o.order_id, 
--     u.user_name, 
--     o.amount
-- FROM orders AS o
-- LEFT JOIN users AS u
-- ON o.user_id = u.user_id;

步驟 3:準備測試數據

  • 訂單流數據(orders 主題):
{"order_id":"order_1","user_id":"101","amount":100.50,"event_time":"2023-10-01 10:00:00"}
{"order_id":"order_2","user_id":"102","amount":200.00,"event_time":"2023-10-01 10:01:00"}
{"order_id":"order_3","user_id":"101","amount":150.75,"event_time":"2023-10-01 10:02:00"}
  • 用戶流數據(users 主題):
{"user_id":"101","user_name":"Alice","age":25,"update_time":"2023-10-01 10:00:30"}
{"user_id":"102","user_name":"Bob","age":30,"update_time":"2023-10-01 10:01:30"}
{"user_id":"101","user_name":"Alice_Update","age":26,"update_time":"2023-10-01 10:03:00"}  -- 用戶 101 信息更新

步驟 4:執行結果分析

INNER JOIN 結果:

+I[order_1, Alice, 100.50]  -- 訂單 1 與用戶 101 匹配(10:00:30 到達)
+I[order_2, Bob, 200.00]    -- 訂單 2 與用戶 102 匹配(10:01:30 到達)
+I[order_3, Alice, 150.75]  -- 訂單 3 與用戶 101 匹配(10:02:00 到達,此時用戶 101 名稱仍為 Alice)
-U[order_1, Alice, 100.50]  -- 撤回舊結果(用戶 101 信息更新)
+I[order_1, Alice_Update, 100.50]  -- 插入更新后的結果(訂單 1 關聯到新用戶名稱)
-U[order_3, Alice, 150.75]  -- 撤回訂單 3 的舊結果
+I[order_3, Alice_Update, 150.75]  -- 插入訂單 3 的新結果

關鍵觀察:

  • 當用戶 101 的信息更新(user_name 從 "Alice" 變為 "Alice_Update")時,所有關聯 user_id=101 的訂單(order_1、order_3)都會觸發結果更新(先撤回舊記錄,再插入新記錄)。
  • Regular Join 的狀態會保存所有歷史數據(如訂單流的所有訂單、用戶流的所有用戶),因此需配置 State TTL 避免內存溢出。

步驟 5:配置 State TTL(優化)

在 flink-conf.yaml 或 Table API 中配置狀態過期時間:

-- 在表級別配置 State TTL(僅對 Regular Join 生效)
CREATE TABLE orders_with_ttl (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='orders',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset',
    'join.state.ttl'='1h'-- Join 狀態過期時間為 1 小時
);

6. 注意事項

  • 狀態膨脹風險:Regular Join 的狀態會無限增長(除非配置 TTL),僅適用于數據量小的場景。若數據量大,建議使用 Interval Join 或 Lookup Join。
  • 結果更新頻率:若上游數據頻繁更新(如用戶信息每秒修改),會導致下游產生大量更新記錄,需評估下游系統(如 Kafka、數據庫)的承受能力。

Join 類型選擇:

  • INNER JOIN:僅輸出匹配數據,狀態存儲兩條流的數據。
  • LEFT JOIN:輸出左流所有數據,右流不匹配時為 NULL,狀態存儲左流所有數據 + 右流匹配數據。
  • FULL JOIN:輸出兩條流所有數據,不匹配時為 NULL,狀態存儲兩條流所有數據(狀態最大,慎用)。

三、Interval Join:區間 Join

1. 定義與核心思想

Interval Join 是基于時間區間的雙流 Join,要求兩條流的數據在指定時間范圍內才能匹配。語法上通過 BETWEEN ... AND ... 定義時間區間,僅支持事件時間(Event Time)。

核心特點:

  • 時間區間限制:僅當兩條流的數據時間戳差值在 [lower_bound, upper_bound] 區間內時才關聯(例如,訂單流數據時間戳 ±5 分鐘內的支付流數據)。
  • 狀態自動清理:利用 Watermark 機制,當 Watermark 超過數據時間戳 + 上界時,自動清理過期數據,避免狀態膨脹。
  • 僅支持追加流:Interval Join 的輸入流必須是追加流(Append-only),不支持更新或撤回(因為時間區間內的數據一旦匹配,后續不會再更新)。

2. 適用場景

Interval Join 適用于有時間關聯要求、數據量大且需高效清理狀態的場景,例如:

  • 訂單支付關聯:訂單產生后,僅關聯 5 分鐘內的支付記錄(超時未支付則認為訂單失效)。
  • 物流軌跡匹配:快遞攬收后,僅關聯 1 小時內的運輸記錄(超時未運輸則觸發異常告警)。

3. 實現原理

Interval Join 的底層基于 KeyedCoProcessFunction 和 Watermark 機制,核心流程如下:

(1) 時間區間定義:通過 ON o.order_id = p.order_id AND o.event_time BETWEEN p.event_time - INTERVAL '5' MINUTE AND p.event_time + INTERVAL '5' MINUTE 定義時間范圍。

(2) 數據緩存與注冊定時器:

  • 當左流(訂單流)收到數據 o 時,將其存入 MapState(Key 為 Join Key,Value 為數據 + 時間戳),并注冊一個定時器(觸發時間為 o.event_time + upper_bound)。
  • 右流(支付流)同理,緩存數據并注冊定時器。

(3) 匹配與觸發:

  • 當左流收到數據 o 時,從右流狀態中查詢相同 Key 且時間戳在 [o.event_time - lower_bound, o.event_time + upper_bound] 內的數據,生成匹配結果。
  • 當右流收到數據 p 時,同理匹配左流數據。

(4) 狀態清理:當定時器觸發(即 Watermark ≥ 數據時間戳 + upper_bound),從狀態中刪除該數據(因為后續不會再有時間區間內的數據到達)。

4. 語法說明

Interval Join 的語法需在 ON 子句中添加時間區間條件:

SELECT 
    o.order_id, 
    p.payment_id, 
    o.amount
FROM orders AS o
JOIN payments AS p
ON o.order_id = p.order_id  -- Join 條件
AND o.event_time BETWEEN p.event_time - INTERVAL '5' MINUTE AND p.event_time + INTERVAL '5' MINUTE;  -- 時間區間

時間區間規則:

  • BETWEEN ... AND ... 定義的是左流時間相對于右流時間的范圍(或反之)。
  • 區間必須是對稱的(如 ±5 分鐘),且僅支持事件時間(需定義 WATERMARK)。

5. 詳細樣例代碼

(1) 場景描述

關聯訂單流(orders)和支付流(payments),僅匹配訂單產生后 5 分鐘內的支付記錄,輸出訂單 ID、支付 ID 和訂單金額。超時未支付的訂單不會出現在結果中。

(2) 環境準備

與 Regular Join 相同,使用 Kafka 作為數據源,新增支付流表 payments。

步驟 1:創建支付流表(DDL)

-- 支付流表(Kafka 源)
CREATE TABLE payments (
    payment_id STRING,
    order_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='payments',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='interval_join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

-- 結果表(Print 輸出)
CREATE TABLE order_payment_result (
    order_id STRING,
    payment_id STRING,
    amount DECIMAL(10, 2)
) WITH (
    'connector'='print'
);

步驟 2:編寫 Interval Join SQL

INSERT INTO order_payment_result
SELECT
    o.order_id, 
    p.payment_id, 
    o.amount
FROM orders AS o
JOIN payments AS p
ON o.order_id = p.order_id
AND o.event_time BETWEEN p.event_time -INTERVAL'5'MINUTEAND p.event_time +INTERVAL'5'MINUTE;

步驟 3:準備測試數據

訂單流數據(orders 主題):

{"order_id":"order_1","user_id":"101","amount":100.50,"event_time":"2023-10-01 10:00:00"}
{"order_id":"order_2","user_id":"102","amount":200.00,"event_time":"2023-10-01 10:05:00"}  -- 10:05:00 產生,需匹配 10:00:00-10:10:00 的支付記錄
{"order_id":"order_3","user_id":"103","amount":300.00,"event_time":"2023-10-01 10:15:00"}  -- 10:15:00 產生,需匹配 10:10:00-10:20:00 的支付記錄

支付流數據(payments 主題):

{"payment_id": "pay_1", "order_id": "order_1", "amount": 100.50, "event_time": "2023-10-01 10:03:00"}  -- 匹配 order_1(10:00:00 ±5 分鐘內)
{"payment_id": "pay_2", "order_id": "order_2", "amount": 200.00, "event_time": "2023-10-01 10:12:00"}  -- 匹配 order_2(10:05:00 ±5 分鐘內,10:12:00 在 10:00:00-10:10:00 外?不,區間是 o.event_time BETWEEN p.event_time -5min AND p.event_time +5min,即 p.event_time -5min ≤ o.event_time ≤ p.event_time +5min → 10:07:00 ≤ o.event_time ≤ 10:17:00,order_2 的 10:05:00 不在此區間!需調整區間定義)

修正時間區間邏輯:原 SQL 中 o.event_time BETWEEN p.event_time -5min AND p.event_time +5min 表示“訂單時間在支付時間 ±5 分鐘內”,但實際需求是“支付時間在訂單時間 +5 分鐘內”。因此需調整為:

AND p.event_time BETWEEN o.event_time AND o.event_time + INTERVAL '5' MINUTE;  -- 支付時間在訂單時間后 5 分鐘內

重新測試支付數據:

{"payment_id":"pay_1","order_id":"order_1","amount":100.50,"event_time":"2023-10-01 10:03:00"}  -- 10:03:00 在 order_1 的 10:00:00-10:05:00 內,匹配
{"payment_id":"pay_2","order_id":"order_2","amount":200.00,"event_time":"2023-10-01 10:08:00"}  -- 10:08:00 在 order_2 的 10:05:00-10:10:00 內,匹配
{"payment_id":"pay_3","order_id":"order_3","amount":300.00,"event_time":"2023-10-01 10:21:00"}  -- 10:21:00 超出 order_3 的 10:15:00-10:20:00,不匹配

步驟 4:執行結果分析

Interval Join 結果:

+I[order_1, pay_1, 100.50]  -- order_1 與 pay_1 匹配(10:03:00 在 10:00:00-10:05:00 內)
+I[order_2, pay_2, 200.00]  -- order_2 與 pay_2 匹配(10:08:00 在 10:05:00-10:10:00 內)

關鍵觀察:

  • order_3 的支付記錄 pay_3 因超時(10:21:00 > 10:15:00 +5 分鐘)未匹配,不會出現在結果中。
  • 狀態清理:當 Watermark 超過 order_1.event_time +5分鐘(即 10:05:00)時,order_1 的數據會從狀態中刪除,不再參與后續匹配。

6. 注意事項

  • 僅支持事件時間:Interval Join 必須基于事件時間,需在表 DDL 中定義 WATERMARK。
  • 時間區間方向:需明確是“左流時間在右流時間區間內”還是“右流時間在左流時間區間內”,避免邏輯錯誤。
  • 不支持 OUTER JOIN:Flink SQL 的 Interval Join 僅支持 INNER JOIN,不支持 LEFT/RIGHT JOIN(因為時間區間限制可能導致一端數據無法匹配,而 OUTER JOIN 需輸出未匹配數據,與狀態清理機制沖突)。
  • 數據亂序影響:若數據亂序嚴重(如 Watermark 延遲較大),可能導致已匹配的數據因狀態清理而無法關聯,需合理設置 Watermark 延遲(如 WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE)。

四、Temporal Join:時態 Join

1. 定義與核心思想

Temporal Join 是基于時間版本的流與維度表 Join,用于關聯流表(如訂單流)和 changelog 流(如用戶維度表的變更記錄),獲取流表數據發生時間點的維度表版本。語法上通過 FOR SYSTEM_TIME AS OF 指定時間點。

核心特點:

  • 時間版本匹配:根據流表的時間戳(事件時間/處理時間),匹配維度表在對應時間點的版本(而非最新版本)。
  • 支持 changelog 流:維度表必須是 changelog 流(包含 INSERT、UPDATE、DELETE),通常通過 CDC(Change Data Capture)工具(如 Debezium)同步數據庫變更。
  • 主鍵要求:維度表必須定義主鍵(PRIMARY KEY),用于唯一標識不同版本的數據。

2. 適用場景

Temporal Join 適用于需要歷史版本數據關聯的場景,例如:

訂單關聯用戶歷史版本:訂單產生時,用戶可能是“普通會員”,后續升級為“VIP”,需關聯訂單發生時的用戶等級(而非當前等級)。

審計日志分析:關聯操作日志與配置表的歷史版本,還原操作發生時的配置信息。

3. 實現原理

Temporal Join 的底層基于 KeyedCoProcessFunction 和版本狀態管理,核心流程如下:

  • 版本狀態存儲:維度表(changelog 流)的數據以 MapState 存儲,Key 為主鍵,Value 為“時間戳 → 數據”的映射(即每個主鍵對應多個版本的數據,按時間戳排序)。
  • 時間點匹配:當流表收到數據 s(時間戳為 t)時,從維度表狀態中查詢主鍵對應的版本列表,找到時間戳 ≤ t 的最新版本(即 max(version_time) ≤ t)。
  • 結果生成:將流表數據 s 與匹配的維度表版本數據關聯,輸出結果。
  • 版本清理:通過 State TTL 清理過期的版本數據(如僅保留最近 7 天的版本)。

4. 語法說明

Temporal Join 的語法需在 Join 條件中添加 FOR SYSTEM_TIME AS OF:

SELECT 
    o.order_id, 
    u.user_name, 
    u.level  -- 用戶等級(歷史版本)
FROM orders AS o
JOIN users FOR SYSTEM_TIME AS OF o.event_time AS u  -- 關聯訂單發生時間點的用戶版本
ON o.user_id = u.user_id;

關鍵語法點:

  • FOR SYSTEM_TIME AS OF o.event_time:指定以流表 orders 的 event_time 為時間點,匹配維度表 users 的版本。
  • 支持事件時間(o.event_time)和處理時間(PROCTIME()),事件時間更常用(需定義 WATERMARK)。

5. 詳細樣例代碼

(1) 場景描述

關聯訂單流(orders)和用戶維度表(users_cdc),獲取訂單發生時用戶的等級(level)。用戶維度表通過 CDC 同步 MySQL 的變更(包含用戶等級更新)。

(2) 環境準備

  • 依賴:flink-connector-mysql-cdc(用于 MySQL CDC 源)
  • 數據源:訂單流(Kafka)、用戶維度表(MySQL CDC)。

步驟 1:創建訂單流表(Kafka)

CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='orders',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='temporal_join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

步驟 2:創建用戶維度表(MySQL CDC)

CREATE TABLE users_cdc (
    user_id STRING,
    user_name STRING,
    level STRING,  -- 用戶等級(普通、VIP)
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time -INTERVAL'5'SECOND,
    PRIMARY KEY (user_id) NOT ENFORCED  -- 主鍵(必須定義)
) WITH (
    'connector'='mysql-cdc',
    'hostname'='localhost',
    'port'='3306',
    'username'='root',
    'password'='123456',
    'database-name'='flink_test',
    'table-name'='users',
    'server-time-zone'='Asia/Shanghai'
);

步驟 3:編寫 Temporal Join SQL

-- 結果表(Print 輸出)
CREATE TABLE order_user_level_result (
    order_id STRING,
    user_name STRING,
    level STRING,
    order_time TIMESTAMP(3)
) WITH (
    'connector'='print'
);

-- Temporal Join:關聯訂單發生時間點的用戶版本
INSERT INTO order_user_level_result
SELECT
    o.order_id, 
    u.user_name, 
    u.level,
    o.event_time AS order_time
FROM orders AS o
JOIN users_cdc FORSYSTEM_TIMEASOF o.event_time AS u
ON o.user_id = u.user_id;

步驟 4:準備測試數據

MySQL 用戶表初始數據:

-- flink_test.users 表
+---------+------------+--------+---------------------+
| user_id | user_name  | level  | update_time         |
+---------+------------+--------+---------------------+
| 101     | Alice      | 普通   | 2023-10-01 09:00:00 |
| 102     | Bob        | VIP    | 2023-10-01 09:00:00 |
+---------+------------+--------+---------------------+

訂單流數據(orders 主題):

{"order_id": "order_1", "user_id": "101", "amount": 100.50, "event_time": "2023-10-01 10:00:00"}  -- 10:00:00 的訂單,用戶 101 此時等級為“普通”
{"order_id": "order_2", "user_id": "102", "amount": 200.00, "event_time": "2023-10-01 10:05:00"}  -- 10:05:00 的訂單,用戶 102 此時等級為“VIP”

MySQL 用戶表更新數據(模擬 CDC 變更):

-- 2023-10-01 10:02:00,用戶 101 升級為 VIP
UPDATE users SET level = 'VIP', update_time = '2023-10-01 10:02:00' WHERE user_id = '101';

步驟 5:執行結果分析

Temporal Join 結果:

+I[order_1, Alice, 普通, 2023-10-01 10:00:00]  -- order_1 發生時(10:00:00),用戶 101 等級為“普通”(10:02:00 的升級不影響)
+I[order_2, Bob, VIP, 2023-10-01 10:05:00]      -- order_2 發生時(10:05:00),用戶 102 等級為“VIP”

關鍵觀察:

  • order_1 關聯的是用戶 101 在 10:00:00 的版本(“普通”),即使 10:02:00 升級為“VIP”,也不會影響歷史訂單的關聯結果。
  • 維度表的狀態存儲了用戶 101 的兩個版本(09:00:00 的“普通”和 10:02:00 的“VIP”),通過 FOR SYSTEM_TIME AS OF 匹配對應時間點的版本。

6. 注意事項

  • 維度表主鍵:必須定義 PRIMARY KEY,否則無法區分不同版本的數據。
  • CDC 數據格式:維度表需是 changelog 流(如 MySQL CDC、Postgres CDC),支持 INSERT、UPDATE、DELETE 操作。
  • 時間類型選擇:優先使用事件時間(FOR SYSTEM_TIME AS OF o.event_time),處理時間(FOR SYSTEM_TIME AS OF PROCTIME())會關聯當前最新的維度表版本,無法獲取歷史版本。
  • 狀態管理:維度表的版本狀態會隨時間增長,需配置 State TTL(如 'changelog.state.ttl' = '7d')清理過期版本。

五、Lookup Join:維表 Join

1. 定義與核心思想

Lookup Join 是流表與外部維表(如 MySQL、Redis、HBase)的實時查詢 Join,當流表收到數據時,通過 Join Key 查詢外部維表,獲取關聯數據。語法上通過 FOR SYSTEM_TIME AS OF 標記(與 Temporal Join 類似,但底層實現不同)。

核心特點:

  • 外部存儲依賴:維表存儲在外部系統(非 Flink 狀態),如 MySQL(關系型數據庫)、Redis(緩存)、HBase(列式存儲)。
  • 實時查詢:流表每條數據都會觸發一次外部維表的查詢(同步或異步),無需預先加載維表數據。
  • 無狀態存儲:Flink 不存儲維表數據(僅緩存查詢結果),適用于維表數據量大或頻繁變更的場景。

2. 適用場景

Lookup Join 適用于維表數據量大、需實時查詢的場景,例如:

  • 訂單關聯商品信息:商品信息存儲在 MySQL 中(百萬級數據),訂單流實時查詢 MySQL 獲取商品名稱、價格。
  • 實時風控:規則存儲在 Redis 中,用戶行為流實時查詢 Redis 獲取風控規則(規則頻繁更新)。

3. 實現原理

Lookup Join 的底層基于 LookupFunction(同步)或 AsyncLookupFunction(異步),核心流程如下:

(1) 流表數據觸發:當流表收到數據 s 時,提取 Join Key(如 product_id)。

(2) 外部維表查詢:

  • 同步查詢:調用 LookupFunction 的 lookup 方法,阻塞式查詢外部維表(如 MySQL 的 SELECT * FROM products WHERE product_id = ?),返回關聯數據。
  • 異步查詢:調用 AsyncLookupFunction 的 asyncLookup 方法,通過異步 IO(如 Netty)查詢外部維表,避免阻塞流處理線程(推薦,吞吐量更高)。
  • 3結果生成:將流表數據 s 與查詢到的維表數據關聯,輸出結果(若維表無匹配數據,根據 Join 類型輸出 NULL 或跳過)。
  • 緩存優化(可選):為減少外部存儲壓力,可在 LookupFunction 中添加緩存(如 Guava Cache),緩存查詢結果(需設置 TTL 和大小限制)。

4. 語法說明

Lookup Join 的語法與 Temporal Join 類似,但維表是外部存儲的連接器(如 JDBC、Redis):

SELECT 
    o.order_id, 
    p.product_name, 
    o.amount
FROM orders AS o
JOIN products FOR SYSTEM_TIME AS OF o.event_time AS p  -- 關聯外部維表 products
ON o.product_id = p.product_id;

關鍵語法點:

  • FOR SYSTEM_TIME AS OF o.event_time:標記為 Lookup Join(語義上表示“查詢當前時間點的維表數據”,實際取決于外部存儲的查詢邏輯)。
  • 維表連接器需支持 Lookup 功能(如 JDBC 連接器的 'lookup.cache.max-rows' 參數)。

5. 詳細樣例代碼

(1) 場景描述

關聯訂單流(orders)和 MySQL 維表(products),實時查詢商品名稱(product_name)。訂單流數據量較大(每秒 1000 條),商品維表數據量 10 萬條,需異步查詢優化性能。

(2) 環境準備

? 依賴:flink-connector-jdbc(用于 JDBC 維表連接)

? 數據源:訂單流(Kafka)、商品維表(MySQL)。

步驟 1:創建訂單流表(Kafka)

CREATE TABLE orders (
    order_id STRING,
    product_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='orders',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='lookup_join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

步驟 2:創建商品維表(MySQL JDBC)

CREATE TABLE products (
    product_id STRING,
    product_name STRING,
    price DECIMAL(10, 2),
    PRIMARY KEY (product_id) NOT ENFORCED  -- 主鍵(用于查詢)
) WITH (
    'connector'='jdbc',
    'url'='jdbc:mysql://localhost:3306/flink_test',
    'username'='root',
    'password'='123456',
    'table-name'='products',
    'lookup.cache.max-rows'='10000',  -- 緩存最大行數
    'lookup.cache.ttl'='1h',          -- 緩存 TTL(1 小時)
    'lookup.max-retries'='3'           -- 查詢失敗重試次數
);

步驟 3:編寫 Lookup Join SQL

-- 結果表(Print 輸出)
CREATE TABLE order_product_result (
    order_id STRING,
    product_name STRING,
    amount DECIMAL(10, 2)
) WITH (
    'connector'='print'
);

-- Lookup Join:關聯 MySQL 商品維表
INSERT INTO order_product_result
SELECT
    o.order_id, 
    p.product_name, 
    o.amount
FROM orders AS o
JOIN products FORSYSTEM_TIMEASOF o.event_time AS p
ON o.product_id = p.product_id;

步驟 4:準備測試數據

MySQL 商品維表數據:

-- flink_test.products 表
+------------+--------------+-------+
| product_id | product_name | price |
+------------+--------------+-------+
| P001       | 手機         | 5999  |
| P002       | 電腦         | 8999  |
+------------+--------------+-------+

訂單流數據(orders 主題):

{"order_id":"order_1","product_id":"P001","amount":5999.00,"event_time":"2023-10-01 10:00:00"}
{"order_id":"order_2","product_id":"P002","amount":8999.00,"event_time":"2023-10-01 10:00:01"}
{"order_id":"order_3","product_id":"P003","amount":100.00,"event_time":"2023-10-01 10:00:02"}  -- P003 不存在,LEFT JOIN 時 product_name 為 NULL

步驟 5:執行結果分析

Lookup Join 結果(INNER JOIN):

+I[order_1, 手機, 5999.00]  -- P001 查詢成功,關聯商品名稱“手機”
+I[order_2, 電腦, 8999.00]  -- P002 查詢成功,關聯商品名稱“電腦”

若改為 LEFT JOIN:

INSERT INTO order_product_result
SELECT 
    o.order_id, 
    p.product_name, 
    o.amount
FROM orders AS o
LEFT JOIN products FOR SYSTEM_TIME AS OF o.event_time AS p
ON o.product_id = p.product_id;

結果:

+I[order_1, 手機, 5999.00]
+I[order_2, 電腦, 8999.00]
+I[order_3, NULL, 100.00]  -- P003 在維表中不存在,product_name 為 NULL

步驟 6:異步查詢優化(Java API 示例)

若需更高性能,可通過 Java API 實現異步 Lookup Join(Flink SQL 默認同步,需自定義連接器):

// 自定義異步 Lookup Function(簡化示例)
publicclassAsyncMySQLLookupFunctionextendsAsyncTableFunction<Row> {

    privatefinal String url;
    privatefinal String username;
    privatefinal String password;
    privatefinal String tableName;
    private ExecutorService executorService;

    publicAsyncMySQLLookupFunction(String url, String username, String password, String tableName) {
        this.url = url;
        this.username = username;
        this.password = password;
        this.tableName = tableName;
    }

    @Override
    publicvoidopen(FunctionContext context)throws Exception {
        executorService = Executors.newFixedThreadPool(10);  // 異步線程池
    }

    publicvoideval(CompletableFuture<Collection<Row>> future, Object... keys) {
        StringproductId= (String) keys[0];
        executorService.submit(() -> {
            try (Connectionconn= DriverManager.getConnection(url, username, password);
                 PreparedStatementstmt= conn.prepareStatement(
                     "SELECT product_name, price FROM " + tableName + " WHERE product_id = ?")) {
                stmt.setString(1, productId);
                ResultSetrs= stmt.executeQuery();
                if (rs.next()) {
                    StringproductName= rs.getString("product_name");
                    doubleprice= rs.getDouble("price");
                    future.complete(Collections.singletonList(Row.of(productName, price)));
                } else {
                    future.complete(Collections.emptyList());  // 無匹配數據
                }
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
    }

    @Override
    publicvoidclose()throws Exception {
        executorService.shutdown();
    }
}

6. 注意事項

(11) 異步 IO 優化:同步查詢會阻塞流處理線程,導致吞吐量下降,建議優先使用異步 IO(需外部存儲支持異步客戶端,如 MySQL 的異步驅動)。

(2) 緩存配置:合理設置 'lookup.cache.max-rows' 和 'lookup.cache.ttl',避免緩存穿透(大量查詢未緩存的數據)或緩存擊穿(緩存過期瞬間大量請求直達數據庫)。

(3) 維表更新延遲:Lookup Join 查詢的是外部維表的實時數據,若維表更新延遲(如 MySQL 主從同步延遲),可能導致關聯結果不準確。

(4) Join 類型選擇:

  • INNER JOIN:僅輸出維表匹配的數據,適用于強關聯場景(如訂單必須關聯商品)。
  • LEFT JOIN:輸出流表所有數據,維表不匹配時為 NULL,適用于弱關聯場景(如日志關聯維度信息)。

六、四種 Join 方式對比與選型

1. 核心特性對比

特性

Regular Join

Interval Join

Temporal Join

Lookup Join

Join 類型

雙流 Join

雙流 Join

流與 changelog 維表 Join

流與外部維表 Join

時間限制

有(時間區間)

有(時間點版本)

無(查詢實時數據)

狀態管理

存儲雙流所有數據(風險大)

存儲區間內數據(自動清理)

存儲維度表版本(需 TTL)

無狀態(依賴外部存儲)

支持 OUTER JOIN

是(INNER/LEFT/RIGHT/FULL)

否(僅 INNER)

是(LEFT/INNER)

是(LEFT/INNER)

適用數據量

小數據量

中大數據量

中等數據量(維度表)

大數據量(維表)

延遲

低(實時匹配)

低(實時匹配)

低(實時匹配)

中(依賴外部存儲查詢延遲)

典型場景

實時數據補全(允許延遲)

時間區間關聯(如訂單支付)

歷史版本關聯(如審計)

外部維表查詢(如商品信息)

2. 選型決策樹

選擇 Flink SQL Join 方式時,可按以下流程決策:

(1) 是否關聯外部維表(如 MySQL、Redis)?

  • 是 → 選擇 Lookup Join(需優化異步 IO 和緩存)。
  • 否 → 進入下一步。

(2) 是否需要關聯歷史版本數據(如訂單發生時的用戶等級)?

  • 是 → 選擇 Temporal Join(需維度表是 changelog 流,如 CDC)。
  • 否 → 進入下一步。

(3) 是否有嚴格的時間區間限制(如訂單 5 分鐘內支付)?

  • 是 → 選擇 Interval Join(需定義事件時間和 WATERMARK)。
  • 否 → 選擇 Regular Join(需配置 State TTL 避免狀態膨脹)。

3. 性能優化建議

(1) Regular Join:配置 'join.state.ttl' 清理過期狀態,避免 OOM;優先使用 INNER JOIN 減少狀態存儲。

(2) Interval Join:合理設置時間區間(避免過寬導致狀態膨脹,過窄導致數據遺漏);調整 Watermark 延遲(如 INTERVAL '1' MINUTE)平衡數據完整性和實時性。

(3) Temporal Join:為維度表配置 'changelog.state.ttl' 清理過期版本;使用事件時間(FOR SYSTEM_TIME AS OF o.event_time)而非處理時間,確保歷史版本關聯準確性。

(4) Lookup Join:啟用異步 IO('lookup.async' = 'true')提升吞吐量;配置緩存('lookup.cache.max-rows' 和 'lookup.cache.ttl')減少外部存儲壓力;選擇高性能外部存儲(如 Redis 替代 MySQL)。

(5) 通用優化:

  • KeyBy 選擇:確保 Join Key 分布均勻,避免數據傾斜(如 user_id 比隨機 ID 更好)。
  • 并行度調整:根據數據量和資源設置合理并行度(如 Kafka 分區數與 Flink 并行度一致)。
  • 監控與調優:通過 Flink Web UI 監控 State 大小、Checkpoint 時間、算子延遲等指標,動態調整參數。

七、實戰案例:電商實時數倉中的 Join 應用

1. 場景背景

某電商平臺需構建實時數倉,核心需求包括:

  • 訂單實時關聯用戶信息:獲取訂單發生時的用戶等級(歷史版本),用于用戶分群分析。
  • 訂單支付實時關聯:統計訂單產生后 5 分鐘內的支付成功率,監控支付轉化漏斗。
  • 訂單商品實時補全:關聯商品維表(MySQL),獲取商品名稱、類目等信息,用于實時大屏展示。

2. 技術選型

根據需求,結合四種 Join 方式的特點,選型如下:

  • 訂單關聯用戶等級:使用 Temporal Join(用戶表通過 CDC 同步,需歷史版本)。
  • 訂單支付關聯:使用 Interval Join(5 分鐘時間區間限制,自動清理狀態)。
  • 訂單商品補全:使用 Lookup Join(商品表數據量大,存儲在 MySQL,需實時查詢)。

3. 架構設計

Kafka(訂單流) → Flink SQL(Temporal Join + Interval Join + Lookup Join) → Kafka(結果層)
                     ↑
MySQL(用戶 CDC)   Redis(商品緩存,可選)

4. 核心代碼實現

步驟 1:創建源表

-- 訂單流(Kafka)
CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_status STRING,  -- CREATED, PAID, CANCELED
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='orders',
    'properties.bootstrap.servers'='localhost:9092',
    'format'='json'
);

-- 用戶 CDC 表(MySQL)
CREATE TABLE users_cdc (
    user_id STRING,
    user_name STRING,
    level STRING,
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time -INTERVAL'5'SECOND,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector'='mysql-cdc',
    'hostname'='localhost',
    'port'='3306',
    'username'='root',
    'password'='123456',
    'database-name'='flink_test',
    'table-name'='users'
);

-- 支付流(Kafka)
CREATE TABLE payments (
    payment_id STRING,
    order_id STRING,
    amount DECIMAL(10, 2),
    payment_status STRING,  -- SUCCESS, FAIL
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='payments',
    'properties.bootstrap.servers'='localhost:9092',
    'format'='json'
);

-- 商品維表(MySQL)
CREATE TABLE products (
    product_id STRING,
    product_name STRING,
    category STRING,
    price DECIMAL(10, 2),
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url'='jdbc:mysql://localhost:3306/flink_test',
    'username'='root',
    'password'='123456',
    'table-name'='products',
    'lookup.cache.max-rows'='50000',
    'lookup.cache.ttl'='30min'
);

步驟 2:訂單關聯用戶等級(Temporal Join)

-- 訂單用戶關聯結果表(Kafka)
CREATE TABLE order_user_result (
    order_id STRING,
    user_id STRING,
    user_name STRING,
    level STRING,
    order_time TIMESTAMP(3)
) WITH (
    'connector'='kafka',
    'topic'='order_user_result',
    'properties.bootstrap.servers'='localhost:9092',
    'format'='json'
);

-- Temporal Join:關聯訂單發生時的用戶等級
INSERT INTO order_user_result
SELECT
    o.order_id,
    o.user_id,
    u.user_name,
    u.level,
    o.event_time AS order_time
FROM orders AS o
JOIN users_cdc FORSYSTEM_TIMEASOF o.event_time AS u
ON o.user_id = u.user_id;

步驟 3:訂單支付關聯(Interval Join)

-- 訂單支付關聯結果表(Kafka)
CREATE TABLE order_payment_result (
    order_id STRING,
    payment_id STRING,
    payment_status STRING,
    order_time TIMESTAMP(3),
    payment_time TIMESTAMP(3)
) WITH (
    'connector'='kafka',
    'topic'='order_payment_result',
    'properties.bootstrap.servers'='localhost:9092',
    'format'='json'
);

-- Interval Join:關聯訂單 5 分鐘內的支付記錄
INSERT INTO order_payment_result
SELECT
    o.order_id,
    p.payment_id,
    p.payment_status,
    o.event_time AS order_time,
    p.event_time AS payment_time
FROM orders AS o
JOIN payments AS p
ON o.order_id = p.order_id
AND p.event_time BETWEEN o.event_time AND o.event_time +INTERVAL'5'MINUTE;

步驟 4:訂單商品補全(Lookup Join)

-- 訂單商品關聯結果表(Kafka)
CREATE TABLE order_product_result (
    order_id STRING,
    product_id STRING,
    product_name STRING,
    category STRING,
    amount DECIMAL(10, 2)
) WITH (
    'connector'='kafka',
    'topic'='order_product_result',
    'properties.bootstrap.servers'='localhost:9092',
    'format'='json'
);

-- Lookup Join:關聯商品維表
INSERT INTO order_product_result
SELECT
    o.order_id,
    o.product_id,
    p.product_name,
    p.category,
    o.amount
FROM orders AS o
LEFTJOIN products FORSYSTEM_TIMEASOF o.event_time AS p
ON o.product_id = p.product_id;

5. 結果驗證與分析

(1) 測試數據

訂單流:

{"order_id":"order_1","user_id":"101","product_id":"P001","amount":5999.00,"order_status":"CREATED","event_time":"2023-10-01 10:00:00"}
{"order_id":"order_2","user_id":"102","product_id":"P002","amount":8999.00,"order_status":"CREATED","event_time":"2023-10-01 10:01:00"}

用戶 CDC(初始數據:101 為普通,102 為 VIP;10:02:00 101 升級為 VIP):

-- 初始
INSERT INTO users (user_id, user_name, level, update_time) VALUES ('101', 'Alice', '普通', '2023-10-01 09:00:00');
INSERT INTO users (user_id, user_name, level, update_time) VALUES ('102', 'Bob', 'VIP', '2023-10-01 09:00:00');
-- 升級
UPDATE users SET level = 'VIP', update_time = '2023-10-01 10:02:00' WHERE user_id = '101';

支付流:

{"payment_id":"pay_1","order_id":"order_1","amount":5999.00,"payment_status":"SUCCESS","event_time":"2023-10-01 10:03:00"}  -- order_1 的 5 分鐘內支付
{"payment_id":"pay_2","order_id":"order_2","amount":8999.00,"payment_status":"FAIL","event_time":"2023-10-01 10:07:00"}      -- order_2 的 5 分鐘內支付(10:01:00-10:06:00,10:07:00 超時,不匹配)

商品維表:

INSERT INTO products (product_id, product_name, category, price) VALUES ('P001', '手機', '數碼', 5999.00);
INSERT INTO products (product_id, product_name, category, price) VALUES ('P002', '電腦', '數碼', 8999.00);

(2) 結果分析

① 訂單用戶關聯(Temporal Join):

order_1: user_id=101, level=普通(10:00:00 的版本,升級不影響)
order_2: user_id=102, level=VIP(10:01:00 的版本)

符合預期,關聯了訂單發生時的用戶等級。

② 訂單支付關聯(Interval Join):

order_1: payment_id=pay_1, status=SUCCESS(10:03:00 在 10:00:00-10:05:00 內)
order_2: 無匹配(10:07:00 超出 10:01:00-10:06:00)

僅關聯了 5 分鐘內的支付記錄,超時支付被過濾。

③ 訂單商品補全(Lookup Join):

order_1: product_name=手機, category=數碼
order_2: product_name=電腦, category=數碼

實時查詢 MySQL 商品維表,成功補全商品信息。

八、總結與展望

Flink SQL 的四種 Join 方式覆蓋了流處理中所有常見的關聯場景:

  • Regular Join:通用雙流 Join,適用于對數據完整性要求高、允許延遲的小數據量場景。
  • Interval Join:基于時間區間的雙流 Join,解決時間關聯需求,自動清理狀態,適合大數據量。
  • Temporal Join:流與 changelog 維表的歷史版本 Join,滿足審計、歷史分析等場景。
  • Lookup Join:流與外部維表的實時查詢 Join,適用于維表數據量大、需實時更新的場景。
責任編輯:趙寧寧 來源: 大數據技能圈
相關推薦

2014-12-25 09:41:15

Android加載方式

2024-03-20 15:33:12

2009-04-02 09:46:19

排名函數排序SQL 2005

2010-09-28 15:40:51

SQL刪除重復記錄

2023-10-21 21:13:00

索引SQL工具

2013-10-17 09:25:52

2021-12-22 09:34:01

Golagn配置方式

2020-11-10 10:08:41

Kubernetes容器開發

2020-06-24 07:49:13

Kubernetes場景網絡

2019-10-25 10:35:49

Java用法場景

2013-06-14 15:24:57

Android開發移動開發數據存儲方式

2010-08-05 09:33:08

Flex頁面跳轉

2020-06-12 08:28:29

JavaScript開發技術

2017-04-17 19:31:03

Android多線程

2010-07-28 13:54:42

Flex數據綁定

2022-03-25 14:47:24

Javascript數據類型開發

2023-05-22 08:03:28

JavaScrip枚舉定義

2021-08-02 14:37:36

鴻蒙HarmonyOS應用

2021-06-25 08:00:00

物聯網醫療技術

2013-05-13 09:48:47

網絡接入接入方法綜合布線
點贊
收藏

51CTO技術棧公眾號

国产乱码精品一区二区三区亚洲人| 婷婷综合激情网| 亚洲精品2区| 欧美大片拔萝卜| 日韩a∨精品日韩在线观看| 国产免费a∨片在线观看不卡| 日韩高清不卡在线| 欧美老妇交乱视频| 国产中年熟女高潮大集合| 精品国产一区二区三区2021| 欧美日韩加勒比精品一区| 亚洲精品乱码久久久久久蜜桃91| 亚洲精品.www| 免费在线看成人av| 91亚洲国产成人久久精品| 日本一二三不卡| 高清视频在线观看一区| 好吊色在线视频| 亚洲欧美综合国产精品一区| 亚洲欧美日韩高清| 欧美熟妇精品一区二区| 视频在线日韩| 亚洲午夜久久久| 中文字幕久久一区| 国产原创av在线| 成人听书哪个软件好| 国产精品丝袜白浆摸在线 | 老司机精品在线| 欧美久久久久久久久久| 91看片就是不一样| av免费在线视| 久久精品视频在线免费观看| 国产精品区免费视频| 国产三级第一页| 免费精品视频在线| 国产成人高清激情视频在线观看| 国产真实乱人偷精品视频| 婷婷综合伊人| xvideos亚洲人网站| 超碰97在线资源站| 成人爽a毛片| 欧美videossexotv100| 夜夜夜夜夜夜操| 欧美美女被草| 欧美三级电影在线看| 狠狠热免费视频| 玛雅亚洲电影| 日本乱人伦aⅴ精品| 久久精品99国产| 欧美电影免费观看网站| 欧美性jizz18性欧美| 国产精品丝袜久久久久久消防器材| 国内在线视频| 亚洲第一主播视频| 亚洲熟妇无码一区二区三区导航| 波多野结依一区| 亚洲成人777| 国产视频九色蝌蚪| 成人免费看视频网站| 欧美天堂在线观看| 激情婷婷综合网| 免费日韩成人| 欧美一区二区三区男人的天堂| 九一精品久久久| 精品三级久久久| 精品国产乱码久久久久久1区2区| 美女伦理水蜜桃4| 秋霞在线一区| 亚洲日韩中文字幕| 天堂av免费在线| 一区二区三区午夜探花| 亚洲天堂久久久久久久| 亚洲精品美女在线观看| 精品影片一区二区入口| 亚洲婷婷影院| 中文字幕在线成人| 日本在线一级片| 亚洲成人资源| 国产精品久久久久久久久久 | 国内外成人免费视频| 亚洲日本香蕉视频| 中文字幕欧美激情一区| 国产日本欧美在线| 99热99re6国产在线播放| 欧美视频第一页| 性chinese极品按摩| 国产一区二区三区黄网站| 精品欧美一区二区久久| 亚洲精品乱码久久久久久不卡| 国产一区二区三区四区大秀| 久久精品国产成人| 免费在线不卡视频| 另类中文字幕网| 国产一区二区黄色| 天天在线视频色| 婷婷六月综合亚洲| 在线视频日韩一区| 2021年精品国产福利在线| 亚洲伦理中文字幕| 日日噜噜夜夜狠狠久久波多野| 亚洲免费高清| 成人av在线亚洲| 青青视频在线观| 亚洲激情综合网| 可以免费在线看黄的网站| 日韩精品视频一区二区三区| 日韩精品在线免费观看| 69av视频在线| 日本不卡在线视频| 国产午夜精品一区| 成人免费在线| 欧美性做爰猛烈叫床潮| 久久久久成人精品无码中文字幕| 欧美激情另类| 人妖精品videosex性欧美| 性中国xxx极品hd| 国产精品私房写真福利视频| 欧美色图色综合| 日韩精品中文字幕吗一区二区| 一区二区成人精品| 在线观看免费国产视频| 激情六月婷婷久久| 日本精品一区| 女厕盗摄一区二区三区| 555夜色666亚洲国产免| 蜜桃av乱码一区二区三区| 亚洲激情自拍| 亚洲直播在线一区| 欧美激情黑人| 欧美日韩国产成人在线免费| 先锋影音av在线| 国产一区白浆| 国内成+人亚洲| 国产网红在线观看| 欧美不卡一二三| 希岛爱理中文字幕| 韩国欧美一区二区| 亚洲一区二区在| 国产精品无码久久久久| 中文字幕久久久av一区| www.五月婷婷.com| 日本一区二区综合亚洲| 国产裸体免费无遮挡| 少妇精品导航| 国产97色在线| 国产大片在线免费观看| 在线观看成人免费视频| 精品成人无码一区二区三区| 欧美亚洲网站| 欧洲亚洲一区二区三区四区五区| 亚洲精品永久免费视频| 亚洲摸下面视频| 国产伦精品一区二区三区视频我| 久久久久久久久久久黄色| 久久久免费视频网站| 免费短视频成人日韩| 国产精品777| 日本三级在线视频| 日韩av网站免费在线| 中文字幕av一区二区| 做爰视频毛片视频| 日韩一区中文字幕| 日韩av影视大全| 欧美激情在线| 久久riav二区三区| 欧美性suv| 色噜噜狠狠狠综合曰曰曰88av| 国产又粗又猛又爽又黄的视频一| 亚洲日本在线a| 中文字幕人妻一区| 久久亚洲色图| 最新不卡av| 成人av综合网| 国产精品99久久99久久久二8| 在线观看美女网站大全免费| 欧美一区二区国产| 国产极品在线播放| 久久久精品影视| 91国内在线播放| 亚洲性图久久| 婷婷久久五月天| 中文久久电影小说| 国产精品久久久久秋霞鲁丝| 国产在线高清视频| 日韩电视剧免费观看网站| 五月天中文字幕| 亚洲一区二区三区四区的 | 91精品导航| 日本免费一区二区三区视频观看| 日本视频不卡| 日韩精品免费一线在线观看| 一级全黄裸体免费视频| 亚洲成av人在线观看| 亚洲一级片在线播放| 国产99久久久国产精品潘金| 亚洲人成无码www久久久| 亚洲综合专区| 日韩电影天堂视频一区二区| 亚洲日本va| 国产精品白丝jk喷水视频一区| 午夜羞羞小视频在线观看| 亚洲欧美国产另类| 亚洲精品国产av| 欧美日本在线视频| 五月婷婷亚洲综合| 伊人婷婷欧美激情| 高清国产在线观看| 99re视频这里只有精品| av亚洲天堂网| 天使萌一区二区三区免费观看| 国产高清不卡无码视频| 狠狠色狠狠色综合婷婷tag| 国产成人免费观看| 午夜精品久久久久久毛片| 欧美亚洲免费电影| 牛牛精品视频在线| 精品国产依人香蕉在线精品| 亚洲人视频在线观看| 精品国产一区二区精华| 国产精品久久久久久久免费看| 日本福利一区二区| 亚洲 欧美 视频| 亚洲五码中文字幕| 欧美激情图片小说| 国产精品美日韩| 中文字幕在线观看免费高清| bt7086福利一区国产| 丰满人妻一区二区三区大胸| 九色porny丨国产精品| 少妇黄色一级片| 欧美资源在线| 青青草原成人网| 国产精品三上| 日韩欧美国产免费| 亚洲三级影院| 天堂…中文在线最新版在线| 在线看片一区| 69sex久久精品国产麻豆| 欧美涩涩视频| 人妻av无码专区| 午夜国产精品视频免费体验区| 欧美h视频在线观看| 四季av在线一区二区三区 | 国产一级久久久久毛片精品| 26uuu久久天堂性欧美| 少妇饥渴放荡91麻豆| 99久久99久久精品免费观看| 欧美一区二区免费在线观看| www.欧美精品一二区| 亚洲av网址在线| 91热门视频在线观看| 精品少妇一区二区三区免费观| 国产精品色在线网站| 天天色 色综合| 久视频在线观看| 香蕉成人伊视频在线观看| 在线观看国产亚洲| 色av综合在线| 91亚洲视频在线观看| 欧美一区二区精品久久911| 亚洲a视频在线| 亚洲精品狠狠操| 九九九伊在人线综合| 一个色综合导航| 91精品国产91久久久久久青草| 欧美黑人性视频| 僵尸再翻生在线观看免费国语| 久久久久久久久久久免费| 超级白嫩亚洲国产第一| 日本欧美精品在线| 欧美亚洲黄色| 成人一区二区三区四区| 神马日本精品| 亚洲春色在线| 午夜欧美理论片| www黄色av| 久久99热狠狠色一区二区| 中文字幕一二三区| 97精品超碰一区二区三区| 国产黄a三级三级| 亚洲午夜成aⅴ人片| 国产成人精品亚洲| 日韩欧美国产高清| 久久伊伊香蕉| 久久国产精品99国产精| 夜鲁夜鲁夜鲁视频在线播放| 国产精品亚发布| 女仆av观看一区| 一级一片免费播放| 国产精品久久久一区二区| 欧美在线aaa| av在线免费不卡| 男人天堂a在线| 最新日本在线观看| 欧美黑人性视频| 国外成人福利视频| 国产女主播一区二区三区| 欧美日韩中字| 鲁一鲁一鲁一鲁一色| 国产综合久久久久久鬼色 | 国产一区二区在线视频观看| 精品盗摄一区二区三区| 瑟瑟视频在线| 欧美一级片一区| 深夜激情久久| 亚洲无玛一区| 久久亚洲色图| 大地资源二中文在线影视观看 | 久久福利一区二区| 美腿丝袜在线亚洲一区 | 国产精品一区二区三区av麻| www.国产亚洲| 精品夜夜嗨av一区二区三区| 日本xxxx黄色| 国产亚洲精品久| 国产精品一区二区三区四| 欧美不卡123| 肉肉视频在线观看| 91色视频在线观看| 色琪琪久久se色| 精品久久久噜噜噜噜久久图片| 99久久免费视频.com| 青青草手机视频在线观看| 欧美精品色一区二区三区| av中文字幕一区二区三区| 日韩美女激情视频| 神马香蕉久久| 日本网站免费在线观看| 成人性视频免费网站| 国产极品国产极品| 欧美一区永久视频免费观看| 欧美日韩在线看片| 国产欧美一区二区三区久久| 成人在线免费小视频| 国产理论在线播放| 波多野结衣一区| 久久九九精品99国产精品| 黄色软件视频在线观看| 国产精品swag| 亚洲国产裸拍裸体视频在线观看乱了中文 | 亚洲国产古装精品网站| 男人天堂亚洲| 国产精品免费在线 | 国产精品人人爽人人爽| 久久久久综合网| 99re这里只有精品在线| 亚洲欧美一区二区三区四区 | 37p粉嫩大胆色噜噜噜| 黑人欧美xxxx| 黄色在线免费观看大全| 国产精品91一区| 日韩av在线播放网址| 日本在线播放一区二区| 最近中文字幕一区二区三区| 国产一区二区三区在线观看| 欧美成人一二三| 好吊妞国产欧美日韩免费观看网站| 免费看欧美黑人毛片| 91免费观看在线| 日韩精品一区二区亚洲av观看| 亚洲天堂av综合网| 95精品视频| 日本aa在线观看| 97国产一区二区| 欧美在线视频精品| 久久精品中文字幕电影| 18国产精品| 欧美一级片中文字幕| 国产精品丝袜一区| 亚洲爆乳无码一区二区三区| 精品一区二区三区在线视频| 樱花草www在线| 亚洲一区二区三区免费视频| 午夜影院免费体验区| 国产精品久久av| 一区二区三区国产精华| 亚洲欧美日本一区| 欧美浪妇xxxx高跟鞋交| 黄污视频在线观看| 日本一区二区高清视频| 国产一区在线观看麻豆| 五月婷婷开心网| 日韩在线激情视频| 成人18夜夜网深夜福利网| 五月婷婷深爱五月| 亚洲精品欧美二区三区中文字幕| www.天天干.com| 日韩美女主播视频| 综合av在线| 男人的天堂av网| 日韩欧美一二三四区| 欧美日韩123区| 99国产精品白浆在线观看免费| 国产亚洲成av人在线观看导航| 99久久精品无免国产免费| 青青草成人在线| 国内自拍一区| 妖精视频在线观看免费| 亚洲第一区中文字幕| 91成人福利社区|