欢迎回来!

距离上一篇博客已经接近一年了,在这期间虽然也有一些奇怪的研究,但是都还没有到达能够写博客的地步。每天都在为琐事和业务代码忙忙碌碌,也没有闲工夫去研究比较有意思的东西。这也算是我给懒癌找的借口,不过还是希望这个系列能够稍微长一点,能够被称为「系列」。

关于协程

众所周知,无论是怎样的协程,最后都逃不出什么 Dispatcher,Woker,EventLoop 之类的东西。所以当遇见了异步操作的时候,最后总是将当前的线程的处理能力交还给调度器,以实现非阻塞式的调用,然后将任务完成的委托注册下来,以便异步任务完成时调用,实现同步化的异步调用。

简单来说,比如你每天早上的早餐是一块吐司,一杯咖啡,一个煎蛋。那么每天早上你就有三个任务,烤吐司,冲咖啡,煎鸡蛋。

假设你是阻塞式单线程的,你大概就会这样

7:00 烧热水 >>> 5min >>> 7:06 冲咖啡
7:07 将吐司放入面包机 >>> 1min >>> 7:09 吐司制作完成
7:10 煎鸡蛋 >>> 2min >>> 7:13 完成早餐

这样的话,你会浪费 8 分钟的时间在等待烧水、烤面包以及煎蛋,这无疑是最差的做法,这个时候你就会想,假设你具有分身术事情是不是会变得不一样。

分身 A: 7:00 烧热水 >>> 5min >>> 7:06 冲咖啡
分身 B: 7:00 将吐司放入面包机 >>> 1min >>> 7:02 吐司制作完成
分身 C: 7:00 煎鸡蛋 >>> 2min >>> 7:03 完成煎蛋

显而易见,你只需要 6 分钟就能结束所有操作了。然而事实上是你并不会分身术,而且在程序世界中,线程也是比较宝贵的资源。并且再仔细看看,发现大多数的时候你的分身也是在等待中度过,太浪费了。这个时候如果你愿意花费一些脑子,来调度你的早餐任务,就像协程做的这样,统筹一下,就会的得到这样的结果。

7:00 烧热水 >>> 7:01 将吐司放入面包机 >>> 7:02 煎鸡蛋
7:03 吐司制作完成 >>> 继续煎蛋 1min >>> 7:05 完成煎蛋
继续烧水 1min >>> 7:06 冲咖啡完成早餐

也许对你来说这是一个忙碌的早上,但是你节约了 7 分钟的时间。可以看到,使用协程的效果和具有分身术是一样的效率,这就是协程的威力。

但是在程序的世界,对于 CPU 非密集型操作,并且比较耗时的场景下,比如磁盘 IO,网络 IO,协程往往比多线程效率更高,因为少了线程这么重型的操作,并且省去了程序间切换上下文的消耗。这也是最近流行的 NIO 理由,大家都想物尽其用。至于 CPU 密集型操作,还是交给分身术(多线程)比较好。

至于协程和异步的关系,我的理解是,协程是成规模,有规划的异步操作。异步是协程的核心,多个异步操作之间的协调与调度就是协程。

有意思的错误猜测

了解了协程的概念之后,讲一个有意思的事情。在前不久,和同事讨论 Kotlin 是如何实现其协程时,他给出了一个非常有意思的猜测。

由于考虑到 JVM 的呆萌,而且 Kotlin 的协程是以三方库的形式支持的,并没有编译时的支持。基于这些理由,同事给出他的想法。

协程遇到异步操作,需要转交当前线程的控制权给调度器的 EventLoop,那么 Kotlin 是在何时转交这个控制权的呢?

package io.kanro

import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

fun main(args: Array<String>) = runBlocking {
    async {
        delay(1000) // Do some work
        println("Done once!")
    }.await()

    async {
        delay(1000) // Do some work
        println("Done twice!")
    }.await()
}

如果在 await 和其他的异步相关方法中,获取当前线程的调度器,并且在里面跑 EventLoop,就可以完成任务调度了,这样也不需要编译时的支持,缺点可能是异常的堆栈追踪可能不太好读,并且整个调用栈会稍微深一些,但是应该还不至于溢出。

如上所示,因为在编译器不干预的前提下,同事给出的答案是在 await() 方法中,这个答案让我耳目一新,之前在我的了解中,C# 对于 async 关键字的处理是会将代码分段拆分,只是由于 async/await 的语法糖,让你觉得他们是在一起的。

C# await 一个 Task 时,会立即返回,并注册完成委托到同步上下文中,以便任务完成时返回这里继续执行,但是由于一个函数的代码域中,不可能执行到一半然后撒手不管了,所以 C# 给出的解决方案是立即返回,并交出线程的使用权,由调度器跑 EventLoop。

Kotlin 也不可能在一个函数执行到一半就切换干别的去了,而且也没有编译器的帮助,不能中途返回。那么就只能由 await 方法调用 EventLoop 了,当任务完成时,跳出这个 Loop,然后继续向下走。调度器和 EventLoop 并不拥有整个过程调度的主导权,全靠用户代码自动调度。

思路和 C# 完全相反,但十分合理的推断,也给了我一个全新的启示。但是遗憾的是,这个猜测是错误的。具体原因,我们在下面再探讨。

C# 中的异步

CLR 上的 Task + async/await 可以说是将 async/await 式异步编程带入了一个新的高度,在 ES6 中的 Promise + async/await 也是将 CLR 的异步精神发扬光大。从此再也不需要为了写异步代码绞尽脑汁,写异步和写同步一样的轻松自如。

要说 C# 的 async/await 的原理,要从老式的 C# 异步范式 APM(Asynchronous Programming Model)说起。

在 .Net 4.0 之前,CLR 基础库中的异步 API 基本上都是 BeginXXXEndXXX 的写法,调用异步 API 都需要先调用 BeginXXX 获取一个 IAsyncResult 对象,然后可以用于轮询其中的 IsCompleted 属性,或者使用 AsyncCallback 实现异步调用。也可以使用其中的 AsyncWaitHandle 用来同步等待。最后检查到完成异步操作之后,使用 EndXXX 获取最后的结果。

想想就觉得麻烦,这还是只是调用方,如果你要写基于 APM 的 API,那就更蛋疼了,代码被拆分在了不同的地方,当十几个异步 API 一起写,简直酸爽。

而现在的基于 Task + async/await 的异步范式 TAP(Task-based Asynchronous Pattern)算是拯救了 C# 程序员,由编译器将链式的 Task 调用的异步代码再拼装起来,让其看上去就像同步调用一样。基于 TAP 的异步方法需要返回值是 Task 或者带返回值的 Task<T> (还有带进度的 Task),总之就是一个包装。

而调用方想要异步等待一个 Task 必须在函数上标记 async 关键字,而且被标记了 async 关键字的函数要么没有返回值要么也需要返回一个 Task 。所以 async 是具有传染性的,往往由于调用了一个 async 方法,导致了后续所有的调用都需要是 async 的。其传染性,就从根本上就决定的,还是由于函数不可能正常执行到一半就中断了,所以所有异步操作都需要一个Task 作为包装,当遇到了一个 await 的时候,编译器就将剩下的操作打包成要给 Task 作为返回,并且多个 await 能够通过 ContinueWith 形成链式调用,很像没有 async/await 的 Promise。  

为了打破这个传染性,可以使用没有返回值的 async 函数,但是没有返回值的 async 函数被其他函数调用时就没有异步的特性。比如下面的程序仅仅会输出 Main 就结束了。因为 Test2 方法在执行到 await Test(); 还没等 Test 方法异步调用结束时就已经把后续操作放入同步上下文中返回了。Main 函数并不知道 Test 有异步调用,就打印出 Main 结束程序了。

using System;
using System.Threading.Tasks;

namespace Kanro
{
    class Program
    {
        static void Main(string[] args)
        {
            Test2();
            Console.WriteLine("Main");
        }

        static async Task Test()
        {
            await Task.Delay(1000);
            Console.WriteLine("Test");
        }

        static async void Test2()
        {
            await Test();
            Console.WriteLine("Test2");
        }
    }
}

还有一种方式可以用于打破 async 的传染性,就是使用 Task.Wait() 将异步调用转换为同步调用,阻塞当前线程。这样就将异步的病毒截至在这里了,算是比较常用的一种手段。

yield 与异步

各种异步方案,都是为了解决函数中途无法返回的问题,像 C# 提出的直接将函数直接分成两半(APM式),和采用立即返回一个 Task 包装(TAP式)都是为了中途就把线程从当前函数弄出去,干别的事情。

而现代语言几乎都会有一个迭代器 yield 的实现,在 ES2015 中还没有 Promise + async/await 的异步实现时,基本上所有的 JS 库的异步都是采用 yield 实现。

yield 能够解决异步方案最大的难题(中途返回),举个例子

static IEnumerator<String> YieldTest()
{
    yield return "1";
    yield return "2";
    yield return "3";
    yield return "4";
}

yield 能让一个函数返回多次,当第一次返回后,第二次返回会紧接着上一次返回的地方执行,然后整个东西被打包成为了一个枚举器,每次调用 Next 方法时,就会接着上一次返回的地方执行。

Kotlin 中的协程

在最近发布的 Kotlin 1.3.0 中,Kotlin Coroutines 库也是正式 release 了,可喜可贺。随之而来的还有黑魔法的 Contract 功能和不知道怎么吐槽 JVM 的 Unsigned 类型。

首先来看看 Kotlin 中的协程最核心的关键字 suspend,这个关键字表示标记有这个关键字的 function 或者 lambda 是可中断的。

注意到了吗?可中断,在上面我们说过了,作为异步最重要的就是实现函数的中途返回,也就是分割函数,只要实现了分割函数,就可以实现异步,从而实现协程。

然后更有意思的事情是,suspend 关键字只能被同样具有 suspend 关键字标记的函数调用。分割函数,传染性,简直和 async 关键字一模一样。

在文章的开头,我们讲到了同事对于 Kotlin 协程的猜想,是在编译器不会对代码额外的修改得出的结论,但是目前看来既然具有 suspend 关键字,说明编译器还是会有一些处理的。

接下来的一个例子验证了我的猜想

package io.kanro

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

fun main(args: Array<String>){
    GlobalScope.launch {
        val jobA = async {
            println("${Thread.currentThread().name} JobA start")
            delay(1000) // Do some work
            println("${Thread.currentThread().name} JobA end")
        }

        val jobB = async {
            println("${Thread.currentThread().name} JobB start")
            delay(2000) // Do some work
            println("${Thread.currentThread().name} JobB end")
        }

        awaitAll(jobA, jobB)
    }

    Thread.sleep(5000)
}

如果是在 await 方法中进入 EventLoop 的话,那么就回避了分割函数的问题,在需要分割的地方产生一次函数调用,而这个函数里面可以做任何事情,也就相当于分割了函数。但是无论无何,这些都还是在一次函数调用中,也就是整个调用都是在一个线程中。那么在一次 async 方法中的任何地方,线程应该都是同一个才对。

上面的代码运行下面却是这样的结果:

DefaultDispatcher-worker-1 JobA start
DefaultDispatcher-worker-3 JobB start
DefaultDispatcher-worker-1 JobA end
DefaultDispatcher-worker-4 JobB end

令我和同事两个人大跌眼镜。说明 jobB 中第一次打印和第二次打印并不在同一个线程,那么一定有地方将这个函数做切割了,并且编译器也一定参与了这个过程。

百思不得其解的时候,我决定直接看这段函数的字节码,从中分析其原理。我从中提取了其中 jobB 的 lambda 对象的字节码一探究竟。

// ================io/kanro/MainKt$main$1$jobB$1.class =================
// class version 50.0 (50)
// access flags 0x30
// signature Lkotlin/coroutines/jvm/internal/SuspendLambda;Lkotlin/jvm/functions/Function2<Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation<-Lkotlin/Unit;>;Ljava/lang/Object;>;
// declaration: io/kanro/MainKt$main$1$jobB$1 extends kotlin.coroutines.jvm.internal.SuspendLambda implements kotlin.jvm.functions.Function2<kotlinx.coroutines.CoroutineScope, kotlin.coroutines.Continuation<? super kotlin.Unit>, java.lang.Object>
final class io/kanro/MainKt$main$1$jobB$1 extends kotlin/coroutines/jvm/internal/SuspendLambda  implements kotlin/jvm/functions/Function2  {


  // access flags 0x2
  private Lkotlinx/coroutines/CoroutineScope; p$

  // access flags 0x11
  public final invokeSuspend(Ljava/lang/Object;)Ljava/lang/Object;
  @Lorg/jetbrains/annotations/Nullable;() // invisible
    @Lorg/jetbrains/annotations/NotNull;() // invisible, parameter 0
   L0
    INVOKESTATIC kotlin/coroutines/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED ()Ljava/lang/Object;
   L1
    LINENUMBER 17 L1
    ASTORE 4
    ALOAD 0
    GETFIELD io/kanro/MainKt$main$1$jobB$1.label : I
    TABLESWITCH
      0: L2
      1: L3
      default: L4
   L2
    ALOAD 1
    DUP
    INSTANCEOF kotlin/Result$Failure
    IFEQ L5
    CHECKCAST kotlin/Result$Failure
    GETFIELD kotlin/Result$Failure.exception : Ljava/lang/Throwable;
    ATHROW
   L5
    POP
   L6
    ALOAD 0
    GETFIELD io/kanro/MainKt$main$1$jobB$1.p$ : Lkotlinx/coroutines/CoroutineScope;
    ASTORE 2
   L7
    LINENUMBER 18 L7
    NEW java/lang/StringBuilder
    DUP
    INVOKESPECIAL java/lang/StringBuilder.<init> ()V
    INVOKESTATIC java/lang/Thread.currentThread ()Ljava/lang/Thread;
    DUP
    LDC "Thread.currentThread()"
    INVOKESTATIC kotlin/jvm/internal/Intrinsics.checkExpressionValueIsNotNull (Ljava/lang/Object;Ljava/lang/String;)V
    INVOKEVIRTUAL java/lang/Thread.getName ()Ljava/lang/String;
    INVOKEVIRTUAL java/lang/StringBuilder.append (Ljava/lang/String;)Ljava/lang/StringBuilder;
    LDC " JobB start"
    INVOKEVIRTUAL java/lang/StringBuilder.append (Ljava/lang/String;)Ljava/lang/StringBuilder;
    INVOKEVIRTUAL java/lang/StringBuilder.toString ()Ljava/lang/String;
    ASTORE 3
   L8
    GETSTATIC java/lang/System.out : Ljava/io/PrintStream;
    ALOAD 3
    INVOKEVIRTUAL java/io/PrintStream.println (Ljava/lang/Object;)V
   L9
   L10
    LINENUMBER 19 L10
    LDC 2000
    ALOAD 0
    ALOAD 0
    ICONST_1
    PUTFIELD io/kanro/MainKt$main$1$jobB$1.label : I
    INVOKESTATIC kotlinx/coroutines/DelayKt.delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
   L11
    DUP
    ALOAD 4
    IF_ACMPNE L12
   L13
    LINENUMBER 17 L13
    ALOAD 4
    ARETURN
   L3
    ALOAD 1
    DUP
    INSTANCEOF kotlin/Result$Failure
    IFEQ L14
    CHECKCAST kotlin/Result$Failure
    GETFIELD kotlin/Result$Failure.exception : Ljava/lang/Throwable;
    ATHROW
   L14
    POP
    ALOAD 1
   L12
    LINENUMBER 20 L12
    POP
   L15
    NEW java/lang/StringBuilder
    DUP
    INVOKESPECIAL java/lang/StringBuilder.<init> ()V
    INVOKESTATIC java/lang/Thread.currentThread ()Ljava/lang/Thread;
    DUP
    LDC "Thread.currentThread()"
    INVOKESTATIC kotlin/jvm/internal/Intrinsics.checkExpressionValueIsNotNull (Ljava/lang/Object;Ljava/lang/String;)V
    INVOKEVIRTUAL java/lang/Thread.getName ()Ljava/lang/String;
    INVOKEVIRTUAL java/lang/StringBuilder.append (Ljava/lang/String;)Ljava/lang/StringBuilder;
    LDC " JobB end"
    INVOKEVIRTUAL java/lang/StringBuilder.append (Ljava/lang/String;)Ljava/lang/StringBuilder;
    INVOKEVIRTUAL java/lang/StringBuilder.toString ()Ljava/lang/String;
    ASTORE 3
   L16
    GETSTATIC java/lang/System.out : Ljava/io/PrintStream;
    ALOAD 3
    INVOKEVIRTUAL java/io/PrintStream.println (Ljava/lang/Object;)V
   L17
   L18
    LINENUMBER 21 L18
    GETSTATIC kotlin/Unit.INSTANCE : Lkotlin/Unit;
    ARETURN
   L4
    NEW java/lang/IllegalStateException
    DUP
    LDC "call to 'resume' before 'invoke' with coroutine"
    INVOKESPECIAL java/lang/IllegalStateException.<init> (Ljava/lang/String;)V
    ATHROW
   L19
    LOCALVARIABLE this Lio/kanro/MainKt$main$1$jobB$1; L0 L19 0
    LOCALVARIABLE result Ljava/lang/Object; L0 L19 1
    MAXSTACK = 5
    MAXLOCALS = 5

  @Lkotlin/coroutines/jvm/internal/DebugMetadata;(f="Main.kt", l={17, 20}, i={}, s={}, n={}, m="invokeSuspend", c="io/kanro/MainKt$main$1$jobB$1")

  // access flags 0x0
  <init>(Lkotlin/coroutines/Continuation;)V
    ALOAD 0
    ICONST_2
    ALOAD 1
    INVOKESPECIAL kotlin/coroutines/jvm/internal/SuspendLambda.<init> (ILkotlin/coroutines/Continuation;)V
    RETURN
    MAXSTACK = 3
    MAXLOCALS = 2

  // access flags 0x0
  I label

  // access flags 0x11
  // signature (Ljava/lang/Object;Lkotlin/coroutines/Continuation<*>;)Lkotlin/coroutines/Continuation<Lkotlin/Unit;>;
  // declaration: kotlin.coroutines.Continuation<kotlin.Unit> create(java.lang.Object, kotlin.coroutines.Continuation<?>)
  public final create(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
  @Lorg/jetbrains/annotations/NotNull;() // invisible
    @Lorg/jetbrains/annotations/Nullable;() // invisible, parameter 0
    @Lorg/jetbrains/annotations/NotNull;() // invisible, parameter 1
   L0
    ALOAD 2
    LDC "completion"
    INVOKESTATIC kotlin/jvm/internal/Intrinsics.checkParameterIsNotNull (Ljava/lang/Object;Ljava/lang/String;)V
    NEW io/kanro/MainKt$main$1$jobB$1
    DUP
    ALOAD 2
    INVOKESPECIAL io/kanro/MainKt$main$1$jobB$1.<init> (Lkotlin/coroutines/Continuation;)V
    ASTORE 3
    ALOAD 1
    CHECKCAST kotlinx/coroutines/CoroutineScope
    ALOAD 3
    ALOAD 1
    CHECKCAST kotlinx/coroutines/CoroutineScope
    PUTFIELD io/kanro/MainKt$main$1$jobB$1.p$ : Lkotlinx/coroutines/CoroutineScope;
    ALOAD 3
    ARETURN
   L1
    LOCALVARIABLE this Lkotlin/coroutines/jvm/internal/BaseContinuationImpl; L0 L1 0
    LOCALVARIABLE value Ljava/lang/Object; L0 L1 1
    LOCALVARIABLE completion Lkotlin/coroutines/Continuation; L0 L1 2
    MAXSTACK = 3
    MAXLOCALS = 4

  // access flags 0x11
  public final invoke(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
    ALOAD 0
    ALOAD 1
    ALOAD 2
    CHECKCAST kotlin/coroutines/Continuation
    INVOKEVIRTUAL io/kanro/MainKt$main$1$jobB$1.create (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
    CHECKCAST io/kanro/MainKt$main$1$jobB$1
    GETSTATIC kotlin/Unit.INSTANCE : Lkotlin/Unit;
    INVOKEVIRTUAL io/kanro/MainKt$main$1$jobB$1.invokeSuspend (Ljava/lang/Object;)Ljava/lang/Object;
    ARETURN
    MAXSTACK = 3
    MAXLOCALS = 3

  @Lkotlin/Metadata;(mv={1, 1, 13}, bv={1, 0, 3}, k=3, d1={"\u0000\u000e\n\u0000\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n\u0002\u0008\u0002\u0010\u0000\u001a\u00020\u0001*\u00020\u0002H\[email protected]\u00f8\u0001\u0000\u00a2\u0006\u0004\u0008\u0003\u0010\u0004"}, d2={"<anonymous>", "", "Lkotlinx/coroutines/CoroutineScope;", "invoke", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;"})
  OUTERCLASS io/kanro/MainKt$main$1 invokeSuspend (Ljava/lang/Object;)Ljava/lang/Object;
  // access flags 0x18
  final static INNERCLASS io/kanro/MainKt$main$1$jobB$1 null null
  // access flags 0x18
  final static INNERCLASS io/kanro/MainKt$main$1 null null
  // compiled from: Main.kt
  // debug info: SMAP
Main.kt
Kotlin
*S Kotlin
*F
+ 1 Main.kt
io/kanro/MainKt$main$1$jobB$1
*L
1#1,25:1
*E

}

可以看到这个 lambda 和普通的 lambda 不太一样,是继承自了 kotlin.coroutines.jvm.internal.SuspendLambda 的 lambda 对象,其中有两个主要方法 invokeinvokeSuspend ,其中 invoke  是一般 lambda 都有的方法,但是在这里只是对 invokeSuspend  的一个包装,获取了当前协程 Continuation 对象,传入invokeSuspend

  // access flags 0x11
  public final invoke(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
    ALOAD 0
    ALOAD 1
    ALOAD 2
    CHECKCAST kotlin/coroutines/Continuation
    INVOKEVIRTUAL io/kanro/MainKt$main$1$jobB$1.create (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
    CHECKCAST io/kanro/MainKt$main$1$jobB$1
    GETSTATIC kotlin/Unit.INSTANCE : Lkotlin/Unit;
    INVOKEVIRTUAL io/kanro/MainKt$main$1$jobB$1.invokeSuspend (Ljava/lang/Object;)Ljava/lang/Object;
    ARETURN
    MAXSTACK = 3
    MAXLOCALS = 3

而重点就在 invokeSuspend  函数中,我们进行逐一分析。

  // access flags 0x11
  public final invokeSuspend(Ljava/lang/Object;)Ljava/lang/Object;
  @Lorg/jetbrains/annotations/Nullable;() // invisible
    @Lorg/jetbrains/annotations/NotNull;() // invisible, parameter 0
   L0
    INVOKESTATIC kotlin/coroutines/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED ()Ljava/lang/Object;
   L1
    LINENUMBER 17 L1
    ASTORE 4
    ALOAD 0
    GETFIELD io/kanro/MainKt$main$1$jobB$1.label : I
    TABLESWITCH
      0: L2
      1: L3
      default: L4

首先这个函数通过 kotlin.coroutines.intrinsics.IntrinsicsKt.COROUTINE_SUSPENDED 获取到了一个协程中断对象,存入本地变量 4 中,然后获取当前的 lambda 的 label 字段,判断其状态,当值为 0 时跳转到 L2,为 1 则跳转到 L3,其他情况则跳转 L4。

我们暂时默认先跳转到 L2,L2 和 L5 只是判断一下传入函数的对象是不是失败了,如果失败了就抛出异常,紧接着 L6 将当前 lambda 对象的 CoroutineScope 存入本地变量 2。

   L2
    ALOAD 1
    DUP
    INSTANCEOF kotlin/Result$Failure
    IFEQ L5
    CHECKCAST kotlin/Result$Failure
    GETFIELD kotlin/Result$Failure.exception : Ljava/lang/Throwable;
    ATHROW
   L5
    POP
   L6
    ALOAD 0
    GETFIELD io/kanro/MainKt$main$1$jobB$1.p$ : Lkotlinx/coroutines/CoroutineScope;
    ASTORE 2
   L7
    LINENUMBER 18 L7
    NEW java/lang/StringBuilder
    DUP
    INVOKESPECIAL java/lang/StringBuilder.<init> ()V
    INVOKESTATIC java/lang/Thread.currentThread ()Ljava/lang/Thread;
    DUP
    LDC "Thread.currentThread()"
    INVOKESTATIC kotlin/jvm/internal/Intrinsics.checkExpressionValueIsNotNull (Ljava/lang/Object;Ljava/lang/String;)V
    INVOKEVIRTUAL java/lang/Thread.getName ()Ljava/lang/String;
    INVOKEVIRTUAL java/lang/StringBuilder.append (Ljava/lang/String;)Ljava/lang/StringBuilder;
    LDC " JobB start"
    INVOKEVIRTUAL java/lang/StringBuilder.append (Ljava/lang/String;)Ljava/lang/StringBuilder;
    INVOKEVIRTUAL java/lang/StringBuilder.toString ()Ljava/lang/String;
    ASTORE 3
   L8
    GETSTATIC java/lang/System.out : Ljava/io/PrintStream;
    ALOAD 3
    INVOKEVIRTUAL java/io/PrintStream.println (Ljava/lang/Object;)V

L7 和 L8 都没有什么问题,就是执行 println("${Thread.currentThread().name} JobA start") 这一句,然后 L9 和 L10 也没有特殊的地方,也是执行 delay(1000) 的操作。

   L9
   L10
    LINENUMBER 19 L10
    LDC 2000
    ALOAD 0
    ALOAD 0
    ICONST_1
    PUTFIELD io/kanro/MainKt$main$1$jobB$1.label : I
    INVOKESTATIC kotlinx/coroutines/DelayKt.delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
   L11
    DUP
    ALOAD 4
    IF_ACMPNE L12
   L13
    LINENUMBER 17 L13
    ALOAD 4
    ARETURN

重点来了!注意看 L11 和 L13!L11 把 delay 返回值和本地变量4 做了比较,如果不相等就跳转 L12,如果相等就进入 L13,将本地变量 4 返回。

所以这里应该是判断 delay 是否是立即已经完成了,如果完成了就直接去 L12,而 L12 就是一句打印,如果没有就返回了当前这个任务。

   L3
    ALOAD 1
    DUP
    INSTANCEOF kotlin/Result$Failure
    IFEQ L14
    CHECKCAST kotlin/Result$Failure
    GETFIELD kotlin/Result$Failure.exception : Ljava/lang/Throwable;
    ATHROW
   L14
    POP
    ALOAD 1
   L12
    LINENUMBER 20 L12
    POP
   L15
    NEW java/lang/StringBuilder
    DUP
    INVOKESPECIAL java/lang/StringBuilder.<init> ()V
    INVOKESTATIC java/lang/Thread.currentThread ()Ljava/lang/Thread;
    DUP
    LDC "Thread.currentThread()"
    INVOKESTATIC kotlin/jvm/internal/Intrinsics.checkExpressionValueIsNotNull (Ljava/lang/Object;Ljava/lang/String;)V
    INVOKEVIRTUAL java/lang/Thread.getName ()Ljava/lang/String;
    INVOKEVIRTUAL java/lang/StringBuilder.append (Ljava/lang/String;)Ljava/lang/StringBuilder;
    LDC " JobB end"
    INVOKEVIRTUAL java/lang/StringBuilder.append (Ljava/lang/String;)Ljava/lang/StringBuilder;
    INVOKEVIRTUAL java/lang/StringBuilder.toString ()Ljava/lang/String;
    ASTORE 3
   L16
    GETSTATIC java/lang/System.out : Ljava/io/PrintStream;
    ALOAD 3
    INVOKEVIRTUAL java/io/PrintStream.println (Ljava/lang/Object;)V
   L17
   L18
    LINENUMBER 21 L18
    GETSTATIC kotlin/Unit.INSTANCE : Lkotlin/Unit;
    ARETURN

接下来的 L3 就是当上面 lable 的状态为 1 时的跳转地址,而 lable 为 1 的含义就是中间的异步 delay 完成了,然后这个时候会被调度器再次调用这个 lambda,然后就会执行到后续的操作,至于 最后一点的 L4 是状态错误的时候调用的。

   L4
    NEW java/lang/IllegalStateException
    DUP
    LDC "call to 'resume' before 'invoke' with coroutine"
    INVOKESPECIAL java/lang/IllegalStateException.<init> (Ljava/lang/String;)V
    ATHROW

所以总结一下,就是编译器看到了 suspend 虽然不是和 C# 一样使用语法糖把链式调用封装为同步调用,但是也是基于编译器将状态机隐藏起来了。所以 Kotlin 的协程也是具有编译器参与在其中,并且是基于状态机的做法。

为了理解上面的字节码流程,我将上面的字节码转换为稍微可读的伪 Kotlin 代码,方便大家对比,可以发现很多其中的细节。

class JobB : SuspendLambda, Function2<CoroutineScope, Continuation<Unit>, Any> {

    override fun invoke(p1: CoroutineScope, p2: Continuation<Unit>): Any {
        val jobB = this.create(p1, p2)
        return jobB.invokeSuspend(Unit)
    }

    override fun invokeSuspend(p1: Any): Any {
        // L0
        val suspended = COROUTINE_SUSPENDED

        // L1
        when(this.lable){
            0 -> {
                // L2 L5
                if(p1 is Result.Failure){
                    throw p1.exception
                }

                // L6
                val scope : CoroutineScope = this.p

                // L7 L8
                println("${Thread.currentThread().name} JobB start")

                // L9
                // L10
                this.lable = 1
                val result = delay(2000, this)

                // L11
                if(result == suspended){
                    goto L12
                }

                // L13
                return result
            }
            1->{
                // L3
                if(p1 is Result.Failure){
                    throw p1.exception
                }

                // L14 L12
            }
            else->{
                // L4
                throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
            }
        }

        // L15 L16
        println("${Thread.currentThread().name} JobB end")
        // L17 L18
        return Unit
    }
}

例如 delay 方法,明明在签名中写的是 delay(Long) 在这里却变成了 delay(Long, Continuation) : Any 这是十分有意思的现象。也是编译器对 suspend 函数处理的具体体现。


Java 字节码参考