๐ฑ 2025 ์๋๋ก์ด๋ ํ๊ตฌ์์ญ, Coroutines Flow ๋๋ฌธ์?! (feat. ์ฌ์ด ํด์ค)
๊ฐ์ธ ๊ด๊ณ ์์ญ
์ด ๊ธ์ 2025 ์๋๋ก์ด๋ ํ๊ตฌ ์์ญ์ ๋์จ ๋ฌธ์ ์ค flow์ ๊ด๋ จํ ๋ฌธ์ ๋ฅผ ํด์ํ ๊ธ์ด๋ค.
์๋๋ก์ด๋ ํ๊ตฌ ์์ญ ํ๊ธฐ ๊ธ
- (๐จ ์ค๋ฅ ์ ์ ) ๐ค 2025๋ ๋์๋ ๊ฐ๋ฐ์๋ค์ ์ฝ๋ฃจํด ์์ธ ์ฒ๋ฆฌ ๋๋ฌธ์ ๋ฐค์๊ฐ? ๐จ (2025ํ๋ ๋ ์๋๋ก์ด๋ ํ๊ตฌ์์ญ ๋ฌธ์ ํ์ด) - link
- 2025ํ๋ ๋ ์๋๋ก์ด๋ ํ๊ตฌ์์ญ - link
- 2025ํ๋ ๋ ์๋๋ก์ด๋ ํ๊ตฌ์์ญ์ ์ค๋นํ๋ฉฐ(๊ฒฝ์๋ ์์ฑ) - link
๋ฌด์จ ๋ฌธ์ ์ธ๊ฐ?
์ฝ๋ฃจํด flow
์ ๋ํ ๋ฌธ์ ๊ฐ ํฌ๊ฒ 2๋ฌธ์ ๊ฐ ์์๋ค. ์ด ์ค ์ฝ๋ฃจํด ์์ญ์ ํฌํจํ 1๊ฐ์ ๋ฌธ์ ๋ฅผ ์ดํด๋ณด๋ ค ํ๋ค.
์ด ๋ฌธ์ ๋ StateFlow
์ flow {}
๋ฅผ ๋ณตํฉ์ ์ผ๋ก ์ฌ์ฉํ๋ ๋ฌธ์ ์ด๋ค.
์ด ๋ฌธ์ ์ ๋์ค๋ Flow์ ๋ํ ๋ด์ฉ์ ์ด์ ์ ํ์์ ๋ธ๋ก๊ทธ์ ์์ฑํ๋ ๋ค์ํ ๊ธ์ด ์์ผ๋ ๋งํฌ๋ฅผ ์ถ๊ฐํ๋ค.
- Callback์ผ๋ก ๋ฐ์ ๋ฐ์ดํฐ๋ฅผ Coroutines์์ ํ์ฉํ๋ ๋ฐฉ๋ฒ! Flow ํ์ฉ - link
- Callback์ผ๋ก ์ ๋ฌ๋ฐ์ ๋ฐ์ดํฐ๋ฅผ Coroutines์ Channel๋ก ์ฒ๋ฆฌํด๋ณด์. - link
์ด ๊ธ์์๋
- 2025 ์๋๋ก์ด๋ ํ๊ตฌ ์์ญ์ ๋์จ ๋ฌธ์ ์ผ๋ถ๋ฅผ ์ ๋ฆฌํ๋ค.
- Coroutiens flow์ ๋ํ ์ดํด๊ฐ ํ์ํ๋ค.
๋ฌธ์ ๋ฅผ ์ดํด๋ณด์
์๋๊ฐ ๋ฌธ์ ์์ ๋์จ ์ ์ฒด ์ฝ๋์ด๋ค.(Test ๋ถ๋ถ์ ์์ )
@Test
fun test() = runTest {
val stateFlow = MutableStateFlow(1)
val flow = flow {
emit(1)
delay(1_000L)
emit(2)
delay(1_000L)
emit(3)
}
.flatMapLatest { value ->
stateFlow
.map {
value + it
}
}
launch {
delay(999L)
stateFlow.emit(2)
delay(999L)
stateFlow.emit(3)
}
flow
.collect { value -> println("value: $value") }
}
์ด ๋ฌธ์ ์์๋ 1 ๊ฐ์ StateFlow
์ 1 ๊ฐ์ flow {}
๋ฅผ ์ด๊ธฐํํ๊ณ , flow ๋ค์์ flatMapLatest
๋ฅผ ์ฌ์ฉํ์ฌ StateFlow
๋ฅผ ํตํด ์๋ก์ด ํ๋ฆ์ ๋ง๋๋ ๋ฌธ์ ์ด๋ค.
flatMapLatest๋ ๋ฌด์์ธ๊ฐ?
flatMapLatest - link์ ํจ์ ์ํ์ ์๋์ ๊ฐ๋ค.
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
transformLatest { emitAll(transform(it)) }
๋ฌธ์์ ์ค๋ช ๊ทธ๋๋ก๋ฅผ ๊ฐ์ ธ์ค๋ฉด
Returns a flow that switches to a new flow produced by transform function every time the original flow emits a value. When the original flow emits a new value, the previous flow produced by transform block is cancelled.
์ ๋ฏธ๋์ด์๊ฒ ํด์์ ์์ฒญํ ๋ถ๋ถ์ ๋ค์๊ณผ ๊ฐ๋ค.
์ด ์ฝ๋๋ ์๋ณธ flow๊ฐ ๊ฐ์ ๋ด๋ณด๋ผ ๋๋ง๋ค transform ํจ์์ ์ํด ์์ฑ๋ ์๋ก์ด flow๋ก ์ ํํ๋ flow๋ฅผ ๋ฐํํฉ๋๋ค.
์ข ๋ ์์ธํ ์ค๋ช ํ๋ฉด,
- flow๋ ์ฝ๋ฃจํด์์ ๋น๋๊ธฐ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ๋ฐ ์ฌ์ฉ๋๋ ์๋ฃ ๊ตฌ์กฐ์ ๋๋ค. ๋ง์น ์๊ฐ์ ๋ฐ๋ผ ๊ฐ์ ๋ด๋ณด๋ด๋ ์คํธ๋ฆผ๊ณผ ๊ฐ์ต๋๋ค.
- transform์ flow์ ๊ฐ ๊ฐ์ ๋ณํํ๋ ํจ์์ ๋๋ค. ์ด ํจ์๋ ์๋ก์ด flow๋ฅผ ์์ฑํ ์๋ ์์ต๋๋ค. flatMapLatest๋ flow์ ๊ฐ ๊ฐ์ ๋ํด transform ํจ์๋ฅผ ํธ์ถํ์ฌ ์๋ก์ด flow๋ฅผ ์์ฑํ๊ณ , ์ด flow๋ค์ ๊ฐ์ ์์ฐจ์ ์ผ๋ก ๋ด๋ณด๋ด๋ ์ฐ์ฐ์์ ๋๋ค.
- ์ค์ํ ์ ์, ์๋ณธ flow๊ฐ ์๋ก์ด ๊ฐ์ ๋ด๋ณด๋ผ ๋๋ง๋ค ์ด์ ์ transform ํจ์์ ์ํด ์์ฑ๋ flow๋ ์ทจ์๋๋ค๋ ๊ฒ์ ๋๋ค. ์ฆ, ํญ์ ์ต์ ๊ฐ์ ๋ํ flow๋ง ํ์ฑํ๋ฉ๋๋ค. ์๋ฅผ ๋ค์ด, ์ฌ์ฉ์๊ฐ ๊ฒ์์ด๋ฅผ ์ ๋ ฅํ ๋๋ง๋ค ๊ฒ์ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ์ ธ์ค๋ flow๊ฐ ์๋ค๊ณ ๊ฐ์ ํด ๋ณด๊ฒ ์ต๋๋ค. flatMapLatest๋ฅผ ์ฌ์ฉํ๋ฉด ์ฌ์ฉ์๊ฐ ์๋ก์ด ๊ฒ์์ด๋ฅผ ์ ๋ ฅํ ๋๋ง๋ค ์ด์ ๊ฒ์ ๊ฒฐ๊ณผ์ ๋ํ flow๋ ์ทจ์๋๊ณ ์๋ก์ด ๊ฒ์์ด์ ๋ํ flow๊ฐ ์์ฑ๋ฉ๋๋ค.
์ด๋ฌํ ๋ฐฉ์์ ์ต์ ๊ฐ๋ง ์ฒ๋ฆฌํด์ผ ํ๋ ๊ฒฝ์ฐ์ ์ ์ฉํฉ๋๋ค. ์๋ฅผ ๋ค์ด, ์ฌ์ฉ์ ์ธํฐํ์ด์ค์์ ์ต์ ๋ฐ์ดํฐ๋ง ํ์ํด์ผ ํ๋ ๊ฒฝ์ฐ flatMapLatest๋ฅผ ์ฌ์ฉํ๋ฉด ๋ถํ์ํ ์์ ์ ๋ฐฉ์งํ๊ณ ์ฑ๋ฅ์ ํฅ์์ํฌ ์ ์์ต๋๋ค.
์ฌ๊ธฐ์ ๋ง์ง๋ง์ cancel ๋ถ๋ถ์ด ๊ถ๊ธํ์ฌ ์์ธ ์ฝ๋๋ฅผ ์ข ๋ ์ดํด๋ณด์๋๋ฐ, collect {}
์ฒ๋ฆฌ ํ apply
๋ถ๋ถ์์ cancel()
๊ณผ join()
์ ์ฒ๋ฆฌํ๊ณ ์์์ ์ ์ ์๋๋ฐ, ํญ์ ์ต์ ์ ๊ฐ๋ง์ ํ๋ ค๋ณด๋์ ์ฝ๋๋ก ์ ์ ์๋ค.
internal class ChannelFlowTransformLatest<T, R>(
private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<R> =
ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)
override suspend fun flowCollect(collector: FlowCollector<R>) {
assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
coroutineScope {
var previousFlow: Job? = null
flow.collect { value ->
previousFlow?.apply { // ์ด ๋ถ๋ถ
cancel(ChildCancelledException())
join()
}
// Do not pay for dispatch here, it's never necessary
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
collector.transform(value)
}
}
}
}
}
๊ฒฐ๊ตญ flatMapLatest๋ฅผ ์ฌ์ฉํ๋ฉด 2 ๊ฐ์ flow
ํ๋ฆ์ ํฉ์น ์ ์์ผ๋ฉฐ, ์ด์ ํ๋ฆ์ ๋ฐ์ดํฐ๋ ์ธ์ ๋ ์ต์ ๋ฐ์ดํฐ ํ๋ฆ์ ์ฌ์ฉํจ์ ์ ์ ์๋ค.
๋์๊ฐ์
MutableStateFlow๋ ๊ฐ์ฅ ๋ง์ง๋ง ์ต์ ๋ฐ์ดํฐ 1 ๊ฐ๋ฅผ ์ฌ์ฉํ ์ ์๋ ์ํ๋ฅผ ์ ์ฅํ๊ธฐ ์ํ ์ฉ๋์ด๋ฉฐ, HotFlow์ด๋ค. ์ธ์ ๋ ์ต์ ๋ฐ์ดํฐ๋ง์ ๊ฐ์ง๋ค.
๋ฐ๋๋ก flow
๋ ๊ตฌ๋
ํ๊ธฐ ์ ์๋ ๋์ํ์ง ์๋ ColdFlow์ ์ํ๋ค.
val stateFlow = MutableStateFlow(1)
val flow = flow {
emit(1)
delay(1_000L)
emit(2)
delay(1_000L)
emit(3)
}
.flatMapLatest { value ->
stateFlow
.map {
value + it
}
}
๊ทธ๋ผ ์ด ์ฝ๋์์์ ๊ตฌ๋
์์ ์ ์ด์ด์ ๋์ค๋ ์ฝ๋์์ ์ ์ ์๋๋ฐ, launch {}๋ฅผ ์คํํ๊ณ ๋์ ๋ฐ๋ก collect
๋ฅผ ํ๊ณ ์์์ ์ ์ ์๋ค.
launch {
delay(999L)
stateFlow.emit(2)
delay(999L)
stateFlow.emit(3)
}
flow
.collect { value -> println("value: $value") }
๊ตฌ๋ ํ ๋ฐ์ดํฐ์ ์์
์ด ๋ฌธ์ ๋ ํ
์คํธ๊ฐ ๊ฐ๋ฅํ๊ณ , ์ดํดํ ์ ์๋ ํํ๋ก ์ ๊ณตํด์ผ ํด์ delay
๋ฅผ ๋ช
ํํ๊ฒ ์ ๊ณตํด ์ฃผ๊ณ ์๋ค.
๊ตฌ๋
์ด ์์๋๋ฉด emit(1)
์ ์์ํ๊ณ , 1์ด์ ๋๊ธฐ๊ฐ ๊ฑธ๋ฆฐ๋ค. ๊ทธ๋ผ StateFlow์ 1๊ณผ emit 1์ ํฉ์ฐํ์ฌ 2
์ถ๋ ฅ, ์ด์ด์ 999ms ํ์ stateFlow
์ 2
๋ฅผ ์ ๋ฌํ์์ผ๋ฏ๋ก 1+2์ ๊ฒฐ๊ณผ 3์ ์ถ๋ ฅํ๋ค.
์ดํ์ ๊ฒฐ๊ณผ๋ฅผ ํตํฉํ๋ฉด 2, 3, 4, 5, 6
์ด ์์ฐจ๋ก ์ถ๋ ฅ๋๋ค.
์ด ๋ฌธ์ ์ ๋ต์
์ฌ์ค ์ ์ฝ๋๋ฅผ ํด์ํ ํ์๋ ์๋ ๊ฐ๋จํ ๋ฌธ์ ์ด๋ค.
StateFlow์ emit(2)๋ฅผ ํ๋ฉด flow {} emit(1)๋ถํฐ ๋ค์ ๋ฐํ๋์ด์ง๋ค.
๋ผ๋ ๋ถ๋ถ์ด๋ค.
flatMapLatest
์ ๊ฑธ์ด๋ StateFlow
์ ๊ฐ์ ๋๊ธฐ๋๋ผ๋ flow {}
๋ถํฐ ์คํํ๋ค๋ ๊ฑด ์ผ์ด๋ ์ ์๋ค. ํ์ง๋ง exception์ด ๋ฐ์ํ๊ณ , retry
๋ฅผ ๊ฑธ์๋ค๋ฉด flow {}
๋ถํฐ ์คํํ ์ ์๋ค.
retry ์์น๊ฐ ๋งค์ฐ ์ค์ํ๋ฐ, ์ด์ ์ ์์ฑํ ๊ธ์ ์ตํ๋จ ์๋๋ฆฌ์ค 2 ๋ฒ ๋ถ๋ถ์ ์ดํด๋ณด๊ธธ
StateFlow๊ฐ ์๋๋ผ SharedFlow๋ฅผ ํจ๊ป ์ฌ์ฉํ๋ฉด?
๋ฌธ์ ๋ฅผ ์กฐ๊ธ ๋ฐ๊ฟ StateFlow ๋์ SharedFlow๋ฅผ ์ฌ์ฉํ๋ค๋ฉด ์ฝ๋์ ๋์์ ์ด๋ป๊ฒ ๋ ๊น?
@Test
fun test() = runTest {
val sharedFlow = MutableSharedFlow<Int>()
val flow = flow {
emit(1)
delay(1_000L)
emit(2)
delay(1_000L)
emit(3)
}
.flatMapLatest { value ->
sharedFlow
.map {
value + it
}
}
launch {
delay(999L)
sharedFlow.emit(2)
delay(999L)
sharedFlow.emit(3)
}
flow
.collect { value -> println("value: $value") }
}
์ด ์ฝ๋์ ์๋ต์ ํ์ฐํ๊ฒ ๋ฌ๋ผ์ง๋๋ฐ 3, 5
๋ง์ด ์ถ๋ ฅ๋๋ค.
SharedFlow๋ replay๋ฅผ ํ์ง ์๊ธฐ ๋๋ฌธ์ ๊ฐ์ด ์ค๊ธฐ ์ ์๋ ๋ฐ์ ์์ฒด๋ฅผ ํ์ง ์๊ธฐ ๋๋ฌธ์ด๋ค.
๊ทธ๋์ ์ด ์ฝ๋๋
emit(1)
์ ํ๋๋ผ๋ println
ํ์ง ์๊ณ sharedFlow์ 2๊ฐ ๋ค์ด์์ผ ๋ง 1+2
ํ์ฌ 3์ด ์ถ๋ ฅ๋๋ค.
StateFlow, SharedFlow, flow๋ฅผ ์ ๋ค ํฉ์ฑํ๋ค๋ฉด?
merge, combine, zip์ ํ์ฉํ๋ ๊ฒ์ด ์๋๋ผ MutableStateFlow
๋ฅผ ํ๋ ๋ ์ถ๊ฐํ๊ณ flatMapLatest
๋ฅผ ํ๋ ๋ ์ถ๊ฐํ์ฌ ํ์ฉํ๋ ๊ฒฝ์ฐ๋ฅผ ์๋์ ๊ฐ์ด ์ถ๊ฐํด ๋ณด์๋ค.
@Test
fun test() = runTest {
val sharedFlow = MutableSharedFlow<Int>()
val stateFlow = MutableStateFlow(false)
val flow = flow {
emit(1)
delay(1_000L)
emit(2)
delay(1_000L)
emit(3)
}
.flatMapLatest { value ->
sharedFlow
.map {
value + it
}
}
.flatMapLatest { // ์ฌ๊ธฐ์ ์ถ๊ฐ
stateFlow
.filter { it }
}
launch {
delay(999L)
sharedFlow.emit(2)
delay(999L)
sharedFlow.emit(3)
stateFlow.value = true // ์ฌ๊ธฐ์ ์ถ๊ฐ
}
flow
.collect { value -> println("value: $value") }
}
์ด ์ฝ๋๋ ์์ ์ฝ๋๋ณด๋ค ๋ ์ฝ๊ฒ ๋ฑ ํ ๋ฒ์ ๊ฐ๋ง์ด ์ถ๋ ฅ๋๋๋ฐ, ์ด ๊ฐ์ true
์ด๋ค.
์ true
์ธ์ง๋ flatMapLatest
๋ฅผ ์ดํดํ๋ค๋ฉด ๋งค์ฐ ์ฝ๊ฒ ์ ์ ์๋ค.
๋ง๋ฌด๋ฆฌ
์ด์ ์ ์์ฑํ ๊ธ ๋ณด๋ค flow๊ฐ ๋ ์ฝ๊ฒ ๋๊ปด์ง ์ ์๋ค. Job()
์ ์ดํดํ๋ ๋ถ๋ถ์ด ๋ ๋ช
ํํ๊ธฐ ๋๋ฌธ์ด๊ธฐ๋ ํ๊ณ , flow๋ 1๊ฐ๋ก ํด์ํ ์ ์์ง๋ง launch {}
๋ launch
์์ launch
๋ฅผ ์ฌ์ฉํ ์๋ ์๊ธฐ ๋๋ฌธ์ด๋ค.
๊ฐ๋ฅํ๋ค๋ฉด ์๋์ ๊ฐ์ ํํ๋ ์งํฅํ๋ ํธ์ด ๋ชจ๋๊ฐ ์ดํดํ๊ธฐ ์ฌ์ด๋ฐ ์ด๋ ๊ฒ ํ์ฉํ๋ ๊ฒฝ์ฐ๋ ์ฌ์ค ๋ง์ง ์๊ธด ํ๋ค.
launch {
launch {
launch {
// ...
}
}
}
flow๋ ํ๋์ ํ๋ฆ๋ง์ ์ดํดํ๋ฉด ๋๋๋ฐ ๊ทธ ์ฌ์ด์ ๊ฐ์ ํ๋ฆ์ด A to B๋ก ๋ฐ๋๋ ํ๋ฆ๋ง ์ดํดํ๋ฉด ํด์์ด ์ฌ์์ง๊ธฐ ๋๋ฌธ์ด๋ค.
Comments