精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

對(duì)Spark的那些【魔改】

大數(shù)據(jù) Spark
這兩年做 streamingpro 時(shí),不可避免的需要對(duì)Spark做大量的增強(qiáng)。就如同我之前吐槽的,Spark大量使用了new進(jìn)行對(duì)象的創(chuàng)建,導(dǎo)致里面的實(shí)現(xiàn)基本沒有辦法進(jìn)行替換。

前言

這兩年做 streamingpro 時(shí),不可避免的需要對(duì)Spark做大量的增強(qiáng)。就如同我之前吐槽的,Spark大量使用了new進(jìn)行對(duì)象的創(chuàng)建,導(dǎo)致里面的實(shí)現(xiàn)基本沒有辦法進(jìn)行替換。

對(duì)Spark的那些【魔改】

比如SparkEnv里有個(gè)屬性叫closureSerializer,是專門做任務(wù)的序列化反序列化的,當(dāng)然也負(fù)責(zé)對(duì)函數(shù)閉包的序列化反序列化。我們看看內(nèi)部是怎么實(shí)現(xiàn)的:

  1. val serializer = instantiateClassFromConf[Serializer]( 
  2.       "spark.serializer""org.apache.spark.serializer.JavaSerializer"
  3.     logDebug(s"Using serializer: ${serializer.getClass}"
  4.  
  5.     val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey) 
  6.  
  7.     val closureSerializer = new JavaSerializer(conf) 
  8.  
  9. val envInstance = new SparkEnv( 
  10. ..... 
  11.  closureSerializer, .... 

這里直接new了一個(gè)JavaSerializer,并不能做配置。如果不改源碼,你沒有任何辦法可以替換掉掉這個(gè)實(shí)現(xiàn)。同理,如果我想替換掉Executor的實(shí)現(xiàn),基本也是不可能的。

今年有兩個(gè)大地方涉及到了對(duì)Spark的【魔改】,也就是不通過改源碼,使用原有發(fā)型包,通過添加新代碼的方式來對(duì)Spark進(jìn)行增強(qiáng)。

二層RPC的支持

我們知道,在Spark里,我們只能通過Task才能touch到Executor。現(xiàn)有的API你是沒辦法直接操作到所有或者指定部分的Executor。比如,我希望所有Executor都加載一個(gè)資源文件,現(xiàn)在是沒辦法做到的。為了能夠?qū)xecutor進(jìn)行直接的操作,那就需要建立一個(gè)新的通訊層。那具體怎么做呢?

首先,在Driver端建立一個(gè)Backend,這個(gè)比較簡(jiǎn)單,

  1. class PSDriverBackend(sc: SparkContext) extends Logging { 
  2.  
  3.   val conf = sc.conf 
  4.   var psDriverRpcEndpointRef: RpcEndpointRef = null 
  5.  
  6.   def createRpcEnv = { 
  7.     val isDriver = sc.env.executorId == SparkContext.DRIVER_IDENTIFIER 
  8.     val bindAddress = sc.conf.get(DRIVER_BIND_ADDRESS) 
  9.     val advertiseAddress = sc.conf.get(DRIVER_HOST_ADDRESS) 
  10.     var port = sc.conf.getOption("spark.ps.driver.port").getOrElse("7777").toInt 
  11.     val ioEncryptionKey = if (sc.conf.get(IO_ENCRYPTION_ENABLED)) { 
  12.       Some(CryptoStreamUtils.createKey(sc.conf)) 
  13.     } else { 
  14.       None 
  15.     } 
  16.     logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}"
  17.     var createSucess = false 
  18.     var count = 0 
  19.     val env = new AtomicReference[RpcEnv]() 
  20.     while (!createSucess && count < 10) { 
  21.       try { 
  22.         env.set(RpcEnv.create("PSDriverEndpoint", bindAddress, port, sc.conf, 
  23.           sc.env.securityManager, clientMode = !isDriver)) 
  24.         createSucess = true 
  25.       } catch { 
  26.         case e: Exception => 
  27.           logInfo("fail to create rpcenv", e) 
  28.           count += 1 
  29.           port += 1 
  30.       } 
  31.     } 
  32.     if (env.get() == null) { 
  33.       logError(s"fail to create rpcenv finally with attemp ${count} "
  34.     } 
  35.     env.get() 
  36.   } 
  37.  
  38.   def start() = { 
  39.     val env = createRpcEnv 
  40.     val pSDriverBackend = new PSDriverEndpoint(sc, env) 
  41.     psDriverRpcEndpointRef = env.setupEndpoint("ps-driver-endpoint", pSDriverBackend) 
  42.   } 
  43.  

這樣,你可以理解為在Driver端啟動(dòng)了一個(gè)PRC Server。要運(yùn)行這段代碼也非常簡(jiǎn)單,直接在主程序里運(yùn)行即可:

  1. // parameter server should be enabled by default 
  2.     if (!params.containsKey("streaming.ps.enable") || params.get("streaming.ps.enable").toString.toBoolean) { 
  3.       logger.info("ps enabled..."
  4.       if (ss.sparkContext.isLocal) { 
  5.         localSchedulerBackend = new LocalPSSchedulerBackend(ss.sparkContext) 
  6.         localSchedulerBackend.start() 
  7.       } else { 
  8.         logger.info("start PSDriverBackend"
  9.         psDriverBackend = new PSDriverBackend(ss.sparkContext) 
  10.         psDriverBackend.start() 
  11.       } 
  12.     } 

這里我們需要實(shí)現(xiàn)local模式和cluster模式兩種。

Driver啟動(dòng)了一個(gè)PRC Server,那么Executor端如何啟動(dòng)呢?Executor端似乎沒有任何一個(gè)地方可以讓我啟動(dòng)一個(gè)PRC Server? 其實(shí)有的,只是非常trick,我們知道Spark是允許自定義Metrics的,并且會(huì)調(diào)用用戶實(shí)現(xiàn)的metric特定的方法,我們只要開發(fā)一個(gè)metric Sink,在里面啟動(dòng)RPC Server,騙過Spark即可。具體時(shí)下如下:

  1. class PSServiceSink(val property: Properties, val registry: MetricRegistry, 
  2.                     securityMgr: SecurityManager) extends Sink with Logging { 
  3.   def env = SparkEnv.get 
  4.  
  5.   var psDriverUrl: String = null 
  6.   var psExecutorId: String = null 
  7.   var hostname: String = null 
  8.   var cores: Int = 0 
  9.   var appId: String = null 
  10.   val psDriverPort = 7777 
  11.   var psDriverHost: String = null 
  12.   var workerUrl: Option[String] = None 
  13.   val userClassPath = new mutable.ListBuffer[URL]() 
  14.  
  15.   def parseArgs = { 
  16.     //val runtimeMxBean = ManagementFactory.getRuntimeMXBean(); 
  17.     //var argv = runtimeMxBean.getInputArguments.toList 
  18.     var argv = System.getProperty("sun.java.command").split("\\s+").toList 
  19.  
  20.    ..... 
  21.     psDriverHost = host 
  22.     psDriverUrl = "spark://ps-driver-endpoint@" + psDriverHost + ":" + psDriverPort 
  23.   } 
  24.  
  25.   parseArgs 
  26.  
  27.   def createRpcEnv = { 
  28.     val isDriver = env.executorId == SparkContext.DRIVER_IDENTIFIER 
  29.     val bindAddress = hostname 
  30.     val advertiseAddress = "" 
  31.     val port = env.conf.getOption("spark.ps.executor.port").getOrElse("0").toInt 
  32.     val ioEncryptionKey = if (env.conf.get(IO_ENCRYPTION_ENABLED)) { 
  33.       Some(CryptoStreamUtils.createKey(env.conf)) 
  34.     } else { 
  35.       None 
  36.     } 
  37.     //logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}"
  38.     RpcEnv.create("PSExecutorBackend", bindAddress, port, env.conf, 
  39.       env.securityManager, clientMode = !isDriver) 
  40.   } 
  41.  
  42.   override def start(): Unit = { 
  43.  
  44.     new Thread(new Runnable { 
  45.       override def run(): Unit = { 
  46.         logInfo(s"delay PSExecutorBackend 3s"
  47.         Thread.sleep(3000) 
  48.         logInfo(s"start PSExecutor;env:${env}"
  49.         if (env.executorId != SparkContext.DRIVER_IDENTIFIER) { 
  50.           val rpcEnv = createRpcEnv 
  51.           val pSExecutorBackend = new PSExecutorBackend(env, rpcEnv, psDriverUrl, psExecutorId, hostname, cores) 
  52.           PSExecutorBackend.executorBackend = Some(pSExecutorBackend) 
  53.           rpcEnv.setupEndpoint("ps-executor-endpoint", pSExecutorBackend) 
  54.         } 
  55.       } 
  56.     }).start() 
  57.  
  58.   } 
  59. ... 

到這里,我們就能成功啟動(dòng)RPC Server,并且連接上Driver中的PRC Server?,F(xiàn)在,你就可以在不修改Spark 源碼的情況下,盡情的寫通訊相關(guān)的代碼了,讓你可以更好的控制Executor。

比如在PSExecutorBackend 實(shí)現(xiàn)如下代碼:

  1. override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 
  2.     case Message.TensorFlowModelClean(modelPath) => { 
  3.       logInfo("clean tensorflow model"
  4.       TFModelLoader.close(modelPath) 
  5.       context.reply(true
  6.     } 
  7.     case Message.CopyModelToLocal(modelPath, destPath) => { 
  8.       logInfo(s"copying model: ${modelPath} -> ${destPath}"
  9.       HDFSOperator.copyToLocalFile(destPath, modelPath, true
  10.       context.reply(true
  11.     } 
  12.   } 

接著你就可以在Spark里寫如下的代碼調(diào)用了:

  1. val psDriverBackend = runtime.asInstanceOf[SparkRuntime].psDriverBackend psDriverBackend.psDriverRpcEndpointRef.send(Message.TensorFlowModelClean("/tmp/ok")) 

是不是很酷。

修改閉包的序列化方式

Spark的任務(wù)調(diào)度開銷非常大。對(duì)于一個(gè)復(fù)雜的任務(wù),業(yè)務(wù)邏輯代碼執(zhí)行時(shí)間大約是3-7ms,但是整個(gè)spark運(yùn)行的開銷大概是1.3s左右。

經(jīng)過詳細(xì)dig發(fā)現(xiàn),sparkContext里RDD轉(zhuǎn)化時(shí),會(huì)對(duì)函數(shù)進(jìn)行clean操作,clean操作的過程中,默認(rèn)會(huì)檢查是不是能序列化(就是序列化一遍,沒拋出異常就算可以序列化)。而序列化成本相當(dāng)高(默認(rèn)使用的JavaSerializer并且對(duì)于函數(shù)和任務(wù)序列化,是不可更改的),單次序列化耗時(shí)就達(dá)到200ms左右,在local模式下對(duì)其進(jìn)行優(yōu)化,可以減少600ms左右的請(qǐng)求時(shí)間。

當(dāng)然,需要申明的是,這個(gè)是針對(duì)local模式進(jìn)行修改的。那具體怎么做的呢?

我們先看看Spark是怎么調(diào)用序列化函數(shù)的,首先在SparkContext里,clean函數(shù)是這樣的:

  1. private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { 
  2.     ClosureCleaner.clean(f, checkSerializable) 
  3.     f 
  4.   } 

調(diào)用的是ClosureCleaner.clean方法,該方法里是這么調(diào)用學(xué)序列化的:

  1. try { 
  2.       if (SparkEnv.get != null) { 
  3.         SparkEnv.get.closureSerializer.newInstance().serialize(func) 
  4.       } 
  5.     } catch { 
  6.       case ex: Exception => throw new SparkException("Task not serializable", ex) 
  7.     } 

SparkEnv是在SparkContext初始化的時(shí)候創(chuàng)建的,該對(duì)象里面包含了closureSerializer,該對(duì)象通過new JavaSerializer創(chuàng)建。既然序列化太慢,又因?yàn)槲覀兤鋵?shí)是在Local模式下,本身是可以不需要序列化的,所以我們這里想辦法把closureSerializer的實(shí)現(xiàn)替換掉。正如我們前面吐槽,因?yàn)樵赟park代碼里寫死了,沒有暴露任何自定義的可能性,所以我們又要魔改一下了。

首先,我們新建一個(gè)SparkEnv的子類:

  1. class WowSparkEnv( 
  2.                    ....) extends SparkEnv( 

接著實(shí)現(xiàn)一個(gè)自定義的Serializer:

  1. class LocalNonOpSerializerInstance(javaD: SerializerInstance) extends SerializerInstance { 
  2.  
  3.   private def isClosure(cls: Class[_]): Boolean = { 
  4.     cls.getName.contains("$anonfun$"
  5.   } 
  6.  
  7.   override def serialize[T: ClassTag](t: T): ByteBuffer = { 
  8.     if (isClosure(t.getClass)) { 
  9.       val uuid = UUID.randomUUID().toString 
  10.       LocalNonOpSerializerInstance.maps.put(uuid, t.asInstanceOf[AnyRef]) 
  11.       ByteBuffer.wrap(uuid.getBytes()) 
  12.     } else { 
  13.       javaD.serialize(t) 
  14.     } 
  15.  
  16.   } 
  17.  
  18.   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { 
  19.     val s = StandardCharsets.UTF_8.decode(bytes).toString() 
  20.     if (LocalNonOpSerializerInstance.maps.containsKey(s)) { 
  21.       LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T] 
  22.     } else { 
  23.       bytes.flip() 
  24.       javaD.deserialize(bytes) 
  25.     } 
  26.  
  27.   } 
  28.  
  29.   override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = { 
  30.     val s = StandardCharsets.UTF_8.decode(bytes).toString() 
  31.     if (LocalNonOpSerializerInstance.maps.containsKey(s)) { 
  32.       LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T] 
  33.     } else { 
  34.       bytes.flip() 
  35.       javaD.deserialize(bytes, loader) 
  36.     } 
  37.   } 
  38.  
  39.   override def serializeStream(s: OutputStream): SerializationStream = { 
  40.     javaD.serializeStream(s) 
  41.   } 
  42.  
  43.   override def deserializeStream(s: InputStream): DeserializationStream = { 
  44.     javaD.deserializeStream(s) 
  45.   } 

接著我們需要再封裝一個(gè)LocalNonOpSerializer,

  1. class LocalNonOpSerializer(conf: SparkConf) extends Serializer with Externalizable { 
  2.   val javaS = new JavaSerializer(conf) 
  3.  
  4.   override def newInstance(): SerializerInstance = { 
  5.     new LocalNonOpSerializerInstance(javaS.newInstance()) 
  6.   } 
  7.  
  8.   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { 
  9.     javaS.writeExternal(out
  10.   } 
  11.  
  12.   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { 
  13.     javaS.readExternal(in
  14.   } 

現(xiàn)在,萬事俱備,只欠東風(fēng)了,我們?cè)趺床拍馨堰@些代碼讓Spark運(yùn)行起來。具體做法非常魔幻,實(shí)現(xiàn)一個(gè)enhance類:

  1. def enhanceSparkEnvForAPIService(session: SparkSession) = { 
  2.       val env = SparkEnv.get 
  3.    //創(chuàng)建一個(gè)新的WowSparkEnv對(duì)象,然后將里面的Serializer替換成我們自己的LocalNonOpSerializer 
  4.     val wowEnv = new WowSparkEnv( 
  5.  ..... 
  6.       new LocalNonOpSerializer(env.conf): Serializer, 
  7.  ....) 
  8.     // 將SparkEnv object里的實(shí)例替換成我們的 
  9.     //WowSparkEnv 
  10.     SparkEnv.set(wowEnv) 
  11.   //但是很多地方在SparkContext啟動(dòng)后都已經(jīng)在使用之前就已經(jīng)生成的SparkEnv,我們需要做些調(diào)整 
  12. //我們先把之前已經(jīng)啟動(dòng)的LocalSchedulerBackend里的scheduer停掉 
  13.     val localScheduler = session.sparkContext.schedulerBackend.asInstanceOf[LocalSchedulerBackend] 
  14.  
  15.     val scheduler = ReflectHelper.field(localScheduler, "scheduler"
  16.  
  17.     val totalCores = localScheduler.totalCores 
  18.     localScheduler.stop() 
  19.  
  20.   //創(chuàng)建一個(gè)新的LocalSchedulerBackend 
  21.     val wowLocalSchedulerBackend = new WowLocalSchedulerBackend(session.sparkContext.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], totalCores) 
  22.     wowLocalSchedulerBackend.start() 
  23.  //把SparkContext里的_schedulerBackend替換成我們的實(shí)現(xiàn) 
  24.     ReflectHelper.field(session.sparkContext, "_schedulerBackend", wowLocalSchedulerBackend) 
  25.   } 

完工。

其實(shí)還有很多

比如在Spark里,Python Worker默認(rèn)一分鐘沒有被使用是會(huì)被殺死的,但是在StreamingPro里,這些python worker因?yàn)槎家虞d模型,所以啟動(dòng)成本是非常高的,殺了之后再啟動(dòng)就沒辦法忍受了,通過類似的方式進(jìn)行魔改,從而使得空閑時(shí)間是可配置的。如果大家感興趣,可以翻看StreamingPro相關(guān)代碼。

責(zé)任編輯:未麗燕 來源: 簡(jiǎn)書
相關(guān)推薦

2019-11-13 15:46:56

硬件CPU主板

2017-03-02 17:40:20

Linux移動(dòng)存儲(chǔ)設(shè)備

2022-09-23 13:57:11

xxl-job任務(wù)調(diào)度中間件

2018-10-31 15:36:02

CPU優(yōu)點(diǎn)缺點(diǎn)

2017-12-25 10:40:01

Python單例字典模塊

2021-06-06 19:03:25

SQL大數(shù)據(jù)Spark

2021-04-30 07:33:58

微軟Android系統(tǒng)Surface Duo

2017-06-21 08:39:20

SparkScalaHDFS

2016-11-07 16:06:43

大數(shù)據(jù)SparkImpala

2021-07-26 08:49:27

Windows 11操作系統(tǒng)微軟

2021-12-13 17:53:19

谷歌Transformer技術(shù)

2022-04-18 11:05:36

開源github代碼庫

2023-06-13 07:06:30

RTX顯存位公版卡

2022-01-26 20:01:24

管理工具knife4j

2024-04-15 07:50:00

AI架構(gòu)

2018-03-01 08:39:34

HadoopSpark加密貨幣

2022-01-17 09:19:12

Transformer數(shù)據(jù)人工智能

2022-01-10 06:03:51

Windows 11操作系統(tǒng)微軟

2024-06-03 10:56:53

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

真人做人试看60分钟免费| 久久人91精品久久久久久不卡| 北条麻妃在线视频观看| 色一情一乱一区二区三区| 狠狠干综合网| 亚洲大胆人体av| 成人观看免费完整观看| 国产九色在线| 韩国三级在线一区| 久久久久久91香蕉国产| 国产成人无码一区二区在线观看 | 日韩av网站免费在线| 亚洲欧洲一区二区三区在线观看| 国模杨依粉嫩蝴蝶150p| 91在线免费看| 国产激情视频一区二区三区欧美 | 成年人在线观看| 日本成人在线视频网站| 这里只有精品久久| 国模大尺度视频| av在线免费网址| 99精品一区二区三区| 26uuu日韩精品一区二区| 国产av自拍一区| 91精品亚洲一区在线观看| 亚洲一二三四在线| 欧洲精品一区色| av免费观看在线| 亚洲午夜在线| 亚洲一区999| 91性高潮久久久久久久| 青草在线视频| 精品久久久久久久久久久下田| 欧美日韩在线精品一区二区三区激情| 国产精品二区二区三区| 日韩一区二区视频在线| 五月综合激情| 亚洲女人天堂av| 三级黄色片免费看| 久久久成人av毛片免费观看| 亚洲免费在线视频一区 二区| 久久久久久久久一区| 国产黄色片网站| 丝袜亚洲另类欧美| 久久久久久久国产| 貂蝉被到爽流白浆在线观看| 欧美调教网站| 日韩免费一区二区| 成人性生交免费看| 亚洲涩涩在线| 午夜伊人狠狠久久| 国产黄色激情视频| 毛片在线播放a| 国产欧美日产一区| 韩国一区二区三区美女美女秀 | 黄色在线播放网站| 久久精品一区二区三区不卡| 成人精品一二区| 国产精品国产一区二区三区四区 | 9l亚洲国产成人精品一区二三| 精品视频色一区| 国产xxxxx视频| 夜鲁夜鲁夜鲁视频在线播放| 亚洲午夜影视影院在线观看| 精品国产一区二区三区在线| 天堂地址在线www| 国产女主播一区| 日韩欧美一区二区三区四区| 久久免费看视频| 国产亚洲综合在线| 日本一区二区三区视频免费看| 嫩草研究院在线观看| 99精品桃花视频在线观看| 国产视频精品网| 日韩一级免费视频| www.亚洲精品| 国产综合色一区二区三区| 亚洲av无码片一区二区三区| 国产大陆精品国产| 国产福利不卡| 粉嫩av一区二区夜夜嗨| 成人精品在线视频观看| 国产麻豆日韩| 天天av综合网| 久久夜色精品国产噜噜av| 日产国产精品精品a∨| 国产一级在线| 一区视频在线播放| 国产福利片一区二区| 欧美三级黄网| 亚洲人妖av一区二区| 国产天堂视频在线观看| а√在线天堂官网| 欧美日韩午夜激情| av片中文字幕| 欧美三级电影网址| 欧美一区二区三区思思人| 香蕉视频色在线观看| 激情亚洲另类图片区小说区| 亚洲美女视频网站| 天堂在线中文视频| 欧美成人精品| 91精品国产高清久久久久久久久| 国产精品国产三级国产专区52| 鲁大师成人一区二区三区| 国产在线视频一区| 国产综合无码一区二区色蜜蜜| 97精品国产97久久久久久久久久久久| 日韩国产伦理| 好久没做在线观看| 91久久精品一区二区| 国产无遮挡猛进猛出免费软件| 日本在线一区二区三区| 日韩精品福利在线| 国产免费一区二区三区四区| 亚洲人人精品| 国产免费一区二区三区在线能观看| 亚洲一区二区人妻| 99国产精品国产精品毛片| 精品国产乱码久久久久久蜜柚| yw视频在线观看| 亚洲午夜久久久久久久久电影院 | 天堂中文在线8| 中文字幕亚洲精品在线观看| 免费看国产曰批40分钟| 精品女同一区二区三区在线观看| 欧美videofree性高清杂交| 性欧美一区二区| 黄色成人在线网址| 日本高清视频精品| www天堂在线| 亚洲国产精品激情在线观看| 一本久道高清无码视频| 日韩av电影资源网| 亚洲福利视频久久| 国产福利视频网站| 久久天堂成人| 国产麻豆乱码精品一区二区三区| 巨大荫蒂视频欧美另类大| 一本色道久久综合精品竹菊| 91香蕉视频免费看| 成人毛片在线| 国产91色在线|| 亚洲av电影一区| 亚洲精选视频在线| 久久久久久久久久一区| 极品美女一区二区三区| 韩国国内大量揄拍精品视频| 中日韩av在线| 国产午夜亚洲精品理论片色戒 | 制服丝袜中文字幕在线| 色屁屁一区二区| 国产精品久久久免费观看| 黄色日韩在线| 国产伦精品一区二区三区免费视频| 在线视频观看国产| 911精品国产一区二区在线| 大胸美女被爆操| 巨乳诱惑日韩免费av| 久久爱av电影| 国产理论在线| 日韩三级视频在线看| 91成人精品一区二区| 在线视频观看日韩| 不卡日韩av| 欧美1234区| 日韩片之四级片| 国产探花在线免费观看| 国产一区二区三区四| 伊人情人网综合| 亚洲a成人v| 日韩在线视频免费观看| 中文字幕在线视频第一页| 久久日韩精品一区二区五区| 精品少妇一区二区三区在线| 欧美黄色录像| 国产不卡一区二区在线播放| 欧美婷婷久久五月精品三区| 黑人精品xxx一区| 一级性生活大片| 久热精品在线| 在线观看成人av| 高清国产一区二区三区四区五区| 另类美女黄大片| 丰满少妇一级片| 欧美午夜女人视频在线| 国产传媒国产传媒| 久久99久久久久久久久久久| 亚洲精品中文综合第一页| 亚洲青青一区| 久久久久久久香蕉网| 天天av天天翘| 欧美视频精品在线观看| h色网站在线观看| 高清在线不卡av| 国产在线青青草| 日韩av有码| av在线不卡一区| av影院在线免费观看| 亚洲欧美国产视频| 96日本xxxxxⅹxxx17| 一区二区三区四区不卡在线 | 亚洲色图18p| 国产精品视频无码| 亚洲一区二区视频在线观看| 白嫩情侣偷拍呻吟刺激| 久久久久综合| 日韩国产精品毛片| 成人精品动漫一区二区三区| 日本国产高清不卡| yjizz视频网站在线播放| 7777精品伊人久久久大香线蕉| 国产在线观看99| 久久九九久精品国产免费直播| 日本高清久久久| 99热在线精品观看| 亚洲自拍的二区三区| 91成人短视频在线观看| 久久久久久久久电影| 成年人在线观看网站| 精品不卡在线视频| 艳妇乳肉豪妇荡乳av| 精品人伦一区二区三区蜜桃免费| 三级影片在线观看| 久久久电影一区二区三区| 亚洲熟女乱综合一区二区| 久久久蜜桃一区二区人| 男人的天堂avav| 色欧美自拍视频| 欧美日本国产精品| 伊人久久大香线蕉av超碰| 国产精品青草久久久久福利99| 草美女在线观看| 久久天天躁夜夜躁狠狠躁2022| 黄色av免费在线看| 亚洲第一国产精品| www.色亚洲| 欧美麻豆精品久久久久久| 中文字幕在线看人| 亚洲成va人在线观看| 日韩a级片在线观看| 国产午夜精品一区二区三区嫩草| 中文视频在线观看| 国产一区二区剧情av在线| 国产熟人av一二三区| 999亚洲国产精| dy888午夜| 日韩精品午夜| 日韩高清三级| 国产日产精品一区二区三区四区的观看方式| 91香蕉电影院| 二区三区精品| 91人人爽人人爽人人精88v| 亚洲欧美在线综合| 成人午夜小视频| 四虎影视成人精品国库在线观看| 日韩av免费在线播放| 涩涩网在线视频| 性视频1819p久久| av中文字幕电影在线看| 色综合视频网站| 日本h片在线观看| 九九视频直播综合网| 成年视频在线观看| 欧美片一区二区三区| 午夜影院免费在线| 久久久久国产精品一区| av伦理在线| 97涩涩爰在线观看亚洲| 午夜欧美激情| 欧美亚洲免费电影| 依依综合在线| 国产成人短视频| 欧美日韩尤物久久| 成人黄色免费片| 成人性片免费| 国产美女直播视频一区| www一区二区三区| 亚洲自拍小视频| 精品国产一区二区三区2021| 国产精品视频一区二区高潮| 国产香蕉久久| 91精品国产综合久久久久久丝袜| 成人h动漫免费观看网站| 国产一区二区三区无遮挡| 一区二区三区韩国免费中文网站| 日本免费高清不卡| 91久久电影| 亚洲五码在线观看视频| 在线播放一区| 成年人小视频网站| 韩国一区二区在线观看| www.四虎在线| 久久午夜羞羞影院免费观看| 老司机福利在线观看| 一卡二卡三卡日韩欧美| 国产午夜精品无码一区二区| 色婷婷国产精品久久包臀| 亚洲天堂网在线观看视频| 精品剧情v国产在线观看在线| 男人的天堂在线视频| 日韩在线国产精品| 91豆花视频在线播放| 国产精品黄色影片导航在线观看| 99久久久国产| 久久伊人资源站| 91精品国偷自产在线电影| 国产一级爱c视频| 蜜桃视频第一区免费观看| 国产性猛交96| 国产精品久久久久影院亚瑟| 日韩精品一区二区三| 欧美精品少妇一区二区三区| 欧美一级淫片aaaaaa| 伊人久久久久久久久久久| 黄网站在线播放| 欧美一级视频一区二区| 国产精品一站二站| 欧美日韩亚洲综合一区二区三区激情在线| 久久久影院免费| 成人观看免费完整观看| 久久精品国产亚洲aⅴ| 人妻 丝袜美腿 中文字幕| 国产视频亚洲色图| 欧美激情国产精品免费| 91久久国产综合久久| 国产强被迫伦姧在线观看无码| 亚洲欧美激情一区| 国内小视频在线看| 91沈先生在线观看| 成人国产精品一级毛片视频| 成人免费性视频| 国产激情视频一区二区三区欧美| 手机免费看av| 午夜av一区二区| www.av日韩| 久久精品亚洲一区| a成人v在线| 欧洲久久久久久| 亚洲中字黄色| 给我免费观看片在线电影的| 亚洲六月丁香色婷婷综合久久 | 中文字幕在线播放| 欧美一级电影久久| 成人精品在线| 中文字幕一区二区三区5566| 久久先锋影音| 国产十八熟妇av成人一区| 中文字幕一区免费在线观看| 亚洲无码精品一区二区三区| 日韩电影在线观看中文字幕 | 欧美激情按摩在线| 爱情电影网av一区二区| 日韩精品无码一区二区三区| 久久精品男女| 国产成人精品无码免费看夜聊软件| 天天色综合成人网| 五月婷婷丁香网| 性色av一区二区三区在线观看 | 91美女片黄在线观| 天天影视天天精品| 一级做a免费视频| 亚洲欧美怡红院| 国产女无套免费视频| 欧美成人免费在线观看| 超碰国产精品一区二页| 日本黄色a视频| 国产精品 欧美精品| 久久午夜无码鲁丝片| 亚洲第一福利网站| 成人私拍视频| 日韩欧美亚洲区| 国产最新精品精品你懂的| 538精品在线视频| 精品国产乱码久久久久久1区2区 | 久久激情五月丁香伊人| 欧美男男gaygay1069| 久久av喷吹av高潮av| 国产精品正在播放| 豆国产97在线 | 亚洲| 亚洲精品美女免费| 欧美三级电影网址| 精品少妇一区二区三区在线| 国产精品理论在线观看| 亚洲国产精彩视频| 国产精品久久色| 精品1区2区3区4区| 久久免费手机视频| 亚洲成av人乱码色午夜| 88xx成人网| 国产素人在线观看| 国产精品不卡在线| 午夜成人免费影院| 亚洲一区二区三区在线视频| 亚洲欧美久久久| 五月婷婷一区二区| 一区二区三区四区视频| 国产成人精品福利| 亚洲高清在线不卡| 色素色在线综合|