精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

聊一聊時間輪的實現

開發 后端
在netty 和kafka 這兩種優秀的中間件中,都有時間輪的實現。文章最后,我們模擬kafka 中scala 的代碼實現java版的時間輪。

[[414553]]

上一篇我們講了定時器的幾種實現,分析了在大數據量高并發的場景下這幾種實現方式就有點力不從心了,從而引出時間輪這種數據結構。在netty 和kafka 這兩種優秀的中間件中,都有時間輪的實現。文章最后,我們模擬kafka 中scala 的代碼實現java版的時間輪。

Netty 的時間輪實現

接口定義

Netty 的實現自定義了一個超時器的接口io.netty.util.Timer,其方法如下:

  1. public interface Timer 
  2.     //新增一個延時任務,入參為定時任務TimerTask,和對應的延遲時間 
  3.     Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); 
  4.     //停止時間輪的運行,并且返回所有未被觸發的延時任務 
  5.     Set < Timeout > stop(); 
  6. public interface Timeout 
  7.     Timer timer(); 
  8.     TimerTask task(); 
  9.     boolean isExpired(); 
  10.     boolean isCancelled(); 
  11.     boolean cancel(); 

Timeout接口是對延遲任務的一個封裝,其接口方法說明其實現內部需要維持該延遲任務的狀態。后續我們分析其實現內部代碼時可以更容易的看到。

Timer接口有唯一實現HashedWheelTimer。首先來看其構造方法,如下:

  1. public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) 
  2.     //省略代碼,省略參數非空檢查內容。 
  3.     wheel = createWheel(ticksPerWheel); 
  4.     mask = wheel.length - 1; 
  5.     //省略代碼,省略槽位時間范圍檢查,避免溢出以及小于 1 毫秒。 
  6.     workerThread = threadFactory.newThread(worker); 
  7.     //省略代碼,省略資源泄漏追蹤設置以及時間輪實例個數檢查 

mask 的設計和HashMap一樣,通過限制數組的大小為2的次方,利用位運算來替代取模運算,提高性能。

構建循環數組

首先是方法createWheel,用于創建時間輪的核心數據結構,循環數組。來看下其方法內容

  1. private static HashedWheelBucket[] createWheel(int ticksPerWheel) 
  2.     //省略代碼,確認 ticksPerWheel 處于正確的區間 
  3.     //將 ticksPerWheel 規范化為 2 的次方冪大小。 
  4.     ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); 
  5.     HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; 
  6.     for(int i = 0; i < wheel.length; i++) 
  7.     { 
  8.         wheel[i] = new HashedWheelBucket(); 
  9.     } 
  10.     return wheel; 

數組的長度為 2 的次方冪方便進行求商和取余計算。

HashedWheelBucket內部存儲著由HashedWheelTimeout節點構成的雙向鏈表,并且存儲著鏈表的頭節點和尾結點,方便于任務的提取和插入。

新增延遲任務

方法HashedWheelTimer#newTimeout用于新增延遲任務,下面來看下代碼:

  1. public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) 
  2.     //省略代碼,用于參數檢查 
  3.     start(); 
  4.     long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; 
  5.     if(delay > 0 && deadline < 0) 
  6.     { 
  7.         deadline = Long.MAX_VALUE; 
  8.     } 
  9.     HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); 
  10.     timeouts.add(timeout); 
  11.     return timeout; 

可以看到任務并沒有直接添加到時間輪中,而是先入了一個 mpsc 隊列,我簡單說下 mpsc【多生產者單一消費者隊列】 是 JCTools 中的并發隊列,用在多個生產者可同時訪問隊列,但只有一個消費者會訪問隊列的情況。,采用這個模式主要出于提升并發性能考慮,因為這個隊列只有線程workerThread會進行任務提取操作。

工作線程如何執行

  1. public void run() 
  2.     {//代碼塊① 
  3.         startTime = System.nanoTime(); 
  4.         if(startTime == 0) 
  5.         { 
  6.             //使用startTime==0 作為線程進入工作狀態模式標識,因此這里重新賦值為1 
  7.             startTime = 1; 
  8.         } 
  9.         //通知外部初始化工作線程的線程,工作線程已經啟動完畢 
  10.         startTimeInitialized.countDown(); 
  11.     } 
  12.     {//代碼塊② 
  13.         do { 
  14.             final long deadline = waitForNextTick(); 
  15.             if(deadline > 0) 
  16.             { 
  17.                 int idx = (int)(tick & mask); 
  18.                 processCancelledTasks(); 
  19.                 HashedWheelBucket bucket = wheel[idx]; 
  20.                 transferTimeoutsToBuckets(); 
  21.                 bucket.expireTimeouts(deadline); 
  22.                 tick++; 
  23.             } 
  24.         } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); 
  25.     } 
  26.     {//代碼塊③ 
  27.         for(HashedWheelBucket bucket: wheel) 
  28.         { 
  29.             bucket.clearTimeouts(unprocessedTimeouts); 
  30.         } 
  31.         for(;;) 
  32.         { 
  33.             HashedWheelTimeout timeout = timeouts.poll(); 
  34.             if(timeout == null
  35.             { 
  36.                 break; 
  37.             } 
  38.             if(!timeout.isCancelled()) 
  39.             { 
  40.                 unprocessedTimeouts.add(timeout); 
  41.             } 
  42.         } 
  43.         processCancelledTasks(); 
  44.     } 

看 waitForNextTick,是如何得到下一次執行時間的。

  1. private long waitForNextTick() 
  2.     long deadline = tickDuration * (tick + 1);//計算下一次需要檢查的時間 
  3.     for(;;) 
  4.     { 
  5.         final long currentTime = System.nanoTime() - startTime; 
  6.         long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; 
  7.         if(sleepTimeMs <= 0)//說明時間已經到了 
  8.         { 
  9.             if(currentTime == Long.MIN_VALUE) 
  10.             { 
  11.                 return -Long.MAX_VALUE; 
  12.             } 
  13.             else 
  14.             { 
  15.                 return currentTime; 
  16.             } 
  17.         } 
  18.         //windows 下有bug  sleep 必須是10 的倍數 
  19.         if(PlatformDependent.isWindows()) 
  20.         { 
  21.             sleepTimeMs = sleepTimeMs / 10 * 10; 
  22.         } 
  23.         try 
  24.         { 
  25.             Thread.sleep(sleepTimeMs);// 等待時間到來 
  26.         } 
  27.         catch(InterruptedException ignored) 
  28.         { 
  29.             if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) 
  30.             { 
  31.                 return Long.MIN_VALUE; 
  32.             } 
  33.         } 
  34.     } 

簡單的說就是通過 tickDuration 和此時已經滴答的次數算出下一次需要檢查的時間,時候未到就sleep等著。

任務如何入槽的。

  1. private void transferTimeoutsToBuckets() { 
  2.             //最多處理100000 怕任務延遲 
  3.             for(int i = 0; i < 100000; ++i) { 
  4.                 //從隊列里面拿出任務呢 
  5.                 HashedWheelTimer.HashedWheelTimeout timeout = (HashedWheelTimer.HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll(); 
  6.                 if (timeout == null) { 
  7.                     break; 
  8.                 } 
  9.  
  10.                 if (timeout.state() != 1) { 
  11.                     long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration; 
  12.                     //計算排在第幾輪 
  13.                     timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length; 
  14.                     long ticks = Math.max(calculated, this.tick); 
  15.                     //計算放在哪個槽中 
  16.                     int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask); 
  17.                     HashedWheelTimer.HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex]; 
  18.                     //入槽,就是鏈表入隊列 
  19.                     bucket.addTimeout(timeout); 
  20.                 } 
  21.             } 
  22.  
  23.         } 

如何執行的

  1. public void expireTimeouts(long deadline) { 
  2.             HashedWheelTimer.HashedWheelTimeout next
  3.             //拿到槽的鏈表頭部 
  4.             for(HashedWheelTimer.HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) { 
  5.                 boolean remove = false
  6.                 if (timeout.remainingRounds <= 0L) {//如果到這輪l  
  7.                     if (timeout.deadline > deadline) { 
  8.                         throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); 
  9.                     } 
  10.  
  11.                     timeout.expire();//執行 
  12.                     remove = true
  13.                 } else if (timeout.isCancelled()) { 
  14.                     remove = true
  15.                 } else { 
  16.                     --timeout.remainingRounds;//輪數-1 
  17.                 } 
  18.  
  19.                 next = timeout.next;//繼續下一任務 
  20.                 if (remove) { 
  21.                     this.remove(timeout);//移除完成的任務 
  22.                 } 
  23.             } 
  24.         } 

就是通過輪數和時間雙重判斷,執行完了移除任務。

小結一下

總體上看 Netty 的實現就是上文說的時間輪通過輪數的實現,完全一致。可以看出時間精度由 TickDuration 把控,并且工作線程的除了處理執行到時的任務還做了其他操作,因此任務不一定會被精準的執行。

而且任務的執行如果不是新起一個線程,或者將任務扔到線程池執行,那么耗時的任務會阻塞下個任務的執行。

并且會有很多無用的 tick 推進,例如 TickDuration 為1秒,此時就一個延遲350秒的任務,那就是有349次無用的操作。出現空推。

但是從另一面來看,如果任務都執行很快(當然你也可以異步執行),并且任務數很多,通過分批執行,并且增刪任務的時間復雜度都是O(1)來說。時間輪還是比通過優先隊列實現的延時任務來的合適些。

Kafka 中的時間輪

上面我們說到 Kafka 中的時間輪是多層次時間輪實現,總的而言實現和上述說的思路一致。不過細節有些不同,并且做了點優化。

先看看添加任務的方法。在添加的時候就設置任務執行的絕對時間。

Kafka 中的時間輪

上面我們說到 Kafka 中的時間輪是多層次時間輪實現,總的而言實現和上述說的思路一致。不過細節有些不同,并且做了點優化。

先看看添加任務的方法。在添加的時候就設置任務執行的絕對時間。

  1. def add(timerTaskEntry: TimerTaskEntry): Boolean = { 
  2.     val expiration = timerTaskEntry.expirationMs 
  3.  
  4.     if (timerTaskEntry.cancelled) { 
  5.       // Cancelled 
  6.       false 
  7.     } else if (expiration < currentTime + tickMs) { 
  8.       // 如果已經到期 返回false 
  9.       // Already expired 
  10.       false 
  11.     } else if (expiration < currentTime + interval) {//如果在本層范圍內 
  12.       // Put in its own bucket 
  13.       val virtualId = expiration / tickMs 
  14.       val bucket = buckets((virtualId % wheelSize.toLong).toInt)//計算槽位 
  15.       bucket.add(timerTaskEntry)//添加到槽內雙向鏈表中 
  16.  
  17.       // Set the bucket expiration time 
  18.       if (bucket.setExpiration(virtualId * tickMs)) {//更新槽時間 
  19.         // The bucket needs to be enqueued because it was an expired bucket 
  20.         // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced 
  21.         // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle 
  22.         // will pass in the same value and hence return false, thus the bucket with the same expiration will not 
  23.         // be enqueued multiple times. 
  24.         queue.offer(bucket)//將槽加入DelayQueue,由DelayQueue來推進執行 
  25.       } 
  26.       true 
  27.     } else { 
  28.       //如果超過本層能表示的延遲時間,則將任務添加到上層。這里看到上層是按需創建的。 
  29.       // Out of the interval. Put it into the parent timer 
  30.       if (overflowWheel == null) addOverflowWheel() 
  31.       overflowWheel.add(timerTaskEntry) 
  32.     } 
  33.   } 

那么時間輪是如何推動的呢?Netty 中是通過固定的時間間隔掃描,時候未到就等待來進行時間輪的推動。上面我們分析到這樣會有空推進的情況。

而 Kafka 就利用了空間換時間的思想,通過 DelayQueue,來保存每個槽,通過每個槽的過期時間排序。這樣擁有最早需要執行任務的槽會有優先獲取。如果時候未到,那么 delayQueue.poll 就會阻塞著,這樣就不會有空推進的情況發送。

我們來看下推進的方法。

  1. def advanceClock(timeoutMs: Long): Boolean = { 
  2. //從延遲隊列中獲取槽 
  3.     var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) 
  4.     if (bucket != null) { 
  5.       writeLock.lock() 
  6.       try { 
  7.         while (bucket != null) { 
  8.           // 更新每層時間輪的currentTime 
  9.           timingWheel.advanceClock(bucket.getExpiration()) 
  10.           //因為更新了currentTime,進行一波任務的重新插入,來實現任務時間輪的降級 
  11.           bucket.flush(reinsert) 
  12.           //獲取下一個槽 
  13.           bucket = delayQueue.poll() 
  14.         } 
  15.       } finally { 
  16.         writeLock.unlock() 
  17.       } 
  18.       true 
  19.     } else { 
  20.       false 
  21.     } 
  22.   } 
  23.    
  24.  // Try to advance the clock 
  25.   def advanceClock(timeMs: Long): Unit = { 
  26.     if (timeMs >= currentTime + tickMs) { 
  27.      // 必須是tickMs 整數倍 
  28.       currentTime = timeMs - (timeMs % tickMs) 
  29.       //推動上層時間輪也更新currentTime 
  30.       // Try to advance the clock of the overflow wheel if present 
  31.       if (overflowWheel != null) overflowWheel.advanceClock(currentTime) 
  32.     } 
  33.   } 

從上面的 add 方法我們知道每次對比都是根據expiration < currentTime + interval 來進行對比的,而advanceClock 就是用來推進更新 currentTime 的。

小結一下

Kafka 用了多層次時間輪來實現,并且是按需創建時間輪,采用任務的絕對時間來判斷延期,并且對于每個槽(槽內存放的也是任務的雙向鏈表)都會維護一個過期時間,利用 DelayQueue 來對每個槽的過期時間排序,來進行時間的推進,防止空推進的存在。

每次推進都會更新 currentTime 為當前時間戳,當然做了點微調使得 currentTime 是 tickMs 的整數倍。并且每次推進都會把能降級的任務重新插入降級。

可以看到這里的 DelayQueue 的元素是每個槽,而不是任務,因此數量就少很多了,這應該是權衡了對于槽操作的延時隊列的時間復雜度與空推進的影響。

模擬kafka的時間輪實現java版

定時器

  1. public class Timer { 
  2.  
  3.     /** 
  4.      * 底層時間輪 
  5.      */ 
  6.     private TimeWheel timeWheel; 
  7.  
  8.     /** 
  9.      * 一個Timer只有一個delayQueue 
  10.      */ 
  11.     private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>(); 
  12.  
  13.     /** 
  14.      * 過期任務執行線程 
  15.      */ 
  16.     private ExecutorService workerThreadPool; 
  17.  
  18.     /** 
  19.      * 輪詢delayQueue獲取過期任務線程 
  20.      */ 
  21.     private ExecutorService bossThreadPool; 
  22.  
  23.     /** 
  24.      * 構造函數 
  25.      */ 
  26.     public Timer() { 
  27.         timeWheel = new TimeWheel(1000, 2, System.currentTimeMillis(), delayQueue); 
  28.         workerThreadPool = Executors.newFixedThreadPool(100); 
  29.         bossThreadPool = Executors.newFixedThreadPool(1); 
  30.         //20ms獲取一次過期任務 
  31.         bossThreadPool.submit(() -> { 
  32.             while (true) { 
  33.                 this.advanceClock(1000); 
  34.             } 
  35.         }); 
  36.     } 
  37.  
  38.     /** 
  39.      * 添加任務 
  40.      */ 
  41.     public void addTask(TimerTask timerTask) { 
  42.         //添加失敗任務直接執行 
  43.         if (!timeWheel.addTask(timerTask)) { 
  44.             workerThreadPool.submit(timerTask.getTask()); 
  45.         } 
  46.     } 
  47.  
  48.     /** 
  49.      * 獲取過期任務 
  50.      */ 
  51.     private void advanceClock(long timeout) { 
  52.         try { 
  53.             TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS); 
  54.             if (timerTaskList != null) { 
  55.  
  56.                 //推進時間 
  57.                 timeWheel.advanceClock(timerTaskList.getExpiration()); 
  58.                 //執行過期任務(包含降級操作) 
  59.                 timerTaskList.flush(this::addTask); 
  60.             } 
  61.         } catch (Exception e) { 
  62.             e.printStackTrace(); 
  63.         } 
  64.     } 

任務

  1. public class TimerTask { 
  2.  
  3.     /** 
  4.      * 延遲時間 
  5.      */ 
  6.     private long delayMs; 
  7.  
  8.     /** 
  9.      * 任務 
  10.      */ 
  11.     private MyThread task; 
  12.  
  13.     /** 
  14.      * 時間槽 
  15.      */ 
  16.     protected TimerTaskList timerTaskList; 
  17.  
  18.     /** 
  19.      * 下一個節點 
  20.      */ 
  21.     protected TimerTask next
  22.  
  23.     /** 
  24.      * 上一個節點 
  25.      */ 
  26.     protected TimerTask pre; 
  27.  
  28.     /** 
  29.      * 描述 
  30.      */ 
  31.     public String desc
  32.  
  33.     public TimerTask(long delayMs, MyThread task) { 
  34.         this.delayMs = System.currentTimeMillis() + delayMs; 
  35.         this.task = task; 
  36.         this.timerTaskList = null
  37.         this.next = null
  38.         this.pre = null
  39.     } 
  40.  
  41.     public MyThread getTask() { 
  42.         return task; 
  43.     } 
  44.  
  45.     public long getDelayMs() { 
  46.         return delayMs; 
  47.     } 
  48.  
  49.     @Override 
  50.     public String toString() { 
  51.         return desc
  52.     } 

時間槽

  1. public class TimerTaskList implements Delayed { 
  2.  
  3.     /** 
  4.      * 過期時間 
  5.      */ 
  6.     private AtomicLong expiration = new AtomicLong(-1L); 
  7.  
  8.     /** 
  9.      * 根節點 
  10.      */ 
  11.     private TimerTask root = new TimerTask(-1L, null); 
  12.  
  13.     { 
  14.         root.pre = root; 
  15.         root.next = root; 
  16.     } 
  17.  
  18.     /** 
  19.      * 設置過期時間 
  20.      */ 
  21.     public boolean setExpiration(long expire) { 
  22.         return expiration.getAndSet(expire) != expire; 
  23.     } 
  24.  
  25.     /** 
  26.      * 獲取過期時間 
  27.      */ 
  28.     public long getExpiration() { 
  29.         return expiration.get(); 
  30.     } 
  31.  
  32.     /** 
  33.      * 新增任務 
  34.      */ 
  35.     public void addTask(TimerTask timerTask) { 
  36.         synchronized (this) { 
  37.             if (timerTask.timerTaskList == null) { 
  38.                 timerTask.timerTaskList = this; 
  39.                 TimerTask tail = root.pre; 
  40.                 timerTask.next = root; 
  41.                 timerTask.pre = tail; 
  42.                 tail.next = timerTask; 
  43.                 root.pre = timerTask; 
  44.             } 
  45.         } 
  46.     } 
  47.  
  48.     /** 
  49.      * 移除任務 
  50.      */ 
  51.     public void removeTask(TimerTask timerTask) { 
  52.         synchronized (this) { 
  53.             if (timerTask.timerTaskList.equals(this)) { 
  54.                 timerTask.next.pre = timerTask.pre; 
  55.                 timerTask.pre.next = timerTask.next
  56.                 timerTask.timerTaskList = null
  57.                 timerTask.next = null
  58.                 timerTask.pre = null
  59.             } 
  60.         } 
  61.     } 
  62.  
  63.     /** 
  64.      * 重新分配 
  65.      */ 
  66.     public synchronized void flush(Consumer<TimerTask> flush) { 
  67.         TimerTask timerTask = root.next
  68.         while (!timerTask.equals(root)) { 
  69.             this.removeTask(timerTask); 
  70.             flush.accept(timerTask); 
  71.             timerTask = root.next
  72.         } 
  73.         expiration.set(-1L); 
  74.     } 
  75.  
  76.     @Override 
  77.     public long getDelay(TimeUnit unit) { 
  78.         return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); 
  79.     } 
  80.  
  81.     @Override 
  82.     public int compareTo(Delayed o) { 
  83.         if (o instanceof TimerTaskList) { 
  84.             return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get()); 
  85.         } 
  86.         return 0; 
  87.     } 

時間輪

  1. public class TimeWheel { 
  2.  
  3.     /** 
  4.      * 一個時間槽的范圍 
  5.      */ 
  6.     private long tickMs; 
  7.  
  8.     /** 
  9.      * 時間輪大小 
  10.      */ 
  11.     private int wheelSize; 
  12.  
  13.     /** 
  14.      * 時間跨度 
  15.      */ 
  16.     private long interval; 
  17.  
  18.     /** 
  19.      * 時間槽 
  20.      */ 
  21.     private TimerTaskList[] timerTaskLists; 
  22.  
  23.     /** 
  24.      * 當前時間 
  25.      */ 
  26.     private long currentTime; 
  27.  
  28.     /** 
  29.      * 上層時間輪 
  30.      */ 
  31.     private volatile TimeWheel overflowWheel; 
  32.  
  33.     /** 
  34.      * 一個Timer只有一個delayQueue 
  35.      */ 
  36.     private DelayQueue<TimerTaskList> delayQueue; 
  37.  
  38.     public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) { 
  39.         this.currentTime = currentTime; 
  40.         this.tickMs = tickMs; 
  41.         this.wheelSize = wheelSize; 
  42.         this.interval = tickMs * wheelSize; 
  43.         this.timerTaskLists = new TimerTaskList[wheelSize]; 
  44.         //currentTime為tickMs的整數倍 這里做取整操作 
  45.         this.currentTime = currentTime - (currentTime % tickMs); 
  46.         this.delayQueue = delayQueue; 
  47.         for (int i = 0; i < wheelSize; i++) { 
  48.             timerTaskLists[i] = new TimerTaskList(); 
  49.         } 
  50.     } 
  51.  
  52.     /** 
  53.      * 創建或者獲取上層時間輪 
  54.      */ 
  55.     private TimeWheel getOverflowWheel() { 
  56.         if (overflowWheel == null) { 
  57.             synchronized (this) { 
  58.                 if (overflowWheel == null) { 
  59.                     overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue); 
  60.                 } 
  61.             } 
  62.         } 
  63.         return overflowWheel; 
  64.     } 
  65.  
  66.     /** 
  67.      * 添加任務到時間輪 
  68.      */ 
  69.     public boolean addTask(TimerTask timerTask) { 
  70.         long expiration = timerTask.getDelayMs(); 
  71.         //過期任務直接執行 
  72.         if (expiration < currentTime + tickMs) { 
  73.             return false
  74.         } else if (expiration < currentTime + interval) { 
  75.             //當前時間輪可以容納該任務 加入時間槽 
  76.             Long virtualId = expiration / tickMs; 
  77.             int index = (int) (virtualId % wheelSize); 
  78.             System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration); 
  79.             TimerTaskList timerTaskList = timerTaskLists[index]; 
  80.             timerTaskList.addTask(timerTask); 
  81.             if (timerTaskList.setExpiration(virtualId * tickMs)) { 
  82.                 //添加到delayQueue中 
  83.                 delayQueue.offer(timerTaskList); 
  84.             } 
  85.         } else { 
  86.             //放到上一層的時間輪 
  87.             TimeWheel timeWheel = getOverflowWheel(); 
  88.             timeWheel.addTask(timerTask); 
  89.         } 
  90.         return true
  91.     } 
  92.  
  93.     /** 
  94.      * 推進時間 
  95.      */ 
  96.     public void advanceClock(long timestamp) { 
  97.         if (timestamp >= currentTime + tickMs) { 
  98.             currentTime = timestamp - (timestamp % tickMs); 
  99.             if (overflowWheel != null) { 
  100.                 //推進上層時間輪時間 
  101.                 System.out.println("推進上層時間輪時間 time="+System.currentTimeMillis()); 
  102.                 this.getOverflowWheel().advanceClock(timestamp); 
  103.             } 
  104.         } 
  105.     } 

我們來模擬一個請求,超時和不超時的情況

首先定義一個Mythread 類,用于設置任務超時的值。

  1. public class MyThread implements Runnable{ 
  2.     CompletableFuture<String> cf; 
  3.     public MyThread(CompletableFuture<String>  cf){ 
  4.         this.cf = cf; 
  5.     } 
  6.     public void run(){ 
  7.         if (!cf.isDone()) { 
  8.             cf.complete("超時"); 
  9.         } 
  10.     } 

模擬超時

  1. public static void main(String[] args) throws Exception{ 
  2.         Timer timer = new Timer(); 
  3.         CompletableFuture<String> base =CompletableFuture.supplyAsync(()->{ 
  4.             try { 
  5.                 Thread.sleep(3000); 
  6.             } catch (InterruptedException e) { 
  7.                 e.printStackTrace(); 
  8.             } 
  9.             return  "正常返回"
  10.         }); 
  11.         TimerTask timerTask2 = new TimerTask(1000, new MyThread(base)); 
  12.         timer.addTask(timerTask2); 
  13.         System.out.println("base.get==="+base.get()); 
  14.     } 

模擬正常返回

  1. public static void main(String[] args) throws Exception{ 
  2.         Timer timer = new Timer(); 
  3.         CompletableFuture<String> base =CompletableFuture.supplyAsync(()->{ 
  4.             try { 
  5.                 Thread.sleep(300); 
  6.             } catch (InterruptedException e) { 
  7.                 e.printStackTrace(); 
  8.             } 
  9.             return  "正常返回"
  10.         }); 
  11.         TimerTask timerTask2 = new TimerTask(2000, new MyThread(base)); 
  12.         timer.addTask(timerTask2); 
  13.         System.out.println("base.get==="+base.get()); 
  14.     } 

本文轉載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關注。轉載本文請聯系小汪哥寫代碼公眾號。

 

責任編輯:武曉燕 來源: 小汪哥寫代碼
相關推薦

2023-09-27 09:04:50

2023-07-06 13:56:14

微軟Skype

2022-04-13 18:01:39

CSS組件技巧

2020-09-08 06:54:29

Java Gradle語言

2021-01-28 22:31:33

分組密碼算法

2020-05-22 08:16:07

PONGPONXG-PON

2023-09-22 17:36:37

2018-06-07 13:17:12

契約測試單元測試API測試

2025-02-18 00:00:05

vue后端權限

2023-09-27 16:39:38

2024-10-28 21:02:36

消息框應用程序

2021-12-06 09:43:01

鏈表節點函數

2021-03-01 18:37:15

MySQL存儲數據

2021-07-16 11:48:26

模型 .NET微軟

2023-09-20 23:01:03

Twitter算法

2024-09-12 10:06:21

2024-03-11 07:46:40

React優先級隊列二叉堆

2022-07-06 14:16:19

Python數據函數

2021-02-06 08:34:49

函數memoize文檔

2021-01-29 08:32:21

數據結構數組
點贊
收藏

51CTO技術棧公眾號

精品日韩99亚洲| 日韩美女啊v在线免费观看| 欧美又大粗又爽又黄大片视频| 亚洲永久无码7777kkk| 超免费在线视频| 国产亚洲精品超碰| 91免费在线观看网站| 你懂的国产视频| 午夜欧美在线| 亚洲精品在线视频| wwwxxxx在线观看| 日韩av福利| 亚洲最新在线观看| 天堂一区二区三区| 日本韩国在线观看| 日韩中文字幕亚洲一区二区va在线| 久久香蕉频线观| 久久亚洲AV成人无码国产野外| 曰本一区二区| 毛片av一区二区三区| 久久国产精品免费视频| 李宗瑞91在线正在播放| 日韩成人18| 在线观看亚洲精品视频| 国产一二三在线视频| 在线视频婷婷| 久久久三级国产网站| 99久久久精品免费观看国产 | 欧洲精品久久久| 最新一区二区三区| 欧美日韩第一| 精品一区精品二区| 美女流白浆视频| 欧洲精品久久久久毛片完整版| 欧美日韩久久久久| 欧洲金发美女大战黑人| 嫩草香蕉在线91一二三区| 国产偷国产偷精品高清尤物| 精品一区二区三区日本| 丰满人妻av一区二区三区| 国产精品一区二区久久精品爱涩| 国产精品入口福利| 亚洲精品国产精品国自产网站按摩| 一区二区动漫| 91av国产在线| 欧美激情亚洲综合| 亚洲精品激情| 久久久噜噜噜久噜久久| 国产在线精品观看| 伊人精品视频| 欧美精品九九久久| 国产无套在线观看| 99亚洲视频| 欧美性视频在线| 天天综合网入口| 欧美亚洲三级| 91精品国产91久久久久久| 国产性生活网站| 国产精品vip| 欧美激情网友自拍| 国产主播在线播放| 国产午夜精品一区二区三区欧美 | 欧美少妇一级片| 男人天堂手机在线| 亚洲免费观看高清完整版在线观看| 在线观看国产一区| 亚洲小说区图片| 一区二区三区**美女毛片| 超碰超碰超碰超碰超碰| 色老头在线观看| 亚洲网友自拍偷拍| 日本一道本久久| 欧美日韩激情电影| 在线成人高清不卡| 最新国产精品自拍| 亚洲成a人片77777在线播放 | 精品一区二区三区国产| 久久久久久久影视| 中文字幕一区二区三区在线不卡| 潘金莲一级淫片aaaaa免费看| 性欧美高清come| 欧美日韩国产综合视频在线观看中文| 国产裸体舞一区二区三区| 日韩av一级| 欧美一级高清大全免费观看| 久久久午夜精品福利内容| 精品视频97| 成年人精品视频| 全部毛片永久免费看| 蜜臀国产一区二区三区在线播放| **亚洲第一综合导航网站| 日本免费网站在线观看| 亚洲国产精品黑人久久久| 9色视频在线观看| 免费日韩电影| 欧美一级二级三级蜜桃| 日韩在线免费观看av| 亚洲澳门在线| 欧洲亚洲妇女av| 国内精品国产成人国产三级| 26uuu国产日韩综合| 中文字幕日韩一区二区三区不卡| 欧美gv在线观看| 欧美日本一区二区在线观看| 91成人在线观看喷潮蘑菇| 久久av导航| 久久久久久美女| 在线免费观看一区二区| 99精品视频在线观看免费| 亚洲视频在线观看日本a| 国产直播在线| 日韩一级精品视频在线观看| 丁香激情五月少妇| 亚洲经典在线| 91日本视频在线| yourporn在线观看中文站| 亚洲www啪成人一区二区麻豆| 午夜国产福利在线观看| 嫩草影视亚洲| 国内久久久精品| jizz国产视频| 国产精品对白交换视频| 91av俱乐部| 欧美日韩看看2015永久免费 | 国产精品一区免费视频| 日韩精品无码一区二区三区| 蜜桃视频在线观看播放| 日韩精品一区二区三区三区免费| 亚洲熟女少妇一区二区| 日日夜夜免费精品| 欧美日本韩国国产| segui88久久综合9999| 日韩欧美高清在线| 91嫩草|国产丨精品入口| 男人的j进女人的j一区| 日本午夜一区二区三区| 亚洲天堂av在线| 亚洲加勒比久久88色综合| 九九热精品在线观看| 韩国成人在线视频| 中文字幕不卡每日更新1区2区| 日韩和的一区二在线| 亚洲性xxxx| 日本中文字幕久久| 久久欧美中文字幕| 日本网站免费在线观看| 自拍欧美一区| 日韩美女视频在线观看| 国产毛片av在线| 欧美三级三级三级爽爽爽| 毛片aaaaaa| 九一久久久久久| 国产对白在线播放| 亚洲午夜精品| 欧美激情影音先锋| 日本免费不卡视频| 欧美性xxxxxxxxx| 久久久久久久久久久久| 丝袜国产日韩另类美女| 日韩电影天堂视频一区二区| 成人在线免费av| 久久精品国产亚洲精品2020| a级片免费视频| 一区二区三区av电影| 免费无码一区二区三区| 母乳一区在线观看| 亚洲人成77777| 精品国产不卡一区二区| 欧美黑人xxx| 污视频网站免费观看| 午夜激情久久久| 91激情视频在线观看| 久久丁香综合五月国产三级网站 | 午夜小视频福利在线观看| 日韩视频一区二区三区在线播放| 久久成人国产精品入口| 久久一区二区三区四区| 中文字幕第88页| 欧美精品三区| 久久一区二区三区av| 精品123区| 欧美日韩电影在线观看| 日本福利片高清在线观看| 欧美三级午夜理伦三级中视频| 一区二区成人免费视频| 91久色porny| www.成人黄色| 亚洲欧美日韩精品一区二区| 亚洲精品日韩精品| 久久男人av| 国产精品视频一区二区三区四| 中文字幕免费高清电视剧网站在线观看 | 久久久午夜视频| av在线资源网| 精品久久久久久久人人人人传媒| 无码无套少妇毛多18pxxxx| 亚洲乱码一区二区三区在线观看| 真人bbbbbbbbb毛片| 美女国产一区二区| 精品久久一二三| 中文字幕日韩一区二区不卡| 久久九九视频| 日本精品一区二区三区在线观看视频| 2020欧美日韩在线视频| 国产原创精品视频| 国产一区二区三区18| 内射无码专区久久亚洲| 欧美日韩一区二区三区视频| 一级免费在线观看| 亚洲欧美区自拍先锋| 男人的天堂av网| 久久女同互慰一区二区三区| 在线观看视频你懂得| 男女激情视频一区| 97国产精东麻豆人妻电影| 午夜国产精品视频免费体验区| 日韩伦理一区二区三区av在线| gogo久久日韩裸体艺术| 91九色视频导航| 日韩精品第一| 日韩免费精品视频| www.超碰在线| 欧美激情免费在线| 毛片激情在线观看| 一本色道久久88精品综合| 午夜影院免费体验区| 精品久久久久久亚洲综合网 | 国产小视频在线免费观看 | 日韩欧美在线观看免费| 亚洲国产成人av网| 国产免费无码一区二区视频 | 国产在线综合网| 亚洲精品视频在线| 极品美妇后花庭翘臀娇吟小说| 国产欧美日韩卡一| 日本二区在线观看| 久久久久九九视频| 精品国产无码在线观看| av电影天堂一区二区在线| 大尺度在线观看| 成人午夜视频网站| 少妇被狂c下部羞羞漫画| 丁香激情综合国产| 国产白袜脚足j棉袜在线观看| 国产乱码精品1区2区3区| aaa一级黄色片| 国产麻豆日韩欧美久久| 一级日本黄色片| 国产成人av影院| 少妇精品无码一区二区| 成人av电影免费在线播放| 少妇一级淫片免费放播放| zzijzzij亚洲日本少妇熟睡| 国产精品久久久久久久无码| 成人动漫一区二区三区| 手机在线看片日韩| 久久久亚洲高清| 99精品欧美一区二区| 国产精品视频线看| 看黄色录像一级片| 一区二区三区成人| 日产电影一区二区三区| 色综合视频在线观看| 国产精品尤物视频| 欧美乱妇15p| 国产成人手机在线| 亚洲欧美中文日韩在线| yiren22综合网成人| 久久精品国产91精品亚洲| 另类视频在线| 日本a级片电影一区二区| 国产成人a视频高清在线观看| 成人av番号网| 久久精品国产亚洲blacked| 欧美在线日韩精品| 亚洲精彩视频| 欧美二区在线视频| 另类小说欧美激情| 人妻av一区二区| 欧美激情一区二区| 欧美精品一区二区蜜桃| 一本色道a无线码一区v| 中文字幕男人天堂| 精品国产乱码久久久久久影片| 欧美亚洲日本| 欧美大码xxxx| 国产综合色区在线观看| 亚洲最大的网站| 欧美男gay| 国产免费裸体视频| 免费看精品久久片| av av在线| 国产精品久久精品日日| 国产一级片免费看| 欧美日韩精品综合在线| 欧美自拍偷拍一区二区| 色婷婷久久av| 中文字幕在线直播| 成人9ⅰ免费影视网站| 成人激情在线| 久久综合九色综合88i| 国产乱国产乱300精品| 亚洲欧美va天堂人熟伦| 亚洲成人av一区二区| 国产视频在线观看免费| 亚洲欧洲日本专区| 国产三线在线| 91九色精品视频| av资源久久| 男人操女人免费| 成人免费视频视频| 手机在线免费看毛片| 91黄色免费观看| 色播色播色播色播色播在线| 欧美成人精品三级在线观看 | 国产日韩一区| 可以看的av网址| 国产精品国产精品国产专区不片| 日韩在线视频免费播放| 日韩你懂的在线观看| 老司机精品影院| 国产精品亚洲一区二区三区| 亚洲精品**不卡在线播he| 18黄暴禁片在线观看| 国产一区三区三区| 91视频最新网址| 欧美性xxxxxx少妇| 国产人成在线视频| 日韩av电影手机在线| 亚州精品视频| 一女被多男玩喷潮视频| 99精品国产热久久91蜜凸| 69精品久久久| 亚洲国产一区二区三区四区| 999福利在线视频| 国产精品国产三级国产专区53 | 欧美日韩一区二区区| 亚洲色图清纯唯美| 国产强被迫伦姧在线观看无码| 色偷偷偷亚洲综合网另类| 国产黄色精品| 中文字幕一区二区三区四区五区六区 | 亚洲欧美二区三区| 一级特黄aaa大片| 伊人成人开心激情综合网| 免费在线观看一区| 亚洲精品人成| 国产一区二区精品久久| 一级黄色录像视频| 精品88久久久久88久久久| 国产污视频在线播放| 免费亚洲精品视频| 久久久精品五月天| 国产午夜精品久久久久久久久| 欧美亚洲国产bt| 久操视频在线播放| av资源站久久亚洲| 日韩视频二区| 波多野吉衣中文字幕| 色偷偷88欧美精品久久久| av在线电影免费观看| 国产一区二区在线免费| 91精品国产成人观看| 无码国产精品一区二区免费式直播 | 男人添女人下部视频免费| 东方欧美亚洲色图在线| 天天爽夜夜爽夜夜爽精品| 在线观看欧美日韩国产| 国产精品国产亚洲精品| 日本阿v视频在线观看| 久久看人人爽人人| 一本久道久久综合无码中文| 久久中文精品视频| 先锋影音国产精品| www.久久91| 亚洲成a人在线观看| 免费激情视频在线观看| 欧美成人日韩| 中文字幕av观看| 欧美日本国产一区| 超碰在线网站| 少妇精品久久久久久久久久| 国产一区二区女| 日本一区二区三区免费视频| 亚洲一二在线观看| 国产一区二区三区国产精品| 成熟丰满熟妇高潮xxxxx视频| 国产欧美精品一区二区色综合朱莉| 91theporn国产在线观看| 久久久久久伊人| av在线不卡免费观看| av av在线| 欧美日韩美少妇| 精品丝袜在线| 久久av秘一区二区三区| 99re热视频这里只精品| 亚洲天堂网在线视频| 91高清视频在线免费观看| 羞羞答答成人影院www| 日韩人妻一区二区三区|