精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink 精確一次語義原理深度解析

大數據
本文將從原理到源碼,深入剖析Flink精確一次提交的實現機制,涵蓋Checkpoint流程、狀態管理、兩階段提交及與外部系統的集成等關鍵環節。

在分布式流處理系統中,"精確一次"(Exactly-Once, EO)語義是數據一致性的黃金標準,它確保每條數據在處理過程中不丟失、不重復,且只被處理一次。Apache Flink作為業界領先的流處理框架,通過Checkpoint機制、兩階段提交協議(Two-Phase Commit, 2PC)和狀態管理等核心技術,實現了端到端的精確一次語義。

本文將從原理到源碼,深入剖析Flink精確一次提交的實現機制,涵蓋Checkpoint流程、狀態管理、兩階段提交及與外部系統的集成等關鍵環節。

一、精確一次語義的核心挑戰與Flink的解決思路

1. 什么是精確一次語義?

在流處理場景中,數據處理的"一次"語義可分為三個層次:

  • 至少一次(At-Least-Once):數據至少被處理一次,可能因故障恢復導致重復處理。
  • 最多一次(At-Most-Once):數據最多被處理一次,可能因故障導致數據丟失。
  • 精確一次(Exactly-Once):數據嚴格被處理一次,無丟失、無重復。

端到端精確一次要求從數據源(如Kafka)→ Flink處理 → 外部存儲(如Kafka、HDFS)的整個鏈路都滿足EO語義。其核心挑戰在于:

  • 內部狀態一致性:Flink作業內部的算子狀態(如聚合結果、窗口數據)在故障恢復后需與輸入數據嚴格對齊。
  • 外部寫入一致性:寫入外部系統的數據需與Flink內部狀態同步提交,避免"內部狀態已提交,外部寫入失敗"或反之的情況。

2. Flink的解決思路

Flink通過以下三大核心技術實現端到端EO:

  • 分布式快照(Checkpoint):基于Chandy-Lamport算法的輕量級異步快照機制,定期保存作業全局狀態,為故障恢復提供一致性基準點。
  • 可插拔狀態后端(State Backend):管理狀態的存儲與訪問,支持內存、文件系統(HDFS)、RocksDB等多種存儲方式,確保狀態的持久化與高效恢復。
  • 兩階段提交協議(2PC):協調外部系統與Flink內部狀態的提交,實現"要么全部提交,要么全部回滾"的原子性。

二、Checkpoint機制:精確一次的基石

Checkpoint是Flink實現EO的核心機制,它通過定期生成作業全局狀態的快照,確保故障發生后能恢復到某個一致的狀態。Flink的Checkpoint基于Chandy-Lamport算法改進,具有輕量級、異步、增量等特點。

1. Checkpoint的核心概念

(1) Barrier(屏障)

Barrier是Checkpoint的核心觸發信號,它是一條特殊的數據記錄,由JobManager(作業協調器)注入到Source算子,并隨著數據流向下游算子廣播。Barrier將數據流分割為"Barrier之前的數據"和"Barrier之后的數據",算子收到Barrier后,會觸發當前狀態的快照。

Barrier的特性:

  • 廣播性:Barrier會廣播到所有并行算子實例,確保所有算子對齊到同一檢查點。
  • 對齊性:對于多輸入流算子(如KeyedJoin),需等待所有輸入流的Barrier到達后才能觸發快照,避免狀態不一致。
  • 異步性:快照過程異步執行,不阻塞數據流的正常處理。

(2) Checkpoint流程

一個完整的Checkpoint流程可分為以下步驟(以單并行度作業為例):

① 觸發Checkpoint:JobManager定期向所有Source算子發送TriggerCheckpoint消息,指定Checkpoint ID(如ckp-1)。

② Source注入Barrier:Source算子收到觸發消息后,停止處理新數據,在當前輸出位置插入Barrier(標記為ckp-1),然后將Barrier廣播給下游算子,同時將自身的狀態(如Kafka的offset)保存到狀態后端。

③ 中間算子快照與Barrier傳遞:中間算子(如Map、KeyedAgg)收到Barrier后,執行以下操作:

  • 暫停處理新數據:等待所有輸入流的Barrier到達(對齊階段)。
  • 快照狀態:將當前算子狀態(如聚合結果、窗口數據)異步保存到狀態后端。
  • 傳遞Barrier:向下游算子廣播Barrier,繼續處理新數據。

④ Sink算子確認與外部系統預提交:Sink算子收到Barrier后,快照自身狀態,并與外部系統交互(如Kafka事務預提交),向JobManager返回AcknowledgeCheckpoint消息。

⑤ Checkpoint完成:當所有算子都返回確認消息后,JobManager將Checkpoint標記為"已完成",并持久化Checkpoint元數據(如狀態存儲路徑、算子狀態偏移量)。

2. Barrier對齊:多輸入流的一致性保證

對于多輸入流算子(如KeyedJoin),Barrier對齊是確保狀態一致性的關鍵。假設算子有兩個輸入流(Input1和Input2),對齊過程如下:

  • 部分Barrier到達:假設Input1的Barrier先到達,算子會暫停處理Input1的數據,但繼續處理Input2的數據(因為其Barrier未到)。
  • 所有Barrier到達:當Input2的Barrier也到達后,算子觸發狀態快照,并向下游廣播Barrier。
  • 恢復處理:快照完成后,算子恢復處理兩個輸入流的數據。

對齊的意義:確保快照中包含的是"所有輸入流Barrier之前的數據"的處理結果,避免因部分輸入流延遲導致狀態不一致。

3. Checkpoint源碼解析

(1) Checkpoint觸發:JobManager端

Checkpoint的觸發由CheckpointCoordinator類(位于org.apache.flink.runtime.checkpoint包)負責。核心邏輯如下:

// CheckpointCoordinator.java
public void triggerCheckpoint(long timestamp, CheckpointProperties props) {
    // 1. 生成Checkpoint ID
    long checkpointID = checkpointIdCounter.getAndIncrement();
    
    // 2. 向所有Source任務發送TriggerCheckpoint消息
    for (ExecutionVertex vertex: tasksToTrigger) {
        if (vertex.getExecutionState() == ExecutionState.RUNNING) {
            vertex.triggerCheckpoint(checkpointID, timestamp, props);
        }
    }
}

CheckpointCoordinator是JobManager的核心組件,負責:

  • 定期觸發Checkpoint(通過ScheduledExecutorService調度)。
  • 跟蹤Checkpoint狀態(等待所有算子確認)。
  • 處理Checkpoint超時或失敗。

(2) Barrier注入與傳遞:StreamTask端

Source算子收到TriggerCheckpoint消息后,會在StreamTask(流處理任務基類)中注入Barrier。核心邏輯在StreamTask.performCheckpoint方法:

// StreamTask.java
privatevoidperformCheckpoint(CheckpointMetaData checkpointMetaData)throws Exception {
    // 1. 向下游廣播Barrier
    operatorChain.broadcastCheckpointBarrier(
        checkpointMetaData.getCheckpointId(),
        checkpointMetaData.getTimestamp(),
        checkpointMetaData.getCheckpointOptions()
    );
    
    // 2. 異步快照算子狀態
    Future<SnapshotResult> snapshotFuture = checkpointingOperation.snapshotState();
    
    // 3. 注冊回調,等待快照完成后通知JobManager
    snapshotFuture.thenAccept(snapshotResult -> {
        acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), snapshotResult);
    });
}
  • operatorChain.broadcastCheckpointBarrier:通過RecordWriter向所有輸出流寫入Barrier。
  • checkpointingOperation.snapshotState:調用算子的snapshotState方法,將狀態保存到狀態后端(如RocksDB)。

(3) Barrier對齊:StreamInputProcessor端

對于多輸入流算子,Barrier對齊由StreamInputProcessor(輸入處理器)實現。核心邏輯如下:

// StreamInputProcessor.java
public InputStatus pollNext()throws Exception {
    while (true) {
        // 1. 從輸入通道讀取數據或Barrier
        BufferOrEventbufferOrEvent= inputGate.getNextBufferOrEvent();
        
        if (bufferOrEvent.isBuffer()) {
            // 2. 如果是普通數據,檢查是否需要對齊
            if (checkAlignmentNeeded()) {
                // 當前通道的Barrier未到,暫停處理,緩存數據
                return InputStatus.MORE_AVAILABLE;
            } else {
                // 無需對齊,將數據交給算子處理
                return pushToOperator(bufferOrEvent.getBuffer());
            }
        } elseif (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            // 3. 如果是Barrier,觸發對齊邏輯
            handleBarrier((CheckpointBarrier) bufferOrEvent.getEvent());
        }
    }
}

privatevoidhandleBarrier(CheckpointBarrier barrier) {
    // 記錄當前通道的Barrier到達
    barrierHandler.addBarrier(barrier.getChannelIndex(), barrier.getId());
    
    // 檢查所有通道的Barrier是否都已到達
    if (barrierHandler.isAllBarriersReceived()) {
        // 觸發狀態快照
        triggerCheckpoint(barrier.getId());
        // 重置對齊狀態,繼續處理數據
        barrierHandler.reset();
    }
}
  • checkAlignmentNeeded:判斷是否有其他輸入流的Barrier未到,若未到則緩存當前數據。
  • handleBarrier:處理Barrier到達事件,當所有通道的Barrier都到達后,觸發快照。

三、狀態管理:一致性的持久化保障

狀態是Flink流處理的核心,用于存儲算子的中間結果(如聚合值、窗口數據)。Flink通過**狀態后端(State Backend)**管理狀態的存儲、訪問和持久化,確保Checkpoint時狀態能被正確保存,故障恢復時能被準確加載。

1. 狀態的分類

Flink中的狀態分為兩類:

(1) Keyed State(鍵控狀態)

僅用于KeyedStream,狀態與某個Key綁定,不同Key的狀態獨立存儲。常見類型:

  • ValueState:單值狀態(如某個Key的計數器)。
  • ListState:列表狀態(如某個Key的窗口數據)。
  • MapState:映射狀態(如某個Key的維度信息)。

(2) Operator State(算子狀態)

與算子并行實例綁定,不依賴Key。常見類型:

  • ListState:每個并行實例維護一個列表(如Kafka Source的offset列表)。
  • BroadcastState:廣播狀態(如配置數據,所有并行實例共享)。

2. 狀態后端的實現

Flink提供三種內置狀態后端,其核心差異在于存儲位置和快照機制:

狀態后端

存儲位置

快照方式

適用場景

MemoryStateBackend

TaskManager內存

同步全量快照

本地調試、小狀態作業

FsStateBackend

內存+文件系統

同步全量快照

中等規模狀態、需要容錯

RocksDBStateBackend

本地RocksDB

異步增量快照

大規模狀態、長窗口作業

RocksDBStateBackend是生產環境最常用的后端,其核心優勢:

  • 增量快照:僅保存上次Checkpoint后變化的狀態數據,減少快照開銷。
  • 本地存儲:狀態存儲在TaskManager的本地磁盤,避免內存OOM。
  • 異步持久化:快照過程異步執行,不影響數據流處理。

3. 狀態快照與恢復源碼解析

(1) 狀態快照:SnapshotStrategy

狀態后端的快照邏輯由SnapshotStrategy接口定義,以RocksDBStateBackend為例,其增量快照實現為RocksDBIncrementalSnapshotStrategy:

// RocksDBIncrementalSnapshotStrategy.java
public SnapshotResultSupplier snapshotState(long checkpointId)throws Exception {
    // 1. 獲取RocksDB的 SST文件列表(增量數據)
    List<StateMetaInfoSnapshot> metaInfoSnapshots = metaHandler.snapshot();
    List<StreamStateHandle> sstFiles = uploadSstFiles(checkpointId);
    
    // 2. 生成增量快照元數據(包含SST文件路徑、偏移量等)
    IncrementalRemoteKeyedStateHandlestateHandle=newIncrementalRemoteKeyedStateHandle(
        checkpointId,
        sstFiles,
        metaInfoSnapshots
    );
    
    // 3. 返回快照結果(包含狀態句柄)
    return SnapshotResultSupplier.of(stateHandle);
}
  • uploadSstFiles:將RocksDB的增量SST文件上傳到分布式文件系統(如HDFS)。
  • IncrementalRemoteKeyedStateHandle:描述增量快照的元數據,恢復時用于定位狀態文件。

(2) 狀態恢復:StateBackend

故障恢復時,StateBackend根據Checkpoint元數據加載狀態。核心邏輯在RocksDBStateBackend.restoreKeyedState:

// RocksDBStateBackend.java
publicvoidrestoreKeyedState(List<KeyedStateHandle> stateHandles)throws Exception {
    for (KeyedStateHandle stateHandle : stateHandles) {
        if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) {
            // 1. 下載增量SST文件到本地
            IncrementalRemoteKeyedStateHandleincrementalHandle=
                (IncrementalRemoteKeyedStateHandle) stateHandle;
            downloadSstFiles(incrementalHandle.getSstFiles());
            
            // 2. 將SST文件導入RocksDB實例
            rocksDB.restore(incrementalHandle.getMetaInfoSnapshots());
        }
    }
}
  • downloadSstFiles:從分布式文件系統下載SST文件到TaskManager本地磁盤。
  • rocksDB.restore:將SST文件導入RocksDB,恢復狀態數據。

四、兩階段提交協議:端到端精確一次的核心

Flink通過Checkpoint實現了內部狀態的精確一次,但端到端EO還需協調外部系統的寫入。例如,若Sink算子將數據寫入Kafka,需確保"內部狀態提交"與"Kafka數據寫入"原子性:要么同時成功,要么同時失敗。

Flink基于**兩階段提交協議(2PC)**實現了這一目標,核心抽象是TwoPhaseCommitSinkFunction(位于org.apache.flink.streaming.api.functions.sink包)。

1. 兩階段提交的核心流程

TwoPhaseCommitSinkFunction將Sink操作分為兩個階段,與Checkpoint流程緊密耦合:

階段1:預提交(Pre-commit)

  • 觸發時機:Checkpoint過程中,算子快照狀態后。
  • 操作:將數據寫入外部系統的"臨時區域"(如Kafka的事務日志),但不提交,此時外部系統不可見數據。
  • 目的:確保數據已持久化到外部系統,但未對外生效,可隨時回滾。

階段2:提交(Commit)

  • 觸發時機:所有算子完成Checkpoint,JobManager通知"Checkpoint完成"后。
  • 操作:通知外部系統提交預提交的數據(如Kafka提交事務),數據對外可見。
  • 異常處理:若提交失敗,Flink會重試(通過恢復Checkpoint后重新提交)。

2. TwoPhaseCommitSinkFunction的核心方法

TwoPhaseCommitSinkFunction是一個抽象類,用戶需實現以下方法以適配外部系統:

方法名

作用

beginTransaction

開啟一個新事務(如Kafka的beginTransaction)

invoke

將數據寫入事務緩沖區(如Kafka的send方法,數據寫入事務日志)

preCommit

預提交事務(如Kafka的sendOffsetsToTransaction,提交offset到事務)

commit

提交事務(如Kafka的commitTransaction)

abort

回滾事務(如Kafka的abortTransaction)

3. Flink + Kafka端到端EO案例

以Flink消費Kafka數據,處理后寫入Kafka為例,說明端到端EO的實現:

(1) Source端:KafkaConsumer的offset管理

Flink的FlinkKafkaConsumer將Kafka的offset作為算子狀態存儲在OperatorState中,Checkpoint時持久化offset。恢復時,從Checkpoint加載offset,確保消費位置不丟失。

(2) Sink端:KafkaProducer的事務寫入

FlinkKafkaProducer繼承TwoPhaseCommitSinkFunction,實現Kafka的事務寫入:

// FlinkKafkaProducer.java(簡化版)
publicclassFlinkKafkaProducer<T> extendsTwoPhaseCommitSinkFunction<T, KafkaTransactionState, Void> {
    
    @Override
    protected KafkaTransactionState beginTransaction()throws Exception {
        // 1. 開啟Kafka事務
        KafkaProducer<byte[], byte[]> producer = getKafkaProducer();
        producer.beginTransaction();
        returnnewKafkaTransactionState(producer.getProducerId(), producer.getEpoch());
    }
    
    @Override
    protectedvoidinvoke(KafkaTransactionState transaction, T value, Context context)throws Exception {
        // 2. 將數據寫入事務緩沖區(未提交)
        producer.send(newProducerRecord<>(topic, value.getBytes()));
    }
    
    @Override
    protectedvoidpreCommit(KafkaTransactionState transaction)throws Exception {
        // 3. 預提交:將offset寫入事務日志(確保消費與寫入一致性)
        Map<TopicPartition, OffsetAndMetadata> offsets = getOffsetsToCommit();
        producer.sendOffsetsToTransaction(offsets, consumerGroupId);
    }
    
    @Override
    protectedvoidcommit(KafkaTransactionState transaction)throws Exception {
        // 4. 提交事務,數據對外可見
        producer.commitTransaction();
    }
    
    @Override
    protectedvoidabort(KafkaTransactionState transaction)throws Exception {
        // 5. 異常時回滾事務
        producer.abortTransaction();
    }
}

(3) 端到端流程時序

  • Checkpoint觸發:JobManager向Source發送TriggerCheckpoint。
  • Source快照offset:FlinkKafkaConsumer將當前offset保存到狀態后端,廣播Barrier。
  • Sink預提交:FlinkKafkaProducer收到Barrier后,調用preCommit,將數據寫入Kafka事務日志(未提交)。
  • Checkpoint完成:所有算子確認后,JobManager通知Sink"Checkpoint完成"。
  • Sink提交事務:FlinkKafkaProducer調用commit,Kafka提交事務,數據對外可見。
  • 故障恢復:若步驟5失敗,Flink從上次Checkpoint恢復,重新調用commit(Kafka事務冪等,重復提交無影響)。

4. 兩階段提交源碼解析

(1) 預提交與狀態快照

TwoPhaseCommitSinkFunction的snapshotState方法在Checkpoint時調用,觸發預提交并保存事務狀態:

// TwoPhaseCommitSinkFunction.java
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // 1. 調用用戶實現的preCommit(預提交事務)
    preCommit(currentTransaction);
    
    // 2. 將當前事務狀態保存到狀態后端(如Kafka的producerId、epoch)
    List<State<TxT>> transactions = new ArrayList<>();
    transactions.add(currentTransaction);
    state.clear();
    state.add(transactions);
}
  • preCommit:用戶實現的外部系統預提交邏輯(如Kafka的sendOffsetsToTransaction)。
  • state.add:將事務狀態(如Kafka事務標識)保存到OperatorState,故障恢復時用于重新提交。

(2) 提交與回滾

Checkpoint完成后,JobManager調用notifyCheckpointComplete通知Sink提交事務:

// TwoPhaseCommitSinkFunction.java
publicvoidnotifyCheckpointComplete(long checkpointId)throws Exception {
    // 1. 從狀態后端獲取Checkpoint對應的事務狀態
    Iterator<State<TxT>> iterator = state.get().iterator();
    if (iterator.hasNext()) {
        State<TxT> state = iterator.next();
        TxTtransaction= state.getTransaction();
        
        // 2. 調用用戶實現的commit(提交事務)
        commit(transaction);
        
        // 3. 清理已提交的事務狀態
        iterator.remove();
    }
}
  • commit:用戶實現的外部系統提交邏輯(如Kafka的commitTransaction)。
  • 若notifyCheckpointComplete未調用(如JobManager掛掉),恢復時會從狀態后端加載未提交的事務,重新調用commit。

五、精確一次的語義邊界與優化

1. 語義邊界

Flink的端到端精確一次語義需滿足以下條件:

  • Source支持可重置偏移量:如Kafka、Pulsar等,能從指定offset重新消費。
  • Sink支持事務或冪等寫入:如Kafka事務、HDFS冪等寫入、MySQL事務等。
  • 狀態后端支持持久化:如FsStateBackend、RocksDBStateBackend,確保狀態可恢復。
  • Checkpoint配置正確:需啟用Checkpoint(enableCheckpointing(true)),并設置CheckpointingMode.EXACTLY_ONCE。

2. 性能優化

精確一次語義會帶來額外開銷(如Barrier對齊、兩階段提交),可通過以下方式優化:

(1) 增量Checkpoint

使用RocksDBStateBackend,僅保存增量狀態數據,減少快照時間和網絡開銷。

(2) 對齊超時(Alignment Timeout)

對于低延遲要求高的場景,可設置setAlignmentTimeout,允許部分算子在對齊超時后跳過對齊(犧牲部分一致性換取低延遲)。

(3) Unaligned Checkpoint(非對齊Checkpoint)

Flink 1.11引入非對齊Checkpoint,跳過Barrier對齊階段,直接緩存所有輸入通道的數據并快照,大幅降低延遲(適合高延遲、高吞吐場景)。

六、總結

Flink的精確一次提交語義是Checkpoint機制、狀態管理、兩階段提交協議三者協同的結果:

  • Checkpoint:通過Barrier和分布式快照,實現內部狀態的一致性基準點。
  • 狀態管理:通過可插拔狀態后端,確保狀態的持久化與高效恢復。
  • 兩階段提交:協調外部系統與內部狀態的原子性提交,實現端到端EO。

從源碼層面看,CheckpointCoordinator負責全局協調,StreamTask實現Barrier傳遞與狀態快照,TwoPhaseCommitSinkFunction封裝外部系統的兩階段提交邏輯。這種分層設計使得Flink在保證嚴格一致性的同時,兼顧了靈活性和性能。

正是這些精巧的設計,讓Flink成為實時數倉、CEP、實時ETL等場景的首選流處理框架,為企業的實時數據處理提供了可靠的一致性保障。

責任編輯:趙寧寧 來源: 大數據技能圈
相關推薦

2022-02-19 09:09:37

數倉Flink CP分布式

2011-08-12 09:30:02

MongoDB

2021-02-01 08:41:45

Flink語義數據

2022-02-20 10:47:54

Flink CP通用算法實時數倉

2021-06-02 07:07:09

Flink處理語義

2019-11-08 16:05:54

Promise前端鏈式調用

2011-11-15 13:34:22

蘋果iTunes Matc

2014-08-29 09:09:33

2021-05-26 11:06:06

Kubernetes網絡故障集群節點

2024-03-18 09:10:00

死鎖日志binlog

2011-06-28 10:41:50

DBA

2020-10-24 13:50:59

Python編程語言

2021-12-27 10:08:16

Python編程語言

2020-10-18 12:53:29

黑科技網站軟件

2020-03-10 07:51:35

面試諷刺標準

2020-03-18 13:07:16

華為

2017-01-23 12:40:45

設計演講報表數據

2019-08-19 08:01:50

Flink數據管理內存

2024-05-28 00:00:02

Java線程程序

2021-06-29 10:18:07

Kafka宕機系統
點贊
收藏

51CTO技術棧公眾號

68精品国产免费久久久久久婷婷| 亚洲va韩国va欧美va精品| 欧美一级电影久久| 国产免费a级片| 免费男女羞羞的视频网站在线观看| 国产成人在线视频网站| 欧美激情在线观看视频| 国模大尺度视频| av福利在线导航| 久久影院电视剧免费观看| 国产精品久久久久久影视| 99自拍视频在线| 97青娱国产盛宴精品视频| 精品久久久久久久中文字幕| 国产在线精品一区二区三区》 | 中文字幕一区二区三区四区五区人| 国产免费不卡视频| 亚洲精品欧洲| 中文在线不卡视频| 国产麻豆剧传媒精品国产| 在线天堂中文资源最新版| 中文字幕久久午夜不卡| 91|九色|视频| 久久久久久无码精品大片| 天天做综合网| 亚洲精品天天看| 激情久久综合网| 男人av在线播放| 亚洲老司机在线| 欧美区高清在线| 国产精品无码久久av| 国产视频一区欧美| 欧美精品生活片| 亚洲一级中文字幕| 日韩影片在线观看| 欧洲一区二区av| 日韩成人三级视频| 在线免费av电影| av一区二区三区| 91九色国产视频| yjizz国产| 欧美网站在线| 中文字幕亚洲专区| 欧美人与性囗牲恔配| 国产精品对白| 日韩欧美视频一区| 天天操狠狠操夜夜操| 久久电影tv| 精品国产成人在线| 91精品国产吴梦梦| 日韩伦理在线观看| 国产欧美日韩视频一区二区| 91日本在线视频| 中文字幕视频二区| 国产日韩1区| 久久成人在线视频| 亚洲人与黑人屁股眼交| 日韩一区电影| 在线精品视频视频中文字幕| 欧美丰满少妇人妻精品| 电影91久久久| 欧美三级一区二区| 天天干天天干天天干天天干天天干| 国产无遮挡裸体视频在线观看| 亚洲欧美日韩系列| 宅男av一区二区三区| 国产毛片av在线| 久久美女艺术照精彩视频福利播放| 国产中文欧美精品| 91精品国产乱码久久| 日韩在线a电影| 日本成人黄色片| 草久久免费视频| 日韩视频三区| 91国内精品久久| 人妻丰满熟妇av无码区| 欧美一级二区| 国产精品久久久久av免费| 久久久久久久久久免费视频| 亚洲国产成人精品女人| 精品自拍视频在线观看| 免费视频一二三区| 亚洲高清网站| 欧美专区日韩视频| 中文字幕av免费观看| 男女性色大片免费观看一区二区| 国产精品视频专区| 97人妻精品视频一区| 久久国产精品露脸对白| 国产欧美在线视频| 国产av无码专区亚洲av| 懂色av一区二区三区免费看| 成人综合av网| 日韩电影网址| 国产精品麻豆欧美日韩ww| 在线亚洲美日韩| 欧美1234区| 欧美性猛交xxxx黑人猛交| www.亚洲天堂网| 欧美高清免费| 日韩欧美高清一区| 少妇大叫太粗太大爽一区二区| 久久91麻豆精品一区| 久久精品91久久久久久再现| 久久网中文字幕| 免费在线日韩av| 成人日韩av在线| 亚洲精华国产精华精华液网站| 成人网男人的天堂| 色播亚洲婷婷| 国产探花在线观看| 在线观看av一区二区| 手机精品视频在线| 亚洲国产欧美日韩在线观看第一区 | 深田咏美在线x99av| 免费网站黄在线观看| 亚洲一区在线观看网站| 欧美性猛交久久久乱大交小说 | а天堂8中文最新版在线官网| 亚洲人成网站在线| av黄色在线网站| 香蕉大人久久国产成人av| 亚洲精品电影网站| 日韩欧美国产成人精品免费| 国产欧美日韩一级| 亚洲综合小说区| 成人亚洲性情网站www在线观看| 一区在线观看视频| 欧美极品欧美精品欧美图片| 久久久久久久久成人| 亚洲女人天堂av| 久草免费新视频| 蜜臀av性久久久久蜜臀av麻豆| 国产精品美女久久久久av福利| 狠狠色伊人亚洲综合网站l| 一区二区三区欧美久久| 一女二男3p波多野结衣| 最新国产精品视频| 欧美黄色三级网站| 国产精品自产拍| 国产亚洲精品7777| www一区二区www免费| 中文字幕区一区二区三| 久久久精品视频成人| 天天干,天天干| 成人aaaa免费全部观看| 天天想你在线观看完整版电影免费| 欧美日韩精品免费观看视完整| 精品久久国产字幕高潮| 国内偷拍精品视频| 国产精品一区二区在线播放 | 成人在线免费观看网站| 2020国产精品视频| 无码国精品一区二区免费蜜桃| 亚洲精品一二三四区| 欧美美女一级片| 欧美色图在线播放| 国产精品成人国产乱一区 | 视频直播国产精品| 五月天中文字幕| 国产欧美日韩综合精品一区二区| 青青在线视频免费| 九九综合在线| 日本中文字幕不卡免费| 日韩美女一级视频| 91黄视频在线| 欧美成人短视频| 麻豆精品久久精品色综合| 神马影院午夜我不卡影院| 日韩三区在线| 中文字幕国产日韩| 97精品人妻一区二区三区| 国产精品电影院| 一级片免费在线观看视频| 亚洲久久久久| 国产 高清 精品 在线 a| 男女视频在线| 日韩av影视综合网| 黄色在线视频网址| 亚洲欧美黄色片| 久久精品国产免费看久久精品| 日本婷婷久久久久久久久一区二区| 秋霞国产精品| 久久精品青青大伊人av| 99在线精品视频免费观看20| 一区二区三区蜜桃| 国产精品jizz| 日韩精品国产精品| 一区二区三区四区欧美日韩| 精品一区二区三区中文字幕视频| 欧美精品一区在线播放| 少妇又色又爽又黄的视频| 黑人精品xxx一区| 国产午夜精品久久久久久久久| 久久aⅴ国产欧美74aaa| 91免费在线| 日本一二三不卡| 精品综合久久久久| 欧美日本二区| 欧美大香线蕉线伊人久久国产精品| 亚洲成a人片| 中文字幕九色91在线| 国产女人高潮的av毛片| 亚洲第一主播视频| 成人免费无遮挡无码黄漫视频| 精品一区二区免费在线观看| 成人午夜免费在线视频| 国产一区二区三区探花| 亚洲japanese制服美女| 神马午夜在线视频| 理论片在线不卡免费观看| 亚洲 欧美 自拍偷拍| 精品视频1区2区| 日韩av黄色片| 国产精品久久久久影视| av电影在线播放| 性伦欧美刺激片在线观看| 宅男av一区二区三区| 一区二区三区四区在线看| 国产精品国产三级国产aⅴ9色| 91香蕉在线观看| 一区二区三区国产在线观看| 亚洲精品久久久久久动漫器材一区 | 秋霞午夜鲁丝一区二区| 丝袜诱惑制服诱惑色一区在线观看| 国产盗摄视频在线观看| 欧美日韩爱爱| 国产精品视频福利| 欧洲亚洲精品久久久久| 97成人超碰免| 牛牛精品视频在线| 日韩毛片中文字幕| 韩国av永久免费| 欧美精品三级日韩久久| 999视频在线| 午夜精品福利久久久| 5566中文字幕| 久久久国际精品| 激情综合丁香五月| 国产成人精品免费视频网站| 亚洲免费一级视频| 性欧美videos另类喷潮| 欧美视频免费看欧美视频| 中文视频一区| 中文字幕欧美日韩一区二区三区| 国产精品毛片av| 99高清视频有精品视频| 亚洲精品一区av| 国产精品综合不卡av| 欧美freesex| 久久久久久中文字幕| 欧美videossex| 久久久亚洲精品视频| 91蜜桃在线视频| 欧美人成在线视频| 五月天激情在线| 欧美猛少妇色xxxxx| 色黄网站在线观看| 久久91精品国产| av网站在线免费| 欧美成人黄色小视频| 日韩黄色影院| 久久九九全国免费精品观看| 在线视频91p| 久久这里有精品| av网站大全在线| 欧美国产日产韩国视频| 神马午夜伦理不卡| 免费av一区二区| 亚洲色图美国十次| 欧美激情精品久久久| 在线中文免费视频| 久久久久久久久久国产精品| 超黄网站在线观看| 91sa在线看| 欧美与亚洲与日本直播| 青青草精品毛片| 国产一区一一区高清不卡| 国产精品黄视频| 高清不卡一区| 国产精品一区二| 日韩美脚连裤袜丝袜在线| 欧洲精品亚洲精品| 99成人超碰| 97超碰国产精品| 久久xxxx精品视频| 国产免费999| 国产乱码精品1区2区3区| 在线免费观看污视频| 欧美极品美女视频| 国产精品6666| 欧美区一区二区三区| 色婷婷av一区二区三区之红樱桃| 亚洲一级一级97网| 丁香花电影在线观看完整版| 国产经典一区二区| 国产成人澳门| 热这里只有精品| 日韩中文字幕不卡| 制服丝袜第一页在线观看| 中文字幕在线不卡一区 | 黄黄的网站在线观看| 97久久久久久| 日韩一二三区在线观看| 亚洲精品一区二| 在线综合亚洲| 绯色av蜜臀vs少妇| 国产精品乱人伦| 国产精品久久久久久久久久精爆| 欧美成人性福生活免费看| 成a人片在线观看www视频| 97免费视频在线| 亚洲天堂中文字幕在线观看| 在线国产伦理一区| 久久久久一区| 亚洲av无码一区二区三区网址 | 中文字幕+乱码+中文乱码www| 亚洲国产天堂网精品网站| 午夜视频在线| 国产精品99久久久久久www| 日韩欧美ww| 亚洲美免无码中文字幕在线| 国产精品一区二区三区99| a一级免费视频| 欧美三级午夜理伦三级中视频| 免费成人av电影| 91精品国产91久久久久| 一区二区在线视频观看| 国产欧美123| 国产在线一区观看| 91麻豆精品成人一区二区| 欧美图片一区二区三区| 国产精品99999| 国产精品69久久| 国产一区二区三区探花| 50路60路老熟妇啪啪| 久久久午夜精品理论片中文字幕| 久久精品视频久久| 欧美tickling网站挠脚心| 日韩欧美一起| 国产精品自拍首页| 1000部精品久久久久久久久| jjzz黄色片| 亚洲国产另类av| 色噜噜一区二区三区| 午夜精品久久久久久久久久久久| 欧美亚洲色图校园春色| 成人毛片一区二区| 久久先锋影音av鲁色资源网| xxxx.国产| 在线观看欧美日韩| 亚洲日本中文| 日本国产中文字幕| 99久久久无码国产精品| 国产特黄大片aaaa毛片| 亚洲精品自拍偷拍| 91国拍精品国产粉嫩亚洲一区| 亚洲黄色一区二区三区| 久久成人18免费观看| 欧美黄色免费看| 日韩高清中文字幕| 国产超碰精品| 小说区视频区图片区| 风流少妇一区二区| 日韩在线视频免费播放| 亚洲欧美日韩综合| 成人国产一区二区三区精品麻豆| 日本黄色播放器| 成人一区二区视频| 亚洲欧美一区二区三区在线观看| 一二美女精品欧洲| 精品国产亚洲日本| www在线观看免费| 国产午夜精品久久久久久免费视 | 婷婷视频在线播放| 国产suv一区二区三区88区| 偷偷操不一样的久久| 伊人久久精品视频| 亚洲国产aⅴ精品一区二区| 国产一区二区视频播放| 亚洲国产精品v| 亚洲精品无amm毛片| 日本亚洲精品在线观看| 伊人久久大香线| 手机av免费看| 欧美一区二区久久| 欧美18av| 隔壁人妻偷人bd中字| 2019国产精品| av中文字幕免费在线观看| 欧美性受xxxx黑人猛交| 天天综合久久| 亚洲区免费视频| 欧美成人在线直播| 国产一区二区主播在线| 97在线国产视频| 亚洲欧洲日韩在线| 欧美理论在线观看| 国产91视觉| 韩国av一区二区|