精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Pulsar存儲計算分離架構設計之消息副本同步和故障轉移機制

開發 架構
在分布式消息系統的高可用性架構中,消息副本機制與故障轉移能力是確保數據可靠性和服務連續性的核心支柱,基本上每款消息中間件都有消息副本機制與故障轉移能力,包括但不限于Kafka、RocketMQ、RabbitMQ。

一、前言

我們繼續來講Pulsar存儲計算分離架構設計系列,這篇我們來說說消息副本和故障轉移機制。

在分布式消息系統的高可用性架構中,消息副本機制與故障轉移能力是確保數據可靠性和服務連續性的核心支柱,基本上每款消息中間件都有消息副本機制與故障轉移能力,包括但不限于Kafka、RocketMQ、RabbitMQ。

Apache Pulsar通過多副本存儲和智能故障轉移機制,構建了多層次容錯體系:其基于BookKeeper的持久化存儲層采用多副本(通常3副本)機制,將消息數據分散存儲在獨立的Bookie節點上,通過Quorum協議保證數據強一致性;計算層Broker則通過ZooKeeper協調實現故障自動檢測與流量切換,當主節點宕機時,備用節點能在秒級內接管服務,確保生產者/消費者的連接不中斷。這種存儲與計算解耦的架構設計,使Pulsar既能避免單點故障導致的數據丟失,又能實現業務無感知的故障恢復,特別適合金融交易、物聯網等對可靠性要求嚴苛的場景。

二、核心類圖

Pulsar的持久化復制機制核心流程如下:PersistentTopic作為消息處理的中心樞紐,通過ManagedLedger接口管理消息的持久化存儲,當需要跨集群復制時,PersistentReplicator組件會創建與遠程集群的連接并使用ProducerImpl發送消息。

復制流程從ManagedCursor讀取Entry消息開始,經過MessageImpl反序列化處理,通過ProducerImpl的sendAsync方法異步發送到目標集群,發送完成后通過回調機制更新游標的消費位置并觸發下一批消息的讀取,形成了一個完整的復制閉環。整個過程中DispatchRateLimiter負責控制復制速率防止網絡擁塞,而PersistentMessageExpiryMonitor則負責處理消息過期邏輯,確保系統資源的有效利用。

三、核心操作時序圖

Pulsar消息副本和故障轉移機制的核心流程如下:

  • 消息發布與本地存儲:客戶端向本地 PersistentTopic 發布消息,消息首先被持久化存儲到本地 ManagedLedger 中,存儲完成后向客戶端返回確認。
  • 消息復制流程:PersistentReplicator 通過 ManagedCursor 從本地 ManagedLedger 中讀取待復制的消息條目,然后通過 ProducerImpl 將消息異步發送到遠程集群,遠程集群將其存儲到自己的 ManagedLedger 中并返回確認。
  • 復制狀態更新:消息成功復制到遠程集群后,PersistentReplicator 會更新本地 ManagedCursor 的位置,標記已復制的消息,避免重復復制。
  • 故障處理與恢復:當與遠程集群的連接失敗時,PersistentReplicator 會檢測到連接異常并進行故障處理,包括重試機制,在連接恢復后重新建立連接并從上次中斷的位置繼續復制流程。

整個機制通過異步非阻塞的方式實現高效的消息復制,同時具備故障檢測和自動恢復能力,確保消息在跨集群環境中的可靠傳輸。

四、源碼分析

4.1 消息副本同步機制

當Entry成功添加到ManagedLedger后,會通過AddEntryCallback的addComplete方法回調通知調用方。

這是通過CountDownLatch機制同步等待異步操作完成的結果,這里的邏輯對應上面時序圖中的前兩步。

我們這里主要來看下PersistentReplicator讀取消息的機制,對應的時序圖中的Replicator。

PersistentTopic中的addProducer重寫了父類AbstractTopic的addProducer,其它的子類沒有重新,這是因為PersistentTopic有以下兩個功能要保障。

public class Policies {

    // 需要復制的集群信息
    public Set<String> replication_clusters = new HashSet<>();

  // ...

}

核心方法 readMoreEntries:

readMoreEntries 方法是 PersistentReplicator 中控制消息復制流程的核心實現,主要負責根據系統狀態和資源配置動態調整消息讀取策略。該方法首先通過 getAvailablePermits() 計算當前可讀取的消息數量,考慮了生產者隊列容量和限流設置等因素。如果允許讀取(availablePermits > 0),則根據可用許可和 readBatchSize 確定實際讀取消息數,并通過原子操作 HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE) 確保同一時間只有一個讀取操作在進行,避免重復讀取。當生產者不可寫時,會將讀取批量減小到1條消息以實現流量控制。如果觸發限流(返回-1),則延遲 MESSAGE_RATE_BACKOFF_MS 毫秒后重新嘗試讀取。最終通過 cursor.asyncReadEntriesOrWait() 異步從本地 ManagedLedger 讀取消息,讀取完成后回調 readEntriesComplete 方法進行處理,形成完整的復制循環。

接著看這個readEntriesComplete方法,上面readMoreEntries完成后,會回掉到readEntriesComplete方法里來。

org.apache.pulsar.broker.service.persistent.PersistentReplicator#readEntriesComplete

public void readEntriesComplete(List<Entry> entries, Object ctx) {
    if (log.isDebugEnabled()) {
        log.debug("[{}][{} -> {}] Read entries complete of {} messages", topicName, localCluster, remoteCluster,
                entries.size());
    }

    int maxReadBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize();
    if (readBatchSize < maxReadBatchSize) {
        int newReadBatchSize = Math.min(readBatchSize * 2, maxReadBatchSize);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", topicName, localCluster,
                    remoteCluster, readBatchSize, newReadBatchSize);
        }

        readBatchSize = newReadBatchSize;
    }

    readFailureBackoff.reduceToHalf();

    boolean atLeastOneMessageSentForReplication = false;
    boolean isEnableReplicatedSubscriptions =
            brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();

    try {
        // This flag is set to true when we skip atleast one local message,
        // in order to skip remaining local messages.
        boolean isLocalMessageSkippedOnce = false;
        for (int i = 0; i < entries.size(); i++) {
            Entry entry = entries.get(i);
            int length = entry.getLength();
            ByteBuf headersAndPayload = entry.getDataBuffer();
            MessageImpl msg;
            try {
                msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
            } catch (Throwable t) {
                log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
                        localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
                cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                entry.release();
                continue;
            }

            if (isEnableReplicatedSubscriptions) {
                checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
            }

            if (msg.isReplicated()) {
                // Discard messages that were already replicated into this region
                cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                entry.release();
                msg.recycle();
                continue;
            }

            if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Skipping message at position {}, replicateTo {}", topicName,
                            localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo());
                }
                cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                entry.release();
                msg.recycle();
                continue;
            }

            if (msg.isExpired(messageTTLInSeconds)) {
                msgExpired.recordEvent(0/* no value stat */);
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Discarding expired message at position {}, replicateTo {}", topicName,
                            localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo());
                }
                cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                entry.release();
                msg.recycle();
                continue;
            }

            if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
                // The producer is not ready yet after having stopped/restarted. Drop the message because it will
                // recovered when the producer is ready
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Dropping read message at {} because producer is not ready", topicName,
                            localCluster, remoteCluster, entry.getPosition());
                }
                isLocalMessageSkippedOnce = true;
                entry.release();
                msg.recycle();
                continue;
            }

            dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));

            // Increment pending messages for messages produced locally
            PENDING_MESSAGES_UPDATER.incrementAndGet(this);

            msgOut.recordEvent(headersAndPayload.readableBytes());

            msg.setReplicatedFrom(localCluster);

            headersAndPayload.retain();

            getSchemaInfo(msg).thenAccept(schemaInfo -> {
                msg.setSchemaInfoForReplicator(schemaInfo);
                producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
            }).exceptionally(ex -> {
                log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
                        localCluster, remoteCluster, ex);
                returnnull;
            });

            atLeastOneMessageSentForReplication = true;
        }
    } catch (Exception e) {
        log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, localCluster, remoteCluster, e.getMessage(),
                e);
    }

    HAVE_PENDING_READ_UPDATER.set(this, FALSE);

    if (atLeastOneMessageSentForReplication && !isWritable()) {
        // Don't read any more entries until the current pending entries are persisted
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Pausing replication traffic. at-least-one: {} is-writable: {}", topicName,
                    localCluster, remoteCluster, atLeastOneMessageSentForReplication, isWritable());
        }
    } else {
        readMoreEntries();
    }
}

readEntriesComplete 方法是 PersistentReplicator 中處理從本地集群讀取消息完成后的核心回調方法。當從本地 ManagedLedger 讀取到消息后,該方法會遍歷所有讀取到的 Entry 條目,對每條消息進行反序列化和一系列過濾檢查,包括檢查是否為已復制的消息、是否應該復制到目標集群、是否過期等。對于需要復制的消息,會通過 producer.sendAsync 方法異步發送到遠程集群,發送時會附帶 ProducerSendCallback 回調來處理發送結果。成功發送到遠程集群的消息會被標記刪除,避免重復復制,同時根據發送狀態和系統負載情況決定是否繼續讀取更多消息進行復制,從而形成完整的消息復制循環流程。

這樣就完成了消息副本的同步機制了!!!

4.2 故障轉移機制

當與遠程集群的連接失敗時,PersistentReplicator 會檢測到連接異常并進行故障處理,包括重試機制,在連接恢復后重新建立連接并從上次中斷的位置繼續復制流程。

4.2.1 ProducerSendCallback.sendComplete 方法中處理發送異常的情況

當發送消息到遠程集群失敗時,會記錄錯誤日志并將游標回退,以便在連接恢復后重新發送消息。

4.2.2 readEntriesFailed 方法處理讀取條目失敗的情況

4.2.3 readMoreEntries 方法中的重試機制

之前上面分析過,我這里就不重復了。

} else if (availablePermits == -1) {
     // no permits from rate limit
     topic.getBrokerService().executor().schedule(
         () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
 }

當遇到限流等情況時,會安排延遲重試。

4.2.4 ProducerImpl 內部的連接管理和重試機制

ProducerImpl 內部的連接管理和重試機制主要通過 ClientCnx 對象管理與 broker 的連接,使用 getState() 方法檢查生產者狀態(如 Ready、Connecting、Closing、Closed 等),并在連接斷開時觸發 connectionClosed() 回調進行重連。在 sendAsync() 方法中,當發送失敗時會檢查是否需要重試,并使用 pendingMessages 隊列暫存待發送消息,通過 grabCnx() 方法獲取或重新建立連接。連接恢復處理通過 reconnectLater() 方法實現延遲重連邏輯,使用 backoff 策略控制重連間隔,并通過 connectionOpened() 回調處理連接成功建立后的操作。消息確認和重傳機制通過 ackReceived() 處理 broker 的確認響應,對于未確認的消息,在重連后會重新發送,并使用 pendingMessages 隊列跟蹤待確認消息,確保了 ProducerImpl 在網絡波動或 broker 重啟等異常情況下能夠自動恢復連接并保證消息的可靠傳輸。

以上四點的機制共同確保了在與遠程集群連接失敗時能夠檢測到異常并進行適當的故障處理和重試。

責任編輯:武曉燕 來源: 老周聊架構
相關推薦

2023-08-16 12:34:16

同步備份異步備份

2018-05-25 09:31:00

數據存儲高可用

2020-06-23 08:15:13

計算存儲分離

2024-08-18 14:09:24

2012-09-19 13:46:37

存儲存儲設計快速表態

2025-03-06 01:00:55

架構推送服務編程語言

2015-06-02 04:17:44

架構設計審架構設計說明書

2023-12-26 08:16:56

Kafka緩存架構客戶端

2015-06-02 04:34:05

架構設計

2025-03-13 08:30:00

MySQL架構主從同步

2012-05-30 09:43:45

業務邏輯層

2014-05-19 10:08:36

IM系統架構設計

2009-06-22 14:48:21

DRY架構設計

2017-11-24 08:32:04

架構設計存儲

2024-05-06 08:43:00

2025-02-26 07:20:07

2023-04-13 08:23:28

軟件架構設計

2024-09-18 09:04:33

架構模式查詢

2021-01-19 05:36:46

pulsar中間件消息

2024-02-26 00:00:00

Nginx服務器HTTP
點贊
收藏

51CTO技術棧公眾號

中文字幕在线视频第一页| 亚洲熟妇一区二区三区| 91高清在线观看视频| 国产乱子伦一区二区三区国色天香| 欧美成人精品不卡视频在线观看| 无码任你躁久久久久久老妇| 欧美特黄aaaaaaaa大片| 亚洲女同女同女同女同女同69| 国产精品日韩欧美一区二区| 精品免费囯产一区二区三区| 天天超碰亚洲| 亚洲精品第一页| 在线能看的av网站| 国产精品一品| 中文字幕欧美一区| 久久天天狠狠| 精品国产免费无码久久久| 99re国产精品| 久久亚洲国产精品成人av秋霞| 亚洲一区二区在线免费| 日韩综合av| 色婷婷综合在线| 日韩成人手机在线| 欧美成人hd| 久久亚洲一区二区三区明星换脸| 亚洲一区二区三区视频播放| 精品黑人一区二区三区| 亚洲视频福利| 久久伊人精品天天| 欧美人妻一区二区三区| 女仆av观看一区| 日韩一卡二卡三卡国产欧美| 欧美婷婷精品激情| 日本不良网站在线观看| 一区二区三区欧美久久| 亚洲欧洲久久| caoporn国产精品免费视频| 白白色 亚洲乱淫| 成人av片网址| 性猛交富婆╳xxx乱大交天津 | av成人免费在线观看| 国产在线播放91| 波多野结衣影片| 久久国产精品毛片| 91av在线播放视频| 日韩精品一区三区| 激情成人亚洲| 欧美精品videosex极品1| 欧美丰满熟妇bbbbbb| 久久人人99| 中文字幕日韩欧美| 国产精品天天干| 精品国产aⅴ| 国产午夜精品一区理论片飘花 | 亚洲+小说+欧美+激情+另类 | 51调教丨国产调教视频| 麻豆一区二区| 日韩精品一区二区三区第95| 欧美丰满少妇人妻精品| 任我爽精品视频在线播放| 精品国产一区二区亚洲人成毛片| 少妇丰满尤物大尺度写真| 国产一区二区三区免费观看在线 | 国产精自产拍久久久久久| 伊人手机在线视频| 蜜桃av综合| 国产精品久久久久免费a∨大胸| 日韩 国产 欧美| 日韩电影一区二区三区四区| 国产免费一区二区三区在线能观看| 免费在线不卡av| 久久99久久99| 99视频免费观看| 婷婷丁香花五月天| 久久久一区二区三区| 天堂√在线观看一区二区| 日本免费视频在线观看| 亚洲黄色尤物视频| 女人和拘做爰正片视频| 日韩在线免费| 91精品国产综合久久久久久久| www.五月天色| 久久久久久毛片免费看| 在线视频中文亚洲| 麻豆视频在线观看| 国产亚洲综合精品| 国产精品中文字幕久久久| a毛片在线免费观看| 99热99精品| 亚洲精品9999| 丝袜美腿av在线| 色综合天天视频在线观看| 嫩草影院国产精品| 给我免费播放日韩视频| 亚洲精品在线看| 黄色片子在线观看| 欧美一区=区| 亚洲va欧美va在线观看| 四虎电影院在线观看| 国产精品久久久久久久午夜片| 奇米777四色影视在线看| 新片速递亚洲合集欧美合集| 日韩欧美中文字幕一区| 亚洲一区视频在线播放| 黄色欧美日韩| 国产日本欧美在线观看 | 99国产精品国产精品毛片| 午夜精品视频在线观看一区二区 | 久久国产影院| 97超碰国产精品女人人人爽| 国产精品亚洲lv粉色| 26uuuu精品一区二区| 欧美一级中文字幕| jizz欧美| 精品视频在线播放| 麻豆成人在线视频| 黄网站免费久久| 欧美性bbwbbwbbwhd| 草美女在线观看| 67194成人在线观看| 四虎永久免费在线观看| 亚洲大胆在线| 99久久综合狠狠综合久久止| 在线观看二区| 色综合天天性综合| 国产精品无码电影| 国产综合婷婷| 97视频中文字幕| 成人短视频在线| 欧美区在线观看| 欧美性受xxxx黑人| 老司机精品视频网站| 精品日本一区二区三区| 大黄网站在线观看| 精品国偷自产国产一区| 成熟的女同志hd| 激情综合亚洲精品| 夜夜春亚洲嫩草影视日日摸夜夜添夜| 欧美大片免费高清观看| 国产丝袜一区视频在线观看| 日韩av男人天堂| 成人国产精品免费观看| 国产一区二区三区小说| 999在线精品| 久久久久久av| 天天干,夜夜操| 偷拍一区二区三区四区| 男男做爰猛烈叫床爽爽小说| 激情综合自拍| 精品人伦一区二区三区| 在线免费三级电影网站| 亚洲毛片在线观看.| 国产在线观看黄色| 久久综合五月天婷婷伊人| 国产主播在线看| 久久av影视| 国产精品亚洲第一区| 日韩免费啪啪| 日韩一区二区视频在线观看| 欧美成人精品欧美一级私黄| 国产.欧美.日韩| 国产精品无码人妻一区二区在线| 超碰地址久久| 欧美一级片久久久久久久| 日本a一级在线免费播放| 日本精品一区二区三区高清| 成人在线观看免费高清| 国产伦精品一区二区三区免费 | 欧美本精品男人aⅴ天堂| 国产精品不卡av| 久久久五月婷婷| 国产美女视频免费看| 午夜久久久久| 久久综合中文色婷婷| 91p九色成人| 精品国产美女在线| 人人妻人人玩人人澡人人爽| 欧美天堂在线观看| 四季av中文字幕| 国产a精品视频| 欧美黄色一级片视频| 五月精品视频| 精品一区二区三区自拍图片区 | av免费播放网站| 国产揄拍国内精品对白| 欧美二区在线视频| 日韩在线中文| 国产一区二区高清视频| 在线一区视频观看| 欧美国产视频日韩| 黄色小视频在线免费观看| 欧美巨大另类极品videosbest | 国产精品69毛片高清亚洲| 玩弄中年熟妇正在播放| 久久精品国产大片免费观看| 国产精品swag| 日本成人在线网站| 欧美一级在线播放| av超碰免费在线| 一区二区欧美久久| 色综合免费视频| 宅男在线国产精品| 日韩电影在线观看一区二区| 亚洲日本在线视频观看| 公侵犯人妻一区二区三区| 国产在线日韩欧美| 91av俱乐部| 亚洲人成毛片在线播放女女| 一本久道久久综合狠狠爱亚洲精品| 国产色噜噜噜91在线精品 | 国产绿帽刺激高潮对白| 欧美午夜精品久久久久久久| 欧美特黄一级片| 久久久久久9999| 2一3sex性hd| 国产999精品久久久久久绿帽| 热久久精品国产| 国产日韩一区| www.av毛片| 欧美日本一区二区视频在线观看| 五月天亚洲综合小说网| 美女亚洲一区| 精品亚洲一区二区三区四区五区高| 亚洲国产91视频| 日韩av免费在线| 在线观看涩涩| 2021久久精品国产99国产精品| 少妇av在线| 欧美成人精品在线播放| 欧美成年黄网站色视频| 在线播放国产一区二区三区| 欧美在线观看在线观看| 日韩高清不卡av| 嫩草影院一区二区| 精品对白一区国产伦| 精品毛片一区二区三区| 这里只有精品99re| av一区二区三| 日韩免费一区二区| 亚洲av色香蕉一区二区三区| 日韩一区二区免费在线观看| 国产欧美一级片| 欧美一区二区三区男人的天堂| 中文字幕在线观看第二页| 欧洲精品一区二区| 正在播放木下凛凛xv99| 欧美在线免费观看亚洲| 中文字幕 视频一区| 欧美丝袜丝交足nylons| 中文字幕精品在线观看| 欧美日韩性生活| 国产精品久久影视| 日韩一级片网址| 开心激情综合网| 日韩精品日韩在线观看| 欧美一区二区三区少妇| 国产一区二区三区视频| 中文字幕在线观看日本| 久久韩剧网电视剧| 国产天堂在线播放视频| 91禁外国网站| 日本精品不卡| 国产欧美日韩高清| 日韩高清二区| 国产一区视频观看| 国产日产一区| 伊人av成人| 韩国自拍一区| 国产a级片免费观看| 日韩精品国产精品| 捷克做爰xxxⅹ性视频| 成人av免费在线| 黄色片网站免费| 一区二区三区四区不卡在线 | 日日摸夜夜添夜夜添国产精品| 我要看一级黄色大片| 国产精品一二二区| 欧美bbbbb性bbbbb视频| 国产精品久久久久aaaa| 久久精品久久国产| 欧美影院一区二区三区| 国产欧美日韩成人| 日韩av中文字幕在线免费观看| 国产在线观看精品一区| 久久久国产精彩视频美女艺术照福利| 欧洲成人综合网| 国产91色在线播放| 亚洲午夜免费| 日韩av不卡在线播放| 欧美日本精品| 中文字幕在线观看第三页| 国产高清精品久久久久| 国产精品无码久久久久一区二区| 亚洲欧美日韩电影| 国产午夜麻豆影院在线观看| 欧美一区二区三区视频在线| 男人的天堂在线视频| 免费97视频在线精品国自产拍| 国产在线精彩视频| 亚洲中国色老太| 欧洲杯什么时候开赛| 美女黄色免费看| 久久精品久久精品| 久久久无码人妻精品一区| 一区二区三区日韩欧美| 在线视频播放大全| 亚洲欧美综合v| 17videosex性欧美| 亚洲一区二区免费| 成人久久一区| 国内外成人激情视频| 国产精品亚洲一区二区三区妖精| 一区二区三区伦理片| 五月激情综合网| 午夜精品久久久久久久第一页按摩| 国产一区二区动漫| 中国色在线日|韩| 国产伦精品一区二区| 91精品久久久久久久久久不卡| 欧美视频免费播放| bt欧美亚洲午夜电影天堂| 欧美毛片在线观看| 91精品婷婷国产综合久久| 成人在线观看一区| 日韩免费精品视频| 在线日韩一区| 成人黄色片视频| 99久久久国产精品| 日韩精品无码一区二区| 91精品国产色综合久久不卡电影| 91涩漫在线观看| 国产精品夫妻激情| 国产区精品区| 在线视频日韩一区| 国产喷白浆一区二区三区| 国产成人无码一区二区在线播放| 亚洲国产高清福利视频| 国内在线免费视频| 国产精品乱子乱xxxx| 韩国在线一区| yy1111111| 精品福利在线视频| 五月婷婷久久久| 69久久夜色精品国产7777| 色综合久久中文| 能看的毛片网站| 国产欧美精品国产国产专区| 日韩av免费播放| 色一区av在线| 亚洲精品第一| 日本高清视频免费在线观看| 国产成人一区在线| 久久综合加勒比| 亚洲精品美女在线| 自拍偷拍亚洲视频| 日韩欧美一区二区三区四区| 免费看精品久久片| www深夜成人a√在线| 日韩免费在线观看| 无遮挡爽大片在线观看视频 | 色噜噜狠狠色综合中国| www亚洲人| 成人夜晚看av| 红桃视频国产精品| 欧美色图亚洲激情| 欧美日韩综合在线| 91在线中字| 久久久久久久免费| 男女性色大片免费观看一区二区| 日本黄色录像视频| 精品福利一区二区三区免费视频| 国产一二三在线| 亚洲视频在线二区| 国产成人在线网站| 四虎精品永久在线| 日韩中文字幕视频在线观看| 91精品久久久久久综合五月天| 极品美女扒开粉嫩小泬| 国产欧美日韩亚州综合| 精品国自产拍在线观看| 欧美一级电影免费在线观看| 日韩精品看片| av电影在线播放| 欧美色窝79yyyycom| 色在线视频网| 视频在线一区二区三区| 国产成人在线看| 无码人妻精品一区二区三区9厂| 久久亚洲精品网站| 免费不卡中文字幕在线| 黄色a级三级三级三级| 午夜国产精品影院在线观看| h视频在线免费| 精品国产aⅴ麻豆| 久久国产精品99久久久久久老狼| 国产在线观看免费av| xxx一区二区| 沈樵精品国产成av片| 亚洲色图欧美另类| 欧美精品 国产精品|