【Kotlin】Channel简介

发布于:2024-04-28 ⋅ 阅读:(30) ⋅ 点赞:(0)

1 前言

        Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。

        当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。

        Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。

2 Channel 中 send 和 receive 案例

2.1 capacity 为 0

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            delay(10)
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            delay(100)
            var element = channel.receive()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
receive: 0
send: 1
receive: 1
send: 2
receive: 2

        说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。

2.2 capacity 大于 0

fun main() {
    var channel = Channel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            delay(10)
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            delay(100)
            var element = channel.receive()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive: 0
receive: 1
receive: 2

        说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。

3 Channel 中迭代器

3.1 iterator

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        var iterator = channel.iterator()
        while (iterator.hasNext()) {
            var element = iterator.next()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

3.2 for in

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        for (element in channel) {
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
receive: 0
send: 1
send: 2
receive: 1
receive: 2

4 Channel 中 produce 和 actor

        produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannel;actor 函数用于构造一个消费者协程,并返回一个 SendChannel。

4.1 produce

fun main() {
    var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者
        repeat(3) {
            println("send: $it")
            send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        for (element in receiveChannel) {
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

4.2 actor

fun main() {
    var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者
        repeat(3) {
            var element = receive()
            println("receive: $element")
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            println("send: $it")
            sendChannel.send(it)
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

5 Channel 的关闭

        对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。

fun main() {
    var channel = Channel<Int>(3)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
        channel.close()
        println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            var element = channel.receive()
            println("receive: $element")
        }
        println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
producter, isClosedForSend=true, isClosedForReceive=false
receive: 0
receive: 1
receive: 2
consumer, isClosedForSend=true, isClosedForReceive=true

6 BroadcastChannel

        Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。

6.1 Channel 中多个消费者

fun main() {
    var channel = Channel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            for (element in channel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive-0: 0
receive-0: 2
receive-1: 1

        说明:结果表明,Channel 中同一个元素只会被一个消费者读到。

6.2 BroadcastChannel 中多个消费者

6.2.1 BroadcastChannel

fun main() {
    var broadcastChannel = BroadcastChannel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            broadcastChannel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            var receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive-0: 0
receive-0: 1
receive-0: 2
receive-1: 0
receive-1: 1
receive-1: 2

        说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。

6.2.2 broadcast

fun main() {
    var channel = Channel<Int>()
    var broadcastChannel = channel.broadcast(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            broadcastChannel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            var receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive-1: 0
receive-1: 1
receive-1: 2
receive-0: 0
receive-0: 1
receive-0: 2

网站公告

今日签到

点亮在社区的每一天
去签到