Kotlin 协程之Channel 的高阶应用

发布于:2025-08-30 ⋅ 阅读:(14) ⋅ 点赞:(0)

在这里插入图片描述

前言

了解了 Channel 的基础概念和基本使用 后,我们再来看一看 Channel 的特性以及高阶应用。


Channel 是"热流"

热流概念

Channel 是热流(Hot Stream),具备以下特性:

  • 数据的生产和消费是两套独立的流程

  • 即使没有消费者,生产者也会继续生产数据

而在日常开发中常用到的 Flow 则是 冷流(Cold Stream) ,冷流只有在有订阅者时才开始产生数据。关于冷流的详细内容,在后续 Flow 相关的文章中再进行深入介绍。

代码示例:

suspend fun channelHot() {
    val channel = Channel<String>(Channel.BUFFERED)
    val sender = GlobalScope.launch {
        for (i in 1..5) {
            delay(100)
            channel.send(i.toString())
            println("\u001B[32m[生产者] 数据发送完毕: $i\u001B[0m")
        }
    }
    joinAll(sender)
    println("\u001B[36m ----结束---- \u001B[0m")
}

可以看到,虽然没有消费者,生产者也能继续发送数据。

在这里插入图片描述


Channel 的一对多、多对多应用模式

在上一篇 Kotlin Channel基础使用 中,我们的示例都是一对一的,即单个生产者向单个消费者发送数据。实际上,Channel 还支持实现一对多,多对一,多对多等模式

扇出模式(一对多)- 负载均衡

概念: 单个生产者发送数据,多个消费者竞争接收数据,每个数据只会被一个消费者处理。

特点:

  • 每个数据只被一个消费者处理

  • 消费者之间形成竞争关系

  • 实现了自然的负载均衡

应用场景: 任务分发、负载均衡、并行处理

代码示例:

suspend fun singleProducerMultipleConsumers() {

    val channel = Channel<Int>(Channel.BUFFERED)

    // 单个生产者
    val producer = GlobalScope.launch {
        println("\u001B[32m[生产者] 开始生产任务...\u001B[0m")
        for (i in 1..10) {
            println("\u001B[32m[生产者] 发送任务: $i\u001B[0m")
            channel.send(i)
            delay(30) // 模拟生产时间
        }
        channel.close()
        println("\u001B[32m[生产者] 所有任务已发送完毕\u001B[0m")
    }

    // 多个消费者 - 每个消费者使用不同颜色
    val consumerColors = listOf("\u001B[34m", "\u001B[36m", "\u001B[31m") // 蓝色、青色、红色
    val consumers = List(3) { consumerId ->
        GlobalScope.launch {
            val color = consumerColors[consumerId]
            println("${color}[消费者-$consumerId] 准备接收任务...\u001B[0m")
            for (task in channel) {
                println("${color}[消费者-$consumerId] 处理任务: $task\u001B[0m")
                delay(300) // 模拟处理时间
                println("${color}[消费者-$consumerId] 任务 $task 处理完成\u001B[0m")
            }
            println("${color}[消费者-$consumerId] 没有更多任务,退出\u001B[0m")
        }
    }

    // 等待所有协程完成
    joinAll(producer, *consumers.toTypedArray())
    println("\u001B[36m ----结束---- \u001B[0m")
}

在这里插入图片描述

注意,是多个消费者 “瓜分” 数据

扇入模式(多对一)- 数据聚合

概念: 多个生产者向同一个 Channel 发送数据,单个消费者接收所有数据。

特点:

  • 多个数据源汇聚到一个处理点

  • 消费者按接收顺序处理所有数据

应用场景: 日志收集、数据汇总、多源数据聚合、监控指标收集

suspend fun multipleProducersSingleConsumer() {

    // 创建一个共享的Channel,用于接收来自多个生产者的数据
    val sharedChannel = Channel<String>(capacity = 10)

    // 启动多个生产者协程 
    val producerColors = listOf("\u001B[32m", "\u001B[33m", "\u001B[35m") // 绿色、黄色、洋红色
    val producers = List(3) { producerId ->
        GlobalScope.launch {
            val color = producerColors[producerId]
            repeat(5) { messageId ->
                val message = "Producer-$producerId: Message-$messageId"
                println("${color}[生产者$producerId] 发送: $message\u001B[0m")
                sharedChannel.send(message)
                delay(Random.nextLong(100, 500)) // 随机延迟模拟不同的生产速度
            }
            println("${color}[生产者$producerId] 完成发送\u001B[0m")
        }
    }

    // 启动单个消费者协程
    val consumer = GlobalScope.launch {
        var receivedCount = 0
        for (message in sharedChannel) {
            delay(200)
            println("\u001B[34m[消费者] 处理: $message 完毕\u001B[0m")
            receivedCount++
        }
        println("\u001B[34m[消费者] 完成接收,共处理 $receivedCount 条消息\u001B[0m")
    }

    // 等待所有生产者完成
    producers.joinAll()
    println("\u001B[36m所有生产者已完成\u001B[0m")

    // 关闭Channel,这样消费者的for循环才能正常结束
    sharedChannel.close()

    // 等待消费者完成
    consumer.join()
    println("\u001B[36m ----结束---- \u001B[0m")
}

在这里插入图片描述

多对多模式 - 高并发处理

概念: 多个生产者和多个消费者同时工作,结合了扇出和扇入的特点。

特点:

  • 具备负载均衡能力(多消费者竞争)

  • 支持高并发数据生产(多生产者)

  • 通过缓冲区优化资源使用

  • 适合处理大量并发任务

应用场景: 消息队列系统、任务分发系统、高并发数据处理

suspend fun multipleProducersMultipleConsumers() {
    // 创建一个共享的Channel
    val sharedChannel = Channel<String>(capacity = 5)

    val producerColors = listOf("\u001B[32m", "\u001B[33m") // 绿色、黄色
    val producers = List(2) { producerId ->
        GlobalScope.launch {
            val color = producerColors[producerId]
            repeat(8) { messageId ->
                val message = "Producer-$producerId: Task-$messageId"
                println("${color}[生产者$producerId] 发送任务: $message\u001B[0m")
                sharedChannel.send(message)
                delay(Random.nextLong(200, 600))
            }
            println("${color}[生产者$producerId] 完成任务发送\u001B[0m")
        }
    }

    // 启动多个消费者协程 
    val consumerColors = listOf("\u001B[34m", "\u001B[36m", "\u001B[31m") // 蓝色、青色、红色
    val consumers = List(3) { consumerId ->
        GlobalScope.launch {
            val color = consumerColors[consumerId]
            var processedCount = 0
            try {
                for (message in sharedChannel) {
                    println("${color}[消费者$consumerId] 处理任务: $message\u001B[0m")
                    delay(Random.nextLong(300, 800)) // 模拟任务处理时间
                    processedCount++
                    println("${color}[消费者$consumerId] 完成任务: $message\u001B[0m")
                }
            } catch (e: ClosedReceiveChannelException) {
                println("${color}[消费者$consumerId] Channel已关闭,共处理 $processedCount 个任务\u001B[0m")
            }
        }
    }

    // 等待所有生产者完成
    producers.joinAll()
    println("\u001B[36m所有生产者已完成,正在关闭Channel...\u001B[0m")

    // 关闭Channel
    sharedChannel.close()

    // 等待所有消费者完成
    consumers.joinAll()
    println("\u001B[36m ----结束---- \u001B[0m")
}

在这里插入图片描述

对比

应用模式 生产者 消费者 数据流向 特点 应用场景
扇出模式 1个 多个 一对多 自动负载均衡,任务并行处理 工作队列、任务分发、并行计算
扇入模式 多个 1个 多对一 数据聚合,统一处理点 日志收集、数据汇总、监控指标聚合
多对多模式 多个 多个 多对多 最大化并发能力,结合扇出扇入优势 消息队列、高并发数据处理、分布式系统

Select 表达式

Select 概念与特性

Select 可以同时等待多个挂起操作,并处理最先完成的那个操作,在处理多个不确定耗时的异步任务时非常有用。

特性:

  • 非阻塞选择:同时监听多个 Channel

  • 竞争机制:处理最先到达的结果

Select 基本使用

suspend fun selectFastestResponse() {

    // 创建多个Channel模拟不同的数据源
    val apiChannel = Channel<String>()
    val databaseChannel = Channel<String>()
    val cacheChannel = Channel<String>()

    println("\u001B[36m[系统] 启动多数据源查询竞争...\u001B[0m")

    // 模拟API调用
    GlobalScope.launch {
        delay(Random.nextLong(500, 1500))
        val result = "API响应: 用户数据已获取"
        println("\u001B[33m[API任务] 完成查询: $result\u001B[0m")
        apiChannel.send(result)
    }

    // 模拟数据库查询
    GlobalScope.launch {
        delay(Random.nextLong(800, 2000))
        val result = "数据库响应: 查询结果已返回"
        println("\u001B[31m[数据库任务] 完成查询: $result\u001B[0m")
        databaseChannel.send(result)
    }

    // 模拟缓存查询
    GlobalScope.launch {
        delay(Random.nextLong(100, 800))
        val result = "缓存响应: 缓存命中,数据已返回"
        println("\u001B[32m[缓存任务] 完成查询: $result\u001B[0m")
        cacheChannel.send(result)
    }

    // 使用select选择最快的响应
    val startTime = System.currentTimeMillis()
    val fastestResult = select<String> {
        apiChannel.onReceive { result ->
            println("\u001B[33m[获胜者] API最快响应!\u001B[0m")
            result
        }
        databaseChannel.onReceive { result ->
            println("\u001B[31m[获胜者] 数据库最快响应!\u001B[0m")
            result
        }
        cacheChannel.onReceive { result ->
            println("\u001B[32m[获胜者] 缓存最快响应!\u001B[0m")
            result
        }
    }
    val endTime = System.currentTimeMillis()

    println("\u001B[36m[结果] 最快响应: $fastestResult\u001B[0m")
    println("\u001B[36m[性能] 响应时间: ${endTime - startTime}ms\u001B[0m")

    // 关闭所有Channel
    apiChannel.close()
    databaseChannel.close()
    cacheChannel.close()
    println("\u001B[36m ----结束---- \u001B[0m")
}

在这里插入图片描述

Select 取消机制优化

在实际生产环境中,我们通常希望在获得最快响应后立即取消其他任务,以节省系统资源:

suspend fun selectFastestResponseWithCancel() {

    // 创建三个不同任务的channel
    val apiChannel = Channel<String>()
    val databaseChannel = Channel<String>()
    val cacheChannel = Channel<String>()

    println("\u001B[36m[系统] 开始执行多个异步任务,选择最快响应并取消其他任务...\u001B[0m")

    // 模拟API调用任务
    val apiTask = GlobalScope.launch {
        try {
            val responseTime = Random.nextLong(300, 1200) // 随机响应时间
            println("\u001B[33m[API任务] 开始调用远程API,预计耗时: ${responseTime}ms\u001B[0m")
            delay(responseTime)
            val result = "API响应数据: {userId: 456, name: 'Alice'}"
            apiChannel.send(result)
            println("\u001B[33m[API任务] 调用完成,耗时: ${responseTime}ms\u001B[0m")
        } catch (e: CancellationException) {
            println("\u001B[33m[API任务] 任务被取消\u001B[0m")
            throw e
        }
    }

    // 模拟数据库查询任务
    val databaseTask = GlobalScope.launch {
        try {
            val queryTime = Random.nextLong(400, 1000) // 随机查询时间
            println("\u001B[31m[数据库任务] 开始查询数据库,预计耗时: ${queryTime}ms\u001B[0m")
            delay(queryTime)
            val result = "数据库查询结果: {userId: 456, name: 'Alice', email: 'alice@example.com'}"
            databaseChannel.send(result)
            println("\u001B[31m[数据库任务] 查询完成,耗时: ${queryTime}ms\u001B[0m")
        } catch (e: CancellationException) {
            println("\u001B[31m[数据库任务] 任务被取消\u001B[0m")
            throw e
        }
    }

    // 模拟缓存查询任务
    val cacheTask = GlobalScope.launch {
        try {
            val cacheTime = Random.nextLong(100, 400) // 随机缓存访问时间
            println("\u001B[32m[缓存任务] 开始查询缓存,预计耗时: ${cacheTime}ms\u001B[0m")
            delay(cacheTime)
            val result = "缓存数据: {userId: 888, name: '喻志强', cached: true}"
            cacheChannel.send(result)
            println("\u001B[32m[缓存任务] 查询完成,耗时: ${cacheTime}ms\u001B[0m")
        } catch (e: CancellationException) {
            println("\u001B[32m[缓存任务] 任务被取消\u001B[0m")
            throw e
        }
    }

    // 使用select表达式选择最快响应的任务
    val startTime = System.currentTimeMillis()
    val result = select<String> {
        apiChannel.onReceive { data ->
            val elapsedTime = System.currentTimeMillis() - startTime
            println("\u001B[36m[系统] API任务最先响应!耗时: ${elapsedTime}ms\u001B[0m")
            println("\u001B[36m[系统] 获取到数据: $data\u001B[0m")
            data
        }
        databaseChannel.onReceive { data ->
            val elapsedTime = System.currentTimeMillis() - startTime
            println("\u001B[36m[系统] 数据库任务最先响应!耗时: ${elapsedTime}ms\u001B[0m")
            println("\u001B[36m[系统] 获取到数据: $data\u001B[0m")
            data
        }
        cacheChannel.onReceive { data ->
            val elapsedTime = System.currentTimeMillis() - startTime
            println("\u001B[36m[系统] 缓存任务最先响应!耗时: ${elapsedTime}ms\u001B[0m")
            println("\u001B[36m[系统] 获取到数据: $data\u001B[0m")
            data
        }
    }
    println("\u001B[36m[系统] 选择最快响应完成,使用结果: $result\u001B[0m")

    // 主动取消其他正在执行的任务
    println("\u001B[36m[系统] 开始取消其他未完成的任务...\u001B[0m")
    if (apiTask.isActive) {
        println("\u001B[36m[系统] 正在取消API任务...\u001B[0m")
        apiTask.cancel()
    }
    if (databaseTask.isActive) {
        println("\u001B[36m[系统] 正在取消数据库任务...\u001B[0m")
        databaseTask.cancel()
    }
    if (cacheTask.isActive) {
        println("\u001B[36m[系统] 正在取消缓存任务...\u001B[0m")
        cacheTask.cancel()
    }


    // 关闭channels
    apiChannel.close()
    databaseChannel.close()
    cacheChannel.close()

    println("\u001B[36m ----结束---- \u001B[0m")
}

在这里插入图片描述

Select 超时处理机制

在某些场景中,我们需要为异步操作设置超时限制,避免无限等待影响系统性能。Select 表达式结合 withTimeoutOrNull可以优雅地实现超时控制机制。

关键点:

  1. 时间控制:通过 withTimeoutOrNull 设置最大等待时间
  2. 优雅降级:超时时返回 null,可以实现降级策略
  3. 资源清理:超时后自动取消所有未完成的任务
  4. 用户体验:避免用户长时间等待,提供及时反馈
suspend fun selectWithTimeout() {

    // 创建三个不同任务的channel
    val apiChannel = Channel<String>()
    val databaseChannel = Channel<String>()
    val cacheChannel = Channel<String>()

    println("\u001B[36m[系统] 启动带超时控制的多任务查询(超时时间: 100毫秒)...\u001B[0m")
    println("\u001B[36m[系统] 实际应用场景:Web API响应、数据库查询、缓存查询等\u001B[0m")

    // 模拟API调用任务(可能较慢)
    val apiTask = GlobalScope.launch {
        try {
            val responseTime = Random.nextLong(800, 2000) // 可能超时的响应时间
            println("\u001B[33m[API任务] 开始调用远程API,预计耗时: ${responseTime}ms\u001B[0m")
            delay(responseTime)
            val result = "API响应: {userId: 789, name: 'Bob', source: 'remote'}"
            apiChannel.send(result)
            println("\u001B[33m[API任务] 调用完成,耗时: ${responseTime}ms\u001B[0m")
        } catch (e: CancellationException) {
            println("\u001B[33m[API任务] 任务被取消(超时处理)\u001B[0m")
            throw e
        }
    }

    // 模拟数据库查询任务(可能较慢)
    val databaseTask = GlobalScope.launch {
        try {
            val queryTime = Random.nextLong(600, 1800) // 可能超时的查询时间
            println("\u001B[31m[数据库任务] 开始查询数据库,预计耗时: ${queryTime}ms\u001B[0m")
            delay(queryTime)
            val result = "数据库响应: {userId: 789, name: 'Bob', email: 'bob@example.com', source: 'database'}"
            databaseChannel.send(result)
            println("\u001B[31m[数据库任务] 查询完成,耗时: ${queryTime}ms\u001B[0m")
        } catch (e: CancellationException) {
            println("\u001B[31m[数据库任务] 任务被取消(超时处理)\u001B[0m")
            throw e
        }
    }

    // 模拟缓存查询任务(通常较快)
    val cacheTask = GlobalScope.launch {
        try {
            val cacheTime = Random.nextLong(200, 1200) // 缓存访问时间
            println("\u001B[32m[缓存任务] 开始查询缓存,预计耗时: ${cacheTime}ms\u001B[0m")
            delay(cacheTime)
            val result = "缓存响应: {userId: 666, name: 'XeonYu', cached: true, source: 'cache'}"
            cacheChannel.send(result)
            println("\u001B[32m[缓存任务] 查询完成,耗时: ${cacheTime}ms\u001B[0m")
        } catch (e: CancellationException) {
            println("\u001B[32m[缓存任务] 任务被取消(超时处理)\u001B[0m")
            throw e
        }
    }

    // 使用withTimeoutOrNull实现超时控制
    val startTime = System.currentTimeMillis()
    val result = withTimeoutOrNull(100) { //设置超时时间
        select<String> {
            apiChannel.onReceive { data ->
                val elapsedTime = System.currentTimeMillis() - startTime
                println("\u001B[36m[系统] API任务响应成功!耗时: ${elapsedTime}ms\u001B[0m")
                data
            }
            databaseChannel.onReceive { data ->
                val elapsedTime = System.currentTimeMillis() - startTime
                println("\u001B[36m[系统] 数据库任务响应成功!耗时: ${elapsedTime}ms\u001B[0m")
                data
            }
            cacheChannel.onReceive { data ->
                val elapsedTime = System.currentTimeMillis() - startTime
                println("\u001B[36m[系统] 缓存任务响应成功!耗时: ${elapsedTime}ms\u001B[0m")
                data
            }
        }
    }

    val totalTime = System.currentTimeMillis() - startTime

    // 处理超时情况
    if (result != null) {
        println("\u001B[36m[系统] 获取到数据: $result\u001B[0m")
        println("\u001B[36m[系统] 总响应时间: ${totalTime}ms(在超时限制内)\u001B[0m")
    } else {
        println("\u001B[36m[系统] 所有任务均超时,启用降级策略\u001B[0m")
        println("\u001B[36m[系统] 返回默认数据: {userId: 000, name: 'Unknown', source: 'default'}\u001B[0m")
    }

    // 取消所有未完成的任务
    println("\u001B[36m[系统] 清理资源,取消未完成的任务...\u001B[0m")
    apiTask.cancel()
    databaseTask.cancel()
    cacheTask.cancel()

    // 关闭channels
    apiChannel.close()
    databaseChannel.close()
    cacheChannel.close()
    println("\u001B[36m ----结束---- \u001B[0m")
}

在这里插入图片描述

把超时时间设置为 1 秒再看看执行结果

在这里插入图片描述


CoroutineScope.produce & CoroutineScope.actor

除了直接通过顶层函数 Channel() 创建通道,Kotlin 还提供了 produceactor 两个构建器函数。

为什么需要 produce 和 actor?

核心原因在于返回值类型的差异

返回值类型对比

1. Channel() 函数
在这里插入图片描述

Channel() 函数返回 Channel<E> 类型的一个实例

在这里插入图片描述

Channel 继承了 SendChannel<E>ReceiveChannel<E> 接口,既可发送又可接收

2. produce 构建器
在这里插入图片描述

produce 返回 ReceiveChannel<E>只能接收数据

3. actor 构建器
在这里插入图片描述

actor 返回 SendChannel<E>只能发送数据

Channel 三层接口架构

Channel 接口的设计非常巧妙,采用了三层接口分离结合泛型逆变和协变的设计:

// 发送端接口 - 逆变泛型,只能发送 E 或其子类型
public interface SendChannel<in E>

// 接收端接口 - 协变泛型,只能接收 E 或其父类型
public interface ReceiveChannel<out E>

// 双向通道接口 - 不变泛型,既能发送又能接收 E
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

好处:

  • 接口隔离原则(ISP):发送和接收功能完全分离,避免接口污染

  • 类型安全保障:通过协变和逆变确保数据流向的类型安全

  • 职责清晰分离:不同角色只需关注相关接口,降低复杂度

实际应用场景

在架构设计中,当你需要向团队提供 API 时,往往希望限制调用者的操作权限,如果直接暴露 Channel 类型,调用者可以同时进行发送和接收操作,这可能违背你的设计意图,因为你没办法控制调用者的行为,通过返回受限的接口类型,就可以 精确控制调用者的操作范围

produce 构建器示例

当你希望调用者只能接收数据时:


// 架构开发者:使用produce构建器封装数据流处理逻辑
fun createDataStream(): ReceiveChannel<String> {
    println("\u001B[35m[架构层] 使用produce构建器封装复杂的数据处理逻辑\u001B[0m")

    return GlobalScope.produce(capacity = 3) {
        // 架构层复杂逻辑:数据获取、转换、验证等
        repeat(5) { batch ->
            // 模拟复杂的数据处理流程
            val rawData = "raw_data_$batch"
            val processedData = "DataBatch-$batch: [validated, transformed, enriched]"

            println("\u001B[35m[架构层] 处理原始数据: $rawData -> $processedData\u001B[0m")
            send(processedData) // 只有架构层可以发送数据
            delay(200)
        }
        // produce自动管理资源清理
        println("\u001B[35m[架构层] produce自动清理内部资源\u001B[0m")
    }
}

suspend fun produceBuilderExample() {

    // 业务开发者:获得受限的ReceiveChannel接口
    val dataStream: ReceiveChannel<String> = createDataStream()
    println("\u001B[34m[业务开发者] 获得ReceiveChannel,只能接收数据\u001B[0m")

    // 业务开发者只能进行接收操作
    println("\u001B[34m[业务层] 开始处理数据流\u001B[0m")
    for (data in dataStream) {
        println("\u001B[34m[业务层] 接收并处理: $data\u001B[0m")
        delay(100)
    }
}


在这里插入图片描述

actor 构建器示例

当你希望调用者只能发送数据

// 使用actor构建器封装数据处理逻辑
fun createDataProcessor(): SendChannel<String> {
    println("\u001B[35m[架构层] 使用actor构建器封装数据处理管道\u001B[0m")

    return GlobalScope.actor(capacity = 5) {
        // 架构层复杂逻辑:数据接收、验证、存储等
        var processedCount = 0
        for (rawData in channel) { // actor内部可以接收数据
            processedCount++
            // 模拟复杂的数据处理流程
            val validatedData = "Validated: $rawData"
            val transformedData = "Transformed: $validatedData"
            val storedData = "Stored: $transformedData [ID: $processedCount]"

            println("\u001B[35m[架构层] 处理数据管道: $rawData -> $storedData\u001B[0m")
            delay(150) // 模拟处理时间

            // 这里可以进行数据库存储、文件写入等操作
            println("\u001B[35m[架构层] 数据处理完成: $storedData\u001B[0m")
        }
        // actor自动管理资源清理
        println("\u001B[35m[架构层] actor自动清理内部资源,共处理 $processedCount 条数据\u001B[0m")
    }
}


suspend fun actorBuilderExample() {

    // 业务开发者:获得受限的SendChannel接口
    val dataProcessor: SendChannel<String> = createDataProcessor()
    println("\u001B[34m[业务开发者] 获得SendChannel,只能发送数据\u001B[0m")

    // 业务开发者只能进行发送操作
    println("\u001B[34m[业务层] 开始发送数据到处理管道\u001B[0m")
    repeat(6) { index ->
        val businessData = "BusinessData-$index: {orderId: ${1000 + index}, amount: ${(index + 1) * 100}}"
        println("\u001B[34m[业务层] 发送业务数据: $businessData\u001B[0m")
        dataProcessor.send(businessData)
        delay(80)
    }

    // 业务层完成数据发送后关闭通道
    dataProcessor.close()
    println("\u001B[34m[业务层] 数据发送完成,关闭通道\u001B[0m")

}

在这里插入图片描述

produce vs actor 对比

特性 produce actor
返回类型 ReceiveChannel<E> SendChannel<E>
外部操作 只能接收数据 只能发送数据
内部逻辑 生产数据并发送 接收数据并处理
数据流向 内部 → 外部 外部 → 内部
适用场景 数据生成、API 封装 数据处理、消息处理
接口隔离 隐藏生产逻辑 隐藏处理逻辑
类型安全 协变 out E 逆变 in E

总结

关于热流这个特性

Channel 是热流,这意味着什么?简单说就是生产者不等人。不管有没有消费者在那儿等着,生产者该干嘛干嘛,数据照样往Channel 里塞。

这个特性在实际开发中特别有用。比如你在做日志收集系统,不管有没有消费日志,日志数据肯定是持续产生的, Channel 的热流特性正好匹配到这个场景。

几种应用模式的实际体验

扇出模式(一对多):这个应用场景是比较多的,特别是在处理任务队列的时候。一个生产者疯狂塞任务,多个消费者抢着处理,天然的负载均衡。不用你写什么复杂的分发逻辑,Channel 自己就搞定了。

扇入模式(多对一):日志收集的时候经常用到。多个服务往一个 Channel 里扔日志,一个消费者统一处理。简单粗暴,但很有效。

多对多模式:这个就是前两种的结合体,适合高并发场景。不过说实话,复杂度也上去了,一般也不会用到。

Select 表达式:这个还是非常实用的,可以同时等到多个 Channel,谁先有数据就处理谁。配合超时机制,可以做很多有趣的事情。比如同时查缓存、数据库、API,谁快用谁的,其他的直接取消。

produce 和 actor 的设计思路

刚开始我也不理解为什么要搞这两个构建器,直接用 Channel() 不香吗?后来在实际项目中才体会到接口隔离的重要性。

想象一下,你给团队提供一个数据流 API,如果直接返回 Channel,别人既能往里发数据,又能从里面取数据。这不就乱套了吗?

produce 返回 ReceiveChannel,别人只能取数据;用 actor 返回 SendChannel,别人只能发数据。权限控制得明明白白

这种设计在大型项目中特别有用,可以避免很多不必要的 bug。

Channel 到 Flow

学完 Channel 之后,你会发现它解决了很多并发场景的问题。但是 Channel 作为热流,有个问题就是资源消耗。即使没有消费者,生产者也在那儿工作,这在某些场景下是浪费的。

这时候就该 Flow(冷流) 出场了。Flow 只有在被订阅的时候才开始工作,更节省资源。而且 Flow 有更丰富的操作符,可以做各种数据变换。

而且 kotlin 还提供了 StaredFlowStateFlow 这两个非常实用的数据流,因此,在日常开发中,我们可能更多的是选择 Flow

掌握了 Channel,再去学 Flow 会轻松很多。因为很多概念是相通的,只是应用场景不同而已。


好了, 本篇文章就是这些,希望能帮到你。

感谢阅读,如果对你有帮助请三连(点赞、收藏、加关注)支持。有任何疑问或建议,欢迎在评论区留言讨论。如需转载,请注明出处:喻志强的博客