当前位置: 首页 > news >正文

Kotlin 协程 (三)

协程通信是协程之间进行数据交换和同步的关键机制。Kotlin 协程提供了多种通信方式,使得协程能够高效、安全地进行交互。以下是对协程通信的详细讲解,包括常见的通信原语、使用场景和示例代码。

1.1 Channel

定义:Channel 是一个消息队列,用于协程之间的通信。它允许协程发送和接收数据,类似于 MPI(Message Passing Interface)中的消息传递机制。

特点:

  • 线程安全。
  • 支持多种通信模式,如无缓冲、有缓冲和无限缓冲。
  • 提供挂起函数 send 和 receive,用于发送和接收数据。
  • 可以关闭。

Channel 类型

类型描述创建方式适用场景
Rendezvous无缓冲(默认)Channel<T>()严格的发送-接收同步
Buffered固定大小缓冲Channel<T>(capacity)控制内存使用
Conflated保留最新值Channel<T>(CONFLATED)只需要最新数据
Unlimited无限缓冲Channel<T>(UNLIMITED)生产者快于消费者
Broadcast广播给多个接收者BroadcastChannel<T>(capacity)一对多通信

使用场景:

1.生产者-消费者模式
  • 场景描述:一个或多个生产者协程生成数据,一个或多个消费者协程处理数据。
  • 适用性:当数据需要按顺序处理时,Channel 提供了天然的 FIFO 队列。
  • 示例:文件处理流水线,其中文件读取、处理和写入由不同的协程完成。
2 .协程间的事件总线
  • 场景描述:一个协程发送事件,多个协程监听并响应这些事件。
  • 适用性:当需要解耦协程之间的通信时,Channel 可以作为事件传递的媒介。
  • 示例:在 GUI 应用中,用户操作(如按钮点击)通过 Channel 发送事件,不同的协程根据事件执行相应的逻辑。
3. 资源池管理
  • 场景描述:多个协程需要共享一组有限资源,如数据库连接或网络请求。
  • 适用性:Channel 可以控制对资源的并发访问,避免资源争用导致的冲突。
  • 示例:使用 Channel 作为资源池,协程从池中请求资源,使用完毕后释放回池中

示例:

fun main() = runBlocking {val channel = Channel<Int>() // 创建无缓冲通道// 生产者协程launch {for (x in 1..5) {println("Sending $x")channel.send(x) // 挂起直到有接收者delay(100) // 模拟工作}channel.close() // 关闭通道}// 消费者协程launch {for (y in channel) { // 使用for循环接收println("Received $y")}println("Channel is closed")}delay(2000)
}
1.2 Flow

定义:Flow 是一种冷流(cold stream),意味着数据流只有在收到收集(collection)请求时才会开始发射数据。它支持挂起操作,可以与协程完美结合,实现异步计算和数据处理。

特点:

  • 异步数据流:以异步方式处理连续数据流。
  • 声明式编程:通过操作符链式调用处理数据流。
  • 可组合性:支持 map、filter、flatMap、zip 等操作符。
  • 取消支持:与协程一样支持取消操作。

基本使用:

//使用 flow 建造器创建一个 Flow 对象。
fun transformFlow() = flow {for (i in 1..5) {delay(100)emit(i)}
}.map { it * 2 } // 将每个值乘以 2.filter { it > 4 } // 过滤掉小于等于 4 的值fun main() = runBlocking {
//使用 collect 函数收集 Flow 发射的数据。transformFlow().collect { value -> println("Transformed value: $value")}
}

使用场景:

1 .异步数据流处理
  • 场景描述:对异步数据流进行转换、合并、过滤等操作。
  • 适用性:当需要对数据流进行复杂的操作时,Flow 提供了丰富的操作符。
  • 示例:在数据处理管道中,使用 Flow 对数据进行映射、过滤和归约。
2. 网络请求与数据解析
  • 场景描述:从网络获取数据,并进行解析和转换。
  • 适用性:Flow 可以与网络库(如 Retrofit)结合,简化异步网络请求的处理。
  • 示例:使用 Flow 处理分页网络请求,逐页获取数据并进行解析。
3. 数据库查询与观察
  • 场景描述:从数据库查询数据,并对结果进行观察。
  • 适用性:Flow 可以与数据库库(如 Room)结合,实现数据的异步查询和实时更新。
  • 示例:在 Android 应用中,使用 Flow 监听数据库表的变化,并更新 UI。

示例:

import retrofit2.Retrofit
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.http.GET
import kotlinx.coroutines.*interface ApiService {@GET("data")fun fetchData(): Flow<String>
}val apiService = Retrofit.Builder().baseUrl("https://api.example.com").addConverterFactory(GsonConverterFactory.create()).build().create(ApiService::class.java)fun main() = runBlocking {apiService.fetchData().flowOn(Dispatchers.IO) // 在 IO 线程上执行网络请求.collect { data ->println("Received data: $data")}
}
1.3 SharedFlow 和 StateFlow

定义:SharedFlow 和 StateFlow 是 Kotlin Flow 库中的两种特殊 Flow,用于在多个收集器之间共享状态和数据流。

特点:

  • SharedFlow:热流,允许多个收集器接收数据,可配置重放(replay)和缓冲,不保存状态。
  • StateFlow:特殊的 SharedFlow,必须有初始值,只保留最新值,并在收集器加入时立即发出当前状态。

使用场景:

1. 事件流共享
  • 场景描述:多个协程需要共享同一个事件流,并且每个协程都能接收到事件。
  • 适用性:当事件需要被多个订阅者处理,且每个订阅者都能独立地接收事件时。
  • 示例:在实时数据应用中,传感器数据通过 SharedFlow 分发给多个处理单元。
2 状态广播
  • 场景描述:一个协程更新状态,多个协程监听状态变化。
  • 适用性:当状态变化需要通知给多个观察者时,SharedFlow 提供了广播机制。
  • 示例:在游戏开发中,玩家状态(如生命值、分数)通过 SharedFlow 广播给 UI 和逻辑处理单元。
3. 背压管理
  • 场景描述:生产者生成数据的速度可能快于消费者处理数据的速度。
  • 适用性:SharedFlow 支持背压策略,可以控制数据的发送速度,避免消费者过载。
  • 示例:在数据流处理中,使用 SharedFlow 缓解高速数据源对处理单元的压力。

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*fun main() = runBlocking {val sharedFlow = MutableSharedFlow<String>()// 事件发送协程launch {sharedFlow.emit("Event 1")delay(1000)sharedFlow.emit("Event 2")}// 事件监听协程 1launch {sharedFlow.collect { event ->println("Listener 1 received: $event")}}// 事件监听协程 2launch {sharedFlow.collect { event ->println("Listener 2 received: $event")}}delay(2000) // 等待事件处理完成coroutineContext.cancelChildren() // 取消所有子协程
}
1.4 Await 和 Async

定义:async 是一种启动协程并获取其结果的方式。await 用于等待 async 协程的结果。

特点:

  • async 会立即返回一个 Deferred 对象,可以在需要时通过 await 获取结果。
  • 支持结构化并发,await 会在需要时挂起协程,直到结果准备好。

使用场景:

  • 当需要并行执行任务并聚合结果时。
  • 当需要按需获取协程结果时。

示例:

 import kotlinx.coroutines.*suspend fun main() {val deferred = async { expensiveComputation() }val result = deferred.await()println("Result: $result")}suspend fun expensiveComputation(): Int {delay(1000)return 42}
1.5 Actor

定义:Actor 是结合了协程和通道的实体,封装了状态和处理消息的能力。

特点:

  • 消息传递:Actor 之间通过发送和接收消息进行通信。
  • 封装状态:每个 Actor 封装了自己的状态和行为,其他 Actor 无法直接访问其内部状态。
  • 异步通信:通过消息传递实现异步交互,避免传统并发模型中的死锁和竞争问题。

使用场景:

  • 分布式系统:在分布式环境中,Actor 可以作为独立的计算单元,通过消息传递完成协同任务。
  • 实时通信:适用于需要实时处理消息的场景,如聊天应用、游戏服务器等。
  • 高并发任务处理:处理需要同时执行多个任务的应用,例如订单处理、数据分析等。

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*fun main() = runBlocking {val actor = actor<String> {//Actor 内部通过 for 循环接收来自 channel 的消息,并执行相应的处理逻辑。for (msg in channel) {println("Received: $msg")// 处理消息}}// 向 Actor 发送消息actor.send("Hello")actor.send("World")actor.close() // 关闭 Actor
}

2. 示例:实时搜索场景
class SearchViewModel : ViewModel() {// 使用 MutableStateFlow 存储可变的搜索查询字符串// StateFlow 是热流,会自动收集数据,适合 UI 状态管理private val searchQuery = MutableStateFlow("")// 使用 MutableStateFlow 存储可变的搜索结果列表// 注意:这里返回的 searchResults 是不可变的 StateFlow,避免外部直接修改private val _searchResults = MutableStateFlow<List<String>>(emptyList())val searchResults: StateFlow<List<String>> = _searchResultsinit {// 在 viewModelScope 中启动协程// viewModelScope 是 ViewModel 提供的协程作用域,基于 Dispatchers.Main 运行// 当 ViewModel 销毁时,viewModelScope 会自动取消所有协程,避免内存泄漏viewModelScope.launch {// 构建 Flow 管道,处理搜索查询到结果的转换searchQuery// debounce(300):防抖处理,300ms 内多次输入只触发最后一次// 避免用户快速输入时触发过多搜索请求.debounce(300)// filter it.length > 2:过滤短查询,避免无效请求// 只有查询长度大于 2 时才继续处理.filter { it.length > 2 }// distinctUntilChanged():忽略重复的查询,避免重复请求// 只有查询内容变化时才继续处理.distinctUntilChanged()// flatMapLatest:取消前一个未完成的搜索,只保留最新的查询结果// 当新的查询到来时,会取消前一个协程任务.flatMapLatest { query ->// 调用 performSearch 发起搜索,返回 Flow<List<String>>performSearch(query)}// catch emit(emptyList()):捕获异常并返回空列表,避免 UI 出错// 如果搜索过程中发生异常,会发射空列表作为默认结果.catch { emit(emptyList()) }// collect:收集 Flow 发射的数据,更新搜索结果.collect { results ->// 更新 _searchResults 的值,UI 会自动响应变化_searchResults.value = results}}}// 处理用户输入变化,更新搜索查询fun onSearchQueryChanged(query: String) {// 直接设置 MutableStateFlow 的值,会触发 Flow 管道重新计算searchQuery.value = query}// 模拟网络搜索请求,返回 Flow<List<String>>private fun performSearch(query: String): Flow<List<String>> = flow {// delay(1000):模拟耗时操作(如网络请求),可被协程取消// 如果协程被取消,delay 会立即抛出 CancellationExceptiondelay(1000)// emit:发射搜索结果// 这里模拟返回两个结果项emit(listOf("query result 1", "query result 2"))}
}

相关文章:

  • 9、AI测试辅助-代码Bug分析提示词优化
  • 安卓settings单双屏显示
  • 用typoa写markdown文档笔记
  • 使用布隆过滤器实现java大数据筛选是否存在
  • 微软宣布的五大重要事项|AI日报0520
  • 微软开放代理网络愿景
  • 镜像管理(2)Dockerfile总结
  • vue3/vue2大屏适配
  • 扫盲笔记之NPM
  • Wan2.1 文生视频 支持批量生成、参数化配置和多语言提示词管理
  • C及C++的音频库与视频库介绍
  • WIFI信号状态信息 CSI 深度学习篇之CNN(Python)
  • 第5天-python饼图绘制
  • Jenkins:自动化之魂,解锁高效开发的密钥
  • 三、【数据建模篇】:用 Django Models 构建测试平台核心数据
  • SQLite基础及优化
  • PL/SQL 安装配置与使用
  • 力扣热题——零数组变换 |
  • LeetCode 93.复原IP地址 LeetCode 78.子集 LeetCode 90.子集II
  • SpringMVC所有注解按照使用位置划分
  • 百年前淮北乡村的风俗画卷——读郑重 《九十自述》
  • 马上评|把孩子当牟利工具,这样的流量吃不得
  • 《让世界爱中国》新书发布,探讨大变局下对外讲好中国故事
  • 广东茂名高州市山体滑坡已致3死1失联,搜救仍在继续
  • 被央视曝光“废旧厂区沦为垃圾山”,江西萍乡成立调查组查处
  • 新城市志|GDP万亿城市,一季度如何挑大梁