trySend、Channel 和 Flow 的工作原理

发布于:2025-07-14 ⋅ 阅读:(12) ⋅ 点赞:(0)

1. Channel 和 Flow 基础概念

Channel(通道)

// Channel 是一个可以发送和接收数据的管道

val channel = Channel<String>()

// 发送数据

channel.send("Hello")

// 接收数据

val data = channel.receive()

Flow(流)

 

// Flow 是一个可以发射数据的流

val flow = flow {

    emit("Hello")

    emit("World")

}

2. callbackFlow 的工作原理

callbackFlow 创建了一个 Channel

 

fun connect(url: String, token: String): Flow<SSEEvent> = callbackFlow {

    // callbackFlow 内部创建了一个 Channel

    // 这个 Channel 可以发送 SSEEvent 类型的数据

    

    // 发送数据到 Channel

    trySend(SSEEvent.Message("event", "data"))

    

    // 等待 Channel 关闭

    awaitClose { 

        // 清理资源

        disconnect() 

    }

}

callbackFlow 的完整流程

 

fun connect(url: String, token: String): Flow<SSEEvent> = callbackFlow {

    // 1. callbackFlow 创建一个 Channel

    // 2. 这个 Channel 可以发送 SSEEvent 类型的数据

    // 3. callbackFlow 返回一个 Flow,这个 Flow 会从这个 Channel 接收数据

    

    val request = Request.Builder()

        .url(url)

        .addHeader("Accept", "text/event-stream")

        .build()

    call = okHttpClient.newCall(request)

    call?.enqueue(object : Callback {

        override fun onResponse(call: Call, response: Response) {

            val body = response.body ?: return

            

            try {

                var currentEvent = "message"

                val dataBuffer = StringBuilder()

                

                while (true) {

                    val line = body.source().readUtf8Line() ?: break

                    

                    when {

                        line.startsWith("event:") -> currentEvent = line.substring(6).trim()

                        line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim()).append("\n")

                        line.isEmpty() -> {

                            if (dataBuffer.isNotEmpty()) {

                                val data = dataBuffer.toString().trim()

                                // 4. 发送数据到 Channel

                                trySend(SSEEvent.Message(currentEvent, data))

                                dataBuffer.clear()

                            }

                        }

                    }

                }

                

                // 5. 发送关闭事件

                trySend(SSEEvent.Closed)

            } catch (e: Exception) {

                // 6. 发送错误事件

                trySend(SSEEvent.Error(e))

            } finally {

                response.close()

                close() // 关闭 Channel

            }

        }

        

        override fun onFailure(call: Call, e: IOException) {

            // 7. 发送错误事件

            trySend(SSEEvent.Error(e))

            close(e) // 关闭 Channel

        }

    })

    

    // 8. 等待 Channel 被关闭

    awaitClose { 

        disconnect() 

    }

}

3. trySend 的作用

trySend 发送数据到 Channel

 

// trySend 尝试发送数据到 Channel

trySend(SSEEvent.Message(currentEvent, data))

// trySend 的特点:

// 1. 非阻塞:不会等待接收者

// 2. 返回 Boolean:发送成功返回 true,失败返回 false

// 3. 如果 Channel 已关闭,返回 false

与 send 的区别

// send:阻塞式发送,会等待接收者

channel.send(data) // 如果 Channel 满了,会阻塞

// trySend:非阻塞式发送,立即返回结果

val success = channel.trySend(data) // 立即返回 true/false

 

4. Flow 的 onEach 工作原理

onEach 是 Flow 的中间操作符

 

sseClient.connect(sseUrl, token)

    .onEach { event ->  // 对每个事件进行处理

        when (event) {

            is SSEClient.SSEEvent.Message -> {

                // 处理消息事件

                println("收到消息: ${event.data}")

            }

            is SSEClient.SSEEvent.Error -> {

                // 处理错误事件

                println("发生错误: ${event.throwable.message}")

            }

            SSEClient.SSEEvent.Closed -> {

                // 处理关闭事件

                println("连接已关闭")

            }

        }

    }

    .launchIn(lifecycleScope) // 启动 Flow

onEach 的执行流程

 

// 1. callbackFlow 创建 Channel

fun connect(): Flow<SSEEvent> = callbackFlow {

    // 发送数据到 Channel

    trySend(SSEEvent.Message("event1", "data1"))

    trySend(SSEEvent.Message("event2", "data2"))

    trySend(SSEEvent.Closed)

}

// 2. onEach 对每个数据进行处理

.onEach { event ->

    // 每当 Channel 中有新数据时,这里就会被调用

    when (event) {

        is SSEClient.SSEEvent.Message -> {

            // 处理消息

        }

    }

}

// 3. launchIn 启动 Flow

.launchIn(lifecycleScope)

5. 完整的数据流

数据流向图

 

HTTP 响应 → 解析 SSE 数据 → trySend → Channel → Flow → onEach → UI 更新

详细步骤

 

// 步骤 1: HTTP 响应数据

data: {"message": "Hello"}

// 步骤 2: 解析 SSE 数据

line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim())

// 步骤 3: 发送到 Channel

trySend(SSEEvent.Message("message", "{\"message\": \"Hello\"}"))

// 步骤 4: Channel 中的数据流向 Flow

callbackFlow { ... } // 返回 Flow

// 步骤 5: Flow 的 onEach 处理

.onEach { event ->

    when (event) {

        is SSEClient.SSEEvent.Message -> {

            // 处理消息

            updateUI(event.data)

        }

    }

}

// 步骤 6: 启动 Flow

.launchIn(lifecycleScope)

6. 实际运行示例

服务器发送的数据

  

data: {"message": "Hello"}

data: {"message": "World"}

data: {"message": "Test"}

代码执行流程

// 1. 服务器发送第一行数据

data: {"message": "Hello"}

// 2. 代码解析

line.startsWith("data:") -> dataBuffer.append("{\"message\": \"Hello\"}")

// 3. 遇到空行,发送数据

line.isEmpty() -> {

    val data = dataBuffer.toString().trim()

    trySend(SSEEvent.Message("message", data)) // 发送到 Channel

}

// 4. onEach 接收到数据

.onEach { event ->

    when (event) {

        is SSEClient.SSEEvent.Message -> {

            // event.data = "{\"message\": \"Hello\"}"

            updateUI(event.data)

        }

    }

}

// 5. UI 更新

updateUI("{\"message\": \"Hello\"}")

7. 关键概念总结

callbackFlow

  • 创建一个 Channel
  • 返回一个 Flow
  • Flow 从 Channel 接收数据

trySend

  • 向 Channel 发送数据
  • 非阻塞操作
  • 返回发送是否成功

onEach

  • Flow 的中间操作符
  • 对每个数据进行处理
  • 不会改变数据流

launchIn

  • 启动 Flow
  • 在指定的协程作用域中运行
  • 自动管理生命周期

8. 为什么这样设计?

优势

  1. 异步处理:HTTP 响应在后台处理,UI 不会阻塞
  1. 结构化数据:使用 sealed class 定义事件类型
  1. 生命周期管理:自动在 Activity 销毁时取消
  1. 错误处理:统一的错误处理机制
  1. 可扩展:易于添加新的事件类型

数据流的好处

 

// 清晰的数据流向

HTTP 数据 → SSE 解析 → 结构化事件 → UI 更新

// 而不是传统的回调方式

HTTP 数据 → 回调函数 → UI 更新

这样设计使得代码更加清晰、可维护,并且充分利用了 Kotlin 协程和 Flow 的优势!


网站公告

今日签到

点亮在社区的每一天
去签到