Skip to content

Everything Claude Code 的 kotlin-coroutines-flows Skill 专为 Android 和 Kotlin Multiplatform 项目设计,系统化解决协程结构化并发、响应式数据流(Flow/StateFlow/SharedFlow)、错误处理与协程测试难题。通过标准化 Scope 管理、流操作符、事件分发和测试模式,大幅提升代码健壮性与可维护性,是提升 AI 辅助 Kotlin 编程效率的关键 Skill。

Everything Claude Code Kotlin Coroutines Flows Skill:StateFlow、CoroutineScope、错误处理与协程测试模式

在现代 Android 和 Kotlin Multiplatform(KMP)项目中,异步编程、响应式数据流和高质量测试是开发效率与代码健壮性的核心。Everything Claude Code 的 [kotlin-coroutines-flows Skill] 专为此场景打造,标准化了 CoroutineScope、StateFlow、Flow 操作符、错误处理和协程测试的最佳实践。无论你是初次接触协程,还是希望通过 AI 编程助手实现自动化代码审查、生成和重构,该 Skill 都能帮助你系统性提升 Kotlin 协程代码的质量与可维护性。

1. 这个 Skill 解决什么问题?

没有该 Skill 时常见问题:

  • 协程作用域(Scope)混乱,容易出现内存泄漏或任务未按预期取消
  • StateFlow/SharedFlow 用法不规范,UI 状态同步和事件分发易出错
  • 并发操作、错误处理、重试机制实现分散且易遗漏
  • 协程和 Flow 的测试难以覆盖边界和并发场景
  • 反模式频发(如滥用 GlobalScope、在 Composable 中重复创建 Flow 等)

Skill 作用:

  • 规范协程作用域与结构化并发,避免泄漏和僵尸任务
  • 标准化 StateFlow/SharedFlow 用于 UI 状态与一次性事件
  • 提供 Flow 操作符(debounce、combine、retry 等)实用模式
  • 明确错误处理、协程取消与资源清理流程
  • 提供 Turbine、TestDispatcher 等协程测试模式,易于集成到 TDD 流程
  • 明确反模式,减少常见协程陷阱

更多关于 ECC 体系 Skill 的整体介绍,可参考 Everything Claude Code 完全指南

2. 触发条件:何时会自动激活?

  • 你在项目中使用 Kotlin 协程(coroutines)、Flow、StateFlow、SharedFlow 进行异步或响应式开发
  • 需要对并发任务进行结构化管理(如并行加载、分层 Scope、任务取消等)
  • 需要对 UI 状态、事件进行响应式建模
  • 需要为协程/Flow 相关逻辑编写单元测试或集成测试
  • 代码审查、自动重构、AI 代码生成等流程中检测到协程相关反模式时

Skill 会在上述场景下自动分析代码上下文,推荐最佳实践或直接生成/修复代码片段。

3. Step by Step:实际项目中如何用好这个 Skill

步骤一:结构化并发与 Scope 管理

核心原则:始终使用结构化并发,避免 GlobalScope。

kotlin
// 错误示例:容易泄漏
GlobalScope.launch { fetchData() }

// 推荐用法:生命周期绑定
viewModelScope.launch { fetchData() }

// 组件内协程
LaunchedEffect(key) { fetchData() }

并行任务分解:

kotlin
suspend fun loadDashboard(): Dashboard = coroutineScope {
    val items = async { itemRepository.getRecent() }
    val stats = async { statsRepository.getToday() }
    val profile = async { userRepository.getCurrent() }
    Dashboard(
        items = items.await(),
        stats = stats.await(),
        profile = profile.await()
    )
}

子任务失败不影响其他任务:

kotlin
suspend fun syncAll() = supervisorScope {
    launch { syncItems() }
    launch { syncStats() }
    launch { syncSettings() }
}

步骤二:响应式数据流建模(StateFlow/SharedFlow)

StateFlow 建模 UI 状态:

kotlin
class DashboardViewModel(
    observeProgress: ObserveUserProgressUseCase
) : ViewModel() {
    val progress: StateFlow<UserProgress> = observeProgress()
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5_000),
            initialValue = UserProgress.EMPTY
        )
}

WhileSubscribed(5_000) 可在最后一个订阅者离开后保持上游 5 秒,避免配置变更时重复加载。

合并多个 Flow:

kotlin
val uiState: StateFlow<HomeState> = combine(
    itemRepository.observeItems(),
    settingsRepository.observeTheme(),
    userRepository.observeProfile()
) { items, theme, profile ->
    HomeState(items = items, theme = theme, profile = profile)
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), HomeState())

SharedFlow 用于一次性事件(如 Snackbar、导航等):

kotlin
private val _effects = MutableSharedFlow<Effect>()
val effects: SharedFlow<Effect> = _effects.asSharedFlow()

private fun deleteItem(id: String) {
    viewModelScope.launch {
        repository.delete(id)
        _effects.emit(Effect.ShowSnackbar("Item deleted"))
    }
}

在 Composable 中收集:

kotlin
LaunchedEffect(Unit) {
    viewModel.effects.collect { effect ->
        when (effect) {
            is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(effect.message)
            is Effect.NavigateTo -> navController.navigate(effect.route)
        }
    }
}

步骤三:常用 Flow 操作符与错误处理

防抖搜索输入、distinct、flatMapLatest、catch:

kotlin
searchQuery
    .debounce(300)
    .distinctUntilChanged()
    .flatMapLatest { query -> repository.search(query) }
    .catch { emit(emptyList()) }
    .collect { results -> _state.update { it.copy(results = results) } }

带指数退避的重试:

kotlin
fun fetchWithRetry(): Flow<Data> = flow { emit(api.fetch()) }
    .retryWhen { cause, attempt ->
        if (cause is IOException && attempt < 3) {
            delay(1000L * (1 shl attempt.toInt()))
            true
        } else {
            false
        }
    }

步骤四:Dispatcher 选择与跨平台兼容

  • CPU 密集型:Dispatchers.Default
  • IO 密集型:Dispatchers.IO(仅 JVM/Android,KMP 用 Default 或 DI 提供)
  • UI 线程:Dispatchers.Main(所有平台可用)
kotlin
withContext(Dispatchers.Default) { parseJson(largePayload) }
withContext(Dispatchers.IO) { database.query() }
withContext(Dispatchers.Main) { updateUi() }

步骤五:协程取消与资源清理

协作式取消:

kotlin
suspend fun processItems(items: List<Item>) = coroutineScope {
    for (item in items) {
        ensureActive()  // 检查是否被取消
        process(item)
    }
}

try/finally 保证资源释放:

kotlin
viewModelScope.launch {
    try {
        _state.update { it.copy(isLoading = true) }
        val data = repository.fetch()
        _state.update { it.copy(data = data) }
    } finally {
        _state.update { it.copy(isLoading = false) }
    }
}

步骤六:协程与 Flow 测试模式

StateFlow + Turbine 测试:

kotlin
@Test
fun `search updates item list`() = runTest {
    val fakeRepository = FakeItemRepository().apply { emit(testItems) }
    val viewModel = ItemListViewModel(GetItemsUseCase(fakeRepository))

    viewModel.state.test {
        assertEquals(ItemListState(), awaitItem())  // 初始状态

        viewModel.onSearch("query")
        val loading = awaitItem()
        assertTrue(loading.isLoading)

        val loaded = awaitItem()
        assertFalse(loaded.isLoading)
        assertEquals(1, loaded.items.size)
    }
}

TestDispatcher 控制并发:

kotlin
@Test
fun `parallel load completes correctly`() = runTest {
    val viewModel = DashboardViewModel(
        itemRepo = FakeItemRepo(),
        statsRepo = FakeStatsRepo()
    )

    viewModel.load()
    advanceUntilIdle()

    val state = viewModel.state.value
    assertNotNull(state.items)
    assertNotNull(state.stats)
}

伪造 Flow 源:

kotlin
class FakeItemRepository : ItemRepository {
    private val _items = MutableStateFlow<List<Item>>(emptyList())

    override fun observeItems(): Flow<List<Item>> = _items

    fun emit(items: List<Item>) { _items.value = items }
}

步骤七:常见反模式与注意事项

  • 避免使用 GlobalScope,应绑定具体生命周期
  • 不要在 init {} 里直接收集 Flow,应在 viewModelScope.launch 中处理
  • StateFlow/SharedFlow 的数据结构应保持不可变,避免直接操作可变集合
  • 不要捕获 CancellationException,让其自然传播
  • 不要用 flowOn(Dispatchers.Main) 试图切换收集线程,收集端 dispatcher 由调用者决定
  • 在 Composable 中创建 Flow 时必须 remember,否则每次重组都会新建 Flow

更多 Compose 场景下的 Flow 消费模式,详见 Compose Multiplatform Patterns Skill;如需了解协程在 Clean Architecture 各层的定位,参阅 Android Clean Architecture Skill

4. 常见配套 Agent 与 Skill 协作

  • Kotlin Reviewer Agent:自动检测协程反模式、推荐 Scope/Flow 最佳实践,审查 Compose 与 ViewModel 层的状态管理
  • Kotlin Build Resolver Agent:定位协程/Flow 相关的编译或类型错误,自动给出修复建议
  • Compose Multiplatform Patterns Skill:结合响应式 UI 层,自动生成 Flow/StateFlow 消费与绑定代码
  • Android Clean Architecture Skill:指导协程/Flow 在 UseCase、Repository、ViewModel 各层的正确用法
  • Kotlin Testing Skill:集成 Turbine、TestDispatcher 等测试模式,保障异步/并发代码的可测性与覆盖率

5. 输出示例

并发加载 Dashboard:

kotlin
suspend fun loadDashboard(): Dashboard = coroutineScope {
    val items = async { itemRepository.getRecent() }
    val stats = async { statsRepository.getToday() }
    val profile = async { userRepository.getCurrent() }
    Dashboard(
        items = items.await(),
        stats = stats.await(),
        profile = profile.await()
    )
}

UI 层收集一次性事件:

kotlin
LaunchedEffect(Unit) {
    viewModel.effects.collect { effect ->
        when (effect) {
            is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(effect.message)
            is Effect.NavigateTo -> navController.navigate(effect.route)
        }
    }
}

测试 StateFlow 输出:

kotlin
viewModel.state.test {
    assertEquals(ItemListState(), awaitItem())
    // ... 断言后续状态
}

FAQ

Q: kotlin-coroutines-flows Skill 适用于哪些项目?
A: 适用于所有采用 Kotlin 协程、Flow、StateFlow/SharedFlow 的 Android 和 KMP 项目,特别是需要响应式 UI、并发任务和高可测性的场景。

Q: 如何与 Compose 或 Clean Architecture Skill 协同?
A: 可与 Compose Multiplatform Patterns Skill 自动生成 UI 绑定代码,与 Android Clean Architecture Skill 协同规范 UseCase/Repository/ViewModel 各层协程用法。

Q: Skill 会自动修复哪些协程反模式?
A: 能检测并修复 GlobalScope 滥用、错误的 Flow 收集方式、可变集合泄漏、错误的取消处理等常见协程反模式,保障代码健壮性和可维护性。