精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Kotlin筆記之協程工作原理

開發 前端
這一章會以下面的代碼為例解析一下協程啟動,掛起以及恢復的流程。

協程的狀態機

這一章會以下面的代碼為例解析一下協程啟動,掛起以及恢復的流程:

  1. private suspend fun getId(): String { 
  2.     return GlobalScope.async(Dispatchers.IO) { 
  3.         delay(1000) 
  4.         "hearing" 
  5.     }.await() 
  6.  
  7. private suspend fun getAvatar(id: String): String { 
  8.     return GlobalScope.async(Dispatchers.IO) { 
  9.         delay(1000) 
  10.         "avatar-$id" 
  11.     }.await() 
  12.  
  13. fun main() { 
  14.     GlobalScope.launch { 
  15.         val id = getId() 
  16.         val avatar = getAvatar(id) 
  17.         println("${Thread.currentThread().name} - $id - $avatar"
  18.     } 

上面 main 方法中,GlobalScope.launch 啟動的協程體在執行到 getId 后,協程體會掛起,直到 getId 返回可用結果,才會 resume launch 協程,執行到 getAvatar 也是同樣的過程。協程內部實現使用狀態機來處理不同的掛起點,將 GlobalScope.launch 協程體字節碼反編譯成 Java 代碼,大致如下(有所刪減):

 

  1. BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null
  2.     (CoroutineStart)null, (Function2)(new Function2((Continuation)null) { 
  3.     int label; 
  4.  
  5.     public final Object invokeSuspend( 
  6.  
  7.  Object $result) { 
  8.         Object var10000; 
  9.         String id; 
  10.         label17: { 
  11.             CoroutineScope $this$launch; 
  12.             switch(this.label) { 
  13.             case 0: // a 
  14.                 ResultKt.throwOnFailure($result); 
  15.                 $this$launch = this.p$; 
  16.                 this.label = 1; // label置為1 
  17.                 var10000 = getId(this); 
  18.                 if (var10000 == COROUTINE_SUSPENDED) { 
  19.                     return COROUTINE_SUSPENDED; 
  20.                 } 
  21.                 // 若此時已經有結果,則不掛起,直接break 
  22.                 break; 
  23.             case 1: // b 
  24.                 ResultKt.throwOnFailure($result); 
  25.                 var10000 = $result; 
  26.                 break; 
  27.             case 2: // d 
  28.                 id = (String)this.L$1; 
  29.                 ResultKt.throwOnFailure($result); 
  30.                 var10000 = $result; 
  31.                 break label17; // 退出label17 
  32.             default
  33.                 throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); 
  34.             } 
  35.             // c 
  36.             id = (String)var10000; 
  37.             this.L$1 = id; // 將id賦給L$1 
  38.             this.label = 2; // label置為2 
  39.             var10000 = getAvatar(id, this); 
  40.             if (var10000 == COROUTINE_SUSPENDED) { 
  41.                 return COROUTINE_SUSPENDED; 
  42.             } 
  43.         } 
  44.         // e 
  45.         String avatar = (String)var10000; 
  46.         String var5 = var9.append(var10001.getName()).append(" - ").append(id).append(" - ").append(avatar).toString(); 
  47.         System.out.println(var5); 
  48.         return Unit.INSTANCE; 
  49.     } 
  50.  
  51.      
  52.  
  53.  
  54.     public final Continuation create
  55.  
  56.  Object value,  
  57.  
  58.  Continuation completion) { 
  59.         Intrinsics.checkParameterIsNotNull(completion, "completion"); 
  60.         Function2 var3 = new <anonymous constructor>(completion); 
  61.         var3.p$ = (CoroutineScope)value; 
  62.         return var3; 
  63.     } 
  64.  
  65.     public final Object invoke(Object var1, Object var2) { 
  66.         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); 
  67.     } 

這里我們根據上面的注釋以及字母標簽來看一下執行流程(invokeSuspend 方法會在協程體中的 suspend 函數得到結果后被調用,具體是在哪里被調用的稍后會講到):

  • a: launch 協程體剛執行到 getId 方法時,getId 方法的返回值將是 COROUTINE_SUSPENDED, 此時直接 return, 則 launch 協程體中 getId 后面的代碼暫時不會執行,即 launch 協程體被掛起(非阻塞, 該線程依舊會做其它工作)。這里將 label 置為了 1. 而若此時 getId 已經有結果(內部沒有調用 delay 之類的 suspend 函數等),則不掛起,而是直接 break。
  • b: 若上面 a 中 getId 返回 COROUTINE_SUSPENDED, 則當 getId 有可用結果返回后,會重新執行 launch 協程體的 invokeSuspend 方法,根據上面的 label==1, 會執行到這里檢查一下 result 沒問題的話就 break, 此時 id 賦值給了 var10000。
  • c: 在 a 中若直接 break 或 在 b 中得到 getId 的結果然后 break 后,都會執行到這里,得到 id 的值并把 label 置為2。然后調用 getAvatar 方法,跟 getId 類似,若其返回 COROUTINE_SUSPENDED 則 return,協程被掛起,等到下次 invokeSuspend 被執行,否則離開 label17 接著執行后續邏輯。
  • d: 若上面 c 中 getAvatar 返回 COROUTINE_SUSPENDED, 則當 getAvatar 有可用結果返回后會重新調用 launch 協程體的 invokeSuspend 方法,此時根據 label==2 來到這里并取得之前的 id 值,檢驗 result(即avatar),然后break label17。
  • e: c 中直接返回了可用結果 或 d 中 break label17 后,launch 協程體中的 suspend 函數都執行完畢了,這里會執行剩下的邏輯。

suspend 函數不會阻塞線程,且 suspend 函數不一定會掛起協程,如果相關調用的結果已經可用,則繼續運行而不掛起,例如 async{} 返回值 Deferred 的結果已經可用時,await()掛起函數可以直接返回結果,不用再掛起協程。

這一節看了一下 launch 協程體反編譯成 Java 后的代碼邏輯,關于 invokeSuspend 是何時怎么被調用的,將會在下面講到。

協程的創建與啟動

這一節以 CoroutineScope.launch {} 默認參數為例,從源碼角度看看 Kotlin 協程是怎樣創建與啟動的:

  1. public fun CoroutineScope.launch( 
  2.     context: CoroutineContext = EmptyCoroutineContext, 
  3.     start: CoroutineStart = CoroutineStart.DEFAULT
  4.     block: suspend CoroutineScope.() -> Unit 
  5. ): Job { 
  6.     val newContext = newCoroutineContext(context) 
  7.     val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true
  8.     coroutine.start(start, coroutine, block) 
  9.     return coroutine 
  10.  
  11. // AbstractCoroutine.kt 
  12. // receiver: StandaloneCoroutine 
  13. // block: suspend StandaloneCoroutine.() -> Unit 
  14. // private open class StandaloneCoroutine(...) : AbstractCoroutine<Unit>(...) {} 
  15. // public abstract class AbstractCoroutine<in T>(...) : JobSupport(active), Job, Continuation<T>, CoroutineScope {} 
  16. public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { 
  17.     // 調用 CoroutineStart 中的 invoke 方法 
  18.     start(block, receiver, this) 
  19.  
  20. public enum class CoroutineStart { 
  21.     // block - StandaloneCoroutine.() -> Unit 
  22.     // receiver - StandaloneCoroutine 
  23.     // completion - StandaloneCoroutine<Unit> 
  24.     public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit = 
  25.         when (this) { 
  26.             // 根據 start 參數的類型調用不同的方法 
  27.             DEFAULT -> block.startCoroutineCancellable(receiver, completion) 
  28.             ATOMIC -> block.startCoroutine(receiver, completion) 
  29.             UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) 
  30.             LAZY -> Unit // will start lazily 
  31.         } 

接下來看看 startCoroutineCancellable 方法:

  1. // receiver - StandaloneCoroutine 
  2. // completion - StandaloneCoroutine<Unit> 
  3. internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = 
  4.     runSafely(completion) { 
  5.         createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit)) 
  6.     } 

createCoroutineUnintercepted 方法創建了一個 Continuation 類型(協程)的實例,即創建了一個協程:

  1. public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( 
  2.     receiver: R, completion: Continuation<T> 
  3. ): Continuation<Unit> { 
  4.     return if (this is BaseContinuationImpl) create(receiver, completion) else // ... 

調用的是 (suspend (R) -> T) 的 createCoroutineUnintercepted 方法,(suspend (R) -> T) 就是協程體。直接看上面示例代碼中 GlobalScope.launch 編譯后的字節碼,可以發現 CoroutineScope.launch 傳入的 lambda 表達式被編譯成了繼承 SuspendLambda 的子類:

  1. final class Main$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2 

其繼承關系為: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation, 因此走 create(receiver, completion) 方法,從上面反編譯出的 Java 代碼可以看到 create 方法創建了一個 Continuation 實例,再看一下 Kotlin 代碼編譯后的字節碼(包名已省略):

  1. public final create(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation; 
  2. // ... 
  3. NEW Main$main$1 

從上面可以看到,create 方法創建了 Main$main$1 實例,而其繼承自 SuspendLambda, 因此 create 方法創建的 Continuation 是一個 SuspendLambda 對象。

即 createCoroutineUnintercepted 方法創建了一個 SuspendLambda 實例。然后看看 intercepted 方法:

  1. public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = 
  2.     // 如果是ContinuationImpl類型,則調用intercepted方法,否則返回自身 
  3.     // 這里的 this 是 Main$main$1 實例 - ContinuationImpl的子類 
  4.     (this as? ContinuationImpl)?.intercepted() ?: this 
  5.  
  6. // ContinuationImpl 
  7. public fun intercepted(): Continuation<Any?> = 
  8.     // context[ContinuationInterceptor]是 CoroutineDispatcher 實例 
  9.     // 需要線程調度 - 返回 DispatchedContinuation,其 continuation 參數值為 SuspendLambda 
  10.     // 不需要線程調度 - 返回 SuspendLambda 
  11.     intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it } 
  12.  
  13. // CoroutineDispatcher 
  14. // continuation - SuspendLambda -> ContinuationImpl -> BaseContinuationImpl 
  15. public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = 
  16.     DispatchedContinuation(this, continuation) 

接下來看看 resumeCancellableWith 是怎么啟動協程的,這里還涉及到Dispatchers線程調度的邏輯:

 

  1. internal class DispatchedContinuation<in T>(      
  2.  
  3.  val dispatcher: CoroutineDispatcher,      
  4.  
  5.  val continuation: Continuation<T> 
  6. ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation { 
  7.     public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) { 
  8.         // 進行線程調度,最后也會執行到continuation.resumeWith方法 
  9.         is DispatchedContinuation -> resumeCancellableWith(result) 
  10.         // 直接執行continuation.resumeWith方法 
  11.         else -> resumeWith(result) 
  12.     } 
  13.  
  14.     inline fun resumeCancellableWith(result: Result<T>) { 
  15.         val state = result.toState() 
  16.         // 判斷是否需要線程調度 
  17.         if (dispatcher.isDispatchNeeded(context)) { 
  18.             _state = state 
  19.             resumeMode = MODE_CANCELLABLE 
  20.             // 需要調度則先進行調度 
  21.             dispatcher.dispatch(context, this) 
  22.         } else { 
  23.             executeUnconfined(state, MODE_CANCELLABLE) { 
  24.                 if (!resumeCancelled()) { 
  25.                     // 不需要調度則直接在當前線程執行協程 
  26.                     resumeUndispatchedWith(result) 
  27.                 } 
  28.             } 
  29.         } 
  30.     } 
  31.  
  32.     inline fun resumeUndispatchedWith(result: Result<T>) { 
  33.         withCoroutineContext(context, countOrElement) { 
  34.             continuation.resumeWith(result) 
  35.         } 
  36.     } 
  • 當需要線程調度時,則在調度后會調用 DispatchedContinuation.continuation.resumeWith 來啟動協程,其中 continuation 是 SuspendLambda 實例;
  • 當不需要線程調度時,則直接調用 SuspendLambda.resumeWith 來啟動協程。

resumeWith 方法調用的是父類 BaseContinuationImpl 中的 resumeWith 方法:

  1. internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable { 
  2.     public final override fun resumeWith(result: Result<Any?>) { 
  3.         // ... 
  4.         val outcome = invokeSuspend(param) 
  5.         // ... 
  6.     } 

因此,協程的啟動是通過 BaseContinuationImpl.resumeWith 方法調用到了子類 SuspendLambda.invokeSuspend 方法,然后通過狀態機來控制順序運行。

協程的掛起和恢復

Kotlin 編譯器會為 協程體 生成繼承自 SuspendLambda 的子類,協程的真正運算邏輯都在其 invokeSuspend 方法中。上一節介紹了 launch 是怎么創建和啟動協程的,在這一節我們再看看當協程代碼執行到 suspend 函數后,協程是怎么被掛起的 以及 當 suspend 函數執行完成得到可用結果后是怎么恢復協程的。

Kotlin 協程的內部實現使用了 Kotlin 編譯器的一些編譯技術,當 suspend 函數被調用時,都有一個隱式的參數額外傳入,這個參數是 Continuation 類型,封裝了協程 resume 后執行的代碼邏輯。

 

  1. private suspend fun getId(): String { 
  2.     return GlobalScope.async(Dispatchers.IO) { 
  3.         delay(1000) 
  4.         "hearing" 
  5.     }.await() 
  6.  
  7. // Decompile成Java 
  8. final Object getId( 
  9.  
  10.  Continuation $completion) { 
  11.     // ... 

其中傳入的 $completion 參數,從上一節可以看到是調用 getId 方法所在的協程體對象,也就是一個 SuspendLambda 對象。Continuation的定義如下:

  1. public interface Continuation<in T> { 
  2.     public val context: CoroutineContext 
  3.  
  4.     public fun resumeWith(result: Result<T>) 

將 getId 方法編譯后的字節碼反編譯成 Java 代碼如下(為便于閱讀,刪減及修改了部分代碼):

 

  1. final Object getId( 
  2.  
  3.  Continuation $completion) { 
  4.     // 新建與啟動協程 
  5.     return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) { 
  6.         int label;  
  7.  
  8.         public final Object invokeSuspend( 
  9.  
  10.  Object $result) { 
  11.             switch(this.label) { 
  12.             case 0: 
  13.                 ResultKt.throwOnFailure($result); 
  14.                 this.label = 1; 
  15.                 if (DelayKt.delay(1000L, this) == COROUTINE_SUSPENDED) { 
  16.                     return COROUTINE_SUSPENDED; 
  17.                 } 
  18.                 break; 
  19.             case 1: 
  20.                 ResultKt.throwOnFailure($result); 
  21.                 break; 
  22.             default
  23.                 throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); 
  24.             } 
  25.             return "hearing"
  26.         } 
  27.  
  28.         // ... 
  29.     }), 2, (Object)null).await($completion); // 調用 await() suspend 函數 

結合協程的狀態機一節,當上面的 launch 協程體執行到 getId 方法時, 會根據其返回值是否為 COROUTINE_SUSPENDED 來決定是否掛起,由于 getId 的邏輯是通過 async 啟動一個新的協程,協程體內調用了 suspend delay 方法,然后通過 await suspend 函數等待結果,當 async 協程沒完成時, await 會返回 COROUTINE_SUSPENDED, 因此 launch 協程體的 invokeSuspend 方法直接 return COROUTINE_SUSPENDED 值執行完成,此時 launch 啟動的協程處于掛起狀態但不阻塞所處線程,而 async 啟動的協程開始執行。

我們看一下 async 的源碼:

  1. public fun <T> CoroutineScope.async(...): Deferred<T> { 
  2.     val newContext = newCoroutineContext(context) 
  3.     val coroutine = if (start.isLazy) LazyDeferredCoroutine(newContext, block) else 
  4.         DeferredCoroutine<T>(newContext, active = true
  5.     coroutine.start(start, coroutine, block) 
  6.     return coroutine 

默認情況下,上面的 coroutine 取 DeferredCoroutine 實例,于是我們看一下其 await 方法以及在 async 協程執行完成后,是怎么恢復 launch 協程的:

  1. private open class DeferredCoroutine<T>( 
  2.     parentContext: CoroutineContext, active: Boolean 
  3. ) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> { 
  4.     override suspend fun await(): T = awaitInternal() as T 
  5.  
  6. // JobSupport 
  7. internal suspend fun awaitInternal(): Any? { 
  8.     while (true) { // lock-free loop on state 
  9.         val state = this.state 
  10.         if (state !is Incomplete) { 
  11.             // 已經完成,則直接返回結果 
  12.             if (state is CompletedExceptionally) { // Slow path to recover stacktrace 
  13.                 recoverAndThrow(state.cause) 
  14.             } 
  15.             return state.unboxState() 
  16.         } 
  17.         // 不需要重試時直接break,執行awaitSuspend 
  18.         if (startInternal(state) >= 0) break 
  19.     } 
  20.     return awaitSuspend() // slow-path 
  21.  
  22. // suspendCoroutineUninterceptedOrReturn: 獲取當前協程,且掛起當前協程(返回COROUTINE_SUSPENDED)或不掛起直接返回結果 
  23. private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont -> 
  24.     val cont = AwaitContinuation(uCont.intercepted(), this) 
  25.     cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)) 
  26.     cont.getResult() 

上面 awaitInternal 的大致邏輯是當掛起函數已經有結果時則直接返回,否則掛起父協程,然后 invokeOnCompletion 方法將 ResumeAwaitOnCompletion 插入一個隊列(state.list)中,源碼就不再貼出了。接著看看在 async 執行完成后是怎么調用 ResumeAwaitOnCompletion 來 resume 被掛起的協程的。注意:不要繞進 async 協程體中 delay 是怎么掛起和恢復 async 協程的這一邏輯,我們不需要關注這一層!

接著 async 協程的執行往下看,從前面可知它會調用 BaseContinuationImpl.resumeWith 方法來執行協程邏輯,我們詳細看一下這個方法,在這里會執行該協程的 invokeSuspend 函數:

  1. internal abstract class BaseContinuationImpl( 
  2.     public val completion: Continuation<Any?>? 
  3. ) : Continuation<Any?>, CoroutineStackFrame, Serializable { 
  4.     public final override fun resumeWith(result: Result<Any?>) { 
  5.         var current = this 
  6.         var param = result 
  7.         while (true) { 
  8.             with(current) { 
  9.                 val completion = completion!! // fail fast when trying to resume continuation without completion 
  10.                 val outcome: Result<Any?> = 
  11.                     try {// 調用 invokeSuspend 方法執行協程邏輯 
  12.                         val outcome = invokeSuspend(param) 
  13.                         // 協程掛起時返回的是 COROUTINE_SUSPENDED,即協程掛起時,resumeWith 執行結束 
  14.                         // 再次調用 resumeWith 時協程掛起點之后的代碼才能繼續執行 
  15.                         if (outcome === COROUTINE_SUSPENDED) return 
  16.                         Result.success(outcome) 
  17.                     } catch (exception: Throwable) { 
  18.                         Result.failure(exception) 
  19.                     } 
  20.                 releaseIntercepted() // this state machine instance is terminating 
  21.                 if (completion is BaseContinuationImpl) { 
  22.                     // unrolling recursion via loop 
  23.                     current = completion 
  24.                     param = outcome 
  25.                 } else { 
  26.                     // top-level completion reached -- invoke and return 
  27.                     completion.resumeWith(outcome) 
  28.                     return 
  29.                 } 
  30.             } 
  31.         } 
  32.     } 

我們從上面的源碼可以看到,在 createCoroutineUnintercepted 方法中創建的 SuspendLambda 實例是 BaseContinuationImpl 的子類對象,其 completion 參數為下:

  • launch: if (isLazy) LazyStandaloneCoroutine else StandaloneCoroutine
  • async: if (isLazy) LazyDeferredCoroutine else DeferredCoroutine

上面這幾個類都是 AbstractCoroutine 的子類。而根據 completion 的類型會執行不同的邏輯:

  • BaseContinuationImpl: 執行協程邏輯
  • 其它: 調用 resumeWith 方法,處理協程的狀態,協程掛起后的恢復即與它有關

在上面的例子中 async 啟動的協程,它也會調用其 invokeSuspend 方法執行 async 協程邏輯,假設 async 返回的結果已經可用時,即非 COROUTINE_SUSPENDED 值,此時 completion 是 DeferredCoroutine 對象,因此會調用 DeferredCoroutine.resumeWith 方法,然后返回,父協程的恢復邏輯便是在這里。

  1. // AbstractCoroutine 
  2. public final override fun resumeWith(result: Result<T>) { 
  3.     val state = makeCompletingOnce(result.toState()) 
  4.     if (state === COMPLETING_WAITING_CHILDREN) return 
  5.     afterResume(state) 

在 makeCompletingOnce 方法中,會根據 state 去處理協程狀態,并執行上面插入 state.list 隊列中的 ResumeAwaitOnCompletion.invoke 來恢復父協程,必要的話還會把 async 的結果給它,具體代碼實現太多就不貼了,不是本節的重點。直接看 ResumeAwaitOnCompletion.invoke 方法:

  1. private class ResumeAwaitOnCompletion<T>( 
  2.     job: JobSupport, private val continuation: CancellableContinuationImpl<T> 
  3. ) : JobNode<JobSupport>(job) { 
  4.     override fun invoke(cause: Throwable?) { 
  5.         val state = job.state 
  6.         assert { state !is Incomplete } 
  7.         if (state is CompletedExceptionally) { 
  8.             // Resume with with the corresponding exception to preserve it 
  9.             continuation.resumeWithException(state.cause) 
  10.         } else { 
  11.             // resume 被掛起的協程 
  12.             continuation.resume(state.unboxState() as T) 
  13.         } 
  14.     } 

這里的 continuation 就是 launch 協程體,也就是 SuspendLambda 對象,于是 invoke 方法會再一次調用到 BaseContinuationImpl.resumeWith 方法,接著調用 SuspendLambda.invokeSuspend, 然后根據 label 取值繼續執行接下來的邏輯!

suspendCoroutineUninterceptedOrReturn

接下來我們看一下怎么將一個基于回調的方法改造成一個基于協程的 suspend 方法,要實現這個需求,重點在于 suspendCoroutineUninterceptedOrReturn 方法,根據注釋,這個方法的作用是: Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension. 即獲取當前協程的實例,并且掛起當前協程或不掛起直接返回結果。函數定義如下:

  1. public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T { 
  2.     // ... 

根據 block 的返回值,有兩種情況:

  • 如果 block 返回 COROUTINE_SUSPENDED, 意味著 suspend 函數會掛起當前協程而不會立即返回結果。這種情況下, block 中的 Continuation 需要在結果可用后調用 Continuation.resumeWith 來 resume 協程。
  • 如果 block 返回的 T 是 suspend 函數的結果,則協程不會被掛起, block 中的 Continuation 不會被調用。

調用 Continuation.resumeWith 會直接在調用者的線程 resume 協程,而不會經過 CoroutineContext 中可能存在的 ContinuationInterceptor。建議使用更安全的 suspendCoroutine 方法,在其 block 中可以同步或在異步線程調用 Continuation.resume 和 Continuation.resumeWithException:

  1. public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T { 
  2.     contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } 
  3.     return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> -> 
  4.         // 調用攔截器 
  5.         val safe = SafeContinuation(c.intercepted()) 
  6.         block(safe) 
  7.         safe.getOrThrow() 
  8.     } 

此外除了 suspendCoroutine 方法,還有 suspendCancellableCoroutine, suspendAtomicCancellableCoroutine, suspendAtomicCancellableCoroutineReusable 等方法都可以用來將異步回調的方法封裝成 suspend 函數。

下面來看一個例子來介紹怎么將異步回調函數封裝成 suspend 函數:

  1. class NetFetcher { 
  2.     // 將下面的 request 方法封裝成 suspend 方法 
  3.     suspend fun requestSuspend(id: Int): String = suspendCoroutine { continuation -> 
  4.         request(id, object : OnResponseListener { 
  5.             override fun onResponse(response: String) { 
  6.                 continuation.resume(response) 
  7.             } 
  8.  
  9.             override fun onError(error: String) { 
  10.                 continuation.resumeWithException(Exception(error)) 
  11.             } 
  12.         }) 
  13.     } 
  14.  
  15.     fun request(id: Int, listener: OnResponseListener) { 
  16.         Thread.sleep(5000) 
  17.         if (id % 2 == 0) { 
  18.             listener.onResponse("success"
  19.         } else { 
  20.             listener.onError("error"
  21.         } 
  22.     } 
  23.  
  24.     interface OnResponseListener { 
  25.         fun onResponse(response: String) 
  26.         fun onError(error: String) 
  27.     } 
  28.  
  29. object Main { 
  30.     fun main() { 
  31.         requestByCoroutine() 
  32.     } 
  33.  
  34.     // 使用回調 
  35.     private fun requestByCallback() { 
  36.         NetFetcher().request(21, object : NetFetcher.OnResponseListener { 
  37.             override fun onResponse(response: String) { 
  38.                 println("result = $response"
  39.             } 
  40.  
  41.             override fun onError(error: String) { 
  42.                 println("result = $error"
  43.             } 
  44.         }) 
  45.     } 
  46.  
  47.     // 使用協程 
  48.     private fun requestByCoroutine() { 
  49.         GlobalScope.launch(Dispatchers.Main) { 
  50.             val result = withContext(Dispatchers.IO) { 
  51.                 try { 
  52.                     NetFetcher().requestSuspend(22) 
  53.                 } catch (e: Exception) { 
  54.                     e.message 
  55.                 } 
  56.             } 
  57.       

為加深理解,再介紹一下 Kotlin 提供的兩個借助 suspendCancellableCoroutine 實現的掛起函數: delay & yield。

delay

delay 方法借助了 suspendCancellableCoroutine 方法來掛起協程:

  1. public suspend fun delay(timeMillis: Long) { 
  2.     if (timeMillis <= 0) return // don't delay 
  3.     return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> -> 
  4.         cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) 
  5.     } 
  6.  
  7. override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { 
  8.     postDelayed(Runnable { 
  9.         with(continuation) { resumeUndispatched(Unit) } 
  10.     }, timeMillis) 

可以看出這里 delay 的邏輯類似于 Handle 機制,將 resumeUndispatched 封裝的 Runnable 放到一個隊列中,在延遲的時間到達便會執行 resume 恢復協程。

yield

yield 方法作用是掛起當前協程,這樣可以讓該協程所在線程運行其他邏輯,當其他協程執行完成或也調用 yield 讓出執行權時,之前的協程可以恢復執行。

  1. launch(Dispatchers.Main) { 
  2.     repeat(3) { 
  3.         println("job1 $it"
  4.         yield() 
  5.     } 
  6. launch(Dispatchers.Main) { 
  7.     repeat(3) { 
  8.         println("job2 $it"
  9.         yield() 
  10.     } 
  11.  
  12. // output 
  13. job1 0 
  14. job2 0 
  15. job1 1 
  16. job2 1 
  17. job1 2 
  18. job2 2 

看一下 yield 的源碼:

  1. public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont -> 
  2.     val context = uCont.context 
  3.     // 如果協程沒有調度器,或者像 Unconfined 一樣沒有進行調度則直接返回 
  4.     val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit 
  5.     if (cont.dispatcher.isDispatchNeeded(context)) { 
  6.         // this is a regular dispatcher -- do simple dispatchYield 
  7.         cont.dispatchYield(context, Unit) 
  8.     } else { 
  9.         // This is either an "immediate" dispatcher or the Unconfined dispatcher 
  10.         // ... 
  11.     } 
  12.     COROUTINE_SUSPENDED 
  13.  
  14. // DispatchedContinuation 
  15. internal fun dispatchYield(context: CoroutineContext, value: T) { 
  16.     _state = value 
  17.     resumeMode = MODE_CANCELLABLE 
  18.     dispatcher.dispatchYield(context, this) 
  19.  
  20. public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block) 

可知 dispatchYield 會調用到 dispatcher.dispatch 方法將協程分發到調度器隊列中,這樣線程可以執行其他協程,等到調度器再次執行到該協程時,會 resume 該協程。

總結

通過上面協程的工作原理解析,可以從源碼中發現 Kotlin 中的協程存在著三層包裝:

  • 第一層包裝: launch & async 返回的 Job, Deferred 繼承自 AbstractCoroutine, 里面封裝了協程的狀態,提供了 cancel 等接口;
  • 第二層包裝: 編譯器生成的 SuspendLambda 子類,封裝了協程的真正執行邏輯,其繼承關系為 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 參數就是第一層包裝實例;
  • 第三層包裝: DispatchedContinuation, 封裝了線程調度邏輯,它的 continuation 參數就是第二層包裝實例。

這三層包裝都實現了 Continuation 接口,通過代理模式將協程的各層包裝組合在一起,每層負責不同的功能,如下圖:

責任編輯:未麗燕 來源: 蒼耳的博客
相關推薦

2023-10-24 19:37:34

協程Java

2021-08-04 16:19:55

AndroidKotin協程Coroutines

2019-10-23 14:34:15

KotlinAndroid協程

2020-06-19 08:01:48

Kotlin 協程編程

2021-09-16 09:59:13

PythonJavaScript代碼

2025-08-08 08:23:49

2022-09-12 06:35:00

C++協程協程狀態

2024-05-29 08:05:15

Go協程通信

2023-12-27 08:07:49

Golang協程池Ants

2020-02-19 14:16:23

kotlin協程代碼

2021-04-28 09:08:23

Kotlin協程代碼

2025-05-16 08:21:45

2023-11-17 11:36:59

協程纖程操作系統

2023-07-13 08:06:05

應用協程阻塞

2021-02-19 06:56:33

架構協程應用

2025-06-26 04:10:00

2023-09-03 19:13:29

AndroidKotlin

2025-02-08 09:13:40

2021-12-09 06:41:56

Python協程多并發

2021-05-21 08:21:57

Go語言基礎技術
點贊
收藏

51CTO技術棧公眾號

欧美不卡在线播放| 91在线看www| 色欲AV无码精品一区二区久久 | 亚洲五码中文字幕| 久久99国产精品99久久| 中文字幕视频一区二区| 欧美成人有码| 亚洲毛片在线免费观看| 久久久久久久久久一区| 动漫一区二区| 欧美激情自拍偷拍| 国产精华一区| 一区二区视频免费观看| 亚洲成人中文| www.亚洲一区| 久久丫精品国产亚洲av不卡| 久久亚洲国产精品尤物| 偷拍一区二区三区四区| 亚洲 欧洲 日韩| 天堂av网在线| 国产a久久麻豆| 国产精品嫩草视频| 五月婷婷激情网| 中文字幕日韩欧美精品高清在线| 亚洲男人天堂九九视频| 成人欧美精品一区二区| 欧美成人一二区| 色爱区综合激月婷婷| 男女啪啪免费视频网站| а√天堂在线官网| 日本一区二区三区四区| 久久国产欧美精品| 好男人在线视频www| 久久99精品国产麻豆不卡| 日本高清久久天堂| 在线观看 中文字幕| 欧美不卡一区| 美女视频久久黄| 情侣偷拍对白清晰饥渴难耐| 精品国产精品久久一区免费式| 亚洲精品电影网| 少妇熟女视频一区二区三区 | 欧美日韩精品欧美日韩精品一综合| 日本www在线视频| 国产又色又爽又黄刺激在线视频| 亚洲乱码日产精品bd| 一区二区精品在线| 92国产在线视频| 国产精品午夜久久| 伊人色综合影院| 在线观看免费黄视频| 亚洲国产精品t66y| 一本色道婷婷久久欧美| 99青草视频在线播放视| 国产精品欧美久久久久无广告 | 91福利在线观看| www.亚洲天堂网| 日韩三级影视| 在线观看日韩高清av| 免费看污黄网站| 成人高清一区| 6080日韩午夜伦伦午夜伦| 国产永久免费网站| 国内精品视频| 精品国产一区二区三区忘忧草| 日本一区二区免费视频| 国产精品黄网站| 日韩精品在线免费| 超碰97av在线| 自拍偷拍欧美专区| 性亚洲最疯狂xxxx高清| 亚洲精品午夜国产va久久成人| 国产视频欧美| 国产精品免费在线免费| 国产伦一区二区| 成人永久aaa| 欧美二区三区| 欧美成人xxx| 一区二区三区视频在线看| 草b视频在线观看| 免费观看一级欧美片| 欧美视频日韩视频| 亚洲国产欧美日韩在线| 另类图片第一页| 国产亚洲精品高潮| 永久免费看黄网站| 国产精品女主播一区二区三区| 国产精品高清免费在线观看| 国产suv精品一区二区69| 白白色 亚洲乱淫| 视频一区二区三区免费观看| av网站网址在线观看| 高跟丝袜欧美一区| 亚洲午夜激情影院| 老司机在线精品视频| www.精品av.com| 中文字幕亚洲精品在线| 精品亚洲成a人| 精品乱码一区二区三区| 日本在线www| 欧美日韩在线免费| 欧美精品aaaa| 国产精品欧美大片| 日韩中文字幕第一页| 国产成人在线免费观看视频| 精品制服美女久久| 久久久福利视频| 成人在线免费看黄| 日本高清不卡aⅴ免费网站| 永久免费看片在线观看| 第一会所亚洲原创| 97视频网站入口| 国产视频在线观看视频| 久久久777精品电影网影网| 国产高清www| 国产亚洲高清一区| 伊人精品在线观看| 国产精品老女人| 国产91丝袜在线18| 亚洲欧洲久久| 欧美日韩精品免费观看视完整| 精品免费一区二区三区| 日本二区三区视频| 日韩不卡一区二区三区 | 亚洲激情在线看| 奇米狠狠一区二区三区| 97在线看福利| 女人18毛片一区二区三区| 亚洲男人的天堂在线观看| 999精品视频在线| 欧美人与拘性视交免费看| 午夜精品久久久久久久99黑人| 国产婷婷一区二区三区久久| 国产精品卡一卡二| 国产三级三级看三级| 久久最新网址| 国产精品99久久久久久久久| 污视频网站在线播放| 亚洲午夜久久久久久久久电影院| 青青草原播放器| 亚洲成人精品| 成人福利视频在线观看| 欧美一区二区三区在线观看免费| 91福利区一区二区三区| 成人在线一级片| 日韩一区欧美二区| 日本不卡一二三区| 91福利精品在线观看| 亚洲性线免费观看视频成熟| 神马久久久久久久| 国产色综合久久| 欧美日韩在线成人| 成人网18免费网站| 国产精品中文久久久久久久| 四虎久久免费| 日韩欧美视频一区| 国产精品第一页在线观看| 成人夜色视频网站在线观看| www.av中文字幕| 日韩黄色网络| 国产精品电影在线观看| 9色在线视频| 欧美一区二区在线播放| 九九视频在线免费观看| 懂色av中文字幕一区二区三区| 黄色成人在线看| 先锋影音国产精品| 国产精品第100页| 毛片在线播放a| 日韩免费电影一区| www.中文字幕在线观看| 中文字幕av资源一区| 樱花草www在线| 亚洲精品一区二区妖精| 都市激情久久久久久久久久久| 国内激情视频在线观看| 在线电影欧美日韩一区二区私密| 国产精品久久久午夜夜伦鲁鲁| 亚洲蜜臀av乱码久久精品蜜桃| 青娱乐国产精品视频| 日韩视频二区| 在线一区亚洲| 欧美影院天天5g天天爽| 国产精品扒开腿做| 天天色天天射天天综合网| 日韩成人黄色av| 一级黄色片在线观看| 亚洲一区二区视频在线| 在线小视频你懂的| 风流少妇一区二区| 少妇激情一区二区三区| 午夜天堂精品久久久久| 欧美精品亚洲精品| 麻豆一区在线| 国产精品久久久久av免费| 日本电影在线观看| 色午夜这里只有精品| 欧洲成人一区二区三区| 欧美日韩激情在线| 国产成人亚洲欧洲在线| 亚洲欧洲www| 亚洲永久精品ww.7491进入| 国产一区二区三区四区五区美女 | 中文字幕日韩精品一区二区| 国产在线播放精品| 91九色国产社区在线观看| 亚洲天堂电影| 久久久久国产精品一区| www.中文字幕久久久| 亚洲精品美女久久久| 朝桐光av在线一区二区三区| 欧美三区在线视频| 99热在线观看免费精品| 亚洲综合激情网| 国产高潮流白浆| 国产精品嫩草99a| 波多野在线播放| av影院午夜一区| 国产吃瓜黑料一区二区| 狠狠狠色丁香婷婷综合激情| 免费黄色特级片| 亚洲深夜av| a天堂资源在线观看| 亚洲女同一区| 精品久久免费观看| 日本精品黄色| 日产国产精品精品a∨| 日韩欧美天堂| 国产日韩欧美一区二区三区四区| 国产精品一区二区三区av| 国产精品视频1区| 粉嫩一区二区三区| 日韩女在线观看| 免费观看欧美大片| 日本久久久久久久久| 色黄视频在线观看| …久久精品99久久香蕉国产| 2020日本在线视频中文字幕| 欧美激情18p| 久久久久黄久久免费漫画| 精品中文字幕在线| 久草在线视频资源| 欧美精品videossex88| 久久大胆人体| 69久久夜色精品国产69乱青草| av福利导福航大全在线| 国内精品一区二区三区四区| 9999在线视频| 91成人免费观看网站| 丝袜老师在线| 国产精品91在线| 九九久久国产| 亚洲va国产va天堂va久久| 国产亚洲观看| 国产精品免费观看高清| 欧美一区 二区| 欧美午夜免费| 欧美亚洲激情| 最近中文字幕免费mv| 在线一区免费| 伊人成色综合网| 日本在线不卡视频一二三区| 97人人爽人人| 福利一区在线观看| 在线免费观看黄色小视频| 国产色产综合产在线视频| 天天干天天舔天天操| 综合欧美亚洲日本| 国产一级久久久| 在线免费观看不卡av| 国产一区二区在线视频观看| 欧美成人三级电影在线| 青青青免费视频在线2| 色七七影院综合| 福利小视频在线| 国产精品视频中文字幕91| 日韩免费精品| 蜜桃麻豆www久久国产精品| 久久一区二区三区喷水| 国产 欧美 日韩 一区| 久久精品国产清高在天天线| www.成年人| 99久久精品免费精品国产| 免费在线观看a视频| 亚洲最新视频在线观看| 久久久成人免费视频| 91精品国产综合久久久久| 亚洲 欧美 自拍偷拍| 日韩中文字幕不卡视频| gogo久久| 91手机视频在线观看| 欧洲vs亚洲vs国产| 熟女熟妇伦久久影院毛片一区二区| 亚洲日韩视频| 午夜激情视频网| 久久精品日产第一区二区三区高清版 | 香蕉成人app| 欧美日韩日本网| 午夜国产精品视频免费体验区| 亚洲熟妇av一区二区三区| 国产在线视频一区二区| 在线观看福利片| 亚洲午夜在线视频| 中文字幕精品一区二| 亚洲国产精品久久久久| 国产在线1区| 国产成人精品国内自产拍免费看 | 国产女片a归国片aa| 在线观看免费视频综合| 无套内谢的新婚少妇国语播放| 日韩一区二区欧美| 久久亚洲精品爱爱| 精品伦精品一区二区三区视频| 自拍欧美日韩| 日本美女视频一区| 亚洲国产精品av| 4438国产精品一区二区| 亚洲国产精品va| 污的网站在线观看| 成人国产精品色哟哟| 欧美老女人另类| 亚洲精品无码久久久久久| 成人午夜在线播放| 久久免费少妇高潮99精品| 欧美高清你懂得| 亚洲乱亚洲乱妇| 国产精品久久久久影院日本| 亚洲国产精品嫩草影院久久av| 欧美日韩福利在线| 丁香激情综合五月| 麻豆亚洲av熟女国产一区二| 在线观看91av| 麻豆视频在线观看免费| 国产精品午夜一区二区欲梦| 国产剧情在线观看一区| 老熟妇仑乱视频一区二区| 91麻豆6部合集magnet| 在线观看国产亚洲| 日韩激情视频在线播放| 国产白浆在线免费观看| 国产精品美女诱惑| 亚洲美女黄网| 亚洲第一黄色网址| 狠狠久久五月精品中文字幕| 亚洲av成人精品一区二区三区在线播放 | 婷婷色在线资源| 国产精品久久久久久久小唯西川 | 国产后入清纯学生妹| 蜜臀久久99精品久久久无需会员| 国内精品视频| 日韩极品视频在线观看| 99久久国产免费看| 精品国产一区二区三区四| 亚洲视频精品在线| 国产成人精品一区二区三区免费| 色女人综合av| 激情五月激情综合网| 欧产日产国产v| 亚洲国产欧美一区二区三区同亚洲| 暧暧视频在线免费观看| 精品日本一区二区三区| 老司机久久99久久精品播放免费| 久久久久久久久久久久| 欧美午夜精品久久久久久超碰| 日本不卡不卡| 国产91精品一区二区绿帽| 国产情侣一区| 亚洲综合第一区| 日韩一二在线观看| 欧美少妇网站| 亚洲国产精品久久久久婷婷老年 | 午夜激情在线观看| 亚洲一区二区三区乱码aⅴ| 亚洲激情亚洲| 国产成人免费观看网站| 欧美一级片免费看| 国产污视频在线播放| 日韩精品久久久免费观看| 精品一区二区三区影院在线午夜 | 免费污网站在线观看| 欧美精品视频www在线观看| 亚洲性图自拍| 欧美人xxxxx| 国产精品综合网| 国产午夜免费福利| 久久精品久久久久| 久久草在线视频| 第一区免费在线观看| 午夜电影久久久| 在线激情网站| 九九99玖玖| 国产一区二区在线电影| 亚欧视频在线观看| 久久激情五月丁香伊人| 另类ts人妖一区二区三区| 日韩成人精品视频在线观看| 天天操天天色综合| 久草免费在线| 婷婷久久青草热一区二区| 成人精品小蝌蚪|