Kotlin Coroutines Flow의 Cold/Hot flow(Stream)의 데이터 흐름(Data flow) 이해해 보기



개인 광고 영역

이전 글에서 데이터 흐름(Data flow)을 이해해 보는 데 있어 필요한 것은? 짝퉁 개발자처럼 논하기란 주제로 글을 작성했다.

이번 글에서는 이 글에 나온 내용 중 Coroutines Flow에 대한 데이터 흐름을 이해하기 위한 글을 작성해 보았다.

이 글에서는

  • Coroutines Flow
  • 지속적인 흐름
  • Cold/Hot stream
  • ReactiveX에서 제공하는 subject에 대해서 이해하기

Coroutines Flow

Asynchronous Flow - link는 공식 문서에 나온 설명을 그대로 가져왔다.

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.

supend 함수는 비동기적인 값을 불러올 순 있지만 지속적인 값의 흐름을 가지지는 않는다.

suspend fun fetchData() = /* 생략 */

fun main() {
    println(fetchData())
}

이런 흐름은 사실 누구나 이해하기 쉽다. main 함수에서 fetchData()를 호출하면 응답이 딱 1번 온다는 것이다.

그럼 이를 2회로 반복하면? main 함수에 한 줄 더 추가하거나, (0..1).forEach {}처럼 자동화할 수 있다.

suspend fun fetchData() = /* 생략 */

fun main() {
    println(fetchData())
    println(fetchData()) // 1회 더 추가
}


지속적인 흐름

flow를 통한 지속적인 흐름을 만든다는 것은 아래와 같다.

  • 함수를 호출한다.
  • Flow로 생성한 객체를 리턴 받는다.
  • 구독(collect) 한다.
    • collect()을 하기 전에도 새로운 데이터 흐름이 있다.
    • collect()을 해야만 새로운 데이터 흐름이 시작한다.

flow를 활용하는 방식에서의 지속적인 흐름은 이와 같다.

이를 아무나 이해할 수 있는 내용이 뭐가 있을까?

  • collect() 하기 전에도 새로운 데이터 흐름이 있다.
    • 물은 흐른다.
    • 스트리밍 서비스를 월 결제한다.
  • collect() 해야 만 새로운 데이터 흐름이 시작된다.
    • 커피를 주문한다.
    • 음식을 주문한다.

위의 예들이 맞을 수도 있고, 엄밀히 따지면 적합한 예가 아닐 수 있지만 대략 구분해 보면 이와 같을 수 있다.

결국 이미 있는 걸 필요할 때만 받아쓴다로 표현할 수 있는 것을 HotFlow(HotStream)으로 칭하고, 새로운 줌의 발생으로 만들기 시작한다는 ColdFlow(ColdStream)으로 칭한다.


ColdFlow

ColdFlow인 flow {}에 대한 코드가 아래와 같다.

val flow = flow {
    (1..6).forEach {
        emit(it)
        delay(1_000L)
    }
}

// collect 1
flow
    .onEach { println(it) }
    .launchIn(viewModelScope)

delay(2_000L) // 2초 후 구독

// collect 2
flow
    .onEach { println(it) }
    .launchIn(viewModelScope)

이에 대한 출력 결과는 아래와 같다.

1(new), 2, 1(new), 3, 2, 4, 3, 5, 4, 6, 5, 6

각각을 보면 1-6까지 정상 출력함을 알 수 있다.

이를 그림으로 표현하면 아래와 같다.

sample_01

flow에서의 데이터 흐름은 결국 collect 전에는 아무런 시작을 하지 않음을 알 수 있다.


ReactiveX의 HotStream

HotFlow는 collect 과는 상관없이 물이 흘러가듯 언제나 흘러가고 있다.

구독 시점을 기준으로 이전의 데이터 흐름부터 시작할지 항상 새로운 흐름만 받을지가 다를 뿐이다.

ReactiveX에는 이런 형태의 함수가 총 4가지 있는데 아래와 같다.

AsyncSubject

구독한 시점의 마지막 값을 방출하고, 새로운 구독이 발생해도 역시 마지막 값을 방출한다.(항상 최신의 마지막만 제공한다)

sample_02

BehaviorSubject

구독한 시점의 최근 데이터 1개와 이후 데이터 흐름을 받을 수 있다.

sample_03

PublishSubject

구독한 시점 이후의 최신 데이터를 순차 흐름을 받을 수 있다.

sample_04

ReplaySubject

구독 시점 앞서 발생한 모든 값을 다시 받을 수 있다.

sample_05

ReactiveX - Subject - link

ReactiveX에는 4가지의 Subject가 HotStream에 해당하는데 방식도 다양하다. Flow에서는 크게 2가지를 제공한다.

  • StateFlow : 구독한 시점의 최근 데이터 1개와 이후 데이터 흐름을 받을 수 있다.
  • SharedFlow : SharedFlow의 옵션을 통해 구독 시점을 다양하게 관리할 수 있으며, 동일한 값 역시 방출이 가능하다.

StateFlow는 상태를 관리하기 위한 최적의 상태를 제공해 주기 위해 추가되었는데 ReactiveX에서 PublishSubject와 동일함을 알 수 있다.

엄밀히 따지면 RectiveX Subject와는 다른 부분이 존재하지만 이해도를 높이기 위해 그림을 포함하였다.


Flow의 HotFlow(HotStream)

Flow에는 HotStream으로 2개를 제공 있다.

StateFlow

StateFlow는 equals, hashCode가 같은 경우 방출하지 않으며, 구독 시점 최근 마지막 데이터 1개와 이후 흐름을 받을 수 있다.

1개의 StateFlow와 2개의 collect()하는 코드를 아래와 같이 구현하였다.

val stateFlow = MutableStateFlow(0)

// collect 1
stateFlow
    .onEach { println(it) }
    .launchIn(viewModelScope)

delay(500)

stateFlow.value = 0

delay(1_000L)

// collect 2
stateFlow
    .onEach { println(it) }
    .launchIn(viewModelScope)

delay(500)
stateFlow.value = 1
delay(500)
stateFlow.value = 2

이 코드의 실행 결과는 아래와 같다.

0(first) 0(reaply) 1(new) 1(new) 2(new) 2(new)

이에 대한 도식화 결과가 다음과 같다.

sample_06


StateFlow를 좀 더 알아보면

StateFlow는 내부에서 private val _state = atomic(initialState) // T | NULL을 활용하여 처리하고 있다. atomic은 멀티 스레드 환경에서 하나의 변수에 대한 동시 접근 시 데이터의 일관성을 보장하기 위한 메커니즘이 적용되어 있다.

Android 개발에서 가장 많이 활용하고 있는 StateFlow는 atomic 적용되어 있기 때문에 Thread safe를 보장한다.

다른 이야기지만 일부 아키텍처 패턴 글에 해당 패턴을 사용하면 Thread safe라는 표현을 쓰는 경우가 있다. StateFlow를 기본 사용하는 경우는 어떤 아키텍처 패턴을 쓰던 StateFlow가 thread safe를 제공하는 것이지 해당 패턴에서 thread safe를 지켜주었다는 표현은 잘못된 표현이다.

그리고 값을 update 하는 경우는 2가지 기법을 활용할 수 있다.

val stateFlow = MutableStateFlow(0)

// update 사용하는 케이스
stateFlow.update { newValue }

// setValue
stateFlow.value = newValue

각각에 대한 내부 코드를 조금 살펴보면

// update 함수
public inline fun <T> MutableStateFlow<T>.update(function: (T) -> T) {
    while (true) {
        val prevValue = value
        val nextValue = function(prevValue)
        if (compareAndSet(prevValue, nextValue)) {
            return
        }
    }
}

// property setValue
public override var value: T
    get() = NULL.unbox(_state.value)
    set(value) { updateState(null, value ?: NULL) }

update 함수는 compareAndSet을 호출하고 있고, setValue는 updateState()를 호출하고 있지만 사실 둘 다 updateState 함수를 호출하고 있다.

override fun compareAndSet(expect: T, update: T): Boolean =
    updateState(expect ?: NULL, update ?: NULL)

update를 쓰나 property .value를 바로 바꾸나 둘 다 thread safe 하게 값을 변경한다는 점이다.

private fun updateState(expectedState: Any?, newState: Any): Boolean {
    // 생략
    synchronized(this) {
        // 생략
    }
    
    while (true) {
        // 생략
        synchronized(this) {
            // 생략
        }
    }
}


SharedFlow

이번엔 SharedFlow이다. SharedFlow는 특이하게도 3가지 기본 옵션을 제공하고 있어서 다양한 방식의 사용이 가능하다.

kotlin coroutines SharedFlow - link

  • extraBufferCapacity : 기본 설정은 0, 버퍼는 1개를 기본 제공하는데, 여기에 N 개의 추가 버퍼를 적용할 수 있다.
  • replay : 기본 설정은 0, 새로운 구독 시점에 마지막 N 개의 아이템을 replay 한다.
  • onBufferOverflow : 기본값은 SUSPEND이고, DROP_OLDEST 목록 중 이전 값을 제거하거나, DROP_LATEST 목록 중 어느 값이 버려지는지 중요하지 않은 경우

drop_latest은 해석이 애매해서 원문을 그대로 적어둔다.

This option can be used in rare advanced scenarios where all elements that are expected to enter the buffer are equal, so it is not important which of them get thrown away.

SharedFlow로는 다음의 시나리오가 모두 가능하다.

  • 구독 시점 이후 최신 데이터 만 받을 수 있다.(ReactiveX PublishSubject)
// suspend로 활용하고, emit 만 활용하는 경우
val mutableSharedFlow = MutableSharedFlow<Int>()
  • 구독 시점 최근 데이터 1개 또는 N 개를 replay 받고, 이후 흐름을 받을 수 있다.(ReactiveX BehaviorSubject, ReplaySubject)
// suspend로 활용하고, emit 만 활용하는 경우
val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 1,
    replay = 1, // or N
)

// DROP_OLDEST tryEmit으로 emit하는 경우
val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 1,
    replay = 1, // or N
    onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
  • 구독 시점 최근 데이터 중 가장 마지막의 데이터부터 받는다.(ReactiveX AsyncSubject)
// DROP_OLDEST tryEmit으로 emit하는 경우
val mutableSharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

앞에서 언급한 ReactiveX의 4가지 모두를 자유롭게 사용할 수 있기에 옵션만 잘 활용해도 동일한 결과를 얻을 수 있다.

샘플 코드라서 비밀은 있다(문제가 있다)

위 코드에는 하나의 비밀이 숨겨져있는데, emit/tryEmit을 선택하여 사용해야 하며, 이에 따라 결과가 달라진다는 점이다.

val sharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 1,
    replay = 2,
    onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

sharedFlow.tryEmit(1)
sharedFlow.tryEmit(2)

sharedFlow
    .onEach { i ->
        println("index $i")
    }
    .launchIn(this)

sharedFlow.tryEmit(3)
sharedFlow.tryEmit(4)

이 코드에 대한 기대 결과는 1(replay), 2(replay), 3(new), 4(new)라고 생각할 수 있지만 실제 결과는 3(replay), 4(replay)란 결과가 나온다.

이는 laucnIn(collect)으로 stream 구독을 시작하였지만 구독에 대한 시간이 필요하며, emit/tryEmit을 연속으로 처리했기 때문에 마지막 emit 값 3, 4 replay로 값이 전달되어 출력됨을 디버그를 통해 확인할 수 있다.

추가로 emit 함수에는 아래와 같이 tryEmit과 emitSuspend를 사용하고 있지만 디버그 해보면 대부분 tryEmit에서 완료되어 fast-path 처리하고 끝난다.

override suspend fun emit(value: T) {
    if (tryEmit(value)) return // fast-path
    emitSuspend(value)
}

이런 코드를 작성하지는 않겠지만 replay만 동작하고 끝날 수 있기에 샘플 코드에서는 다음과 같이 수정하였다.

val sharedFlow = MutableSharedFlow<Int>(
    extraBufferCapacity = 1,
    replay = 2,
    onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

sharedFlow.tryEmit(1)
sharedFlow.tryEmit(2)

sharedFlow
    .onEach { i ->
        println("index $i")
    }
    .launchIn(this)

delay(1) // 대기시간을 줘야 한다.
sharedFlow.tryEmit(3)
sharedFlow.tryEmit(4)

이와 같이 대기시간을 1ms 정도 주면 결과가 3, 4에서 1(replay), 2(replay), 3(new), 4(new)를 순서대로 동작시켜준다.

emit으로 변경한다 해도 동일한 결과가 나오는데, emit을 연속으로 하였기 때문에 구독 이전의 값들이 사실상 의미가 없다는 점이다.


시나리오 1

StateFlow와 SharedFlow에 대해서 설명하였는데 그래서 어떻게 쓰는 것이 좋을까? 이 둘을 같이 사용하는 것이 가능할까?

GitHub 사용자 검색 api를 활용한 예이다.

GitHub GithubUserSearch - link

이 코드가 실시간 갱신을 사용하기 위한 시나리오를 가지고 ColdFlow와 HotFlow를 연속적으로 사용하기 위한 코드에 대한 설명이다.

이미 과거에 작성했던 글과 연속적으로 같은 설명이라서 이 글에서는 생략하고 넘어가겠다.

Android에서 flow를 통한 실시간 데이터 갱신에 대한 정리 - link


시나리오 2

사용자가 동의하기 버튼과 확인 버튼을 누르기 전에는 데이터를 불러올 수 없으며, check에 대한 상태는 실시간 갱신되어야 하며, 버튼은 사용자가 누르기 전에는 액션 할 수 없다.

suspend로 작성하면 버튼을 누르면 suspend 함수를 호출해 주면 끝이다.

이를 지속적인 흐름으로 가져가고 싶다면 일단 시나리오를 구분해야 한다.

  • 사용자의 동의 버튼 누르는 행동은 언제나 일어난다.
    • StateFlow를 활용해 true/false를 관리한다.
  • 사용자가 확인 버튼을 누를 수 있는 경우는 동의한 경우에만 실행하도록 대기한다.
    • SharedFlow와 앞선 StateFlow를 동시에 true 인지 체크가 필요하다.
// 동의 버튼에 대한 처리
val agree = MutableStateFlow(false)

// 사용자 버튼 클릭을 처리하기 위함
val ok = MutableSharedFlow<Boolean>(
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

코드의 순서가 어디가 먼저일까? agree 되었을 경우 button이 눌러짐을 대기해야 할까? button은 눌러질 수 있지만 agree를 체크하고 나서 실행해야 할까?

이런 시나리오라면 코드는 달라질 수 있지만 전자에 대한 시나리오 코드를 적어보겠다.

agree // 1
    .onEach { isAgree ->
        _uiState.update {
            it.copy(
                isAgree = isAgree,
            )
        }
    }
    .filter { it } // 2
    .flatMapLatest { agree -> // 3
        ok // ok event는 true 말곤 올게 없어서 필터하지 않음
            .map { agree }
    }
    .map {
        repository.updateAgree() // 4
    }
    .retryWhen { _, _ -> true } // 5
    .flowOn(Disatpcher.IO)
    .launchIn(viewModelScope)
  1. 사용자 동의는 true/false로 언제나 일어난다.
  2. 만약 agree true 이면 filter를 통과한다.
  3. flatMapLatest는 2번 true 이면 통과되어 ok에 대한 이벤트를 받을 수 있다.
  4. 3번의 이벤트가 발생하지 않는 이상 이 코드는 동작하지 않는다.

만약 network fail이 발생하고, 재시도 가능한 상태를 둔다고 하더라도 이 코드는 3번에서 사용자의 액션을 받기 전에는 동작하지 않는다. 그렇기에 그만큼 간단한 코드로 예외 처리할 수 있다.

  1. 4번의 네트워크 실패 케이스가 발생하면 5번 retryWhen으로 이동한다.
  2. 재시도에 대한 사용자에게 알릴 수 있다. 5번에서 true 값을 사용했기에 1번부터 다시 시작한다.
  3. 이전의 값이 true이기 때문에 true를 다시 방출한다.
  4. true이기에 filter 통과한다.
  5. 사용자의 새로운 액션을 대기한다.

retryWhen이 동작하더라도 3번의 아무런 액션이 없기 때문이 이 코드는 멈춘다. 예외 처리에 대한 자세한 내용은 이전에 작성한 글을 참고하시길

Kotlin flow의 예외 처리(catch), 재시도(retry, retryWhen) 살펴보기 - link


마무리

StateFlow와 SharedFlow에 대한 데이터 흐름을 알아보고, 2가지 시나리오를 기반으로 간단한 코드를 적어보았다.



About Taehwan

My name is Taehwan Kwon. I have developed Android for 6 years and blog has been active for eight years.

Comments