精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

SparkStreaming與Kafka整合遇到的問題及解決方案

大數(shù)據(jù) Kafka Spark
最近工作中是做日志分析的平臺,采用了sparkstreaming+kafka,采用kafka主要是看中了它對大數(shù)據(jù)量處理的高性能,處理日志類應(yīng)用再好不過了,采用了sparkstreaming的流處理框架 主要是考慮到它本身是基于spark核心的,以后的批處理可以一站式服務(wù),并且可以提供準(zhǔn)實時服務(wù)到elasticsearch中,可以實現(xiàn)準(zhǔn)實時定位系統(tǒng)日志。

前言

最近工作中是做日志分析的平臺,采用了sparkstreaming+kafka,采用kafka主要是看中了它對大數(shù)據(jù)量處理的高性能,處理日志類應(yīng)用再好不過了,采用了sparkstreaming的流處理框架 主要是考慮到它本身是基于spark核心的,以后的批處理可以一站式服務(wù),并且可以提供準(zhǔn)實時服務(wù)到elasticsearch中,可以實現(xiàn)準(zhǔn)實時定位系統(tǒng)日志。

實現(xiàn)

Spark-Streaming獲取kafka數(shù)據(jù)的兩種方式-Receiver與Direct的方式。

一. 基于Receiver方式

這種方式使用Receiver來獲取數(shù)據(jù)。Receiver是使用Kafka的高層次Consumer API來實現(xiàn)的。receiver從Kafka中獲取的數(shù)據(jù)都是存儲在Spark Executor的內(nèi)存中的,然后Spark Streaming啟動的job會去處理那些數(shù)據(jù)。代碼如下:

  1. SparkConf sparkConf = new SparkConf().setAppName("log-etl").setMaster("local[4]"); 
  2.     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
  3.     int numThreads = Integer.parseInt("4"); 
  4.     Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
  5.     topicMap.put("group-45", numThreads); 
  6.      //接收的參數(shù)分別是JavaStreamingConetxt,zookeeper連接地址,groupId,kafak的topic  
  7.     JavaPairReceiverInputDStream<String, String> messages = 
  8.     KafkaUtils.createStream(jssc, "172.16.206.27:2181,172.16.206.28:2181,172.16.206.29:2181""1", topicMap); 

剛開始的時候系統(tǒng)正常運行,沒有發(fā)現(xiàn)問題,但是如果系統(tǒng)異常重新啟動sparkstreaming程序后,發(fā)現(xiàn)程序會重復(fù)處理已經(jīng)處理過的數(shù)據(jù),這種基于receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數(shù)據(jù)的傳統(tǒng)方式。這種方式配合著WAL機制可以保證數(shù)據(jù)零丟失的高可靠性,但是卻無法保證數(shù)據(jù)被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。官方現(xiàn)在也已經(jīng)不推薦這種整合方式,官網(wǎng)相關(guān)地址 http://spark.apache.org/docs/latest/streaming-kafka-integration.html ,下面我們使用官網(wǎng)推薦的第二種方式kafkaUtils的createDirectStream()方式。

二.基于Direct的方式

這種新的不基于Receiver的直接方式,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數(shù)據(jù)后,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的***的offset,從而定義每個batch的offset的范圍。當(dāng)處理數(shù)據(jù)的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數(shù)據(jù)。

代碼如下:

  1. SparkConf sparkConf = new SparkConf().setAppName("log-etl"); 
  2. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); 
  3.  
  4. HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(","))); 
  5. HashMap<String, String> kafkaParams = new HashMap<String, String>(); 
  6. kafkaParams.put("metadata.broker.list", brokers); 
  7. // Create direct kafka stream with brokers and topics 
  8. JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( 
  9.     jssc, 
  10.     String.class, 
  11.     String.class, 
  12.     StringDecoder.class, 
  13.     StringDecoder.class, 
  14.     kafkaParams, 
  15.     topicsSet 
  16. ); 

這種direct方式的優(yōu)點如下:

1.簡化并行讀?。喝绻x取多個partition,不需要創(chuàng)建多個輸入DStream然后對它們進(jìn)行union操作。Spark會創(chuàng)建跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取數(shù)據(jù)。所以在Kafka partition和RDD partition之間,有一個一對一的映射關(guān)系。

2.一次且僅一次的事務(wù)機制:基于receiver的方式,在spark和zk中通信,很有可能導(dǎo)致數(shù)據(jù)的不一致。

3.高效率:在receiver的情況下,如果要保證數(shù)據(jù)的不丟失,需要開啟wal機制,這種方式下,為、數(shù)據(jù)實際上被復(fù)制了兩份,一份在kafka自身的副本中,另外一份要復(fù)制到wal中, direct方式下是不需要副本的。

三.基于Direct方式丟失消息的問題

貌似這種方式很***,但是還是有問題的,當(dāng)業(yè)務(wù)需要重啟sparkstreaming程序的時候,業(yè)務(wù)日志依然會打入到kafka中,當(dāng)job重啟后只能從***的offset開始消費消息,造成重啟過程中的消息丟失。kafka中的offset如下圖(使用kafkaManager實時監(jiān)控隊列中的消息):

當(dāng)停止業(yè)務(wù)日志的接受后,先重啟spark程序,但是發(fā)現(xiàn)job并沒有將先前打入到kafka中的數(shù)據(jù)消費掉。這是因為消息沒有經(jīng)過zk,topic的offset也就沒有保存

四.解決消息丟失的處理方案

一般有兩種方式處理這種問題,可以先spark streaming 保存offset,使用spark checkpoint機制,第二種是程序中自己實現(xiàn)保存offset邏輯,我比較喜歡第二種方式,以為這種方式可控,所有主動權(quán)都在自己手中。

先看下大體流程圖,

  1. SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("log-etl"); 
  2.  Set<String> topicSet = new HashSet<String>(); 
  3.         topicSet.add("group-45"); 
  4.         kafkaParam.put("metadata.broker.list""172.16.206.17:9092,172.16.206.31:9092,172.16.206.32:9092"); 
  5.         kafkaParam.put("group.id""simple1"); 
  6.  
  7.         // transform java Map to scala immutable.map 
  8.         scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam); 
  9.         scala.collection.immutable.Map<String, String> scalaKafkaParam = 
  10.                 testMap.toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() { 
  11.                     public Tuple2<String, String> apply(Tuple2<String, String> v1) { 
  12.                         return v1; 
  13.                     } 
  14.                 }); 
  15.  
  16.         // init KafkaCluster 
  17.         kafkaCluster = new KafkaCluster(scalaKafkaParam); 
  18.  
  19.         scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet); 
  20.         immutableTopics = mutableTopics.toSet(); 
  21.         scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet2 = kafkaCluster.getPartitions(immutableTopics).right().get(); 
  22.  
  23.         // kafka direct stream 初始化時使用的offset數(shù)據(jù) 
  24.         Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>(); 
  25.  
  26.         // 沒有保存offset時(該group***消費時), 各個partition offset 默認(rèn)為0 
  27.         if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).isLeft()) { 
  28.  
  29.             System.out.println(kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).left().get()); 
  30.  
  31.             Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2); 
  32.  
  33.             for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { 
  34.                 consumerOffsetsLong.put(topicAndPartition, 0L); 
  35.             } 
  36.  
  37.         } 
  38.         // offset已存在, 使用保存的offset 
  39.         else { 
  40.  
  41.             scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster.getConsumerOffsets("simple1", topicAndPartitionSet2).right().get(); 
  42.  
  43.             Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap((scala.collection.immutable.Map)consumerOffsetsTemp); 
  44.  
  45.             Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2); 
  46.  
  47.             for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { 
  48.                 Long offset = (Long)consumerOffsets.get(topicAndPartition); 
  49.                 consumerOffsetsLong.put(topicAndPartition, offset); 
  50.             } 
  51.  
  52.         } 
  53.  
  54.         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000)); 
  55.         kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam); 
  56.  
  57.         // create direct stream 
  58.         JavaInputDStream<String> message = KafkaUtils.createDirectStream( 
  59.                 jssc, 
  60.                 String.class, 
  61.                 String.class, 
  62.                 StringDecoder.class, 
  63.                 StringDecoder.class, 
  64.                 String.class, 
  65.                 kafkaParam, 
  66.                 consumerOffsetsLong, 
  67.                 new Function<MessageAndMetadata<String, String>, String>() { 
  68.                     public String call(MessageAndMetadata<String, String> v1) throws Exception { 
  69.                         System.out.println("接收到的數(shù)據(jù)《《==="+v1.message()); 
  70.                         return v1.message(); 
  71.                     } 
  72.                 } 
  73.         ); 
  74.  
  75.         // 得到rdd各個分區(qū)對應(yīng)的offset, 并保存在offsetRanges中 
  76.         final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>(); 
  77.  
  78.         JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { 
  79.             public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception { 
  80.                 OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
  81.                 offsetRanges.set(offsets); 
  82.                 return rdd; 
  83.             } 
  84.         }); 
  85.  
  86.         // output 
  87.         javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() { 
  88.  
  89.             public Void call(JavaRDD<String> v1) throws Exception { 
  90.                 if (v1.isEmpty()) return null
  91.  
  92.                 List<String> list = v1.collect(); 
  93.                 for(String s:list){ 
  94.                     System.out.println("數(shù)據(jù)==="+s); 
  95.                 } 
  96.  
  97.                 for (OffsetRange o : offsetRanges.get()) { 
  98.  
  99.                     // 封裝topic.partition 與 offset對應(yīng)關(guān)系 java Map 
  100.                     TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition()); 
  101.                     Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>(); 
  102.                     topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset()); 
  103.  
  104.                     // 轉(zhuǎn)換java map to scala immutable.map 
  105.                     scala.collection.mutable.Map<TopicAndPartition, Object> testMap = 
  106.                             JavaConversions.mapAsScalaMap(topicAndPartitionObjectMap); 
  107.                     scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = 
  108.                             testMap.toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() { 
  109.                                 public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) { 
  110.                                     return v1; 
  111.                                 } 
  112.                             }); 
  113.  
  114.                     // 更新offset到kafkaCluster 
  115.                     kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap); 
  116.                        System.out.println("原數(shù)據(jù)====》"+o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() 
  117.                     ); 
  118.                 } 
  119.                 return null
  120.             } 
  121.         }); 
  122.  
  123.         jssc.start(); 
  124.         jssc.awaitTermination(); 
  125.     } 

基本使用這種方式就可以解決數(shù)據(jù)丟失的問題。

責(zé)任編輯:武曉燕
相關(guān)推薦

2021-08-31 07:57:21

輪詢鎖多線編程Java

2010-06-12 12:46:04

Grub Rescue

2010-10-08 16:31:08

AjaxIE6

2019-10-08 16:05:19

Redis數(shù)據(jù)庫系統(tǒng)

2015-05-12 16:31:22

Elasticsear開源分布式搜索引擎

2018-10-24 19:59:45

Kubernetes混合云云擴展

2015-12-02 15:35:08

Redis Clust遷移解決方案

2024-10-30 11:00:00

Python列表索引

2016-01-27 15:07:11

IT運維管理/呼叫中心

2024-11-08 13:47:35

中文亂碼配置

2024-07-08 08:45:41

2020-06-11 08:24:33

CSS瀏覽器macOS

2010-08-31 16:09:04

DIV+CSS

2010-08-26 14:00:28

CSSmargin

2010-05-17 09:49:46

MySQL中文問題

2011-03-02 14:56:56

FileZilla425問題

2024-12-02 01:16:53

2010-08-23 09:53:41

DivCSSweb

2016-09-27 21:14:53

JavaURL

2024-05-22 19:10:18

跨域Web開發(fā)
點贊
收藏

51CTO技術(shù)棧公眾號

日韩成人在线免费观看| 亚洲欧美另类图片小说| 欧美在线欧美在线| 国产黄a三级三级| 国内精品视频| 欧美日韩免费一区| 色撸撸在线观看| 三级在线视频| 国产精品中文字幕日韩精品 | 亚洲一区二区在线| 日韩av一二三区| 久久福利综合| 日韩av在线电影网| aaaaaaaa毛片| 天天综合网站| 午夜精品一区二区三区电影天堂 | 国产精品剧情| 久久久久久久网| 91高跟黑色丝袜呻吟在线观看| 亚洲午夜18毛片在线看| 亚洲精品2区| 亚洲人成电影网站色xx| 亚洲精品成人无码毛片| 欧美美女福利视频| 色综合激情五月| 国产高清www| 黄视频在线观看网站| 欧美精彩视频一区二区三区| 精品欧美一区二区精品久久| 国产福利第一视频| 久久99精品久久久久久国产越南| 欧洲精品在线视频| 久久精品欧美一区二区| 91精品久久久久久久蜜月| 国产一区二区三区欧美| 欧美特黄一区二区三区| 欧美调教视频| 亚洲福利在线视频| 任你躁av一区二区三区| 国产一区二区三区免费观看在线| 欧美色综合网站| 又色又爽又高潮免费视频国产| 国产在线天堂www网在线观看| 中文字幕在线不卡一区二区三区| 日本一区二区精品| 三级国产在线观看| 99re视频精品| 精品久久久久久一区| 欧美一级特黄aaaaaa大片在线观看| 狠狠色狠狠色综合| 成人国产精品免费视频| 艳妇乳肉豪妇荡乳av| 久热成人在线视频| 成人女保姆的销魂服务| 国产精品伦一区二区三区| 久久av中文字幕片| 91精品免费视频| 99国产在线播放| 国产盗摄女厕一区二区三区| 91丝袜脚交足在线播放| www.国产麻豆| eeuss鲁片一区二区三区在线观看| 国产精品久久久久久久久久直播| 懂色av一区二区三区四区| 床上的激情91.| 国产一区视频观看| 毛片在线播放网站| 国产精品午夜久久| 久久福利一区二区| 精精国产xxxx视频在线野外| 欧美午夜激情小视频| aaa毛片在线观看| а√天堂资源国产精品| 91麻豆精品国产| 国产精品熟妇一区二区三区四区 | 欧美日产一区二区三区在线观看| 美女做暖暖视频免费在线观看全部网址91 | 91人人爽人人爽人人精88v| 97在线视频人妻无码| 国产iv一区二区三区| 精品免费一区二区三区蜜桃| 高清av在线| 玉足女爽爽91| aⅴ在线免费观看| 在线不卡一区| 日韩黄色在线免费观看| 亚洲区一区二区三| 亚洲精选久久| 国产精品无av码在线观看| 风流少妇一区二区三区91| 久久女同互慰一区二区三区| 在线视频一区观看| 精品三级久久| 91精品国产综合久久精品app| av免费观看不卡| 成人精品中文字幕| 国模精品一区二区三区色天香| 欧美一级做a爰片免费视频| 国产乱理伦片在线观看夜一区| 久久久精彩视频| 黄网页免费在线观看| 黑丝美女久久久| 日韩欧美中文在线视频| 久草在线成人| 久久免费高清视频| 91theporn国产在线观看| 91在线播放网址| 中国老女人av| 999国产精品亚洲77777| 亚洲精品在线免费观看视频| 中文字幕在线观看二区| 国产亚洲福利| julia一区二区中文久久94| a√在线中文网新版址在线| 婷婷一区二区三区| 中文字幕乱妇无码av在线| 精品国产一区二区三区噜噜噜 | 久久不射热爱视频精品| 无码人妻丰满熟妇精品区| 高清国产一区二区| 四虎4hu永久免费入口| 国产电影一区二区三区爱妃记| 亚洲成av人片在线观看香蕉| 麻豆天美蜜桃91| 美女诱惑一区二区| 日本视频精品一区| 成人性生交大片免费观看网站| 亚洲成人激情图| 久久久久久久久久网站| 美日韩一级片在线观看| 手机看片福利永久国产日韩| 亚洲综合电影| 日韩的一区二区| 日产精品久久久久| 成人毛片老司机大片| 中国一级黄色录像| 99综合久久| xxxxx91麻豆| 国产精品丝袜黑色高跟鞋| 国产精品无圣光一区二区| 亚洲性生活网站| 精品欧美激情在线观看| 国产精品福利网站| yourporn在线观看视频| 在线观看日韩电影| 精品无码在线观看| 麻豆精品久久精品色综合| 丝袜足脚交91精品| 亚洲伦理久久| 美女扒开尿口让男人操亚洲视频网站| 国产精品乱码久久久| 亚洲欧美另类在线| 美女扒开腿免费视频| 亚洲黄色高清| 欧美激情第六页| 欧美free嫩15| 色偷偷av亚洲男人的天堂| 一区二区三区免费在线| 亚洲人成在线观看一区二区| 久久人人爽人人片| 亚洲毛片av| 欧美一区二区三区精美影视| av有声小说一区二区三区| 日韩中文有码在线视频| 国产伦理一区二区| 亚洲1区2区3区4区| 在哪里可以看毛片| 久久国产视频网| 日韩国产小视频| 欧美日韩一区二区三区在线电影 | 精品国产依人香蕉在线精品| 国产精品亚洲lv粉色| 一个色妞综合视频在线观看| 9.1成人看片| 欧美a一区二区| 97av中文字幕| 欧美日韩麻豆| 国产日韩在线看| 超碰在线97国产| 国产亚洲精品久久久久久牛牛| 91在线精品入口| 亚洲mv在线观看| www.日本高清视频| 高清av一区二区| 日本熟妇人妻中出| 国产在线不卡| 视频一区二区三| 在线观看视频一区二区三区| 国产精品第一第二| 欧美hdxxx| 在线免费观看羞羞视频一区二区| 精品国产乱码久久久久久蜜臀网站| 精品美女国产在线| 免费在线观看黄色小视频| 成人avav影音| 午夜一区二区视频| 香蕉成人久久| 成年在线观看视频| 日本精品黄色| 国产一区免费在线| 精品一区二区三区中文字幕| 2018国产精品视频| 中文字幕在线观看播放| 亚洲美女久久久| 成人无码一区二区三区| 88在线观看91蜜桃国自产| 久久久久99精品成人片我成大片 | 99视频在线看| 亚洲三级在线免费观看| 国产传媒国产传媒| av激情亚洲男人天堂| 一级片免费在线观看视频| 久久蜜桃精品| av之家在线观看| 你懂的一区二区| 伊人久久大香线蕉成人综合网| 亚洲伊人春色| 精品婷婷色一区二区三区蜜桃| 精品久久国产一区| 国产综合视频在线观看| 日韩成人亚洲| 日韩免费观看在线观看| 狼人综合视频| 亚洲3p在线观看| 激情图片在线观看高清国产| 久久午夜a级毛片| 99青草视频在线播放视| 国产亚洲精品成人av久久ww| 色视频在线看| 亚洲欧美国产另类| 免费在线高清av| 亚洲男人的天堂网站| 亚洲 另类 春色 国产| 亚洲福利视频网站| 日本黄色大片视频| 亚洲成人久久久| 手机看片国产1024| 日韩av中文字幕在线免费观看| 欧美少妇bbw| 亚洲成人精品在线| 五月婷婷丁香六月| 日韩av综合中文字幕| 少妇荡乳情欲办公室456视频| 亚洲第一黄色网| 午夜影院免费视频| 亚洲欧洲日本专区| 国产高清一级毛片在线不卡| 在线视频欧美日韩| 在线观看完整版免费| 日韩在线免费视频观看| 麻豆免费在线视频| 欧美精品免费在线| 91探花在线观看| 欧美亚洲一区在线| 欧美性片在线观看| 国产精品丝袜白浆摸在线| 久久久精品一区二区毛片免费看| 91精品视频一区| 中文字幕一区图| 国产午夜精品一区| 视频一区中文| 少妇熟女一区二区| 亚洲视频日本| 黑森林福利视频导航| 欧美aaaaaa午夜精品| 国产永久免费网站| 成人高清伦理免费影院在线观看| 欧美黑人欧美精品刺激| 欧美国产日韩精品免费观看| 日本不卡一二区| 同产精品九九九| 在线观看一二三区| 日韩精品一区二区三区视频播放| 亚洲av成人精品一区二区三区在线播放 | 国产精品久久久久免费a∨大胸| 日本午夜免费一区二区| 99中文字幕| 久久99影视| 91精品国产毛片武则天| 国产农村妇女精品一二区| 怡红院亚洲色图| 97精品久久久久中文字幕| 亚洲av毛片基地| 亚洲成a人v欧美综合天堂下载 | 伊人网在线播放| 成人国产精品久久久久久亚洲| 国语一区二区三区| 亚洲欧洲一区二区| 亚洲激情另类| www.99r| 91在线精品一区二区三区| 日日操免费视频| 精品久久久久久久久久国产| 亚洲一级视频在线观看| 亚洲福利视频网| а√天堂官网中文在线| 日韩av大片在线| 一区二区中文字幕在线观看| 亚洲黄色成人久久久| 国产精品久久久免费| 毛毛毛毛毛毛毛片123| 国产日产欧美一区二区三区| av资源吧首页| 日韩一区二区免费电影| yourporn在线观看视频| 96精品视频在线| 视频二区欧美毛片免费观看| 日韩精品一区二区三区外面| 亚洲午夜91| 中文字幕日韩久久| 国产精品情趣视频| 日本高清不卡码| 精品99久久久久久| av在线下载| 成人午夜两性视频| 欧美大人香蕉在线| 日韩中文字幕免费在线| 91网站黄www| 久久亚洲成人av| 欧美一区二区成人6969| 在线视频二区| 国产成人亚洲综合91| 亚洲电影男人天堂| 国产精品无码av在线播放| 大尺度一区二区| 久久久久久免费观看| 日韩精品一区在线观看| 超碰在线观看免费| 成人美女免费网站视频| 欧美hentaied在线观看| 中文字幕国内自拍| 久久品道一品道久久精品| 好看的av在线| 精品一区二区三区三区| 在线精品亚洲欧美日韩国产| 精品伦精品一区二区三区视频| 黄色av一区| 2一3sex性hd| 福利视频第一区| 飘雪影院手机免费高清版在线观看| 97在线视频观看| 亚洲精品播放| 97在线播放视频| 国产日韩欧美精品电影三级在线| 无码人妻精品一区二区蜜桃色欲| 亚洲精品丝袜日韩| 丝袜美腿一区| 亚洲国产精品123| 国内成人免费视频| 强乱中文字幕av一区乱码| 欧美mv日韩mv亚洲| а√天堂资源官网在线资源| 久久综合精品一区| 日韩主播视频在线| 在线观看免费黄色网址| 欧美二区在线观看| 午夜影院免费在线| 久久96国产精品久久99软件| 久久综合九色综合欧美狠狠| 中文天堂资源在线| 欧美一区二区三区播放老司机| 蜜臀av在线| 久久久www免费人成黑人精品| 三级成人在线视频| 亚洲欧美精品aaaaaa片| 精品久久国产字幕高潮| 午夜影院在线观看国产主播| 四虎永久在线精品免费一区二区| 精品在线亚洲视频| 国产精品1234区| 亚洲性av网站| 欧美激情精品| 久久婷婷国产精品| 综合久久一区二区三区| 亚洲经典一区二区| 日韩免费观看网站| 女主播福利一区| 美女100%无挡| 在线不卡欧美精品一区二区三区| 大桥未久在线播放| 日韩中文一区| 成人免费视频网站在线观看| 波多野结衣一二区| 九九久久久久99精品| 久久不见久久见免费视频7| 小早川怜子一区二区三区| 精品福利一区二区| 黄色小网站在线观看| 久久99导航| 国产精品18久久久久| 天天爽夜夜爽人人爽| 欧美风情在线观看| 精品美女久久久| 美女搡bbb又爽又猛又黄www| 欧美日韩二区三区| 综合日韩av| 国产av熟女一区二区三区 | 黄色一级视频在线观看| 亚洲欧美激情另类校园| 日韩视频一二区|