Kotlin 协程之 突破 Flow 限制:Channel 与 Flow 的结合之道

发布于:2025-09-09 ⋅ 阅读:(20) ⋅ 点赞:(0)

在这里插入图片描述

前言

上一篇文章介绍了 Flow 的基本概念和使用,也知道了Flow 强调单一执行流与上下文一致性,不允许跨协程并发 emit

但在实际开发中,我们可能会遇到需要"多协程并发生产 + 按需收集",同时还想复用 Flow的操作符体系的场景。也就是说,需要满足以下条件:

  1. 可以多协程并发生产数据
  2. 具备冷流的特性,有消费者消费才开始生产,并且每次消费时,生产逻辑会重新开始执行
  3. 能利用 flow 强大的操作符

这就需要用到 ChannelFlow 了, 它是将 Flow 的特性与 Channel 的特性结合,实现多协程并发生产 + 按需收集。

ChannelFlow

ChannelFlow 的使用非常简单

使用示例

suspend fun channelFlowExample() {
    val channelFlow = channelFlow {
        println("\u001B[32m[ChannelFlow] 开始在独立协程中生产数据\u001B[0m")

        // 可以在这里启动多个协程
        launch {
            repeat(2) { i ->
                delay(100)
                send("协程1-数据$i")
                println("\u001B[32m[ChannelFlow] 协程1发送: 数据$i\u001B[0m")
            }
        }

        launch {
            repeat(3) { i ->
                delay(200)
                send("协程2-数据$i")
                println("\u001B[32m[ChannelFlow] 协程2发送: 数据$i\u001B[0m")
            }
        }
    }

    delay(2000)
    println("当有消费者开始收集数据时,上面的生产才会开始工作")

    channelFlow.collect { data ->
        delay(100)
        println("\u001B[34m[消费者1] 收到: $data\u001B[0m")
    }
    println()

    // 再一次收集,此时生产端会重新执行,得到全新数据
    channelFlow.collect { data ->
        delay(100)
        println("\u001B[34m[消费者2] 收到: $data\u001B[0m")
    }
}

执行结果:

在这里插入图片描述

使用还是非常简单的:像 Channel 一样用 send 发送数据,像 Flow 一样用 collect 收集数据;有收集才生产,每次收集都“从头再来”。

ChannelFlow 的特点

在 API 使用层也能感受到:ChannelFlow 就是 ChannelFlow 的结合体,发送用 send,接收用 collect

  • 生产层(热)channelFlow {} 提供 ProducerScope,它既是 CoroutineScope 又是 SendChannel,因此可以 launch{}send()

  • 消费层(冷):只有调用 collect 时,成产层才真正开始生产;并且每次 collect 都是一次全新的生产过程。

  • 桥接层:内部用 Channel 承载并发生产的数据,再统一通过 Flow 的收集端按序发送出去,天然的支持 Flow 的特性

源码分析

channelFlow 构建器与 ProducerScope
// kotlinx-coroutines-core/common/src/flow/Builders.kt
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
    ChannelFlowBuilder(block)
  • 参数类型suspend ProducerScope<T>.() -> Unit
  • 通过 ChannelFlowBuilder 构建 ChannelFlow
// kotlinx-coroutines-core/common/src/channels/Produce.kt
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    public val channel: SendChannel<E>
}

ProducerScope 既是 CoroutineScope,又是 SendChannel<T>, 因此可以在代码块中开启协程并调用 send

ChannelFlowBuilder
// kotlinx-coroutines-core/common/src/flow/Builders.kt
private open class ChannelFlowBuilder<T>(
    private val block: suspend ProducerScope<T>.() -> Unit,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {

    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
        ChannelFlowBuilder(block, context, capacity, onBufferOverflow)

    override suspend fun collectTo(scope: ProducerScope<T>) =
        block(scope) // 直接执行用户的生产代码块

    override fun toString(): String =
        "block[$block] -> ${super.toString()}"
}
  • ChannelFlowBuilder 返回 ChannelFlow 类型,同时,持有了 block 参数,也就是 ProducerScope

  • 提供了 collectTo(scope),实际上就是执行我们传入的生产逻辑代码块

ChannelFlow
// kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
    @JvmField public val context: CoroutineContext,
    @JvmField public val capacity: Int,
    @JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {

    //当执行 collect 时,这里的 collector 就是消费逻辑代码块
    override suspend fun collect(collector: FlowCollector<T>): Unit =
        coroutineScope {
            // 先新建一个生产者协程,用它来生产数据,然后调用FlowCollector.emitAll,把 Channel 中的数据发送给 FlowCollector
            collector.emitAll(produceImpl(this))
        }

    // 创建一个生产者协程,内部就是个 Channel
    public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

    protected abstract suspend fun collectTo(scope: ProducerScope<T>)
}
internal fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    val channel = Channel<E>(capacity, onBufferOverflow)
    val newContext = newCoroutineContext(context)
    val coroutine = ProducerCoroutine(newContext, channel)
    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
    coroutine.start(start, coroutine, block)
    return coroutine
}
  • collect:每次 collect 都会通过 produceImpl 调用 scope.produce 创建一个新的 ReceiveChannel<T>,并且开始执行生产逻辑。

  • collectToFun:最终转调到子类(ChannelFlowBuilder)的 collectTo,执行我们写在 channelFlow {} 中的生产逻辑。

emitAllImpl
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
    emitAllImpl(channel, consume = true)

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    ensureActive()
    var cause: Throwable? = null
    try {
        // 从 channel 中取数据
        for (element in channel) {
            // 再通过 FlowCollector 把数据 emit 出去
            emit(element)
        }
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        if (consume) channel.cancelConsumed(cause)
    }
}
  • for (element in channel):顺序消费 Channel 中的数据并 emit 给下游,一句话总结就是把数据从 Channel 中取出来,并 emit 转发到 Flow 收集器
整体流程
  1. channelFlow{}内部通过 ChannelFlowBuilder 构建 ChannelFlow, 并把 ProducerScope 传给 ChannelFlowBuilder,此时,不会启动生产逻辑
  2. 外部调用 channelFlow.collect { ... },开启消费
  3. ChannelFlow.collect 会先创建 ReceiveChannelproduceImpl
  4. produce 启动 ProducerCoroutine,在 ProducerScope 内执行 channelFlow { ... } 的生产逻辑,内部是个 Channel,可并发 launch{} +send()
  5. emitAll(channel) 逐个读取 Channel 中的元素并转发给下游收集器
  6. 收集结束或异常/取消时,Channel 被适当取消或关闭,生产方协程随之停止

如果把 Flow 内部的流程搞清楚后,channelFlow 就会非常容易理解了,其实就是 Channel
Flow 的结合,把数据的生产交给 Channel,然后把数据消费交给 Flow,中间做了一层缓冲,channelFlow 的作用就是把数据从Channel 读取出来,并 emit 给下游的 FlowCollector


CallbackFlow:回调转 Flow

channelFlow 解决了多协程并发生产数据的问题。但在实际开发中还有一类高频场景:将传统回调(Listener/Callback)转换为响应式Flow,例如:传感器监听、网络状态变化、UI 事件等。这就需要用到 callbackFlow

与“回调转挂起”的区别

我在另一篇文章系统讲解了“回调转挂起”,还不清楚的同学请先阅读 ——回调转挂起

  • suspendCancellableCoroutine(回调→挂起函数)

    • 面向:一次性回调结果(single-shot)。
    • 产物:一个 suspend 函数;每次调用只产出一次结果。
    • 适用:如登录返回一次 Token、文件选择回调一次路径。
  • callbackFlow(回调→多次发射的 Flow)

    • 面向:多次/持续性回调(multi-shot)。
    • 产物:一个可被收集的 Flow;回调每次触发就向流中发射一次。
    • 适用:如传感器/网络状态/事件总线等持续产生事件的来源。

用哪一个:

  • 单次回调 → suspendCancellableCoroutine
  • 多次/持续回调 → callbackFlow

基本使用

suspend fun callbackFlowExample() {
    val cf = callbackFlow {
        println("\u001B[32m[CallbackFlow] 注册定时回调,开始生产数据\u001B[0m")
        val timer = Timer()
        val task = object : TimerTask() {
            override fun run() {
                val tick = "tick-" + System.currentTimeMillis()
                //使用trySend
                val ok = trySend(tick).isSuccess
                if (ok) println("\u001B[32m[CallbackFlow] 发送: $tick\u001B[0m")
            }
        }
        timer.scheduleAtFixedRate(task, 0L, 200L)

        // 手动处理关闭资源
        awaitClose {
            println("\u001B[31m[CallbackFlow] 取消定时回调并清理资源\u001B[0m")
            timer.cancel()
        }
    }

    println("\n\u001B[36m[CallbackFlow] 第一次收集(take 5)\u001B[0m")
    cf.take(5).collect { v -> println("\u001B[34m[消费者] 收到: $v\u001B[0m") }

    println("\n\u001B[36m[CallbackFlow] 第二次收集(take 3)\u001B[0m")
    cf.take(3).collect { v -> println("\u001B[34m[消费者] 收到: $v\u001B[0m") }
}

在这里插入图片描述

CallbackFlow 使用的套路

  1. 使用 trySend:在回调中使用 trySend 而不是 send
  2. 必须调用 awaitClose:让生产侧与资源的生命周期与 Flow 收集对齐
    • 若不调用,callbackFlow { ... } 的代码块很快返回,内部通道会被关闭,生产协程(或回调注册)被取消,后续发送被丢弃
    • awaitClose { ... } 中做资源释放(注销回调/关闭传感器/停止定时器),并保证在收集取消/完成前保持存活
  3. 异常处理:在回调中捕获异常,避免崩溃
  4. 生命周期:确保 Flow 的生命周期与回调资源绑定

为什么必须 callbackFlow 必须要手动 awaitClose ,而 channelFlow 不需要

  • callbackFlow 的代码块返回即表示生产端搭建完成;内部使用 trySend 而不是 send, 如果没有 awaitClose
    持续挂起等待,下游一旦还未开始或很快结束,内部通道会被关闭,生产协程/回调被取消,后续发送丢失。

    • awaitClose 的作用如下
      • 保持生产端“存活”,直到收集侧取消/完成;
      • 在关闭前执行资源清理
  • channelFlow:不需要 awaitClose

    • 生产逻辑完全运行在 ProducerScope内,使用 send/launch/delay 等挂起机制;
    • 下游 collect 结束/取消时,ProducerScope 被取消,其子协程与内部通道随之关闭并清理;
    • 代码块返回即代表生产结束,生命周期由作用域自动托管,无需手动“挂住”。

简要对比:

// channelFlow:结构化并发,作用域关闭即清理
val f1 = channelFlow {
    launch { repeat(3) { send(it); delay(100) } }
    // 无需 awaitClose
}

// callbackFlow:外部回调,必须 awaitClose 维持生命周期
val f2 = callbackFlow {
    val cb = object : Listener {
        override fun onEvent(v: Int) {
            trySend(v)
        }
    }
    register(cb)
    awaitClose { unregister(cb) }
}

应用场景

1. 定时器回调转Flow
val timerFlow = callbackFlow<Long> {
    val timer = Timer()
    val task = object : TimerTask() {
        override fun run() {
            trySend(System.currentTimeMillis())
        }
    }

    timer.scheduleAtFixedRate(task, 0, 1000)

    awaitClose {
        timer.cancel() // 清理定时器资源
    }
}
2. 网络状态监听
val networkStatusFlow = callbackFlow<NetworkStatus> {
    val callback = object : NetworkCallback {
        override fun onStatusChanged(status: NetworkStatus) {
            trySend(status)
        }
    }

    networkManager.registerCallback(callback)


    awaitClose {
        networkManager.unregisterCallback(callback)
    }
}

源码分析

callbackFlow 并没有“新增能力”,它是 channelFlow专用语义版本,核心价值在于:更清晰的意图表达、更一致的最佳实践与更好的可维护性

回调转 Flow 这件事用 channelFlow 也能实现,但用 callbackFlow 一眼就能看出“这是在包装回调”的意图,团队协作与规范更友好。

由于机制与 channelFlow 完全一致,源码部分就不再赘述了。

  • 构建器:callbackFlow { ... } -> CallbackFlowBuilder(block)(继承自 ChannelFlowBuilder)。
  • 收集链路:collect -> produceImpl -> scope.produce -> 内部 Channel -> emitAll(channel)

Channel 转 Flow

ChannelFlowCallbackFlow 都是对 Channel 的包装:在收集阶段内部创建一个 Channel,将生产端写入的数据,按序转发给下游的FlowCollector

还有另一种常见诉求:将已有的 Channel 直接转换为 Flow。转换为 Flow 后,就可以使用 Flow 丰富的操作符体系来处理数据。

Kotlin 协程提供了两个扩展函数:consumeAsFlow()receiveAsFlow(),它们都能将 Channel 转换为 Flow,但行为存在重要差异。

consumeAsFlow:一次性消费

consumeAsFlow消费并关闭 Channel,转换得到的 Flow 只能使用一次。

基本使用

suspend fun consumeAsFlowExample() {
    val channel = Channel<String>(capacity = 5)
    // 生产者
    GlobalScope.launch {
        println("\u001B[32m[生产者][consumeAsFlow] 启动\u001B[0m")
        repeat(5) { i ->
            val v = "数据-$i"
            channel.send(v)
            println("\u001B[32m[生产者][consumeAsFlow] 发送: $v\u001B[0m")
        }
        channel.close()
        println("\u001B[31m[生产者][consumeAsFlow] 关闭 Channel\u001B[0m")
    }
    // 转换为Flow并消费
    val consumeAsFlow = channel.consumeAsFlow()
    println("\u001B[36m[系统] 第一次收集 consumeAsFlow\u001B[0m")
    consumeAsFlow
        .collect { data ->
            println("\u001B[34m[消费者][consumeAsFlow] 收到: $data\u001B[0m")
        }

    println("\u001B[36m[系统] 第二次收集(预期失败)\u001B[0m")
    try {
        consumeAsFlow.collect { println("\u001B[34m[消费者][consumeAsFlow] 收到: $it\u001B[0m") }
    } catch (e: Throwable) {
        println("\u001B[31m[错误][consumeAsFlow] ${e.message}\u001B[0m")
    }
}

在这里插入图片描述

源码分析
// 首先限定扩展类型为ReceiveChannel, 保证只能使用 ReceiveChannel,然后把自身作为参数传递给ChannelAsFlow,并且标记是一次性消费
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ChannelAsFlow(this, consume = true)


private class ChannelAsFlow<T>(
    private val channel: ReceiveChannel<T>, // 原始 Channel 实例
    private val consume: Boolean,           // 是否一次性消费:consumeAsFlow() 下为 true
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.OPTIONAL_CHANNEL,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    // 记录是否已被消费
    private val consumed = atomic(false)

    private fun markConsumed() {
        if (consume) {
            // 已消费则报错,保证一次性语义
            check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
        }
    }

    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
        ChannelAsFlow(channel, consume, context, capacity, onBufferOverflow)

    override fun dropChannelOperators(): Flow<T> =
        ChannelAsFlow(channel, consume)

    override suspend fun collectTo(scope: ProducerScope<T>) =
        SendingCollector(scope).emitAllImpl(channel, consume) // 直接将 Channel 中的数据逐个转发到下游

    override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
        markConsumed() // 重复收集快速失败
        return if (capacity == Channel.OPTIONAL_CHANNEL) {
            channel // 直连通道:不再额外创建缓冲 Channel
        } else {
            super.produceImpl(scope) // 走额外缓冲通道路径
        }
    }

    override suspend fun collect(collector: FlowCollector<T>) {
        if (capacity == Channel.OPTIONAL_CHANNEL) {
            markConsumed()
            collector.emitAllImpl(channel, consume) // 直连读取原始 Channel
        } else {
            super.collect(collector) // 额外缓冲通道路径,produceImpl 会负责标记已消费
        }
    }

    override fun additionalToStringProps(): String = "channel=$channel"
}

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    ensureActive()
    var cause: Throwable? = null
    try {
        for (element in channel) {
            emit(element)
        }
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        // 如果是一次性消费,则取消已消费的元素
        if (consume) channel.cancelConsumed(cause)
    }
}

重点

  • consume 标记:

    • consumeconsumeAsFlow() 下为 true,表示“一次性消费,并在结束后关闭”。
  • 保证只能收集一次:

    • markConsumed() 使用原子标志 consumed 抢占置位;若再次收集立刻抛出异常。
    • 因此第一次 collect 正常,第二次 collect 会立即失败。
  • 直连优化(不创建额外 Channel):

    • capacity == Channel.OPTIONAL_CHANNEL 时,启用“直连通道”优化:直接复用原始 channel,不再重新创建 Channel。
    • 在直连模式下,produceImpl/collect 都会优先读原始 channel,并在入口处调用 markConsumed(),一次性语义即时生效。
  • 为什么限制“只能收集一次”:

    • 选择 consumeAsFlow 意味着 Flow 对该 Channel 拥有“消费并关闭”的所有权;重复收集会引入所有权与关闭语义的冲突,因此在源码层面禁止第二次收集。
  • cancelConsumed(cause) 的作用:

    • emitAllImpl(channel, consume)finally 中,如果 consume=true 会调用 channel.cancelConsumed(cause)
    • 这一步用于在消费完成或出现异常时,自动取消/关闭底层 Channel,并传递可选的异常原因 cause
    • markConsumed() 配合,共同形成“一次性消费 + 自动关闭”的完整语义闭环。

consumeAsFlow 本质是用 ChannelFlow 外壳接管 ReceiveChannel,通过 consume 与原子标志强制“一次性消费 + 自动关闭”。


receiveAsFlow:共享接收(竞争消费)

receiveAsFlow 会将 Channel 持续暴露为 Flow,但不会自动关闭 Channel。你可以多次收集,或并发收集者“竞争式”接收同一条底层通道上的数据。

基本使用

suspend fun receiveAsFlowExample() {
    val channel = Channel<Int>(capacity = Channel.UNLIMITED)

    // 生产者持续生产
    GlobalScope.launch {
        println("\u001B[32m[生产者][receiveAsFlow] 启动\u001B[0m")
        repeat(10) { i ->
            val value = i + 1
            channel.send(value)
            println("\u001B[32m[生产者][receiveAsFlow] 发送: $value\u001B[0m")
            delay(20)
        }
        // 不关闭,交由外部控制
        println("\u001B[33m[生产者][receiveAsFlow] 生产结束(未关闭通道)\u001B[0m")
    }

    val receiveAsFlow = channel.receiveAsFlow()

    // 第一个消费者
    GlobalScope.launch {
        receiveAsFlow
            .collect {
                delay(100)
                println("\u001B[34m[消费者A][receiveAsFlow]: $it\u001B[0m")

            }
    }

    // 第二个消费者
    GlobalScope.launch {
        receiveAsFlow
            .collect {
                delay(200)
                println("\u001B[35m[消费者B][receiveAsFlow]: $it\u001B[0m")
            }
    }

    // 一段时间后手动关闭 Channel
    delay(1000)
    println("\u001B[31m[系统] 关闭 Channel\u001B[0m")
    //自行决定关闭时机
    channel.close()
}

在这里插入图片描述

源码分析
// 直接包装底层 Channel,但不标记一次性消费
public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ChannelAsFlow(this, consume = false)

由于底层同样复用 ChannelAsFlow,与 consumeAsFlow 的差异点在于 consume=false

  • 不会通过 markConsumed() 施加“一次性消费”的限制;
  • 不会在流程结束时自动 cancel/close 底层 Channel
  • 仍可能走 capacity == Channel.OPTIONAL_CHANNEL 的“直连通道”优化路径。

进一步结合 emitAllImpl(channel, consume) 看差异:

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    ensureActive()
    var cause: Throwable? = null
    try {
        for (element in channel) {
            emit(element)
        }
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        if (consume) channel.cancelConsumed(cause) // receiveAsFlow 下为 false,不会自动关闭
    }
}

也就是说,在 receiveAsFlow 下,遍历结束后不会触发 cancelConsumed,生命周期由调用方或生产方控制。这正是它能“持续使用/多次收集/并发收集”的关键差异。

总结

至此,ChannelFlow, CallbackFlow, consumeAsFlow, receiveAsFlow 就都介绍完毕了,来整体对比看下

方式 角色定位 是否自动关闭 是否可多次收集 典型场景
ChannelFlow 并发生产 + 冷流收集 是(每次重放) 多协程并发生产且按需收集
CallbackFlow 回调→Flow 语义外壳 是(每次重放) 回调/监听封装(trySend/awaitClose)
consumeAsFlow 一次性消费 + 自动关闭 批处理/一次性转换后释放资源
receiveAsFlow 共享接收(竞争消费) 是(竞争式) 多消费者竞争同一来源,自行管理生命周期

好了, 本篇文章就是这些,希望能帮到你。

感谢阅读,如果对你有帮助请三连(点赞、收藏、加关注)支持。有任何疑问或建议,欢迎在评论区留言讨论。如需转载,请注明出处:喻志强的博客


网站公告

今日签到

点亮在社区的每一天
去签到