Flow的进阶学习2025
前言
LiveData
:简单,可靠。缓存最新数据+生命周期感知。
Flow
:把简单复杂化,又把复杂简单化。
创建数据流
主要是转变思维,传统的请求结果,再使用flow{}包裹一层。
flow{}创建;
flow{ api.request() }中如执行了挂起函数,则一直挂起等待结果返回的。
不要使用withContext或者其他方式,切换context,是不能emit的。只能换成callbackFlow
。
emit发射新值。
修改数据流
其实就是各种链式函数。map, filter等中间操作符。将数据进行裁剪或者转变等。
Collect
collect()
终止符开始触发数据流监听。
异常处理
可以使用catch的中间操作符。它还可以发送emit:
class NewsRepository(...) {val favoriteLatestNews: Flow<List<ArticleHeadline>> =newsRemoteDataSource.latestNews.map { news -> news.filter { userData.isFavoriteTopic(it) } }.onEach { news -> saveInCache(news) }// If an error happens, emit the last cached values.catch { exception -> emit(lastCachedNews()) }
}
更改Flow执行的Context环境
通过flowOn修改上游的环境;在它之后的执行和调用者都是原来的环境。
class NewsRepository(private val defaultDispatcher: CoroutineDispatcher
) {val favoriteLatestNews: Flow<List<ArticleHeadline>> =newsRemoteDataSource.latestNews.map { news -> // Executes on the default dispatchernews.filter { userData.isFavoriteTopic(it) }}.onEach { news -> // Executes on the default dispatchersaveInCache(news)}// flowOn affects the upstream flow ↑.flowOn(defaultDispatcher)// the downstream flow ↓ is not affected.catch { exception -> // Executes in the consumer's contextemit(lastCachedNews())}
}
借助此代码,onEach
和 map
运算符使用 defaultDispatcher
,其中catch
运算符和使用方在 viewModelScope
所使用的 Dispatchers.Main
上执行。
callbackFlow
可以将回调程序写成数据流。如何下是将okhttp的请求变成callbackFlow的实现。得到的是Flow:
fun httpResponseCallbackFlow(request:Request, client: OkHttpClient = okHttpClient()) = callbackFlow {val call = client.newCall(request)try {val response = call.execute()send(response.body?.string())} catch (e: Throwable) {close(ne)}awaitClose { call.cancel() }
}
callbackFlow允许从不同的协程上下文send
。还可以通过trySend
在协程外发出。
StateFlow
在 Android 中,StateFlow
非常适合需要让可变状态保持可观察的类。标准的写法:
class LatestNewsViewModel(private val newsRepository: NewsRepository
) : ViewModel() {// Backing property to avoid state updates from other classesprivate val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))// The UI collects from this StateFlow to get its state updatesval uiState: StateFlow<LatestNewsUiState> = _uiStateinit {viewModelScope.launch {newsRepository.favoriteLatestNews// Update View with the latest favorite news// Writes to the value property of MutableStateFlow,// adding a new element to the flow and updating all// of its collectors.collect { favoriteNews ->_uiState.value = LatestNewsUiState.Success(favoriteNews)}}}
}// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {data class Success(val news: List<ArticleHeadline>): LatestNewsUiState()data class Error(val exception: Throwable): LatestNewsUiState()
}class LatestNewsActivity : AppCompatActivity() {private val latestNewsViewModel = // getViewModel()override fun onCreate(savedInstanceState: Bundle?) {...// Start a coroutine in the lifecycle scopelifecycleScope.launch {// repeatOnLifecycle launches the block in a new coroutine every time the// lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.repeatOnLifecycle(Lifecycle.State.STARTED) {// Trigger the flow and start listening for values.// Note that this happens when lifecycle is STARTED and stops// collecting when the lifecycle is STOPPEDlatestNewsViewModel.uiState.collect { uiState ->// New value receivedwhen (uiState) {is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)is LatestNewsUiState.Error -> showError(uiState.exception)}}}}}
}
flow{}/StateFlow的选择和代码逻辑
无论数据流生产者的具体实现如何,我们都 推荐 从应用的较底层级暴露 Flow API。不过,您也应该保证数据流收集操作的安全性。
这句话是官方的原话,之前文章中提到过,官方推荐你将数据层写成Repository+DataSource的形式,同时,期待你尽早在Repository/DataSource中就开始暴露flow。
而DataSource的数据来源,建议修改成Flow提供。
对于股票等自动刷新类型就可以采用flow{}实现数据源的编写:
MyDataSource {fun requestFlow() = flow {while(true) {val bean = api.request()send(bean)...delay(30s)}}suspend fun request(): Bean
}
但是对于一般的网络请求都是由用户点击触发/下拉刷新/进入界面时,则推荐不要把DataSource写成Flow。直接使用原来的request函数。然后在ViewModel里面使用StateFlow存储:
MyViewModel {private val _newsFlow = MutableStateFlow<List<News>>(emptyList())val newsFlow: StateFlow<List<News>> = _newsFlow// 主动刷新方法suspend fun refresh() {_newsFlow.value = repository.request() // 触发更新}
}
由界面层调用refresh(),进而调用到request函数,并更新给这里的StateFlow即可。
stateIn/shareIn
shareIn
操作符返回的是 SharedFlow而 stateIn
返回的是 StateFlow。
两者之间的最主要区别,在于 StateFlow
允许您通过读取 value
属性同步访问其最后发出的值。而这不是 SharedFlow
的使用方式。
stateIn
运算符,它需要传入三个参数: initinalValue
、scope
及 started
。
val result: StateFlow<Result<UiState>> = someFlow.stateIn(initialValue = Result.Loadingscope = viewModelScope,started = WhileSubscribed(5000),)
WhileSubscribed() //共享策略。在没有收集者时取消上游数据流。但是推荐给5000,避免如果上游数据流创建成本太高时,立刻被取消了,快速切换的时候,需要重建。
当设置了超时时间 WhileSubscribed(5000)
后,如果旋转或者息屏,会让视图立即结束收集,但 StateFlow
会经过我们设置的超时时间之后才会停止其上游数据流,如果用户再次打开应用则会自动重启上游数据流。而在旋转场景中视图只停止了很短的时间,无论如何都不会超过 5 秒钟,因此 StateFlow 并不会重启,所有的上游数据流都将会保持在活跃状态,就像什么都没有发生一样可以做到即时向用户呈现旋转后的屏幕。
shareIn(externalScope, SharingStarted.Eagerly, replay = 10)
我们将参数 replay
的值设置为 10,来让最后发出的 10 个项目保持在内存中,同时在每次有收集者观察数据流时重新发送这些项目。为了保持内部数据流始终处于活跃状态并发送位置更新,我们使用了共享策略 SharingStarted.Eagerly
,这样就算没有收集者,也能一直监听更新。
//warning:::错误示范
//不要像这样在函数中使用 shareIn 或 stateIn
// 这将在每次调用时创建新的 SharedFlow 或 StateFlow,而它们将不会被复用。
fun getUser(): Flow<User> =userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())
更安全的collect
//warning: 错误示范override fun onCreate(savedInstanceState: Bundle?) {// 最早在 View 处于 STARTED 状态时从数据流收集数据,并在// 生命周期进入 STOPPED 状态时 SUSPENDS(挂起)收集操作。// 在 View 转为 DESTROYED 状态时取消数据流的收集操作。lifecycleScope.launchWhenStarted {myFlow().collect {// 新的位置!更新地图} }// 同样的问题也存在于:// - lifecycleScope.launch { /* 在这里从 locationFlow() 收集数据 */ }// - myFlow().onEach { /* ... */ }.launchIn(lifecycleScope)}
launchWhenStarted
已被标记废弃;它虽然在离开Started状态后,挂起了函数,但是里面的任务仍在跑;
launch/launchIn
则更加危险,退到后台仍然更新UI。
1. addRepeatingJob
// 最早在 View 处于 STARTED 状态时从数据流收集数据,并在
// 生命周期进入 STOPPED 状态时 STOPPED(停止)收集操作。
// 它会在生命周期再次进入 STARTED 状态时自动开始进行数据收集操作。
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {// ...viewLifecycleOwner.addRepeatingJob(Lifecycle.State.STARTED) {myFlow().collect {// 新的位置!更新地图} }
}
在生命周期到达该状态时,自动创建并启动新的协程;同时也会在生命周期低于该状态时取消正在运行的协程。
该函数,需要在 Activity 的 onCreate 或 Fragment 的 onViewCreated
方法中调用。
2. repeatOnLifecylcle
lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {latestNewsViewModel.uiState.collect { uiState ->
之前已经提到过了。
3. Flow.flowWithLifecycle
myflow()
.flowWithLifecycle(this, Lifecycle.State.STARTED)
.onEach {// ....
}
.launchIn(lifecycleScope)
这也是一种办法,个人不推荐。
因为单向数据流,flow处于上游,而UI层属于下游,把scope传来传去不太好。
4. stateIn/shareIn
需要注意的是使用whileSubscribed()来将会在没有活跃的订阅者时停止内部的生产者。
参考文章均为google官网或者google的专栏:
使用更为安全的方式收集 Android UI 数据流
https://zhuanlan.zhihu.com/p/389218426
从 LiveData 迁移到 Kotlin 数据流
https://zhuanlan.zhihu.com/p/384904254
https://medium.com/androiddevelopers/migrating-from-livedata-to-kotlins-flow-379292f419fb
Flow 操作符 shareIn 和 stateIn 使用须知
https://zhuanlan.zhihu.com/p/401508806
实战 | 使用 Kotlin Flow 构建数据流 “管道”
https://zhuanlan.zhihu.com/p/486186865
Android 上的 Kotlin 数据流 | Android Developers
StateFlow 和 SharedFlow | Kotlin | Android Developers