精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spring Boot + MQTT:消息交互輕松實現

網絡 網絡管理
MQTT(Message Queuing Telemetry Transport)是一種輕量級的消息傳輸協議,專為資源受限的設備和低帶寬、高延遲的網絡環境設計。在物聯網(IoT)領域,MQTT因其低開銷和高效能而被廣泛應用。

介紹

MQTT(Message Queuing Telemetry Transport)是一種輕量級的消息傳輸協議,專為資源受限的設備和低帶寬、高延遲的網絡環境設計。在物聯網(IoT)領域,MQTT因其低開銷和高效能而被廣泛應用。

圖片圖片

使用

安裝說明

https://docs.emqx.com/zh/emqx/v4.3/getting-started/install.html#zip-%E5%8E%8B%E7%BC%A9%E5%8C%85%E5%AE%89%E8%A3%85-linux%E3%80%81macos%E3%80%81windows

圖片圖片

默認賬號:admin/public,登錄后進入首頁:

圖片圖片

依賴引入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置

# MQTT配置
mqtt:
  broker-url: tcp://localhost:1883 # MQTT代理服務器地址
  client-id: yian-mqtt-client # 客戶端ID,必須唯一
  username: admin # 用戶名(如果需要)
  password: 1qazxsw2 # 密碼(如果需要)
  topics:
    topic1: test/topic_1
    topic2: test/topic_2
    topic3: test/topic_3
  qos: 1 # 消息質量等級(0、1或2)
  automatic-reconnect: true# 自動重連
  clean-session: true# 是否清除會話
  connection-timeout: 5000 # 連接超時時間(秒)
  keep-alive-interval: 30 # 保持連接時間(秒)
  pool-config:
    core-size: 8
    max-size: 32
    queue-capacity: 1024
    thread-name-prefix: mqtt-worker-
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    /**
     * MQTT Broker地址
     */
    private String brokerUrl;

    /**
     * 客戶端ID
     */
    private String clientId;

    /**
     * 用戶名
     */
    private String username;

    /**
     * 密碼
     */
    private String password;

    /**
     * QoS級別 (0/1/2)
     */
    private int qos = 1;
    /**
     * 自動重連
     */
    private boolean automaticReconnect = false;
    /**
     * 清理會話
     */
    private boolean cleanSession = false;

    /**
     * 連接超時時間(ms)
     */
    private int connectionTimeout = 5000;

    /**
     * 保持連接間隔(秒)
     */
    private int keepAliveInterval = 30;

    /**
     * 主題配置
     */
    private Topics topics = new Topics();
    /**
     * 線程相關參數
     */
    private PoolConfig poolConfig = new PoolConfig();

    @Data
    public static class PoolConfig {
        private int coreSize = 8;
        private int maxSize = 16;
        private int queueCapacity = 1000;
        private String threadPrefix = "mqtt-worker-";
    }

    @Data
    public static class Topics {
        /**
         * topic1
         */
        private String topic1;

        /**
         * topic2
         */
        private String topic2;

        /**
         * topic3
         */
        private String topic3;

    }

    // 需要顯式聲明空構造器
    public MqttProperties() {
    }
}

MQTT配置類

@Slf4j
@Configuration
public class MqttConfig {
    private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic");

    private final MqttProperties mqttProperties;
    private final MqttCallback mqttCallback;

    public MqttConfig(MqttProperties mqttProperties, MqttCallback mqttCallback) {
        this.mqttProperties = mqttProperties;
        this.mqttCallback = mqttCallback;
    }

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = createMqttClient();
        MqttConnectOptions options = buildMqttConnectOptions();

        try {
            client.connect(options);
            log.info("MQTT連接成功,Broker地址: {}", mqttProperties.getBrokerUrl());
            subscribeTopics(client);
        } catch (MqttException e) {
            log.error("MQTT連接異常: {},錯誤碼: {}", e.getMessage(), e.getReasonCode(), e);
            throw new RuntimeException("MQTT連接失敗", e);
        }

        client.setCallback(mqttCallback);
        return client;
    }

    private MqttClient createMqttClient() throws MqttException {
        return new MqttClient(
                mqttProperties.getBrokerUrl(),
                generateClientId(),
                new MemoryPersistence()
        );
    }

    private String generateClientId() {
        return Optional.ofNullable(mqttProperties.getClientId())
                .filter(StringUtils::hasText)
                .orElseGet(() -> "CLIENT_" + System.currentTimeMillis());
    }

    private MqttConnectOptions buildMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
        options.setCleanSession(mqttProperties.isCleanSession());

        Optional.ofNullable(mqttProperties.getUsername())
                .filter(StringUtils::hasText)
                .ifPresent(options::setUserName);

        Optional.ofNullable(mqttProperties.getPassword())
                .filter(StringUtils::hasText)
                .map(String::toCharArray)
                .ifPresent(options::setPassword);

        options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        return options;
    }

    private void subscribeTopics(MqttClient client) throws MqttException {
        List<String> topics = getTopicsToSubscribe();
        for (String topic : topics) {
            try {
                client.subscribe(topic, mqttProperties.getQos());
                log.info("成功訂閱主題: {}", topic);
            } catch (MqttException e) {
                log.error("訂閱主題[{}]失敗,錯誤碼: {}", topic, e.getReasonCode(), e);
                throw e;
            }
        }
    }

    private List<String> getTopicsToSubscribe() {
        return Optional.ofNullable(mqttProperties.getTopics())
                .map(t -> Arrays.asList(t.getTopic1()))
                .filter(list -> !list.contains(null))
                .orElse(DEFAULT_TOPICS);
    }

    @Bean("mqttExecutor")
    public Executor mqttExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(mqttProperties.getPoolConfig().getCoreSize());
        executor.setMaxPoolSize(mqttProperties.getPoolConfig().getMaxSize());
        executor.setQueueCapacity(mqttProperties.getPoolConfig().getQueueCapacity());
        executor.setThreadNamePrefix(mqttProperties.getPoolConfig().getThreadPrefix());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

消息回調,處理接收的消息

@Slf4j
@Component
public class MqttMessageListener implements MqttCallback {
    private static final int MAX_RETRY_ATTEMPTS = 10;
    private static final long INITIAL_RETRY_DELAY = 1_000L;
    private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic");

    private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger retryCounter = new AtomicInteger(0);

    private final Map<String, MessageHandler> topicHandlers = new ConcurrentHashMap<>();

    private final TestService testService;
    private final MqttProperties mqttProperties;
    @Lazy
    @Autowired
    private MqttClient mqttClient;


    public MqttMessageListener(TestService testService, MqttProperties mqttProperties) {
        this.testService = testService;
        this.mqttProperties = mqttProperties;
        initializeHandlers();
    }

    private void initializeHandlers() {
        topicHandlers.put(mqttProperties.getTopics().getTopic1(), this::handleMessage2);
    }

    @Override
    public void connectionLost(Throwable cause) {
        log.error("MQTT連接中斷,原因: {}", cause.getMessage());
        scheduleReconnect();
    }

    private synchronized void scheduleReconnect() {
        int attempt = retryCounter.incrementAndGet();
        if (attempt > MAX_RETRY_ATTEMPTS) {
            log.error("達到最大重連次數[{}],停止重連嘗試", MAX_RETRY_ATTEMPTS);
            return;
        }
        long delay = INITIAL_RETRY_DELAY * (long) Math.pow(2, attempt - 1);
        log.info("將在{}ms后嘗試第{}次重連...", delay, attempt);

        reconnectScheduler.schedule(() -> {
            try {
                if (!mqttClient.isConnected()) {
                    mqttClient.reconnect();
                }
                subscribeTopics(mqttClient);
                mqttClient.setCallback(this);
                retryCounter.set(0);
                log.info("MQTT連接恢復成功");
            } catch (MqttException e) {
                log.error("第{}次重連失敗: {}", attempt, e.getMessage(),e);
                scheduleReconnect();
            }
        }, delay, TimeUnit.MILLISECONDS);
    }


    private void subscribeTopics(MqttClient client) throws MqttException {
        List<String> topics = getTopicsToSubscribe();
        for (String topic : topics) {
            try {
                client.subscribe(topic, mqttProperties.getQos());
                log.info("成功訂閱主題: {}", topic);
            } catch (MqttException e) {
                log.error("訂閱主題[{}]失敗,錯誤碼: {}", topic, e.getReasonCode(), e);
                throw e;
            }
        }
    }

    private List<String> getTopicsToSubscribe() {
        return Optional.ofNullable(mqttProperties.getTopics())
                .map(t -> Arrays.asList(t.getTopic1()))
                .filter(list -> !list.contains(null))
                .orElse(DEFAULT_TOPICS);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        try {
            String payload = validatePayload(message.getPayload());
            log.info("收到消息 [Topic:{}][QoS:{}] {}", topic, message.getQos(), payload);

            MessageHandler handler = Optional.ofNullable(topicHandlers.get(topic))
                    .orElseThrow(() -> new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION));

            asyncProcessMessage(() -> {
                try {
                    handler.handle(payload);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

        } catch (JSONException e) {
            log.error("消息格式錯誤 [Topic:{}]: {}", topic, e.getMessage(), e);
        } catch (MqttException e) {
            log.error("消息處理失敗 [Topic:{}]: {}", topic, e.getMessage(), e);
        } catch (Exception e) {
            log.error("未知處理異常 [Topic:{}]: {}", topic, e.getMessage(), e);
        }
    }

    private String validatePayload(byte[] payload) throws JSONException {
        String json = new String(payload);
        try {
            JSON.parse(json); // 完整解析 JSON,捕獲格式異常
        } catch (JSONException e) {
            log.error("非法JSON格式: {}", json);
            throw new JSONException("非法JSON格式", e);
        }
        return json;
    }

    @Async("mqttExecutor")
    public void asyncProcessMessage(Runnable task) {
        task.run();
    }

    private void handleMessage2(String payload) {
        testService.handleMessage2(payload);
    }
    private void handleMessage3(String payload) {
        testService.handleMessage3(payload);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        try {
            if (token.getException() != null) {
                log.error("消息投遞失敗 [MessageId:{}]", token.getMessageId(), token.getException());
            } else {
                log.info("消息投遞成功 [MessageId:{}]", token.getMessageId());
            }
        } catch (Exception e) {
            log.error("獲取投遞狀態失敗", e);
        }
    }

    @FunctionalInterface
    private interface MessageHandler {
        void handle(String payload) throws Exception;
    }
}

消息發布

@Slf4j
@Component
public class MqttPublisher {

    @Autowired
    @Lazy
    private MqttClient mqttClient;

    @Value("${mqtt.qos:1}")
    private int qosLevel;

    private final Object publishLock = new Object();

    /**
     * 發布消息
     */
    public void publishMessage(String msg, String topic) {
        try {
            synchronized (publishLock) { // 線程安全鎖
                MqttMessage message = new MqttMessage();
                message.setPayload(msg.getBytes());
                message.setQos(qosLevel);
                message.setRetained(false); // 不保留消息

                mqttClient.publish(topic, message);
                log.info("Topic:{} 響應已發送: {}", topic, msg);
            }
        } catch (MqttException e) {
            log.error("MQTT發布失敗: {}", e.getMessage());
            handlePublishFailure(msg, topic, e);
        }
    }

    /**
     * 消息失敗重試機制
     */
    private void handlePublishFailure(String msg, String topic, MqttException e) {
        try {
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
            mqttClient.connect();
            publishMessage(msg, topic); // 重試發布
        } catch (MqttException ex) {
            log.error("消息重試失敗: {}", ex.getMessage(),ex);
        }
    }

    /**
     * 消息持久化存儲(QoS 1保障)
     */
    private void saveFailedMessage(JSONObject msg) {
        // 實現數據庫存儲邏輯(此處需補充DAO操作)
        log.warn("持久化失敗消息: {}", msg);
    }
}

測試

啟動程序:

圖片圖片

查看主題:

圖片圖片

發布消息:

圖片圖片

責任編輯:武曉燕 來源: 一安未來
相關推薦

2024-10-11 11:32:22

Spring6RSocket服務

2025-05-29 01:33:00

微服務架構系統

2025-04-09 02:02:00

Spring框架開發

2021-08-04 10:22:27

鴻蒙HarmonyOS應用

2023-10-15 22:40:25

插件JIB

2020-04-23 15:59:04

SpringKafka集群

2024-08-09 08:52:26

2021-09-03 06:46:34

Spring 6pring Boot 項目

2021-09-15 09:02:20

Spring 6Spring BootJava

2022-07-13 08:36:57

MQ架構設計模式

2023-10-18 15:25:29

數據源數據庫

2025-02-08 08:16:16

2025-02-17 00:00:45

接口支付寶沙箱

2024-09-12 14:50:08

2024-10-25 08:41:18

消息隊列RedisList

2022-09-22 13:28:34

Redis分布式鎖

2022-09-29 08:28:57

SpringRedis分布式

2024-10-17 11:24:04

2024-01-04 18:01:55

高并發SpringBoot

2025-07-01 01:00:00

Spring消息系統Redis
點贊
收藏

51CTO技術棧公眾號

久久这里只有精品国产| 污污网站免费看| 天堂а√在线8种子蜜桃视频| 亚洲欧美网站| 日韩在线精品视频| 潘金莲一级淫片aaaaaaa| 在线男人天堂| 自拍偷拍欧美激情| 免费久久99精品国产自| 888奇米影视| 亚洲大胆在线| 色偷偷88888欧美精品久久久| 国产精品熟妇一区二区三区四区| 不卡一二三区| 亚洲毛片av在线| 任我爽在线视频精品一| 精品久久久久中文慕人妻| 六月天综合网| 高清亚洲成在人网站天堂| 免费看的黄色录像| 日韩精品a在线观看91| 欧美美女黄视频| jizzjizzxxxx| 免费毛片在线看片免费丝瓜视频 | 在线区一区二视频| 男人添女人下部视频免费| 成年人视频在线看| 91蝌蚪国产九色| www.成人av| 亚洲综合精品在线| 老司机免费视频久久| 国内精品久久久久| 欧美日韩综合一区二区| 日韩精品免费| 亚洲天堂色网站| 变态另类丨国产精品| 中文字幕一区二区三区中文字幕| 欧美日韩一区不卡| www.xxx亚洲| 黑人精品一区| 亚洲成人自拍偷拍| 丁香六月激情网| caopeng在线| 中文字幕一区二区三区四区不卡 | 在线视频一区二区免费| 北条麻妃在线视频观看| jizz一区二区三区| 亚洲国产精品一区二区www| 国产又粗又长又爽视频| 毛片网站在线免费观看| 国产精品国产三级国产普通话三级| 欧美亚洲另类久久综合| 三级无遮挡在线观看| av高清不卡在线| 国产综合18久久久久久| 免费看黄色一级视频| 成人免费观看视频| 国产一区高清视频| 日韩在线免费看| 久久欧美中文字幕| 日韩电影大全在线观看| 国产一区二区三区福利| 欧美激情在线观看视频免费| 日韩电影天堂视频一区二区| av影片免费在线观看| 国产精品久久久久久久久免费桃花| 亚洲国产精品视频一区| 婷婷激情在线| 亚洲综合一二区| 日韩国产一级片| 天堂av中文在线观看| 色国产综合视频| 男女男精品视频站| 国产欧美88| 精品成人免费观看| 色无极影院亚洲| 成人影视亚洲图片在线| 欧美成人精品在线| 国产精品美女视频| 97免费视频观看| 毛片电影在线| 欧美午夜一区二区三区免费大片| 日本77777| 成人免费直播在线| 亚洲免费视频一区二区| 天堂av免费在线| 韩国av一区| 日韩免费观看视频| 国产女主播福利| 99视频超级精品| 亚洲国产精品www| 国产盗摄一区二区| 91久久香蕉国产日韩欧美9色| 国产aⅴ爽av久久久久| 久久91在线| 色av中文字幕一区| 日韩精品一区二区在线播放| 日本vs亚洲vs韩国一区三区二区| 91蜜桃网站免费观看| 欧美成人免费| 一区二区三区在线观看国产| caopor在线视频| 日韩中文字幕无砖| 在线中文字幕日韩| 亚洲男人第一av| 激情五月激情综合网| 美女视频久久| 青青草原国产在线| 欧美中文字幕一区| 亚洲久久久久久| 午夜精品视频一区二区三区在线看| 91精品91久久久久久| 国产精品老熟女视频一区二区| 99这里都是精品| 黄色特一级视频| 国产极品久久久久久久久波多结野| 欧美成人乱码一区二区三区| 国产一二三av| 亚洲永久在线| 国产精品一区视频网站| 欧美三级理伦电影| 91国偷自产一区二区三区成为亚洲经典| 真实乱偷全部视频| 99久久视频| 日本高清视频一区| 亚洲欧美激情国产综合久久久| 国产精品美女久久久久av爽李琼| 91视频 -- 69xx| 日韩成人视屏| 欧美不卡视频一区发布| 中文字幕丰满人伦在线| 久久精品一区二区三区四区| 欧美不卡在线播放| 亚洲啊v在线免费视频| 久久这里有精品视频| 亚洲一级视频在线观看| 国产人成一区二区三区影院| 日韩人妻精品无码一区二区三区| 国产女人18毛片水真多18精品| 另类色图亚洲色图| 91中文字幕在线播放| 国产精品沙发午睡系列990531| 可以在线看的黄色网址| 亚洲理论电影| 欧美中文在线观看国产| 亚洲av毛片成人精品| 天天色综合天天| 欧美熟妇精品一区二区蜜桃视频 | 久久91精品国产91久久跳| 在线观看黄色网| 国产精品久久久久永久免费观看| 免费看污污网站| 成人av资源电影网站| 国产精品九九九| 99青草视频在线播放视| 欧美日韩一区二区欧美激情| 国产91在线播放九色| 国产在线一区观看| 免费观看国产视频在线| 91在线一区| 久久人人爽人人爽人人片av高清| 香蕉国产在线视频| 色婷婷综合久久久中文一区二区| 四虎永久免费影院| 日本女优在线视频一区二区| 一区二区在线中文字幕电影视频| 性欧美video另类hd尤物| 九九精品在线视频| 色综合免费视频| 日韩欧美中文字幕在线播放| 色一情一交一乱一区二区三区| 蜜臀91精品一区二区三区| 一区二区在线观| 大桥未久女教师av一区二区| 国产91精品久| 五月香视频在线观看| 日韩免费视频一区二区| 91国产丝袜播放在线| 国产日韩欧美精品在线| 天天做天天干天天操| 欧美日韩综合| 蜜桃传媒视频麻豆第一区免费观看| 999国产精品亚洲77777| 免费99精品国产自在在线| 天天综合网在线观看| 欧美性淫爽ww久久久久无| √天堂中文官网8在线| 成人激情免费网站| 中文字幕国产传媒| 欧美激情在线| 任我爽在线视频精品一| 国产精品视频一区二区三区综合| 97精品视频在线| 在线观看h片| 亚洲国产福利在线| 在线观看国产精品视频| 五月天亚洲婷婷| 老司机精品免费视频| 成人免费视频播放| 中文字幕中文在线| 另类激情亚洲| 国产玉足脚交久久欧美| 日韩成人影院| 精品欧美一区二区精品久久| 成人午夜888| 秋霞av国产精品一区| a天堂中文在线官网在线| 亚洲男人的天堂在线| www.久久伊人| 欧美日韩国产在线播放网站| 日本视频免费在线| 亚洲免费在线观看| 中文字幕黄色网址| 99精品热视频| 永久免费未满蜜桃| 激情五月播播久久久精品| 国产天堂在线播放| 中国女人久久久| 国产91在线亚洲| 2023国产精品久久久精品双| 日韩电影免费观看在| 亚洲三级性片| 国严精品久久久久久亚洲影视| 99精品美女视频在线观看热舞| 国产成+人+综合+亚洲欧美丁香花| gogo久久| 久久久久国产一区二区三区| av网址在线播放| 深夜福利日韩在线看| 黄色av网站在线看| 亚洲精品视频免费在线观看| 好吊色一区二区三区| 日韩一级免费一区| 国产乱淫av片免费| 欧美日韩一区二区三区视频| 欧美高清69hd| 欧亚洲嫩模精品一区三区| 久久久精品视频网站| 黑人巨大精品欧美一区二区| 国产精品suv一区二区| 艳妇臀荡乳欲伦亚洲一区| 一级黄色录像视频| 亚洲老妇xxxxxx| 校园春色 亚洲| 一区二区三区久久| 免费在线观看亚洲| 亚洲图片自拍偷拍| 国产精品theporn动漫| 精品美女国产在线| aaa人片在线| 色哟哟亚洲精品| 日日夜夜狠狠操| 在线看不卡av| 小泽玛利亚一区二区三区视频| 91久久精品一区二区| 中文字字幕在线中文乱码| 欧美日韩久久久久久| 国产精选久久久| 日韩欧美国产电影| 亚洲欧洲精品视频| 亚洲欧美在线免费观看| 国产黄色片在线播放| 中文字幕日韩欧美精品在线观看| 欧美激情午夜| 操日韩av在线电影| bl视频在线免费观看| 欧洲成人免费aa| 97欧美成人| 亚洲a级在线播放观看| 综合激情五月婷婷| 欧美成ee人免费视频| 日韩高清欧美| 狠狠精品干练久久久无码中文字幕| 国产精品hd| 日本一极黄色片| 国产一区二区三区久久久| 色悠悠在线视频| 久久久久9999亚洲精品| 在线国产视频一区| 亚洲自拍另类综合| 波多野结衣 久久| 欧美另类久久久品| 天天操天天干天天爽| 在线成人激情黄色| 国产桃色电影在线播放| 日韩美女写真福利在线观看| 自拍偷拍亚洲| 久久久久无码国产精品一区| 第四色成人网| 青青草精品视频在线| 蜜臀av一区二区在线免费观看| 自拍一级黄色片| 久久精品人人做人人综合| 亚洲欧美小视频| 日本道色综合久久| 亚洲黄色在线免费观看| 一区二区欧美久久| 8x8ⅹ拨牐拨牐拨牐在线观看| 国产精品高清免费在线观看| 亚洲精品一区二区三区中文字幕 | 国产精品久久| www黄色在线| 国产精品12区| 黄大色黄女片18免费| 精品久久久久国产| 国产极品久久久| 中文字幕亚洲色图| 啊啊啊久久久| 97人摸人人澡人人人超一碰| 国产在线日韩精品| 尤物av无码色av无码| 国产成人免费在线视频| 91导航在线观看| 色一情一伦一子一伦一区| 亚洲精品中文字幕成人片 | 国产av无码专区亚洲av毛网站| 色综合久久久网| 免费观看国产视频| 欧美另类极品videosbest最新版本| 偷拍中文亚洲欧美动漫| 国产精品伊人日日| 综合久久十次| 99精品999| 国产精品久久毛片a| 欧美黄色一级大片| 国产视频欧美视频| 国产在线观看www| 国产精品一区二| 永久91嫩草亚洲精品人人| 粉色视频免费看| 国产精品久久久久久户外露出| 亚洲精品国产精品乱码视色| 精品中文视频在线| 在线男人天堂| 女同一区二区| 老色鬼久久亚洲一区二区| 草草影院第一页| 欧美性猛交xxxx| 麻豆影视在线| 国产精品成熟老女人| 国产欧美日韩影院| 九九视频精品在线观看| 国产日韩欧美精品电影三级在线| 日日骚av一区二区| 亚洲网站在线看| 欧美三级精品| 色噜噜一区二区| 老司机精品视频在线| 亚洲精品自拍视频在线观看| 欧美嫩在线观看| 在线观看午夜av| 国产美女在线精品免费观看| 亚洲电影av| 中国毛片在线观看| 91福利在线导航| 色老头视频在线观看| 成人激情视频在线播放| 中文字幕日韩欧美精品高清在线| 91porn在线| 精品成人av一区| 黄色av网址在线免费观看| 91精品国产综合久久香蕉最新版 | 国产欧美日韩三区| 日本一区二区三区久久| 日韩性生活视频| 视频一区日韩精品| 久在线观看视频| 国产色婷婷亚洲99精品小说| 一级片视频播放| 久久久久久有精品国产| 一区二区三区韩国免费中文网站| 亚洲77777| 亚洲乱码国产乱码精品精可以看| 动漫av一区二区三区| 日韩美女主播视频| 99久精品视频在线观看视频| 麻豆av免费看| 色综合视频在线观看| 黄网站在线播放| 国产综合av一区二区三区| 免费高清不卡av| 国产亚洲欧美精品久久久www| 精品呦交小u女在线| 成人在线视频国产| 免费无码不卡视频在线观看| 欧美国产日韩一二三区| 丰满人妻一区二区三区免费视频 | 国产极品在线视频| 中文字幕免费在线观看视频一区| 国产女同91疯狂高潮互磨| 欧美做爰性生交视频| 亚洲理论电影网| 美国黄色a级片| 欧美日韩国产影片| 午夜影院一区| 男女啪啪免费观看| 久久精品亚洲国产奇米99| 亚洲第一视频在线播放| 国产精品久久久久久久久男 | 亚洲18在线看污www麻豆|