Dispatchers.Main
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
// MainDispatchers.kt
internal object MainDispatcherLoader {
private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
// We are explicitly using the
// `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
// form of the ServiceLoader call to enable R8 optimization when compiled on Android.
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
}
可以看到Dispatchers.Main就是单例对象MainDispatcherLoader.loadMainDispatcher()方法的返回值,该方法会通过MainDispatcherFactory去创建一个MainCoroutineDispatcher对象。因此Dispatchers.Main属于MainCoroutineDispatcher类型。MainDispatcherFactory是一个抽象接口,它的实现类是AndroidDispatcherFactory,如下:
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
return HandlerContext(mainLooper.asHandler(async = true))
}
override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
很明显这里createDispatcher方法创建了一个HandlerContext对象返回,并且我们注意到,它使用主线程的Looper对象来创建的Handler。
// HandlerDispatcher.kt
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
...
}
public sealed class HandlerDispatcher : MainCoroutineDispatcher(), Delay {
}
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
}
CoroutineDispatcher 是所有调度器的抽象基类,它是AbstractCoroutineContextElement的子类,也就是协程上下文的抽象子类。所以说调度器可以用 + 运算符添加到context中。
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
...
}
CoroutineDispatcher 里面有两个重要的方法 isDispatchNeeded() 和 dispatch():
-
isDispatchNeeded():如果协程的执行应该使用 dispatch() 方法执行,则返回 true。大多数调度程序的默认行为是返回 true。
-
dispatch():将可运行的 block 的执行分派到给定 context 中的另一个线程。
public interface ContinuationInterceptor : CoroutineContext.Element {
...
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
}
在 CoroutineDispatcher 中这个方法的实现如下:
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation)
...
}
它返回的是一个DispatchedContinuation对象:
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
...
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
...
}
上面在DispatchedContinuation类的resumeWith()方法中,先判断了isDispatchNeeded()的值,如果是true就调用调度器dispatcher.dispatch()方法进行调度,并将当前DispatchedContinuation对象自身作为this参数传入。
Dispatchers.Main.immediate
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
...
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
...
}
这里HandlerContext.immediate仍然是创建的自身类型。因此可以得知 Dispatchers.Main.immediate 的真相是一个第三个参数为 true 的 HandlerContext 对象。
Dispatchers.Default
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
CORE_POOL_SIZE, MAX_POOL_SIZE,
IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
...
override fun toString(): String = "Dispatchers.Default"
}
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
...
}
DefaultsScheduler是一个SchedulerCoroutineDispatcher类型,SchedulerCoroutineDispatcher类中它真正创建的是一个CoroutineScheduler对象:
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
init {
require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
"Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
}
require(maxPoolSize >= corePoolSize) {
"Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
}
require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
"Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
}
require(idleWorkerKeepAliveNs > 0) {
"Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
}
}
...
}
CoroutineScheduler类的代码比较多,但是我们根据这些参数知道它就是一个线程池,是一个专门自定义的线程池,而没有使用Java版本的ThreadPoolExecutor。
@JvmField
internal val CORE_POOL_SIZE = systemProp(
"kotlinx.coroutines.scheduler.core.pool.size",
AVAILABLE_PROCESSORS.coerceAtLeast(2),
minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)
@JvmField
internal val MAX_POOL_SIZE = systemProp(
"kotlinx.coroutines.scheduler.max.pool.size",
CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE,
maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
)
// number of processors at startup for consistent prop initialization
internal val AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors()
// Masks of control state
private const val BLOCKING_SHIFT = 21 // 2M threads max
...
internal const val MIN_SUPPORTED_POOL_SIZE = 1
internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
@JvmField
internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 60L)
)
// Internal debuggability name + thread name prefixes
internal const val DEFAULT_SCHEDULER_NAME = "DefaultDispatcher"
总结一下就是:
-
corePoolSize = CORE_POOL_SIZE, 核心线程数,最小值为 2 最大值为 Runtime.getRuntime().availableProcessors() 即CPU的核心数量
-
maxPoolSize = MAX_POOL_SIZE, 最大线程数,值为 (1 << 21) - 2,其中 1 << 21 大小正好是 2M,所以是 2M - 2 = 2097150(两百多万个)
-
idleWorkerKeepAliveNs = Long = IDLE_WORKER_KEEP_ALIVE_NS, 空闲任务的存活时间,值为60s
-
schedulerName = DEFAULT_SCHEDULER_NAME, 默认的调度器名字,值为"DefaultDispatcher"
Dispatchers.IO
@JvmStatic
public val IO: CoroutineDispatcher = DefaultIoScheduler
// Dispatchers.IO
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
private val default = UnlimitedIoScheduler.limitedParallelism(
systemProp(
IO_PARALLELISM_PROPERTY_NAME,
64.coerceAtLeast(AVAILABLE_PROCESSORS)
)
)
override val executor: Executor
get() = this
override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)
override fun dispatch(context: CoroutineContext, block: Runnable) {
default.dispatch(context, block)
}
...
}
dispatch()方法调用了default.dispatch(),而default是由UnlimitedIoScheduler.limitedParallelism()方法返回一个LimitedDispatcher对象:
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
return LimitedDispatcher(this, parallelism)
}
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {
....
override fun dispatch(context: CoroutineContext, block: Runnable) {
dispatchInternal(block) {
dispatcher.dispatch(this, this)
}
}
}
LimitedDispatcher内部也只是包装了传入的UnlimitedIoScheduler对象而已,而在UnlimitedIoScheduler中仍然是调用的DefaultScheduler.dispatchWithContext方法。
// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
private object UnlimitedIoScheduler : CoroutineDispatcher() {
@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
}
}
因此,至此我们得到一个结论就是: Dispatchers.IO 跟 Dispatchers.Default 是共享同一个线程池的。
private val default = UnlimitedIoScheduler.limitedParallelism(
systemProp(
IO_PARALLELISM_PROPERTY_NAME,
64.coerceAtLeast(AVAILABLE_PROCESSORS)
)
)
public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.parallelism"
- Dispatchers.Main :在 Android 平台上 Dispatchers.Main 就是一个 HandlerContext 对象。
- Dispatchers.Main.immediate:与 Dispatchers.Main 的唯一区别就是它的调度执行只需要简单的判断当前是不是主线程。
- Dispatchers.Default:使用自定义的线程池,核心线程数最小为2最大为CPU核心数,最大线程数2M左右,空闲任务存活时间为60s
- Dispatchers.IO :与 Dispatchers.Default 是共用同一个线程池的,但是增加了对并发任务的数量控制。