Kotlin Flow 的使用
我来详细介绍 Kotlin Flow 的使用。Flow 是 Kotlin 协程库中的响应式流处理工具,用于处理异步数据流。
1. 基础概念
创建 Flow
// 方式1: 使用 flow 构建器
fun simpleFlow(): Flow<Int> = flow {for (i in 1..3) {delay(100)emit(i) // 发射值}
}// 方式2: 使用 asFlow() 扩展
val numberFlow = (1..5).asFlow()// 方式3: 使用 flowOf
val stringFlow = flowOf("A", "B", "C")
收集 Flow
fun main() = runBlocking {simpleFlow().collect { value ->println(value)}
}
// 输出: 1, 2, 3 (每隔100ms)
2. 操作符
转换操作符
fun transformExample() = runBlocking {(1..3).asFlow().map { it * it } // 转换每个元素.filter { it % 2 == 1 } // 过滤.collect { println(it) }// 输出: 1, 9
}
transform 操作符
fun transformExample() = runBlocking {(1..3).asFlow().transform { value ->if (value % 2 == 0) {emit("Even: $value")emit("Square: ${value * value}")}}.collect { println(it) }
}
限长操作符
fun takeExample() = runBlocking {(1..10).asFlow().take(3) // 只取前3个.collect { println(it) }// 输出: 1, 2, 3
}
3. 终端操作符
fun terminalOperators() = runBlocking {val sum = (1..5).asFlow().reduce { accumulator, value -> accumulator + value }println("Sum: $sum") // 输出: Sum: 15val first = (1..5).asFlow().first()println("First: $first") // 输出: First: 1val last = (1..5).asFlow().last()println("Last: $last") // 输出: Last: 5
}
4. 上下文和异常处理
上下文切换
fun contextExample() = runBlocking {flow {for (i in 1..3) {Thread.sleep(100)emit(i)}}.flowOn(Dispatchers.Default) // 指定上游执行的上下文.collect { value ->println("Received $value on ${Thread.currentThread().name}")}
}
异常处理
fun exceptionHandling() = runBlocking {flow {emit(1)throw RuntimeException("Error")}.catch { e -> println("Caught exception: $e")emit(-1) // 在异常时发射备用值}.onCompletion { cause -> println("Flow completed ${cause?.message ?: "successfully"}")}.collect { println(it) }
}
5. 组合多个 Flow
zip 操作符
fun zipExample() = runBlocking {val nums = (1..3).asFlow()val strs = flowOf("One", "Two", "Three")nums.zip(strs) { a, b -> "$a -> $b" }.collect { println(it) }// 输出: 1 -> One, 2 -> Two, 3 -> Three
}
combine 操作符
fun combineExample() = runBlocking {val numbers = (1..3).asFlow().onEach { delay(300) }val letters = flowOf("A", "B", "C").onEach { delay(400) }numbers.combine(letters) { number, letter -> "$number$letter" }.collect { println(it) }// 输出组合结果,基于最新的值
}
6. 扁平流操作符
flatMapConcat
fun flatMapConcatExample() = runBlocking {(1..3).asFlow().flatMapConcat { value ->flow {emit("First $value")emit("Second $value")}}.collect { println(it) }// 顺序输出: First 1, Second 1, First 2, Second 2, First 3, Second 3
}
flatMapMerge
fun flatMapMergeExample() = runBlocking {(1..3).asFlow().flatMapMerge { value ->flow {delay((4 - value) * 100L) // 倒序延迟emit("Value $value")}}.collect { println(it) }// 可能输出: Value 3, Value 2, Value 1 (并发执行)
}
7. 实际应用示例
网络请求 + 数据库操作
class UserRepository {fun getUsers(): Flow<List<User>> = flow {// 从网络获取数据val remoteUsers = apiService.getUsers()// 保存到数据库userDao.insertUsers(remoteUsers)// 从数据库观察变化emitAll(userDao.observeUsers())}.flowOn(Dispatchers.IO)fun getUserWithDetails(userId: String): Flow<UserWithDetails> {return userDao.observeUser(userId).flatMapLatest { user ->when {user == null -> flowOf(null)user.details == null -> {// 如果缺少详情,从网络获取val details = apiService.getUserDetails(userId)userDao.updateUserDetails(userId, details)userDao.observeUserWithDetails(userId)}else -> userDao.observeUserWithDetails(userId)}}}
}
搜索功能
class SearchViewModel : ViewModel() {private val searchQuery = MutableStateFlow("")val searchResults: Flow<List<SearchResult>> = searchQuery.debounce(300) // 防抖,300ms 内只取最后一次.distinctUntilChanged() // 去重.filter { it.length >= 2 } // 过滤短查询.flatMapLatest { query ->if (query.isBlank()) {flowOf(emptyList())} else {searchRepository.search(query).catch { e -> emit(emptyList())// 可以在这里处理错误}}}.flowOn(Dispatchers.Default)fun onSearchQueryChanged(query: String) {searchQuery.value = query}
}
8. StateFlow 和 SharedFlow
StateFlow (状态容器)
class CounterViewModel : ViewModel() {private val _count = MutableStateFlow(0)val count: StateFlow<Int> = _count.asStateFlow()fun increment() {_count.value++}
}// 在 UI 中观察
fun observeCounter() {viewModel.count.collect { count ->updateCounterUI(count)}
}
SharedFlow (事件总线)
object EventBus {private val _events = MutableSharedFlow<Event>()val events = _events.asSharedFlow()suspend fun sendEvent(event: Event) {_events.emit(event)}
}// 发送事件
EventBus.sendEvent(UserLoggedInEvent(user))// 接收事件
EventBus.events.filterIsInstance<UserLoggedInEvent>().collect { event ->// 处理登录事件}
关键特点总结
- 冷流:Flow 是冷流,有收集者时才开始执行
- 结构化并发:与协程深度集成
- 背压处理:自动处理生产者和消费者的速度不匹配
- 丰富的操作符:提供函数式编程风格的操作符
- 线程安全:通过 flowOn 安全切换上下文
Flow 非常适合处理异步数据流,特别是在 Android 开发、后端 API 调用等场景中。