精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Kubernetes Informer基本原理,你明白了嗎?

開發(fā) 前端
informer 本質(zhì)其實(shí)就是一個通過 deltaFifo 建立生產(chǎn)消費(fèi)機(jī)制,并且?guī)в斜镜鼐彺婧退饕约翱梢宰曰卣{(diào)事件的 apiServer 的客戶端庫。

本文分析 k8s controller 中 informer 啟動的基本流程

不論是 k8s 自身組件,還是自己編寫 controller,都需要通過 apiserver 監(jiān)聽 etcd 事件來完成自己的控制循環(huán)邏輯。

如何高效可靠進(jìn)行事件監(jiān)聽,k8s 客戶端工具包 client-go 提供了一個通用的 informer 包,通過 informer,可以方便和高效的進(jìn)行 controller 開發(fā)。

informer 包提供了如下的一些功能:

1、本地緩存(store)

2、索引機(jī)制(indexer)

3、Handler 注冊功能(eventHandler)

1、informer 架構(gòu)

整個 informer 機(jī)制架構(gòu)如下圖(圖片源自 Client-go):

圖片圖片

可以看到這張圖分為上下兩個部分,上半部分由 client-go 提供,下半部分則是需要自己實(shí)現(xiàn)的控制循環(huán)邏輯

本文主要分析上半部分的邏輯,包括下面幾個組件:

1.1、Reflector:

從圖上可以看到 Reflector 是一個和 apiserver 交互的組件,通過 list 和 watch api 將資源對象壓入隊(duì)列

1.2、DeltaFifo:

DeltaFifo的結(jié)構(gòu)體示意如下:

type DeltaFIFO struct {
  ...
  // We depend on the property that items in the s    et are in
  // the queue and vice versa, and that all Deltas in this
  // map have at least one Delta.
  items map[string]Deltas
  queue []string
  ...
}

主要分為兩部分,fifo 和 delta

(1)fifo:先進(jìn)先出隊(duì)列

對應(yīng)結(jié)構(gòu)體中的 queue,結(jié)構(gòu)體示例如下:

[default/centos-fd77b5886-pfrgn, xxx, xxx]

(2)delta:對應(yīng)結(jié)構(gòu)體中的items,存儲了資源對象并且攜帶了資源操作類型的一個 map,結(jié)構(gòu)體示例如下:

map:{"default/centos-fd77b5886-pfrgn":[{Replaced &Pod{ObjectMeta: ${pod參數(shù)}], "xxx": [{},{}]}

消費(fèi)者從 queue 中 pop 出對象進(jìn)行消費(fèi),并從 items 獲取具體的消費(fèi)操作(執(zhí)行動作 Update/Deleted/Sync,和執(zhí)行的對象 object spec)

1.3、Indexer:

client-go 用來存儲資源對象并自帶索引功能的本地存儲,deltaFIFO 中 pop 出的對象將存儲到 Indexer。

indexer 與 etcd 集群中的數(shù)據(jù)保持一致,從而 client-go 可以直接從本地緩存獲取資源對象,減少 apiserver 和 etcd 集群的壓力。

2、一個基本例子

func main() {

  stopCh := make(chan struct{})
  defer close(stopCh)
  
  // (1)New a k8s clientset
  masterUrl := "172.27.32.110:8080"
  config, err := clientcmd.BuildConfigFromFlags(masterUrl, "")
  if err != nil {
    klog.Errorf("BuildConfigFromFlags err, err: %v", err)
  }
  
  clientset, err := k.NewForConfig(config)
  if err != nil {
    klog.Errorf("Get clientset err, err: %v", err)
  }
  
  // (2)New a sharedInformers factory
  sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)
  
  
  // (3)Register a informer
  //  f.informers[informerType] = informer,
  //  the detail for informer is build in NewFilteredPodInformer()
  podInformer := sharedInformers.Core().V1().Pods().Informer()
  
  // (4)Register event handler
  podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
        mObj := obj.(v1.Object)
        klog.Infof("Get new obj: %v", mObj)
        klog.Infof("Get new obj name: %s", mObj.GetName())
      },
  })
  
  // (5)Start all informers
  sharedInformers.Start(stopCh)
  
  // (6)A cronjob for cache sync
  if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
    klog.Infof("Cache sync fail!")
  }
  
  // (7)Use lister
  podLister := sharedInformers.Core().V1().Pods().Lister()
  pods, err := podLister.List(labels.Everything())
  if err != nil {
    klog.Infof("err: %v", err)
  }
  klog.Infof("len(pods), %d", len(pods))
  for _, v := range pods {
    klog.Infof("pod: %s", v.Name)
  }
  
  <- stopChan
}

上面就是一個簡單的 informer 的使用例子,整個過程如上述幾個步驟,著重說一下(2)、(3)、(4)、(5)四個步驟

3、流程分析

3.1、New a sharedInformers factory

sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)

factory := &sharedInformerFactory{
  client:           client,
  namespace:        v1.NamespaceAll,
  defaultResync:    defaultResync,
  informers:        make(map[reflect.Type]cache.SharedIndexInformer),
  startedInformers: make(map[reflect.Type]bool),
  customResync:     make(map[reflect.Type]time.Duration),
}

這個過程就是創(chuàng)建一個 informer 的工廠 sharedInformerFactory,sharedInformerFactory 中有一個 informers 對象,里面是一個 informer 的 map,sharedInformerFactory 是為了防止過多的重復(fù) informer 監(jiān)聽 apiserver,導(dǎo)致 apiserver 壓力過大,在同一個服務(wù)中,不同的 controller 使用同一個 informer

3.2、Register a informer

這個過程主要是生成和注冊 informer 到 sharedInformerFactory

podInformer := sharedInformers.Core().V1().Pods().Informer()

func (f *podInformer) Informer() cache.SharedIndexInformer {
  return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

### f.factory.InformerFor:
### 注冊 informer 
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
  ...
  informer = newFunc(f.client, resyncPeriod)
  f.informers[informerType] = informer
  return informer
}

### f.defaultInformer:
### 生成 informer
func (f *podInformer) defaultInformer(client k.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
  return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client k.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
  return cache.NewSharedIndexInformer(
    &cache.ListWatch{
    ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
      if tweakListOptions != nil {
        tweakListOptions(&options)
      }
      return client.CoreV1().Pods(namespace).List(context.TODO(), options)
    },
    WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
      if tweakListOptions != nil {
        tweakListOptions(&options)
      }
      return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
    },
    },
    &corev1.Pod{},
    resyncPeriod,
    indexers,
  )
}

### cache.NewSharedIndexInformer:
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  realClock := &clock.RealClock{}
  sharedIndexInformer := &sharedIndexInformer{
    processor:                       &sharedProcessor{clock: realClock},
    indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
    listerWatcher:                   lw,
    objectType:                      exampleObject,
    resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
    defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
    cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
    clock:                           realClock,
  }
  return sharedIndexInformer
}

首先通過 f.defaultInformer 方法生成 informer,然后通過 f.factory.InformerFor 方法,將 informer 注冊到 sharedInformerFactory

3.3、Register event handler

這個過程展示如何注冊一個回調(diào)函數(shù),以及如何觸發(fā)這個回調(diào)函數(shù)

### podInformer.AddEventHandler:
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
  s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {

  ...
  listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(),  initialBufferSize)
  if !s.started {
    s.processor.addListener(listener)
    return
  }
  ...
}

### s.processor.addListener(listener):
func (p *sharedProcessor) addListener(listener *processorListener) {
  p.addListenerLocked(listener)
  if p.listenersStarted {
    p.wg.Start(listener.run)
    p.wg.Start(listener.pop)
  }
}

### listener.run:
func (p *processorListener) run() {
  // this call blocks until the channel is closed.  When a panic happens during the notification
  // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
  // the next notification will be attempted.  This is usually better than the alternative of never
  // delivering again.
  stopCh := make(chan struct{})
  wait.Until(func() {
    for next := range p.nextCh {
      switch notification := next.(type) {        // 通過next結(jié)構(gòu)體本身的類型來判斷事件類型
      case updateNotification:
        p.handler.OnUpdate(notification.oldObj, notification.newObj)
      case addNotification:
        p.handler.OnAdd(notification.newObj)
      case deleteNotification:
        p.handler.OnDelete(notification.oldObj)
      default:
        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
      }
    }
    // the only way to get here is if the p.nextCh is empty and closed
    close(stopCh)
  }, 1*time.Second, stopCh)
}

### listener.pop:
func (p *processorListener) pop() {

  var nextCh chan<- interface{}
  var notification interface{}
  for {
    select {
    case nextCh <- notification:
      // Notification dispatched
      var ok bool
      notification, ok = p.pendingNotifications.ReadOne()
      if !ok { // Nothing to pop
        nextCh = nil // Disable this select case
      }
    case notificationToAdd, ok := <-p.addCh:
      if !ok {
        return
      }
      if notification == nil { // No notification to pop (and pendingNotifications is empty)
        // Optimize the case - skip adding to pendingNotifications
        notification = notificationToAdd
        nextCh = p.nextCh
      } else { // There is already a notification waiting to be dispatched
        p.pendingNotifications.WriteOne(notificationToAdd)
      }
    }
  }
}

這個過程總結(jié)就是:

(1)AddEventHandler 到 sharedProcessor,注冊事件回調(diào)函數(shù)到 sharedProcessor

(2)listener pop 方法里會監(jiān)聽 p.addCh,通過 nextCh = p.nextCh 將 addCh 將事件傳遞給 p.nextCh

(3)listener run 方法里會監(jiān)聽 p.nextCh,收到信號之后,判斷是屬于什么類型的方法,并且執(zhí)行前面注冊的 Handler

所以后面需要關(guān)注當(dāng)資源對象發(fā)生變更時,是如何將變更信號給 p.addCh,進(jìn)一步觸發(fā)回調(diào)函數(shù)的

3.4、Start all informers

通過 sharedInformers.Start(stopCh)啟動所有的 informer,代碼如下:

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
  for informerType, informer := range f.informers {
    if !f.startedInformers[informerType] {
      go informer.Run(stopCh)
      f.startedInformers[informerType] = true
    }
  }
}

我們的例子中其實(shí)就只啟動了 PodInformer,接下來看到 podInformer 的 Run 方法做了什么

### go informer.Run(stopCh):

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}){
  defer utilruntime.HandleCrash()

  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{   // Deltafifo
    KnownObjects:          s.indexer,
    EmitDeltaTypeReplaced: true,
  })
  cfg := &Config{
    Queue:            fifo,         // Deltafifo
    ListerWatcher:    s.listerWatcher,  // listerWatcher
    ObjectType:       s.objectType,
    FullResyncPeriod: s.resyncCheckPeriod,
    RetryOnError:     false,
    ShouldResync:     s.processor.shouldResync,
    // HandleDeltas, added to process, and done in processloop
    Process:           s.HandleDeltas,
    WatchErrorHandler: s.watchErrorHandler,
  }

  func() {
    ...
    s.controller = New(cfg)
    ...
  }
  
  s.controller.Run(stopCh)
}
### s.controller.Run(stopCh)
func (c *controller) Run(stopCh <-chan struct{}) {

  r := NewReflector(
    c.config.ListerWatcher,
    c.config.ObjectType,
    c.config.Queue,
    c.config.FullResyncPeriod,
  )
  c.reflector = r

  // Run reflector
  wg.StartWithChannel(stopCh, r.Run)  

  // Run processLoop, pop from deltafifo and do ProcessFunc,
  // ProcessFunc is the s.HandleDeltas before
  wait.Until(c.processLoop, time.Second, stopCh)
}

可以看到上面的邏輯首先生成一個 DeltaFifo,然后接下來的邏輯分為兩塊,生產(chǎn)和消費(fèi):

(1)生產(chǎn)—r.Run:

主要的邏輯就是利用 list and watch 將資源對象包括操作類型壓入隊(duì)列 DeltaFifo

#### r.Run:

func (r *Reflector) Run(stopCh <-chan struct{}) {
// 執(zhí)行l(wèi)istAndWatch
if err := r.ListAndWatch(stopCh);
}

// 執(zhí)行ListAndWatch流程
func (r *Reflector)ListAndWatch(stopCh <-chan struct{}) error{
  // 1、list:
  // (1)、list pods, 實(shí)際調(diào)用的是podInformer里的ListFunc方法,
  // client.CoreV1().Pods(namespace).List(context.TODO(), options)
  
  r.listerWatcher.List(opts)
  // (2)、獲取資源版本號,用于watch
  resourceVersion = listMetaInterface.GetResourceVersion()
  
  //  (3)、數(shù)據(jù)轉(zhuǎn)換,轉(zhuǎn)換成列表
  items, err := meta.ExtractList(list)
  
  // (4)、將資源列表中的資源對象和版本號存儲到DeltaFifo中
  r.syncWith(items, resourceVersion);
  
  // 2、watch,無限循環(huán)去watch apiserver,當(dāng)watch到事件的時候,執(zhí)行watchHandler將event事件壓入fifo
  for {
    // (1)、watch pods, 實(shí)際調(diào)用的是podInformer里的WatchFunc方法,
    // client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
    w, err := r.listerWatcher.Watch(options)
    
    // (2)、watchHandler
    // watchHandler watches pod,更新DeltaFifo信息,并且更新resourceVersion
    if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh);
  }
}

### r.watchHandler
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    ...
loop:
  for {
    select {
    case event, ok := <-w.ResultChan():
      newResourceVersion := meta.GetResourceVersion()
      switch event.Type {
      case watch.Added:
        err := r.store.Add(event.Object)    // Add event to srore, store的具體方法在fifo中
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
        }
      ...
      }
      *resourceVersion = newResourceVersion
      r.setLastSyncResourceVersion(newResourceVersion)
      eventCount++
    }
  }
  ...
}

### r.store.Add:
## 即為deltaFifo的add方法:

func (f *DeltaFIFO) Add(obj interface{}) error {
  ...
  return f.queueActionLocked(Added, obj)
  ...
}

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  id, err := f.KeyOf(obj)
  if err != nil {
    return KeyError{obj, err}
  }
  newDeltas := append(f.items[id], Delta{actionType, obj})
  newDeltas = dedupDeltas(newDeltas)
  if len(newDeltas) > 0 {
    if _, exists := f.items[id]; !exists {
      f.queue = append(f.queue, id)
    }

    f.items[id] = newDeltas
    f.cond.Broadcast()          // 通知所有阻塞住的消費(fèi)者
  }
  ...
  return nil
}
(2)消費(fèi)—c.processLoop:

消費(fèi)邏輯就是從 DeltaFifo pop 出對象,然后做兩件事情:(1)觸發(fā)前面注冊的 eventhandler (2)更新本地索引緩存 indexer,保持?jǐn)?shù)據(jù)和 etcd 一致

func (c *controller) processLoop() {
  for {
    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
  }
}

### Queue.Pop:
## Queue.Pop是一個帶有處理函數(shù)的pod方法,首先先看Pod邏輯,即為deltaFifo的pop方法:
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  for {                       // 無限循環(huán)
    for len(f.queue) == 0 {
      f.cond.Wait()       // 阻塞直到生產(chǎn)端broadcast方法通知
    }
    id := f.queue[0]
    item, ok := f.items[id]
    delete(f.items, id)
    err := process(item)        // 執(zhí)行處理方法
    if e, ok := err.(ErrRequeue); ok {
      f.addIfNotPresent(id, item)     // 如果處理失敗的重新加入到fifo中重新處理
      err = e.Err
    }
    return item, err
  }
}

### c.config.Process:
## c.config.Process是在初始化controller的時候賦值的,即為前面的s.HandleDeltas

### s.HandleDeltas:
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
  s.blockDeltas.Lock()
  defer s.blockDeltas.Unlock()
  // from oldest to newest
  for _, d := range obj.(Deltas) {
    switch d.Type {
    case Sync, Replaced, Added, Updated:
      s.cacheMutationDetector.AddObject(d.Object)
        if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
          if err := s.indexer.Update(d.Object); err != nil {
            return err
          }
          isSync := false
          switch {
          case d.Type == Sync:
            // Sync events are only propagated to listeners that requested resync
            isSync = true
          case d.Type == Replaced:
            if accessor, err := meta.Accessor(d.Object); err == nil {
                if oldAccessor, err := meta.Accessor(old); err == nil {
                  // Replaced events that didn't change resourceVersion are treated as resync events
                  // and only propagated to listeners that requested resync
                  isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                }
            }
          }
          s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
        } else {
          if err := s.indexer.Add(d.Object); err != nil {
            return err
          }
          s.processor.distribute(addNotification{newObj: d.Object}, false)
        }
    case Deleted:
      if err := s.indexer.Delete(d.Object); err != nil {
        return err
      }
      s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    }
  }
  return nil
}

可以看到上面主要執(zhí)行兩部分邏輯:

s.processor.distribute
#### s.processor.distribute:
### 例如新增通知:s.processor.distribute(addNotification{newObj: d.Object}, false)
### 其中addNotification就是add類型的通知,后面會通過notification結(jié)構(gòu)體的類型來執(zhí)行不同的eventHandler

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  p.listenersLock.RLock()
  defer p.listenersLock.RUnlock()
  
  if sync {
    for _, listener := range p.syncingListeners {
      listener.add(obj)
    }
  } else {
    for _, listener := range p.listeners {
      listener.add(obj)
    }
  }
}

func (p *processorListener) add(notification interface{}) {
  p.addCh <- notification     // 新增notification到addCh
}

這里 p.addCh 對應(yīng)到前面說的關(guān)注對象 p.addCh,processorListener 收到 addCh 信號之后傳遞給 nextCh,然后通過 notification 結(jié)構(gòu)體的類型來執(zhí)行不同的 eventHandler

s.indexer 的增刪改:

這個就是本地?cái)?shù)據(jù)的緩存和索引,自定義控制邏輯里面會通過 indexer 獲取操作對象的具體參數(shù),這里就不展開細(xì)講了。

4、總結(jié)

至此一個 informer 的 client-go 部分的流程就走完了,可以看到啟動 informer 主要流程就是:

1、Reflector ListAndWatch:

(1)通過一個 reflector run 起來一個帶有 list 和 watch api 的 client

(2)list 到的 pod 列表通過 DeltaFifo 存儲,并更新最新的 ResourceVersion

(3)繼續(xù)監(jiān)聽 pod,監(jiān)聽到的 pod 操作事件繼續(xù)存儲到 DeltaFifo 中

2、DeltaFifo 生產(chǎn)和消費(fèi):

(1)生產(chǎn):list and watch 到的事件生產(chǎn)壓入隊(duì)列 DeltaFifo

(2)消費(fèi):執(zhí)行注冊的 eventHandler,并更新本地 indexer

所以 informer 本質(zhì)其實(shí)就是一個通過 deltaFifo 建立生產(chǎn)消費(fèi)機(jī)制,并且?guī)в斜镜鼐彺婧退饕约翱梢宰曰卣{(diào)事件的 apiServer 的客戶端庫。

5、參考

責(zé)任編輯:武曉燕 來源: 政采云技術(shù)
相關(guān)推薦

2019-11-28 10:45:28

ZooKeeper源碼分布式

2022-08-16 07:57:30

架構(gòu)

2012-01-12 14:37:34

jQuery

2009-02-24 09:43:00

IP電話原理

2011-11-29 12:17:00

2013-04-07 14:09:55

Android應(yīng)用基本

2010-08-20 13:29:33

OFDM

2016-08-18 00:04:09

網(wǎng)絡(luò)爬蟲抓取系統(tǒng)服務(wù)器

2020-03-21 14:57:14

手機(jī)定位智能手機(jī)APP

2021-02-08 21:40:04

SockmapBPF存儲

2016-08-17 23:53:29

網(wǎng)絡(luò)爬蟲抓取系統(tǒng)

2010-09-08 15:25:15

Linux系統(tǒng)分區(qū)

2009-06-11 09:56:09

MySQL Repli原理

2011-07-07 14:10:21

Cocoa 內(nèi)省 hash

2020-12-29 16:55:44

ZooKeeper運(yùn)維數(shù)據(jù)結(jié)構(gòu)

2010-03-17 13:35:02

2010-03-18 20:13:03

Java socket

2011-07-07 14:46:10

Cocoa Xcode

2010-06-18 17:28:37

Linux Anacr

2019-04-30 08:15:31

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

亚洲熟妇av乱码在线观看| 中文字幕在线永久| 在线观看三级视频| 99精品国产99久久久久久白柏| 91大神在线播放精品| 精品少妇人妻一区二区黑料社区| 久久人体av| 亚洲不卡一区二区三区| 日韩欧美一区二区三区四区五区 | 禁断一区二区三区在线| 欧美丰满一区二区免费视频| 成人午夜免费在线| 四虎久久免费| 97久久超碰国产精品| 国产精品午夜国产小视频| 久久精品无码人妻| 欧美自拍偷拍| 亚洲精品成人久久| 亚洲精品第三页| 91精品论坛| 亚洲国产视频在线| 一本一道久久a久久精品综合| 日韩一级片免费观看| 久久99九九99精品| 日本精品久久中文字幕佐佐木| 岛国毛片在线观看| 日本不卡二三区| 日韩精品免费视频| 国产伦精品一区二区三区妓女下载| 国产精品久久久久av电视剧| 亚洲国产欧美在线| 超碰在线免费观看97| shkd中文字幕久久在线观看| 91女人视频在线观看| 亚洲精品免费av| 伊人精品在线视频| 日本视频中文字幕一区二区三区| 亚州欧美日韩中文视频| 激情综合五月网| 久久久久亚洲| 色阁综合伊人av| 亚洲最大的黄色网| 成人自拍在线| 日韩精品专区在线影院观看 | 成人黄色毛片| 色综合视频一区二区三区高清| 999在线观看视频| 欧美黑人猛交| 亚洲一区免费观看| www.日本三级| 国产又色又爽又黄刺激在线视频| 一区二区在线观看视频| 亚洲成年人专区| 国产一二区在线观看| 国产精品久久久久久久久快鸭| 日本在线视频不卡| 成a人片在线观看www视频| 久久精品亚洲国产奇米99| 欧美高清视频一区| 欧美精品a∨在线观看不卡| 91香蕉视频在线| 久久这里精品国产99丫e6| 深夜影院在线观看| 久久久综合九色合综国产精品| 免费观看国产成人| 国产中文在线观看| 国产精品美女久久久久久| 亚洲视频在线二区| www视频在线看| 亚洲图片自拍偷拍| 乱妇乱女熟妇熟女网站| 原纱央莉成人av片| 精品视频资源站| 亚洲AV无码久久精品国产一区| 天堂久久av| 亚洲精品国产综合区久久久久久久| 800av在线播放| 免费不卡中文字幕在线| 尤物99国产成人精品视频| 日本二区三区视频| 合欧美一区二区三区| 91成人天堂久久成人| 狠狠躁夜夜躁人人爽视频| 激情五月婷婷综合| 国产高清在线一区二区| 婷婷丁香花五月天| 91蝌蚪porny九色| 中文字幕久久综合| 国产激情视频在线看| 91成人免费在线视频| 伊人五月天婷婷| 日韩影片在线观看| 亚洲视频在线观看| 国产97免费视频| 免费永久网站黄欧美| 国产精品入口夜色视频大尺度| 国产成人精品免费看在线播放| 国产精品揄拍100视频| 99精品电影| 97香蕉久久超级碰碰高清版| 在线免费一区二区| 国产成人免费在线视频| 秋霞久久久久久一区二区| 直接在线观看的三级网址| 色哟哟在线观看一区二区三区| 一区二区久久精品| 伊人久久综合影院| 久久91精品国产91久久跳| 一级片在线观看免费| 国产精品 欧美精品| 日本一区二区在线| 91黄页在线观看| 在线综合+亚洲+欧美中文字幕| 亚洲最大免费视频| 欧美精品99| 国产日韩欧美视频| 美女毛片在线看| 亚洲午夜国产一区99re久久| 亚洲色图偷拍视频| 成人久久综合| 欧美诱惑福利视频| 成人久久精品人妻一区二区三区| 国产精品丝袜黑色高跟| www.com毛片| 老司机精品在线| 久久国产精品影视| 一本大道伊人av久久综合| 久久一区二区视频| 青青青免费在线| 最新精品在线| 欧美成人精品h版在线观看| 国模私拍一区二区| 国产午夜精品一区二区三区四区| 国产h视频在线播放| 一区三区自拍| 九九视频直播综合网| 国产超碰人人模人人爽人人添| 国产精品美女www爽爽爽| 欧美国产日韩在线播放| 亚洲成人一品| 庆余年2免费日韩剧观看大牛| 蜜桃在线一区二区| 亚洲福利一区二区三区| 稀缺呦国内精品呦| 最新日韩在线| 激情五月综合色婷婷一区二区| 爱福利在线视频| 精品国产91洋老外米糕| 久久这里只有精品国产| 成人小视频在线| 国产va亚洲va在线va| 高清一区二区三区| 性亚洲最疯狂xxxx高清| 青青国产在线| 欧美日韩一区二区在线播放| 中文在线一区二区三区| 久久激情网站| 少妇免费毛片久久久久久久久| 欧美日韩尤物久久| 深夜福利一区二区| 一二三区中文字幕| 一区二区三区中文字幕电影 | 中文字幕精品www乱入免费视频| 无码人妻熟妇av又粗又大| 国产日韩av一区二区| 日韩毛片在线免费看| 不卡av一区二区| 91久久久久久国产精品| 国产黄色小视频在线| 日韩天堂在线观看| 日本网站在线播放| 国产清纯美女被跳蛋高潮一区二区久久w | 欧美一区二区三区啪啪| 久久久久久久久久91| 91视频免费观看| 在线观看国产一级片| 中文字幕亚洲精品乱码| 国产精品一区二区三区在线观| 色在线中文字幕| 视频在线一区二区| www.天天干.com| 欧美性猛交xxxx乱大交蜜桃| 久久久久久成人网| 国产二区国产一区在线观看| 久久久999视频| 久久中文字幕av| 成人久久18免费网站漫画| sm久久捆绑调教精品一区| 亚洲色图校园春色| a级片在线免费看| 欧美天天综合色影久久精品| 任我爽在线视频| 99久久免费精品高清特色大片| 男人插女人下面免费视频| 中文字幕免费一区二区| 奇米视频888战线精品播放| 榴莲视频成人app| 秋霞成人午夜鲁丝一区二区三区| bt在线麻豆视频| 亚洲品质视频自拍网| 99热这里只有精品1| 一本到三区不卡视频| 黄色一级片中国| 国产嫩草影院久久久久| 精品少妇人妻av一区二区三区| 日本vs亚洲vs韩国一区三区二区| 国产成人艳妇aa视频在线 | 不卡大黄网站免费看| 国产视频手机在线播放| 亚洲免费观看| av动漫在线播放| 日韩av二区| 免费中文日韩| 国产精品网站在线看| 国产日韩在线看| 小早川怜子影音先锋在线观看| 国内精品久久久久久久影视简单| 国产伦精品免费视频| 岛国在线视频网站| 欧美人在线视频| 欧美r级在线| 在线视频中文亚洲| 午夜av免费在线观看| 欧美成人三级电影在线| 国产一区二区三区中文字幕| 欧洲精品一区二区| 亚洲影院在线播放| 黑人狂躁日本妞一区二区三区 | 黄色在线免费观看大全| 亚洲电影中文字幕| 国产黄色小视频在线观看| 欧美三级蜜桃2在线观看| 日本熟女毛茸茸| 精品人伦一区二区三区蜜桃网站| 免费一级片视频| 有坂深雪av一区二区精品| 日韩av毛片在线观看| 国产精品欧美精品| 国产综合精品久久久久成人av| 久久久久久久久久久黄色| 国产精品久久久免费观看| youjizz久久| 99久久人妻精品免费二区| 成人黄色国产精品网站大全在线免费观看| 中文字幕第22页| 加勒比av一区二区| 亚洲欧美手机在线| 国产综合色精品一区二区三区| 免费av不卡在线| 国内精品伊人久久久久av一坑 | 国产不卡精品在线| 91欧美精品午夜性色福利在线 | 男人添女人下部高潮视频在线观看 | 伊人久久婷婷| 黄色国产一级视频| 国产欧美日韩综合一区在线播放 | 在线成人性视频| 国产精品久久久乱弄| 国产精品你懂的| 久久久久久久久网| 亚洲欧美伊人| 日本xxxxxxxxxx75| 亚洲少妇自拍| 国产超碰在线播放| 精品亚洲成a人| 黄色片子免费看| 不卡视频免费播放| b站大片免费直播| 国产精品成人免费| 2021亚洲天堂| 激情av一区二区| 中文av免费观看| 日韩欧美一级片| 亚洲欧美日韩免费| 在线亚洲午夜片av大片| 国产欧美黑人| 97人人做人人爱| abab456成人免费网址| 92看片淫黄大片看国产片| 红杏视频成人| 亚洲欧美影院| 亚洲激精日韩激精欧美精品| 熟妇人妻无乱码中文字幕真矢织江 | 欧美激情在线一区二区三区| 激情无码人妻又粗又大| 一区二区三区在线观看视频| 天堂中文在线网| 欧美另类videos死尸| 人人妻人人玩人人澡人人爽| 一区二区三区回区在观看免费视频 | 久久夜色精品国产噜噜av| 成人欧美一区二区三区黑人一| 亚洲国产欧美日韩另类综合| 五月婷婷激情五月| 日韩女优毛片在线| 国产福利在线看| 久久久久久国产免费 | 亚洲欧美日韩精品一区二区 | 青青青在线视频播放| 首页欧美精品中文字幕| 丰满少妇一区二区三区专区| 久久蜜桃av一区精品变态类天堂| 成人免费毛片东京热| 色婷婷亚洲综合| 亚洲国产视频一区二区三区| 影音先锋日韩有码| 瑟瑟视频在线看| 97人人香蕉| 成人久久综合| 韩国日本在线视频| 丁香婷婷综合色啪| 精品一区二区在线观看视频| 色综合中文字幕| 亚洲av少妇一区二区在线观看 | 精精国产xxxx视频在线中文版| 国产精品久久久久久久久久久久久久 | 精品国产99国产精品| 浪潮av一区| 国产精品人成电影在线观看| 三级精品视频| 大伊香蕉精品视频在线| 国产米奇在线777精品观看| 国产jk精品白丝av在线观看| 欧美日韩国产丝袜美女| 精品人妻一区二区三区日产乱码 | 精彩视频一区二区三区| 美女被到爽高潮视频| 欧美日韩亚洲一区二| 人妻中文字幕一区| 97国产精品视频人人做人人爱| 欧美不卡在线观看| 天天做天天爱天天高潮| 久久精品国产秦先生| 亚洲日本精品视频| 一本大道久久a久久综合婷婷| 无码国产精品一区二区色情男同| 欧美激情综合色| 亚洲一区二区三区日本久久九| 秋霞在线一区二区| 六月婷婷色综合| 美国精品一区二区| 欧美三级视频在线观看 | 久久久久久久久久久人体| 国产精品久久久久久av公交车| 伊人情人网综合| 精品亚洲porn| 黄视频网站免费看| 欧美一区二区成人| 午夜伦理在线视频| 国产精品xxx在线观看www| 国产在线成人| 粉嫩av懂色av蜜臀av分享| 欧美午夜激情在线| 久青草国产在线| 国产精品久久久久久久久免费 | 欧美夫妻性生活xx| 国产调教精品| 日本一区二区黄色| 国产欧美一区二区精品仙草咪| 中文字幕日韩三级| 日韩在线视频观看| 久久久91麻豆精品国产一区| 老司机激情视频| 91一区二区三区在线播放| 伊人中文字幕在线观看 | 蜜桃视频日韩| 日产国产高清一区二区三区| 中国毛片直接看| 精品国产1区2区3区| 国模套图日韩精品一区二区| 日韩国产精品一区二区| 国产综合一区二区| 国产在线观看你懂的| 亚洲欧美国产视频| 亚洲伦理久久| 999一区二区三区| 久久精品亚洲一区二区三区浴池| 中文字幕 欧美激情| 欧美裸体男粗大视频在线观看 | 正在播放亚洲| 粉嫩蜜臀av国产精品网站| 91玉足脚交嫩脚丫在线播放| 日韩中文在线中文网在线观看 | 日韩av在线电影观看| 激情都市一区二区| 亚洲精品www久久久久久| 一区二区在线视频播放| 日韩免费一级| 无码人妻丰满熟妇区毛片| 亚洲美女偷拍久久| 免费一级在线观看播放网址| 亚洲一区二区三区视频| 国产精品久久久久久模特| 久久国产高清视频| 日韩大陆毛片av| 亚洲tv在线| 日本www在线播放| 亚洲靠逼com| 国产视频精选在线| 国产精品我不卡|