Swift Concurrency 带来了现代化的异步编程体验。在处理异步事件流时,AsyncStream
和 AsyncThrowingStream
提供了优雅的方式来消费和控制异步值序列。本文将全面讲解这两个 API 的用途、用法、底层机制和实战场景。
什么是 AsyncStream 与 AsyncThrowingStream?
类型 | 描述 |
---|---|
AsyncStream<Element> |
产生异步值序列,不支持抛出错误 |
AsyncThrowingStream<Element, Error> |
产生异步值序列,支持中途抛出错误 |
它们都是 AsyncSequence
的具体实现。适用于从事件源(按钮点击、定时器、网络回调等)顺序生成值并使用 for await
异步消费的场景。
使用场景
- 定时器事件
- 用户点击/拖动等 UI 事件
- WebSocket、蓝牙等设备数据流
- 网络数据分块返回
- 将传统 delegate/publisher 封装成异步序列
AsyncStream 基本用法
let stream = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.finish()
}
for await value in stream {
print("Received: \(value)")
}
方法说明
方法 | 说明 |
---|---|
yield(_:) |
发出一个值 |
finish() |
正常结束流 |
onTermination |
设置终止回调 |
termination |
当前终止原因(只读) |
缓冲策略(BufferingPolicy)
AsyncStream(bufferingPolicy: .bufferingNewest(10)) { continuation in ... }
策略 | 描述 |
---|---|
.unbounded |
默认,无限缓存 |
.bufferingNewest(n) |
保留最新 n 个,丢弃旧的 |
.bufferingOldest(n) |
保留最早 n 个,忽略新值 |
实例:定时器流
func makeTimerStream() -> AsyncStream<Date> {
AsyncStream { continuation in
let timer = Timer.scheduledTimer(withTimeInterval: 1.0, repeats: true) { _ in
continuation.yield(Date())
}
continuation.onTermination = { _ in timer.invalidate() }
}
}
使用:
for await tick in makeTimerStream() {
print("Tick: \(tick)")
}
❗ AsyncThrowingStream 使用场景
如果事件来源可能出错(如读取文件、网络失败),使用 AsyncThrowingStream
。
基本结构
let stream = AsyncThrowingStream<String, Error> { continuation in
continuation.yield("data")
continuation.finish(throwing: MyError.failed)
}
消费方式:
do {
for try await value in stream {
print(value)
}
} catch {
print("Error: \(error)")
}
示例:读取文件逐行抛错
func readLines(from url: URL) -> AsyncThrowingStream<String, Error> {
AsyncThrowingStream { continuation in
do {
let content = try String(contentsOf: url)
for line in content.split(separator: "\n") {
continuation.yield(String(line))
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}
}
示例:多个 URL 下载
func downloadStream(urls: [URL]) -> AsyncThrowingStream<Data, Error> {
AsyncThrowingStream { continuation in
Task {
for url in urls {
do {
let (data, _) = try await URLSession.shared.data(from: url)
continuation.yield(data)
} catch {
continuation.finish(throwing: error)
return
}
}
continuation.finish()
}
}
}
示例:封装通知流(支持取消清理)
func notificationStream(name: Notification.Name) -> AsyncStream<Notification> {
AsyncStream { continuation in
let observer = NotificationCenter.default.addObserver(forName: name, object: nil, queue: nil) { notification in
continuation.yield(notification)
}
continuation.onTermination = { _ in
NotificationCenter.default.removeObserver(observer)
}
}
}
Task {
for await notification in notificationStream(name: UIApplication.willEnterForegroundNotification) {
print("收到通知: \\(notification)")
}
}
AsyncStream vs AsyncThrowingStream 对比
特性 | AsyncStream | AsyncThrowingStream |
---|---|---|
支持错误 | ❌ | ✅ |
消费方式 | for await |
for try await |
错误终止 | 不支持 | .finish(throwing:) |
用于数据源 | 可靠事件流 | 可能失败的数据流(文件/网络) |
注意事项
yield
是线程安全的- 调用
.finish()
或.finish(throwing:)
结束流 - 如果流长时间未消费,注意缓冲策略避免内存增长
onTermination
是清理任务的重要工具
onTermination 会在以下情况被调用
情况 | 说明 |
---|---|
调用 continuation.finish() | 结束流时 |
调用 continuation.finish(throwing:) | 异常结束流时(仅适用于 AsyncThrowingStream) |
消费者 Task 被取消 | 比如外部 Task.cancel(),或视图销毁导致 task 取消 |
AsyncStream 本身被释放 | 无人引用这个流了,Swift 会自动清理 |
总结
- 使用
AsyncStream
来封装异步非失败事件,如 UI、定时器、传感器。 - 使用
AsyncThrowingStream
封装可能失败的数据来源,如文件 IO、网络任务。 - 它们极大提升了将传统 delegate/callback 风格封装为现代
async/await
流程的能力。
for try await item in myStream {
handle(item)
}
通过掌握 AsyncStream
和 AsyncThrowingStream
,可以更好地驾驭 Swift 并发编程,写出结构清晰、易维护、表现优秀的异步逻辑代码。
最后,希望能够帮助到有需要的朋友,如果觉得有帮助,还望点个赞,添加个关注,笔者也会不断地努力,写出更多更好用的文章。