精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

FlinkSQL 電商訂單狀態追蹤與實時處理代碼

大數據
本系統基于FlinkSQL構建,實現從訂單創建到完成的全生命周期狀態管理,并實時觸發庫存更新、物流調度等關鍵業務操作。

一、系統架構概述

在大型電商平臺中,訂單狀態的實時追蹤與處理是保障交易流暢性的核心環節。本系統基于FlinkSQL構建,實現從訂單創建到完成的全生命周期狀態管理,并實時觸發庫存更新、物流調度等關鍵業務操作。系統采用分層架構設計,包含數據接入層、處理層和輸出層,各層之間通過事件流緊密銜接,確保狀態變更的毫秒級響應。

二、環境準備與基礎配置

 1. Flink環境配置

-- 設置Flink執行參數
SET execution.checkpointing.interval=30000;-- 30秒一次檢查點
SET execution.checkpointing.timeout =60000;-- 檢查點超時時間
SET execution.checkpointing.mode= EXACTLY_ONCE;-- 精確一次語義
SET state.backend ='rocksdb';-- 使用RocksDB作為狀態后端
SET state.ttl.ttl =86400000;-- 狀態保留1天
SET parallelism.default=12;-- 默認并行度12

2. 基礎數據類型定義

-- 定義訂單狀態枚舉類型
CREATETYPE OrderStatus ASENUM(
'PENDING_PAYMENT',-- 待付款
'PAID',-- 已付款
'PROCESSING',-- 處理中
'SHIPPED',-- 已發貨
'DELIVERED',-- 已送達
'CANCELLED',-- 已取消
'REFUNDED',-- 已退款
'EXPIRED'-- 已過期
);

-- 定義庫存操作類型枚舉
CREATETYPE InventoryOpType ASENUM(
'INCREASE',-- 增加庫存
'DECREASE',-- 減少庫存
'FREEZE',-- 凍結庫存
'UNFREEZE',-- 解凍庫存
'ADJUST'-- 調整庫存
);

-- 定義物流調度狀態枚舉
CREATETYPE LogisticsStatus ASENUM(
'PENDING_DISPATCH',-- 待調度
'DISPATCHED',-- 已調度
'IN_TRANSIT',-- 運輸中
'OUT_FOR_DELIVERY',-- 配送中
'DELIVERED',-- 已送達
'FAILED'-- 配送失敗
);

三、數據接入層設計

1. 訂單事件流接入 (Kafka)

-- 訂單狀態變更事件流
CREATETABLE order_status_events (
  order_id STRING,-- 訂單ID
  user_id STRING,-- 用戶ID
status OrderStatus,-- 訂單狀態
  prev_status OrderStatus,-- 上一狀態
  status_time TIMESTAMP(3),-- 狀態變更時間
  payment_time TIMESTAMP(3),-- 支付時間(如有)
  cancel_reason STRING,-- 取消原因(如有)
  operation_user STRING,-- 操作人
  ext_info MAP<STRING, STRING>,-- 擴展信息
  event_time AS PROCTIME(),-- 處理時間
  WATERMARK FOR status_time AS status_time -INTERVAL'5'SECOND-- 水印定義,允許5秒延遲
)WITH(
'connector'='kafka',
'topic'='order_status_events',
'properties.bootstrap.servers'='kafka-broker:9092',
'properties.group.id'='flink_order_tracking_group',
'format'='json',
'json.fail-on-missing-field'='false',
'json.ignore-parse-errors'='true',
'scan.startup.mode'='earliest-offset',
'properties.auto.offset.reset'='earliest'
);

-- 訂單商品明細流
CREATETABLE order_item_events (
  order_id STRING,-- 訂單ID
  item_id STRING,-- 商品ID
  sku_id STRING,-- SKU ID
  product_name STRING,-- 商品名稱
  quantity INT,-- 數量
  price DECIMAL(10,2),-- 單價
  discount DECIMAL(10,2),-- 折扣金額
  create_time TIMESTAMP(3),-- 創建時間
  WATERMARK FOR create_time AS create_time -INTERVAL'5'SECOND
)WITH(
'connector'='kafka',
'topic'='order_item_events',
'properties.bootstrap.servers'='kafka-broker:9092',
'properties.group.id'='flink_order_item_group',
'format'='json'
);

2. 庫存數據接入 (MySQL + CDC)

-- 商品庫存基礎表 (CDC方式接入)
CREATETABLE product_inventory (
  product_id STRING,-- 商品ID
  sku_id STRING,-- SKU ID
  total_stock INT,-- 總庫存
  available_stock INT,-- 可用庫存
  frozen_stock INT,-- 凍結庫存
  locked_stock INT,-- 鎖定庫存
  update_time TIMESTAMP(3),-- 更新時間
PRIMARYKEY(product_id, sku_id)NOT ENFORCED
)WITH(
'connector'='mysql-cdc',
'hostname'='mysql-inventory',
'port'='3306',
'username'='flink_user',
'password'='flink_password',
'database-name'='inventory_db',
'table-name'='product_inventory',
'server-time-zone'='Asia/Shanghai'
);

3. 物流信息接入 (Kafka + HBase)

-- 物流單事件流
CREATETABLE logistics_events (
  logistics_id STRING,-- 物流單ID
  order_id STRING,-- 訂單ID
status LogisticsStatus,-- 物流狀態
  status_time TIMESTAMP(3),-- 狀態時間
  location STRING,-- 當前位置
  courier_id STRING,-- 快遞員ID
  courier_name STRING,-- 快遞員姓名
  WATERMARK FOR status_time AS status_time -INTERVAL'10'SECOND
)WITH(
'connector'='kafka',
'topic'='logistics_events',
'properties.bootstrap.servers'='kafka-broker:9092',
'properties.group.id'='flink_logistics_group',
'format'='json'
);

-- 物流區域信息表 (HBase)
CREATETABLE logistics_area_info (
  area_id STRING,-- 區域ID
  province STRING,-- 省份
  city STRING,-- 城市
  district STRING,-- 區/縣
  warehouse_id STRING,-- 對應倉庫ID
PRIMARYKEY(area_id)NOT ENFORCED
)WITH(
'connector'='hbase-2.2',
'table-name'='logistics:area_info',
'zookeeper.quorum'='zk-node1,zk-node2,zk-node3',
'zookeeper.znode.parent'='/hbase'
);

四、數據處理層設計

1. 訂單狀態流轉核心邏輯

(1) 訂單狀態清洗與規范化

-- 創建訂單狀態事件清洗視圖
CREATEVIEW cleaned_order_status_events AS
SELECT
  order_id,
  user_id,
status,
  prev_status,
  status_time,
  payment_time,
-- 標準化取消原因
CASE
WHENstatus='CANCELLED'THEN
CASE
WHEN cancel_reason ISNULLOR cancel_reason =''THEN'UNKNOWN'
WHEN cancel_reason IN('user_cancel','用戶取消')THEN'USER_CANCEL'
WHEN cancel_reason IN('stock_out','庫存不足')THEN'STOCK_OUT'
WHEN cancel_reason IN('payment_timeout','支付超時')THEN'PAYMENT_TIMEOUT'
ELSE'OTHER'
END
ELSENULL
ENDAS cancel_reason_standardized,
  operation_user,
  ext_info,
-- 添加訂單創建時間(首次狀態變更)
  FIRST_VALUE(status_time)OVER(PARTITIONBY order_id ORDERBY status_time)AS create_time,
-- 計算狀態持續時間(與上一狀態比較)
  status_time - LAG(status_time)OVER(PARTITIONBY order_id ORDERBY status_time)AS status_duration,
  event_time
FROM order_status_events;

(2) 訂單狀態生命周期追蹤

-- 訂單狀態生命周期表 (使用狀態函數追蹤完整生命周期)
CREATETABLE order_lifecycle (
  order_id STRING,
  user_id STRING,
  create_time TIMESTAMP(3),
  pending_payment_time TIMESTAMP(3),
  paid_time TIMESTAMP(3),
  processing_time TIMESTAMP(3),
  shipped_time TIMESTAMP(3),
  delivered_time TIMESTAMP(3),
  cancelled_time TIMESTAMP(3),
  refunded_time TIMESTAMP(3),
  expired_time TIMESTAMP(3),
  cancel_reason STRING,
  current_status OrderStatus,
  status_updated_time TIMESTAMP(3),
-- 各狀態持續時間
  pending_payment_duration BIGINT,
  processing_duration BIGINT,
  shipping_duration BIGINT,
  delivery_duration BIGINT,
  overall_duration BIGINT,
  last_updated AS PROCTIME()
)WITH(
'connector'='upsert-kafka',
'topic'='order_lifecycle',
'properties.bootstrap.servers'='kafka-broker:9092',
'key.format'='json',
'key.json.ignore-parse-errors'='true',
'value.format'='json',
'value.json.fail-on-missing-field'='false'
);

-- 寫入訂單生命周期表
INSERTINTO order_lifecycle
SELECT
  order_id,
  user_id,
MAX(create_time)AS create_time,
MAX(CASEWHENstatus='PENDING_PAYMENT'THEN status_time END)AS pending_payment_time,
MAX(CASEWHENstatus='PAID'THEN status_time END)AS paid_time,
MAX(CASEWHENstatus='PROCESSING'THEN status_time END)AS processing_time,
MAX(CASEWHENstatus='SHIPPED'THEN status_time END)AS shipped_time,
MAX(CASEWHENstatus='DELIVERED'THEN status_time END)AS delivered_time,
MAX(CASEWHENstatus='CANCELLED'THEN status_time END)AS cancelled_time,
MAX(CASEWHENstatus='REFUNDED'THEN status_time END)AS refunded_time,
MAX(CASEWHENstatus='EXPIRED'THEN status_time END)AS expired_time,
MAX(CASEWHENstatus='CANCELLED'THEN cancel_reason_standardized END)AS cancel_reason,
  LAST_VALUE(status)OVER(PARTITIONBY order_id ORDERBY status_time ROWSBETWEENUNBOUNDEDPRECEDINGANDUNBOUNDEDFOLLOWING)AS current_status,
MAX(status_time)AS status_updated_time,
-- 計算各狀態持續時間(秒)
  TIMESTAMPDIFF(SECOND,MAX(CASEWHENstatus='PENDING_PAYMENT'THEN status_time END),
MAX(CASEWHENstatus='PAID'THEN status_time END))AS pending_payment_duration,
  TIMESTAMPDIFF(SECOND,MAX(CASEWHENstatus='PAID'THEN status_time END),
MAX(CASEWHENstatus='SHIPPED'THEN status_time END))AS processing_duration,
  TIMESTAMPDIFF(SECOND,MAX(CASEWHENstatus='SHIPPED'THEN status_time END),
MAX(CASEWHENstatus='OUT_FOR_DELIVERY'THEN status_time END))AS shipping_duration,
  TIMESTAMPDIFF(SECOND,MAX(CASEWHENstatus='OUT_FOR_DELIVERY'THEN status_time END),
MAX(CASEWHENstatus='DELIVERED'THEN status_time END))AS delivery_duration,
  TIMESTAMPDIFF(SECOND,MAX(create_time),MAX(status_time))AS overall_duration
FROM cleaned_order_status_events
GROUPBY order_id, user_id;

2. 庫存更新邏輯處理

(1) 庫存操作事件生成

-- 創建庫存操作事件視圖
CREATEVIEW inventory_operation_events AS
WITH order_item_agg AS(
-- 聚合訂單商品信息
SELECT
    order_id,
    COLLECT_LIST(ROW(sku_id, quantity))AS items,
MAX(create_time)AS create_time
FROM order_item_events
GROUPBY order_id
)
-- 生成庫存操作事件
SELECT
  UUID()AS op_id,-- 操作ID
  o.order_id,
  UNNEST(items).sku_id AS sku_id,
  UNNEST(items).quantity AS quantity,
-- 根據訂單狀態確定庫存操作類型
CASE
WHEN o.status='PAID'THEN'FREEZE'-- 支付成功,凍結庫存
WHEN o.status='SHIPPED'THEN'DECREASE'-- 已發貨,減少庫存
WHEN o.status='CANCELLED'AND o.prev_status IN('PAID','PROCESSING')THEN'UNFREEZE'-- 已取消,解凍庫存
WHEN o.status='REFUNDED'THEN'INCREASE'-- 已退款,增加庫存
ELSENULL
ENDAS op_type,
  o.status_time AS op_time,
'ORDER_SYSTEM'AS source_system,
  o.event_time
FROM cleaned_order_status_events o
JOIN order_item_agg oi ON o.order_id = oi.order_id
-- 過濾出需要庫存操作的狀態變更
WHERE
(o.status='PAID'AND o.prev_status ='PENDING_PAYMENT')OR
(o.status='SHIPPED'AND o.prev_status ='PROCESSING')OR
(o.status='CANCELLED'AND o.prev_status IN('PAID','PROCESSING'))OR
(o.status='REFUNDED');

(2) 庫存并發控制與更新

-- 創建庫存更新結果表
CREATETABLE inventory_update_results (
  op_id STRING,
  order_id STRING,
  sku_id STRING,
  op_type InventoryOpType,
  quantity INT,
  prev_available_stock INT,
  new_available_stock INT,
  prev_frozen_stock INT,
  new_frozen_stock INT,
  op_time TIMESTAMP(3),
  process_time TIMESTAMP(3),
status STRING,-- SUCCESS, FAILED, RETRY
  message STRING,
PRIMARYKEY(op_id)NOT ENFORCED
)WITH(
'connector'='jdbc',
'url'='jdbc:mysql://mysql-inventory:3306/inventory_db',
'table-name'='inventory_update_results',
'username'='flink_user',
'password'='flink_password',
'sink.buffer-flush.max-rows'='100',
'sink.buffer-flush.interval'='5s',
'sink.max-retries'='3'
);

-- 庫存更新主邏輯
INSERTINTO inventory_update_results
SELECT
  op_id,
  order_id,
  sku_id,
  op_type,
  quantity,
  prev_available,
  new_available,
  prev_frozen,
  new_frozen,
  op_time,
CURRENT_TIMESTAMPAS process_time,
CASE
WHEN(op_type ='DECREASE'AND prev_available < quantity)THEN'FAILED'
WHEN(op_type ='FREEZE'AND prev_available < quantity)THEN'FAILED'
ELSE'SUCCESS'
ENDASstatus,
CASE
WHEN(op_type ='DECREASE'AND prev_available < quantity)THEN'Insufficient stock'
WHEN(op_type ='FREEZE'AND prev_available < quantity)THEN'Insufficient stock to freeze'
ELSE'Operation successful'
ENDAS message
FROM(
-- 使用Flink的狀態函數進行庫存原子更新
SELECT
    op_id,
    order_id,
    sku_id,
    op_type,
    quantity,
    op_time,
-- 根據操作類型計算新庫存值
CASE op_type
WHEN'INCREASE'THEN available_stock + quantity
WHEN'DECREASE'THEN available_stock - quantity
WHEN'FREEZE'THEN available_stock - quantity
WHEN'UNFREEZE'THEN available_stock + quantity
ELSE available_stock
ENDAS new_available,
    available_stock AS prev_available,
-- 處理凍結庫存
CASE op_type
WHEN'FREEZE'THEN frozen_stock + quantity
WHEN'UNFREEZE'THEN frozen_stock - quantity
ELSE frozen_stock
ENDAS new_frozen,
    frozen_stock AS prev_frozen
FROM inventory_operation_events
-- 關聯當前庫存信息
JOIN product_inventory FOR SYSTEM_TIME ASOF event_time
ON inventory_operation_events.sku_id = product_inventory.sku_id
) t
-- 過濾掉無效操作類型
WHERE op_type ISNOTNULL;

3. 物流調度觸發與優化

(1) 物流單創建與調度

-- 創建物流調度指令表
CREATETABLE logistics_dispatch_commands (
  command_id STRING,
  order_id STRING,
  user_id STRING,
  sku_id STRING,
  quantity INT,
  warehouse_id STRING,
  target_province STRING,
  target_city STRING,
  target_district STRING,
  target_address STRING,
  required_delivery_time TIMESTAMP(3),
  priority STRING,-- HIGH, MEDIUM, LOW
  create_time TIMESTAMP(3),
status STRING,-- PENDING, DISPATCHED, FAILED
PRIMARYKEY(command_id)NOT ENFORCED
)WITH(
'connector'='kafka',
'topic'='logistics_dispatch_commands',
'properties.bootstrap.servers'='kafka-broker:9092',
'key.format'='json',
'value.format'='json',
'sink.partitioner'='round-robin'
);

-- 物流調度觸發邏輯
INSERTINTO logistics_dispatch_commands
SELECT
  UUID()AS command_id,
  o.order_id,
  o.user_id,
  oi.sku_id,
  oi.quantity,
  la.warehouse_id,
  u.province,
  u.city,
  u.district,
  u.address,
-- 計算期望送達時間(根據商品類型)
CASE
WHEN p.category ='fresh'THEN o.status_time +INTERVAL'24'HOUR
WHEN p.category ='digital'THEN o.status_time +INTERVAL'48'HOUR
ELSE o.status_time +INTERVAL'72'HOUR
ENDAS required_delivery_time,
-- 根據訂單金額確定優先級
CASE
WHENSUM(oi.quantity * oi.price)>1000THEN'HIGH'
WHENSUM(oi.quantity * oi.price)>500THEN'MEDIUM'
ELSE'LOW'
ENDOVER(PARTITIONBY o.order_id)AS priority,
  o.status_time AS create_time,
'PENDING'ASstatus
FROM cleaned_order_status_events o
-- 關聯訂單商品信息
JOIN order_item_events oi ON o.order_id = oi.order_id
-- 關聯用戶收貨地址
JOIN user_address FOR SYSTEM_TIME ASOF o.event_time
ON o.user_id = user_address.user_id 
AND user_address.is_default =TRUE
-- 關聯商品信息獲取分類
JOIN product_info FOR SYSTEM_TIME ASOF o.event_time
ON oi.sku_id = product_info.sku_id
-- 關聯物流區域信息獲取最優倉庫
JOIN logistics_area_info la 
ON user_address.district = la.district
-- 僅處理已支付待發貨的訂單
WHERE o.status='PAID'
AND o.prev_status ='PENDING_PAYMENT'
-- 添加冪等性控制,防止重復調度
ANDNOTEXISTS(
SELECT1FROM logistics_dispatch_commands 
WHERE order_id = o.order_id ANDstatus!='FAILED'
);

(2) 物流效率監控與優化

-- 創建物流時效監控視圖
CREATEVIEW logistics_efficiency_metrics AS
WITH order_logistics AS(
-- 關聯訂單與物流信息
SELECT
    o.order_id,
    o.status_time AS paid_time,
    l.logistics_id,
MIN(CASEWHEN l.status='DISPATCHED'THEN l.status_time END)AS dispatched_time,
MIN(CASEWHEN l.status='IN_TRANSIT'THEN l.status_time END)AS transit_time,
MIN(CASEWHEN l.status='OUT_FOR_DELIVERY'THEN l.status_time END)AS delivery_time,
MIN(CASEWHEN l.status='DELIVERED'THEN l.status_time END)AS received_time,
    la.warehouse_id,
    la.city AS warehouse_city,
    u.city AS target_city
FROM cleaned_order_status_events o
LEFTJOIN logistics_events l ON o.order_id = l.order_id
LEFTJOIN logistics_area_info la ON l.location = la.area_id
LEFTJOIN user_address u ON o.user_id = u.user_id AND u.is_default =TRUE
WHERE o.status='PAID'
GROUPBY o.order_id, o.status_time, l.logistics_id, la.warehouse_id, la.city, u.city
)
-- 計算各環節時效指標
SELECT
  order_id,
  logistics_id,
  warehouse_id,
  warehouse_city,
  target_city,
  paid_time,
  dispatched_time,
  transit_time,
  delivery_time,
  received_time,
-- 計算各階段耗時(分鐘)
  TIMESTAMPDIFF(MINUTE, paid_time, dispatched_time)AS warehouse_processing_minutes,
  TIMESTAMPDIFF(MINUTE, dispatched_time, transit_time)AS first_mile_minutes,
  TIMESTAMPDIFF(MINUTE, transit_time, delivery_time)AS line_haul_minutes,
  TIMESTAMPDIFF(MINUTE, delivery_time, received_time)AS last_mile_minutes,
  TIMESTAMPDIFF(MINUTE, paid_time, received_time)AS total_delivery_minutes,
-- 判斷是否超時
CASE
WHEN TIMESTAMPDIFF(MINUTE, paid_time, received_time)>1440THEN'OVERDUE'-- >24小時
WHEN TIMESTAMPDIFF(MINUTE, paid_time, received_time)>720THEN'AT_RISK'-- >12小時
ELSE'ON_TIME'
ENDAS delivery_status
FROM order_logistics;

-- 創建物流效率監控結果表
CREATETABLE logistics_efficiency_monitor (
  order_id STRING,
  warehouse_id STRING,
  warehouse_city STRING,
  target_city STRING,
  total_delivery_minutes INT,
  delivery_status STRING,
  warehouse_processing_minutes INT,
  first_mile_minutes INT,
  line_haul_minutes INT,
  last_mile_minutes INT,
  monitoring_time TIMESTAMP(3),
PRIMARYKEY(order_id)NOT ENFORCED
)WITH(
'connector'='elasticsearch-7',
'hosts'='http://es-node1:9200,http://es-node2:9200',
'index'='logistics_efficiency_{yyyyMMdd}',
'document-id.key-delimiter'='$',
'sink.bulk-flush.max-actions'='1000',
'sink.bulk-flush.max-size'='2mb',
'sink.bulk-flush.interval'='10s',
'format'='json'
);

-- 寫入物流效率監控數據
INSERTINTO logistics_efficiency_monitor
SELECT
  order_id,
  warehouse_id,
  warehouse_city,
  target_city,
  total_delivery_minutes,
  delivery_status,
  warehouse_processing_minutes,
  first_mile_minutes,
  line_haul_minutes,
  last_mile_minutes,
CURRENT_TIMESTAMPAS monitoring_time
FROM logistics_efficiency_metrics
WHERE received_time ISNOTNULL;-- 僅處理已收貨訂單

4. 異常訂單檢測與處理

-- 創建訂單異常檢測視圖
CREATEVIEW abnormal_order_detection AS
SELECT
  order_id,
  user_id,
  current_status,
  status_time,
  create_time,
-- 計算訂單各階段超時情況
CASE
WHEN current_status ='PENDING_PAYMENT'
AND TIMESTAMPDIFF(MINUTE, create_time,CURRENT_TIMESTAMP)>30THEN'PAYMENT_TIMEOUT'
WHEN current_status ='PROCESSING'
AND TIMESTAMPDIFF(HOUR, paid_time,CURRENT_TIMESTAMP)>24THEN'PROCESSING_TIMEOUT'
WHEN current_status ='SHIPPED'
AND TIMESTAMPDIFF(HOUR, shipped_time,CURRENT_TIMESTAMP)>72THEN'DELIVERY_TIMEOUT'
ELSENULL
ENDAS abnormal_type,
-- 計算超時時間(分鐘)
CASE
WHEN current_status ='PENDING_PAYMENT'
THEN TIMESTAMPDIFF(MINUTE, create_time,CURRENT_TIMESTAMP)-30
WHEN current_status ='PROCESSING'
THEN TIMESTAMPDIFF(MINUTE, paid_time,CURRENT_TIMESTAMP)-1440
WHEN current_status ='SHIPPED'
THEN TIMESTAMPDIFF(MINUTE, shipped_time,CURRENT_TIMESTAMP)-4320
ELSE0
ENDAS overtime_minutes,
-- 獲取用戶歷史異常訂單數
(SELECTCOUNT(*)FROM order_lifecycle 
WHERE user_id = o.user_id AND abnormal_type ISNOTNULL)AS user_abnormal_count,
CURRENT_TIMESTAMPAS detection_time
FROM order_lifecycle o
-- 檢測異常條件
WHERE(
(current_status ='PENDING_PAYMENT'
AND TIMESTAMPDIFF(MINUTE, create_time,CURRENT_TIMESTAMP)>30)
OR
(current_status ='PROCESSING'
AND TIMESTAMPDIFF(HOUR, paid_time,CURRENT_TIMESTAMP)>24)
OR
(current_status ='SHIPPED'
AND TIMESTAMPDIFF(HOUR, shipped_time,CURRENT_TIMESTAMP)>72)
)
-- 排除已處理的異常訂單
AND order_id NOTIN(SELECT order_id FROM abnormal_order_handling);

-- 創建異常訂單處理指令表
CREATETABLE abnormal_order_handling (
  order_id STRING,
  abnormal_type STRING,
  overtime_minutes INT,
  user_abnormal_count INT,
  detection_time TIMESTAMP(3),
handler STRING,
  handling_action STRING,
  handling_time TIMESTAMP(3),
status STRING,-- PENDING, PROCESSED, RESOLVED
  notes STRING,
PRIMARYKEY(order_id)NOT ENFORCED
)WITH(
'connector'='jdbc',
'url'='jdbc:mysql://mysql-order:3306/order_db',
'table-name'='abnormal_order_handling',
'username'='flink_user',
'password'='flink_password'
);

-- 自動生成異常訂單處理指令
INSERTINTO abnormal_order_handling
SELECT
  order_id,
  abnormal_type,
  overtime_minutes,
  user_abnormal_count,
  detection_time,
-- 根據異常類型和用戶歷史異常數分配處理人員
CASE
WHEN user_abnormal_count >5THEN'vip_customer_service'
WHEN abnormal_type ='DELIVERY_TIMEOUT'THEN'logistics_support'
ELSE'order_support'
ENDAShandler,
-- 自動建議處理動作
CASE
WHEN abnormal_type ='PAYMENT_TIMEOUT'THEN'CANCEL_ORDER'
WHEN abnormal_type ='PROCESSING_TIMEOUT'THEN'ESCALATE_PROCESSING'
WHEN abnormal_type ='DELIVERY_TIMEOUT'THEN'CHECK_LOGISTICS'
ENDAS handling_action,
CURRENT_TIMESTAMPAS handling_time,
'PENDING'ASstatus,
CASE
WHEN user_abnormal_count >5THEN'High-risk customer, manual review required'
ELSE'Auto-generated handling instruction'
ENDAS notes
FROM abnormal_order_detection;

五、數據寫出層設計

1. 實時監控指標輸出

-- 創建訂單處理實時指標表
CREATETABLE order_processing_metrics (
  metric_time TIMESTAMP(3),
  order_count BIGINT,
  paid_count BIGINT,
  shipped_count BIGINT,
  delivered_count BIGINT,
  cancelled_count BIGINT,
  avg_payment_time DOUBLE,
  avg_processing_time DOUBLE,
  avg_delivery_time DOUBLE,
  abnormal_order_rate DOUBLE,
PRIMARYKEY(metric_time)NOT ENFORCED
)WITH(
'connector'='prometheus',
'url'='http://prometheus-server:9090/api/v1/write',
'namespace'='ecommerce',
'metric.name'='order_processing_metrics'
);

-- 計算并輸出訂單處理指標
INSERTINTO order_processing_metrics
SELECT
  TUMBLE_START(status_time,INTERVAL'5'MINUTE)AS metric_time,
COUNT(DISTINCT order_id)AS order_count,
COUNT(DISTINCTCASEWHENstatus='PAID'THEN order_id END)AS paid_count,
COUNT(DISTINCTCASEWHENstatus='SHIPPED'THEN order_id END)AS shipped_count,
COUNT(DISTINCTCASEWHENstatus='DELIVERED'THEN order_id END)AS delivered_count,
COUNT(DISTINCTCASEWHENstatus='CANCELLED'THEN order_id END)AS cancelled_count,
-- 計算平均支付時間(秒)
AVG(TIMESTAMPDIFF(SECOND, create_time, paid_time))AS avg_payment_time,
-- 計算平均處理時間(秒)
AVG(TIMESTAMPDIFF(SECOND, paid_time, shipped_time))AS avg_processing_time,
-- 計算平均配送時間(秒)
AVG(TIMESTAMPDIFF(SECOND, shipped_time, delivered_time))AS avg_delivery_time,
-- 異常訂單率
CASEWHENCOUNT(DISTINCT order_id)=0THEN0
ELSECOUNT(DISTINCTCASEWHEN abnormal_type ISNOTNULLTHEN order_id END)*1.0/COUNT(DISTINCT order_id)
ENDAS abnormal_order_rate
FROM order_lifecycle
LEFTJOIN abnormal_order_detection a ON order_lifecycle.order_id = a.order_id
-- 使用5分鐘滾動窗口聚合
GROUPBY TUMBLE(status_time,INTERVAL'5'MINUTE);

2. 下游系統通知與集成

-- 創建訂單狀態變更通知表(Kafka)
CREATETABLE order_status_notifications (
  notification_id STRING,
  order_id STRING,
  user_id STRING,
status OrderStatus,
  prev_status OrderStatus,
  status_time TIMESTAMP(3),
  notification_type STRING,-- APP_PUSH, SMS, EMAIL
  message_content STRING,
  priority INT,
  create_time TIMESTAMP(3),
statusAS'PENDING',
PRIMARYKEY(notification_id)NOT ENFORCED
)WITH(
'connector'='kafka',
'topic'='order_status_notifications',
'properties.bootstrap.servers'='kafka-broker:9092',
'key.format'='json',
'value.format'='json'
);

-- 生成訂單狀態變更通知
INSERTINTO order_status_notifications
SELECT
  UUID()AS notification_id,
  order_id,
  user_id,
status,
  prev_status,
  status_time,
-- 根據狀態類型確定通知方式
CASE
WHENstatusIN('CANCELLED','REFUNDED')THEN'SMS'
WHENstatus='DELIVERED'THEN'APP_PUSH'
ELSE'APP_PUSH'
ENDAS notification_type,
-- 動態生成通知內容
CASEstatus
WHEN'PAID'THEN CONCAT('Order ', order_id,' has been paid successfully')
WHEN'SHIPPED'THEN CONCAT('Order ', order_id,' has been shipped')
WHEN'DELIVERED'THEN CONCAT('Order ', order_id,' has been delivered')
WHEN'CANCELLED'THEN CONCAT('Order ', order_id,' has been cancelled: ', cancel_reason_standardized)
WHEN'REFUNDED'THEN CONCAT('Order ', order_id,' has been refunded')
ELSE CONCAT('Order ', order_id,' status updated to ',status)
ENDAS message_content,
-- 設置通知優先級
CASE
WHENstatusIN('CANCELLED','REFUNDED')THEN1
WHENstatusIN('PAID','DELIVERED')THEN2
ELSE3
ENDAS priority,
CURRENT_TIMESTAMPAS create_time
FROM cleaned_order_status_events
-- 僅對關鍵狀態變更發送通知
WHEREstatusIN('PAID','SHIPPED','DELIVERED','CANCELLED','REFUNDED');

六、系統優化與高級特性

1. 狀態管理與優化

-- 創建帶狀態TTL優化的訂單狀態視圖
CREATEVIEW order_status_with_ttl AS
SELECT
  order_id,
  user_id,
status,
  status_time,
  ROW_NUMBER()OVER(PARTITIONBY order_id ORDERBY status_time DESC)AS rn
FROM order_status_events
-- 使用Flink的狀態TTL功能自動清理過期狀態
WITH(
'state.ttl'='86400000',-- 狀態保留1天
'state.cleanup-strategy'='EMBEDDED'
)
WHERE rn =1;-- 只保留最新狀態

2. 雙流JOIN優化

-- 優化的訂單與庫存雙流JOIN
CREATEVIEW order_inventory_joined AS
SELECT
/*+ OPTIONS('lookup.join.cache.ttl'='30s', 'lookup.join.cache.size'='10000') */
  o.order_id,
  o.status,
  o.status_time,
  oi.sku_id,
  oi.quantity,
  pi.available_stock,
  pi.frozen_stock,
  pi.total_stock
FROM cleaned_order_status_events o
JOIN order_item_events oi ON o.order_id = oi.order_id
-- 使用緩存優化的LOOKUP JOIN
JOIN product_inventory FOR SYSTEM_TIME ASOF o.event_time
ON oi.sku_id = product_inventory.sku_id
WHERE o.status='PAID';

3. 動態配置與規則引擎

-- 創建規則配置表 (MySQL)
CREATETABLE order_processing_rules (
  rule_id STRING,
  rule_type STRING,-- INVENTORY_RULE, LOGISTICS_RULE, NOTIFICATION_RULE
  priority INT,
  condition_expr STRING,
  action_expr STRING,
  effective_time TIMESTAMP(3),
  expire_time TIMESTAMP(3),
status STRING,-- ACTIVE, INACTIVE
  create_time TIMESTAMP(3),
  update_time TIMESTAMP(3),
PRIMARYKEY(rule_id)NOT ENFORCED
)WITH(
'connector'='mysql-cdc',
'hostname'='mysql-config',
'port'='3306',
'username'='flink_user',
'password'='flink_password',
'database-name'='config_db',
'table-name'='order_processing_rules'
);

-- 使用動態規則處理訂單
CREATEVIEW order_processing_with_rules AS
SELECT
  o.*,
  r.rule_id,
  r.action_expr
FROM cleaned_order_status_events o
JOIN order_processing_rules r
ON r.rule_type ='INVENTORY_RULE'
AND r.status='ACTIVE'
AND o.status_time >= r.effective_time
AND(r.expire_time ISNULLOR o.status_time < r.expire_time)
-- 這里可以集成Flink的SQL函數來動態評估規則條件
AND o.status='PAID';

七、系統監控與運維

1. 數據質量監控

-- 創建數據質量監控表
CREATETABLE data_quality_metrics (
  metric_time TIMESTAMP(3),
  source_table STRING,
  total_records BIGINT,
  null_order_id_count BIGINT,
  late_records_count BIGINT,
  schema_violation_count BIGINT,
  avg_processing_time DOUBLE,
  error_rate DOUBLE
)WITH(
'connector'='elasticsearch-7',
'hosts'='http://es-node1:9200,http://es-node2:9200',
'index'='data_quality_metrics_{yyyyMMdd}',
'format'='json'
);

-- 監控訂單事件流數據質量
INSERTINTO data_quality_metrics
SELECT
  TUMBLE_START(event_time,INTERVAL'1'MINUTE)AS metric_time,
'order_status_events'AS source_table,
COUNT(*)AS total_records,
COUNT(CASEWHEN order_id ISNULLTHEN1END)AS null_order_id_count,
COUNT(CASEWHEN status_time < event_time -INTERVAL'5'SECONDTHEN1END)AS late_records_count,
0AS schema_violation_count,-- 需要通過Flink的DDL驗證配置獲取
AVG(TIMESTAMPDIFF(MILLISECOND, status_time, PROCTIME()))AS avg_processing_time,
CASEWHENCOUNT(*)=0THEN0
ELSECOUNT(CASEWHEN order_id ISNULLORstatusISNULLTHEN1END)*1.0/COUNT(*)
ENDAS error_rate
FROM order_status_events
GROUPBY TUMBLE(event_time,INTERVAL'1'MINUTE);

2. 慢查詢監控

-- 啟用Flink的查詢監控
SET'execution.profile.enabled'='true';
SET'execution.profile.sample-interval'='1000';
SET'execution.profile.delay'='0';

-- 創建查詢性能監控表
CREATETABLE query_performance_metrics (
  query_id STRING,
  job_name STRING,
  start_time TIMESTAMP(3),
  end_time TIMESTAMP(3),
  duration_ms BIGINT,
  rows_read BIGINT,
  rows_written BIGINT,
  peak_memory_usage BIGINT,
  state_size BIGINT,
  backpressure_count INT
)WITH(
'connector'='jdbc',
'url'='jdbc:mysql://mysql-monitor:3306/monitor_db',
'table-name'='query_performance_metrics',
'username'='flink_user',
'password'='flink_password'
);

本系統基于FlinkSQL構建了一個完整的電商訂單狀態追蹤與實時處理平臺,實現了從數據接入、處理到輸出的全流程覆蓋。系統具有以下特點:

  • 完整性:覆蓋訂單狀態追蹤、庫存管理、物流調度等電商核心業務流程
  • 實時性:基于Flink的流處理能力,實現毫秒級狀態響應與處理
  • 可靠性:通過檢查點、狀態管理和冪等性設計確保數據一致性
  • 可擴展性:模塊化設計支持業務規則動態調整和功能擴展
  • 可監控性:完善的指標收集和監控體系,確保系統穩定運行
責任編輯:趙寧寧 來源: 大數據技能圈
相關推薦

2025-10-29 07:38:45

2023-08-07 18:45:30

電商訂單訂單類型批量發貨

2014-12-15 09:32:17

StormSpark

2017-08-09 13:30:21

大數據Apache Kafk實時處理

2011-12-30 13:50:21

流式計算Hadoop

2017-11-21 14:14:04

PHPnode.js圖片訪問

2019-09-04 09:31:40

日志Flink監控

2017-08-31 16:36:26

2017-02-14 15:37:32

KappaLambda

2025-03-04 08:00:00

JavaiTextPDFPDF

2018-06-11 17:37:23

高并發與實時處理技術

2021-07-21 10:22:02

數據存儲

2017-11-03 15:05:56

Storm數據處理服務器

2013-04-27 12:18:58

大數據全球技術峰會京東

2016-11-08 12:49:27

大數據分布式系統Druid-IO

2013-08-30 09:59:23

用友用友U8+

2025-11-14 01:20:00

2015-07-14 10:53:28

2024-12-26 17:16:59

2023-03-06 07:35:30

狀態機工具訂單狀態
點贊
收藏

51CTO技術棧公眾號

三级做a全过程在线观看| 国产精品白嫩白嫩大学美女| 日本韩国欧美| 欧美国产一区二区在线观看| 92福利视频午夜1000合集在线观看| 久草视频在线资源站| 亚洲+小说+欧美+激情+另类| 欧美日韩一区不卡| r级无码视频在线观看| 二区在线视频| 成人99免费视频| 国产精品日韩久久久久| 日本一级黄色大片| 欧美mv日韩| 亚洲精品综合久久中文字幕| www.成年人| 日韩在线影院| 亚洲v中文字幕| 中日韩在线视频| 涩涩视频在线观看免费| 国产成人亚洲综合a∨婷婷图片| 欧美一区深夜视频| 欧美三级小视频| 欧美1级片网站| 亚洲精品视频中文字幕| 久久久久99人妻一区二区三区| 91福利精品在线观看| 午夜成人免费视频| 久久视频免费在线| 97在线观看免费观看高清| 成人av网站免费观看| 91香蕉亚洲精品| 99成人精品视频| 国产精品久久久久久久久久妞妞| 欧美巨大黑人极品精男| 国产黄色录像视频| 久久av综合| 亚洲国产精品va在线| 香蕉视频色在线观看| 成人亚洲视频| 色婷婷精品久久二区二区蜜臀av| 国产欧美精品aaaaaa片| 在线午夜影院| 中文字幕五月欧美| 亚洲精品成人a8198a| 极品白浆推特女神在线观看 | 911亚洲精选| 久久三级中文| 欧美一级艳片视频免费观看| 午夜剧场在线免费观看| 欧美男女视频| 欧美日韩在线播放三区| 啊啊啊国产视频| 欧美极品免费| 欧美视频在线一区| 少妇性l交大片| 国产麻豆久久| 欧美在线观看视频一区二区| caopor在线视频| 在线观看欧美日韩电影| 色综合天天狠狠| 中文字幕无码不卡免费视频| 英国三级经典在线观看| 日韩欧美成人精品| 日韩手机在线观看视频| 国产成人午夜性a一级毛片| 欧美三区在线观看| 91 视频免费观看| 日韩免费成人| 亚洲国产精品电影| 黄色国产在线观看| 国产在视频线精品视频www666| 一区二区三区无码高清视频| 国产成人精品视频免费| 亚洲五月综合| 国外成人在线播放| 狠狠人妻久久久久久| 日韩国产在线观看一区| 成人啪啪免费看| 黑人精品一区二区三区| 久久久久综合网| 椎名由奈jux491在线播放 | 成人高潮片免费视频| 成人一区在线观看| 九九99久久| av在线首页| 一区二区三区日韩精品视频| 日韩av黄色网址| 91福利精品在线观看| 4438x成人网最大色成网站| 涩视频在线观看| 欧美美女在线| 久久久999国产| 久热这里只有精品6| 久久国产乱子精品免费女| 91在线视频九色| 天堂av网在线| 亚洲精品福利视频网站| 成人羞羞国产免费网站| www.欧美| 亚洲人成网站色ww在线| 校园春色 亚洲| 视频一区二区三区在线| caoporen国产精品| 国产高清视频免费最新在线| 亚洲激情第一区| www日韩在线观看| 亚洲精品在线a| 日韩在线免费高清视频| 日本少妇xxxx动漫| 久久草av在线| 欧美精品欧美精品| 波多野结依一区| 3d成人h动漫网站入口| 精品少妇人妻一区二区黑料社区| 欧美日韩国产色综合一二三四| 国产第一区电影| 三级在线观看网站| 亚洲人被黑人高潮完整版| 熟女人妇 成熟妇女系列视频| 99国产精品免费网站| 精品国产视频在线| 日韩一级在线视频| 不卡电影一区二区三区| 日本一道在线观看| 久久久久毛片| 国产香蕉97碰碰久久人人| 国产精久久久久久| 国产精品一色哟哟哟| 亚洲人一区二区| 欧美片第1页| 日韩高清中文字幕| 久久这里只有精品国产| 国产揄拍国内精品对白| 亚洲欧美日产图| 福利精品一区| 在线视频中文亚洲| 午夜久久久久久久久久影院| 久久综合精品国产一区二区三区 | 欧美videos中文字幕| 日本一二三区在线观看| 久久精品国产亚洲a| 欧洲在线视频一区| 都市激情亚洲一区| 亚洲美女动态图120秒| 特级毛片www| 久久久精品国产99久久精品芒果| 国产午夜福利100集发布| 国产精品国产| 91精品国产乱码久久久久久久久| 亚洲精品久久久久久久久久 | 欧美国产日韩一区二区| www.xxx国产| 亚洲午夜激情av| 李丽珍裸体午夜理伦片| 国产精品久久久亚洲一区| 久久国产主播精品| 午夜裸体女人视频网站在线观看| 亚洲精品成人久久| 中文字幕视频网站| 久久九九影视网| 网站一区二区三区| 91麻豆精品国产91久久久平台 | 亚洲色图丝袜| 国产精品小说在线| 超碰超碰在线| 精品国产123| 国产成人在线视频观看| 久久久久久免费| 手机看片一级片| 亚洲一区二区三区无吗| 国产精品嫩草在线观看| 樱花草涩涩www在线播放| 国产亚洲激情视频在线| 97免费观看视频| 亚洲二区在线视频| 波多野结衣av在线免费观看| 日韩av网站免费在线| 伊人天天久久大香线蕉av色| 无人区乱码一区二区三区| 久久久久久国产| 国产日本在线观看| 91精品国产综合久久蜜臀| 久久久无码精品亚洲国产| 久久综合久久久久88| 中文字幕一区久久| 在线精品一区| 中文字幕中文字幕99| 欧美变态网站| 国产在线高清精品| 欧美男人天堂| 久久精品国亚洲| 香蕉久久国产av一区二区| 欧美日韩一区久久| 国产无套内射又大又猛又粗又爽| 国产欧美精品一区二区三区四区| 亚洲精品鲁一鲁一区二区三区 | 国产精品黄页免费高清在线观看| 成人在线app| 亚洲美女av网站| 性猛交xxxx乱大交孕妇印度| 欧美性受xxxx| 国产精品一区二区6| 亚洲桃色在线一区| 中文字幕网站在线观看| 国产91精品在线观看| 手机在线免费观看毛片| 极品中文字幕一区| 亚洲一卡二卡| 国产一区不卡| 国产精品一区二区你懂得| 另类一区二区三区| 欧洲永久精品大片ww免费漫画| 羞羞视频在线观看不卡| 在线视频欧美性高潮| 天堂91在线| 欧美mv日韩mv国产网站app| 伊人网免费视频| 91福利国产精品| 日韩三级免费看| 一区二区三区视频在线观看| 999精品在线视频| 国产欧美日韩在线看| 国产亚洲色婷婷久久99精品91| 国产乱妇无码大片在线观看| 亚洲精品www.| 水野朝阳av一区二区三区| 极品美女扒开粉嫩小泬| 欧美日本国产| 青青视频免费在线| 91精品天堂福利在线观看| 午夜精品美女久久久久av福利| 伊人久久大香线蕉| 久久综合色一本| 欧美日韩一区二区三区在线电影 | 亚洲精品字幕在线| 欧美一卡2卡3卡4卡| 国产精品无码免费播放| 欧美另类变人与禽xxxxx| 中文字幕第31页| 欧美探花视频资源| 精品乱码一区内射人妻无码| 在线看日本不卡| 在线观看国产黄| 欧美放荡的少妇| 国产叼嘿视频在线观看| 日韩一区二区精品葵司在线| 国产a级免费视频| 日韩免费在线观看| 成人午夜免费在线观看| 精品伦理精品一区| 欧美77777| 日韩精品极品在线观看| 欧洲视频在线免费观看| 亚洲人成电影网站色…| 丁香婷婷在线观看| 最近日韩中文字幕中文| 久久黄色美女电影| 欧美成年人视频网站| 黄色的视频在线观看| 午夜精品久久久久久久男人的天堂 | 久草在在线视频| 老司机精品视频导航| 特级黄色片视频| 不卡免费追剧大全电视剧网站| 久久人人妻人人人人妻性色av| 99re成人精品视频| 能免费看av的网站| 国产精品久久毛片a| 波多野结衣家庭教师| 亚洲不卡一区二区三区| 无码一区二区三区在线观看| 欧美日韩不卡一区| 午夜精品无码一区二区三区| 亚洲激情小视频| 成人午夜电影在线观看| 欧美黑人xxxx| 外国成人直播| 亚洲自拍偷拍在线| 青青操综合网| 亚洲一区二区免费视频软件合集 | 欧美激情第6页| 国产欧美一区二区三区精品酒店| 国产日韩av在线| 久久动漫网址| 一区二区三区国| 国产精品亚洲欧美| 国产探花在线观看视频| 99热99精品| 国产精品视频看看| 天天操天天综合网| 91成品人影院| 日韩精品视频三区| 成人黄色网址| 国产99久久精品一区二区 夜夜躁日日躁 | 奇米影视7777精品一区二区| 午夜性福利视频| 亚洲国产精品高清| 日本亚洲色大成网站www久久| 欧美午夜精品久久久| 天堂网2014av| 欧美成人亚洲成人| 成人不卡视频| 精品乱色一区二区中文字幕| 午夜精品视频一区二区三区在线看| 免费无码毛片一区二三区| 美国av一区二区| 国产精品无码午夜福利| 一级日本不卡的影视| 在线观看中文字幕网站| 亚洲韩国欧洲国产日产av | 久久精品av| 免费在线观看日韩视频| 丰满亚洲少妇av| 三级全黄做爰视频| 在线观看91视频| 台湾av在线二三区观看| 欧美大秀在线观看| 亚洲男人在线| 色噜噜狠狠一区二区三区| 国产亚洲在线观看| 久久久久99人妻一区二区三区| 国产精品久久久久久久久快鸭| 国产无套丰满白嫩对白| 亚洲黄色有码视频| 秋霞在线视频| 99久久99久久| 欧美黄污视频| 1314成人网| 亚洲精品中文字幕在线观看| 91高潮大合集爽到抽搐| 中文字幕久久久av一区| 国产另类xxxxhd高清| 麻豆精品传媒视频| 国产精品日本| 99久久人妻无码中文字幕系列| 亚洲主播在线播放| 亚洲AV无码国产精品午夜字幕 | 69成人免费视频| 亚洲精品国产拍免费91在线| 国产精品186在线观看在线播放| 亚洲a级在线播放观看| 999久久久精品国产| 欧美日韩中文不卡| 国产精品久久久久影院亚瑟| 中文字幕av久久爽| 中文字幕久热精品在线视频| 99久久久国产精品免费调教网站 | 99re资源| 国产精品草草| 黄色免费看视频| 疯狂欧美牲乱大交777| 涩涩视频在线观看免费| 国产精欧美一区二区三区| 成人在线免费观看网站| 久久久久xxxx| 一区二区三区av电影| 人人妻人人澡人人爽久久av | 五月婷婷亚洲综合| 亚洲天堂第一页| 日韩成人免费av| www.激情网| 99久久精品国产网站| 国产高清中文字幕| 色综合伊人色综合网站| 日韩三级不卡| 成人毛片一区二区| 久久久久国产一区二区三区四区 | 原纱央莉成人av片| 日本一区视频在线播放| 激情文学综合插| 国产在线观看你懂的| 亚洲免费av片| 国产激情精品一区二区三区| 国产精品入口芒果| 久久九九久精品国产免费直播| 国产乱叫456在线| 97久久精品人人澡人人爽缅北| 国产精品羞羞答答在线观看| 日本中文字幕精品—区二区| 亚洲最新在线观看| 黄色小视频在线观看| 91pron在线| 久久在线精品| 欧美日韩国产精品一区二区三区| 国产视频久久久久| 婷婷久久免费视频| 欧美 丝袜 自拍 制服 另类| 亚洲欧洲日产国产综合网| 人妻无码中文字幕免费视频蜜桃| 国产精品福利在线观看网址| 欧美国产高清| 丰满的亚洲女人毛茸茸| 日韩一区二区三区三四区视频在线观看 | 狠狠v欧美ⅴ日韩v亚洲v大胸| 亚洲一区二区三区sesese| 亚洲在线视频| 欧美日韩免费做爰视频| 中文字幕精品av| 日韩美女毛片|