精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Springboot整合Kafka Stream實時統計數據

開發 前端 Kafka
Kafka Streams是一個客戶端類庫,用于處理和分析存儲在Kafka中的數據。它建立在流式處理的一些重要的概念之上:如何區分事件時間和處理時間、Windowing的支持、簡單高效的管理和實時查詢應用程序狀態。

[[417927]]

環境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka Stream介紹

Kafka在0.10版本推出了Stream API,提供了對存儲在Kafka內的數據進行流式處理和分析的能力。

流式計算一般被用來和批量計算做比較。批量計算往往有一個固定的數據集作為輸入并計算結果。而流式計算的輸入往往是“無界”的(Unbounded Data),持續輸入的,即永遠拿不到全量數據去做計算;同時,計算結果也是持續輸出的,只能拿到某一個時刻的結果,而不是最終的結果。

Kafka Streams是一個客戶端類庫,用于處理和分析存儲在Kafka中的數據。它建立在流式處理的一些重要的概念之上:如何區分事件時間和處理時間、Windowing的支持、簡單高效的管理和實時查詢應用程序狀態。

Kafka Streams的門檻非常低:和編寫一個普通的Kafka消息處理程序沒有太大的差異,可以通過多進程部署來完成擴容、負載均衡、高可用(Kafka Consumer的并行模型)。

Kafka Streams的一些特點:

  • 被設計成一個簡單的、輕量級的客戶端類庫,能夠被集成到任何Java應用中
  • 除了Kafka之外沒有任何額外的依賴,利用Kafka的分區模型支持水平擴容和保證順序性
  • 通過可容錯的狀態存儲實現高效的狀態操作(windowed joins and aggregations)
  • 支持exactly-once語義
  • 支持紀錄級的處理,實現毫秒級的延遲
  • 提供High-Level的Stream DSL和Low-Level的Processor API

Stream Processing Topology流處理拓撲

  • 流是Kafka Streams提供的最重要的抽象:它表示一個無限的、不斷更新的數據集。流是不可變數據記錄的有序、可重放和容錯序列,其中數據記錄定義為鍵值對。
  • Stream Processing Application是使用了Kafka Streams庫的應用程序。它通過processor topologies定義計算邏輯,其中每個processor topology都是多個stream processor(節點)通過stream組成的圖。
  • A stream processor 是處理器拓撲中的節點;它表示一個處理步驟,通過每次從拓撲中的上游處理器接收一個輸入記錄,將其操作應用于該記錄,來轉換流中的數據,并且隨后可以向其下游處理器生成一個或多個輸出記錄。

有兩種特殊的processor:

Source Processor 源處理器是一種特殊類型的流處理器,它沒有任何上游處理器。它通過使用來自一個或多個kafka topic的記錄并將其轉發到其下游處理器,從而從一個或多個kafka topic生成其拓撲的輸入流。

Sink Processor 接收器處理器是一種特殊類型的流處理器,沒有下游處理器。它將從其上游處理器接收到的任何記錄發送到指定的kafka topic。

Springboot整合Kafka Stream實時統計數據

相關的核心概念查看如下鏈接

Springboot整合Kafka Stream實時統計數據

下面演示Kafka Stream 在Springboot中的應用

依賴

  1. <dependency> 
  2.   <groupId>org.springframework.boot</groupId> 
  3.   <artifactId>spring-boot-starter-web</artifactId> 
  4.   </dependency> 
  5. <dependency> 
  6.   <groupId>org.springframework.kafka</groupId> 
  7.   <artifactId>spring-kafka</artifactId> 
  8. </dependency> 
  9. <dependency> 
  10.   <groupId>org.apache.kafka</groupId> 
  11.   <artifactId>kafka-streams</artifactId> 
  12. </dependency> 

配置

  1. server: 
  2.   port: 9090 
  3. spring: 
  4.   application: 
  5.     name: kafka-demo 
  6.   kafka: 
  7.     streams: 
  8.       application-id: ${spring.application.name
  9.       properties: 
  10.         spring.json.trusted.packages: '*' 
  11.     bootstrap-servers: 
  12.     - localhost:9092 
  13.     - localhost:9093 
  14.     - localhost:9094 
  15.     producer: 
  16.       acks: 1 
  17.       retries: 10 
  18.       key-serializer: org.apache.kafka.common.serialization.StringSerializer 
  19.       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer 
  20.       properties: 
  21.         spring.json.trusted.packages: '*' 
  22.     consumer: 
  23.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
  24.       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer 
  25.       enable-auto-commitfalse 
  26.       group-id: ConsumerTest 
  27.       auto-offset-reset: latest 
  28.       properties: 
  29.         session.timeout.ms: 12000 
  30.         heartbeat.interval.ms: 3000 
  31.         max.poll.records: 100 
  32.         spring.json.trusted.packages: '*' 
  33.     listener: 
  34.       ack-mode: manual-immediate 
  35.       type: batch 
  36.       concurrency: 8 
  37.     properties: 
  38.       max.poll.interval.ms: 300000 

消息發送

  1. @Service 
  2. public class MessageSend { 
  3.   @Resource 
  4.   private KafkaTemplate<String, Message> kafkaTemplate ; 
  5.   public void sendMessage2(Message message) { 
  6.     kafkaTemplate.send(new ProducerRecord<String, Message>("test", message)).addCallback(result -> { 
  7.       System.out.println("執行成功..." + Thread.currentThread().getName()) ; 
  8.     }, ex -> { 
  9.       System.out.println("執行失敗") ; 
  10.       ex.printStackTrace() ; 
  11.     }) ; 
  12.   } 

消息監聽

  1. @KafkaListener(topics = {"test"}) 
  2. public void listener2(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) { 
  3.   for (ConsumerRecord<String, Message> record : records) { 
  4.     System.out.println(this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ; 
  5.   } 
  6.   try { 
  7.     TimeUnit.SECONDS.sleep(0) ; 
  8.   } catch (InterruptedException e) { 
  9.     e.printStackTrace(); 
  10.   } 
  11.   ack.acknowledge() ; 
  12.      
  13. @KafkaListener(topics = {"demo"}) 
  14. public void listenerDemo(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) { 
  15.   for (ConsumerRecord<String, Message> record : records) { 
  16.     System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ; 
  17.   } 
  18.   ack.acknowledge() ; 

Kafka Stream處理

消息轉換并轉發其它Topic

  1. @Bean 
  2. public KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) { 
  3.   KStream<Object, Object> stream = streamsBuilder.stream("test"); 
  4.   stream.map((key, value) -> { 
  5.     System.out.println("原始消息內容:" + new String((byte[]) value, Charset.forName("UTF-8"))) ; 
  6.     return new KeyValue<>(key"{\"title\": \"123123\", \"message\": \"重新定義內容\"}".getBytes(Charset.forName("UTF-8"))) ; 
  7.   }).to("demo") ; 
  8.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

Stream對象處理

  1. @Bean 
  2. public KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.map((key, value) -> { 
  8.     value.setTitle("XXXXXXX") ; 
  9.     return new KeyValue<>(key, value) ; 
  10.   }).to("demo", Produced.with(Serdes.String(), jsonSerde)) ; 
  11.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

分組處理

  1. @Bean 
  2. public KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .count() 
  15.   .toStream().print(Printed.toSysOut()); 
  16.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

聚合

  1. @Bean 
  2. public KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .aggregate(() -> 0L, (key, value ,aggValue) -> { 
  15.     System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; 
  16.     return aggValue + 1 ; 
  17.   }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long())) 
  18.   .toStream().print(Printed.toSysOut()); 
  19.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

Filter過濾數據

  1. @Bean 
  2. public KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .aggregate(() -> 0L, (key, value ,aggValue) -> { 
  15.     System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; 
  16.     return aggValue + 1 ; 
  17.   }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long())) 
  18.   .toStream() 
  19.   .filter((key, value) -> !"2".equals(key)) 
  20.   .print(Printed.toSysOut()); 
  21.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

過濾Key不等于"2"

分支多流處理

  1. @Bean 
  2. public KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   // 分支,多流處理 
  8.   KStream<String, Message>[] arrStream = stream.branch( 
  9.     (key, value) -> "男".equals(value.getSex()),  
  10.     (key, value) -> "女".equals(value.getSex())); 
  11.   Stream.of(arrStream).forEach(as -> { 
  12.     as.foreach((key, message) -> { 
  13.       System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message) ; 
  14.     }); 
  15.   }); 
  16.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

多字段分組

不能使用多個selectKey,后面的會覆蓋前面的

  1. @Bean 
  2. public KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream 
  8.   .selectKey(new KeyValueMapper<String, Message, String>() { 
  9.     @Override 
  10.     public String apply(String key, Message value) { 
  11.       System.out.println(Thread.currentThread().getName()) ; 
  12.       return value.getTime() + " | " + value.getOrgCode() ; 
  13.     } 
  14.   }) 
  15.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  16.   .count() 
  17.   .toStream().print(Printed.toSysOut()); 
  18.   return stream; 

執行結果:

Springboot整合Kafka Stream實時統計數據

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2020-04-24 09:01:23

網絡安全數據泄露黑客

2011-10-09 10:33:12

2015-07-29 11:21:13

JavaScript統計數據

2023-07-18 10:43:14

物聯網IOT

2010-04-21 11:27:55

Oracle數據庫

2009-12-24 08:51:08

Linux用戶

2014-12-31 10:56:57

Windows PhoWP

2019-06-27 05:00:26

物聯網統計數據IOT

2022-10-25 09:11:47

物聯網IoT工業物聯網

2018-10-22 06:27:32

網絡犯罪數據泄露攻擊

2022-10-26 15:17:58

數字存儲數據中心

2020-05-09 22:54:48

物聯網安全物聯網IOT

2024-03-06 08:03:09

2022-04-28 07:31:41

Springkafka數據量

2022-06-20 09:49:25

Linux發行版

2019-07-22 05:01:38

物聯網IOT技術

2025-08-04 01:45:00

2022-06-20 14:19:55

FedoraEPELLinux

2021-02-18 16:10:03

物聯網工業4.0人工智能

2022-04-09 11:53:52

供應鏈攻擊
點贊
收藏

51CTO技術棧公眾號

欧美bbbbb性bbbbb视频| 中文在线资源观看网站视频免费不卡| 亚洲风情在线资源站| 国产一区av在线| 免费在线黄网站| 亚洲性在线观看| 九热爱视频精品视频| 亚洲一区在线视频观看| 成人亚洲激情网| 天天干天天操天天拍| 成人福利av| www.色综合.com| 欧美日韩国产二区| 亚洲一二区在线观看| 岛国在线大片| 精品视频在线播放一区二区三区| 久久精品在这里| 午夜精品一区二区三区在线 | 在线观看的日韩av| 欧美一区二区私人影院日本| 亚洲激情啪啪| 国产精华7777777| 欧美色图在线播放| 91极品美女在线| 欧美一区二区综合| 男人天堂av在线播放| 天堂在线精品| 欧美日韩中文字幕在线| 久久偷窥视频| 中文字幕黄色片| 欧美女人交a| 欧美va亚洲va| www.av片| 欧美视频综合| 麻豆精品一区二区av白丝在线| 中文字幕欧美精品日韩中文字幕| 亚洲高清免费在线观看| 三区四区电影在线观看| 久久激五月天综合精品| 欧美一二三视频| 欧洲美一区二区三区亚洲 | 国产一区二区三区四区| 色拍拍在线精品视频8848| 欧美性猛交内射兽交老熟妇| 丰满少妇在线观看bd| 国产一级久久| 亚洲视频国产视频| 午夜免费一级片| 成人在线视频免费| 亚洲精品成人a在线观看| av在线不卡一区| 日韩欧美性视频| 精品成人影院| 欧美一级日韩不卡播放免费| 蜜臀av.com| 欧美自拍第一页| 久久久久国产精品一区二区| 深夜福利国产精品| 少妇极品熟妇人妻无码| 成人免费看黄| 欧美日韩国产专区| 亚洲精品一区二区三区樱花| 男女污污视频在线观看| 久久久综合网站| 91欧美激情另类亚洲| 日本少妇吞精囗交| 日韩欧美大片| 亚洲成人网在线| 欧美男女交配视频| 国产成人精品一区二区三区在线 | av一级黄色片| 国产精品亚洲欧美| 国产aⅴ夜夜欢一区二区三区 | 香蕉在线观看视频| 97久久亚洲| 欧美日韩在线免费视频| 99热久久这里只有精品| 日本在线天堂| 亚洲欧美日韩一区| 任我爽在线视频精品一| 成年人视频在线免费观看| 亚洲国产高清不卡| 日韩中文在线字幕| 国产三级在线免费观看| 中文字幕av免费专区久久| 天天做天天爱天天高潮| 国产在线观看免费| 菠萝蜜视频在线观看一区| 国产精品中文字幕在线观看| av大片免费观看| 欧美精品综合| 人妖精品videosex性欧美| 青娱乐免费在线视频| 精品一区二区三区的国产在线观看| 国产一区二区成人| 欧美日韩三级在线观看| 国产精品久久久久久久| 这里只有精品丝袜| 2025国产精品自拍| 天天超碰亚洲| 97在线免费观看| 中文字幕欧美人妻精品| 欧美专区一区二区三区| 国产精品综合久久久| 欧洲成人一区二区三区| 国产精品私房写真福利视频| 今天免费高清在线观看国语| 国产精品专区免费| 日韩一区二区电影在线| 舐め犯し波多野结衣在线观看| 综合激情一区| 久热精品视频在线观看| 亚洲视频重口味| 99久久婷婷这里只有精品 | 老司机午夜网站| 自拍偷拍亚洲视频| 精品福利在线视频| 777777av| 黑人巨大亚洲一区二区久 | 国产尤物视频在线| 一区二区视频在线看| 国产三级三级三级看三级| 韩国美女久久| 欧美va亚洲va| 久艹在线观看视频| 狂野欧美一区| 国产精品久久久久久av福利| 欧美一区二区三区久久久| 久久免费黄色| 国产精品久久久久久久久久久久午夜片| 亚洲爆乳无码一区二区三区| 成人美女视频在线观看18| 国产精品乱码视频| 国产精品刘玥久久一区| 一区二区三区精品在线| 青青草精品视频在线| 亚洲我射av| 日韩美女一区二区三区| 免费看毛片的网站| 九九免费精品视频在线观看| 国产+人+亚洲| 乱子伦一区二区三区| 久久国产乱子精品免费女| 91精品久久久久久久久久久久久久| 亚洲中文一区二区三区| 久久久99久久精品欧美| 妺妺窝人体色www在线小说| а_天堂中文在线| 精品久久久久久久大神国产| 中文字幕第3页| 欧美色综合网| 91免费版黄色| 免费在线观看一级毛片| 五月婷婷激情综合| 别急慢慢来1978如如2| 色哟哟精品丝袜一区二区| 91国产美女视频| 亚洲网站在线免费观看| 日本一区二区三区高清不卡| 激情五月五月婷婷| 清纯唯美激情亚洲| 亚洲人成电影网站色| 福利网址在线观看| 欧美国产日产图区| 欧美美女性视频| 日韩av字幕| 久久精品久久久久| 波多野结衣啪啪| 国产香蕉久久精品综合网| 99视频精品全部免费看| 清纯唯美激情亚洲| 97精品国产97久久久久久免费| 天天综合网天天综合| 日韩美女啊v在线免费观看| 黄色一级视频片| 99久热在线精品视频观看| 久久夜色撩人精品| 开心激情综合网| 日韩欧美成人精品| 91动漫免费网站| 久久精品一区| 亚洲欧美日韩精品综合在线观看| 亚洲福利影视| 中文字幕精品国产| 国产精品综合在线| 国产偷v国产偷v亚洲高清| 日本黄色福利视频| 亚洲午夜黄色| 亚洲精品日产aⅴ| 自拍视频在线免费观看| 欧美日韩一区二区在线| 纪美影视在线观看电视版使用方法| 久久99精品国产麻豆婷婷| 欧美精品卡一卡二| jizzjizz欧美69巨大| 97免费高清电视剧观看| 韩国三级一区| 亚洲男人第一av网站| 国产精品一区二区6| 国产精品久久久久久久岛一牛影视 | 亚洲国产激情av| 色婷婷狠狠18禁久久| 噜噜噜躁狠狠躁狠狠精品视频| 久久久国产精华液999999| 精品欧美一区二区三区在线观看 | 自拍偷拍精品视频| 亚洲综合激情另类小说区| 精品人伦一区二区| 久久久久国产精品午夜一区| 国产成人精品免费看在线播放| 偷拍精品福利视频导航| 亚洲自拍偷拍视频| 精品免费av一区二区三区| 国产一级揄自揄精品视频| 丁香花免费高清完整在线播放 | 精品美女久久久| 波多野结衣一区二区三区在线观看| 久久天堂av| 人九九综合九九宗合| 久草在线视频网站| 亚洲精品国精品久久99热| 欧美精品二区三区| 亚洲精选在线视频| 波多野结衣免费观看| 中文字幕一区二区三三| 色综合久久久久久久久五月| 成人在线不卡| 国产福利成人在线| 中文字幕日本在线观看| 日韩高清人体午夜| 午夜一区二区三区四区| 午夜不卡av免费| 九九热精彩视频| 亚洲男女一区二区三区| 国精产品视频一二二区| 国产欧美日韩麻豆91| 一道本在线观看| www国产成人| 色悠悠久久综合网| 欧美激情四色| 免费在线观看91| 偷拍亚洲精品| 免费精品视频一区| 中日韩免视频上线全都免费| 91精品免费久久久久久久久| 日韩三区免费| 国产精品欧美一区二区| 欧美人体视频xxxxx| 美女国内精品自产拍在线播放| 黄色小视频免费观看| 日韩欧美中文字幕一区| 精品人妻aV中文字幕乱码色欲| 欧美日韩国产一区二区三区| 日本熟妇色xxxxx日本免费看| 亚洲一区免费观看| 日本少妇毛茸茸高潮| 欧美日韩国产在线看| www.久久久久久久| 91福利在线观看| 又污又黄的网站| 91精品国产综合久久精品app| 99国产精品久久久久99打野战| 欧美日韩在线影院| 免费无码国产精品| 欧美揉bbbbb揉bbbbb| 91国产精品一区| 欧美午夜美女看片| 欧美亚洲另类小说| 欧美裸体bbwbbwbbw| 久久国产黄色片| 亚洲精品一卡二卡| 国产网友自拍视频| 综合久久综合久久| avtt天堂在线| 国产精品国产三级国产aⅴ中文| 91麻豆精品成人一区二区| 亚洲黄色性网站| 成人精品免费在线观看| 在线一区二区三区做爰视频网站| 国产无精乱码一区二区三区| 五月天中文字幕一区二区| 精品久久久久久久久久久久久久久久久久 | 一本色道久久88亚洲综合88| 免费观看国产视频| 亚洲欧美日韩天堂| 老司机在线永久免费观看| 亚洲欧美制服中文字幕| 午夜激情在线观看| 午夜精品在线观看| 国产91在线播放精品| 成人欧美一区二区| 欧美电影在线观看一区| 精品视频在线观看| 久久久伦理片| 国产亚洲福利社区| 国产一区二区三区黄网站| 国产免费一区二区三区在线能观看 | 欧美被日视频| 九九九久久国产免费| 午夜精品成人av| 欧美专区第一页| 日韩深夜福利网站| 精品日本一区二区三区在线观看| 日韩欧美字幕| 国产中文字幕在线免费观看| 国产一区二区福利| 亚洲综合123| 久久久亚洲欧洲日产国码αv| 五月天av网站| 在线免费观看日本一区| 黄色av网址在线| 中文综合在线观看| 激情黄产视频在线免费观看| 97视频在线观看亚洲| 亚洲老司机网| 欧美日韩系列| 欧美日一区二区| 被灌满精子的波多野结衣| 久久国产精品色婷婷| 亚洲精品视频久久久| 久久久精品国产免大香伊| 久久久久无码国产精品不卡| 亚洲亚洲精品在线观看| 国产精品视频无码| 在线电影中文日韩| 成人福利av| 麻豆蜜桃91| 亚洲精品乱码久久久久久蜜桃麻豆| 免费av手机在线观看| 狠狠狠色丁香婷婷综合激情| 精品国产午夜福利在线观看| 国产欧美一区二区三区沐欲| 日韩精品在线免费视频| 亚洲成人av资源网| 日韩伦理av| 欧美孕妇与黑人孕交| 国产精品对白久久久久粗| 久久99欧美| 国产精品久久观看| 午夜免费看毛片| 中文字幕中文字幕一区| 做爰无遮挡三级| 国产亚洲a∨片在线观看| 亚洲啊v在线| 鲁丝一区鲁丝二区鲁丝三区| 国产精品主播| av网站免费在线播放| 国产精品美女久久久久aⅴ| 欧美日韩在线观看成人| 91精品在线免费观看| 亚洲欧美日韩动漫| 最近中文字幕2019免费| 国产精品亲子伦av一区二区三区| 亚洲综合在线做性| 欧美日韩国产成人精品| 又大又长粗又爽又黄少妇视频| 亚洲国产一区二区视频| 天天躁日日躁狠狠躁伊人| 2024亚洲男人天堂| 日韩av黄色片| 久久精品亚洲| av黄色免费网站| 欧美在线一二三四区| 91九色在线porn| 91色中文字幕| 黄色成人在线网址| 插我舔内射18免费视频| 欧美色另类天堂2015| 成人18在线| 97久久精品午夜一区二区| 99av国产精品欲麻豆| av片中文字幕| 国产精品乱码一区二区三区软件| 一级特黄录像免费看| 欧美成人小视频| 卡通动漫国产精品| 我看黄色一级片| 亚洲精品老司机| 五十路在线视频| 国产精品国产亚洲伊人久久 | 国产精品久久国产| 91一区二区在线观看| 国产少妇在线观看| 亚洲护士老师的毛茸茸最新章节 | 亚洲欧洲视频| 日韩精品电影一区二区| 制服丝袜国产精品| xxxcom在线观看| 色综合影院在线观看| 高清不卡在线观看av| 欧美做爰啪啪xxxⅹ性| 精品国产一区二区精华| av大全在线| 国产在线精品成人一区二区三区| 欧美精品导航| 精品人妻一区二区三区四区| 精品区一区二区| 成人国产一区| 天堂…中文在线最新版在线| 国产精品天干天干在线综合|