Android 协程间通信
一、Channel(通道)
Channel 是协程间通信的主要方式,类似于阻塞队列但使用挂起而非阻塞。
1. 基本 Channel 使用
// 创建 Channel
val channel = Channel<Int>()// 生产者协程
launch {for (i in 1..5) {channel.send(i) // 发送数据println("发送: $i")}channel.close() // 关闭通道
}// 消费者协程
launch {for (item in channel) { // 接收数据直到通道关闭println("接收: $item")}
}
2. Channel 的容量类型
// 无缓冲通道(默认)- 发送者会挂起直到接收者接收
val rendezvousChannel = Channel<Int>()// 有缓冲通道 - 可以存储指定数量的元素
val bufferedChannel = Channel<Int>(capacity = 10)// 无限容量通道 - 发送永不挂起
val unlimitedChannel = Channel<Int>(Channel.UNLIMITED)// 合并通道 - 最新值覆盖旧值,容量为1
val conflatedChannel = Channel<Int>(Channel.CONFLATED)
3. 实际应用示例
class NetworkManager {private val requestChannel = Channel<Request>(capacity = 100)private val responseChannel = Channel<Response>()init {// 请求处理协程CoroutineScope(Dispatchers.IO).launch {for (request in requestChannel) {val response = processRequest(request)responseChannel.send(response)}}}suspend fun sendRequest(request: Request): Response {requestChannel.send(request)return responseChannel.receive()}private suspend fun processRequest(request: Request): Response {// 网络请求处理delay(100)return Response(request.id, "Success")}
}
二、Flow(流)
Flow 是冷流,更适合响应式数据流场景。
1. 基本 Flow
// 创建 Flow
fun getNumbers(): Flow<Int> = flow {for (i in 1..5) {delay(100)emit(i) // 发射数据}
}// 收集 Flow
lifecycleScope.launch {getNumbers().collect { value ->println("收到: $value")}
}
2. StateFlow(状态流)
class UserViewModel : ViewModel() {// 私有可变状态private val _uiState = MutableStateFlow<UiState>(UiState.Loading)// 公开只读状态val uiState: StateFlow<UiState> = _uiState.asStateFlow()fun loadData() {viewModelScope.launch {_uiState.value = UiState.Loadingtry {val data = fetchData()_uiState.value = UiState.Success(data)} catch (e: Exception) {_uiState.value = UiState.Error(e.message)}}}
}// Activity/Fragment 中收集
lifecycleScope.launch {viewModel.uiState.collect { state ->when (state) {is UiState.Loading -> showLoading()is UiState.Success -> showData(state.data)is UiState.Error -> showError(state.message)}}
}
3. SharedFlow(共享流)
class EventBus {private val _events = MutableSharedFlow<Event>(replay = 0, // 新订阅者收到的历史事件数extraBufferCapacity = 10 // 额外缓冲容量)val events: SharedFlow<Event> = _events.asSharedFlow()suspend fun emit(event: Event) {_events.emit(event)}
}// 使用示例
class MainActivity : AppCompatActivity() {private val eventBus = EventBus()override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)// 订阅事件lifecycleScope.launch {eventBus.events.collect { event ->handleEvent(event)}}// 发送事件lifecycleScope.launch {eventBus.emit(Event.UserLoggedIn("user123"))}}
}
三、Deferred(延迟值)
用于协程间传递单个异步结果。
// 创建 Deferred
val deferred: Deferred<String> = async {delay(1000)"计算结果"
}// 等待结果
launch {val result = deferred.await()println(result)
}
实际应用:并行请求
suspend fun loadUserProfile() {coroutineScope {val userInfo = async { fetchUserInfo() }val userPosts = async { fetchUserPosts() }val userFriends = async { fetchUserFriends() }// 并行执行,等待所有结果val profile = UserProfile(info = userInfo.await(),posts = userPosts.await(),friends = userFriends.await())displayProfile(profile)}
}
四、CompletableDeferred(可完成的延迟值)
手动控制结果的完成。
class DataLoader {private val resultDeferred = CompletableDeferred<Data>()fun startLoading() {CoroutineScope(Dispatchers.IO).launch {try {val data = loadData()resultDeferred.complete(data) // 手动完成} catch (e: Exception) {resultDeferred.completeExceptionally(e) // 完成异常}}}suspend fun awaitResult(): Data {return resultDeferred.await()}
}
五、Actor 模式(高级)
Actor 是 Channel 的封装,用于状态管理。
sealed class CounterMsg
object Increment : CounterMsg()
object Decrement : CounterMsg()
class GetValue(val response: CompletableDeferred<Int>) : CounterMsg()fun CoroutineScope.counterActor() = actor<CounterMsg> {var counter = 0for (msg in channel) {when (msg) {is Increment -> counter++is Decrement -> counter--is GetValue -> msg.response.complete(counter)}}
}// 使用
val counter = counterActor()launch {counter.send(Increment)counter.send(Increment)val response = CompletableDeferred<Int>()counter.send(GetValue(response))println("计数: ${response.await()}")
}
六、协程间通信方式对比

七、最佳实践建议
1. 选择合适的通信方式
// ✅ UI 状态 - 使用 StateFlow
val uiState: StateFlow<UiState>// ✅ 一次性事件 - 使用 SharedFlow 或 Channel
val events: SharedFlow<Event>// ✅ 数据流转换 - 使用 Flow
fun searchUsers(query: String): Flow<List<User>>// ✅ 并行任务 - 使用 async/await
val result1 = async { fetchData1() }
val result2 = async { fetchData2() }
2. 注意生命周期
// ✅ 绑定生命周期
lifecycleScope.launch {viewModel.uiState.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED).collect { state -> updateUI(state) }
}// ❌ 避免内存泄漏
class MyActivity : AppCompatActivity() {private val scope = CoroutineScope(Dispatchers.Main) // 可能泄漏override fun onDestroy() {super.onDestroy()scope.cancel() // 必须手动取消}
}
- 错误处理
launch {try {channel.send(data)} catch (e: CancellationException) {// 协程取消,正常情况throw e} catch (e: Exception) {// 其他异常,记录日志Log.e("TAG", "发送失败", e)}
}
